Coverage for src/rtflite/encode.py: 55%

210 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-07 05:03 +0000

1from collections.abc import MutableSequence 

2 

3import pandas as pd 

4from pydantic import BaseModel, ConfigDict, Field, model_validator 

5 

6from .input import ( 

7 BroadcastValue, 

8 RTFBody, 

9 RTFColumnHeader, 

10 RTFFootnote, 

11 RTFPage, 

12 RTFPageFooter, 

13 RTFPageHeader, 

14 RTFSource, 

15 RTFSubline, 

16 RTFTitle, 

17 TableAttributes, 

18) 

19from .row import Utils 

20 

21 

class RTFDocument(BaseModel):
    # Pydantic model that bundles a DataFrame with the page, title, column
    # header, body, footnote and source settings used to render it as RTF.
    # `arbitrary_types_allowed` is presumably required because of the
    # `pd.DataFrame` field -- confirm against the pydantic configuration docs.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Required: the data to render.
    df: pd.DataFrame = Field(
        ...,
        description="The DataFrame containing the data for the RTF document.",
    )

    # Page-level settings; a default RTFPage is created when omitted.
    rtf_page: RTFPage = Field(
        description="Page settings including size, orientation and margins",
        default_factory=lambda: RTFPage(),
    )
    rtf_page_header: RTFPageHeader | None = Field(
        description="Text to appear in the header of each page",
        default=None,
    )

    # Title and subline; only the title gets a default instance.
    rtf_title: RTFTitle | None = Field(
        description="Title section settings including text and formatting",
        default_factory=lambda: RTFTitle(),
    )
    rtf_subline: RTFSubline | None = Field(
        description="Subject line text to appear below the title",
        default=None,
    )

    # Table sections; both default to a freshly constructed instance.
    rtf_column_header: list[RTFColumnHeader] = Field(
        description="Column header settings",
        default_factory=lambda: [RTFColumnHeader()],
    )
    rtf_body: RTFBody | None = Field(
        description="Table body section settings including column widths and formatting",
        default_factory=lambda: RTFBody(),
    )

    # Optional trailing sections and the page footer.
    rtf_footnote: RTFFootnote | None = Field(
        description="Footnote text to appear at bottom of document",
        default=None,
    )
    rtf_source: RTFSource | None = Field(
        description="Data source citation text",
        default=None,
    )
    rtf_page_footer: RTFPageFooter | None = Field(
        description="Text to appear in the footer of each page",
        default=None,
    )

59 

@model_validator(mode="after")
def validate_column_names(self):
    """Check that every grouping column named on ``rtf_body`` exists in ``df``.

    The ``group_by``, ``page_by`` and ``subline_by`` options are checked in
    that order; options left as ``None`` are skipped.

    Returns:
        The model instance itself.

    Raises:
        ValueError: If a listed column is not one of the columns of ``df``.
    """
    # ``rtf_body`` is declared ``RTFBody | None``; without a body there is
    # nothing to validate, so avoid an AttributeError on ``None``.
    if self.rtf_body is None:
        return self

    columns = self.df.columns.tolist()

    for option in ("group_by", "page_by", "subline_by"):
        requested = getattr(self.rtf_body, option)
        if requested is None:
            continue
        for column in requested:
            if column not in columns:
                raise ValueError(f"`{option}` column {column} not found in `df`")

    return self

80 

def __init__(self, **data):
    """Build the document and derive layout defaults from the inputs.

    After the pydantic constructor has run, this:

    - fills ``rtf_body.col_rel_width`` with one equal weight per ``df``
      column when it is unset or empty;
    - stores ``_table_space``: half of ``rtf_page.width - rtf_page.col_width``
      after ``Utils._inch_to_twip``, truncated to ``int``;
    - for the subline, page header and page footer whose
      ``text_indent_reference`` is ``"table"``, adds ``_table_space`` to
      both ``text_space_before`` and ``text_space_after``.

    Args:
        **data: Field values forwarded to the pydantic constructor.
    """
    super().__init__(**data)
    n_cols = self.df.shape[1]

    # ``rtf_body`` is declared optional, so only default its widths when a
    # body is actually present.
    if self.rtf_body is not None:
        self.rtf_body.col_rel_width = self.rtf_body.col_rel_width or [1] * n_cols

    self._table_space = int(
        Utils._inch_to_twip(self.rtf_page.width - self.rtf_page.col_width) / 2
    )

    # The same offset rule applies to all three text components.
    for component in (self.rtf_subline, self.rtf_page_header, self.rtf_page_footer):
        if component is None or component.text_indent_reference != "table":
            continue
        component.text_space_before = (
            self._table_space + component.text_space_before
        )
        component.text_space_after = (
            self._table_space + component.text_space_after
        )

116 

def _rtf_page_encode(self) -> str:
    """Return the RTF paper-size line for this document.

    The line holds ``\\paperw`` and ``\\paperh`` with the page width and
    height passed through ``Utils._inch_to_twip``, followed by
    ``\\landscape`` when the page orientation is ``"landscape"``, and ends
    with a newline.
    """
    width = Utils._inch_to_twip(self.rtf_page.width)
    height = Utils._inch_to_twip(self.rtf_page.height)

    if self.rtf_page.orientation == "landscape":
        ending = "\\landscape\n"
    else:
        ending = "\n"

    # The page header and footer are not written here; the class has
    # dedicated `_rtf_page_header_encode` / `_rtf_page_footer_encode` methods.
    return f"\\paperw{width}\\paperh{height}{ending}"

141 

def _rtf_page_margin_encode(self) -> str:
    """Return the RTF margin line for this document.

    The values of ``rtf_page.margin`` are converted with
    ``Utils._inch_to_twip`` and paired, in order, with the control words
    ``\\margl``, ``\\margr``, ``\\margt``, ``\\margb``, ``\\headery`` and
    ``\\footery``. Pairing stops at the shorter of the two sequences. The
    result ends with a newline.
    """
    control_words = (
        "\\margl",
        "\\margr",
        "\\margt",
        "\\margb",
        "\\headery",
        "\\footery",
    )
    twips = list(map(Utils._inch_to_twip, self.rtf_page.margin))

    pieces = []
    for word, value in zip(control_words, twips):
        pieces.append(f"{word}{value}")

    return "".join(pieces) + "\n"

157 

158 def _rtf_page_header_encode(self, method: str) -> str: 

159 """Convert the RTF page header into RTF syntax using the Text class.""" 

160 if not self.rtf_page_header: 

161 return None 

162 

163 return self.rtf_page_header._encode( 

164 text=self.rtf_page_header.text, method=method 

165 ) 

166 

167 def _rtf_page_header_encode(self, method: str) -> str: 

168 """Convert the RTF page header into RTF syntax using the Text class.""" 

169 if self.rtf_page_header is None: 

170 return None 

171 

172 encode = self.rtf_page_header._encode( 

173 text=self.rtf_page_header.text, method=method 

174 ) 

175 return f"{ \\header{encode}} " 

176 

177 def _rtf_page_footer_encode(self, method: str) -> str: 

178 """Convert the RTF page footer into RTF syntax using the Text class.""" 

179 if self.rtf_page_footer is None: 

180 return None 

181 

182 encode = self.rtf_page_footer._encode( 

183 text=self.rtf_page_footer.text, method=method 

184 ) 

185 return f"{ \\footer{encode}} " 

186 

187 def _rtf_title_encode(self, method: str) -> str: 

188 """Convert the RTF title into RTF syntax using the Text class.""" 

189 if not self.rtf_title: 

190 return None 

191 

192 return self.rtf_title._encode(text=self.rtf_title.text, method=method) 

193 

194 def _rtf_subline_encode(self, method: str) -> str: 

195 """Convert the RTF subline into RTF syntax using the Text class.""" 

196 if self.rtf_subline is None: 

197 return None 

198 

199 encode = self.rtf_subline._encode(text=self.rtf_subline.text, method=method) 

200 return encode 

201 

202 def _page_by(self) -> list[list[tuple[int, int, int]]]: 

203 """Create components for page_by format. 

204 

205 This method organizes data into sections based on the page_by grouping variables. 

206 

207 Returns: 

208 A list of sections, where each section is a list of tuples (row_idx, col_idx, level). 

209 Each tuple represents: 

210 - row_idx: The row index in the dataframe 

211 - col_idx: The column index in the dataframe 

212 - level: The nesting level of the section header. 

213 

214 """ 

215 # obtain input data 

216 data = self.df.to_dict("records") 

217 var = self.rtf_body.page_by 

218 

219 # obtain column names and dimensions 

220 columns = list(data[0].keys()) 

221 dim = (len(data), len(columns)) 

222 

223 if var is None: 

224 return None 

225 

226 def get_column_index(column_name: str) -> int: 

227 """Get the index of a column in the column list.""" 

228 return columns.index(column_name) 

229 

230 def get_matching_rows(group_values: dict) -> list[int]: 

231 """Get row indices that match the group values.""" 

232 return [ 

233 i 

234 for i, row in enumerate(data) 

235 if all(row[k] == v for k, v in group_values.items()) 

236 ] 

237 

238 def get_unique_combinations(variables: list[str]) -> list[dict]: 

239 """Get unique combinations of values for the specified variables.""" 

240 seen = set() 

241 unique = [] 

242 for row in data: 

243 key = tuple(row[v] for v in variables) 

244 if key not in seen: 

245 seen.add(key) 

246 unique.append({v: row[v] for v in variables}) 

247 return unique 

248 

249 output = [] 

250 prev_values = {v: None for v in var} 

251 

252 # Process each unique combination of grouping variables 

253 for group in get_unique_combinations(var): 

254 indices = get_matching_rows(group) 

255 

256 # Handle headers for each level 

257 for level, var_name in enumerate(var): 

258 current_val = group[var_name] 

259 

260 need_header = False 

261 if level == len(var) - 1: 

262 need_header = True 

263 else: 

264 for l in range(level + 1): 

265 if group[var[l]] != prev_values[var[l]]: 

266 need_header = True 

267 break 

268 

269 if need_header: 

270 col_idx = get_column_index(var_name) 

271 # Add level information as third element in tuple 

272 output.append([(indices[0], col_idx, level)]) 

273 

274 prev_values[var_name] = current_val 

275 

276 # Handle data rows 

277 for index in indices: 

278 output.append( 

279 [ 

280 (index, j, len(var)) 

281 for j in range(len(columns)) 

282 if columns[j] not in var 

283 ] 

284 ) 

285 

286 return output 

287 

288 def _rtf_footnote_encode(self) -> str: 

289 """Convert the RTF footnote into RTF syntax using the Text class.""" 

290 rtf_attrs = self.rtf_footnote 

291 

292 if rtf_attrs is None: 

293 return None 

294 

295 col_total_width = self.rtf_page.col_width 

296 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

297 return rtf_attrs._encode(rtf_attrs.text, col_widths) 

298 

299 def _rtf_source_encode(self) -> str: 

300 """Convert the RTF source into RTF syntax using the Text class.""" 

301 rtf_attrs = self.rtf_source 

302 

303 if rtf_attrs is None: 

304 return None 

305 

306 col_total_width = self.rtf_page.col_width 

307 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width) 

308 return rtf_attrs._encode(rtf_attrs.text, col_widths) 

309 

def _rtf_body_encode(
    self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
) -> MutableSequence[str] | None:
    """Convert the table body into RTF rows.

    Without ``page_by`` grouping, the whole ``df`` is encoded in one call.
    With grouping, each section returned by ``_page_by`` (a header cell or
    one data row) becomes a one-row DataFrame that is encoded with the
    attributes ``rtf_attrs._get_section_attributes`` returns for its cells.

    Args:
        df: Input DataFrame to encode.
        rtf_attrs: Table attributes for styling.

    Returns:
        The RTF-encoded rows, or ``None`` when ``rtf_attrs`` is ``None``.
    """
    if rtf_attrs is None:
        return None

    col_total_width = self.rtf_page.col_width
    # NOTE(review): sections are derived from `self.df` / `self.rtf_body`,
    # not from the `df` / `rtf_attrs` arguments -- confirm callers always
    # pass the same objects.
    sections = self._page_by()

    if sections is None:
        col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
        return rtf_attrs._encode(df, col_widths)

    rows = []
    for section in sections:
        # The nesting level is not needed here, only the cell positions.
        cells = [(row, col) for row, col, _level in section]
        if not cells:
            continue

        # One-row frame holding the values of this section's cells.
        section_df = pd.DataFrame(
            {
                position: [BroadcastValue(value=df).iloc(row, col)]
                for position, (row, col) in enumerate(cells)
            }
        )

        # Attributes restricted to the cells of this section.
        section_attrs = TableAttributes(**rtf_attrs._get_section_attributes(cells))

        col_widths = Utils._col_widths(section_attrs.col_rel_width, col_total_width)
        rows.extend(section_attrs._encode(section_df, col_widths))

    return rows

358 

def _rtf_column_header_encode(
    self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
) -> MutableSequence[str] | None:
    """Convert one column-header definition into RTF rows.

    Args:
        df: Header text laid out as a DataFrame.
        rtf_attrs: Attributes of this header row. Its ``col_rel_width`` is
            filled in place with equal weights (one per ``df`` column) when
            unset or empty.

    Returns:
        The RTF-encoded header rows, or ``None`` when ``rtf_attrs`` is
        ``None``.
    """
    # Guard first: the original read `df.shape` before this check and so
    # failed on a missing `df` even when there was nothing to encode.
    if rtf_attrs is None:
        return None

    col_total_width = self.rtf_page.col_width

    # `df.shape` is only consulted when the widths actually need a default.
    rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * df.shape[1]
    rtf_attrs = rtf_attrs._set_default()

    col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)

    return rtf_attrs._encode(df, col_widths)

374 

375 def _rtf_start_encode(self) -> str: 

376 return "{\\rtf1\\ansi\n\\deff0\\deflang1033" 

377 

def _rtf_font_table_encode(self) -> str:
    """Define RTF fonts.

    Builds a ``{\\fonttbl ...}`` group with one entry per font described by
    ``Utils._font_type()``. Entries are numbered ``\\f0`` to ``\\f9`` at
    most; iteration stops at the shortest of the style, name and charset
    lists.

    Returns:
        The font table group; every entry is followed by a newline.
    """
    font_types = Utils._font_type()
    font_style = font_types["style"]
    font_name = font_types["name"]
    font_charset = font_types["charset"]

    # Doubled braces produce the literal "{" / "}" around each font entry.
    entries = [
        f"{{\\f{index}{style}{charset}\\fprq2 {name};}}\n"
        for index, style, name, charset in zip(
            range(10), font_style, font_name, font_charset
        )
    ]

    return "{\\fonttbl" + "".join(entries) + "}"

394 

def rtf_encode(self) -> str:
    """Generate RTF code.

    Updates the border attributes of the column header, body and footnote
    in place, encodes every section, and joins the non-``None`` pieces with
    newlines.

    Returns:
        The complete RTF document as a single string.
    """
    shape = self.df.shape
    n_cols = shape[1]
    last_row = shape[0] - 1

    def first_row(value):
        # Broadcast `value` across one row of table cells and return that row.
        return BroadcastValue(value=value, dimension=(1, n_cols)).to_list()[0]

    # Title
    title_rtf = self._rtf_title_encode(method="line")

    # Page Border
    doc_border_top = first_row(self.rtf_page.border_first)
    doc_border_bottom = first_row(self.rtf_page.border_last)
    page_border_top = first_row(self.rtf_body.border_first)
    page_border_bottom = first_row(self.rtf_body.border_last)

    # Column header
    if self.rtf_column_header is None:
        header_rows = ""
        self.rtf_body.border_top = BroadcastValue(
            value=self.rtf_body.border_top, dimension=shape
        ).update_row(0, doc_border_top)
    else:
        first_header = self.rtf_column_header[0]
        if first_header.text is None and self.rtf_body.as_colheader:
            # Fall back to the DataFrame column names, minus page_by columns.
            hidden = self.rtf_body.page_by or []
            labels = [col for col in self.df.columns if col not in hidden]
            first_header.text = pd.DataFrame([labels])
            self.rtf_column_header = self.rtf_column_header[:1]

        self.rtf_column_header[0].border_top = BroadcastValue(
            value=self.rtf_column_header[0].border_top, dimension=shape
        ).update_row(0, doc_border_top)

        header_rows = [
            self._rtf_column_header_encode(df=header.text, rtf_attrs=header)
            for header in self.rtf_column_header
        ]

    # NOTE(review): the indentation of the original listing was lost; this
    # statement is assumed to run after the if/else above, in both cases.
    self.rtf_body.border_top = BroadcastValue(
        value=self.rtf_body.border_top, dimension=shape
    ).update_row(0, page_border_top)

    # Bottom border last line update
    # NOTE(review): both assignments of each branch are assumed to belong to
    # that branch; the second one is applied on top of the first.
    if self.rtf_footnote is not None:
        self.rtf_footnote.border_bottom = BroadcastValue(
            value=self.rtf_footnote.border_bottom, dimension=(1, 1)
        ).update_row(0, page_border_bottom[0])

        self.rtf_footnote.border_bottom = BroadcastValue(
            value=self.rtf_footnote.border_bottom, dimension=(1, 1)
        ).update_row(0, doc_border_bottom[0])
    else:
        self.rtf_body.border_bottom = BroadcastValue(
            value=self.rtf_body.border_bottom, dimension=shape
        ).update_row(last_row, page_border_bottom)

        self.rtf_body.border_bottom = BroadcastValue(
            value=self.rtf_body.border_bottom, dimension=shape
        ).update_row(last_row, doc_border_bottom)

    # Body
    body_rows = self._rtf_body_encode(df=self.df, rtf_attrs=self.rtf_body)

    # Pieces in document order; `None` marks a section that is absent.
    pieces = [
        self._rtf_start_encode(),
        self._rtf_font_table_encode(),
        "\n",
        self._rtf_page_encode(),
        self._rtf_page_margin_encode(),
        self._rtf_page_header_encode(method="line"),
        self._rtf_page_footer_encode(method="line"),
        title_rtf,
        "\n",
        self._rtf_subline_encode(method="line"),
        (
            "\n".join(line for rows in header_rows for line in rows)
            if header_rows
            else None
        ),
        "\n".join(body_rows),
        (
            "\n".join(self._rtf_footnote_encode())
            if self.rtf_footnote is not None
            else None
        ),
        (
            "\n".join(self._rtf_source_encode())
            if self.rtf_source is not None
            else None
        ),
        "\n\n",
        "}",
    ]

    return "\n".join(piece for piece in pieces if piece is not None)

498 

def write_rtf(self, file_path: str) -> None:
    """Write the RTF code into a `.rtf` file.

    The target path is printed to standard output first. The document is
    encoded before the file is opened, so a failure in ``rtf_encode`` leaves
    an existing file untouched.

    Args:
        file_path: Destination path; the file is written as UTF-8 text.
    """
    print(file_path)
    document = self.rtf_encode()
    with open(file_path, mode="w", encoding="utf-8") as handle:
        handle.write(document)