Coverage for src/rtflite/encoding/strategies.py: 83%

381 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 16:35 +0000

1"""Encoding strategies for different types of RTF documents.""" 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING 

5 

6from ..services.grouping_service import grouping_service 

7from ..type_guards import ( 

8 is_flat_header_list, 

9 is_list_body, 

10 is_list_header, 

11 is_nested_header_list, 

12 is_single_body, 

13 is_single_header, 

14) 

15 

16if TYPE_CHECKING: 

17 from ..encode import RTFDocument 

18 

19 

class EncodingStrategy(ABC):
    """Interface shared by all RTF encoding strategies.

    Concrete strategies subclass this and provide :meth:`encode`.
    """

    @abstractmethod
    def encode(self, document: "RTFDocument") -> str:
        """Turn ``document`` into RTF text using this strategy.

        Args:
            document: The RTF document to encode

        Returns:
            Complete RTF string
        """
        ...

34 

35 

36class SinglePageStrategy(EncodingStrategy): 

37 """Encoding strategy for single-page documents without pagination.""" 

38 

39 def __init__(self): 

40 from ..services import RTFEncodingService 

41 from ..services.document_service import RTFDocumentService 

42 from ..services.figure_service import RTFFigureService 

43 

44 self.encoding_service = RTFEncodingService() 

45 self.document_service = RTFDocumentService() 

46 self.figure_service = RTFFigureService() 

47 

48 def encode(self, document: "RTFDocument") -> str: 

49 """Encode a single-page document with complete border and layout handling. 

50 

51 Args: 

52 document: The RTF document to encode 

53 

54 Returns: 

55 Complete RTF string 

56 """ 

57 import polars as pl 

58 

59 from ..attributes import BroadcastValue 

60 

61 # Handle figure-only documents (no table) 

62 if document.df is None: 

63 return self._encode_figure_only_document_simple(document) 

64 

65 # Check if this is a multi-section document 

66 if isinstance(document.df, list): 

67 return self._encode_multi_section_document(document) 

68 

69 # Original single-page encoding logic for table documents 

70 dim = document.df.shape 

71 

72 # Title 

73 rtf_title = self.encoding_service.encode_title( 

74 document.rtf_title, method="line" 

75 ) 

76 

77 # Page Border 

78 doc_border_top_list = BroadcastValue( 

79 value=document.rtf_page.border_first, dimension=(1, dim[1]) 

80 ).to_list() 

81 doc_border_top = doc_border_top_list[0] if doc_border_top_list else None 

82 doc_border_bottom_list = BroadcastValue( 

83 value=document.rtf_page.border_last, dimension=(1, dim[1]) 

84 ).to_list() 

85 doc_border_bottom = ( 

86 doc_border_bottom_list[0] if doc_border_bottom_list else None 

87 ) 

88 page_border_top = None 

89 page_border_bottom = None 

90 if document.rtf_body is not None and not isinstance(document.rtf_body, list): 

91 page_border_top_list = BroadcastValue( 

92 value=document.rtf_body.border_first, dimension=(1, dim[1]) 

93 ).to_list() 

94 page_border_top = page_border_top_list[0] if page_border_top_list else None 

95 page_border_bottom_list = BroadcastValue( 

96 value=document.rtf_body.border_last, dimension=(1, dim[1]) 

97 ).to_list() 

98 page_border_bottom = ( 

99 page_border_bottom_list[0] if page_border_bottom_list else None 

100 ) 

101 

102 # Column header 

103 if document.rtf_column_header is None: 

104 rtf_column_header = "" 

105 # Only update borders if DataFrame has rows 

106 if dim[0] > 0: 

107 document.rtf_body.border_top = BroadcastValue( 

108 value=document.rtf_body.border_top, dimension=dim 

109 ).update_row(0, doc_border_top) 

110 else: 

111 # Check if rtf_column_header is a list 

112 header_to_check = None 

113 if is_nested_header_list(document.rtf_column_header): 

114 # Nested list case - get first section's first header 

115 if ( 

116 document.rtf_column_header[0] 

117 and len(document.rtf_column_header[0]) > 0 

118 ): 

119 header_to_check = document.rtf_column_header[0][0] 

120 elif is_flat_header_list(document.rtf_column_header): 

121 # Flat list case - get first header 

122 if len(document.rtf_column_header) > 0: 

123 header_to_check = document.rtf_column_header[0] 

124 elif is_single_header(document.rtf_column_header): # type: ignore[arg-type] 

125 header_to_check = document.rtf_column_header 

126 

127 if ( 

128 header_to_check is not None 

129 and header_to_check.text is None 

130 and is_single_body(document.rtf_body) 

131 and document.rtf_body.as_colheader 

132 ): 

133 # Determine which columns to exclude from headers 

134 excluded_columns = list(document.rtf_body.page_by or []) + list( 

135 document.rtf_body.subline_by or [] 

136 ) 

137 columns = [ 

138 col for col in document.df.columns if col not in excluded_columns 

139 ] 

140 # Create DataFrame with explicit column names to ensure single row 

141 header_df = pl.DataFrame( 

142 [columns], 

143 schema=[f"col_{i}" for i in range(len(columns))], 

144 orient="row", 

145 ) 

146 # Only assign if we have a valid flat header list 

147 if ( 

148 is_flat_header_list(document.rtf_column_header) 

149 and len(document.rtf_column_header) > 0 

150 and document.rtf_column_header[0] is not None 

151 ): 

152 document.rtf_column_header[0].text = header_df # type: ignore[assignment] 

153 

154 # Adjust col_rel_width to match the processed columns 

155 if excluded_columns: 

156 original_cols = list(document.df.columns) 

157 excluded_cols_set = set(excluded_columns) 

158 processed_col_indices = [ 

159 i 

160 for i, col in enumerate(original_cols) 

161 if col not in excluded_cols_set 

162 ] 

163 

164 # Ensure we have enough col_rel_width values for all original columns 

165 if document.rtf_body.col_rel_width is not None and len( 

166 document.rtf_body.col_rel_width 

167 ) >= len(original_cols): 

168 if ( 

169 is_flat_header_list(document.rtf_column_header) 

170 and len(document.rtf_column_header) > 0 

171 and document.rtf_column_header[0] is not None 

172 ): 

173 document.rtf_column_header[0].col_rel_width = [ 

174 document.rtf_body.col_rel_width[i] 

175 for i in processed_col_indices 

176 ] 

177 else: 

178 # Fallback: use equal widths if col_rel_width doesn't match or is None 

179 if ( 

180 is_flat_header_list(document.rtf_column_header) 

181 and len(document.rtf_column_header) > 0 

182 and document.rtf_column_header[0] is not None 

183 ): 

184 document.rtf_column_header[0].col_rel_width = [1] * len( 

185 columns 

186 ) 

187 

188 document.rtf_column_header = document.rtf_column_header[:1] 

189 

190 # Only update borders if DataFrame has rows 

191 if ( 

192 dim[0] > 0 

193 and is_flat_header_list(document.rtf_column_header) 

194 and len(document.rtf_column_header) > 0 

195 and document.rtf_column_header[0] is not None 

196 ): 

197 document.rtf_column_header[0].border_top = BroadcastValue( 

198 value=document.rtf_column_header[0].border_top, dimension=dim 

199 ).update_row(0, doc_border_top if doc_border_top is not None else []) 

200 

201 if is_nested_header_list(document.rtf_column_header): 

202 # Handle nested list of headers 

203 rtf_column_header = [] 

204 for section_headers in document.rtf_column_header: 

205 if section_headers: 

206 for header in section_headers: 

207 if header: 

208 rtf_column_header.append( 

209 self.encoding_service.encode_column_header( 

210 header.text, header, document.rtf_page.col_width 

211 ) 

212 ) 

213 elif is_flat_header_list(document.rtf_column_header): 

214 rtf_column_header = [ 

215 self.encoding_service.encode_column_header( 

216 header.text if header else None, 

217 header, 

218 document.rtf_page.col_width, 

219 ) 

220 for header in document.rtf_column_header 

221 ] 

222 elif is_single_header(document.rtf_column_header): # type: ignore[arg-type] 

223 rtf_column_header = [ 

224 self.encoding_service.encode_column_header( 

225 document.rtf_column_header.text, 

226 document.rtf_column_header, 

227 document.rtf_page.col_width, 

228 ) 

229 ] 

230 else: 

231 rtf_column_header = [] 

232 

233 # Only update borders if DataFrame has rows 

234 if dim[0] > 0 and is_single_body(document.rtf_body): 

235 if page_border_top is not None: 

236 document.rtf_body.border_top = BroadcastValue( 

237 value=document.rtf_body.border_top, dimension=dim 

238 ).update_row(0, page_border_top) 

239 

240 # Bottom border last line update 

241 if document.rtf_footnote is not None: 

242 if page_border_bottom is not None: 

243 document.rtf_footnote.border_bottom = BroadcastValue( 

244 value=document.rtf_footnote.border_bottom, dimension=(1, 1) 

245 ).update_row(0, [page_border_bottom[0]]) 

246 

247 if doc_border_bottom is not None: 

248 document.rtf_footnote.border_bottom = BroadcastValue( 

249 value=document.rtf_footnote.border_bottom, dimension=(1, 1) 

250 ).update_row(0, [doc_border_bottom[0]]) 

251 else: 

252 # Only update borders if DataFrame has rows 

253 if dim[0] > 0: 

254 if page_border_bottom is not None and is_single_body(document.rtf_body): 

255 document.rtf_body.border_bottom = BroadcastValue( 

256 value=document.rtf_body.border_bottom, dimension=dim 

257 ).update_row(dim[0] - 1, page_border_bottom) 

258 

259 if doc_border_bottom is not None and is_single_body(document.rtf_body): 

260 document.rtf_body.border_bottom = BroadcastValue( 

261 value=document.rtf_body.border_bottom, dimension=dim 

262 ).update_row(dim[0] - 1, doc_border_bottom) 

263 

264 # Set document color context for accurate color index resolution 

265 from ..services.color_service import color_service 

266 

267 color_service.set_document_context(document) 

268 

269 # Body 

270 rtf_body = self.encoding_service.encode_body( 

271 document, document.df, document.rtf_body, force_single_page=True 

272 ) 

273 

274 result = "\n".join( 

275 [ 

276 item 

277 for item in [ 

278 self.encoding_service.encode_document_start(), 

279 self.encoding_service.encode_font_table(), 

280 self.encoding_service.encode_color_table(document), 

281 "\n", 

282 self.encoding_service.encode_page_header( 

283 document.rtf_page_header, method="line" 

284 ), 

285 self.encoding_service.encode_page_footer( 

286 document.rtf_page_footer, method="line" 

287 ), 

288 self.encoding_service.encode_page_settings(document.rtf_page), 

289 rtf_title, 

290 "\n", 

291 self.encoding_service.encode_subline( 

292 document.rtf_subline, method="line" 

293 ), 

294 self.figure_service.encode_figure(document.rtf_figure) 

295 if document.rtf_figure is not None 

296 and document.rtf_figure.fig_pos == "before" 

297 else None, 

298 "\n".join( 

299 header for sublist in rtf_column_header for header in sublist 

300 ) 

301 if rtf_column_header 

302 else None, 

303 "\n".join(rtf_body), 

304 "\n".join( 

305 self.encoding_service.encode_footnote( 

306 document.rtf_footnote, 

307 page_number=1, 

308 page_col_width=document.rtf_page.col_width, 

309 ) 

310 ) 

311 if document.rtf_footnote is not None 

312 else None, 

313 "\n".join( 

314 self.encoding_service.encode_source( 

315 document.rtf_source, 

316 page_number=1, 

317 page_col_width=document.rtf_page.col_width, 

318 ) 

319 ) 

320 if document.rtf_source is not None 

321 else None, 

322 self.figure_service.encode_figure(document.rtf_figure) 

323 if document.rtf_figure is not None 

324 and document.rtf_figure.fig_pos == "after" 

325 else None, 

326 "\n\n", 

327 "}", 

328 ] 

329 if item is not None 

330 ] 

331 ) 

332 

333 # Clear document context after encoding 

334 color_service.clear_document_context() 

335 

336 return result 

337 

338 def _encode_multi_section_document(self, document: "RTFDocument") -> str: 

339 """Encode a multi-section document where sections are concatenated row by row. 

340 

341 Args: 

342 document: The RTF document with multiple df/rtf_body sections 

343 

344 Returns: 

345 Complete RTF string 

346 """ 

347 from ..attributes import BroadcastValue 

348 

349 # Calculate total rows across all sections for border management 

350 if isinstance(document.df, list): 

351 total_rows = sum(df.shape[0] for df in document.df) 

352 first_section_cols = document.df[0].shape[1] if document.df else 0 

353 else: 

354 total_rows = document.df.shape[0] if document.df is not None else 0 

355 first_section_cols = document.df.shape[1] if document.df is not None else 0 

356 

357 # Document structure components 

358 rtf_title = self.encoding_service.encode_title( 

359 document.rtf_title, method="line" 

360 ) 

361 

362 # Handle page borders (use first section for dimensions) 

363 doc_border_top_list = BroadcastValue( 

364 value=document.rtf_page.border_first, dimension=(1, first_section_cols) 

365 ).to_list() 

366 doc_border_top = doc_border_top_list[0] if doc_border_top_list else None 

367 doc_border_bottom_list = BroadcastValue( 

368 value=document.rtf_page.border_last, dimension=(1, first_section_cols) 

369 ).to_list() 

370 doc_border_bottom = ( 

371 doc_border_bottom_list[0] if doc_border_bottom_list else None 

372 ) 

373 

374 # Encode sections 

375 all_section_content = [] 

376 is_nested_headers = is_nested_header_list(document.rtf_column_header) 

377 

378 df_list = ( 

379 document.df 

380 if isinstance(document.df, list) 

381 else [document.df] 

382 if document.df is not None 

383 else [] 

384 ) 

385 body_list = ( 

386 document.rtf_body 

387 if isinstance(document.rtf_body, list) 

388 else [document.rtf_body] 

389 if document.rtf_body is not None 

390 else [] 

391 ) 

392 

393 for i, (section_df, section_body) in enumerate(zip(df_list, body_list)): 

394 dim = section_df.shape 

395 

396 # Handle column headers for this section 

397 section_headers: list[str] = [] 

398 if is_nested_headers: 

399 # Nested format: [[header1], [None], [header3]] 

400 if ( 

401 i < len(document.rtf_column_header) 

402 and document.rtf_column_header[i] 

403 ): 

404 for header in document.rtf_column_header[i]: 

405 if header is not None: 

406 from ..input import RTFColumnHeader 

407 

408 # Ensure header is RTFColumnHeader, not tuple 

409 if not isinstance(header, RTFColumnHeader): 

410 continue 

411 # Apply top border to first section's first header 

412 if ( 

413 i == 0 

414 and not section_headers 

415 and doc_border_top is not None 

416 ): 

417 header.border_top = BroadcastValue( 

418 value=header.border_top, dimension=dim 

419 ).update_row(0, doc_border_top) 

420 

421 section_headers.append( 

422 self.encoding_service.encode_column_header( 

423 header.text, header, document.rtf_page.col_width 

424 ) 

425 ) 

426 else: 

427 # Flat format - only apply to first section 

428 if i == 0: 

429 headers_to_check = [] 

430 if is_flat_header_list(document.rtf_column_header): 

431 headers_to_check = document.rtf_column_header 

432 elif is_single_header(document.rtf_column_header): # type: ignore[arg-type] 

433 headers_to_check = [document.rtf_column_header] 

434 

435 for header in headers_to_check: 

436 if ( 

437 header is not None 

438 and header.text is None 

439 and section_body.as_colheader 

440 ): 

441 # Auto-generate headers from column names 

442 columns = [ 

443 col 

444 for col in section_df.columns 

445 if col not in (section_body.page_by or []) 

446 ] 

447 import polars as pl 

448 

449 header_df = pl.DataFrame( 

450 [columns], 

451 schema=[f"col_{j}" for j in range(len(columns))], 

452 orient="row", 

453 ) 

454 header.text = header_df # type: ignore[assignment] 

455 

456 # Apply top border to first header 

457 if ( 

458 not section_headers 

459 and doc_border_top is not None 

460 and header is not None 

461 ): 

462 header.border_top = BroadcastValue( 

463 value=header.border_top, dimension=dim 

464 ).update_row( 

465 0, doc_border_top if doc_border_top is not None else [] 

466 ) 

467 

468 if header is not None: 

469 section_headers.append( 

470 self.encoding_service.encode_column_header( 

471 header.text, header, document.rtf_page.col_width 

472 ) 

473 ) 

474 

475 # Handle borders for section body 

476 if i == 0 and not section_headers: # First section, no headers 

477 # Apply top border to first row of first section 

478 section_body.border_top = BroadcastValue( 

479 value=section_body.border_top, dimension=dim 

480 ).update_row(0, doc_border_top if doc_border_top is not None else []) 

481 

482 # Create a temporary document for this section to maintain compatibility 

483 from copy import deepcopy 

484 

485 temp_document = deepcopy(document) 

486 temp_document.df = section_df 

487 temp_document.rtf_body = section_body 

488 

489 # Encode section body 

490 section_body_content = self.encoding_service.encode_body( 

491 temp_document, section_df, section_body 

492 ) 

493 

494 # Add section content 

495 if section_headers: 

496 all_section_content.extend( 

497 [ 

498 "\n".join( 

499 header for sublist in section_headers for header in sublist 

500 ) 

501 ] 

502 ) 

503 all_section_content.extend(section_body_content) 

504 

505 # Handle bottom borders on last section 

506 if document.rtf_footnote is not None and doc_border_bottom is not None: 

507 document.rtf_footnote.border_bottom = BroadcastValue( 

508 value=document.rtf_footnote.border_bottom, dimension=(1, 1) 

509 ).update_row(0, [doc_border_bottom[0]]) 

510 else: 

511 # Apply bottom border to last section's last row 

512 if isinstance(document.rtf_body, list) and isinstance(document.df, list): 

513 last_section_body = document.rtf_body[-1] 

514 last_section_dim = document.df[-1].shape 

515 if last_section_dim[0] > 0 and doc_border_bottom is not None: 

516 last_section_body.border_bottom = BroadcastValue( 

517 value=last_section_body.border_bottom, 

518 dimension=last_section_dim, 

519 ).update_row(last_section_dim[0] - 1, doc_border_bottom) 

520 

521 return "\n".join( 

522 [ 

523 item 

524 for item in [ 

525 self.encoding_service.encode_document_start(), 

526 self.encoding_service.encode_font_table(), 

527 "\n", 

528 self.encoding_service.encode_page_header( 

529 document.rtf_page_header, method="line" 

530 ), 

531 self.encoding_service.encode_page_footer( 

532 document.rtf_page_footer, method="line" 

533 ), 

534 self.encoding_service.encode_page_settings(document.rtf_page), 

535 rtf_title, 

536 "\n", 

537 self.encoding_service.encode_subline( 

538 document.rtf_subline, method="line" 

539 ), 

540 "\n".join(all_section_content), 

541 "\n".join( 

542 self.encoding_service.encode_footnote( 

543 document.rtf_footnote, 

544 page_number=1, 

545 page_col_width=document.rtf_page.col_width, 

546 ) 

547 ) 

548 if document.rtf_footnote is not None 

549 else None, 

550 "\n".join( 

551 self.encoding_service.encode_source( 

552 document.rtf_source, 

553 page_number=1, 

554 page_col_width=document.rtf_page.col_width, 

555 ) 

556 ) 

557 if document.rtf_source is not None 

558 else None, 

559 "\n\n", 

560 "}", 

561 ] 

562 if item is not None 

563 ] 

564 ) 

565 

566 def _encode_figure_only_document_simple(self, document: "RTFDocument") -> str: 

567 """Encode a figure-only document with simple page layout. 

568 

569 This handles figure-only documents with default page settings. 

570 Multiple figures will have page breaks between them (handled by FigureService). 

571 

572 Args: 

573 document: The RTF document with only figure content 

574 

575 Returns: 

576 Complete RTF string 

577 """ 

578 # Build RTF components for simple figure-only document 

579 rtf_title = self.encoding_service.encode_title( 

580 document.rtf_title, method="line" 

581 ) 

582 

583 # Assemble final RTF document 

584 return "".join( 

585 [ 

586 item 

587 for item in [ 

588 self.encoding_service.encode_document_start(), 

589 self.encoding_service.encode_font_table(), 

590 self.encoding_service.encode_color_table(document), 

591 "\n", 

592 self.encoding_service.encode_page_header( 

593 document.rtf_page_header, method="line" 

594 ), 

595 self.encoding_service.encode_page_footer( 

596 document.rtf_page_footer, method="line" 

597 ), 

598 self.encoding_service.encode_page_settings(document.rtf_page), 

599 rtf_title, 

600 "\n", 

601 self.encoding_service.encode_subline( 

602 document.rtf_subline, method="line" 

603 ), 

604 # FigureService handles page breaks between multiple figures 

605 self.figure_service.encode_figure(document.rtf_figure), 

606 "\n".join( 

607 self.encoding_service.encode_footnote( 

608 document.rtf_footnote, 

609 page_number=1, 

610 page_col_width=document.rtf_page.col_width, 

611 ) 

612 ) 

613 if document.rtf_footnote is not None 

614 else None, 

615 "\n".join( 

616 self.encoding_service.encode_source( 

617 document.rtf_source, 

618 page_number=1, 

619 page_col_width=document.rtf_page.col_width, 

620 ) 

621 ) 

622 if document.rtf_source is not None 

623 else None, 

624 "\n\n", 

625 "}", 

626 ] 

627 if item is not None 

628 ] 

629 ) 

630 

631 

632class PaginatedStrategy(EncodingStrategy): 

633 """Encoding strategy for multi-page documents with pagination.""" 

634 

635 def __init__(self): 

636 from ..services import RTFEncodingService 

637 from ..services.document_service import RTFDocumentService 

638 from ..services.figure_service import RTFFigureService 

639 

640 self.encoding_service = RTFEncodingService() 

641 self.document_service = RTFDocumentService() 

642 self.figure_service = RTFFigureService() 

643 

644 def encode(self, document: "RTFDocument") -> str: 

645 """Encode a paginated document with full pagination support. 

646 

647 Args: 

648 document: The RTF document to encode 

649 

650 Returns: 

651 Complete RTF string 

652 """ 

653 from copy import deepcopy 

654 

655 import polars as pl 

656 

657 from ..attributes import BroadcastValue 

658 from ..row import Utils 

659 

660 # Handle figure-only documents with multi-page behavior 

661 if document.df is None: 

662 return self._encode_figure_only_document_with_pagination(document) 

663 

664 # Get dimensions based on DataFrame type 

665 if isinstance(document.df, list): 

666 # For list of DataFrames, use first one's columns 

667 dim = ( 

668 sum(df.shape[0] for df in document.df), 

669 document.df[0].shape[1] if document.df else 0, 

670 ) 

671 else: 

672 dim = document.df.shape 

673 

674 # Set document color context for accurate color index resolution 

675 from ..services.color_service import color_service 

676 

677 color_service.set_document_context(document) 

678 

679 # Prepare DataFrame for processing (remove subline_by columns, apply group_by if needed) 

680 processed_df, original_df = ( 

681 self.encoding_service.prepare_dataframe_for_body_encoding( 

682 document.df, document.rtf_body 

683 ) 

684 ) 

685 

686 # Validate subline_by formatting consistency before processing 

687 if ( 

688 is_single_body(document.rtf_body) 

689 and document.rtf_body.subline_by is not None 

690 ): 

691 import warnings 

692 from typing import cast 

693 

694 subline_by_list = cast(list[str], document.rtf_body.subline_by) 

695 formatting_warnings = ( 

696 grouping_service.validate_subline_formatting_consistency( 

697 original_df, subline_by_list, document.rtf_body 

698 ) 

699 ) 

700 for warning_msg in formatting_warnings: 

701 warnings.warn( 

702 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=3 

703 ) 

704 

705 # Get pagination instance and distribute content (use processed data for distribution) 

706 _, distributor = self.document_service.create_pagination_instance(document) 

707 col_total_width = document.rtf_page.col_width 

708 if ( 

709 is_single_body(document.rtf_body) 

710 and document.rtf_body.col_rel_width is not None 

711 ): 

712 col_widths = Utils._col_widths( 

713 document.rtf_body.col_rel_width, 

714 col_total_width if col_total_width is not None else 8.5, 

715 ) 

716 else: 

717 # Default to equal widths if body is not single 

718 col_widths = Utils._col_widths( 

719 [1] * dim[1], col_total_width if col_total_width is not None else 8.5 

720 ) 

721 

722 # Calculate additional rows per page for r2rtf compatibility 

723 additional_rows = self.document_service.calculate_additional_rows_per_page( 

724 document 

725 ) 

726 

727 # Use original DataFrame for pagination logic (to identify subline_by breaks) 

728 # but processed DataFrame for the actual content 

729 if is_single_body(document.rtf_body): 

730 pages = distributor.distribute_content( 

731 df=original_df, # Use original DataFrame for proper pagination distribution logic 

732 col_widths=col_widths, 

733 page_by=document.rtf_body.page_by, 

734 new_page=document.rtf_body.new_page, 

735 pageby_header=document.rtf_body.pageby_header, 

736 table_attrs=document.rtf_body, 

737 additional_rows_per_page=additional_rows, 

738 subline_by=document.rtf_body.subline_by, 

739 ) 

740 else: 

741 # Default pagination if body is not single 

742 pages = distributor.distribute_content( 

743 df=original_df, 

744 col_widths=col_widths, 

745 page_by=None, 

746 new_page=None, 

747 pageby_header=None, 

748 table_attrs=None, 

749 additional_rows_per_page=additional_rows, 

750 subline_by=None, 

751 ) 

752 

753 # Replace page data with processed data (without subline_by columns) 

754 for i, page_info in enumerate(pages): 

755 start_row = page_info["start_row"] 

756 end_row = page_info["end_row"] 

757 page_info["data"] = processed_df.slice(start_row, end_row - start_row + 1) 

758 

759 # Apply group_by processing to each page if needed 

760 if is_single_body(document.rtf_body) and document.rtf_body.group_by: 

761 # Calculate global page start indices for context restoration 

762 page_start_indices = [] 

763 cumulative_rows = 0 

764 for i, page_info in enumerate(pages): 

765 if i > 0: # Skip first page (starts at 0) 

766 page_start_indices.append(cumulative_rows) 

767 cumulative_rows += len(page_info["data"]) 

768 

769 # Process all pages together for proper group_by and page context restoration 

770 all_page_data = [] 

771 for page_info in pages: 

772 all_page_data.append(page_info["data"]) 

773 

774 # Concatenate all page data 

775 full_df = all_page_data[0] 

776 for page_df in all_page_data[1:]: 

777 full_df = full_df.vstack(page_df) 

778 

779 # Apply group_by suppression to the full dataset 

780 from typing import cast 

781 

782 group_by_param = cast(list[str] | None, document.rtf_body.group_by) 

783 suppressed_df = grouping_service.enhance_group_by(full_df, group_by_param) 

784 

785 # Apply page context restoration 

786 from typing import cast 

787 

788 group_by_list2 = cast(list[str], document.rtf_body.group_by) 

789 restored_df = grouping_service.restore_page_context( 

790 suppressed_df, full_df, group_by_list2, page_start_indices 

791 ) 

792 

793 # Split the processed data back to pages 

794 start_idx = 0 

795 for page_info in pages: 

796 page_rows = len(page_info["data"]) 

797 page_info["data"] = restored_df.slice(start_idx, page_rows) 

798 start_idx += page_rows 

799 

800 # Prepare border settings 

801 border_first_list = BroadcastValue( 

802 value=document.rtf_page.border_first, dimension=(1, dim[1]) 

803 ).to_list() 

804 _ = ( 

805 border_first_list[0] if border_first_list else None 

806 ) # May be used for validation 

807 border_last_list = BroadcastValue( 

808 value=document.rtf_page.border_last, dimension=(1, dim[1]) 

809 ).to_list() 

810 _ = ( 

811 border_last_list[0] if border_last_list else None 

812 ) # May be used for validation 

813 

814 # Generate RTF for each page 

815 page_contents = [] 

816 

817 for page_info in pages: 

818 page_elements = [] 

819 

820 # Add page break before each page (except first) 

821 if not page_info["is_first_page"]: 

822 page_elements.append( 

823 self.document_service.generate_page_break(document) 

824 ) 

825 

826 # Add title if it should appear on this page 

827 if ( 

828 document.rtf_title 

829 and document.rtf_title.text 

830 and self.document_service.should_show_element_on_page( 

831 document.rtf_page.page_title, page_info 

832 ) 

833 ): 

834 title_content = self.encoding_service.encode_title( 

835 document.rtf_title, method="line" 

836 ) 

837 if title_content: 

838 page_elements.append(title_content) 

839 page_elements.append("\n") 

840 

841 # Add subline if it should appear on this page 

842 if ( 

843 document.rtf_subline 

844 and document.rtf_subline.text 

845 and self.document_service.should_show_element_on_page( 

846 document.rtf_page.page_title, page_info 

847 ) 

848 ): 

849 subline_content = self.encoding_service.encode_subline( 

850 document.rtf_subline, method="line" 

851 ) 

852 if subline_content: 

853 page_elements.append(subline_content) 

854 

855 # Add subline_by header paragraph if specified 

856 if page_info.get("subline_header"): 

857 subline_header_content = self._generate_subline_header( 

858 page_info["subline_header"], document.rtf_body 

859 ) 

860 if subline_header_content: 

861 page_elements.append(subline_header_content) 

862 

863 # Add figures if they should appear on the first page and position is 'before' 

864 if ( 

865 document.rtf_figure 

866 and document.rtf_figure.figures 

867 and document.rtf_figure.fig_pos == "before" 

868 and page_info["is_first_page"] 

869 ): 

870 figure_content = self.figure_service.encode_figure(document.rtf_figure) 

871 if figure_content: 

872 page_elements.append(figure_content) 

873 page_elements.append("\n") 

874 

875 # Add column headers if needed 

876 if page_info["needs_header"] and document.rtf_column_header: 

877 if ( 

878 is_flat_header_list(document.rtf_column_header) 

879 and len(document.rtf_column_header) > 0 

880 and document.rtf_column_header[0] is not None 

881 and document.rtf_column_header[0].text is None 

882 and is_single_body(document.rtf_body) 

883 and document.rtf_body.as_colheader 

884 ): 

885 # Use the processed page data columns (which already have subline_by columns removed) 

886 page_df = page_info["data"] 

887 columns = list(page_df.columns) 

888 # Create DataFrame for text field (not assign list to text) 

889 import polars as pl 

890 

891 header_df = pl.DataFrame( 

892 [columns], 

893 schema=[f"col_{i}" for i in range(len(columns))], 

894 orient="row", 

895 ) 

896 document.rtf_column_header[0].text = header_df # type: ignore[assignment] 

897 

898 # Adjust col_rel_width to match the processed columns (without subline_by) 

899 if ( 

900 is_single_body(document.rtf_body) 

901 and document.rtf_body.subline_by 

902 ): 

903 original_cols = ( 

904 list(document.df.columns) 

905 if isinstance(document.df, pl.DataFrame) 

906 else [] 

907 ) 

908 subline_cols = set(document.rtf_body.subline_by) 

909 processed_col_indices = [ 

910 i 

911 for i, col in enumerate(original_cols) 

912 if col not in subline_cols 

913 ] 

914 

915 # Ensure we have enough col_rel_width values for all original columns 

916 if ( 

917 is_single_body(document.rtf_body) 

918 and document.rtf_body.col_rel_width is not None 

919 and len(document.rtf_body.col_rel_width) 

920 >= len(original_cols) 

921 and is_flat_header_list(document.rtf_column_header) 

922 and len(document.rtf_column_header) > 0 

923 and document.rtf_column_header[0] is not None 

924 ): 

925 document.rtf_column_header[0].col_rel_width = [ 

926 document.rtf_body.col_rel_width[i] 

927 for i in processed_col_indices 

928 ] 

929 elif ( 

930 is_flat_header_list(document.rtf_column_header) 

931 and len(document.rtf_column_header) > 0 

932 and document.rtf_column_header[0] is not None 

933 ): 

934 # Fallback: use equal widths if col_rel_width doesn't match 

935 document.rtf_column_header[0].col_rel_width = [1] * len( 

936 columns 

937 ) 

938 

939 # Apply pagination borders to column headers 

940 # Process each column header with proper borders 

941 header_elements = [] 

942 headers_to_process = [] 

943 if is_nested_header_list(document.rtf_column_header): 

944 # For nested headers, flatten them 

945 for section_headers in document.rtf_column_header: 

946 if section_headers: 

947 headers_to_process.extend(section_headers) 

948 elif is_flat_header_list(document.rtf_column_header): 

949 headers_to_process = document.rtf_column_header 

950 

951 for i, header in enumerate(headers_to_process): 

952 if header is None: 

953 continue 

954 header_copy = deepcopy(header) 

955 

956 # Apply page-level borders to column headers (matching non-paginated behavior) 

957 if ( 

958 page_info["is_first_page"] and i == 0 

959 ): # First header on first page 

960 if ( 

961 document.rtf_page.border_first 

962 and header_copy.text is not None 

963 ): 

964 # Get dimensions based on text type 

965 import polars as pl 

966 

967 if isinstance(header_copy.text, pl.DataFrame): 

968 header_dims = header_copy.text.shape 

969 else: 

970 # For Sequence[str], assume single row 

971 header_dims = ( 

972 1, 

973 len(header_copy.text) if header_copy.text else 0, 

974 ) 

975 # Apply page border_first to top of first column header 

976 header_copy.border_top = BroadcastValue( 

977 value=header_copy.border_top, dimension=header_dims 

978 ).update_row( 

979 0, [document.rtf_page.border_first] * header_dims[1] 

980 ) 

981 

982 # Encode the header with modified borders 

983 # Use the header_copy to preserve border modifications 

984 header_rtf = self.encoding_service.encode_column_header( 

985 header_copy.text, header_copy, document.rtf_page.col_width 

986 ) 

987 header_elements.extend(header_rtf) 

988 

989 page_elements.extend(header_elements) 

990 

991 # Add page content (table body) with proper border handling 

992 page_df = page_info["data"] 

993 

994 # Apply pagination borders to the body attributes 

995 page_attrs = self.document_service.apply_pagination_borders( 

996 document, document.rtf_body, page_info, len(pages) 

997 ) 

998 

999 # Encode page content with modified borders 

1000 page_body = page_attrs._encode(page_df, col_widths) 

1001 page_elements.extend(page_body) 

1002 

1003 # Add footnote if it should appear on this page 

1004 if ( 

1005 document.rtf_footnote 

1006 and document.rtf_footnote.text 

1007 and self.document_service.should_show_element_on_page( 

1008 document.rtf_page.page_footnote, page_info 

1009 ) 

1010 ): 

1011 footnote_content = self.encoding_service.encode_footnote( 

1012 document.rtf_footnote, 

1013 page_info["page_number"], 

1014 document.rtf_page.col_width, 

1015 ) 

1016 if footnote_content: 

1017 page_elements.extend(footnote_content) 

1018 

1019 # Add source if it should appear on this page 

1020 if ( 

1021 document.rtf_source 

1022 and document.rtf_source.text 

1023 and self.document_service.should_show_element_on_page( 

1024 document.rtf_page.page_source, page_info 

1025 ) 

1026 ): 

1027 source_content = self.encoding_service.encode_source( 

1028 document.rtf_source, 

1029 page_info["page_number"], 

1030 document.rtf_page.col_width, 

1031 ) 

1032 if source_content: 

1033 page_elements.extend(source_content) 

1034 

1035 # Add figures if they should appear on the last page and position is 'after' 

1036 if ( 

1037 document.rtf_figure 

1038 and document.rtf_figure.figures 

1039 and document.rtf_figure.fig_pos == "after" 

1040 and page_info["is_last_page"] 

1041 ): 

1042 figure_content = self.figure_service.encode_figure(document.rtf_figure) 

1043 if figure_content: 

1044 page_elements.append(figure_content) 

1045 

1046 page_contents.extend(page_elements) 

1047 

1048 # Build complete RTF document 

1049 result = "\n".join( 

1050 [ 

1051 item 

1052 for item in [ 

1053 self.encoding_service.encode_document_start(), 

1054 self.encoding_service.encode_font_table(), 

1055 self.encoding_service.encode_color_table(document), 

1056 "\n", 

1057 self.encoding_service.encode_page_header( 

1058 document.rtf_page_header, method="line" 

1059 ), 

1060 self.encoding_service.encode_page_footer( 

1061 document.rtf_page_footer, method="line" 

1062 ), 

1063 self.encoding_service.encode_page_settings(document.rtf_page), 

1064 "\n".join(page_contents), 

1065 "\n\n", 

1066 "}", 

1067 ] 

1068 if item is not None 

1069 ] 

1070 ) 

1071 

1072 # Clear document context after encoding 

1073 color_service.clear_document_context() 

1074 

1075 return result 

1076 

def _encode_figure_only_document_with_pagination(
    self, document: "RTFDocument"
) -> str:
    """Render a table-less document, placing each figure on its own page.

    Title, footnote and source placement follow the page settings
    ``page_title``, ``page_footnote`` and ``page_source``; each is compared
    against ``"all"``, ``"first"`` and ``"last"``. The subline is written
    on the first page only, and a page break separates consecutive figures.

    Args:
        document: The RTF document with only figure content

    Returns:
        Complete RTF string, or an empty string when the document has no
        figure component or its ``figures`` attribute is None.
    """
    from copy import deepcopy

    from ..figure import rtf_read_figure

    figure_attrs = document.rtf_figure
    if figure_attrs is None or figure_attrs.figures is None:
        return ""

    # One page is produced per figure returned by the reader.
    images, image_formats = rtf_read_figure(figure_attrs.figures)
    last_index = len(images) - 1

    encoder = self.encoding_service
    figures = self.figure_service
    page_cfg = document.rtf_page

    title_rtf = encoder.encode_title(document.rtf_title, method="line")

    # Figure-only output forces as_table=False on the footnote; the change
    # is made on a deep copy so the document's own component is untouched.
    footnote = document.rtf_footnote
    if footnote is not None:
        footnote = deepcopy(footnote)
        footnote.as_table = False

    def placed_on_page(setting, first: bool, last: bool) -> bool:
        """Tell whether an element with this page setting goes on the page."""
        return (
            setting == "all"
            or (setting == "first" and first)
            or (setting == "last" and last)
        )

    # Prolog: document start, font and color tables, then page
    # header/footer and page settings.
    chunks = [
        encoder.encode_document_start(),
        encoder.encode_font_table(),
        encoder.encode_color_table(document),
        "\n",
        encoder.encode_page_header(document.rtf_page_header, method="line"),
        encoder.encode_page_footer(document.rtf_page_footer, method="line"),
        encoder.encode_page_settings(page_cfg),
    ]

    for index, image in enumerate(images):
        first = index == 0
        last = index == last_index

        if placed_on_page(page_cfg.page_title, first, last):
            chunks += [title_rtf, "\n"]

        if first:
            # The subline is emitted once, ahead of the first figure.
            chunks.append(
                encoder.encode_subline(document.rtf_subline, method="line")
            )

        width = figures._get_dimension(figure_attrs.fig_width, index)
        height = figures._get_dimension(figure_attrs.fig_height, index)
        chunks.append(
            figures._encode_single_figure(
                image,
                image_formats[index],
                width,
                height,
                figure_attrs.fig_align,
            )
        )
        chunks.append("\\par ")

        if footnote is not None and placed_on_page(
            page_cfg.page_footnote, first, last
        ):
            footnote_rtf = "\n".join(
                encoder.encode_footnote(
                    footnote,
                    page_number=index + 1,
                    page_col_width=page_cfg.col_width,
                )
            )
            if footnote_rtf:
                chunks.append(footnote_rtf)

        source = document.rtf_source
        if source is not None and placed_on_page(
            page_cfg.page_source, first, last
        ):
            source_rtf = "\n".join(
                encoder.encode_source(
                    source,
                    page_number=index + 1,
                    page_col_width=page_cfg.col_width,
                )
            )
            if source_rtf:
                chunks.append(source_rtf)

        # Page break between figures, but not after the final one.
        if not last:
            chunks.append("\\page ")

    # Close the document group.
    chunks += ["\n\n", "}"]

    # None entries are skipped when assembling the final string.
    return "".join(chunk for chunk in chunks if chunk is not None)

1224 

1225 def _generate_subline_header(self, subline_header_info: dict, rtf_body) -> str: 

1226 """Generate RTF paragraph for subline_by header. 

1227 

1228 Args: 

1229 subline_header_info: Dictionary with column values for the subline header 

1230 rtf_body: RTFBody attributes for formatting 

1231 

1232 Returns: 

1233 RTF string for the subline paragraph 

1234 """ 

1235 if not subline_header_info: 

1236 return "" 

1237 

1238 # Use the raw group values without column names 

1239 if "group_values" in subline_header_info: 

1240 # Extract just the values without column prefixes 

1241 header_parts = [] 

1242 for col, value in subline_header_info["group_values"].items(): 

1243 if value is not None: 

1244 header_parts.append(str(value)) 

1245 

1246 if not header_parts: 

1247 return "" 

1248 

1249 header_text = ", ".join(header_parts) 

1250 else: 

1251 # Fallback for backward compatibility 

1252 header_parts = [] 

1253 for col, value in subline_header_info.items(): 

1254 if value is not None and col not in ["group_by_columns", "header_text"]: 

1255 header_parts.append(str(value)) 

1256 

1257 if not header_parts: 

1258 return "" 

1259 

1260 header_text = ", ".join(header_parts) 

1261 

1262 # Create RTF paragraph with minimal spacing (no sb180/sa180 to eliminate space between header and table) 

1263 return ( 

1264 f"{{\\pard\\hyphpar\\fi0\\li0\\ri0\\ql\\fs18{{\\f0 {header_text}}}\\par}}" 

1265 )