Coverage for src/rtflite/pagination/core.py: 91%
103 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 16:35 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 16:35 +0000
1from collections.abc import Sequence
2from typing import Any
4import polars as pl
5from pydantic import BaseModel, ConfigDict, Field
7from ..attributes import TableAttributes
8from ..fonts_mapping import FontName, FontNumber
9from ..strwidth import get_string_width
class RTFPagination(BaseModel):
    """Core pagination logic and calculations for RTF documents"""

    # Holds the page geometry (inches), the six-entry margin sequence, the
    # per-page row budget, and the orientation label.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    page_width: float = Field(..., description="Page width in inches")
    page_height: float = Field(..., description="Page height in inches")
    margin: Sequence[float] = Field(
        ..., description="Page margins [left, right, top, bottom, header, footer]"
    )
    nrow: int = Field(..., description="Maximum rows per page")
    orientation: str = Field(..., description="Page orientation")

    def calculate_available_space(self) -> dict[str, float]:
        """Return the content area and header/footer allowances for one page.

        The content width is the page width minus the left and right margins;
        the content height is the page height minus the top and bottom
        margins. The header and footer entries are passed through unchanged
        from ``margin[4]`` and ``margin[5]``.

        Returns:
            Mapping with the keys ``content_width``, ``content_height``,
            ``header_space`` and ``footer_space``.

        Raises:
            IndexError: If ``margin`` has fewer than six entries.
        """
        # Margin layout: [left, right, top, bottom, header, footer].
        left, right, top, bottom, header, footer = (
            self.margin[position] for position in range(6)
        )

        space: dict[str, float] = {}
        space["content_width"] = self.page_width - left - right
        space["content_height"] = self.page_height - top - bottom
        space["header_space"] = header
        space["footer_space"] = footer
        return space
class PageBreakCalculator(BaseModel):
    """Calculates where page breaks should occur based on content and constraints"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")

    def calculate_content_rows(
        self,
        df: pl.DataFrame,
        col_widths: list[float],
        table_attrs: TableAttributes | None = None,
        font_size: float = 9,
    ) -> list[int]:
        """Estimate how many rendered lines each data row occupies.

        For every row, each measured cell is converted to ``str`` and its
        width is obtained from ``get_string_width``; the cell needs
        ``int(text_width / col_width) + 1`` lines (at least 1). A row's count
        is the largest cell count in the row, or the line count implied by
        ``table_attrs.cell_height`` if that is larger.

        Only the first ``min(len(col_widths), len(df.columns))`` columns are
        measured. Columns whose width is zero or negative are not measured
        and contribute a single line, so a zero width no longer raises
        ``ZeroDivisionError``.

        Args:
            df: DataFrame containing the content.
            col_widths: Width of each column in inches.
            table_attrs: Optional table attributes. ``text_font_size``,
                ``text_font`` and ``cell_height`` are read when present.
            font_size: Font size used when ``table_attrs`` does not provide
                one (or resolves to ``None`` for a cell).

        Returns:
            One line count per row of ``df`` (empty for an empty frame).
        """
        # Kept function-local as in the rest of this module, but resolved once
        # per call instead of once per cell.
        from ..attributes import BroadcastValue

        if df.height == 0:
            return []

        dim = df.shape
        # Approximate height of one rendered line, in inches; used to turn a
        # cell height into a line count.
        line_height = 0.15

        def broadcaster(attr_name: str) -> Any:
            """Wrap ``table_attrs.<attr_name>`` for per-cell lookup, if present."""
            if table_attrs and hasattr(table_attrs, attr_name):
                return BroadcastValue(
                    value=getattr(table_attrs, attr_name), dimension=dim
                )
            return None

        # Pair each width with its column once; zip stops at the shorter of
        # the two, which is the same cut-off as checking the index per cell.
        measured = [
            (col_idx, df[col_name], col_width)
            for col_idx, (col_name, col_width) in enumerate(
                zip(df.columns, col_widths)
            )
        ]

        # The broadcast inputs do not change inside the loops, so build each
        # wrapper at most once.
        # NOTE(review): assumes BroadcastValue.iloc is a pure lookup — confirm.
        size_lookup = broadcaster("text_font_size") if measured else None
        font_lookup = broadcaster("text_font") if measured else None
        height_lookup = broadcaster("cell_height")

        row_counts: list[int] = []
        for row_idx in range(df.height):
            max_lines_in_row = 1

            for col_idx, column, col_width in measured:
                if col_width <= 0:
                    # Negative widths already collapsed to one line through
                    # max(1, ...); zero used to divide by zero. Treat both as
                    # a single line.
                    continue

                cell_value = str(column[row_idx])

                # Prefer the per-cell font size; keep the default when the
                # attribute is absent or resolves to None.
                actual_font_size = font_size
                if size_lookup is not None:
                    resolved_size = size_lookup.iloc(row_idx, col_idx)
                    if resolved_size is not None:
                        actual_font_size = resolved_size

                # Default to font number 1 (Times New Roman).
                actual_font: FontName | FontNumber = 1
                if font_lookup is not None:
                    font_value = font_lookup.iloc(row_idx, col_idx)
                    # Accept a font number in 1..10 or any font name string;
                    # anything else keeps the default.
                    if isinstance(font_value, int) and 1 <= font_value <= 10:
                        actual_font = font_value  # type: ignore[assignment]
                    elif isinstance(font_value, str):
                        actual_font = font_value  # type: ignore[assignment]

                # NOTE(review): assumes get_string_width reports the width in
                # the same unit as col_widths (inches) — confirm.
                text_width = get_string_width(
                    cell_value,
                    font=actual_font,
                    font_size=actual_font_size,  # type: ignore[arg-type]
                )
                lines_needed = max(1, int(text_width / col_width) + 1)
                max_lines_in_row = max(max_lines_in_row, lines_needed)

            # A configured cell height can force the row to be taller than
            # its text; the height is read from the row's first column.
            cell_height_lines = 1
            if height_lookup is not None:
                cell_height = height_lookup.iloc(row_idx, 0)
                if cell_height is not None:
                    cell_height_lines = max(1, int(cell_height / line_height))

            row_counts.append(max(max_lines_in_row, cell_height_lines))

        return row_counts

    def find_page_breaks(
        self,
        df: pl.DataFrame,
        col_widths: list[float],
        page_by: list[str] | None = None,
        new_page: bool = False,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
    ) -> list[tuple[int, int]]:
        """Find page break positions (r2rtf compatible).

        Rows are packed greedily: a new page starts as soon as adding the
        next row would exceed the per-page budget of
        ``max(1, pagination.nrow - additional_rows_per_page)`` lines. A row
        that is taller than the budget still gets placed, on a page of its
        own. When both ``page_by`` and ``new_page`` are set, a page break is
        also forced wherever the ``page_by`` values change between
        consecutive rows; ``page_by`` alone has no effect here.

        Args:
            df: DataFrame to paginate.
            col_widths: Column widths in inches.
            page_by: Columns to group by for page breaks.
            new_page: Whether to force new pages between groups.
            table_attrs: Table attributes for accurate row calculation.
            additional_rows_per_page: Rows per page reserved for non-data
                content (headers, footnotes, sources).

        Returns:
            List of inclusive ``(start_row, end_row)`` tuples, one per page;
            empty when ``df`` has no rows.
        """
        if df.height == 0:
            return []

        row_counts = self.calculate_content_rows(df, col_widths, table_attrs)
        page_breaks: list[tuple[int, int]] = []
        current_page_start = 0
        current_page_rows = 0

        # In r2rtf, nrow includes ALL rows (headers, data, footnotes, sources),
        # so the data budget is what is left after the reserved rows.
        available_data_rows_per_page = max(
            1, self.pagination.nrow - additional_rows_per_page
        )

        # Resolve the grouping columns once instead of on every row. They are
        # only needed when there is a "next row" to compare against.
        last_row = df.height - 1
        group_columns = (
            [df[col] for col in page_by]
            if page_by and new_page and last_row > 0
            else []
        )

        for row_idx, row_height in enumerate(row_counts):
            if current_page_rows + row_height > available_data_rows_per_page:
                # Close the current page before this row; skip the append
                # when the page would be empty (row starts the page already).
                if current_page_start < row_idx:
                    page_breaks.append((current_page_start, row_idx - 1))
                current_page_start = row_idx
                current_page_rows = row_height
            else:
                current_page_rows += row_height

            # Group-based page breaks: compare this row's group key with the
            # next row's and close the page when they differ.
            if group_columns and row_idx < last_row:
                current_group = tuple(col[row_idx] for col in group_columns)
                next_group = tuple(col[row_idx + 1] for col in group_columns)
                if current_group != next_group:
                    page_breaks.append((current_page_start, row_idx))
                    current_page_start = row_idx + 1
                    current_page_rows = 0

        # Whatever is left after the last break forms the final page.
        if current_page_start < df.height:
            page_breaks.append((current_page_start, last_row))

        return page_breaks
class ContentDistributor(BaseModel):
    """Manages content distribution across multiple pages"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")
    calculator: PageBreakCalculator = Field(..., description="Page break calculator")

    def distribute_content(
        self,
        df: pl.DataFrame,
        col_widths: list[float],
        page_by: list[str] | None = None,
        new_page: bool = False,
        pageby_header: bool = True,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
        subline_by: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Split ``df`` into per-page slices with page metadata (r2rtf compatible).

        Page boundaries come from ``calculator.find_page_breaks``. A non-empty
        ``subline_by`` replaces ``page_by`` and forces ``new_page=True``, and
        additionally attaches a ``subline_header`` entry to every page.

        Args:
            df: DataFrame to distribute.
            col_widths: Column widths in inches.
            page_by: Columns to group by.
            new_page: Force new pages between groups.
            pageby_header: When true every page is marked as needing a
                header; otherwise only the first page is.
            table_attrs: Table attributes forwarded to the calculator.
            additional_rows_per_page: Rows per page reserved for headers,
                footnotes and sources; forwarded to the calculator.
            subline_by: Columns to create subline headers by.

        Returns:
            One dictionary per page with the keys ``page_number``,
            ``total_pages``, ``data``, ``start_row``, ``end_row``,
            ``is_first_page``, ``is_last_page``, ``needs_header`` and
            ``col_widths`` (plus ``subline_header`` when ``subline_by`` is
            given).
        """
        # subline_by takes over the grouping and always forces a page break.
        if subline_by:
            grouping, force_break = subline_by, True
        else:
            grouping, force_break = page_by, new_page

        spans = self.calculator.find_page_breaks(
            df, col_widths, grouping, force_break, table_attrs, additional_rows_per_page
        )
        page_total = len(spans)

        pages = []
        for position, (first_row, last_row) in enumerate(spans):
            on_first_page = position == 0
            info = {
                "page_number": position + 1,
                "total_pages": page_total,
                # Row bounds are inclusive, hence the +1 on the slice end.
                "data": df[first_row : last_row + 1],
                "start_row": first_row,
                "end_row": last_row,
                "is_first_page": on_first_page,
                "is_last_page": position == page_total - 1,
                "needs_header": pageby_header or on_first_page,
                "col_widths": col_widths,
            }

            if subline_by:
                # Header values are taken from the page's first row.
                info["subline_header"] = self.get_group_headers(
                    df, subline_by, first_row
                )

            pages.append(info)

        return pages

    def get_group_headers(
        self, df: pl.DataFrame, page_by: list[str], start_row: int
    ) -> dict[str, Any]:
        """Describe the group that the row at ``start_row`` belongs to.

        Args:
            df: Original DataFrame.
            page_by: Grouping columns.
            start_row: Starting row for this page.

        Returns:
            An empty dictionary when ``page_by`` is empty or ``start_row`` is
            at or beyond the end of ``df``. Otherwise a dictionary with
            ``group_by_columns`` (the ``page_by`` list), ``group_values``
            (column name to the value at ``start_row``) and ``header_text``
            (``"col: value"`` pairs joined by ``" | "``).
        """
        has_grouping = bool(page_by)
        if not (has_grouping and start_row < df.height):
            return {}

        group_values = {name: df[name][start_row] for name in page_by}
        labels = [f"{name}: {value}" for name, value in group_values.items()]

        return {
            "group_by_columns": page_by,
            "group_values": group_values,
            "header_text": " | ".join(labels),
        }