Coverage for src/rtflite/services/encoding_service.py: 65%

216 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-28 05:09 +0000

1"""RTF encoding service that handles document component encoding.""" 

2 

3from collections.abc import Sequence 

4 

5from .grouping_service import grouping_service 

6 

7 

8class RTFEncodingService: 

9 """Service class that handles RTF component encoding operations. 

10 

11 This class extracts encoding logic from RTFDocument to improve separation 

12 of concerns and enable better testing and maintainability. 

13 """ 

14 

15 def __init__(self): 

16 from ..rtf import RTFSyntaxGenerator 

17 

18 self.syntax = RTFSyntaxGenerator() 

19 

20 def encode_spanning_row( 

21 self, 

22 text: str, 

23 page_width: float, 

24 rtf_body_attrs=None, 

25 ) -> Sequence[str]: 

26 """Generate a spanning table row (single cell spanning full width). 

27 

28 This is used for page_by group headers that span across all columns. 

29 Works for both single-page and paginated documents. 

30 

31 Args: 

32 text: Text to display in the spanning row 

33 page_width: Total page width in inches 

34 rtf_body_attrs: RTFBody attributes for styling (optional) 

35 

36 Returns: 

37 List of RTF strings for the spanning row 

38 """ 

39 from ..row import Border, Cell, Row, TextContent 

40 

41 # Use body attributes if provided, otherwise use defaults 

42 if rtf_body_attrs: 

43 font = rtf_body_attrs.text_font[0][0] if rtf_body_attrs.text_font else 0 

44 size = ( 

45 rtf_body_attrs.text_font_size[0][0] 

46 if rtf_body_attrs.text_font_size 

47 else 18 

48 ) 

49 text_format = ( 

50 rtf_body_attrs.text_format[0][0] if rtf_body_attrs.text_format else "" 

51 ) 

52 color = rtf_body_attrs.text_color[0][0] if rtf_body_attrs.text_color else "" 

53 bg_color = ( 

54 rtf_body_attrs.text_background_color[0][0] 

55 if rtf_body_attrs.text_background_color 

56 else "" 

57 ) 

58 justification = ( 

59 rtf_body_attrs.text_justification[0][0] 

60 if rtf_body_attrs.text_justification 

61 else "c" 

62 ) 

63 border_left = ( 

64 rtf_body_attrs.border_left[0][0] 

65 if rtf_body_attrs.border_left 

66 else "single" 

67 ) 

68 border_right = ( 

69 rtf_body_attrs.border_right[0][0] 

70 if rtf_body_attrs.border_right 

71 else "single" 

72 ) 

73 border_top = ( 

74 rtf_body_attrs.border_top[0][0] 

75 if rtf_body_attrs.border_top 

76 else "single" 

77 ) 

78 border_bottom = ( 

79 rtf_body_attrs.border_bottom[0][0] 

80 if rtf_body_attrs.border_bottom 

81 else "single" 

82 ) 

83 v_just = ( 

84 rtf_body_attrs.cell_vertical_justification[0][0] 

85 if rtf_body_attrs.cell_vertical_justification 

86 else "b" 

87 ) 

88 cell_just = ( 

89 rtf_body_attrs.cell_justification[0][0] 

90 if rtf_body_attrs.cell_justification 

91 else "c" 

92 ) 

93 else: 

94 font = 0 

95 size = 18 

96 text_format = "" 

97 color = "" 

98 bg_color = "" 

99 justification = "c" 

100 border_left = "single" 

101 border_right = "single" 

102 border_top = "single" 

103 border_bottom = "single" 

104 v_just = "b" 

105 cell_just = "c" 

106 

107 # Create spanning cell 

108 cell = Cell( 

109 text=TextContent( 

110 text=text, 

111 font=font, 

112 size=size, 

113 format=text_format, 

114 color=color, 

115 background_color=bg_color, 

116 justification=justification, 

117 indent_first=0, 

118 indent_left=0, 

119 indent_right=0, 

120 space=0, # No line spacing 

121 space_before=15, 

122 space_after=15, 

123 convert=False, 

124 hyphenation=True, 

125 ), 

126 width=page_width, 

127 border_left=Border(style=border_left), 

128 border_right=Border(style=border_right), 

129 border_top=Border(style=border_top), 

130 border_bottom=Border(style=border_bottom), 

131 vertical_justification=v_just, 

132 ) 

133 

134 # Create row with single spanning cell 

135 row = Row(row_cells=[cell], justification=cell_just, height=0) 

136 

137 return row._as_rtf() 

138 

139 def encode_document_start(self) -> str: 

140 """Encode RTF document start.""" 

141 return "{\\rtf1\\ansi\n\\deff0\\deflang1033" 

142 

143 def encode_font_table(self) -> str: 

144 """Encode RTF font table.""" 

145 return self.syntax.generate_font_table() 

146 

147 def encode_color_table( 

148 self, document=None, used_colors: Sequence[str] | None = None 

149 ) -> str: 

150 """Encode RTF color table with comprehensive 657-color support. 

151 

152 Args: 

153 document: RTF document to analyze for color usage (preferred) 

154 used_colors: Color names used in the document. If None and a 

155 document is provided, colors are auto-detected. 

156 

157 Returns: 

158 RTF color table string (empty if no colors beyond black/"" are used) 

159 """ 

160 if document is not None and used_colors is None: 

161 # Auto-detect colors from document 

162 from ..services.color_service import color_service 

163 

164 used_colors = color_service.collect_document_colors(document) 

165 

166 return self.syntax.generate_color_table(used_colors) 

167 

168 def encode_page_settings(self, page_config) -> str: 

169 """Encode RTF page settings. 

170 

171 Args: 

172 page_config: RTFPage configuration object 

173 

174 Returns: 

175 RTF page settings string 

176 """ 

177 return self.syntax.generate_page_settings( 

178 page_config.width, 

179 page_config.height, 

180 page_config.margin, 

181 page_config.orientation, 

182 ) 

183 

184 def encode_page_header(self, header_config, method: str = "line") -> str: 

185 """Encode page header component. 

186 

187 Args: 

188 header_config: RTFPageHeader configuration 

189 method: Encoding method 

190 

191 Returns: 

192 RTF header string 

193 """ 

194 if header_config is None or not header_config.text: 

195 return "" 

196 

197 # Use the existing text encoding method 

198 result = header_config._encode_text(text=header_config.text, method=method) 

199 

200 return f"{{\\header{result}}}" 

201 

202 def encode_page_footer(self, footer_config, method: str = "line") -> str: 

203 """Encode page footer component. 

204 

205 Args: 

206 footer_config: RTFPageFooter configuration 

207 method: Encoding method 

208 

209 Returns: 

210 RTF footer string 

211 """ 

212 if footer_config is None or not footer_config.text: 

213 return "" 

214 

215 # Use the existing text encoding method 

216 result = footer_config._encode_text(text=footer_config.text, method=method) 

217 return f"{{\\footer{result}}}" 

218 

219 def encode_title(self, title_config, method: str = "line") -> str: 

220 """Encode title component. 

221 

222 Args: 

223 title_config: RTFTitle configuration 

224 method: Encoding method 

225 

226 Returns: 

227 RTF title string 

228 """ 

229 if not title_config or not title_config.text: 

230 return "" 

231 

232 # Use the existing text encoding method 

233 return title_config._encode_text(text=title_config.text, method=method) 

234 

235 def encode_subline(self, subline_config, method: str = "line") -> str: 

236 """Encode subline component. 

237 

238 Args: 

239 subline_config: RTFSubline configuration 

240 method: Encoding method 

241 

242 Returns: 

243 RTF subline string 

244 """ 

245 if subline_config is None or not subline_config.text: 

246 return "" 

247 

248 # Use the existing text encoding method 

249 return subline_config._encode_text(text=subline_config.text, method=method) 

250 

251 def encode_footnote( 

252 self, 

253 footnote_config, 

254 page_number: int | None = None, 

255 page_col_width: float | None = None, 

256 ) -> Sequence[str]: 

257 """Encode footnote component with advanced formatting. 

258 

259 Args: 

260 footnote_config: RTFFootnote configuration 

261 page_number: Page number for footnote 

262 page_col_width: Page column width for calculations 

263 

264 Returns: 

265 List of RTF footnote strings 

266 """ 

267 if footnote_config is None: 

268 return [] 

269 

270 rtf_attrs = footnote_config 

271 

272 # Apply page-specific border if set 

273 if ( 

274 hasattr(rtf_attrs, "_page_border_style") 

275 and page_number is not None 

276 and page_number in rtf_attrs._page_border_style 

277 ): 

278 border_style = rtf_attrs._page_border_style[page_number] 

279 # Create a copy with modified border 

280 rtf_attrs = rtf_attrs.model_copy() 

281 rtf_attrs.border_bottom = [[border_style]] 

282 

283 # Check if footnote should be rendered as table or paragraph 

284 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table: 

285 # Render as paragraph (plain text) 

286 if isinstance(rtf_attrs.text, list): 

287 text_list = rtf_attrs.text 

288 else: 

289 text_list = [rtf_attrs.text] if rtf_attrs.text else [] 

290 

291 # Use TextAttributes._encode_text method directly for paragraph rendering 

292 return rtf_attrs._encode_text(text_list, method="paragraph") 

293 else: 

294 # Render as table (default behavior) 

295 if page_col_width is not None: 

296 from ..row import Utils 

297 

298 col_total_width = page_col_width 

299 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

300 

301 # Create DataFrame from text string 

302 import polars as pl 

303 

304 df = pl.DataFrame([[rtf_attrs.text]]) 

305 return rtf_attrs._encode(df, col_widths) 

306 else: 

307 # Fallback without column width calculations 

308 import polars as pl 

309 

310 df = pl.DataFrame([[rtf_attrs.text]]) 

311 return rtf_attrs._encode(df) 

312 

313 def encode_source( 

314 self, 

315 source_config, 

316 page_number: int | None = None, 

317 page_col_width: float | None = None, 

318 ) -> Sequence[str]: 

319 """Encode source component with advanced formatting. 

320 

321 Args: 

322 source_config: RTFSource configuration 

323 page_number: Page number for source 

324 page_col_width: Page column width for calculations 

325 

326 Returns: 

327 List of RTF source strings 

328 """ 

329 if source_config is None: 

330 return [] 

331 

332 rtf_attrs = source_config 

333 

334 # Apply page-specific border if set 

335 if ( 

336 hasattr(rtf_attrs, "_page_border_style") 

337 and page_number is not None 

338 and page_number in rtf_attrs._page_border_style 

339 ): 

340 border_style = rtf_attrs._page_border_style[page_number] 

341 # Create a copy with modified border 

342 rtf_attrs = rtf_attrs.model_copy() 

343 rtf_attrs.border_bottom = [[border_style]] 

344 

345 # Check if source should be rendered as table or paragraph 

346 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table: 

347 # Render as paragraph (plain text) 

348 if isinstance(rtf_attrs.text, list): 

349 text_list = rtf_attrs.text 

350 else: 

351 text_list = [rtf_attrs.text] if rtf_attrs.text else [] 

352 

353 # Use TextAttributes._encode_text method directly for paragraph rendering 

354 return rtf_attrs._encode_text(text_list, method="paragraph") 

355 else: 

356 # Render as table (default behavior) 

357 if page_col_width is not None: 

358 from ..row import Utils 

359 

360 col_total_width = page_col_width 

361 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

362 

363 # Create DataFrame from text string 

364 import polars as pl 

365 

366 df = pl.DataFrame([[rtf_attrs.text]]) 

367 return rtf_attrs._encode(df, col_widths) 

368 else: 

369 # Fallback without column width calculations 

370 import polars as pl 

371 

372 df = pl.DataFrame([[rtf_attrs.text]]) 

373 return rtf_attrs._encode(df) 

374 

375 def prepare_dataframe_for_body_encoding(self, df, rtf_attrs): 

376 """Prepare DataFrame for body encoding with group_by and column removal. 

377 

378 Args: 

379 df: Input DataFrame 

380 rtf_attrs: RTFBody attributes 

381 

382 Returns: 

383 Tuple of (processed_df, original_df) where processed_df has 

384 transformations applied 

385 """ 

386 original_df = df.clone() 

387 processed_df = df.clone() 

388 

389 # Collect columns to remove 

390 columns_to_remove = set() 

391 

392 # Remove subline_by columns from the processed DataFrame 

393 if rtf_attrs.subline_by is not None: 

394 columns_to_remove.update(rtf_attrs.subline_by) 

395 

396 # Remove page_by columns from table display 

397 # page_by columns are shown as spanning rows, not as table columns 

398 # The new_page flag only controls whether to force page breaks at group boundaries 

399 if rtf_attrs.page_by is not None: 

400 columns_to_remove.update(rtf_attrs.page_by) 

401 

402 # Apply column removal if any columns need to be removed 

403 if columns_to_remove: 

404 remaining_columns = [ 

405 col for col in processed_df.columns if col not in columns_to_remove 

406 ] 

407 processed_df = processed_df.select(remaining_columns) 

408 

409 # Update col_rel_width to match the new column count 

410 # Find indices of removed columns to remove corresponding width entries 

411 if rtf_attrs.col_rel_width is not None: 

412 if len(rtf_attrs.col_rel_width) == len(original_df.columns): 

413 removed_indices = [ 

414 i 

415 for i, col in enumerate(original_df.columns) 

416 if col in columns_to_remove 

417 ] 

418 # Create new col_rel_width with removed column widths excluded 

419 new_col_rel_width = [ 

420 width 

421 for i, width in enumerate(rtf_attrs.col_rel_width) 

422 if i not in removed_indices 

423 ] 

424 # Update rtf_attrs with new col_rel_width 

425 rtf_attrs.col_rel_width = new_col_rel_width 

426 

427 # Note: group_by suppression is handled in the pagination strategy 

428 # for documents that need pagination. For non-paginated documents, 

429 # group_by is handled separately in encode_body method. 

430 

431 return processed_df, original_df 

432 

433 def encode_body( 

434 self, document, df, rtf_attrs, force_single_page=False 

435 ) -> Sequence[str] | None: 

436 """Encode table body component with full pagination support. 

437 

438 Args: 

439 document: RTFDocument instance for accessing pagination logic 

440 df: DataFrame containing table data 

441 rtf_attrs: RTFBody attributes 

442 

443 Returns: 

444 List of RTF body strings 

445 """ 

446 if rtf_attrs is None: 

447 return None 

448 

449 # Initialize dimensions and widths 

450 from ..row import Utils 

451 from .document_service import RTFDocumentService 

452 

453 document_service = RTFDocumentService() 

454 col_total_width = document.rtf_page.col_width 

455 

456 # Validate data sorting for all grouping parameters 

457 if any([rtf_attrs.group_by, rtf_attrs.page_by, rtf_attrs.subline_by]): 

458 grouping_service.validate_data_sorting( 

459 df, 

460 group_by=rtf_attrs.group_by, 

461 page_by=rtf_attrs.page_by, 

462 subline_by=rtf_attrs.subline_by, 

463 ) 

464 

465 # Validate subline_by formatting consistency and issue warnings 

466 if rtf_attrs.subline_by is not None: 

467 import warnings 

468 

469 formatting_warnings = ( 

470 grouping_service.validate_subline_formatting_consistency( 

471 df, rtf_attrs.subline_by, rtf_attrs 

472 ) 

473 ) 

474 for warning_msg in formatting_warnings: 

475 warnings.warn( 

476 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=2 

477 ) 

478 

479 # Apply group_by and subline_by processing if specified 

480 processed_df, original_df = self.prepare_dataframe_for_body_encoding( 

481 df, rtf_attrs 

482 ) 

483 

484 # Calculate col_widths AFTER prepare_dataframe_for_body_encoding() 

485 # because that method may modify col_rel_width when removing columns (page_by, subline_by) 

486 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

487 

488 # Check if pagination is needed (unless forced to single page) 

489 if not force_single_page and document_service.needs_pagination(document): 

490 return self._encode_body_paginated( 

491 document, processed_df, rtf_attrs, col_widths 

492 ) 

493 

494 # Handle existing page_by grouping (non-paginated) 

495 page_by = document_service.process_page_by(document) 

496 if page_by is None: 

497 # Note: subline_by documents should use pagination, so this path 

498 # should not be reached for them 

499 # Apply group_by processing for non-paginated documents 

500 if rtf_attrs.group_by is not None: 

501 processed_df = grouping_service.enhance_group_by( 

502 processed_df, rtf_attrs.group_by 

503 ) 

504 return rtf_attrs._encode(processed_df, col_widths) 

505 

506 rows: list[str] = [] 

507 for section in page_by: 

508 # Skip empty sections 

509 indices = [(row, col) for row, col, level in section] 

510 if not indices: 

511 continue 

512 

513 # Create DataFrame for current section 

514 import polars as pl 

515 

516 from ..attributes import BroadcastValue 

517 

518 section_df = pl.DataFrame( 

519 { 

520 str(i): [ 

521 BroadcastValue(value=processed_df, dimension=None).iloc( 

522 row, col 

523 ) 

524 ] 

525 for i, (row, col) in enumerate(indices) 

526 } 

527 ) 

528 

529 # Collect all text and table attributes 

530 from ..input import TableAttributes 

531 

532 section_attrs_dict = rtf_attrs._get_section_attributes(indices) 

533 section_attrs = TableAttributes(**section_attrs_dict) 

534 

535 # Calculate column widths and encode section 

536 if section_attrs.col_rel_width is None: 

537 # Default to equal widths if not specified 

538 section_attrs.col_rel_width = [1.0] * len(indices) 

539 section_col_widths = Utils._col_widths( 

540 section_attrs.col_rel_width, col_total_width 

541 ) 

542 rows.extend(section_attrs._encode(section_df, section_col_widths)) 

543 

544 return rows 

545 

546 def _encode_body_paginated( 

547 self, document, df, rtf_attrs, col_widths 

548 ) -> Sequence[str]: 

549 """Encode body content with pagination support.""" 

550 from .document_service import RTFDocumentService 

551 

552 document_service = RTFDocumentService() 

553 _, distributor = document_service.create_pagination_instance(document) 

554 

555 # Distribute content across pages (r2rtf compatible) 

556 additional_rows = document_service.calculate_additional_rows_per_page(document) 

557 pages = distributor.distribute_content( 

558 df=df, 

559 col_widths=col_widths, 

560 table_attrs=rtf_attrs, 

561 additional_rows_per_page=additional_rows, 

562 ) 

563 

564 # Generate RTF for each page 

565 all_rows = [] 

566 for page_num, page_content in enumerate(pages, 1): 

567 page_rows = [] 

568 

569 # Add page header content 

570 if page_content.get("headers"): 

571 for header_content in page_content["headers"]: 

572 header_text = header_content.get("text", "") 

573 if header_text: 

574 page_rows.append(header_text) 

575 

576 # Add table data 

577 page_data = page_content.get("data") 

578 if page_data is not None: 

579 # Check if it's a DataFrame or a list 

580 if hasattr(page_data, "is_empty"): 

581 # It's a DataFrame 

582 if not page_data.is_empty(): 

583 page_rows.extend(page_data) 

584 else: 

585 # It's a list or other iterable 

586 if page_data: 

587 page_rows.extend(page_data) 

588 

589 # Add footer content 

590 if page_content.get("footers"): 

591 for footer_content in page_content["footers"]: 

592 footer_text = footer_content.get("text", "") 

593 if footer_text: 

594 page_rows.append(footer_text) 

595 

596 # Add page break between pages (except last page) 

597 if page_num < len(pages): 

598 page_rows.append(document_service.generate_page_break(document)) 

599 

600 all_rows.extend(page_rows) 

601 

602 return all_rows 

603 

604 def encode_column_header( 

605 self, df, rtf_attrs, page_col_width: float 

606 ) -> Sequence[str] | None: 

607 """Encode column header component with column width support. 

608 

609 Args: 

610 df: DataFrame containing header data 

611 rtf_attrs: RTFColumnHeader attributes 

612 page_col_width: Page column width for calculations 

613 

614 Returns: 

615 List of RTF header strings 

616 """ 

617 if rtf_attrs is None: 

618 return None 

619 

620 dim = df.shape 

621 

622 rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * dim[1] 

623 rtf_attrs = rtf_attrs._set_default() 

624 

625 from ..row import Utils 

626 

627 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, page_col_width) 

628 

629 return rtf_attrs._encode(df, col_widths) 

630 

631 def encode_page_break(self, page_config, page_margin_encode_func) -> str: 

632 """Generate proper RTF page break sequence matching r2rtf format. 

633 

634 Args: 

635 page_config: RTFPage configuration 

636 page_margin_encode_func: Function to encode page margins 

637 

638 Returns: 

639 RTF page break string 

640 """ 

641 from ..core import RTFConstants 

642 

643 page_setup = ( 

644 f"\\paperw{int(page_config.width * RTFConstants.TWIPS_PER_INCH)}" 

645 f"\\paperh{int(page_config.height * RTFConstants.TWIPS_PER_INCH)}\n\n" 

646 f"{page_margin_encode_func()}\n" 

647 ) 

648 

649 return f"{{\\pard\\fs2\\par}}\\page{{\\pard\\fs2\\par}}\n{page_setup}" 

650 

651 def encode_page_margin(self, page_config) -> str: 

652 """Define RTF margin settings. 

653 

654 Args: 

655 page_config: RTFPage configuration with margin settings 

656 

657 Returns: 

658 RTF margin settings string 

659 """ 

660 from ..row import Utils 

661 

662 margin_codes = [ 

663 "\\margl", 

664 "\\margr", 

665 "\\margt", 

666 "\\margb", 

667 "\\headery", 

668 "\\footery", 

669 ] 

670 margins = [Utils._inch_to_twip(m) for m in page_config.margin] 

671 margin = "".join( 

672 f"{code}{margin}" 

673 for code, margin in zip(margin_codes, margins, strict=True) 

674 ) 

675 return margin + "\n"