Coverage for src / rtflite / pagination / core.py: 83%

188 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-08 04:50 +0000

1from collections.abc import Mapping, Sequence 

2 

3import polars as pl 

4from pydantic import BaseModel, ConfigDict, Field 

5 

6from ..attributes import TableAttributes 

7from ..fonts_mapping import FontName, FontNumber 

8from ..strwidth import get_string_width 

9 

10 

class RTFPagination(BaseModel):
    """Page geometry and row budget used to paginate an RTF document.

    The model only stores the settings; ``calculate_available_space`` derives
    the usable content area from the page size and the margins.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Physical page size.
    page_width: float = Field(..., description="Page width in inches")
    page_height: float = Field(..., description="Page height in inches")

    # Indexed positionally: 0=left, 1=right, 2=top, 3=bottom, 4=header, 5=footer.
    margin: Sequence[float] = Field(
        ..., description="Page margins [left, right, top, bottom, header, footer]"
    )

    # Row budget per page and page orientation (not used by this class itself).
    nrow: int = Field(..., description="Maximum rows per page")
    orientation: str = Field(..., description="Page orientation")

    def calculate_available_space(self) -> Mapping[str, float]:
        """Return the content area and header/footer allowances for one page.

        Returns:
            Mapping with four entries:

            * ``content_width``: page width minus left and right margins
            * ``content_height``: page height minus top and bottom margins
            * ``header_space``: the header margin (``margin[4]``)
            * ``footer_space``: the footer margin (``margin[5]``)

        Raises:
            IndexError: If ``margin`` holds fewer than six entries.
        """
        margins = self.margin
        return {
            "content_width": self.page_width - margins[0] - margins[1],
            "content_height": self.page_height - margins[2] - margins[3],
            "header_space": margins[4],
            "footer_space": margins[5],
        }

41 

42 

class RowMetadata(BaseModel):
    """Pagination bookkeeping for one data row.

    Every ``*_rows`` field is a count of rendered rows (text lines). ``page``
    defaults to 0 until a page number has been assigned.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Position of the row in the source data.
    row_index: int = Field(..., description="Original data row index (0-based)")

    # Height contributions, in rendered rows.
    data_rows: int = Field(..., description="Number of rows the data content occupies")
    pageby_header_rows: int = Field(
        default=0, description="Number of rows the page_by header occupies"
    )
    subline_header_rows: int = Field(
        default=0, description="Number of rows the subline_by header occupies"
    )
    column_header_rows: int = Field(
        default=0, description="Number of rows for column headers"
    )
    total_rows: int = Field(..., description="Sum of all row counts")

    # Pagination result and grouping flags.
    page: int = Field(default=0, description="Assigned page number")
    is_group_start: bool = Field(
        default=False, description="True if this row starts a new page_by group"
    )
    is_subline_start: bool = Field(
        default=False, description="True if this row starts a new subline_by group"
    )

67 

68 

class PageBreakCalculator(BaseModel):
    """Calculates where page breaks should occur based on content and constraints.

    Heights are estimated in rendered rows (text lines). A cell needs
    ``int(text_width / available_width) + 1`` rows, a data row needs as many
    rows as its tallest cell, and a page holds ``pagination.nrow`` rows minus
    whatever the caller reserves through ``additional_rows_per_page``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")

    def calculate_content_rows(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        table_attrs: TableAttributes | None = None,
        font_size: float = 9,
        spanning_columns: Sequence[str] | None = None,
    ) -> Sequence[int]:
        """Calculate how many rows each content row will occupy when rendered

        Args:
            df: DataFrame containing the content
            col_widths: Width of each column in inches; paired positionally with
                ``df.columns`` (extra entries on either side are ignored)
            table_attrs: Table attributes; ``text_font_size``, ``text_font`` and
                ``cell_height`` are read when present
            font_size: Font size in points used when ``table_attrs`` does not
                provide ``text_font_size``
            spanning_columns: Columns that should be treated as spanning the full
                width (``sum(col_widths)``) instead of their own column width

        Returns:
            List of row counts for each data row

        Raises:
            ZeroDivisionError: If a width used for wrapping is 0.
        """
        if df.height == 0:
            return []

        dim = df.shape
        spanning_columns = spanning_columns or []
        total_width = sum(col_widths)

        # Build the per-attribute broadcasters once per call instead of once per
        # cell; they depend only on table_attrs and the frame shape.
        font_sizes = None
        fonts = None
        cell_heights = None
        if table_attrs:
            from ..attributes import BroadcastValue

            if hasattr(table_attrs, "text_font_size"):
                font_sizes = BroadcastValue(
                    value=table_attrs.text_font_size, dimension=dim
                )
            if hasattr(table_attrs, "text_font"):
                fonts = BroadcastValue(value=table_attrs.text_font, dimension=dim)
            if hasattr(table_attrs, "cell_height"):
                cell_heights = BroadcastValue(
                    value=table_attrs.cell_height, dimension=dim
                )

        # zip() stops at the shorter input, so widths without a matching column
        # (and columns without a width) are skipped.
        columns = [
            (name, df[name], width) for name, width in zip(df.columns, col_widths)
        ]

        row_counts = []
        for row_idx in range(df.height):
            max_lines_in_row = 1

            for col_idx, (col_name, series, col_width) in enumerate(columns):
                cell_value = str(series[row_idx])

                cell_font_size = font_size
                if font_sizes is not None:
                    cell_font_size = font_sizes.iloc(row_idx, col_idx)

                # Default to font number 1 (Times New Roman); accept a font
                # number in 1..10 or any font name given as a string.
                cell_font: FontName | FontNumber = 1
                if fonts is not None:
                    font_value = fonts.iloc(row_idx, col_idx)
                    if isinstance(font_value, str) or (
                        isinstance(font_value, int) and 1 <= font_value <= 10
                    ):
                        cell_font = font_value  # type: ignore[assignment]

                text_width = get_string_width(
                    cell_value,
                    font=cell_font,
                    font_size=cell_font_size,  # type: ignore[arg-type]
                )

                # Spanning columns wrap against the whole table width.
                effective_width = (
                    total_width if col_name in spanning_columns else col_width
                )
                max_lines_in_row = max(
                    max_lines_in_row, self._lines_needed(text_width, effective_width)
                )

            # A configured cell height can make the row taller than its text.
            # Inches are converted to lines assuming ~0.15 inches per line.
            cell_height_lines = 1
            if cell_heights is not None:
                cell_height = cell_heights.iloc(row_idx, 0)
                cell_height_lines = max(1, int(cell_height / 0.15))

            row_counts.append(max(max_lines_in_row, cell_height_lines))

        return row_counts

    def find_page_breaks(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        new_page: bool = False,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
    ) -> Sequence[tuple[int, int]]:
        """Find optimal page break positions (r2rtf compatible)

        Args:
            df: DataFrame to paginate
            col_widths: Column widths in inches
            page_by: Columns to group by for page breaks; also passed to
                ``calculate_content_rows`` as the spanning columns
            new_page: Whether to force new pages between groups
            table_attrs: Table attributes for accurate row calculation
            additional_rows_per_page: Additional rows per page (headers,
                footnotes, sources) that are subtracted from ``pagination.nrow``

        Returns:
            List of (start_row, end_row) tuples for each page, both inclusive;
            empty when ``df`` has no rows
        """
        if df.height == 0:
            return []

        row_counts = self.calculate_content_rows(
            df, col_widths, table_attrs, spanning_columns=page_by
        )

        # In r2rtf, nrow includes ALL rows (headers, data, footnotes, sources),
        # so the data budget is what remains; at least one row always fits.
        available_data_rows_per_page = max(
            1, self.pagination.nrow - additional_rows_per_page
        )

        # page_by + new_page=True forces breaks at group boundaries; page_by
        # alone allows natural pagination with spanning rows mid-page.
        split_on_groups = bool(page_by) and new_page
        group_series = [df[col] for col in page_by] if split_on_groups else []

        last_row = df.height - 1
        page_breaks: list[tuple[int, int]] = []
        current_page_start = 0
        current_page_rows = 0

        for row_idx, row_height in enumerate(row_counts):
            if current_page_rows + row_height > available_data_rows_per_page:
                # Close the running page before this row. A row taller than a
                # whole page right at a page start simply stays on that page.
                if current_page_start < row_idx:
                    page_breaks.append((current_page_start, row_idx - 1))
                current_page_start = row_idx
                current_page_rows = row_height
            else:
                current_page_rows += row_height

            if split_on_groups and row_idx < last_row:
                current_group = tuple(s[row_idx] for s in group_series)
                next_group = tuple(s[row_idx + 1] for s in group_series)
                if current_group != next_group:
                    # Force page break between groups
                    page_breaks.append((current_page_start, row_idx))
                    current_page_start = row_idx + 1
                    current_page_rows = 0

        # Add final page
        if current_page_start < df.height:
            page_breaks.append((current_page_start, last_row))

        return page_breaks

    def calculate_row_metadata(
        self,
        df: pl.DataFrame,
        col_widths: Sequence[float],
        page_by: Sequence[str] | None = None,
        subline_by: Sequence[str] | None = None,
        table_attrs: TableAttributes | None = None,
        removed_column_indices: Sequence[int] | None = None,
        font_size: float = 9,
        additional_rows_per_page: int = 0,
        new_page: bool = False,
    ) -> pl.DataFrame:
        """Generate complete row metadata for pagination.

        Args:
            df: DataFrame to paginate
            col_widths: Cumulative column widths (right boundaries) of the
                displayed columns, i.e. the columns of ``df`` that are not
                listed in ``removed_column_indices``
            page_by: Columns whose value change starts a new page_by group
            subline_by: Columns whose value change starts a new subline_by group
            table_attrs: Accepted for interface compatibility; currently not
                used here (font number 1 and ``font_size`` are always applied)
            removed_column_indices: Positional indices of ``df`` columns that
                are skipped when estimating the data row height
            font_size: Font size in points used for all width estimates
            additional_rows_per_page: Rows reserved on every page, subtracted
                from ``pagination.nrow``
            new_page: Whether a new page_by group forces a new page

        Returns:
            DataFrame with one record per row of ``df`` and the columns
            ``row_index``, ``data_rows``, ``pageby_header_rows``,
            ``subline_header_rows``, ``column_header_rows`` (always 0 here),
            ``total_rows``, ``page``, ``is_group_start`` and ``is_subline_start``.
        """
        removed_indices = set(removed_column_indices or [])

        # col_widths holds cumulative right boundaries, so the full table width
        # is the last boundary. Summing the boundaries would overstate the width
        # and undercount wrapped header rows.
        total_width = col_widths[-1] if len(col_widths) > 0 else 0

        page_by_changes = self._group_start_flags(df, page_by)
        subline_by_changes = self._group_start_flags(df, subline_by)

        # Pair every displayed column with its own width (difference between
        # consecutive boundaries). This is row-independent, so do it once.
        displayed = [
            name for idx, name in enumerate(df.columns) if idx not in removed_indices
        ]
        column_layout = []
        left_edge = 0
        for name, right_edge in zip(displayed, col_widths):
            column_layout.append((df[name], right_edge - left_edge))
            left_edge = right_edge

        row_metadata_list = []
        for row_idx in range(df.height):
            # 1. Data rows: tallest cell among the displayed columns
            max_lines_in_row = 1
            for series, col_width in column_layout:
                text_width = get_string_width(
                    str(series[row_idx]),
                    font=1,  # type: ignore
                    font_size=font_size,  # type: ignore
                )
                max_lines_in_row = max(
                    max_lines_in_row, self._lines_needed(text_width, col_width)
                )

            # 2. Header rows: only rows that start a group carry a header
            is_group_start = page_by_changes[row_idx] if page_by else False
            is_subline_start = subline_by_changes[row_idx] if subline_by else False

            pageby_rows = (
                self._group_header_rows(df, row_idx, page_by, total_width, font_size)
                if is_group_start
                else 0
            )
            subline_rows = (
                self._group_header_rows(
                    df, row_idx, subline_by, total_width, font_size
                )
                if is_subline_start
                else 0
            )

            row_metadata_list.append(
                {
                    "row_index": row_idx,
                    "data_rows": max_lines_in_row,
                    "pageby_header_rows": pageby_rows,
                    "subline_header_rows": subline_rows,
                    "column_header_rows": 0,  # To be filled later or passed in
                    "total_rows": max_lines_in_row + pageby_rows + subline_rows,
                    "page": 0,  # To be assigned
                    "is_group_start": is_group_start,
                    "is_subline_start": is_subline_start,
                }
            )

        # Create DataFrame with explicit schema to handle empty case
        schema = {
            "row_index": pl.Int64,
            "data_rows": pl.Int64,
            "pageby_header_rows": pl.Int64,
            "subline_header_rows": pl.Int64,
            "column_header_rows": pl.Int64,
            "total_rows": pl.Int64,
            "page": pl.Int64,
            "is_group_start": pl.Boolean,
            "is_subline_start": pl.Boolean,
        }
        meta_df = pl.DataFrame(row_metadata_list, schema=schema, orient="row")

        # Assign pages
        return self._assign_pages(meta_df, additional_rows_per_page, new_page)

    def _lines_needed(self, text_width: float, available_width: float) -> int:
        """Return how many lines text of ``text_width`` needs in ``available_width``.

        Raises:
            ZeroDivisionError: If ``available_width`` is 0.
        """
        return max(1, int(text_width / available_width) + 1)

    def _group_start_flags(
        self, df: pl.DataFrame, group_cols: Sequence[str] | None
    ) -> list[bool]:
        """Return one flag per row: True where the row starts a new group.

        Row 0 is always flagged. Later rows are flagged when the string form of
        any grouping column differs from the previous row. Without grouping
        columns every row is flagged.
        """
        flags = [True] * df.height
        if group_cols:
            rendered = [[str(v) for v in df[col].to_list()] for col in group_cols]
            for i in range(1, df.height):
                flags[i] = any(values[i - 1] != values[i] for values in rendered)
        return flags

    def _group_header_rows(
        self,
        df: pl.DataFrame,
        row_idx: int,
        group_cols: Sequence[str],
        total_width: float,
        font_size: float,
    ) -> int:
        """Return the rows needed by the group header shown above ``row_idx``.

        The header text is ``"col: value"`` for each grouping column, joined by
        ``" | "``; values whose string form is ``"-----"`` are left out. Returns
        0 when no text remains.
        """
        header_parts = []
        for col in group_cols:
            val = df[col][row_idx]
            if str(val) != "-----":
                header_parts.append(f"{col}: {val}")
        header_text = " | ".join(header_parts)
        if not header_text:
            return 0
        # _calculate_header_rows takes an integer font size, so fractional
        # sizes are truncated here.
        return self._calculate_header_rows(
            header_text, total_width, font_size=int(font_size)
        )

    def _calculate_header_rows(
        self,
        header_text: str,
        total_width: float,
        font: FontName | FontNumber = 1,
        font_size: int = 18,
    ) -> int:
        """Calculate how many rows a header will occupy.

        Args:
            header_text: Text of the header
            total_width: Width available to the header
            font: Font name or number used to measure the text
            font_size: Font size used to measure the text

        Returns:
            Number of rows, at least 1
        """
        text_width = get_string_width(header_text, font=font, font_size=font_size)
        return self._lines_needed(text_width, total_width)

    def _assign_pages(
        self,
        meta_df: pl.DataFrame,
        additional_rows_per_page: int = 0,
        new_page: bool = False,
    ) -> pl.DataFrame:
        """Assign page numbers to the metadata DataFrame.

        Pages are numbered from 1. A new page starts when the next row would
        exceed the row budget, when the row starts a subline group, or (with
        ``new_page``) when it starts a page_by group. No page is started while
        the current one is still empty.

        Args:
            meta_df: Frame with ``total_rows``, ``is_subline_start`` and
                ``is_group_start`` columns
            additional_rows_per_page: Rows reserved on every page, subtracted
                from ``pagination.nrow`` (the budget never drops below 1)
            new_page: Whether page_by group starts force a page break

        Returns:
            ``meta_df`` unchanged when it is empty, otherwise a new DataFrame
            with the ``page`` column filled in
        """
        if meta_df.height == 0:
            return meta_df

        available_rows = max(1, self.pagination.nrow - additional_rows_per_page)
        current_page = 1
        current_rows = 0

        # Work on plain dicts so the 'page' value can be set row by row.
        rows = meta_df.to_dicts()

        for i, row in enumerate(rows):
            row_height = row["total_rows"]

            # Forced breaks never apply to the very first row.
            force_break = i > 0 and bool(
                row["is_subline_start"] or (new_page and row["is_group_start"])
            )

            if (
                force_break or (current_rows + row_height > available_rows)
            ) and current_rows > 0:
                current_page += 1
                current_rows = 0

            row["page"] = current_page
            current_rows += row_height

        return pl.DataFrame(rows)