Coverage for src / rtflite / pagination / core.py: 93%
117 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 05:09 +0000
1from collections.abc import Mapping, Sequence
2from typing import Any
4import polars as pl
5from pydantic import BaseModel, ConfigDict, Field
7from ..attributes import TableAttributes
8from ..fonts_mapping import FontName, FontNumber
9from ..strwidth import get_string_width
class RTFPagination(BaseModel):
    """Core pagination settings and geometry calculations for RTF documents."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    page_width: float = Field(..., description="Page width in inches")
    page_height: float = Field(..., description="Page height in inches")
    margin: Sequence[float] = Field(
        ..., description="Page margins [left, right, top, bottom, header, footer]"
    )
    nrow: int = Field(..., description="Maximum rows per page")
    orientation: str = Field(..., description="Page orientation")

    def calculate_available_space(self) -> Mapping[str, float]:
        """Compute the usable content area and header/footer space per page.

        Returns:
            Mapping with ``content_width``/``content_height`` (page size minus
            the opposing margins) plus ``header_space`` and ``footer_space``.
        """
        # Margin layout: [left, right, top, bottom, header, footer]
        left, right, top, bottom, header, footer = self.margin[:6]
        return {
            "content_width": self.page_width - left - right,
            "content_height": self.page_height - top - bottom,
            "header_space": header,
            "footer_space": footer,
        }
class PageBreakCalculator(BaseModel):
    """Calculates where page breaks should occur based on content and constraints"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")

    def calculate_content_rows(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        table_attrs: TableAttributes | None = None,
        font_size: float = 9,
    ) -> Sequence[int]:
        """Calculate how many rendered lines each data row will occupy.

        Args:
            df: DataFrame containing the content
            col_widths: Width of each column in inches
            table_attrs: Table attributes containing cell height and font size info
            font_size: Default font size in points, used when ``table_attrs``
                does not provide a per-cell size

        Returns:
            List of line counts, one entry per data row
        """
        # Import kept function-local (presumably to avoid a circular import
        # with ..attributes), but hoisted out of the per-cell loops so it
        # executes once per call instead of up to three times per cell.
        from ..attributes import BroadcastValue

        row_counts: list[int] = []
        dim = df.shape
        columns = df.columns

        # Attribute availability does not vary per cell; check once up front.
        has_font_size = table_attrs is not None and hasattr(
            table_attrs, "text_font_size"
        )
        has_font = table_attrs is not None and hasattr(table_attrs, "text_font")
        has_cell_height = table_attrs is not None and hasattr(
            table_attrs, "cell_height"
        )

        for row_idx in range(df.height):
            max_lines_in_row = 1

            for col_idx, col_width in enumerate(col_widths):
                if col_idx >= len(columns):
                    # More widths than columns: ignore the extras.
                    continue

                # Proper polars column access: df[column_name][row_idx]
                col_name = columns[col_idx]
                cell_value = str(df[col_name][row_idx])

                # Per-cell font size from table attributes when available.
                actual_font_size = font_size
                if has_font_size:
                    actual_font_size = BroadcastValue(
                        value=table_attrs.text_font_size, dimension=dim
                    ).iloc(row_idx, col_idx)

                # Per-cell font; default is font number 1 (Times New Roman).
                actual_font: FontName | FontNumber = 1
                if has_font:
                    font_value = BroadcastValue(
                        value=table_attrs.text_font, dimension=dim
                    ).iloc(row_idx, col_idx)
                    # Handle both FontNumber (int in 1..10) and FontName (str).
                    if isinstance(font_value, int) and 1 <= font_value <= 10:
                        actual_font = font_value  # type: ignore[assignment]
                    elif isinstance(font_value, str):
                        actual_font = font_value  # type: ignore[assignment]

                # Estimate wrapped line count from the rendered string width.
                text_width = get_string_width(
                    cell_value,
                    font=actual_font,
                    font_size=actual_font_size,  # type: ignore[arg-type]
                )
                lines_needed = max(1, int(text_width / col_width) + 1)
                max_lines_in_row = max(max_lines_in_row, lines_needed)

            # Account for an explicit cell height, converted to a line count
            # assuming a default line height of ~0.15 inches.
            cell_height_lines = 1
            if has_cell_height:
                cell_height = BroadcastValue(
                    value=table_attrs.cell_height, dimension=dim
                ).iloc(row_idx, 0)
                cell_height_lines = max(1, int(cell_height / 0.15))

            row_counts.append(max(max_lines_in_row, cell_height_lines))

        return row_counts

    def find_page_breaks(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        new_page: bool = False,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
    ) -> Sequence[tuple[int, int]]:
        """Find optimal page break positions (r2rtf compatible)

        Args:
            df: DataFrame to paginate
            col_widths: Column widths in inches
            page_by: Columns to group by for page breaks
            new_page: Whether to force new pages between groups
            table_attrs: Table attributes for accurate row calculation
            additional_rows_per_page: Additional rows per page (headers,
                footnotes, sources)

        Returns:
            List of (start_row, end_row) tuples for each page, both inclusive
        """
        if df.height == 0:
            return []

        row_counts = self.calculate_content_rows(df, col_widths, table_attrs)
        page_breaks: list[tuple[int, int]] = []
        current_page_start = 0
        current_page_rows = 0

        # Calculate available rows for data (r2rtf compatible).
        # In r2rtf, nrow includes ALL rows (headers, data, footnotes, sources),
        # so subtract the non-data rows; never allow fewer than 1 data row.
        available_data_rows_per_page = max(
            1, self.pagination.nrow - additional_rows_per_page
        )

        for row_idx, row_height in enumerate(row_counts):
            # Would this row overflow the page (counting additional rows)?
            if current_page_rows + row_height > available_data_rows_per_page:
                # Break before this row — unless the page would be empty,
                # in which case the oversized row stays on the current page.
                if current_page_start < row_idx:
                    page_breaks.append((current_page_start, row_idx - 1))
                    current_page_start = row_idx
                current_page_rows = row_height
            else:
                current_page_rows += row_height

            # Handle group-based page breaks.
            # page_by + new_page=True forces breaks at group boundaries;
            # page_by alone allows natural pagination with spanning rows
            # mid-page (handled by the content distributor).
            if page_by and new_page and row_idx < df.height - 1:
                current_group = {col: df[col][row_idx] for col in page_by}
                next_group = {col: df[col][row_idx + 1] for col in page_by}

                if current_group != next_group:
                    # Force page break between groups
                    page_breaks.append((current_page_start, row_idx))
                    current_page_start = row_idx + 1
                    current_page_rows = 0

        # Add final page for any remaining rows.
        if current_page_start < df.height:
            page_breaks.append((current_page_start, df.height - 1))

        return page_breaks
class ContentDistributor(BaseModel):
    """Manages content distribution across multiple pages"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")
    calculator: PageBreakCalculator = Field(..., description="Page break calculator")

    def distribute_content(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        new_page: bool = False,
        pageby_header: bool = True,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
        subline_by: Sequence[str] | None = None,
    ) -> Sequence[Mapping[str, Any]]:
        """Distribute content across multiple pages (r2rtf compatible)

        Args:
            df: DataFrame to distribute
            col_widths: Column widths in inches
            page_by: Columns to group by
            new_page: Force new pages between groups
            pageby_header: Repeat headers on new pages
            table_attrs: Table attributes for accurate calculations
            additional_rows_per_page: Additional rows per page (headers,
                footnotes, sources)
            subline_by: Columns to create subline headers by (forces
                new_page=True)

        Returns:
            List of page information dictionaries (one per page)
        """
        # subline_by is implemented as page_by with forced page breaks.
        if subline_by:
            page_by = subline_by
            new_page = True

        page_breaks = self.calculator.find_page_breaks(
            df, col_widths, page_by, new_page, table_attrs, additional_rows_per_page
        )
        pages = []

        for page_num, (start_row, end_row) in enumerate(page_breaks):
            # end_row is inclusive, hence the +1 on the slice.
            page_df = df[start_row : end_row + 1]

            page_info = {
                "page_number": page_num + 1,
                "total_pages": len(page_breaks),
                "data": page_df,
                "start_row": start_row,
                "end_row": end_row,
                "is_first_page": page_num == 0,
                "is_last_page": page_num == len(page_breaks) - 1,
                "needs_header": pageby_header or page_num == 0,
                "col_widths": col_widths,
            }

            # Add subline_by header information for each page
            if subline_by:
                page_info["subline_header"] = self.get_group_headers(
                    df, subline_by, start_row
                )
            # Add page_by header information (spanning rows) on each page.
            # Note: new_page only controls forced page breaks, not spanning
            # row creation.
            elif page_by:
                # Header for the first group on this page.
                page_info["pageby_header_info"] = self.get_group_headers(
                    df, page_by, start_row
                )

                # Detect all group boundaries within this page so spanning
                # rows can be inserted mid-page when new_page=False.
                # (row_idx + 1 <= end_row holds for every row_idx in this
                # range, so no extra bounds check is needed.)
                group_boundaries = []
                for row_idx in range(start_row, end_row):
                    current_group = {col: df[col][row_idx] for col in page_by}
                    next_group = {col: df[col][row_idx + 1] for col in page_by}
                    if current_group != next_group:
                        # Filter out divider values for the next group header
                        next_group_filtered = {
                            k: v for k, v in next_group.items() if str(v) != "-----"
                        }

                        # Group changes at row_idx+1
                        # (relative to page: row_idx+1-start_row)
                        group_boundaries.append({
                            "absolute_row": row_idx + 1,
                            "page_relative_row": row_idx + 1 - start_row,
                            "group_values": next_group_filtered,
                        })

                if group_boundaries:
                    page_info["group_boundaries"] = group_boundaries

            pages.append(page_info)

        return pages

    def get_group_headers(
        self, df: pl.DataFrame, page_by: Sequence[str], start_row: int
    ) -> Mapping[str, Any]:
        """Get group header information for a page

        Args:
            df: Original DataFrame
            page_by: Grouping columns
            start_row: Starting row for this page

        Returns:
            Dictionary with group header information; empty dict when there
            are no grouping columns or start_row is out of range
        """
        if not page_by or start_row >= df.height:
            return {}

        group_values = {}
        for col in page_by:
            val = df[col][start_row]
            # Filter out divider rows marked with "-----"
            if str(val) != "-----":
                group_values[col] = val

        return {
            "group_by_columns": page_by,
            "group_values": group_values,
            "header_text": " | ".join(
                f"{col}: {val}" for col, val in group_values.items()
            ),
        }