Coverage for src/rtflite/encoding/unified_encoder.py: 95%

194 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-08 04:50 +0000

1from __future__ import annotations 

2 

3from typing import Any 

4 

5import polars as pl 

6 

7from rtflite import RTFDocument 

8 

9from ..attributes import BroadcastValue 

10from ..input import RTFBody 

11from ..pagination.processor import PageFeatureProcessor 

12from ..pagination.strategies import PageContext, PaginationContext, StrategyRegistry 

13from ..pagination.strategies.defaults import DefaultPaginationStrategy 

14from ..pagination.strategies.grouping import PageByStrategy, SublineStrategy 

15from ..row import Utils 

16from ..services import RTFEncodingService 

17from ..services.color_service import color_service 

18from ..services.document_service import RTFDocumentService 

19from ..services.figure_service import RTFFigureService 

20from ..services.grouping_service import grouping_service 

21from ..type_guards import is_single_body 

22from .base import EncodingStrategy 

23from .renderer import PageRenderer 

24 

25 

26class UnifiedRTFEncoder(EncodingStrategy): 

27 """Unified RTF Encoder using the strategy pattern for pagination and rendering.""" 

28 

29 def __init__(self): 

30 self.encoding_service = RTFEncodingService() 

31 self.document_service = RTFDocumentService() 

32 self.figure_service = RTFFigureService() 

33 self.feature_processor = PageFeatureProcessor() 

34 self.renderer = PageRenderer() 

35 

36 # Register strategies (if not already registered elsewhere) 

37 # Ideally this happens at app startup, but for now we ensure they are available 

38 StrategyRegistry.register("default", DefaultPaginationStrategy) 

39 StrategyRegistry.register("page_by", PageByStrategy) 

40 StrategyRegistry.register("subline", SublineStrategy) 

41 

42 def _encode_body_section( 

43 self, document: RTFDocument, df: Any, rtf_body: Any 

44 ) -> list[str]: 

45 """Encode a single body section using the unified pipeline. 

46 

47 Args: 

48 document: The RTF document context 

49 df: DataFrame for this section 

50 rtf_body: RTFBody attributes for this section 

51 

52 Returns: 

53 List of RTF strings (rendered pages/rows) 

54 """ 

55 

56 # A. Prepare Data 

57 processed_df, original_df, processed_attrs = ( 

58 self.encoding_service.prepare_dataframe_for_body_encoding(df, rtf_body) 

59 ) 

60 

61 # B. Select Strategy 

62 strategy_name = "default" 

63 if is_single_body(rtf_body): 

64 if rtf_body.subline_by: 

65 strategy_name = "subline" 

66 elif rtf_body.page_by: 

67 strategy_name = "page_by" 

68 

69 strategy_cls = StrategyRegistry.get(strategy_name) 

70 strategy = strategy_cls() 

71 

72 # C. Prepare Context 

73 col_total_width = document.rtf_page.col_width 

74 if is_single_body(rtf_body) and processed_attrs.col_rel_width: 

75 col_widths = Utils._col_widths( 

76 processed_attrs.col_rel_width, 

77 col_total_width if col_total_width is not None else 8.5, 

78 ) 

79 else: 

80 col_widths = Utils._col_widths( 

81 [1] * processed_df.shape[1], 

82 col_total_width if col_total_width is not None else 8.5, 

83 ) 

84 

85 additional_rows = self.document_service.calculate_additional_rows_per_page( 

86 document 

87 ) 

88 

89 # Calculate removed column indices 

90 # Calculate removed column indices 

91 removed_column_indices = [] 

92 # Ensure we are working with a DataFrame and RTFBody for single section encoding 

93 if isinstance(original_df, pl.DataFrame) and isinstance(rtf_body, RTFBody): 

94 if processed_df.shape[1] < original_df.shape[1]: 

95 # Find indices of columns that were removed 

96 # We assume columns are removed, not reordered significantly enough to 

97 # break this simple check for the purpose of pagination context 

98 processed_cols = set(processed_df.columns) 

99 for i, col in enumerate(original_df.columns): 

100 if col not in processed_cols: 

101 removed_column_indices.append(i) 

102 

103 pagination_ctx = PaginationContext( 

104 df=original_df, # Use original DF for context 

105 rtf_body=rtf_body, 

106 rtf_page=document.rtf_page, 

107 col_widths=col_widths, 

108 table_attrs=processed_attrs, 

109 additional_rows_per_page=additional_rows, 

110 removed_column_indices=removed_column_indices, 

111 ) 

112 else: 

113 # Fallback or error for unexpected types in this context 

114 # Should not happen given is_single_body checks usually 

115 pagination_ctx = PaginationContext( 

116 df=processed_df, 

117 rtf_body=processed_attrs, # Best effort fallback 

118 rtf_page=document.rtf_page, 

119 col_widths=col_widths, 

120 table_attrs=processed_attrs, 

121 additional_rows_per_page=additional_rows, 

122 ) 

123 

124 # D. Paginate 

125 

126 pages = strategy.paginate(pagination_ctx) 

127 

128 # Handle case where no pages are generated (e.g. empty dataframe) 

129 if not pages: 

130 # Create empty page to ensure document structure (title, etc.) is rendered. 

131 pages = [ 

132 PageContext( 

133 page_number=1, 

134 total_pages=1, 

135 data=processed_df, 

136 is_first_page=True, 

137 is_last_page=True, 

138 col_widths=col_widths, 

139 needs_header=True, 

140 table_attrs=processed_attrs, 

141 ) 

142 ] 

143 

144 # Post-pagination fixup 

145 if is_single_body(rtf_body): 

146 self._apply_data_post_processing(pages, processed_df, rtf_body) 

147 

148 # E. Process & Render Pages 

149 section_rtf_chunks = [] 

150 

151 for _i, page in enumerate(pages): 

152 # Process features (borders, etc.) 

153 processed_page = self.feature_processor.process(document, page) 

154 

155 # Render 

156 chunks = self.renderer.render(document, processed_page) 

157 section_rtf_chunks.extend(chunks) 

158 

159 # Add page break between pages (except last page) 

160 # Note: PageRenderer handles page breaks at the start of non-first pages. 

161 # So we do NOT add them here to avoid double breaks. 

162 pass 

163 

164 return section_rtf_chunks 

165 

166 def encode(self, document: Any) -> str: 

167 """Encode the document using the unified pipeline.""" 

168 

169 # 1. Figure-only handling 

170 if document.df is None: 

171 return self._encode_figure_only(document) 

172 

173 # 2. Multi-section handling 

174 if isinstance(document.df, list): 

175 return self._encode_multi_section(document) 

176 

177 # 3. Standard Pipeline 

178 color_service.set_document_context(document) 

179 

180 page_rtf_chunks = self._encode_body_section( 

181 document, document.df, document.rtf_body 

182 ) 

183 

184 # F. Assembly 

185 result = "\n".join( 

186 [ 

187 item 

188 for item in [ 

189 self.encoding_service.encode_document_start(), 

190 self.encoding_service.encode_font_table(), 

191 self.encoding_service.encode_color_table(document), 

192 "\n", 

193 self.encoding_service.encode_page_header( 

194 document.rtf_page_header, method="line" 

195 ), 

196 self.encoding_service.encode_page_footer( 

197 document.rtf_page_footer, method="line" 

198 ), 

199 self.encoding_service.encode_page_settings(document.rtf_page), 

200 "\n".join(page_rtf_chunks), 

201 "\n\n", 

202 "}", 

203 ] 

204 if item is not None 

205 ] 

206 ) 

207 

208 color_service.clear_document_context() 

209 return result 

210 

211 def _apply_data_post_processing(self, pages, processed_df, rtf_body): 

212 """Sync page data with processed dataframe and handle group_by restoration.""" 

213 # 1. Replace data slices 

214 # We assume the pagination strategy preserved the row order and counts 

215 # matching the processed_df (which corresponds to the original df structure 

216 # minus excluded columns). 

217 current_idx = 0 

218 for page in pages: 

219 rows = page.data.height 

220 page.data = processed_df.slice(current_idx, rows) 

221 current_idx += rows 

222 

223 # 2. Re-implementation of group_by logic 

224 if rtf_body.group_by: 

225 # Collect page start indices for context restoration 

226 page_start_indices = [] 

227 cumulative = 0 

228 for i, p in enumerate(pages): 

229 if i > 0: 

230 page_start_indices.append(cumulative) 

231 cumulative += p.data.height 

232 

233 full_df = processed_df 

234 

235 suppressed = grouping_service.enhance_group_by(full_df, rtf_body.group_by) 

236 restored = grouping_service.restore_page_context( 

237 suppressed, full_df, rtf_body.group_by, page_start_indices 

238 ) 

239 

240 curr = 0 

241 for p in pages: 

242 rows = p.data.height 

243 p.data = restored.slice(curr, rows) 

244 curr += rows 

245 

246 def _encode_figure_only(self, document: RTFDocument): 

247 """Encode a figure-only document.""" 

248 from copy import deepcopy 

249 

250 from ..figure import rtf_read_figure 

251 

252 if not document.rtf_figure or not document.rtf_figure.figures: 

253 return "" 

254 

255 figs, formats = rtf_read_figure(document.rtf_figure.figures) 

256 num = len(figs) 

257 

258 # Pre-calculate shared elements 

259 title = self.encoding_service.encode_title(document.rtf_title, method="line") 

260 

261 # For figure-only documents, footnote should be as_table=False 

262 footnote_component = document.rtf_footnote 

263 if footnote_component is not None: 

264 footnote_component = deepcopy(footnote_component) 

265 footnote_component.as_table = False 

266 

267 # Determine which elements should show on each page 

268 show_title_on_all = document.rtf_page.page_title == "all" 

269 show_footnote_on_all = document.rtf_page.page_footnote == "all" 

270 show_source_on_all = document.rtf_page.page_source == "all" 

271 

272 # Build 

273 parts = [ 

274 self.encoding_service.encode_document_start(), 

275 self.encoding_service.encode_font_table(), 

276 self.encoding_service.encode_color_table(document), 

277 "\n", 

278 self.encoding_service.encode_page_header( 

279 document.rtf_page_header, method="line" 

280 ), 

281 self.encoding_service.encode_page_footer( 

282 document.rtf_page_footer, method="line" 

283 ), 

284 self.encoding_service.encode_page_settings(document.rtf_page), 

285 ] 

286 

287 for i in range(num): 

288 is_first = i == 0 

289 is_last = i == num - 1 

290 

291 # Title 

292 if ( 

293 show_title_on_all 

294 or (document.rtf_page.page_title == "first" and is_first) 

295 or (document.rtf_page.page_title == "last" and is_last) 

296 ): 

297 parts.append(title) 

298 parts.append("\n") 

299 

300 # Subline 

301 if is_first and document.rtf_subline: 

302 parts.append( 

303 self.encoding_service.encode_subline( 

304 document.rtf_subline, method="line" 

305 ) 

306 ) 

307 

308 # Figure 

309 w = self.figure_service._get_dimension(document.rtf_figure.fig_width, i) 

310 h = self.figure_service._get_dimension(document.rtf_figure.fig_height, i) 

311 parts.append( 

312 self.figure_service._encode_single_figure( 

313 figs[i], formats[i], w, h, document.rtf_figure.fig_align 

314 ) 

315 ) 

316 parts.append(r"\par ") 

317 

318 # Footnote based on page settings 

319 if footnote_component is not None and ( 

320 show_footnote_on_all 

321 or (document.rtf_page.page_footnote == "first" and is_first) 

322 or (document.rtf_page.page_footnote == "last" and is_last) 

323 ): 

324 footnote_content = "\n".join( 

325 self.encoding_service.encode_footnote( 

326 footnote_component, 

327 page_number=i + 1, 

328 page_col_width=document.rtf_page.col_width, 

329 ) 

330 ) 

331 if footnote_content: 

332 parts.append(footnote_content) 

333 

334 # Source based on page settings 

335 if document.rtf_source is not None and ( 

336 show_source_on_all 

337 or (document.rtf_page.page_source == "first" and is_first) 

338 or (document.rtf_page.page_source == "last" and is_last) 

339 ): 

340 source_content = "\n".join( 

341 self.encoding_service.encode_source( 

342 document.rtf_source, 

343 page_number=i + 1, 

344 page_col_width=document.rtf_page.col_width, 

345 ) 

346 ) 

347 if source_content: 

348 parts.append(source_content) 

349 

350 if not is_last: 

351 parts.append(r"\page ") 

352 

353 parts.append("\n\n}") 

354 return "".join([p for p in parts if p]) 

355 

356 def _encode_multi_section(self, document: RTFDocument) -> str: 

357 """Encode a multi-section document where sections are concatenated row by row. 

358 

359 Args: 

360 document: The RTF document with multiple df/rtf_body sections 

361 

362 Returns: 

363 Complete RTF string 

364 """ 

365 

366 from ..type_guards import is_nested_header_list 

367 

368 # Calculate column counts for border management 

369 if isinstance(document.df, list): 

370 first_section_cols = document.df[0].shape[1] if document.df else 0 

371 else: 

372 first_section_cols = document.df.shape[1] if document.df is not None else 0 

373 

374 # Document structure components 

375 # rtf_title is handled per section via temp_document and renderer 

376 # so we don't need to pre-calculate it here. 

377 

378 # Handle page borders (use first section for dimensions) 

379 # doc_border_top is not used in this scope 

380 doc_border_bottom_list = BroadcastValue( 

381 value=document.rtf_page.border_last, dimension=(1, first_section_cols) 

382 ).to_list() 

383 doc_border_bottom = ( 

384 doc_border_bottom_list[0] if doc_border_bottom_list else None 

385 ) 

386 

387 # Encode sections 

388 all_section_content = [] 

389 is_nested_headers = is_nested_header_list(document.rtf_column_header) 

390 

391 df_list = ( 

392 document.df 

393 if isinstance(document.df, list) 

394 else [document.df] 

395 if document.df is not None 

396 else [] 

397 ) 

398 body_list = ( 

399 document.rtf_body 

400 if isinstance(document.rtf_body, list) 

401 else [document.rtf_body] 

402 if document.rtf_body is not None 

403 else [] 

404 ) 

405 

406 for i, (section_df, section_body) in enumerate( 

407 zip(df_list, body_list, strict=True) 

408 ): 

409 # Determine column headers for this section 

410 section_headers_obj = None 

411 if is_nested_headers: 

412 if isinstance(document.rtf_column_header, list) and i < len( 

413 document.rtf_column_header 

414 ): 

415 section_headers_obj = document.rtf_column_header[i] 

416 else: 

417 # Flat format - only apply to first section 

418 if i == 0: 

419 section_headers_obj = document.rtf_column_header 

420 

421 # Create a temporary document for this section 

422 # We need to adjust page borders: 

423 # - border_first only applies to the first section 

424 # - border_last only applies to the last section 

425 section_page = document.rtf_page.model_copy() 

426 if i > 0: 

427 section_page.border_first = None 

428 if i < len(df_list) - 1: 

429 section_page.border_last = None 

430 

431 # Handle component visibility across sections 

432 # Use model_copy to avoid modifying original document components 

433 section_title = ( 

434 document.rtf_title.model_copy() if document.rtf_title else None 

435 ) 

436 section_footnote = ( 

437 document.rtf_footnote.model_copy() if document.rtf_footnote else None 

438 ) 

439 section_source = ( 

440 document.rtf_source.model_copy() if document.rtf_source else None 

441 ) 

442 section_subline = ( 

443 document.rtf_subline.model_copy() if document.rtf_subline else None 

444 ) 

445 section_page_header = ( 

446 document.rtf_page_header.model_copy() 

447 if document.rtf_page_header 

448 else None 

449 ) 

450 section_page_footer = ( 

451 document.rtf_page_footer.model_copy() 

452 if document.rtf_page_footer 

453 else None 

454 ) 

455 

456 # Title: if "first", only show on first section 

457 # Also suppress if this section continues on the same page (new_page=False) 

458 if i > 0: 

459 should_suppress = not section_body.new_page 

460 

461 if (document.rtf_page.page_title == "first") or should_suppress: 

462 if section_title: 

463 section_title.text = None 

464 if section_subline: 

465 section_subline.text = None 

466 

467 # Suppress Page Header/Footer for continuous sections 

468 if should_suppress: 

469 if section_page_header: 

470 section_page_header.text = None 

471 if section_page_footer: 

472 section_page_footer.text = None 

473 

474 # Footnote/Source: if "last", only show on last section 

475 # For continuous sections, suppress them unless it's the last one. 

476 if i < len(df_list) - 1: 

477 should_suppress = not body_list[ 

478 i + 1 

479 ].new_page # Next section continues 

480 

481 if document.rtf_page.page_footnote == "last" and section_footnote: 

482 section_footnote.text = None 

483 if document.rtf_page.page_source == "last" and section_source: 

484 section_source.text = None 

485 

486 # Use model_copy to safely create a new instance with updated fields 

487 temp_document = document.model_copy( 

488 update={ 

489 "df": section_df, 

490 "rtf_body": section_body, 

491 "rtf_column_header": section_headers_obj, 

492 "rtf_page": section_page, 

493 "rtf_title": section_title, 

494 "rtf_subline": section_subline, 

495 "rtf_page_header": section_page_header, 

496 "rtf_page_footer": section_page_footer, 

497 "rtf_footnote": section_footnote, 

498 "rtf_source": section_source, 

499 } 

500 ) 

501 

502 # Encode section body (headers will be handled by PageRenderer) 

503 section_body_content = self._encode_body_section( 

504 temp_document, section_df, section_body 

505 ) 

506 all_section_content.extend(section_body_content) 

507 

508 # Handle bottom borders on last section 

509 if document.rtf_footnote is not None and doc_border_bottom is not None: 

510 document.rtf_footnote.border_bottom = BroadcastValue( 

511 value=document.rtf_footnote.border_bottom, dimension=(1, 1) 

512 ).update_row(0, [doc_border_bottom[0]]) 

513 else: 

514 # Apply bottom border to last section's last row 

515 if isinstance(document.rtf_body, list) and isinstance(document.df, list): 

516 last_section_body = document.rtf_body[-1] 

517 last_section_dim = document.df[-1].shape 

518 if last_section_dim[0] > 0 and doc_border_bottom is not None: 

519 last_section_body.border_bottom = BroadcastValue( 

520 value=last_section_body.border_bottom, 

521 dimension=last_section_dim, 

522 ).update_row(last_section_dim[0] - 1, doc_border_bottom) 

523 

524 return "\n".join( 

525 [ 

526 item 

527 for item in [ 

528 self.encoding_service.encode_document_start(), 

529 self.encoding_service.encode_font_table(), 

530 self.encoding_service.encode_color_table(document), 

531 "\n", 

532 self.encoding_service.encode_page_header( 

533 document.rtf_page_header, method="line" 

534 ), 

535 self.encoding_service.encode_page_footer( 

536 document.rtf_page_footer, method="line" 

537 ), 

538 self.encoding_service.encode_page_settings(document.rtf_page), 

539 "\n".join(all_section_content), 

540 "\n\n", 

541 "}", 

542 ] 

543 if item is not None 

544 ] 

545 ) 

546 

547 # 3. Standard Pipeline 

548 color_service.set_document_context(document) 

549 

550 page_rtf_chunks = self._encode_body_section( 

551 document, document.df, document.rtf_body 

552 ) 

553 

554 # F. Assembly 

555 result = "\n".join( 

556 [ 

557 item 

558 for item in [ 

559 self.encoding_service.encode_document_start(), 

560 self.encoding_service.encode_font_table(), 

561 self.encoding_service.encode_color_table(document), 

562 "\n", 

563 self.encoding_service.encode_page_header( 

564 document.rtf_page_header, method="line" 

565 ), 

566 self.encoding_service.encode_page_footer( 

567 document.rtf_page_footer, method="line" 

568 ), 

569 self.encoding_service.encode_page_settings(document.rtf_page), 

570 "\n".join(page_rtf_chunks), 

571 "\n\n", 

572 "}", 

573 ] 

574 if item is not None 

575 ] 

576 ) 

577 

578 color_service.clear_document_context() 

579 return result