Coverage for src / rtflite / pagination / core.py: 83%
188 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 04:50 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 04:50 +0000
1from collections.abc import Mapping, Sequence
3import polars as pl
4from pydantic import BaseModel, ConfigDict, Field
6from ..attributes import TableAttributes
7from ..fonts_mapping import FontName, FontNumber
8from ..strwidth import get_string_width
class RTFPagination(BaseModel):
    """Page geometry and row capacity used when paginating an RTF document."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    page_width: float = Field(description="Page width in inches")
    page_height: float = Field(description="Page height in inches")
    margin: Sequence[float] = Field(
        description="Page margins [left, right, top, bottom, header, footer]"
    )
    nrow: int = Field(description="Maximum rows per page")
    orientation: str = Field(description="Page orientation")

    def calculate_available_space(self) -> Mapping[str, float]:
        """Return the usable content area and the header/footer allowances.

        The content width is the page width minus the left and right margins;
        the content height is the page height minus the top and bottom
        margins. The header and footer entries are the fifth and sixth margin
        values passed through unchanged.

        Returns:
            Mapping with the keys ``content_width``, ``content_height``,
            ``header_space`` and ``footer_space``.

        Raises:
            IndexError: If ``margin`` holds fewer than six values.
        """
        # Positions follow the order [left, right, top, bottom, header, footer].
        margins = self.margin
        return {
            "content_width": self.page_width - margins[0] - margins[1],
            "content_height": self.page_height - margins[2] - margins[3],
            "header_space": margins[4],
            "footer_space": margins[5],
        }
class RowMetadata(BaseModel):
    """Pagination record for one data row.

    Stores the row-count breakdown of a row (data lines plus any header lines
    attached to it), the page the row was assigned to, and flags marking
    whether the row opens a new ``page_by`` or ``subline_by`` group.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Position of the row in the source data.
    row_index: int = Field(description="Original data row index (0-based)")

    # Row-count breakdown; the three header counts default to zero.
    data_rows: int = Field(description="Number of rows the data content occupies")
    pageby_header_rows: int = Field(
        0, description="Number of rows the page_by header occupies"
    )
    subline_header_rows: int = Field(
        0, description="Number of rows the subline_by header occupies"
    )
    column_header_rows: int = Field(
        0, description="Number of rows for column headers"
    )
    total_rows: int = Field(description="Sum of all row counts")

    # Pagination result and group-boundary flags.
    page: int = Field(0, description="Assigned page number")
    is_group_start: bool = Field(
        False, description="True if this row starts a new page_by group"
    )
    is_subline_start: bool = Field(
        False, description="True if this row starts a new subline_by group"
    )
class PageBreakCalculator(BaseModel):
    """Calculates where page breaks should occur based on content and constraints.

    Row heights are estimated in "rows" (rendered text lines): the measured
    string width of a cell is divided by the width available to it. The
    per-page capacity is ``pagination.nrow`` minus any rows reserved by the
    caller through ``additional_rows_per_page``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")

    def _lines_needed(self, text_width: float, available_width: float) -> int:
        """Return how many lines text of ``text_width`` needs in ``available_width``.

        A non-positive width cannot be wrapped against, so it counts as one
        line instead of raising ``ZeroDivisionError``.
        """
        if available_width <= 0:
            return 1
        return max(1, int(text_width / available_width) + 1)

    def _broadcast_attribute(
        self,
        table_attrs: TableAttributes | None,
        attr_name: str,
        dimension: tuple[int, int],
    ):
        """Wrap ``table_attrs.<attr_name>`` in a ``BroadcastValue``.

        Returns ``None`` when ``table_attrs`` is falsy or lacks the attribute,
        so callers can fall back to their defaults.
        """
        if not table_attrs or not hasattr(table_attrs, attr_name):
            return None
        # Imported at call time, as the original code did.
        from ..attributes import BroadcastValue

        return BroadcastValue(
            value=getattr(table_attrs, attr_name), dimension=dimension
        )

    def _group_changes(
        self, df: pl.DataFrame, columns: Sequence[str] | None
    ) -> list[bool]:
        """Flag the rows whose ``columns`` values differ from the previous row.

        Values are compared through ``str()``. The first row is always
        flagged, and every row is flagged when ``columns`` is empty or
        ``None`` (callers ignore the flags in that case).
        """
        changes = [True] * df.height
        if not columns or df.height < 2:
            return changes

        prev_row = df.row(0, named=True)
        for i in range(1, df.height):
            curr_row = df.row(i, named=True)
            changes[i] = any(
                str(prev_row[col]) != str(curr_row[col]) for col in columns
            )
            # Reuse the fetched row as "previous" for the next iteration.
            prev_row = curr_row
        return changes

    def _group_header_rows(
        self,
        df: pl.DataFrame,
        columns: Sequence[str],
        row_idx: int,
        total_width: float,
        font_size: float,
    ) -> int:
        """Return the line count of the group header built for ``row_idx``.

        The header text is ``"col: value"`` for each column, joined by
        ``" | "``. Values whose string form is ``"-----"`` are skipped
        (presumably a placeholder for a suppressed value -- confirm with the
        caller). Returns 0 when no part remains.
        """
        header_parts = []
        for col in columns:
            val = df[col][row_idx]
            if str(val) != "-----":
                header_parts.append(f"{col}: {val}")
        header_text = " | ".join(header_parts)
        if not header_text:
            return 0
        return self._calculate_header_rows(
            header_text, total_width, font_size=int(font_size)
        )  # type: ignore

    def calculate_content_rows(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        table_attrs: TableAttributes | None = None,
        font_size: float = 9,
        spanning_columns: Sequence[str] | None = None,
    ) -> Sequence[int]:
        """Calculate how many rows each content row will occupy when rendered

        Args:
            df: DataFrame containing the content
            col_widths: Width of each column in inches. Only columns that have
                both a width and a DataFrame column are measured.
            table_attrs: Table attributes containing cell height and font size info
            font_size: Default font size in points
            spanning_columns: Columns that should be treated as spanning the full width

        Returns:
            List of row counts for each data row
        """
        if df.height == 0:
            return []

        dim = df.shape
        spanning = set(spanning_columns or [])
        total_width = sum(col_widths)

        # The broadcast wrappers do not depend on the cell being measured, so
        # they are built once per call rather than once per cell.
        font_sizes = self._broadcast_attribute(table_attrs, "text_font_size", dim)
        fonts = self._broadcast_attribute(table_attrs, "text_font", dim)
        cell_heights = self._broadcast_attribute(table_attrs, "cell_height", dim)

        # (column index, column data, width used for wrapping). Spanning
        # columns wrap against the whole table width.
        measured = [
            (col_idx, df[col_name], total_width if col_name in spanning else col_width)
            for col_idx, (col_name, col_width) in enumerate(
                zip(df.columns, col_widths)
            )
        ]

        # Assumed default line height of ~0.15 inches when converting a cell
        # height in inches to an approximate line count.
        line_height_inches = 0.15

        row_counts: list[int] = []
        for row_idx in range(df.height):
            max_lines_in_row = 1

            for col_idx, series, effective_width in measured:
                cell_value = str(series[row_idx])

                actual_font_size = font_size
                if font_sizes is not None:
                    actual_font_size = font_sizes.iloc(row_idx, col_idx)

                # Default to font number 1 (Times New Roman).
                actual_font: FontName | FontNumber = 1
                if fonts is not None:
                    font_value = fonts.iloc(row_idx, col_idx)
                    # Accept a FontNumber (int in 1..10) or a FontName (str);
                    # anything else keeps the default.
                    if (
                        isinstance(font_value, int) and 1 <= font_value <= 10
                    ) or isinstance(font_value, str):
                        actual_font = font_value  # type: ignore[assignment]

                text_width = get_string_width(
                    cell_value,
                    font=actual_font,
                    font_size=actual_font_size,  # type: ignore[arg-type]
                )
                max_lines_in_row = max(
                    max_lines_in_row,
                    self._lines_needed(text_width, effective_width),
                )

            # An explicit cell height can make the row taller than its text.
            cell_height_lines = 1
            if cell_heights is not None:
                cell_height = cell_heights.iloc(row_idx, 0)
                cell_height_lines = max(1, int(cell_height / line_height_inches))

            row_counts.append(max(max_lines_in_row, cell_height_lines))

        return row_counts

    def find_page_breaks(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        new_page: bool = False,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
    ) -> Sequence[tuple[int, int]]:
        """Find optimal page break positions (r2rtf compatible)

        Args:
            df: DataFrame to paginate
            col_widths: Column widths in inches
            page_by: Columns to group by for page breaks. They are also passed
                to ``calculate_content_rows`` as spanning columns.
            new_page: Whether to force new pages between groups
            table_attrs: Table attributes for accurate row calculation
            additional_rows_per_page: Additional rows per page (headers,
                footnotes, sources)

        Returns:
            List of (start_row, end_row) tuples for each page; both indices
            are inclusive. Empty when ``df`` has no rows.
        """
        if df.height == 0:
            return []

        row_counts = self.calculate_content_rows(
            df, col_widths, table_attrs, spanning_columns=page_by
        )
        page_breaks = []
        current_page_start = 0
        current_page_rows = 0

        # In r2rtf, nrow includes ALL rows (headers, data, footnotes, sources),
        # so the reserved rows are subtracted; at least one data row always fits.
        available_data_rows_per_page = max(
            1, self.pagination.nrow - additional_rows_per_page
        )

        last_row_idx = df.height - 1
        for row_idx, row_height in enumerate(row_counts):
            if current_page_rows + row_height > available_data_rows_per_page:
                # This row does not fit: close the page before it, unless the
                # page is still empty (a single row taller than a page).
                if current_page_start < row_idx:
                    page_breaks.append((current_page_start, row_idx - 1))
                    current_page_start = row_idx
                current_page_rows = row_height
            else:
                current_page_rows += row_height

            # With page_by + new_page=True a break is forced at every group
            # boundary. With page_by alone, pagination stays natural.
            if page_by and new_page and row_idx < last_row_idx:
                current_group = {col: df[col][row_idx] for col in page_by}
                next_group = {col: df[col][row_idx + 1] for col in page_by}

                if current_group != next_group:
                    page_breaks.append((current_page_start, row_idx))
                    current_page_start = row_idx + 1
                    current_page_rows = 0

        # Whatever remains after the last break forms the final page.
        if current_page_start < df.height:
            page_breaks.append((current_page_start, last_row_idx))

        return page_breaks

    def calculate_row_metadata(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        subline_by: Sequence[str] | None = None,
        table_attrs: TableAttributes | None = None,
        removed_column_indices: Sequence[int] | None = None,
        font_size: float = 9,
        additional_rows_per_page: int = 0,
        new_page: bool = False,
    ) -> pl.DataFrame:
        """Generate complete row metadata for pagination.

        Args:
            df: DataFrame to paginate.
            col_widths: Cumulative column widths (right boundaries); each
                column's own width is the difference to the previous entry.
            page_by: Columns whose value changes start a new page_by group.
            subline_by: Columns whose value changes start a new subline group.
            table_attrs: Accepted for interface compatibility; not used by
                this method.
            removed_column_indices: DataFrame column positions to skip when
                measuring. Skipped columns do not consume a ``col_widths``
                entry.
            font_size: Font size used to measure cells and group headers
                (headers use ``int(font_size)``).
            additional_rows_per_page: Rows reserved on every page.
            new_page: Whether each page_by group starts on a new page.

        Returns:
            DataFrame with one row per data row and the columns ``row_index``,
            ``data_rows``, ``pageby_header_rows``, ``subline_header_rows``,
            ``column_header_rows``, ``total_rows``, ``page``,
            ``is_group_start`` and ``is_subline_start``.
        """
        # NOTE(review): col_widths is treated as cumulative below, yet the
        # total width used for group headers is the sum of its entries rather
        # than the last entry -- confirm which is intended.
        total_width = sum(col_widths)

        page_by_changes = self._group_changes(df, page_by)
        subline_by_changes = self._group_changes(df, subline_by)

        # The column -> width pairing is the same for every row, so it is
        # resolved once: (column data, individual column width).
        removed_indices = set(removed_column_indices or [])
        measured = []
        width_idx = 0
        for col_idx, col_name in enumerate(df.columns):
            if col_idx in removed_indices:
                continue
            if width_idx >= len(col_widths):
                break
            prev_cumulative = col_widths[width_idx - 1] if width_idx > 0 else 0
            measured.append((df[col_name], col_widths[width_idx] - prev_cumulative))
            width_idx += 1

        row_metadata_list = []
        for row_idx in range(df.height):
            # 1. Lines needed by the data cells (font number 1 is assumed).
            max_lines_in_row = 1
            for series, col_width in measured:
                text_width = get_string_width(
                    str(series[row_idx]),
                    font=1,  # type: ignore
                    font_size=font_size,  # type: ignore
                )
                max_lines_in_row = max(
                    max_lines_in_row, self._lines_needed(text_width, col_width)
                )

            # 2. Lines needed by group headers that start on this row.
            pageby_rows = 0
            if page_by and page_by_changes[row_idx]:
                pageby_rows = self._group_header_rows(
                    df, page_by, row_idx, total_width, font_size
                )

            subline_rows = 0
            if subline_by and subline_by_changes[row_idx]:
                subline_rows = self._group_header_rows(
                    df, subline_by, row_idx, total_width, font_size
                )

            row_metadata_list.append(
                {
                    "row_index": row_idx,
                    "data_rows": max_lines_in_row,
                    "pageby_header_rows": pageby_rows,
                    "subline_header_rows": subline_rows,
                    "column_header_rows": 0,  # To be filled later or passed in
                    "total_rows": max_lines_in_row + pageby_rows + subline_rows,
                    "page": 0,  # To be assigned
                    "is_group_start": page_by_changes[row_idx] if page_by else False,
                    "is_subline_start": subline_by_changes[row_idx]
                    if subline_by
                    else False,
                }
            )

        # Explicit schema so an empty input still yields typed columns.
        schema = {
            "row_index": pl.Int64,
            "data_rows": pl.Int64,
            "pageby_header_rows": pl.Int64,
            "subline_header_rows": pl.Int64,
            "column_header_rows": pl.Int64,
            "total_rows": pl.Int64,
            "page": pl.Int64,
            "is_group_start": pl.Boolean,
            "is_subline_start": pl.Boolean,
        }
        meta_df = pl.DataFrame(row_metadata_list, schema=schema, orient="row")

        return self._assign_pages(meta_df, additional_rows_per_page, new_page)

    def _calculate_header_rows(
        self,
        header_text: str,
        total_width: float,
        font: FontName | FontNumber = 1,
        font_size: int = 18,
    ) -> int:
        """Calculate how many rows a header will occupy.

        Args:
            header_text: Text of the header.
            total_width: Width the header may occupy before wrapping.
            font: Font used to measure the text.
            font_size: Font size used to measure the text.

        Returns:
            Line count, at least 1. A non-positive ``total_width`` yields 1.
        """
        text_width = get_string_width(header_text, font=font, font_size=font_size)
        return self._lines_needed(text_width, total_width)

    def _assign_pages(
        self,
        meta_df: pl.DataFrame,
        additional_rows_per_page: int = 0,
        new_page: bool = False,
    ) -> pl.DataFrame:
        """Assign page numbers to the metadata DataFrame.

        Pages are numbered from 1. A new page starts when the row would
        overflow the page, when it starts a subline group, or (if ``new_page``
        is set) when it starts a page_by group. A break is never inserted
        while the current page is still empty.

        Args:
            meta_df: Metadata with ``total_rows``, ``is_subline_start`` and
                ``is_group_start`` columns.
            additional_rows_per_page: Rows reserved on every page.
            new_page: Whether page_by group starts force a page break.

        Returns:
            ``meta_df`` unchanged when it is empty; otherwise a new DataFrame
            built from the rows with ``page`` filled in.
        """
        if meta_df.height == 0:
            return meta_df

        available_rows = max(1, self.pagination.nrow - additional_rows_per_page)
        current_page = 1
        current_rows = 0

        # Work on plain dicts so the 'page' value can be updated in place.
        rows = meta_df.to_dicts()

        for i, row in enumerate(rows):
            row_height = row["total_rows"]

            # Forced breaks never apply to the very first row.
            force_break = i > 0 and (
                row["is_subline_start"] or (new_page and row["is_group_start"])
            )
            overflow = current_rows + row_height > available_rows

            if (force_break or overflow) and current_rows > 0:
                current_page += 1
                current_rows = 0

            row["page"] = current_page
            current_rows += row_height

        return pl.DataFrame(rows)