Coverage for src/rtflite/pagination/core.py: 93%

117 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-28 05:09 +0000

1from collections.abc import Mapping, Sequence 

2from typing import Any 

3 

4import polars as pl 

5from pydantic import BaseModel, ConfigDict, Field 

6 

7from ..attributes import TableAttributes 

8from ..fonts_mapping import FontName, FontNumber 

9from ..strwidth import get_string_width 

10 

11 

class RTFPagination(BaseModel):
    """Core pagination logic and calculations for RTF documents"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    page_width: float = Field(..., description="Page width in inches")
    page_height: float = Field(..., description="Page height in inches")
    margin: Sequence[float] = Field(
        ..., description="Page margins [left, right, top, bottom, header, footer]"
    )
    nrow: int = Field(..., description="Maximum rows per page")
    orientation: str = Field(..., description="Page orientation")

    def calculate_available_space(self) -> Mapping[str, float]:
        """Calculate available space for content on each page

        Returns:
            Mapping with the usable content width/height (page size minus the
            left/right and top/bottom margins) plus the header and footer
            reserved space taken straight from ``margin[4]`` / ``margin[5]``.
        """
        margins = self.margin
        return {
            # left + right margins
            "content_width": self.page_width - margins[0] - margins[1],
            # top + bottom margins
            "content_height": self.page_height - margins[2] - margins[3],
            "header_space": margins[4],
            "footer_space": margins[5],
        }

42 

43 

class PageBreakCalculator(BaseModel):
    """Calculates where page breaks should occur based on content and constraints"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")

    def calculate_content_rows(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        table_attrs: TableAttributes | None = None,
        font_size: float = 9,
    ) -> Sequence[int]:
        """Calculate how many rows each content row will occupy when rendered

        Args:
            df: DataFrame containing the content
            col_widths: Width of each column in inches
            table_attrs: Table attributes containing cell height and font size info
            font_size: Default font size in points

        Returns:
            List of row counts for each data row
        """
        # Hoisted out of the nested loops: the original re-executed this
        # import for every cell, which is pure overhead.
        from ..attributes import BroadcastValue

        row_counts: list[int] = []
        dim = df.shape

        for row_idx in range(df.height):
            max_lines_in_row = 1

            for col_idx, col_width in enumerate(col_widths):
                # Guard clause: ignore widths beyond the actual column count.
                if col_idx >= len(df.columns):
                    continue

                # Use proper polars column access - df[column_name][row_idx]
                col_name = df.columns[col_idx]
                cell_value = str(df[col_name][row_idx])

                # Get actual font size from table attributes if available
                actual_font_size = font_size
                if table_attrs and hasattr(table_attrs, "text_font_size"):
                    actual_font_size = BroadcastValue(
                        value=table_attrs.text_font_size, dimension=dim
                    ).iloc(row_idx, col_idx)

                # Get actual font from table attributes if available
                actual_font: FontName | FontNumber = (
                    1  # Default to font number 1 (Times New Roman)
                )
                if table_attrs and hasattr(table_attrs, "text_font"):
                    font_value = BroadcastValue(
                        value=table_attrs.text_font, dimension=dim
                    ).iloc(row_idx, col_idx)
                    # Handle both FontNumber (int) and FontName (str)
                    if isinstance(font_value, int) and 1 <= font_value <= 10:
                        actual_font = font_value  # type: ignore[assignment]
                    elif isinstance(font_value, str):
                        # If it's a string, use it directly
                        actual_font = font_value  # type: ignore[assignment]

                # Calculate how many lines this text will need
                # Use the actual font from table attributes with actual font size
                text_width = get_string_width(
                    cell_value,
                    font=actual_font,
                    font_size=actual_font_size,  # type: ignore[arg-type]
                )
                # Guard against zero/negative column widths: the original
                # raised ZeroDivisionError for a degenerate layout; treat
                # such columns as a single line instead.
                if col_width > 0:
                    lines_needed = max(1, int(text_width / col_width) + 1)
                else:
                    lines_needed = 1
                max_lines_in_row = max(max_lines_in_row, lines_needed)

            # Account for cell height if specified in table attributes
            cell_height_lines = 1
            if table_attrs and hasattr(table_attrs, "cell_height"):
                cell_height = BroadcastValue(
                    value=table_attrs.cell_height, dimension=dim
                ).iloc(row_idx, 0)
                # Convert cell height from inches to approximate line count
                # Assuming default line height of ~0.15 inches
                cell_height_lines = max(1, int(cell_height / 0.15))

            row_counts.append(max(max_lines_in_row, cell_height_lines))

        return row_counts

    def find_page_breaks(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        new_page: bool = False,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
    ) -> Sequence[tuple[int, int]]:
        """Find optimal page break positions (r2rtf compatible)

        Args:
            df: DataFrame to paginate
            col_widths: Column widths in inches
            page_by: Columns to group by for page breaks
            new_page: Whether to force new pages between groups
            table_attrs: Table attributes for accurate row calculation
            additional_rows_per_page: Additional rows per page (headers,
                footnotes, sources)

        Returns:
            List of (start_row, end_row) tuples for each page
        """
        if df.height == 0:
            return []

        row_counts = self.calculate_content_rows(df, col_widths, table_attrs)
        page_breaks: list[tuple[int, int]] = []
        current_page_start = 0
        current_page_rows = 0

        # Calculate available rows for data (r2rtf compatible)
        # In r2rtf, nrow includes ALL rows (headers, data, footnotes, sources)
        available_data_rows_per_page = max(
            1, self.pagination.nrow - additional_rows_per_page
        )

        for row_idx, row_height in enumerate(row_counts):
            # Check if adding this row would exceed the page limit (including
            # additional rows)
            if current_page_rows + row_height > available_data_rows_per_page:
                # Create page break before this row
                if current_page_start < row_idx:
                    page_breaks.append((current_page_start, row_idx - 1))
                current_page_start = row_idx
                current_page_rows = row_height
            else:
                current_page_rows += row_height

            # Handle group-based page breaks
            # When page_by + new_page=True, force breaks at group boundaries
            # When page_by alone, allow natural pagination with spanning rows mid-page
            if page_by and new_page and row_idx < df.height - 1:
                current_group = {col: df[col][row_idx] for col in page_by}
                next_group = {col: df[col][row_idx + 1] for col in page_by}

                if current_group != next_group:
                    # Force page break between groups
                    page_breaks.append((current_page_start, row_idx))
                    current_page_start = row_idx + 1
                    current_page_rows = 0

        # Add final page
        if current_page_start < df.height:
            page_breaks.append((current_page_start, df.height - 1))

        return page_breaks

200 

201 

class ContentDistributor(BaseModel):
    """Manages content distribution across multiple pages"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")
    calculator: PageBreakCalculator = Field(..., description="Page break calculator")

    def distribute_content(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        new_page: bool = False,
        pageby_header: bool = True,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
        subline_by: Sequence[str] | None = None,
    ) -> Sequence[Mapping[str, Any]]:
        """Distribute content across multiple pages (r2rtf compatible)

        Args:
            df: DataFrame to distribute
            col_widths: Column widths in inches
            page_by: Columns to group by
            new_page: Force new pages between groups
            pageby_header: Repeat headers on new pages
            table_attrs: Table attributes for accurate calculations
            additional_rows_per_page: Additional rows per page (headers,
                footnotes, sources)
            subline_by: Columns to create subline headers by (forces new_page=True)

        Returns:
            List of page information dictionaries
        """
        # If subline_by is specified, treat it as page_by with new_page=True
        if subline_by:
            page_by = subline_by
            new_page = True

        page_breaks = self.calculator.find_page_breaks(
            df, col_widths, page_by, new_page, table_attrs, additional_rows_per_page
        )
        pages = []

        for page_num, (start_row, end_row) in enumerate(page_breaks):
            page_df = df[start_row : end_row + 1]

            page_info = {
                "page_number": page_num + 1,
                "total_pages": len(page_breaks),
                "data": page_df,
                "start_row": start_row,
                "end_row": end_row,
                "is_first_page": page_num == 0,
                "is_last_page": page_num == len(page_breaks) - 1,
                "needs_header": pageby_header or page_num == 0,
                "col_widths": col_widths,
            }

            # Add subline_by header information for each page
            if subline_by:
                page_info["subline_header"] = self.get_group_headers(
                    df, subline_by, start_row
                )
            # Add page_by header information (spanning rows) on each page
            # Note: new_page flag only controls forced page breaks, not spanning row creation
            elif page_by:
                # Get header for first group on this page
                page_info["pageby_header_info"] = self.get_group_headers(
                    df, page_by, start_row
                )

                # Detect all group boundaries within this page
                # This allows spanning rows to be inserted mid-page when new_page=False
                # Note: the loop stops at end_row - 1 so row_idx + 1 is always
                # a valid row on this page (the original re-checked this with
                # an always-true condition).
                group_boundaries = []
                for row_idx in range(start_row, end_row):
                    current_group = {col: df[col][row_idx] for col in page_by}
                    next_group = {col: df[col][row_idx + 1] for col in page_by}
                    if current_group != next_group:
                        # Filter out divider values for the next group header
                        next_group_filtered = {
                            k: v for k, v in next_group.items() if str(v) != "-----"
                        }

                        # Group changes at row_idx+1 (relative to page: row_idx+1-start_row)
                        group_boundaries.append({
                            "absolute_row": row_idx + 1,
                            "page_relative_row": row_idx + 1 - start_row,
                            "group_values": next_group_filtered
                        })

                if group_boundaries:
                    page_info["group_boundaries"] = group_boundaries

            pages.append(page_info)

        return pages

    def get_group_headers(
        self, df: pl.DataFrame, page_by: Sequence[str], start_row: int
    ) -> Mapping[str, Any]:
        """Get group header information for a page

        Args:
            df: Original DataFrame
            page_by: Grouping columns
            start_row: Starting row for this page

        Returns:
            Dictionary with group header information: the grouping columns,
            their values at ``start_row`` (divider rows marked "-----" are
            skipped), and a pre-formatted "col: value | ..." header string.
        """
        if not page_by or start_row >= df.height:
            return {}

        group_values = {}
        for col in page_by:
            val = df[col][start_row]
            # Filter out divider rows marked with "-----"
            if str(val) != "-----":
                group_values[col] = val

        return {
            "group_by_columns": page_by,
            "group_values": group_values,
            "header_text": " | ".join(
                f"{col}: {val}" for col, val in group_values.items()
            ),
        }