Coverage for src/rtflite/services/encoding_service.py: 72%

176 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 16:35 +0000

1"""RTF encoding service that handles document component encoding.""" 

2 

3from .grouping_service import grouping_service 

4 

5 

class RTFEncodingService:
    """Service class that handles RTF component encoding operations.

    This class extracts encoding logic from RTFDocument to improve separation
    of concerns and enable better testing and maintainability.
    """

    def __init__(self):
        # Imported at call time rather than module import time.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from ..rtf import RTFSyntaxGenerator

        self.syntax = RTFSyntaxGenerator()

    def encode_document_start(self) -> str:
        """Encode RTF document start."""
        return "{\\rtf1\\ansi\n\\deff0\\deflang1033"

    def encode_font_table(self) -> str:
        """Encode RTF font table by delegating to the syntax generator."""
        return self.syntax.generate_font_table()

    def encode_color_table(
        self, document=None, used_colors: list[str] | None = None
    ) -> str:
        """Encode RTF color table.

        Args:
            document: RTF document to analyze for color usage (preferred).
            used_colors: List of color names used in document. If None and
                ``document`` is provided, colors are collected from the
                document via ``color_service.collect_document_colors``.

        Returns:
            Whatever ``self.syntax.generate_color_table`` produces for the
            resolved ``used_colors`` (which may still be None when neither
            argument is supplied).
        """
        if document is not None and used_colors is None:
            # Auto-detect colors from document
            from ..services.color_service import color_service

            used_colors = color_service.collect_document_colors(document)

        return self.syntax.generate_color_table(used_colors)

    def encode_page_settings(self, page_config) -> str:
        """Encode RTF page settings.

        Args:
            page_config: RTFPage configuration object; its ``width``,
                ``height``, ``margin`` and ``orientation`` attributes are read.

        Returns:
            RTF page settings string
        """
        return self.syntax.generate_page_settings(
            page_config.width,
            page_config.height,
            page_config.margin,
            page_config.orientation,
        )

    def encode_page_header(self, header_config, method: str = "line") -> str:
        """Encode page header component.

        Args:
            header_config: RTFPageHeader configuration
            method: Encoding method forwarded to ``header_config._encode_text``

        Returns:
            The encoded text wrapped in an RTF ``\\header`` group, or an empty
            string when the config is None or has no text.
        """
        if header_config is None or not header_config.text:
            return ""

        # Use the existing text encoding method
        result = header_config._encode_text(text=header_config.text, method=method)

        return f"{{\\header{result}}}"

    def encode_page_footer(self, footer_config, method: str = "line") -> str:
        """Encode page footer component.

        Args:
            footer_config: RTFPageFooter configuration
            method: Encoding method forwarded to ``footer_config._encode_text``

        Returns:
            The encoded text wrapped in an RTF ``\\footer`` group, or an empty
            string when the config is None or has no text.
        """
        if footer_config is None or not footer_config.text:
            return ""

        # Use the existing text encoding method
        result = footer_config._encode_text(text=footer_config.text, method=method)
        return f"{{\\footer{result}}}"

    def encode_title(self, title_config, method: str = "line") -> str:
        """Encode title component.

        Args:
            title_config: RTFTitle configuration
            method: Encoding method forwarded to ``title_config._encode_text``

        Returns:
            RTF title string, or an empty string when the config is falsy or
            has no text.
        """
        if not title_config or not title_config.text:
            return ""

        # Use the existing text encoding method
        return title_config._encode_text(text=title_config.text, method=method)

    def encode_subline(self, subline_config, method: str = "line") -> str:
        """Encode subline component.

        Args:
            subline_config: RTFSubline configuration
            method: Encoding method forwarded to ``subline_config._encode_text``

        Returns:
            RTF subline string, or an empty string when the config is None or
            has no text.
        """
        if subline_config is None or not subline_config.text:
            return ""

        # Use the existing text encoding method
        return subline_config._encode_text(text=subline_config.text, method=method)

    def _encode_text_component(
        self,
        config,
        page_number: int | None = None,
        page_col_width: float | None = None,
    ) -> list[str]:
        """Shared encoder behind ``encode_footnote`` and ``encode_source``.

        Args:
            config: Footnote/source configuration object, or None.
            page_number: Page number used to look up a page-specific border in
                ``config._page_border_style`` (when that attribute exists).
            page_col_width: Total column width; when given, column widths are
                computed and passed to ``_encode``.

        Returns:
            An empty list when ``config`` is None; otherwise the result of the
            config's ``_encode_text`` (paragraph rendering) or ``_encode``
            (table rendering).
        """
        if config is None:
            return []

        rtf_attrs = config

        # Apply page-specific border if set
        if (
            hasattr(rtf_attrs, "_page_border_style")
            and page_number is not None
            and page_number in rtf_attrs._page_border_style
        ):
            border_style = rtf_attrs._page_border_style[page_number]
            # Work on a copy so the caller's configuration is left untouched
            rtf_attrs = rtf_attrs.model_copy()
            rtf_attrs.border_bottom = [[border_style]]

        # Paragraph rendering is used only when ``as_table`` exists and is falsy
        if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table:
            if isinstance(rtf_attrs.text, list):
                text_list = rtf_attrs.text
            else:
                text_list = [rtf_attrs.text] if rtf_attrs.text else []

            # Use TextAttributes._encode_text method directly for paragraph rendering
            return rtf_attrs._encode_text(text_list, method="paragraph")

        # Render as table (default behavior). Column widths are passed on only
        # when a page column width was supplied.
        encode_args = []
        if page_col_width is not None:
            from ..row import Utils

            encode_args.append(
                Utils._col_widths(rtf_attrs.col_rel_width, page_col_width)
            )

        import polars as pl

        # Wrap the text in a DataFrame for the table encoder
        df = pl.DataFrame([[rtf_attrs.text]])
        return rtf_attrs._encode(df, *encode_args)

    def encode_footnote(
        self,
        footnote_config,
        page_number: int | None = None,
        page_col_width: float | None = None,
    ) -> list[str]:
        """Encode footnote component with advanced formatting.

        Args:
            footnote_config: RTFFootnote configuration
            page_number: Page number for footnote
            page_col_width: Page column width for calculations

        Returns:
            List of RTF footnote strings (empty when ``footnote_config`` is None)
        """
        return self._encode_text_component(
            footnote_config, page_number, page_col_width
        )

    def encode_source(
        self,
        source_config,
        page_number: int | None = None,
        page_col_width: float | None = None,
    ) -> list[str]:
        """Encode source component with advanced formatting.

        Args:
            source_config: RTFSource configuration
            page_number: Page number for source
            page_col_width: Page column width for calculations

        Returns:
            List of RTF source strings (empty when ``source_config`` is None)
        """
        return self._encode_text_component(source_config, page_number, page_col_width)

    def prepare_dataframe_for_body_encoding(self, df, rtf_attrs):
        """Prepare DataFrame for body encoding by removing subline_by columns.

        Args:
            df: Input DataFrame
            rtf_attrs: RTFBody attributes; only ``subline_by`` is read here

        Returns:
            Tuple of (processed_df, original_df). Both start as clones of
            ``df``; processed_df additionally has the ``subline_by`` columns
            dropped when ``subline_by`` is not None.
        """
        original_df = df.clone()
        processed_df = df.clone()

        # Remove subline_by columns from the processed DataFrame
        if rtf_attrs.subline_by is not None:
            columns_to_remove = set(rtf_attrs.subline_by)
            remaining_columns = [
                col for col in processed_df.columns if col not in columns_to_remove
            ]
            processed_df = processed_df.select(remaining_columns)

        # Note: group_by suppression is handled in the pagination strategy
        # for documents that need pagination. For non-paginated documents,
        # group_by is handled separately in encode_body method.

        return processed_df, original_df

    def encode_body(
        self, document, df, rtf_attrs, force_single_page=False
    ) -> list[str] | None:
        """Encode table body component with full pagination support.

        Args:
            document: RTFDocument instance for accessing pagination logic
            df: DataFrame containing table data
            rtf_attrs: RTFBody attributes
            force_single_page: When true, the pagination check is skipped and
                the body is always encoded through the non-paginated path.

        Returns:
            List of RTF body strings, or None when ``rtf_attrs`` is None.
        """
        if rtf_attrs is None:
            return None

        # Initialize dimensions and widths
        from ..row import Utils
        from .document_service import RTFDocumentService

        document_service = RTFDocumentService()
        col_total_width = document.rtf_page.col_width
        col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)

        # Validate data sorting for all grouping parameters
        if any([rtf_attrs.group_by, rtf_attrs.page_by, rtf_attrs.subline_by]):
            grouping_service.validate_data_sorting(
                df,
                group_by=rtf_attrs.group_by,
                page_by=rtf_attrs.page_by,
                subline_by=rtf_attrs.subline_by,
            )

        # Validate subline_by formatting consistency and issue warnings
        if rtf_attrs.subline_by is not None:
            import warnings

            formatting_warnings = (
                grouping_service.validate_subline_formatting_consistency(
                    df, rtf_attrs.subline_by, rtf_attrs
                )
            )
            for warning_msg in formatting_warnings:
                # stacklevel=2 attributes the warning to the caller of encode_body
                warnings.warn(
                    f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=2
                )

        # Drop subline_by columns; the untouched clone is not needed here
        processed_df, _ = self.prepare_dataframe_for_body_encoding(df, rtf_attrs)

        # Check if pagination is needed (unless forced to single page)
        if not force_single_page and document_service.needs_pagination(document):
            return self._encode_body_paginated(
                document, processed_df, rtf_attrs, col_widths
            )

        # Handle existing page_by grouping (non-paginated)
        page_by = document_service.process_page_by(document)
        if page_by is None:
            # Note: subline_by documents should use pagination, so this path
            # should not be reached for them.
            # Apply group_by processing for non-paginated documents
            if rtf_attrs.group_by is not None:
                processed_df = grouping_service.enhance_group_by(
                    processed_df, rtf_attrs.group_by
                )
            return rtf_attrs._encode(processed_df, col_widths)

        # Imports needed only for the page_by path, resolved once instead of
        # on every loop iteration.
        import polars as pl

        from ..attributes import BroadcastValue
        from ..input import TableAttributes

        # processed_df does not change inside the loop, so one wrapper serves
        # every cell lookup.
        cell_reader = BroadcastValue(value=processed_df, dimension=None)

        rows: list[str] = []
        for section in page_by:
            # Each section entry is a (row, col, level) triple; level is unused
            indices = [(row, col) for row, col, _level in section]
            if not indices:
                # Skip empty sections
                continue

            # One single-row column per selected cell, named by its position
            section_df = pl.DataFrame(
                {
                    str(i): [cell_reader.iloc(row, col)]
                    for i, (row, col) in enumerate(indices)
                }
            )

            # Collect all text and table attributes
            section_attrs_dict = rtf_attrs._get_section_attributes(indices)
            section_attrs = TableAttributes(**section_attrs_dict)

            # Calculate column widths and encode section
            if section_attrs.col_rel_width is None:
                # Default to equal widths if not specified
                section_attrs.col_rel_width = [1.0] * len(indices)
            section_col_widths = Utils._col_widths(
                section_attrs.col_rel_width, col_total_width
            )
            rows.extend(section_attrs._encode(section_df, section_col_widths))

        return rows

    def _encode_body_paginated(self, document, df, rtf_attrs, col_widths) -> list[str]:
        """Encode body content with pagination support.

        Args:
            document: RTFDocument instance passed through to the document service
            df: DataFrame to distribute across pages
            rtf_attrs: Table attributes forwarded to the distributor
            col_widths: Column widths forwarded to the distributor

        Returns:
            Flat list of per-page content, with a page break appended after
            every page except the last.
        """
        from .document_service import RTFDocumentService

        document_service = RTFDocumentService()
        _, distributor = document_service.create_pagination_instance(document)

        # Distribute content across pages (r2rtf compatible)
        additional_rows = document_service.calculate_additional_rows_per_page(document)
        pages = distributor.distribute_content(
            df=df,
            col_widths=col_widths,
            table_attrs=rtf_attrs,
            additional_rows_per_page=additional_rows,
        )

        # Generate RTF for each page
        all_rows = []
        for page_num, page_content in enumerate(pages, 1):
            page_rows = []

            # Add page header content (only entries with non-empty text)
            for header_content in page_content.get("headers") or ():
                header_text = header_content.get("text", "")
                if header_text:
                    page_rows.append(header_text)

            # Add table data
            page_data = page_content.get("data")
            if page_data is not None:
                if hasattr(page_data, "is_empty"):
                    # DataFrame-like: truthiness cannot be used, ask is_empty()
                    # NOTE(review): if this is a polars DataFrame, extend()
                    # iterates its columns and appends Series objects rather
                    # than RTF strings -- confirm what the distributor puts
                    # under "data".
                    if not page_data.is_empty():
                        page_rows.extend(page_data)
                elif page_data:
                    # It's a list or other iterable
                    page_rows.extend(page_data)

            # Add footer content (only entries with non-empty text)
            for footer_content in page_content.get("footers") or ():
                footer_text = footer_content.get("text", "")
                if footer_text:
                    page_rows.append(footer_text)

            # Add page break between pages (except last page)
            if page_num < len(pages):
                page_rows.append(document_service.generate_page_break(document))

            all_rows.extend(page_rows)

        return all_rows

    def encode_column_header(
        self, df, rtf_attrs, page_col_width: float
    ) -> list[str] | None:
        """Encode column header component with column width support.

        Note that when ``rtf_attrs.col_rel_width`` is falsy it is assigned an
        equal-width default on the object that was passed in.

        Args:
            df: DataFrame containing header data
            rtf_attrs: RTFColumnHeader attributes
            page_col_width: Page column width for calculations

        Returns:
            List of RTF header strings, or None when ``rtf_attrs`` is None.
        """
        if rtf_attrs is None:
            return None

        dim = df.shape

        # Fall back to one equal relative width per DataFrame column
        rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * dim[1]
        rtf_attrs = rtf_attrs._set_default()

        from ..row import Utils

        col_widths = Utils._col_widths(rtf_attrs.col_rel_width, page_col_width)

        return rtf_attrs._encode(df, col_widths)

    def encode_page_break(self, page_config, page_margin_encode_func) -> str:
        """Generate proper RTF page break sequence matching r2rtf format.

        Args:
            page_config: RTFPage configuration; ``width`` and ``height`` are
                multiplied by ``RTFConstants.TWIPS_PER_INCH`` and truncated
                to int.
            page_margin_encode_func: Zero-argument callable returning the
                encoded page margins

        Returns:
            RTF page break string
        """
        from ..core import RTFConstants

        page_setup = (
            f"\\paperw{int(page_config.width * RTFConstants.TWIPS_PER_INCH)}"
            f"\\paperh{int(page_config.height * RTFConstants.TWIPS_PER_INCH)}\n\n"
            f"{page_margin_encode_func()}\n"
        )

        return f"{{\\pard\\fs2\\par}}\\page{{\\pard\\fs2\\par}}\n{page_setup}"

    def encode_page_margin(self, page_config) -> str:
        """Define RTF margin settings.

        Args:
            page_config: RTFPage configuration with margin settings; each
                entry of ``page_config.margin`` is converted with
                ``Utils._inch_to_twip`` and paired positionally with a
                margin control word.

        Returns:
            RTF margin settings string terminated by a newline
        """
        from ..row import Utils

        margin_codes = [
            "\\margl",
            "\\margr",
            "\\margt",
            "\\margb",
            "\\headery",
            "\\footery",
        ]
        margins = [Utils._inch_to_twip(m) for m in page_config.margin]
        # zip stops at the shorter sequence, so extra entries on either side
        # are ignored
        margin = "".join(f"{code}{twips}" for code, twips in zip(margin_codes, margins))
        return margin + "\n"