Coverage for src / rtflite / encode.py: 82%

152 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-08 04:50 +0000

1"""RTF Document class - main entry point for RTF generation. 

2 

3This module provides the RTFDocument class with a clean, service-oriented architecture. 

4All complex logic has been delegated to specialized services and strategies. 

5""" 

6 

7import shutil 

8import tempfile 

9from collections.abc import Sequence 

10from pathlib import Path 

11 

12import polars as pl 

13from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator 

14 

15from .convert import LibreOfficeConverter 

16from .input import ( 

17 RTFBody, 

18 RTFColumnHeader, 

19 RTFFigure, 

20 RTFFootnote, 

21 RTFPage, 

22 RTFPageFooter, 

23 RTFPageHeader, 

24 RTFSource, 

25 RTFSubline, 

26 RTFTitle, 

27) 

28from .row import Utils 

29 

30 

class RTFDocument(BaseModel):
    """Main class for creating RTF documents with tables, text, and figures.

    RTFDocument is the central class for generating Rich Text Format (RTF) files
    containing formatted tables, titles, footnotes, and other document elements.
    It provides a comprehensive API for creating professional documents commonly
    used in clinical trials, scientific research, and data reporting.

    Examples:
        Simple table with title:
        ```python
        import rtflite as rtf
        import polars as pl

        df = pl.DataFrame({
            "Subject": ["001", "002", "003"],
            "Age": [45, 52, 38],
            "Treatment": ["Drug A", "Drug B", "Placebo"]
        })

        doc = rtf.RTFDocument(
            df=df,
            rtf_title=rtf.RTFTitle(text="Patient Demographics"),
            rtf_body=rtf.RTFBody(col_rel_width=[2, 1, 2])
        )
        doc.write_rtf("demographics.rtf")
        ```

        Multi-page document with headers and footers:
        ```python
        doc = rtf.RTFDocument(
            df=large_df,
            rtf_page=rtf.RTFPage(nrow=40, orientation="landscape"),
            rtf_page_header=rtf.RTFPageHeader(),  # Default page numbering
            rtf_page_footer=rtf.RTFPageFooter(text="Confidential"),
            rtf_title=rtf.RTFTitle(text="Clinical Study Results"),
            rtf_column_header=rtf.RTFColumnHeader(
                text=["Subject ID", "Visit", "Result", "Units"]
            ),
            rtf_body=rtf.RTFBody(
                col_rel_width=[2, 1, 1, 1],
                text_justification=[["l", "c", "r", "c"]]
            ),
            rtf_footnote=rtf.RTFFootnote(
                text="Results are mean +/- SD"
            )
        )
        doc.write_rtf("results.rtf")
        ```

        Document with grouped data and sublines:
        ```python
        doc = rtf.RTFDocument(
            df=grouped_df,
            rtf_body=rtf.RTFBody(
                group_by=["SITE", "TREATMENT"],  # Suppress duplicate values
                subline_by=["STUDY_PHASE"],  # Create section headers
                col_rel_width=[2, 2, 1, 1]
            )
        )
        ```

    Attributes:
        df: Data to display in the table. Can be a single DataFrame or list of
            DataFrames for multi-section documents. Accepts pandas or polars
            DataFrames (automatically converted to polars internally).

        rtf_page: Page configuration including size, orientation, margins, and
            pagination settings.

        rtf_page_header: Optional header appearing at the top of every page.

        rtf_page_footer: Optional footer appearing at the bottom of every page.

        rtf_title: Document title(s) displayed at the top.

        rtf_subline: Optional subject line appearing below the title.

        rtf_column_header: Column headers for the table. Can be a single header
            or list of headers for multi-row headers.

        rtf_body: Table body configuration including column widths, formatting,
            borders, and special features like group_by and subline_by.

        rtf_footnote: Optional footnote text displayed after the table.

        rtf_source: Optional source citation displayed at the very bottom.

        rtf_figure: Optional figure/image to embed in the document.

    Methods:
        rtf_encode(): Generate the complete RTF document as a string.
        write_rtf(file_path): Write the RTF document to a file.
        write_docx(file_path): Convert the document to DOCX via LibreOffice.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Core data
    df: pl.DataFrame | list[pl.DataFrame] | None = Field(
        default=None,
        description=(
            "The DataFrame(s) containing the data for the RTF document. "
            "Accepts single DataFrame or list of DataFrames for "
            "multi-section documents. Accepts pandas or polars DataFrame, "
            "internally converted to polars. Optional when using figure-only "
            "documents."
        ),
    )

    # Document structure
    rtf_page: RTFPage = Field(
        default_factory=lambda: RTFPage(),
        description="Page settings including size, orientation and margins",
    )
    rtf_page_header: RTFPageHeader | None = Field(
        default=None, description="Text to appear in the header of each page"
    )
    rtf_title: RTFTitle | None = Field(
        default_factory=lambda: RTFTitle(),
        description="Title section settings including text and formatting",
    )
    rtf_subline: RTFSubline | None = Field(
        default=None, description="Subject line text to appear below the title"
    )
    rtf_column_header: (
        Sequence[RTFColumnHeader] | Sequence[Sequence[RTFColumnHeader | None]]
    ) = Field(
        default_factory=lambda: [RTFColumnHeader()],
        description=(
            "Column header settings. For multi-section documents, use nested "
            "list format: [[header1], [header2], [None]] where None means no "
            "header for that section."
        ),
    )
    rtf_body: RTFBody | Sequence[RTFBody] | None = Field(
        default_factory=lambda: RTFBody(),
        description=(
            "Table body section settings including column widths and "
            "formatting. For multi-section documents, provide a list of "
            "RTFBody objects."
        ),
    )
    rtf_footnote: RTFFootnote | None = Field(
        default=None, description="Footnote text to appear at bottom of document"
    )
    rtf_source: RTFSource | None = Field(
        default=None, description="Data source citation text"
    )
    rtf_page_footer: RTFPageFooter | None = Field(
        default=None, description="Text to appear in the footer of each page"
    )
    rtf_figure: RTFFigure | None = Field(
        default=None, description="Figure/image content to embed in the document"
    )

    @field_validator("rtf_column_header", mode="before")
    @classmethod
    def convert_column_header_to_list(cls, v):
        """Wrap a single RTFColumnHeader in a list; pass anything else through."""
        if isinstance(v, RTFColumnHeader):
            return [v]
        return v

    @staticmethod
    def _to_polars(frame, label):
        """Return ``frame`` as a polars DataFrame.

        Args:
            frame: A polars DataFrame, or any eager DataFrame narwhals supports.
            label: Prefix used in the error message to identify the input.

        Raises:
            ValueError: If ``frame`` cannot be converted to a polars DataFrame.
        """
        if isinstance(frame, pl.DataFrame):
            return frame

        # Imported lazily: only needed for non-polars input.
        import narwhals as nw

        try:
            # `to_native()` takes no arguments and would hand back the original
            # (e.g. pandas) object; `to_polars()` performs the conversion.
            return nw.from_native(frame).to_polars()
        except Exception as e:
            raise ValueError(f"{label} must be a valid DataFrame: {e}") from e

    @model_validator(mode="before")
    @classmethod
    def validate_dataframe(cls, values):
        """Convert DataFrame(s) to polars for internal processing.

        Raises:
            ValueError: If ``df`` (or an element of a ``df`` list) is not a
                DataFrame that can be converted to polars.
        """
        # Non-dict input (e.g. an existing model instance) is left to pydantic.
        if not isinstance(values, dict):
            return values

        df = values.get("df")
        if df is None:
            return values

        if isinstance(df, list):
            values["df"] = [
                cls._to_polars(single_df, f"DataFrame at index {i}")
                for i, single_df in enumerate(df)
            ]
        else:
            values["df"] = cls._to_polars(df, "DataFrame")
        return values

    @model_validator(mode="after")
    def validate_column_names(self):
        """Validate column references and multi-section consistency.

        Raises:
            ValueError: If neither or both of ``df`` and ``rtf_figure`` are
                given, if footnote/source use ``as_table`` with a figure, if
                ``df`` and ``rtf_body`` disagree in shape (single vs. list,
                or list lengths), or if a ``group_by``/``page_by``/
                ``subline_by`` column is missing from its DataFrame.
        """
        # Validate df and rtf_figure usage
        if self.df is None and self.rtf_figure is None:
            raise ValueError("Either 'df' or 'rtf_figure' must be provided")

        if self.df is not None and self.rtf_figure is not None:
            raise ValueError(
                "Cannot use both 'df' and 'rtf_figure' together. Use either "
                "tables or figures in a single document."
            )

        # When RTFFigure is used, enforce as_table=False for footnotes and sources
        if self.rtf_figure is not None:
            if self.rtf_footnote is not None and getattr(
                self.rtf_footnote, "as_table", True
            ):
                raise ValueError(
                    "When using RTFFigure, RTFFootnote must have as_table=False"
                )
            if self.rtf_source is not None and getattr(
                self.rtf_source, "as_table", False
            ):
                raise ValueError(
                    "When using RTFFigure, RTFSource must have as_table=False"
                )

        # Skip column validation if no DataFrame provided (figure-only documents)
        if self.df is None:
            return self

        if isinstance(self.df, list):
            # Multi-section: rtf_body must be a list of matching length
            if not isinstance(self.rtf_body, list):
                raise ValueError("When df is a list, rtf_body must also be a list")
            if len(self.df) != len(self.rtf_body):
                raise ValueError(
                    "df list length "
                    f"({len(self.df)}) must match rtf_body list length "
                    f"({len(self.rtf_body)})"
                )

            # Validate rtf_column_header if it's nested list format
            if (
                isinstance(self.rtf_column_header, list)
                and self.rtf_column_header
                and isinstance(self.rtf_column_header[0], list)
                and len(self.rtf_column_header) != len(self.df)
            ):
                raise ValueError(
                    "rtf_column_header nested list length "
                    f"({len(self.rtf_column_header)}) must match df list "
                    f"length ({len(self.df)})"
                )

            # Per-section column validation
            for i, (section_df, section_body) in enumerate(
                zip(self.df, self.rtf_body, strict=True)
            ):
                self._validate_section_columns(section_df, section_body, i)
        else:
            # Single section: a None or list rtf_body would otherwise surface
            # as a raw AttributeError further down.
            if not isinstance(self.rtf_body, RTFBody):
                raise ValueError(
                    "When df is a single DataFrame, rtf_body must be a single "
                    "RTFBody"
                )
            self._validate_section_columns(self.df, self.rtf_body, 0)

        return self

    def _validate_section_columns(self, df, body, section_index):
        """Validate column references for a single section.

        Args:
            df: DataFrame for the section.
            body: Body settings whose ``group_by``, ``page_by`` and
                ``subline_by`` entries are checked against ``df.columns``.
            section_index: Position of the section, used in error messages
                (index 0 is reported as "df").

        Raises:
            ValueError: On the first referenced column absent from ``df``.
        """
        columns = df.columns
        section_label = f"section {section_index}" if section_index > 0 else "df"

        for option in ("group_by", "page_by", "subline_by"):
            referenced = getattr(body, option)
            if referenced is None:
                continue
            for column in referenced:
                if column not in columns:
                    raise ValueError(
                        f"`{option}` column {column} not found in {section_label}"
                    )

    @staticmethod
    def _fill_col_rel_width(body, n_cols):
        """Default or broadcast ``body.col_rel_width`` to ``n_cols`` entries."""
        if body.col_rel_width is None:
            body.col_rel_width = [1] * n_cols
        elif len(body.col_rel_width) == 1 and n_cols > 1:
            body.col_rel_width = body.col_rel_width * n_cols

    def __init__(self, **data):
        """Validate the inputs, then derive default widths and table spacing.

        After pydantic validation, missing ``col_rel_width`` values on the
        body (and on column headers, which inherit from the body) are filled
        in from the DataFrame's column count, and table-relative spacing is
        applied to text components.
        """
        super().__init__(**data)

        # Set default column widths based on DataFrame dimensions when a
        # DataFrame is provided.
        if self.df is not None:
            if isinstance(self.df, list):
                # Handle multi-section documents
                for section_df, section_body in zip(
                    self.df, self.rtf_body, strict=True
                ):
                    self._fill_col_rel_width(section_body, section_df.shape[1])

                # Handle column headers for multi-section
                if self.rtf_column_header and isinstance(
                    self.rtf_column_header[0], list
                ):
                    # Nested list format: [[header1], [header2], [None]]
                    for section_headers, section_body in zip(
                        self.rtf_column_header, self.rtf_body, strict=True
                    ):
                        if section_headers:  # Skip empty sections
                            for header in section_headers:
                                # `header` may be None ("no header")
                                if header and header.col_rel_width is None:
                                    header.col_rel_width = (
                                        section_body.col_rel_width.copy()
                                    )
                elif self.rtf_column_header:
                    # Flat list format - apply to first section only
                    for header in self.rtf_column_header:
                        if header.col_rel_width is None:
                            header.col_rel_width = self.rtf_body[0].col_rel_width.copy()
            else:
                # Handle single section documents
                self._fill_col_rel_width(self.rtf_body, self.df.shape[1])

                # Inherit col_rel_width from rtf_body to rtf_column_header if
                # not specified
                if self.rtf_column_header:
                    for header in self.rtf_column_header:
                        if header.col_rel_width is None:
                            header.col_rel_width = self.rtf_body.col_rel_width.copy()

        # Calculate table spacing for text components: half of the page width
        # not occupied by the table, in twips.
        self._table_space = int(
            Utils._inch_to_twip(self.rtf_page.width - self.rtf_page.col_width) / 2
        )

        # Apply table spacing to text components if needed
        self._apply_table_spacing()

    def _apply_table_spacing(self) -> None:
        """Apply table-based spacing to text components that reference the table.

        For the subline, page header and page footer, when
        ``text_indent_reference == "table"``, ``_table_space`` is added to
        both ``text_space_before`` and ``text_space_after``.
        """
        for component in [self.rtf_subline, self.rtf_page_header, self.rtf_page_footer]:
            if component is not None and component.text_indent_reference == "table":
                component.text_space_before = (
                    self._table_space + component.text_space_before
                )
                component.text_space_after = (
                    self._table_space + component.text_space_after
                )

    def rtf_encode(self) -> str:
        """Generate the complete RTF document as a string.

        Encoding is delegated to ``RTFEncodingEngine.encode_document``. The
        resulting string can be written to a file or processed further.

        Returns:
            str: Complete RTF document string ready to be saved as an .rtf file.

        Examples:
            ```python
            doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
            rtf_string = doc.rtf_encode()
            # Can write manually or process further
            with open("output.rtf", "w") as f:
                f.write(rtf_string)
            ```
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from .encoding import RTFEncodingEngine

        engine = RTFEncodingEngine()
        return engine.encode_document(self)

    def write_rtf(self, file_path: str | Path) -> None:
        """Write the RTF document to a file.

        Generates the complete RTF document and writes it to the specified
        file path using UTF-8 encoding. The path is used as given; no file
        extension is added or enforced.

        Args:
            file_path: Path where the RTF file should be saved.
                Accepts string or Path input. Can be absolute or relative.
                Directories are created if they do not already exist.

        Examples:
            ```python
            doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
            doc.write_rtf("output/report.rtf")
            ```

        Note:
            The method prints the file path to stdout for confirmation once
            the file has been written.
        """
        target_path = Path(file_path).expanduser()
        target_path.parent.mkdir(parents=True, exist_ok=True)
        rtf_code = self.rtf_encode()
        target_path.write_text(rtf_code, encoding="utf-8")

        # Confirm only after a successful write (matches write_docx).
        print(target_path)

    def write_docx(self, file_path: str | Path) -> None:
        """Write the document as a DOCX file.

        Writes the document to a temporary RTF file first, and then converts
        it to DOCX with LibreOffice. Temporary directories are used for
        all intermediate files to avoid placing artifacts alongside the
        requested output path.

        Args:
            file_path: Destination path for the DOCX file.
                Accepts string or Path input. Can be absolute or relative.
                Directories are created if they do not already exist.

        Examples:
            ```python
            doc = RTFDocument(df=data, rtf_title=RTFTitle(text="Report"))
            doc.write_docx("output/report.docx")
            ```

        Note:
            The method prints the file path to stdout for confirmation.
        """
        target_path = Path(file_path).expanduser()
        target_path.parent.mkdir(parents=True, exist_ok=True)

        with tempfile.TemporaryDirectory() as tmpdir:
            rtf_path = Path(tmpdir) / f"{target_path.stem}.rtf"
            rtf_code = self.rtf_encode()
            rtf_path.write_text(rtf_code, encoding="utf-8")

            converter = LibreOfficeConverter()
            with tempfile.TemporaryDirectory() as convert_tmpdir:
                docx_path = converter.convert(
                    input_files=rtf_path,
                    output_dir=Path(convert_tmpdir),
                    format="docx",
                    overwrite=True,
                )
                # Move before the temporary directory is cleaned up.
                shutil.move(str(docx_path), target_path)

        print(target_path)