Coverage for src / rtflite / encode.py: 84%

192 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-08 17:03 +0000

1"""RTF Document class - main entry point for RTF generation. 

2 

3This module provides the RTFDocument class with a clean, service-oriented architecture. 

4All complex logic has been delegated to specialized services and strategies. 

5""" 

6 

7import shutil 

8import tempfile 

9from collections.abc import Sequence 

10from pathlib import Path 

11 

12import polars as pl 

13from pydantic import ( 

14 BaseModel, 

15 ConfigDict, 

16 Field, 

17 PrivateAttr, 

18 field_validator, 

19 model_validator, 

20) 

21 

22from .convert import LibreOfficeConverter 

23from .input import ( 

24 RTFBody, 

25 RTFColumnHeader, 

26 RTFFigure, 

27 RTFFootnote, 

28 RTFPage, 

29 RTFPageFooter, 

30 RTFPageHeader, 

31 RTFSource, 

32 RTFSubline, 

33 RTFTitle, 

34) 

35from .row import Utils 

36 

37 

38class RTFDocument(BaseModel): 

39 """Main class for creating RTF documents with tables, text, and figures. 

40 

41 RTFDocument is the central class for generating Rich Text Format (RTF) files 

42 containing formatted tables, titles, footnotes, and other document elements. 

43 It provides a comprehensive API for creating professional documents commonly 

44 used in clinical trials, scientific research, and data reporting. 

45 

46 Examples: 

47 Simple table with title: 

48 ```python 

49 import rtflite as rtf 

50 import polars as pl 

51 

52 df = pl.DataFrame({ 

53 "Subject": ["001", "002", "003"], 

54 "Age": [45, 52, 38], 

55 "Treatment": ["Drug A", "Drug B", "Placebo"] 

56 }) 

57 

58 doc = rtf.RTFDocument( 

59 df=df, 

60 rtf_title=rtf.RTFTitle(text="Patient Demographics"), 

61 rtf_body=rtf.RTFBody(col_rel_width=[2, 1, 2]) 

62 ) 

63 doc.write_rtf("demographics.rtf") 

64 ``` 

65 

66 Multi-page document with headers and footers: 

67 ```python 

68 doc = rtf.RTFDocument( 

69 df=large_df, 

70 rtf_page=rtf.RTFPage(nrow=40, orientation="landscape"), 

71 rtf_page_header=rtf.RTFPageHeader(), # Default page numbering 

72 rtf_page_footer=rtf.RTFPageFooter(text="Confidential"), 

73 rtf_title=rtf.RTFTitle(text="Clinical Study Results"), 

74 rtf_column_header=rtf.RTFColumnHeader( 

75 text=["Subject ID", "Visit", "Result", "Units"] 

76 ), 

77 rtf_body=rtf.RTFBody( 

78 col_rel_width=[2, 1, 1, 1], 

79 text_justification=[["l", "c", "r", "c"]] 

80 ), 

81 rtf_footnote=rtf.RTFFootnote( 

82 text="Results are mean +/- SD" 

83 ) 

84 ) 

85 doc.write_rtf("results.rtf") 

86 ``` 

87 

88 Document with grouped data and sublines: 

89 ```python 

90 doc = rtf.RTFDocument( 

91 df=grouped_df, 

92 rtf_body=rtf.RTFBody( 

93 group_by=["SITE", "TREATMENT"], # Suppress duplicate values 

94 subline_by=["STUDY_PHASE"], # Create section headers 

95 col_rel_width=[2, 2, 1, 1] 

96 ) 

97 ) 

98 ``` 

99 

100 Attributes: 

101 df: Data to display in the table. Can be a single DataFrame or list of 

102 DataFrames for multi-section documents. Accepts pandas or polars 

103 DataFrames (automatically converted to polars internally). 

104 

105 rtf_page: Page configuration including size, orientation, margins, and 

106 pagination settings. 

107 

108 rtf_page_header: Optional header appearing at the top of every page. 

109 

110 rtf_page_footer: Optional footer appearing at the bottom of every page. 

111 

112 rtf_title: Document title(s) displayed at the top. 

113 

114 rtf_column_header: Column headers for the table. Can be a single header 

115 or list of headers for multi-row headers. 

116 

117 rtf_body: Table body configuration including column widths, formatting, 

118 borders, and special features like group_by and subline_by. 

119 

120 rtf_footnote: Optional footnote text displayed after the table. 

121 

122 rtf_source: Optional source citation displayed at the very bottom. 

123 

124 rtf_figure: Optional figure/image to embed in the document. 

125 

126 Methods: 

127 rtf_encode(): Generate the complete RTF document as a string. 

128 write_rtf(file_path): Write the RTF document to a file. 

129 """ 

130 

131 model_config = ConfigDict(arbitrary_types_allowed=True) 

132 _table_space: int = PrivateAttr(default=0) 

133 

134 # Core data 

135 df: pl.DataFrame | list[pl.DataFrame] | None = Field( 

136 default=None, 

137 description=( 

138 "The DataFrame(s) containing the data for the RTF document. " 

139 "Accepts single DataFrame or list of DataFrames for " 

140 "multi-section documents. Accepts pandas or polars DataFrame, " 

141 "internally converted to polars. Optional when using figure-only " 

142 "documents." 

143 ), 

144 ) 

145 

146 # Document structure 

147 rtf_page: RTFPage = Field( 

148 default_factory=lambda: RTFPage(), 

149 description="Page settings including size, orientation and margins", 

150 ) 

151 rtf_page_header: RTFPageHeader | None = Field( 

152 default=None, description="Text to appear in the header of each page" 

153 ) 

154 rtf_title: RTFTitle | None = Field( 

155 default_factory=lambda: RTFTitle(), 

156 description="Title section settings including text and formatting", 

157 ) 

158 rtf_subline: RTFSubline | None = Field( 

159 default=None, description="Subject line text to appear below the title" 

160 ) 

161 rtf_column_header: ( 

162 Sequence[RTFColumnHeader] | Sequence[Sequence[RTFColumnHeader | None]] 

163 ) = Field( 

164 default_factory=lambda: [RTFColumnHeader()], 

165 description=( 

166 "Column header settings. For multi-section documents, use nested " 

167 "list format: [[header1], [header2], [None]] where None means no " 

168 "header for that section." 

169 ), 

170 ) 

171 rtf_body: RTFBody | Sequence[RTFBody] | None = Field( 

172 default_factory=lambda: RTFBody(), 

173 description=( 

174 "Table body section settings including column widths and " 

175 "formatting. For multi-section documents, provide a list of " 

176 "RTFBody objects." 

177 ), 

178 ) 

179 rtf_footnote: RTFFootnote | None = Field( 

180 default=None, description="Footnote text to appear at bottom of document" 

181 ) 

182 rtf_source: RTFSource | None = Field( 

183 default=None, description="Data source citation text" 

184 ) 

185 rtf_page_footer: RTFPageFooter | None = Field( 

186 default=None, description="Text to appear in the footer of each page" 

187 ) 

188 rtf_figure: RTFFigure | None = Field( 

189 default=None, description="Figure/image content to embed in the document" 

190 ) 

191 

@field_validator("rtf_column_header", mode="before")
@classmethod
def convert_column_header_to_list(cls, v):
    """Normalize the raw ``rtf_column_header`` input before field validation.

    Declared as an explicit classmethod so it matches the other "before"
    validator in this class and the ``cls`` first parameter.

    Args:
        v: Raw value supplied for ``rtf_column_header``.

    Returns:
        ``[v]`` when ``v`` is a single ``RTFColumnHeader``; otherwise ``v``
        unchanged (flat lists, nested lists and ``None`` pass through).
    """
    # isinstance() is already False for None, so no separate None check.
    if isinstance(v, RTFColumnHeader):
        return [v]
    return v

198 

@model_validator(mode="before")
@classmethod
def validate_dataframe(cls, values):
    """Convert the raw ``df`` input to polars before field validation.

    Args:
        values: Raw input handed to the model. Only dict input carrying a
            non-``None`` ``"df"`` entry is processed; anything else is
            returned untouched.

    Returns:
        ``values`` itself when there is nothing to convert, otherwise a
        shallow copy whose ``"df"`` entry is a polars DataFrame (single
        input) or a list of polars DataFrames (list input). The caller's
        mapping is never modified.

    Raises:
        ValueError: If a frame (or a frame inside the list) cannot be
            converted to a polars DataFrame.
    """
    # A "before" validator sees the raw input, which need not be a dict.
    if not isinstance(values, dict) or values.get("df") is None:
        return values

    import narwhals as nw
    import polars as pl

    def _as_polars(frame, label):
        """Return ``frame`` as a polars DataFrame or raise ``ValueError``."""
        if isinstance(frame, pl.DataFrame):
            return frame  # Already polars
        try:
            # to_native() would hand back the original backend object
            # (e.g. pandas), so convert explicitly with to_polars().
            return nw.from_native(frame).to_polars()
        except Exception as e:
            raise ValueError(f"{label} must be a valid DataFrame: {e}") from e

    df = values["df"]
    if isinstance(df, list):
        converted = [
            _as_polars(single_df, f"DataFrame at index {i}")
            for i, single_df in enumerate(df)
        ]
    else:
        converted = _as_polars(df, "DataFrame")

    # Build a new mapping instead of writing into the caller's dict.
    return {**values, "df": converted}

239 

@model_validator(mode="after")
def validate_column_names(self):
    """Check df/figure exclusivity, section counts and column references.

    Returns:
        The validated model instance.

    Raises:
        ValueError: If neither or both of ``df`` and ``rtf_figure`` are set,
            if a figure is combined with a footnote/source whose
            ``as_table`` is truthy, if the multi-section list lengths
            disagree, or if a referenced column is missing.
    """
    has_df = self.df is not None
    has_figure = self.rtf_figure is not None

    if not (has_df or has_figure):
        raise ValueError("Either 'df' or 'rtf_figure' must be provided")
    if has_df and has_figure:
        raise ValueError(
            "Cannot use both 'df' and 'rtf_figure' together. Use either "
            "tables or figures in a single document."
        )

    if has_figure:
        # Figures require footnote and source to have as_table=False.
        footnote = self.rtf_footnote
        source = self.rtf_source
        if footnote is not None and getattr(footnote, "as_table", True):
            raise ValueError(
                "When using RTFFigure, RTFFootnote must have as_table=False"
            )
        if source is not None and getattr(source, "as_table", False):
            raise ValueError(
                "When using RTFFigure, RTFSource must have as_table=False"
            )

    # Figure-only documents have no columns to validate.
    if not has_df:
        return self

    # Single-section document: one frame, one body.
    if not isinstance(self.df, list):
        self._validate_section_columns(self.df, self.rtf_body, 0)
        return self

    # Multi-section document: bodies must line up with the frames.
    if not isinstance(self.rtf_body, list):
        raise ValueError("When df is a list, rtf_body must also be a list")

    n_frames = len(self.df)
    n_bodies = len(self.rtf_body)
    if n_frames != n_bodies:
        raise ValueError(
            f"df list length ({n_frames}) must match rtf_body list length "
            f"({n_bodies})"
        )

    # Only the nested header format is tied to the number of sections.
    headers = self.rtf_column_header
    nested_headers = (
        isinstance(headers, list) and bool(headers) and isinstance(headers[0], list)
    )
    if nested_headers and len(headers) != n_frames:
        raise ValueError(
            f"rtf_column_header nested list length ({len(headers)}) must "
            f"match df list length ({n_frames})"
        )

    for index, (section_df, section_body) in enumerate(
        zip(self.df, self.rtf_body, strict=True)
    ):
        self._validate_section_columns(section_df, section_body, index)

    return self

308 

309 def _validate_section_columns(self, df, body, section_index): 

310 """Validate column references for a single section.""" 

311 columns = df.columns 

312 section_label = f"section {section_index}" if section_index > 0 else "df" 

313 

314 if body.group_by is not None: 

315 for column in body.group_by: 

316 if column not in columns: 

317 raise ValueError( 

318 f"`group_by` column {column} not found in {section_label}" 

319 ) 

320 

321 if body.page_by is not None: 

322 for column in body.page_by: 

323 if column not in columns: 

324 raise ValueError( 

325 f"`page_by` column {column} not found in {section_label}" 

326 ) 

327 

328 if body.subline_by is not None: 

329 for column in body.subline_by: 

330 if column not in columns: 

331 raise ValueError( 

332 f"`subline_by` column {column} not found in {section_label}" 

333 ) 

334 

def __init__(self, **data):
    """Build the document, then fill in default widths and spacing.

    After the model is validated, body sections without ``col_rel_width``
    get one unit per DataFrame column, a single-element width list is
    repeated across all columns, and column headers without their own
    widths receive a copy of the matching body's widths. Finally the table
    offset is computed and applied to table-referenced text components.

    Args:
        **data: Field values forwarded to the model constructor.
    """
    super().__init__(**data)

    def fill_body_widths(body, frame):
        """Default or broadcast ``body.col_rel_width`` to the frame width."""
        n_cols = frame.shape[1]
        widths = body.col_rel_width
        if widths is None:
            body.col_rel_width = [1] * n_cols
        elif len(widths) == 1 and n_cols > 1:
            body.col_rel_width = widths * n_cols

    headers = self.rtf_column_header

    if isinstance(self.df, list):
        # Multi-section document: one body per frame.
        for frame, body in zip(self.df, self.rtf_body, strict=True):
            fill_body_widths(body, frame)

        if headers and isinstance(headers[0], list):
            # Nested format: [[header1], [header2], [None]]
            for header_row, body in zip(headers, self.rtf_body, strict=True):
                for header in header_row or ():
                    if header and header.col_rel_width is None:
                        header.col_rel_width = body.col_rel_width.copy()
        elif headers:
            # Flat format: headers follow the first section's widths.
            for header in headers:
                if header.col_rel_width is None:
                    header.col_rel_width = self.rtf_body[0].col_rel_width.copy()
    elif self.df is not None:
        # Single-section document.
        fill_body_widths(self.rtf_body, self.df)
        for header in headers or ():
            if header.col_rel_width is None:
                header.col_rel_width = self.rtf_body.col_rel_width.copy()

    # Half of the space left beside the table, converted to twips.
    side_space = self.rtf_page.width - self.rtf_page.col_width
    self._table_space = int(Utils._inch_to_twip(side_space) / 2)

    self._apply_table_spacing()

395 

396 def _apply_table_spacing(self): 

397 """Apply table-based spacing to text components that reference the table.""" 

398 for component in [self.rtf_subline, self.rtf_page_header, self.rtf_page_footer]: 

399 if component is not None and component.text_indent_reference == "table": 

400 component.text_space_before = ( 

401 self._table_space + component.text_space_before 

402 ) 

403 component.text_space_after = ( 

404 self._table_space + component.text_space_after 

405 ) 

406 

def rtf_encode(self) -> str:
    """Return the whole document rendered as RTF code.

    The work is delegated to ``RTFEncodingEngine.encode_document``, which
    receives this document instance.

    Returns:
        str: The RTF document, suitable for writing to an ``.rtf`` file.

    Examples:
        ```python
        doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
        rtf_string = doc.rtf_encode()
        with open("output.rtf", "w") as f:
            f.write(rtf_string)
        ```
    """
    # Imported at call time rather than at module import.
    from .encoding import RTFEncodingEngine

    return RTFEncodingEngine().encode_document(self)

430 

431 def write_rtf(self, file_path: str | Path) -> None: 

432 """Write the RTF document to a file. 

433 

434 Generates the complete RTF document and writes it to the specified file path. 

435 The file is written in UTF-8 encoding and will have the `.rtf` extension. 

436 

437 Args: 

438 file_path: Path where the RTF file should be saved. 

439 Accepts string or Path input. Can be absolute or relative. 

440 Directories are created if they do not already exist. 

441 

442 Examples: 

443 ```python 

444 doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report")) 

445 doc.write_rtf("output/report.rtf") 

446 ``` 

447 

448 Note: 

449 The method prints the file path to stdout for confirmation. 

450 """ 

451 target_path = Path(file_path).expanduser() 

452 target_path.parent.mkdir(parents=True, exist_ok=True) 

453 print(target_path) 

454 rtf_code = self.rtf_encode() 

455 target_path.write_text(rtf_code, encoding="utf-8") 

456 

def write_docx(
    self,
    file_path: str | Path,
    *,
    converter: LibreOfficeConverter | None = None,
) -> None:
    """Save the document as DOCX by converting a temporary RTF file.

    The RTF code is written into one temporary directory, converted into
    a second temporary directory, and only the finished DOCX is moved to
    ``file_path``; no intermediate files are placed next to the output.

    Args:
        file_path: Destination path for the DOCX file (string or Path,
            absolute or relative; a leading ``~`` is expanded). Missing
            parent directories are created.
        converter: Converter used for the RTF-to-DOCX step. A default
            ``LibreOfficeConverter()`` is created when omitted.

    Raises:
        TypeError: If the converter does not return a single ``Path``.

    Examples:
        ```python
        doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
        doc.write_docx("output/report.docx")
        ```

        Custom LibreOffice executable:
        ```python
        converter = LibreOfficeConverter(executable_path="/custom/path/to/soffice")
        doc.write_docx("output/report.docx", converter=converter)
        ```

    Note:
        The method prints the file path to stdout for confirmation.
    """
    destination = Path(file_path).expanduser()
    destination.parent.mkdir(parents=True, exist_ok=True)

    active_converter = LibreOfficeConverter() if converter is None else converter

    with tempfile.TemporaryDirectory() as rtf_dir:
        # The temporary RTF shares the output's stem.
        staged_rtf = Path(rtf_dir) / f"{destination.stem}.rtf"
        staged_rtf.write_text(self.rtf_encode(), encoding="utf-8")

        with tempfile.TemporaryDirectory() as out_dir:
            result = active_converter.convert(
                input_files=staged_rtf,
                output_dir=Path(out_dir),
                format="docx",
                overwrite=True,
            )
            if not isinstance(result, Path):
                raise TypeError(
                    "LibreOffice conversion returned an unexpected output for a "
                    "single input file; expected `Path`, got object of type "
                    f"{type(result)!r} with value {result!r}."
                )
            # Move the result out before the temporary directories vanish.
            shutil.move(str(result), destination)

    print(destination)

524 

def write_html(
    self,
    file_path: str | Path,
    *,
    converter: LibreOfficeConverter | None = None,
) -> None:
    """Write the document as an HTML file.

    Writes the document to a temporary RTF file first, and then converts
    it to HTML with the given converter. Temporary directories are used
    for all intermediate files to avoid placing artifacts alongside the
    requested output path.

    Args:
        file_path: Destination path for the HTML file.
            Accepts string or Path input. Can be absolute or relative.
            Directories are created if they do not already exist.
        converter: Optional LibreOffice converter instance. A default
            ``LibreOfficeConverter()`` is created when omitted.

    Raises:
        TypeError: If the converter does not return a single ``Path``.

    Examples:
        ```python
        doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
        doc.write_html("output/report.html")
        ```

    Note:
        If the conversion produces a companion directory named
        ``<converted file name>_files``, it is moved next to the requested
        output path. A directory of the same name left there by an earlier
        run is replaced rather than merged into.

        The method prints the file path to stdout for confirmation.
    """
    target_path = Path(file_path).expanduser()
    target_path.parent.mkdir(parents=True, exist_ok=True)

    if converter is None:
        converter = LibreOfficeConverter()
    with tempfile.TemporaryDirectory() as tmpdir:
        rtf_path = Path(tmpdir) / f"{target_path.stem}.rtf"
        rtf_code = self.rtf_encode()
        rtf_path.write_text(rtf_code, encoding="utf-8")

        with tempfile.TemporaryDirectory() as convert_tmpdir:
            converted = converter.convert(
                input_files=rtf_path,
                output_dir=Path(convert_tmpdir),
                format="html",
                overwrite=True,
            )
            if not isinstance(converted, Path):
                raise TypeError(
                    "LibreOffice conversion returned an unexpected output for a "
                    "single input file; expected `Path`, got object of type "
                    f"{type(converted)!r} with value {converted!r}."
                )
            html_path = converted
            # Work out the companion directory before the HTML file moves.
            resources_dir = html_path.with_name(f"{html_path.name}_files")
            shutil.move(str(html_path), target_path)
            if resources_dir.is_dir():
                resources_target = target_path.parent / resources_dir.name
                # shutil.move() would place the new directory *inside* an
                # existing destination directory (and fail on a later run),
                # so clear out the stale copy first.
                if resources_target.is_dir() and not resources_target.is_symlink():
                    shutil.rmtree(resources_target)
                shutil.move(str(resources_dir), resources_target)

    print(target_path)

593 

def write_pdf(
    self,
    file_path: str | Path,
    *,
    converter: LibreOfficeConverter | None = None,
) -> None:
    """Render the document to PDF via a temporary RTF file.

    The encoded RTF is staged in a temporary directory and converted into
    a second temporary directory; only the resulting PDF is moved to
    ``file_path``, so no intermediate files appear beside the output.

    Args:
        file_path: Destination path for the PDF file (string or Path,
            absolute or relative; a leading ``~`` is expanded). Missing
            parent directories are created.
        converter: Converter used for the RTF-to-PDF step. A default
            ``LibreOfficeConverter()`` is created when omitted.

    Raises:
        TypeError: If the converter does not return a single ``Path``.

    Examples:
        ```python
        doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
        doc.write_pdf("output/report.pdf")
        ```

    Note:
        The method prints the file path to stdout once the PDF is in place.
    """
    pdf_target = Path(file_path).expanduser()
    pdf_target.parent.mkdir(parents=True, exist_ok=True)

    if converter is None:
        converter = LibreOfficeConverter()

    with tempfile.TemporaryDirectory() as source_dir:
        rtf_file = Path(source_dir, f"{pdf_target.stem}.rtf")
        rtf_file.write_text(self.rtf_encode(), encoding="utf-8")

        with tempfile.TemporaryDirectory() as export_dir:
            produced = converter.convert(
                input_files=rtf_file,
                output_dir=Path(export_dir),
                format="pdf",
                overwrite=True,
            )
            if isinstance(produced, Path):
                # Move the PDF out before the temporary directories vanish.
                shutil.move(str(produced), pdf_target)
            else:
                raise TypeError(
                    "LibreOffice conversion returned an unexpected output for a "
                    "single input file; expected `Path`, got object of type "
                    f"{type(produced)!r} with value {produced!r}."
                )

    print(pdf_target)