Coverage for src/rtflite/services/encoding_service.py: 72%

177 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-25 22:35 +0000

1"""RTF encoding service that handles document component encoding.""" 

2 

3from collections.abc import Sequence 

4 

5from .grouping_service import grouping_service 

6 

7 

8class RTFEncodingService: 

9 """Service class that handles RTF component encoding operations. 

10 

11 This class extracts encoding logic from RTFDocument to improve separation 

12 of concerns and enable better testing and maintainability. 

13 """ 

14 

15 def __init__(self): 

16 from ..rtf import RTFSyntaxGenerator 

17 

18 self.syntax = RTFSyntaxGenerator() 

19 

20 def encode_document_start(self) -> str: 

21 """Encode RTF document start.""" 

22 return "{\\rtf1\\ansi\n\\deff0\\deflang1033" 

23 

24 def encode_font_table(self) -> str: 

25 """Encode RTF font table.""" 

26 return self.syntax.generate_font_table() 

27 

28 def encode_color_table( 

29 self, document=None, used_colors: Sequence[str] | None = None 

30 ) -> str: 

31 """Encode RTF color table with comprehensive 657-color support. 

32 

33 Args: 

34 document: RTF document to analyze for color usage (preferred) 

35 used_colors: List of color names used in document. If None and document provided, colors are auto-detected. 

36 

37 Returns: 

38 RTF color table string (empty if no colors beyond black/"" are used) 

39 """ 

40 if document is not None and used_colors is None: 

41 # Auto-detect colors from document 

42 from ..services.color_service import color_service 

43 

44 used_colors = color_service.collect_document_colors(document) 

45 

46 return self.syntax.generate_color_table(used_colors) 

47 

48 def encode_page_settings(self, page_config) -> str: 

49 """Encode RTF page settings. 

50 

51 Args: 

52 page_config: RTFPage configuration object 

53 

54 Returns: 

55 RTF page settings string 

56 """ 

57 return self.syntax.generate_page_settings( 

58 page_config.width, 

59 page_config.height, 

60 page_config.margin, 

61 page_config.orientation, 

62 ) 

63 

64 def encode_page_header(self, header_config, method: str = "line") -> str: 

65 """Encode page header component. 

66 

67 Args: 

68 header_config: RTFPageHeader configuration 

69 method: Encoding method 

70 

71 Returns: 

72 RTF header string 

73 """ 

74 if header_config is None or not header_config.text: 

75 return "" 

76 

77 # Use the existing text encoding method 

78 result = header_config._encode_text(text=header_config.text, method=method) 

79 

80 return f"{{\\header{result}}}" 

81 

82 def encode_page_footer(self, footer_config, method: str = "line") -> str: 

83 """Encode page footer component. 

84 

85 Args: 

86 footer_config: RTFPageFooter configuration 

87 method: Encoding method 

88 

89 Returns: 

90 RTF footer string 

91 """ 

92 if footer_config is None or not footer_config.text: 

93 return "" 

94 

95 # Use the existing text encoding method 

96 result = footer_config._encode_text(text=footer_config.text, method=method) 

97 return f"{{\\footer{result}}}" 

98 

99 def encode_title(self, title_config, method: str = "line") -> str: 

100 """Encode title component. 

101 

102 Args: 

103 title_config: RTFTitle configuration 

104 method: Encoding method 

105 

106 Returns: 

107 RTF title string 

108 """ 

109 if not title_config or not title_config.text: 

110 return "" 

111 

112 # Use the existing text encoding method 

113 return title_config._encode_text(text=title_config.text, method=method) 

114 

115 def encode_subline(self, subline_config, method: str = "line") -> str: 

116 """Encode subline component. 

117 

118 Args: 

119 subline_config: RTFSubline configuration 

120 method: Encoding method 

121 

122 Returns: 

123 RTF subline string 

124 """ 

125 if subline_config is None or not subline_config.text: 

126 return "" 

127 

128 # Use the existing text encoding method 

129 return subline_config._encode_text(text=subline_config.text, method=method) 

130 

131 def encode_footnote( 

132 self, 

133 footnote_config, 

134 page_number: int | None = None, 

135 page_col_width: float | None = None, 

136 ) -> Sequence[str]: 

137 """Encode footnote component with advanced formatting. 

138 

139 Args: 

140 footnote_config: RTFFootnote configuration 

141 page_number: Page number for footnote 

142 page_col_width: Page column width for calculations 

143 

144 Returns: 

145 List of RTF footnote strings 

146 """ 

147 if footnote_config is None: 

148 return [] 

149 

150 rtf_attrs = footnote_config 

151 

152 # Apply page-specific border if set 

153 if ( 

154 hasattr(rtf_attrs, "_page_border_style") 

155 and page_number is not None 

156 and page_number in rtf_attrs._page_border_style 

157 ): 

158 border_style = rtf_attrs._page_border_style[page_number] 

159 # Create a copy with modified border 

160 rtf_attrs = rtf_attrs.model_copy() 

161 rtf_attrs.border_bottom = [[border_style]] 

162 

163 # Check if footnote should be rendered as table or paragraph 

164 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table: 

165 # Render as paragraph (plain text) 

166 if isinstance(rtf_attrs.text, list): 

167 text_list = rtf_attrs.text 

168 else: 

169 text_list = [rtf_attrs.text] if rtf_attrs.text else [] 

170 

171 # Use TextAttributes._encode_text method directly for paragraph rendering 

172 return rtf_attrs._encode_text(text_list, method="paragraph") 

173 else: 

174 # Render as table (default behavior) 

175 if page_col_width is not None: 

176 from ..row import Utils 

177 

178 col_total_width = page_col_width 

179 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

180 

181 # Create DataFrame from text string 

182 import polars as pl 

183 

184 df = pl.DataFrame([[rtf_attrs.text]]) 

185 return rtf_attrs._encode(df, col_widths) 

186 else: 

187 # Fallback without column width calculations 

188 import polars as pl 

189 

190 df = pl.DataFrame([[rtf_attrs.text]]) 

191 return rtf_attrs._encode(df) 

192 

193 def encode_source( 

194 self, 

195 source_config, 

196 page_number: int | None = None, 

197 page_col_width: float | None = None, 

198 ) -> Sequence[str]: 

199 """Encode source component with advanced formatting. 

200 

201 Args: 

202 source_config: RTFSource configuration 

203 page_number: Page number for source 

204 page_col_width: Page column width for calculations 

205 

206 Returns: 

207 List of RTF source strings 

208 """ 

209 if source_config is None: 

210 return [] 

211 

212 rtf_attrs = source_config 

213 

214 # Apply page-specific border if set 

215 if ( 

216 hasattr(rtf_attrs, "_page_border_style") 

217 and page_number is not None 

218 and page_number in rtf_attrs._page_border_style 

219 ): 

220 border_style = rtf_attrs._page_border_style[page_number] 

221 # Create a copy with modified border 

222 rtf_attrs = rtf_attrs.model_copy() 

223 rtf_attrs.border_bottom = [[border_style]] 

224 

225 # Check if source should be rendered as table or paragraph 

226 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table: 

227 # Render as paragraph (plain text) 

228 if isinstance(rtf_attrs.text, list): 

229 text_list = rtf_attrs.text 

230 else: 

231 text_list = [rtf_attrs.text] if rtf_attrs.text else [] 

232 

233 # Use TextAttributes._encode_text method directly for paragraph rendering 

234 return rtf_attrs._encode_text(text_list, method="paragraph") 

235 else: 

236 # Render as table (default behavior) 

237 if page_col_width is not None: 

238 from ..row import Utils 

239 

240 col_total_width = page_col_width 

241 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

242 

243 # Create DataFrame from text string 

244 import polars as pl 

245 

246 df = pl.DataFrame([[rtf_attrs.text]]) 

247 return rtf_attrs._encode(df, col_widths) 

248 else: 

249 # Fallback without column width calculations 

250 import polars as pl 

251 

252 df = pl.DataFrame([[rtf_attrs.text]]) 

253 return rtf_attrs._encode(df) 

254 

255 def prepare_dataframe_for_body_encoding(self, df, rtf_attrs): 

256 """Prepare DataFrame for body encoding with group_by processing and column removal. 

257 

258 Args: 

259 df: Input DataFrame 

260 rtf_attrs: RTFBody attributes 

261 

262 Returns: 

263 Tuple of (processed_df, original_df) where processed_df has transformations applied 

264 """ 

265 original_df = df.clone() 

266 processed_df = df.clone() 

267 

268 # Remove subline_by columns from the processed DataFrame 

269 if rtf_attrs.subline_by is not None: 

270 columns_to_remove = set(rtf_attrs.subline_by) 

271 remaining_columns = [ 

272 col for col in processed_df.columns if col not in columns_to_remove 

273 ] 

274 processed_df = processed_df.select(remaining_columns) 

275 

276 # Note: group_by suppression is handled in the pagination strategy 

277 # for documents that need pagination. For non-paginated documents, 

278 # group_by is handled separately in encode_body method. 

279 

280 return processed_df, original_df 

281 

282 def encode_body( 

283 self, document, df, rtf_attrs, force_single_page=False 

284 ) -> Sequence[str] | None: 

285 """Encode table body component with full pagination support. 

286 

287 Args: 

288 document: RTFDocument instance for accessing pagination logic 

289 df: DataFrame containing table data 

290 rtf_attrs: RTFBody attributes 

291 

292 Returns: 

293 List of RTF body strings 

294 """ 

295 if rtf_attrs is None: 

296 return None 

297 

298 # Initialize dimensions and widths 

299 from ..row import Utils 

300 from .document_service import RTFDocumentService 

301 

302 document_service = RTFDocumentService() 

303 col_total_width = document.rtf_page.col_width 

304 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

305 

306 # Validate data sorting for all grouping parameters 

307 if any([rtf_attrs.group_by, rtf_attrs.page_by, rtf_attrs.subline_by]): 

308 grouping_service.validate_data_sorting( 

309 df, 

310 group_by=rtf_attrs.group_by, 

311 page_by=rtf_attrs.page_by, 

312 subline_by=rtf_attrs.subline_by, 

313 ) 

314 

315 # Validate subline_by formatting consistency and issue warnings 

316 if rtf_attrs.subline_by is not None: 

317 import warnings 

318 

319 formatting_warnings = ( 

320 grouping_service.validate_subline_formatting_consistency( 

321 df, rtf_attrs.subline_by, rtf_attrs 

322 ) 

323 ) 

324 for warning_msg in formatting_warnings: 

325 warnings.warn( 

326 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=2 

327 ) 

328 

329 # Apply group_by and subline_by processing if specified 

330 processed_df, original_df = self.prepare_dataframe_for_body_encoding( 

331 df, rtf_attrs 

332 ) 

333 

334 # Check if pagination is needed (unless forced to single page) 

335 if not force_single_page and document_service.needs_pagination(document): 

336 return self._encode_body_paginated( 

337 document, processed_df, rtf_attrs, col_widths 

338 ) 

339 

340 # Handle existing page_by grouping (non-paginated) 

341 page_by = document_service.process_page_by(document) 

342 if page_by is None: 

343 # Note: subline_by documents should use pagination, so this path should not be reached for them 

344 # Apply group_by processing for non-paginated documents 

345 if rtf_attrs.group_by is not None: 

346 processed_df = grouping_service.enhance_group_by( 

347 processed_df, rtf_attrs.group_by 

348 ) 

349 return rtf_attrs._encode(processed_df, col_widths) 

350 

351 rows: list[str] = [] 

352 for section in page_by: 

353 # Skip empty sections 

354 indices = [(row, col) for row, col, level in section] 

355 if not indices: 

356 continue 

357 

358 # Create DataFrame for current section 

359 import polars as pl 

360 

361 from ..attributes import BroadcastValue 

362 

363 section_df = pl.DataFrame( 

364 { 

365 str(i): [ 

366 BroadcastValue(value=processed_df, dimension=None).iloc( 

367 row, col 

368 ) 

369 ] 

370 for i, (row, col) in enumerate(indices) 

371 } 

372 ) 

373 

374 # Collect all text and table attributes 

375 from ..input import TableAttributes 

376 

377 section_attrs_dict = rtf_attrs._get_section_attributes(indices) 

378 section_attrs = TableAttributes(**section_attrs_dict) 

379 

380 # Calculate column widths and encode section 

381 if section_attrs.col_rel_width is None: 

382 # Default to equal widths if not specified 

383 section_attrs.col_rel_width = [1.0] * len(indices) 

384 section_col_widths = Utils._col_widths( 

385 section_attrs.col_rel_width, col_total_width 

386 ) 

387 rows.extend(section_attrs._encode(section_df, section_col_widths)) 

388 

389 return rows 

390 

391 def _encode_body_paginated( 

392 self, document, df, rtf_attrs, col_widths 

393 ) -> Sequence[str]: 

394 """Encode body content with pagination support.""" 

395 from .document_service import RTFDocumentService 

396 

397 document_service = RTFDocumentService() 

398 _, distributor = document_service.create_pagination_instance(document) 

399 

400 # Distribute content across pages (r2rtf compatible) 

401 additional_rows = document_service.calculate_additional_rows_per_page(document) 

402 pages = distributor.distribute_content( 

403 df=df, 

404 col_widths=col_widths, 

405 table_attrs=rtf_attrs, 

406 additional_rows_per_page=additional_rows, 

407 ) 

408 

409 # Generate RTF for each page 

410 all_rows = [] 

411 for page_num, page_content in enumerate(pages, 1): 

412 page_rows = [] 

413 

414 # Add page header content 

415 if page_content.get("headers"): 

416 for header_content in page_content["headers"]: 

417 header_text = header_content.get("text", "") 

418 if header_text: 

419 page_rows.append(header_text) 

420 

421 # Add table data 

422 page_data = page_content.get("data") 

423 if page_data is not None: 

424 # Check if it's a DataFrame or a list 

425 if hasattr(page_data, "is_empty"): 

426 # It's a DataFrame 

427 if not page_data.is_empty(): 

428 page_rows.extend(page_data) 

429 else: 

430 # It's a list or other iterable 

431 if page_data: 

432 page_rows.extend(page_data) 

433 

434 # Add footer content 

435 if page_content.get("footers"): 

436 for footer_content in page_content["footers"]: 

437 footer_text = footer_content.get("text", "") 

438 if footer_text: 

439 page_rows.append(footer_text) 

440 

441 # Add page break between pages (except last page) 

442 if page_num < len(pages): 

443 page_rows.append(document_service.generate_page_break(document)) 

444 

445 all_rows.extend(page_rows) 

446 

447 return all_rows 

448 

449 def encode_column_header( 

450 self, df, rtf_attrs, page_col_width: float 

451 ) -> Sequence[str] | None: 

452 """Encode column header component with column width support. 

453 

454 Args: 

455 df: DataFrame containing header data 

456 rtf_attrs: RTFColumnHeader attributes 

457 page_col_width: Page column width for calculations 

458 

459 Returns: 

460 List of RTF header strings 

461 """ 

462 if rtf_attrs is None: 

463 return None 

464 

465 dim = df.shape 

466 

467 rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * dim[1] 

468 rtf_attrs = rtf_attrs._set_default() 

469 

470 from ..row import Utils 

471 

472 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, page_col_width) 

473 

474 return rtf_attrs._encode(df, col_widths) 

475 

476 def encode_page_break(self, page_config, page_margin_encode_func) -> str: 

477 """Generate proper RTF page break sequence matching r2rtf format. 

478 

479 Args: 

480 page_config: RTFPage configuration 

481 page_margin_encode_func: Function to encode page margins 

482 

483 Returns: 

484 RTF page break string 

485 """ 

486 from ..core import RTFConstants 

487 

488 page_setup = ( 

489 f"\\paperw{int(page_config.width * RTFConstants.TWIPS_PER_INCH)}" 

490 f"\\paperh{int(page_config.height * RTFConstants.TWIPS_PER_INCH)}\n\n" 

491 f"{page_margin_encode_func()}\n" 

492 ) 

493 

494 return f"{{\\pard\\fs2\\par}}\\page{{\\pard\\fs2\\par}}\n{page_setup}" 

495 

496 def encode_page_margin(self, page_config) -> str: 

497 """Define RTF margin settings. 

498 

499 Args: 

500 page_config: RTFPage configuration with margin settings 

501 

502 Returns: 

503 RTF margin settings string 

504 """ 

505 from ..row import Utils 

506 

507 margin_codes = [ 

508 "\\margl", 

509 "\\margr", 

510 "\\margt", 

511 "\\margb", 

512 "\\headery", 

513 "\\footery", 

514 ] 

515 margins = [Utils._inch_to_twip(m) for m in page_config.margin] 

516 margin = "".join( 

517 f"{code}{margin}" for code, margin in zip(margin_codes, margins) 

518 ) 

519 return margin + "\n"