Coverage for src / rtflite / encoding / strategies.py: 84%

429 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-28 05:09 +0000

1"""Encoding strategies for different types of RTF documents.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING 

5 

6from ..services.grouping_service import grouping_service 

7from ..type_guards import ( 

8 is_flat_header_list, 

9 is_nested_header_list, 

10 is_single_body, 

11 is_single_header, 

12) 

13 

14if TYPE_CHECKING: 

15 from ..encode import RTFDocument 

16 

17 

class EncodingStrategy(ABC):
    """Common interface implemented by every RTF encoding strategy."""

    @abstractmethod
    def encode(self, document: "RTFDocument") -> str:
        """Render ``document`` as RTF.

        Concrete strategies must override this method; the base
        implementation has no body beyond this docstring.

        Args:
            document: The RTF document to encode.

        Returns:
            The complete RTF string.
        """

32 

33 

class SinglePageStrategy(EncodingStrategy):
    """Encoding strategy for single-page documents without pagination.

    ``encode`` dispatches on ``document.df``: ``None`` produces a figure-only
    document, a ``list`` produces a multi-section document, and anything else
    is encoded as a single table.
    """

    def __init__(self):
        # The service classes are imported at construction time rather than at
        # module import time (presumably to avoid an import cycle - TODO confirm).
        from ..services import RTFEncodingService
        from ..services.document_service import RTFDocumentService
        from ..services.figure_service import RTFFigureService

        self.encoding_service = RTFEncodingService()
        self.document_service = RTFDocumentService()
        self.figure_service = RTFFigureService()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first_broadcast_row(value, n_cols: int):
        """Broadcast ``value`` to one row of ``n_cols`` cells and return that row.

        Returns ``None`` when ``BroadcastValue.to_list()`` yields a falsy
        result (``None`` or an empty list).
        """
        from ..attributes import BroadcastValue

        rows = BroadcastValue(value=value, dimension=(1, n_cols)).to_list()
        return rows[0] if rows else None

    @staticmethod
    def _first_flat_header(column_header):
        """Return the first entry of a flat header list, or ``None``.

        ``None`` is returned when ``column_header`` is not a flat header list,
        is empty, or its first entry is itself ``None``.
        """
        if is_flat_header_list(column_header) and len(column_header) > 0:
            return column_header[0]
        return None

    def _document_preamble(
        self,
        document: "RTFDocument",
        rtf_title,
        *,
        with_color_table: bool = True,
    ) -> list:
        """Return the leading document parts, from document start to subline.

        Parts are produced in output order; entries may be ``None`` and are
        filtered out by the caller before joining.
        """
        service = self.encoding_service
        parts = [
            service.encode_document_start(),
            service.encode_font_table(),
        ]
        if with_color_table:
            parts.append(service.encode_color_table(document))
        parts.extend(
            [
                "\n",
                service.encode_page_header(document.rtf_page_header, method="line"),
                service.encode_page_footer(document.rtf_page_footer, method="line"),
                service.encode_page_settings(document.rtf_page),
                rtf_title,
                "\n",
                service.encode_subline(document.rtf_subline, method="line"),
            ]
        )
        return parts

    def _footnote_and_source(self, document: "RTFDocument") -> list:
        """Return ``[footnote, source]``; each is ``None`` when not set."""
        footnote = None
        if document.rtf_footnote is not None:
            footnote = "\n".join(
                self.encoding_service.encode_footnote(
                    document.rtf_footnote,
                    page_number=1,
                    page_col_width=document.rtf_page.col_width,
                )
            )

        source = None
        if document.rtf_source is not None:
            source = "\n".join(
                self.encoding_service.encode_source(
                    document.rtf_source,
                    page_number=1,
                    page_col_width=document.rtf_page.col_width,
                )
            )
        return [footnote, source]

    def _figure_at(self, document: "RTFDocument", position: str):
        """Encode the figure when ``fig_pos`` equals ``position``, else ``None``."""
        figure = document.rtf_figure
        if figure is not None and figure.fig_pos == position:
            return self.figure_service.encode_figure(figure)
        return None

    # ------------------------------------------------------------------
    # Column-header helpers for the single-table path
    # ------------------------------------------------------------------

    def _autofill_first_header(self, document: "RTFDocument") -> None:
        """Fill the first column header from the DataFrame column names.

        Applies only when the first header has no text and the (single) body
        has ``as_colheader`` set. Columns named in ``page_by``/``subline_by``
        are left out, the header's ``col_rel_width`` is adjusted to the
        remaining columns, and ``document.rtf_column_header`` is truncated to
        its first entry. Mutates ``document`` in place.
        """
        import polars as pl

        column_header = document.rtf_column_header

        # Locate the header whose ``text`` decides whether auto-fill applies.
        header_to_check = None
        if is_nested_header_list(column_header):
            # Nested list: first header of the first section.
            if column_header[0] and len(column_header[0]) > 0:
                header_to_check = column_header[0][0]
        elif is_flat_header_list(column_header):
            if len(column_header) > 0:
                header_to_check = column_header[0]
        elif is_single_header(column_header):  # type: ignore[arg-type]
            header_to_check = column_header

        if not (
            header_to_check is not None
            and header_to_check.text is None
            and is_single_body(document.rtf_body)
            and document.rtf_body.as_colheader
        ):
            return

        body = document.rtf_body
        excluded_columns = list(body.page_by or []) + list(body.subline_by or [])
        columns = [col for col in document.df.columns if col not in excluded_columns]

        # Explicit schema + row orientation guarantees a single-row frame.
        header_df = pl.DataFrame(
            [columns],
            schema=[f"col_{i}" for i in range(len(columns))],
            orient="row",
        )

        # Only a valid flat header list receives the generated text/widths.
        first_header = self._first_flat_header(column_header)
        if first_header is not None:
            first_header.text = header_df  # type: ignore[assignment]

        if excluded_columns and first_header is not None:
            original_cols = list(document.df.columns)
            widths = body.col_rel_width
            if widths is not None and len(widths) >= len(original_cols):
                excluded = set(excluded_columns)
                first_header.col_rel_width = [
                    widths[i]
                    for i, col in enumerate(original_cols)
                    if col not in excluded
                ]
            else:
                # Fallback: equal widths when col_rel_width is None or too short.
                first_header.col_rel_width = [1] * len(columns)

        document.rtf_column_header = column_header[:1]

    def _encode_column_headers(self, document: "RTFDocument") -> list:
        """Encode every column header of ``document`` and return the results.

        Handles nested lists, flat lists and a single header; returns an
        empty list for anything else.
        """
        column_header = document.rtf_column_header
        encode_header = self.encoding_service.encode_column_header
        col_width = document.rtf_page.col_width

        if is_nested_header_list(column_header):
            encoded = []
            for section_headers in column_header:
                if not section_headers:
                    continue
                for header in section_headers:
                    if header:
                        encoded.append(encode_header(header.text, header, col_width))
            return encoded

        if is_flat_header_list(column_header):
            return [
                encode_header(header.text if header else None, header, col_width)
                for header in column_header
            ]

        if is_single_header(column_header):  # type: ignore[arg-type]
            return [encode_header(column_header.text, column_header, col_width)]

        return []

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def encode(self, document: "RTFDocument") -> str:
        """Encode a single-page document with complete border and layout handling.

        Border attributes on ``document`` (body, column header, footnote) are
        updated in place before encoding. The shared colour-service document
        context is always cleared again, even when encoding raises.

        Args:
            document: The RTF document to encode

        Returns:
            Complete RTF string
        """
        from ..attributes import BroadcastValue
        from ..services.color_service import color_service

        # Figure-only documents (no table)
        if document.df is None:
            return self._encode_figure_only_document_simple(document)

        # Multi-section documents
        if isinstance(document.df, list):
            return self._encode_multi_section_document(document)

        dim = document.df.shape
        n_rows, n_cols = dim[0], dim[1]

        # Title
        rtf_title = self.encoding_service.encode_title(
            document.rtf_title, method="line"
        )

        # Page-level borders
        doc_border_top = self._first_broadcast_row(
            document.rtf_page.border_first, n_cols
        )
        doc_border_bottom = self._first_broadcast_row(
            document.rtf_page.border_last, n_cols
        )

        # Body-level first/last borders (single body only)
        page_border_top = None
        page_border_bottom = None
        if document.rtf_body is not None and is_single_body(document.rtf_body):
            page_border_top = self._first_broadcast_row(
                document.rtf_body.border_first, n_cols
            )
            page_border_bottom = self._first_broadcast_row(
                document.rtf_body.border_last, n_cols
            )

        # Column header
        if document.rtf_column_header is None:
            rtf_column_header = ""
            # Without a header the page's top border goes on the first body row.
            if n_rows > 0:
                document.rtf_body.border_top = BroadcastValue(
                    value=document.rtf_body.border_top, dimension=dim
                ).update_row(0, doc_border_top)
        else:
            self._autofill_first_header(document)

            # Re-read the header list: auto-fill may have replaced it.
            first_header = self._first_flat_header(document.rtf_column_header)
            if n_rows > 0 and first_header is not None:
                first_header.border_top = BroadcastValue(
                    value=first_header.border_top, dimension=dim
                ).update_row(0, doc_border_top if doc_border_top is not None else [])

            rtf_column_header = self._encode_column_headers(document)

        # Top border of the first body row
        if (
            n_rows > 0
            and is_single_body(document.rtf_body)
            and page_border_top is not None
        ):
            document.rtf_body.border_top = BroadcastValue(
                value=document.rtf_body.border_top, dimension=dim
            ).update_row(0, page_border_top)

        # Bottom border: on the footnote when there is one, otherwise on the
        # last body row. The document border is applied after (and so wins
        # over) the body border.
        footnote = document.rtf_footnote
        if footnote is not None:
            if page_border_bottom is not None:
                footnote.border_bottom = BroadcastValue(
                    value=footnote.border_bottom, dimension=(1, 1)
                ).update_row(0, [page_border_bottom[0]])

            if doc_border_bottom is not None:
                footnote.border_bottom = BroadcastValue(
                    value=footnote.border_bottom, dimension=(1, 1)
                ).update_row(0, [doc_border_bottom[0]])
        elif n_rows > 0:
            if page_border_bottom is not None and is_single_body(document.rtf_body):
                document.rtf_body.border_bottom = BroadcastValue(
                    value=document.rtf_body.border_bottom, dimension=dim
                ).update_row(n_rows - 1, page_border_bottom)

            if doc_border_bottom is not None and is_single_body(document.rtf_body):
                document.rtf_body.border_bottom = BroadcastValue(
                    value=document.rtf_body.border_bottom, dimension=dim
                ).update_row(n_rows - 1, doc_border_bottom)

        # Set document color context for accurate color index resolution.
        # try/finally guarantees the shared context is cleared even if
        # encoding fails part-way through.
        color_service.set_document_context(document)
        try:
            rtf_body = self.encoding_service.encode_body(
                document, document.df, document.rtf_body, force_single_page=True
            )

            parts = [
                *self._document_preamble(document, rtf_title),
                self._figure_at(document, "before"),
                "\n".join(
                    header for sublist in rtf_column_header for header in sublist
                )
                if rtf_column_header
                else None,
                "\n".join(rtf_body),
                *self._footnote_and_source(document),
                self._figure_at(document, "after"),
                "\n\n",
                "}",
            ]
            return "\n".join(part for part in parts if part is not None)
        finally:
            color_service.clear_document_context()

    def _encode_multi_section_document(self, document: "RTFDocument") -> str:
        """Encode a multi-section document where sections are concatenated row by row.

        The page's bottom border is applied to the last section's body
        *before* the sections are encoded, so that it is part of the output.

        Args:
            document: The RTF document with multiple df/rtf_body sections

        Returns:
            Complete RTF string
        """
        from copy import deepcopy

        import polars as pl

        from ..attributes import BroadcastValue
        from ..input import RTFColumnHeader

        # Column count of the first section drives the page-border broadcast.
        if isinstance(document.df, list):
            first_section_cols = document.df[0].shape[1] if document.df else 0
        else:
            first_section_cols = document.df.shape[1] if document.df is not None else 0

        rtf_title = self.encoding_service.encode_title(
            document.rtf_title, method="line"
        )

        doc_border_top = self._first_broadcast_row(
            document.rtf_page.border_first, first_section_cols
        )
        doc_border_bottom = self._first_broadcast_row(
            document.rtf_page.border_last, first_section_cols
        )

        # Normalise df / rtf_body to parallel lists.
        if isinstance(document.df, list):
            df_list = document.df
        elif document.df is not None:
            df_list = [document.df]
        else:
            df_list = []

        if isinstance(document.rtf_body, list):
            body_list = document.rtf_body
        elif document.rtf_body is not None:
            body_list = [document.rtf_body]
        else:
            body_list = []

        # Without a footnote the page's bottom border belongs on the last row
        # of the last section. This must happen before that section is encoded
        # below, otherwise the border never reaches the output.
        if (
            document.rtf_footnote is None
            and doc_border_bottom is not None
            and isinstance(document.rtf_body, list)
            and isinstance(document.df, list)
            and document.rtf_body
            and document.df
        ):
            last_section_body = document.rtf_body[-1]
            last_section_dim = document.df[-1].shape
            if last_section_dim[0] > 0:
                last_section_body.border_bottom = BroadcastValue(
                    value=last_section_body.border_bottom,
                    dimension=last_section_dim,
                ).update_row(last_section_dim[0] - 1, doc_border_bottom)

        all_section_content = []
        is_nested_headers = is_nested_header_list(document.rtf_column_header)
        encode_header = self.encoding_service.encode_column_header

        for i, (section_df, section_body) in enumerate(
            zip(df_list, body_list, strict=True)
        ):
            dim = section_df.shape

            # Each entry is one encoded header (a sequence of RTF strings);
            # entries are flattened when the section is assembled below.
            section_headers: list[list[str]] = []

            if is_nested_headers:
                # Nested format: [[header1], [None], [header3]]
                if (
                    i < len(document.rtf_column_header)
                    and document.rtf_column_header[i]
                ):
                    for header in document.rtf_column_header[i]:
                        # Skips None entries and anything that is not an
                        # RTFColumnHeader (e.g. a tuple).
                        if not isinstance(header, RTFColumnHeader):
                            continue

                        # Top border goes on the first section's first header.
                        if (
                            i == 0
                            and not section_headers
                            and doc_border_top is not None
                        ):
                            header.border_top = BroadcastValue(
                                value=header.border_top, dimension=dim
                            ).update_row(0, doc_border_top)

                        section_headers.append(
                            encode_header(
                                header.text, header, document.rtf_page.col_width
                            )
                        )
            elif i == 0:
                # Flat format: headers apply to the first section only.
                headers_to_check = []
                if is_flat_header_list(document.rtf_column_header):
                    headers_to_check = document.rtf_column_header
                elif is_single_header(document.rtf_column_header):  # type: ignore[arg-type]
                    headers_to_check = [document.rtf_column_header]

                for header in headers_to_check:
                    if header is None:
                        continue

                    if header.text is None and section_body.as_colheader:
                        # Auto-generate the header text from the column names.
                        page_by = section_body.page_by or []
                        columns = [
                            col for col in section_df.columns if col not in page_by
                        ]
                        header.text = pl.DataFrame(  # type: ignore[assignment]
                            [columns],
                            schema=[f"col_{j}" for j in range(len(columns))],
                            orient="row",
                        )

                    # Top border goes on the first header.
                    if not section_headers and doc_border_top is not None:
                        header.border_top = BroadcastValue(
                            value=header.border_top, dimension=dim
                        ).update_row(0, doc_border_top)

                    section_headers.append(
                        encode_header(header.text, header, document.rtf_page.col_width)
                    )

            # First section without any header: top border goes on its first row.
            if i == 0 and not section_headers:
                section_body.border_top = BroadcastValue(
                    value=section_body.border_top, dimension=dim
                ).update_row(0, doc_border_top if doc_border_top is not None else [])

            # encode_body expects a document whose df/rtf_body describe a
            # single section, so work on a per-section copy.
            temp_document = deepcopy(document)
            temp_document.df = section_df
            temp_document.rtf_body = section_body

            section_body_content = self.encoding_service.encode_body(
                temp_document, section_df, section_body
            )

            if section_headers:
                all_section_content.append(
                    "\n".join(
                        header for sublist in section_headers for header in sublist
                    )
                )
            all_section_content.extend(section_body_content)

        # With a footnote, the page's bottom border goes on the footnote,
        # which is encoded below.
        if document.rtf_footnote is not None and doc_border_bottom is not None:
            document.rtf_footnote.border_bottom = BroadcastValue(
                value=document.rtf_footnote.border_bottom, dimension=(1, 1)
            ).update_row(0, [doc_border_bottom[0]])

        # NOTE(review): unlike encode(), this path emits no color table and
        # does not set the color-service document context - confirm whether
        # that is intentional.
        parts = [
            *self._document_preamble(document, rtf_title, with_color_table=False),
            "\n".join(all_section_content),
            *self._footnote_and_source(document),
            "\n\n",
            "}",
        ]
        return "\n".join(part for part in parts if part is not None)

    def _encode_figure_only_document_simple(self, document: "RTFDocument") -> str:
        """Encode a figure-only document with simple page layout.

        This handles figure-only documents with default page settings.
        Multiple figures will have page breaks between them (handled by
        FigureService). Unlike the table paths, the parts are concatenated
        without a separator.

        Args:
            document: The RTF document with only figure content

        Returns:
            Complete RTF string
        """
        rtf_title = self.encoding_service.encode_title(
            document.rtf_title, method="line"
        )

        parts = [
            *self._document_preamble(document, rtf_title),
            # FigureService handles page breaks between multiple figures
            self.figure_service.encode_figure(document.rtf_figure),
            *self._footnote_and_source(document),
            "\n\n",
            "}",
        ]
        return "".join(part for part in parts if part is not None)

633 

634 

635class PaginatedStrategy(EncodingStrategy): 

636 """Encoding strategy for multi-page documents with pagination.""" 

637 

def __init__(self):
    """Create the encoding, document and figure services used by ``encode``."""
    # The service classes are imported at construction time rather than at
    # module import time (presumably to avoid an import cycle - TODO confirm).
    from ..services import RTFEncodingService
    from ..services.document_service import RTFDocumentService
    from ..services.figure_service import RTFFigureService

    # One fresh service instance of each kind per strategy object.
    self.encoding_service = RTFEncodingService()
    self.document_service = RTFDocumentService()
    self.figure_service = RTFFigureService()

646 

647 def encode(self, document: "RTFDocument") -> str: 

648 """Encode a paginated document with full pagination support. 

649 

650 Args: 

651 document: The RTF document to encode 

652 

653 Returns: 

654 Complete RTF string 

655 """ 

656 from copy import deepcopy 

657 

658 import polars as pl 

659 

660 from ..attributes import BroadcastValue 

661 from ..row import Utils 

662 

663 # Handle figure-only documents with multi-page behavior 

664 if document.df is None: 

665 return self._encode_figure_only_document_with_pagination(document) 

666 

667 # Get dimensions based on DataFrame type 

668 if isinstance(document.df, list): 

669 # For list of DataFrames, use first one's columns 

670 dim = ( 

671 sum(df.shape[0] for df in document.df), 

672 document.df[0].shape[1] if document.df else 0, 

673 ) 

674 else: 

675 dim = document.df.shape 

676 

677 # Set document color context for accurate color index resolution 

678 from ..services.color_service import color_service 

679 

680 color_service.set_document_context(document) 

681 

682 # Prepare DataFrame for processing (remove subline_by columns, apply 

683 # group_by if needed) 

684 processed_df, original_df = ( 

685 self.encoding_service.prepare_dataframe_for_body_encoding( 

686 document.df, document.rtf_body 

687 ) 

688 ) 

689 

690 # Validate subline_by formatting consistency before processing 

691 if ( 

692 is_single_body(document.rtf_body) 

693 and document.rtf_body.subline_by is not None 

694 ): 

695 import warnings 

696 from typing import cast 

697 

698 subline_by_list = cast(list[str], document.rtf_body.subline_by) 

699 formatting_warnings = ( 

700 grouping_service.validate_subline_formatting_consistency( 

701 original_df, subline_by_list, document.rtf_body 

702 ) 

703 ) 

704 for warning_msg in formatting_warnings: 

705 warnings.warn( 

706 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=3 

707 ) 

708 

709 # Get pagination instance and distribute content (use processed data 

710 # for distribution) 

711 _, distributor = self.document_service.create_pagination_instance(document) 

712 col_total_width = document.rtf_page.col_width 

713 if ( 

714 is_single_body(document.rtf_body) 

715 and document.rtf_body.col_rel_width is not None 

716 ): 

717 col_widths = Utils._col_widths( 

718 document.rtf_body.col_rel_width, 

719 col_total_width if col_total_width is not None else 8.5, 

720 ) 

721 else: 

722 # Default to equal widths if body is not single 

723 # Use processed_df column count (after page_by/subline_by columns removed) 

724 col_widths = Utils._col_widths( 

725 [1] * processed_df.shape[1], col_total_width if col_total_width is not None else 8.5 

726 ) 

727 

728 # Calculate additional rows per page for r2rtf compatibility 

729 additional_rows = self.document_service.calculate_additional_rows_per_page( 

730 document 

731 ) 

732 

733 # Use original DataFrame for pagination logic (to identify subline_by breaks) 

734 # but processed DataFrame for the actual content 

735 if is_single_body(document.rtf_body): 

736 # Use original DataFrame for proper pagination distribution logic 

737 pages = distributor.distribute_content( 

738 df=original_df, 

739 col_widths=col_widths, 

740 page_by=document.rtf_body.page_by, 

741 new_page=document.rtf_body.new_page, 

742 pageby_header=document.rtf_body.pageby_header, 

743 table_attrs=document.rtf_body, 

744 additional_rows_per_page=additional_rows, 

745 subline_by=document.rtf_body.subline_by, 

746 ) 

747 else: 

748 # Default pagination if body is not single 

749 pages = distributor.distribute_content( 

750 df=original_df, 

751 col_widths=col_widths, 

752 page_by=None, 

753 new_page=None, 

754 pageby_header=None, 

755 table_attrs=None, 

756 additional_rows_per_page=additional_rows, 

757 subline_by=None, 

758 ) 

759 

760 # Replace page data with processed data (without subline_by columns) 

761 for page_info in pages: 

762 start_row = page_info["start_row"] 

763 end_row = page_info["end_row"] 

764 page_info["data"] = processed_df.slice(start_row, end_row - start_row + 1) 

765 

766 # Apply group_by processing to each page if needed 

767 if is_single_body(document.rtf_body) and document.rtf_body.group_by: 

768 # Calculate global page start indices for context restoration 

769 page_start_indices = [] 

770 cumulative_rows = 0 

771 for i, page_info in enumerate(pages): 

772 if i > 0: # Skip first page (starts at 0) 

773 page_start_indices.append(cumulative_rows) 

774 cumulative_rows += len(page_info["data"]) 

775 

776 # Process all pages together for proper group_by and page context 

777 # restoration 

778 all_page_data = [] 

779 for page_info in pages: 

780 all_page_data.append(page_info["data"]) 

781 

782 # Concatenate all page data 

783 full_df = all_page_data[0] 

784 for page_df in all_page_data[1:]: 

785 full_df = full_df.vstack(page_df) 

786 

787 # Apply group_by suppression to the full dataset 

788 from typing import cast 

789 

790 group_by_param = cast(list[str] | None, document.rtf_body.group_by) 

791 suppressed_df = grouping_service.enhance_group_by(full_df, group_by_param) 

792 

793 # Apply page context restoration 

794 from typing import cast 

795 

796 group_by_list2 = cast(list[str], document.rtf_body.group_by) 

797 restored_df = grouping_service.restore_page_context( 

798 suppressed_df, full_df, group_by_list2, page_start_indices 

799 ) 

800 

801 # Split the processed data back to pages 

802 start_idx = 0 

803 for page_info in pages: 

804 page_rows = len(page_info["data"]) 

805 page_info["data"] = restored_df.slice(start_idx, page_rows) 

806 start_idx += page_rows 

807 

808 # Prepare border settings 

809 border_first_list = BroadcastValue( 

810 value=document.rtf_page.border_first, dimension=(1, dim[1]) 

811 ).to_list() 

812 _ = ( 

813 border_first_list[0] if border_first_list else None 

814 ) # May be used for validation 

815 border_last_list = BroadcastValue( 

816 value=document.rtf_page.border_last, dimension=(1, dim[1]) 

817 ).to_list() 

818 _ = ( 

819 border_last_list[0] if border_last_list else None 

820 ) # May be used for validation 

821 

822 # Generate RTF for each page 

823 page_contents = [] 

824 

825 for page_info in pages: 

826 page_elements = [] 

827 

828 # Add page break before each page (except first) 

829 if not page_info["is_first_page"]: 

830 page_elements.append( 

831 self.document_service.generate_page_break(document) 

832 ) 

833 

834 # Add title if it should appear on this page 

835 if ( 

836 document.rtf_title 

837 and document.rtf_title.text 

838 and self.document_service.should_show_element_on_page( 

839 document.rtf_page.page_title, page_info 

840 ) 

841 ): 

842 title_content = self.encoding_service.encode_title( 

843 document.rtf_title, method="line" 

844 ) 

845 if title_content: 

846 page_elements.append(title_content) 

847 page_elements.append("\n") 

848 

849 # Add subline if it should appear on this page 

850 if ( 

851 document.rtf_subline 

852 and document.rtf_subline.text 

853 and self.document_service.should_show_element_on_page( 

854 document.rtf_page.page_title, page_info 

855 ) 

856 ): 

857 subline_content = self.encoding_service.encode_subline( 

858 document.rtf_subline, method="line" 

859 ) 

860 if subline_content: 

861 page_elements.append(subline_content) 

862 

863 # Add subline_by header paragraph if specified 

864 if page_info.get("subline_header"): 

865 subline_header_content = self._generate_subline_header( 

866 page_info["subline_header"], document.rtf_body 

867 ) 

868 if subline_header_content: 

869 page_elements.append(subline_header_content) 

870 

871 # Add figures if they should appear on the first page 

872 # and position is 'before' 

873 if ( 

874 document.rtf_figure 

875 and document.rtf_figure.figures 

876 and document.rtf_figure.fig_pos == "before" 

877 and page_info["is_first_page"] 

878 ): 

879 figure_content = self.figure_service.encode_figure(document.rtf_figure) 

880 if figure_content: 

881 page_elements.append(figure_content) 

882 page_elements.append("\n") 

883 

884 # Add column headers if needed 

885 if page_info["needs_header"] and document.rtf_column_header: 

886 if ( 

887 is_flat_header_list(document.rtf_column_header) 

888 and len(document.rtf_column_header) > 0 

889 and document.rtf_column_header[0] is not None 

890 and document.rtf_column_header[0].text is None 

891 and is_single_body(document.rtf_body) 

892 and document.rtf_body.as_colheader 

893 ): 

894 # Use processed page data columns (which already have 

895 # subline_by columns removed) 

896 page_df = page_info["data"] 

897 columns = list(page_df.columns) 

898 # Create DataFrame for text field (not assign list to text) 

899 import polars as pl 

900 

901 header_df = pl.DataFrame( 

902 [columns], 

903 schema=[f"col_{i}" for i in range(len(columns))], 

904 orient="row", 

905 ) 

906 document.rtf_column_header[0].text = header_df # type: ignore[assignment] 

907 

908 # Adjust col_rel_width to match processed columns (without 

909 # subline_by and page_by) 

910 if ( 

911 is_single_body(document.rtf_body) 

912 and (document.rtf_body.subline_by or document.rtf_body.page_by) 

913 ): 

914 original_cols = ( 

915 list(document.df.columns) 

916 if isinstance(document.df, pl.DataFrame) 

917 else [] 

918 ) 

919 # Collect columns that should be excluded 

920 excluded_cols: set[str] = set() 

921 if document.rtf_body.subline_by: 

922 excluded_cols.update(document.rtf_body.subline_by) 

923 if document.rtf_body.page_by: 

924 excluded_cols.update(document.rtf_body.page_by) 

925 

926 processed_col_indices = [ 

927 i 

928 for i, col in enumerate(original_cols) 

929 if col not in excluded_cols 

930 ] 

931 

932 # Ensure there are enough col_rel_width values for all 

933 # original columns 

934 if ( 

935 is_single_body(document.rtf_body) 

936 and document.rtf_body.col_rel_width is not None 

937 and len(document.rtf_body.col_rel_width) 

938 >= len(original_cols) 

939 and is_flat_header_list(document.rtf_column_header) 

940 and len(document.rtf_column_header) > 0 

941 and document.rtf_column_header[0] is not None 

942 ): 

943 document.rtf_column_header[0].col_rel_width = [ 

944 document.rtf_body.col_rel_width[i] 

945 for i in processed_col_indices 

946 ] 

947 elif ( 

948 is_flat_header_list(document.rtf_column_header) 

949 and len(document.rtf_column_header) > 0 

950 and document.rtf_column_header[0] is not None 

951 ): 

952 # Fallback: use equal widths if col_rel_width doesn't match 

953 document.rtf_column_header[0].col_rel_width = [1] * len( 

954 columns 

955 ) 

956 

957 # Apply pagination borders to column headers 

958 # Process each column header with proper borders 

959 header_elements = [] 

960 headers_to_process = [] 

961 if is_nested_header_list(document.rtf_column_header): 

962 # For nested headers, flatten them 

963 for section_headers in document.rtf_column_header: 

964 if section_headers: 

965 headers_to_process.extend(section_headers) 

966 elif is_flat_header_list(document.rtf_column_header): 

967 headers_to_process = document.rtf_column_header 

968 

969 for i, header in enumerate(headers_to_process): 

970 if header is None: 

971 continue 

972 header_copy = deepcopy(header) 

973 

974 # Remove page_by/subline_by columns from header to match body 

975 import polars as pl 

976 if isinstance(header_copy.text, pl.DataFrame): 

977 columns_to_remove = set() 

978 if document.rtf_body.page_by: 

979 columns_to_remove.update(document.rtf_body.page_by) 

980 if document.rtf_body.subline_by: 

981 columns_to_remove.update(document.rtf_body.subline_by) 

982 

983 if columns_to_remove: 

984 remaining_columns = [ 

985 col for col in header_copy.text.columns 

986 if col not in columns_to_remove 

987 ] 

988 header_copy.text = header_copy.text.select(remaining_columns) 

989 

990 # Apply page-level borders to column headers (matching 

991 # non-paginated behavior) 

992 if ( 

993 page_info["is_first_page"] 

994 and i == 0 

995 and document.rtf_page.border_first 

996 and header_copy.text is not None 

997 ): # First header on first page 

998 # Get dimensions based on text type 

999 if isinstance(header_copy.text, pl.DataFrame): 

1000 header_dims = header_copy.text.shape 

1001 else: 

1002 # For Sequence[str], assume single row 

1003 header_dims = ( 

1004 1, 

1005 len(header_copy.text) if header_copy.text else 0, 

1006 ) 

1007 # Apply page border_first to top of first column header 

1008 header_copy.border_top = BroadcastValue( 

1009 value=header_copy.border_top, dimension=header_dims 

1010 ).update_row( 

1011 0, [document.rtf_page.border_first] * header_dims[1] 

1012 ) 

1013 

1014 # Encode the header with modified borders 

1015 # Use the header_copy to preserve border modifications 

1016 header_rtf = self.encoding_service.encode_column_header( 

1017 header_copy.text, header_copy, document.rtf_page.col_width 

1018 ) 

1019 header_elements.extend(header_rtf) 

1020 

1021 page_elements.extend(header_elements) 

1022 

1023 # Add page_by spanning table row after headers if specified 

1024 if page_info.get("pageby_header_info"): 

1025 # Extract group values for spanning row text 

1026 header_info = page_info["pageby_header_info"] 

1027 if "group_values" in header_info: 

1028 header_parts = [ 

1029 str(value) 

1030 for value in header_info["group_values"].values() 

1031 if value is not None 

1032 ] 

1033 if header_parts: 

1034 header_text = ", ".join(header_parts) 

1035 # Use shared encoding service method 

1036 pageby_row_content = self.encoding_service.encode_spanning_row( 

1037 text=header_text, 

1038 page_width=document.rtf_page.col_width 

1039 if document.rtf_page.col_width 

1040 else 8.5, 

1041 rtf_body_attrs=document.rtf_body, 

1042 ) 

1043 page_elements.extend(pageby_row_content) 

1044 

1045 # Add page content (table body) with proper border handling 

1046 page_df = page_info["data"] 

1047 

1048 # Apply pagination borders to the body attributes 

1049 page_attrs = self.document_service.apply_pagination_borders( 

1050 document, document.rtf_body, page_info, len(pages) 

1051 ) 

1052 

1053 # Check if there are group boundaries within this page 

1054 if page_info.get("group_boundaries"): 

1055 # Handle mid-page group changes: insert spanning rows at boundaries 

1056 group_boundaries = page_info["group_boundaries"] 

1057 prev_row = 0 

1058 

1059 for boundary in group_boundaries: 

1060 page_relative_row = boundary["page_relative_row"] 

1061 

1062 # Encode rows before this boundary 

1063 if page_relative_row > prev_row: 

1064 segment_df = page_df[prev_row:page_relative_row] 

1065 segment_body = page_attrs._encode(segment_df, col_widths) 

1066 page_elements.extend(segment_body) 

1067 

1068 # Insert spanning row at boundary 

1069 group_values = boundary["group_values"] 

1070 header_parts = [ 

1071 str(value) 

1072 for value in group_values.values() 

1073 if value is not None 

1074 ] 

1075 if header_parts: 

1076 header_text = ", ".join(header_parts) 

1077 spanning_row = self.encoding_service.encode_spanning_row( 

1078 text=header_text, 

1079 page_width=document.rtf_page.col_width or 8.5, 

1080 rtf_body_attrs=document.rtf_body, 

1081 ) 

1082 page_elements.extend(spanning_row) 

1083 

1084 prev_row = page_relative_row 

1085 

1086 # Encode remaining rows after last boundary 

1087 if prev_row < len(page_df): 

1088 segment_df = page_df[prev_row:] 

1089 

1090 # For the last segment on non-last pages, we need to ensure 

1091 # the bottom border is applied correctly 

1092 # The border was applied to page_df row indices, but we're now 

1093 # encoding a segment, so we need to adjust 

1094 if ( 

1095 not page_info["is_last_page"] 

1096 and is_single_body(document.rtf_body) 

1097 and document.rtf_body.border_last 

1098 ): 

1099 # Apply bottom border to the last row of this segment 

1100 # This ensures proper table closing on middle pages 

1101 import copy 

1102 

1103 segment_attrs = copy.deepcopy(page_attrs) 

1104 

1105 # Adjust border_bottom to apply to last row of segment 

1106 last_segment_row = len(segment_df) - 1 

1107 if segment_attrs.border_bottom: 

1108 # Ensure border_bottom is sized correctly for segment 

1109 border_style = ( 

1110 document.rtf_body.border_last[0][0] 

1111 if isinstance(document.rtf_body.border_last, list) 

1112 else document.rtf_body.border_last 

1113 ) 

1114 # Set bottom border for all columns on last row 

1115 for col_idx in range(len(segment_df.columns)): 

1116 if last_segment_row < len( 

1117 segment_attrs.border_bottom 

1118 ): 

1119 if col_idx < len( 

1120 segment_attrs.border_bottom[last_segment_row] 

1121 ): 

1122 segment_attrs.border_bottom[last_segment_row][ 

1123 col_idx 

1124 ] = border_style 

1125 

1126 segment_body = segment_attrs._encode(segment_df, col_widths) 

1127 else: 

1128 segment_body = page_attrs._encode(segment_df, col_widths) 

1129 

1130 page_elements.extend(segment_body) 

1131 else: 

1132 # No group boundaries: encode entire page as before 

1133 page_body = page_attrs._encode(page_df, col_widths) 

1134 page_elements.extend(page_body) 

1135 

1136 # Add footnote if it should appear on this page 

1137 if ( 

1138 document.rtf_footnote 

1139 and document.rtf_footnote.text 

1140 and self.document_service.should_show_element_on_page( 

1141 document.rtf_page.page_footnote, page_info 

1142 ) 

1143 ): 

1144 footnote_content = self.encoding_service.encode_footnote( 

1145 document.rtf_footnote, 

1146 page_info["page_number"], 

1147 document.rtf_page.col_width, 

1148 ) 

1149 if footnote_content: 

1150 page_elements.extend(footnote_content) 

1151 

1152 # Add source if it should appear on this page 

1153 if ( 

1154 document.rtf_source 

1155 and document.rtf_source.text 

1156 and self.document_service.should_show_element_on_page( 

1157 document.rtf_page.page_source, page_info 

1158 ) 

1159 ): 

1160 source_content = self.encoding_service.encode_source( 

1161 document.rtf_source, 

1162 page_info["page_number"], 

1163 document.rtf_page.col_width, 

1164 ) 

1165 if source_content: 

1166 page_elements.extend(source_content) 

1167 

1168 # Add figures if they should appear on the last page and position is 'after' 

1169 if ( 

1170 document.rtf_figure 

1171 and document.rtf_figure.figures 

1172 and document.rtf_figure.fig_pos == "after" 

1173 and page_info["is_last_page"] 

1174 ): 

1175 figure_content = self.figure_service.encode_figure(document.rtf_figure) 

1176 if figure_content: 

1177 page_elements.append(figure_content) 

1178 

1179 page_contents.extend(page_elements) 

1180 

1181 # Build complete RTF document 

1182 result = "\n".join( 

1183 [ 

1184 item 

1185 for item in [ 

1186 self.encoding_service.encode_document_start(), 

1187 self.encoding_service.encode_font_table(), 

1188 self.encoding_service.encode_color_table(document), 

1189 "\n", 

1190 self.encoding_service.encode_page_header( 

1191 document.rtf_page_header, method="line" 

1192 ), 

1193 self.encoding_service.encode_page_footer( 

1194 document.rtf_page_footer, method="line" 

1195 ), 

1196 self.encoding_service.encode_page_settings(document.rtf_page), 

1197 "\n".join(page_contents), 

1198 "\n\n", 

1199 "}", 

1200 ] 

1201 if item is not None 

1202 ] 

1203 ) 

1204 

1205 # Clear document context after encoding 

1206 color_service.clear_document_context() 

1207 

1208 return result 

1209 

def _encode_figure_only_document_with_pagination(
    self, document: "RTFDocument"
) -> str:
    """Encode a table-less document, placing one figure on each page.

    Intended for figure-only documents where page-level elements may be
    repeated (for example page_title="all"). Every figure gets its own
    page; the title, footnote and source are emitted on a page according
    to the corresponding "all" / "first" / "last" page setting, and the
    subline is emitted on the first page only.

    Args:
        document: The RTF document with only figure content

    Returns:
        Complete RTF string, or an empty string when the document has no
        figure component or no figures.
    """
    from copy import deepcopy

    from ..figure import rtf_read_figure

    figure_attrs = document.rtf_figure
    if figure_attrs is None or figure_attrs.figures is None:
        return ""

    def _on_this_page(setting, first: bool, last: bool) -> bool:
        # Shared rule for title / footnote / source placement.
        if setting == "all":
            return True
        if setting == "first":
            return first
        return setting == "last" and last

    encoder = self.encoding_service
    figures = self.figure_service
    page_cfg = document.rtf_page

    # One page per figure, so the figure count is also the page count.
    figure_data_list, figure_formats = rtf_read_figure(figure_attrs.figures)
    page_count = len(figure_data_list)

    # The title is encoded once and reused on every page that shows it.
    title_rtf = encoder.encode_title(document.rtf_title, method="line")

    # Figure-only output has no table to attach a footnote to, so work on a
    # copy with as_table switched off (the caller's object stays untouched).
    footnote = document.rtf_footnote
    if footnote is not None:
        footnote = deepcopy(footnote)
        footnote.as_table = False

    # Document preamble followed by page header / footer / settings.
    parts = [
        encoder.encode_document_start(),
        encoder.encode_font_table(),
        encoder.encode_color_table(document),
        "\n",
        encoder.encode_page_header(document.rtf_page_header, method="line"),
        encoder.encode_page_footer(document.rtf_page_footer, method="line"),
        encoder.encode_page_settings(page_cfg),
    ]

    for idx, figure_data in enumerate(figure_data_list):
        first = idx == 0
        last = idx == page_count - 1

        if _on_this_page(page_cfg.page_title, first, last):
            parts.append(title_rtf)
            parts.append("\n")

        if first:
            # Subline is never repeated beyond the first page.
            parts.append(
                encoder.encode_subline(document.rtf_subline, method="line")
            )

        # Per-figure dimensions are resolved by index.
        width = figures._get_dimension(figure_attrs.fig_width, idx)
        height = figures._get_dimension(figure_attrs.fig_height, idx)
        parts.append(
            figures._encode_single_figure(
                figure_data,
                figure_formats[idx],
                width,
                height,
                figure_attrs.fig_align,
            )
        )
        parts.append("\\par ")

        if footnote is not None and _on_this_page(
            page_cfg.page_footnote, first, last
        ):
            footnote_rtf = "\n".join(
                encoder.encode_footnote(
                    footnote,
                    page_number=idx + 1,
                    page_col_width=page_cfg.col_width,
                )
            )
            if footnote_rtf:
                parts.append(footnote_rtf)

        if document.rtf_source is not None and _on_this_page(
            page_cfg.page_source, first, last
        ):
            source_rtf = "\n".join(
                encoder.encode_source(
                    document.rtf_source,
                    page_number=idx + 1,
                    page_col_width=page_cfg.col_width,
                )
            )
            if source_rtf:
                parts.append(source_rtf)

        # Page break separates figures; none after the final one.
        if not last:
            parts.append("\\page ")

    # Close the RTF group opened by the document start.
    parts.extend(["\n\n", "}"])

    # Encoders may return None for absent components; drop those.
    return "".join(part for part in parts if part is not None)

1357 

1358 def _generate_subline_header(self, subline_header_info: dict, rtf_body) -> str: 

1359 """Generate RTF paragraph for subline_by header. 

1360 

1361 Args: 

1362 subline_header_info: Dictionary with column values for the subline header 

1363 rtf_body: RTFBody attributes for formatting 

1364 

1365 Returns: 

1366 RTF string for the subline paragraph 

1367 """ 

1368 if not subline_header_info: 

1369 return "" 

1370 

1371 # Use the raw group values without column names 

1372 if "group_values" in subline_header_info: 

1373 # Extract just the values without column prefixes 

1374 header_parts = [] 

1375 for _col, value in subline_header_info["group_values"].items(): 

1376 if value is not None: 

1377 header_parts.append(str(value)) 

1378 

1379 if not header_parts: 

1380 return "" 

1381 

1382 header_text = ", ".join(header_parts) 

1383 else: 

1384 # Fallback for backward compatibility 

1385 header_parts = [] 

1386 for col, value in subline_header_info.items(): 

1387 if value is not None and col not in ["group_by_columns", "header_text"]: 

1388 header_parts.append(str(value)) 

1389 

1390 if not header_parts: 

1391 return "" 

1392 

1393 header_text = ", ".join(header_parts) 

1394 

1395 # Create RTF paragraph with minimal spacing (no sb180/sa180 to eliminate 

1396 # space between header and table) 

1397 return ( 

1398 f"{{\\pard\\hyphpar\\fi0\\li0\\ri0\\ql\\fs18{{\\f0 {header_text}}}\\par}}" 

1399 ) 

1400