Coverage for src/rtflite/pagination/strategies/grouping.py: 96%
92 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 04:50 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 04:50 +0000
1from collections.abc import Sequence
2from typing import Any, cast
4import polars as pl
6from ..core import PageBreakCalculator, RTFPagination
7from .base import PageContext, PaginationContext, PaginationStrategy
class PageByStrategy(PaginationStrategy):
    """Pagination strategy that respects grouping columns (page_by)."""

    # Group values whose string form equals this marker are left out of the
    # header / boundary dictionaries.
    # NOTE(review): presumably a placeholder for a suppressed group label --
    # confirm against the code that produces it.
    _SKIPPED_GROUP_VALUE = "-----"

    def paginate(self, context: PaginationContext) -> list[PageContext]:
        """Split ``context.df`` into one ``PageContext`` per page.

        Page assignment comes from
        ``PageBreakCalculator.calculate_row_metadata``; this method only turns
        the resulting ``page`` / ``row_index`` columns into page objects.

        Args:
            context: Pagination inputs (data frame, page and body settings,
                column widths, table attributes).

        Returns:
            Page contexts in ascending page-number order.

        Raises:
            AssertionError: If any of the page width, height, margin, nrow or
                orientation settings is ``None``.
        """
        rtf_page = context.rtf_page
        rtf_body = context.rtf_body

        # Every page setting must be present before the calculator is built.
        assert rtf_page.width is not None
        assert rtf_page.height is not None
        assert rtf_page.margin is not None
        assert rtf_page.nrow is not None
        assert rtf_page.orientation is not None

        pagination_config = RTFPagination(
            page_width=rtf_page.width,
            page_height=rtf_page.height,
            margin=rtf_page.margin,
            nrow=rtf_page.nrow,
            orientation=rtf_page.orientation,
        )
        calculator = PageBreakCalculator(pagination=pagination_config)

        page_by = rtf_body.page_by

        metadata = calculator.calculate_row_metadata(
            df=context.df,
            col_widths=context.col_widths,
            page_by=page_by,
            table_attrs=context.table_attrs,
            removed_column_indices=context.removed_column_indices,
            additional_rows_per_page=context.additional_rows_per_page,
            new_page=rtf_body.new_page,
        )

        pages: list[PageContext] = []
        unique_pages = metadata["page"].unique().sort()
        total_pages = len(unique_pages)

        for page_num in unique_pages:
            page_rows = metadata.filter(pl.col("page") == page_num)

            # Skip page values that select no rows.
            if page_rows.height == 0:
                continue

            row_indices = page_rows["row_index"]
            start_row = cast(int, row_indices.min())
            end_row = cast(int, row_indices.max())

            # The page's data is the inclusive row range [start_row, end_row].
            # NOTE(review): assumes the rows of one page are contiguous in
            # ``context.df`` -- confirm against the calculator.
            page_df = context.df.slice(start_row, end_row - start_row + 1)
            display_page_num = int(page_num)

            is_first = display_page_num == 1
            # Repeating headers: if pageby_header is True, or if it's the
            # first page.
            needs_header = rtf_body.pageby_header or is_first

            page_ctx = PageContext(
                page_number=display_page_num,
                total_pages=total_pages,
                data=page_df,
                is_first_page=is_first,
                is_last_page=(display_page_num == total_pages),
                col_widths=context.col_widths,
                needs_header=needs_header,
                table_attrs=context.table_attrs,
            )

            if page_by:
                # Group header taken from the first row of the page.
                page_ctx.pageby_header_info = self._get_group_headers(
                    context.df, page_by, start_row
                )

                # Group changes inside the page (spanning rows mid-page).
                group_boundaries = self._detect_group_boundaries(
                    context.df, page_by, start_row, end_row
                )
                if group_boundaries:
                    page_ctx.group_boundaries = group_boundaries

            pages.append(page_ctx)

        return pages

    def _get_group_headers(
        self, df: pl.DataFrame, page_by: Sequence[str], start_row: int
    ) -> dict[str, Any]:
        """Get group header information for a page.

        Args:
            df: Full data frame being paginated.
            page_by: Names of the grouping columns.
            start_row: Absolute index of the page's first row in ``df``.

        Returns:
            An empty dict when ``page_by`` is empty or ``start_row`` is at or
            past the end of ``df``. Otherwise a dict with
            ``"group_by_columns"`` (``page_by`` itself), ``"group_values"``
            (column -> value at ``start_row``, skipped markers omitted) and
            ``"header_text"`` (``"col: value"`` pairs joined by ``" | "``).
        """
        if not page_by or start_row >= df.height:
            return {}

        group_values: dict[str, Any] = {}
        for col in page_by:
            val = df[col][start_row]
            if str(val) != self._SKIPPED_GROUP_VALUE:
                group_values[col] = val

        return {
            "group_by_columns": page_by,
            "group_values": group_values,
            "header_text": " | ".join(
                f"{col}: {val}" for col, val in group_values.items()
            ),
        }

    def _detect_group_boundaries(
        self, df: pl.DataFrame, page_by: Sequence[str], start_row: int, end_row: int
    ) -> list[dict[str, Any]]:
        """Detect group boundaries within a page range.

        A boundary is a row whose ``page_by`` values differ from those of the
        row directly before it.

        Args:
            df: Full data frame being paginated.
            page_by: Names of the grouping columns.
            start_row: Absolute index of the first row of the page.
            end_row: Absolute index of the last row of the page (inclusive).

        Returns:
            One dict per boundary, in row order, with ``"absolute_row"``
            (index in ``df``), ``"page_relative_row"`` (offset from
            ``start_row``) and ``"group_values"`` (the new group's values,
            skipped markers omitted). Empty when the range has fewer than two
            rows.
        """
        if end_row <= start_row:
            return []

        # Fetch each grouping column once instead of once per row.
        columns = {col: df[col] for col in page_by}

        def group_at(row_idx: int) -> dict[str, Any]:
            return {col: series[row_idx] for col, series in columns.items()}

        group_boundaries: list[dict[str, Any]] = []
        current_group = group_at(start_row)
        for row_idx in range(start_row + 1, end_row + 1):
            next_group = group_at(row_idx)
            if current_group != next_group:
                group_boundaries.append(
                    {
                        "absolute_row": row_idx,
                        "page_relative_row": row_idx - start_row,
                        "group_values": {
                            k: v
                            for k, v in next_group.items()
                            if str(v) != self._SKIPPED_GROUP_VALUE
                        },
                    }
                )
            # Reuse this row's values as the "previous" group next iteration.
            current_group = next_group
        return group_boundaries
class SublineStrategy(PageByStrategy):
    """Pagination strategy for subline_by (forces new pages and special headers)."""

    def paginate(self, context: PaginationContext) -> list[PageContext]:
        """Split ``context.df`` into pages, passing ``subline_by`` to the calculator.

        Differs from ``PageByStrategy.paginate`` in that ``subline_by`` is
        forwarded to ``calculate_row_metadata``, ``new_page`` is always
        ``True``, and each page gets a ``subline_header`` when ``subline_by``
        is set.

        Args:
            context: Pagination inputs (data frame, page and body settings,
                column widths, table attributes).

        Returns:
            Page contexts in ascending page-number order.

        Raises:
            AssertionError: If any of the page width, height, margin, nrow or
                orientation settings is ``None``.
        """
        rtf_page = context.rtf_page
        rtf_body = context.rtf_body

        # Subline strategy uses subline_by columns and forces new_page=True.
        subline_by = rtf_body.subline_by

        # Every page setting must be present before the calculator is built.
        assert rtf_page.width is not None
        assert rtf_page.height is not None
        assert rtf_page.margin is not None
        assert rtf_page.nrow is not None
        assert rtf_page.orientation is not None

        pagination_config = RTFPagination(
            page_width=rtf_page.width,
            page_height=rtf_page.height,
            margin=rtf_page.margin,
            nrow=rtf_page.nrow,
            orientation=rtf_page.orientation,
        )
        calculator = PageBreakCalculator(pagination=pagination_config)

        # SublineStrategy forces new page on subline change.
        metadata = calculator.calculate_row_metadata(
            df=context.df,
            col_widths=context.col_widths,
            page_by=rtf_body.page_by,
            subline_by=subline_by,
            table_attrs=context.table_attrs,
            removed_column_indices=context.removed_column_indices,
            additional_rows_per_page=context.additional_rows_per_page,
            new_page=True,
        )

        pages: list[PageContext] = []
        unique_pages = metadata["page"].unique().sort()
        total_pages = len(unique_pages)

        for page_num in unique_pages:
            page_rows = metadata.filter(pl.col("page") == page_num)

            # Skip page values that select no rows.
            if page_rows.height == 0:
                continue

            row_indices = page_rows["row_index"]
            start_row = cast(int, row_indices.min())
            end_row = cast(int, row_indices.max())

            # The page's data is the inclusive row range [start_row, end_row].
            # NOTE(review): assumes the rows of one page are contiguous in
            # ``context.df`` -- confirm against the calculator.
            page_df = context.df.slice(start_row, end_row - start_row + 1)
            display_page_num = int(page_num)

            is_first = display_page_num == 1

            page_ctx = PageContext(
                page_number=display_page_num,
                total_pages=total_pages,
                data=page_df,
                is_first_page=is_first,
                is_last_page=(display_page_num == total_pages),
                col_widths=context.col_widths,
                needs_header=is_first or rtf_body.pageby_header,
                table_attrs=context.table_attrs,
            )

            if subline_by:
                # Subline header taken from the first row of the page.
                page_ctx.subline_header = self._get_group_headers(
                    context.df, subline_by, start_row
                )

            # Also handle page_by if present (spanning rows)
            page_by = rtf_body.page_by
            if page_by:
                page_ctx.pageby_header_info = self._get_group_headers(
                    context.df, page_by, start_row
                )

                # Group changes inside the page (spanning rows mid-page).
                group_boundaries = self._detect_group_boundaries(
                    context.df, page_by, start_row, end_row
                )
                if group_boundaries:
                    page_ctx.group_boundaries = group_boundaries

            pages.append(page_ctx)

        return pages