Coverage for src/rtflite/pagination/strategies/grouping.py: 96%

92 statements  

coverage.py v7.12.0, created at 2025-12-08 04:50 +0000

from collections.abc import Sequence
from typing import Any, cast

import polars as pl

from ..core import PageBreakCalculator, RTFPagination
from .base import PageContext, PaginationContext, PaginationStrategy


class PageByStrategy(PaginationStrategy):
    """Pagination strategy that respects grouping columns (page_by)."""

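    # Overview: PageBreakCalculator assigns each data row to a page; this
    # strategy then slices the input frame into one PageContext per page.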

    def paginate(self, context: PaginationContext) -> list[PageContext]:
        # Initialize calculator
        assert context.rtf_page.width is not None
        assert context.rtf_page.height is not None
        assert context.rtf_page.margin is not None
        assert context.rtf_page.nrow is not None
        assert context.rtf_page.orientation is not None

        pagination_config = RTFPagination(
            page_width=context.rtf_page.width,
            page_height=context.rtf_page.height,
            margin=context.rtf_page.margin,
            nrow=context.rtf_page.nrow,
            orientation=context.rtf_page.orientation,
        )
        calculator = PageBreakCalculator(pagination=pagination_config)

        page_by = context.rtf_body.page_by

        # Calculate metadata
        metadata = calculator.calculate_row_metadata(
            df=context.df,
            col_widths=context.col_widths,
            page_by=page_by,
            table_attrs=context.table_attrs,
            removed_column_indices=context.removed_column_indices,
            additional_rows_per_page=context.additional_rows_per_page,
            new_page=context.rtf_body.new_page,
        )
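        # `metadata` has one row per input data row; its "page" and "row_index"
        # columns drive the per-page slicing below.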


        pages = []
        import polars as pl

        unique_pages = metadata["page"].unique().sort()
        total_pages = len(unique_pages)

        for page_num in unique_pages:
            page_rows = metadata.filter(pl.col("page") == page_num)

            if page_rows.height == 0:
                continue

            start_row = cast(int, page_rows["row_index"].min())
            end_row = cast(int, page_rows["row_index"].max())

            page_df = context.df.slice(start_row, end_row - start_row + 1)
            display_page_num = int(page_num)

            is_first = display_page_num == 1
            # Repeating headers: if pageby_header is True, or if it's the first page.
            needs_header = context.rtf_body.pageby_header or is_first

            page_ctx = PageContext(
                page_number=display_page_num,
                total_pages=total_pages,
                data=page_df,
                is_first_page=is_first,
                is_last_page=(display_page_num == total_pages),
                col_widths=context.col_widths,
                needs_header=needs_header,
                table_attrs=context.table_attrs,
            )

            # Add page_by header info
            if page_by:
                page_ctx.pageby_header_info = self._get_group_headers(
                    context.df, page_by, start_row
                )

                # Detect group boundaries for spanning rows mid-page
                group_boundaries = self._detect_group_boundaries(
                    context.df, page_by, start_row, end_row
                )
                if group_boundaries:
                    page_ctx.group_boundaries = group_boundaries

            pages.append(page_ctx)

        return pages


    def _get_group_headers(
        self, df: pl.DataFrame, page_by: Sequence[str], start_row: int
    ) -> dict[str, Any]:
        """Get group header information for a page."""
        if not page_by or start_row >= df.height:
            return {}

        group_values = {}
        for col in page_by:
            val = df[col][start_row]
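            # Skip sentinel values ("-----") so they are not echoed in the header.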

            if str(val) != "-----":
                group_values[col] = val

        return {
            "group_by_columns": page_by,
            "group_values": group_values,
            "header_text": " | ".join(
                f"{col}: {val}" for col, val in group_values.items()
            ),
        }

    def _detect_group_boundaries(
        self, df: pl.DataFrame, page_by: Sequence[str], start_row: int, end_row: int
    ) -> list[dict[str, Any]]:
        """Detect group boundaries within a page range."""
        group_boundaries = []
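        # Compare each row with the next; any change in the page_by values marks
        # the start of a new group at row_idx + 1.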

        for row_idx in range(start_row, end_row):
            if row_idx + 1 <= end_row:
                current_group = {col: df[col][row_idx] for col in page_by}
                next_group = {col: df[col][row_idx + 1] for col in page_by}

                if current_group != next_group:
                    next_group_filtered = {
                        k: v for k, v in next_group.items() if str(v) != "-----"
                    }
                    group_boundaries.append(
                        {
                            "absolute_row": row_idx + 1,
                            "page_relative_row": row_idx + 1 - start_row,
                            "group_values": next_group_filtered,
                        }
                    )
        return group_boundaries


class SublineStrategy(PageByStrategy):
    """Pagination strategy for subline_by (forces new pages and special headers)."""

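    # Reuses the PageByStrategy helpers for headers and group boundaries, but
    # forces a new page whenever the subline_by values change.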

    def paginate(self, context: PaginationContext) -> list[PageContext]:
        # Subline strategy uses subline_by columns and forces new_page=True.
        subline_by = context.rtf_body.subline_by

        # Initialize calculator
        assert context.rtf_page.width is not None
        assert context.rtf_page.height is not None
        assert context.rtf_page.margin is not None
        assert context.rtf_page.nrow is not None
        assert context.rtf_page.orientation is not None

        pagination_config = RTFPagination(
            page_width=context.rtf_page.width,
            page_height=context.rtf_page.height,
            margin=context.rtf_page.margin,
            nrow=context.rtf_page.nrow,
            orientation=context.rtf_page.orientation,
        )
        calculator = PageBreakCalculator(pagination=pagination_config)

        # Calculate metadata
        # SublineStrategy forces new page on subline change.
        metadata = calculator.calculate_row_metadata(
            df=context.df,
            col_widths=context.col_widths,
            page_by=context.rtf_body.page_by,
            subline_by=subline_by,
            table_attrs=context.table_attrs,
            removed_column_indices=context.removed_column_indices,
            additional_rows_per_page=context.additional_rows_per_page,
            new_page=True,
        )

        pages = []
        import polars as pl

        unique_pages = metadata["page"].unique().sort()
        total_pages = len(unique_pages)

        for page_num in unique_pages:
            page_rows = metadata.filter(pl.col("page") == page_num)

            if page_rows.height == 0:
                continue

            start_row = cast(int, page_rows["row_index"].min())
            end_row = cast(int, page_rows["row_index"].max())

            page_df = context.df.slice(start_row, end_row - start_row + 1)
            display_page_num = int(page_num)

            is_first = display_page_num == 1

            page_ctx = PageContext(
                page_number=display_page_num,
                total_pages=total_pages,
                data=page_df,
                is_first_page=is_first,
                is_last_page=(display_page_num == total_pages),
                col_widths=context.col_widths,
                needs_header=is_first or context.rtf_body.pageby_header,
                table_attrs=context.table_attrs,
            )

            if subline_by:
                page_ctx.subline_header = self._get_group_headers(
                    context.df, subline_by, start_row
                )

            # Also handle page_by if present (spanning rows)
            page_by = context.rtf_body.page_by
            if page_by:
                page_ctx.pageby_header_info = self._get_group_headers(
                    context.df, page_by, start_row
                )

                # Detect group boundaries for spanning rows mid-page
                group_boundaries = self._detect_group_boundaries(
                    context.df, page_by, start_row, end_row
                )
                if group_boundaries:
                    page_ctx.group_boundaries = group_boundaries

            pages.append(page_ctx)

        return pages
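
The boundary detection above compares consecutive rows and records a boundary at the first row of each new group. Below is a minimal standalone sketch of the same idea using plain polars; the DataFrame, its "group" column, and the values are invented for illustration and are not part of rtflite.

    import polars as pl

    # Made-up example data; "group" plays the role of a page_by column.
    df = pl.DataFrame({"group": ["A", "A", "B", "B", "C"], "value": [1, 2, 3, 4, 5]})
    page_by = ["group"]

    boundaries = []
    for row_idx in range(df.height - 1):
        current = {col: df[col][row_idx] for col in page_by}
        nxt = {col: df[col][row_idx + 1] for col in page_by}
        if current != nxt:
            # The boundary sits on the first row of the new group.
            boundaries.append({"absolute_row": row_idx + 1, "group_values": nxt})

    print(boundaries)
    # [{'absolute_row': 2, 'group_values': {'group': 'B'}},
    #  {'absolute_row': 4, 'group_values': {'group': 'C'}}]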