Coverage for src/rtflite/encode.py: 63%

146 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-03 15:40 +0000

1from collections.abc import MutableSequence 

2 

3import pandas as pd 

4from pydantic import BaseModel, ConfigDict, Field 

5 

6from .input import ( 

7 BroadcastValue, 

8 RTFBody, 

9 RTFColumnHeader, 

10 RTFPage, 

11 RTFTitle, 

12 TableAttributes, 

13) 

14from .row import Border, Cell, Row, TextContent, Utils 

15from .strwidth import get_string_width 

16 

17 

class RTFDocument(BaseModel):
    """Pydantic model pairing a DataFrame with the settings used to render it as RTF.

    Every ``rtf_*`` field configures one part of the document (page, page
    header/footer, title, subline, column header, body, footnote, source).
    """

    # The model holds a pandas DataFrame, so arbitrary (non-pydantic) types
    # are enabled in the model configuration.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # --- required input -------------------------------------------------
    df: pd.DataFrame = Field(
        ...,
        description="The DataFrame containing the data for the RTF document.",
    )

    # --- page-level settings ---------------------------------------------
    rtf_page: RTFPage = Field(
        description="Page settings including size, orientation and margins",
        default_factory=lambda: RTFPage(),
    )
    rtf_page_header: str | None = Field(
        description="Text to appear in the header of each page",
        default=None,
    )

    # --- title block -------------------------------------------------------
    rtf_title: RTFTitle | None = Field(
        description="Title section settings including text and formatting",
        default_factory=lambda: RTFTitle(),
    )
    rtf_subline: str | None = Field(
        description="Subject line text to appear below the title",
        default=None,
    )

    # --- table -------------------------------------------------------------
    rtf_column_header: list[RTFColumnHeader] = Field(
        description="Column header settings",
        default_factory=lambda: [RTFColumnHeader()],
    )
    rtf_body: RTFBody | None = Field(
        description="Table body section settings including column widths and formatting",
        default_factory=lambda: RTFBody(),
    )

    # --- trailing text -------------------------------------------------------
    rtf_footnote: str | None = Field(
        description="Footnote text to appear at bottom of document",
        default=None,
    )
    rtf_source: str | None = Field(
        description="Data source citation text",
        default=None,
    )
    rtf_page_footer: str | None = Field(
        description="Text to appear in the footer of each page",
        default=None,
    )

55 

def _rtf_page_encode(self) -> str:
    """Build the RTF paper-size (and orientation) control words.

    Side effect: ``self.rtf_page`` is replaced by the result of its own
    ``_set_default()`` call before it is read.

    Returns:
        A single line made of the ``paperw`` and ``paperh`` control words
        (values produced by ``Utils._inch_to_twip`` from the page width and
        height), followed by the ``landscape`` control word when the page
        orientation is ``"landscape"``, and terminated by a newline.
    """
    self.rtf_page = self.rtf_page._set_default()
    page = self.rtf_page

    width_twips = Utils._inch_to_twip(page.width)
    height_twips = Utils._inch_to_twip(page.height)

    # Only landscape pages get an explicit orientation control word.
    orientation_word = "\\landscape" if page.orientation == "landscape" else ""

    # NOTE: page header/footer groups are not emitted by this method.
    return f"\\paperw{width_twips}\\paperh{height_twips}{orientation_word}\n"

83 

def _rtf_page_margin_encode(self) -> str:
    """Build the RTF margin control words for the page.

    Side effect: ``self.rtf_page`` is replaced by the result of its own
    ``_set_default()`` call before it is read.

    Returns:
        One line pairing, in order, the control words ``margl``, ``margr``,
        ``margt``, ``margb``, ``headery`` and ``footery`` with the values of
        ``self.rtf_page.margin`` converted by ``Utils._inch_to_twip``,
        terminated by a newline. Pairing stops at the shorter of the two
        sequences.
    """
    self.rtf_page = self.rtf_page._set_default()

    control_words = (
        "\\margl",
        "\\margr",
        "\\margt",
        "\\margb",
        "\\headery",
        "\\footery",
    )

    # Convert every configured margin up front, then pair with control words.
    converted = []
    for value in self.rtf_page.margin:
        converted.append(Utils._inch_to_twip(value))

    pieces = []
    for word, amount in zip(control_words, converted):
        pieces.append(f"{word}{amount}")

    return "".join(pieces) + "\n"

102 

103 def _rtf_title_encode(self, method: str) -> str: 

104 """Convert the RTF title into RTF syntax using the Text class.""" 

105 if not self.rtf_title: 

106 return None 

107 

108 self.rtf_title = self.rtf_title._set_default() 

109 

110 return self.rtf_title._encode(text=self.rtf_title.text, method=method) 

111 

112 def _page_by(self) -> list[list[tuple[int, int, int]]]: 

113 """Create components for page_by format. 

114 

115 This method organizes data into sections based on the page_by grouping variables. 

116 

117 Returns: 

118 A list of sections, where each section is a list of tuples (row_idx, col_idx, level). 

119 Each tuple represents: 

120 - row_idx: The row index in the dataframe 

121 - col_idx: The column index in the dataframe 

122 - level: The nesting level of the section header. 

123 

124 """ 

125 # obtain input data 

126 data = self.df.to_dict("records") 

127 var = self.rtf_body.page_by 

128 

129 # obtain column names and dimensions 

130 columns = list(data[0].keys()) 

131 dim = (len(data), len(columns)) 

132 

133 if var is None: 

134 return None 

135 

136 def get_column_index(column_name: str) -> int: 

137 """Get the index of a column in the column list.""" 

138 return columns.index(column_name) 

139 

140 def get_matching_rows(group_values: dict) -> list[int]: 

141 """Get row indices that match the group values.""" 

142 return [ 

143 i 

144 for i, row in enumerate(data) 

145 if all(row[k] == v for k, v in group_values.items()) 

146 ] 

147 

148 def get_unique_combinations(variables: list[str]) -> list[dict]: 

149 """Get unique combinations of values for the specified variables.""" 

150 seen = set() 

151 unique = [] 

152 for row in data: 

153 key = tuple(row[v] for v in variables) 

154 if key not in seen: 

155 seen.add(key) 

156 unique.append({v: row[v] for v in variables}) 

157 return unique 

158 

159 output = [] 

160 prev_values = {v: None for v in var} 

161 

162 # Process each unique combination of grouping variables 

163 for group in get_unique_combinations(var): 

164 indices = get_matching_rows(group) 

165 

166 # Handle headers for each level 

167 for level, var_name in enumerate(var): 

168 current_val = group[var_name] 

169 

170 # Check if we need to print this level's header 

171 # We print if either: 

172 # 1. This is the deepest level (always print) 

173 # 2. The value at this level has changed 

174 need_header = False 

175 if level == len(var) - 1: 

176 need_header = True 

177 else: 

178 for l in range(level + 1): 

179 if group[var[l]] != prev_values[var[l]]: 

180 need_header = True 

181 break 

182 

183 if need_header: 

184 col_idx = get_column_index(var_name) 

185 # Add level information as third element in tuple 

186 output.append([(indices[0], col_idx, level)]) 

187 

188 prev_values[var_name] = current_val 

189 

190 # Handle data rows 

191 for index in indices: 

192 output.append( 

193 [ 

194 (index, j, len(var)) 

195 for j in range(len(columns)) 

196 if columns[j] not in var 

197 ] 

198 ) 

199 

200 return output 

201 

def _rtf_body_encode(
    self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
) -> MutableSequence[str] | None:
    """Encode the table body as RTF rows.

    Without ``page_by`` grouping the whole ``df`` is encoded in one call to
    ``rtf_attrs._encode``. With grouping, every section returned by
    ``self._page_by()`` (a header cell or one data row) is turned into a
    one-row DataFrame, given its own ``TableAttributes`` built from
    ``rtf_attrs._get_section_attributes``, encoded, and appended in order.

    NOTE(review): sections come from ``self._page_by()``, which reads
    ``self.df``, while cell values are read from the ``df`` argument; this
    presumably assumes both are the same frame — confirm against callers.

    Args:
        df: Input DataFrame to encode.
        rtf_attrs: Table attributes for styling.

    Returns:
        ``None`` when ``rtf_attrs`` is ``None``; otherwise the RTF-encoded
        strings representing the table rows.
    """
    if rtf_attrs is None:
        return None

    # Total width available to the table, taken from the page settings.
    col_total_width = self.rtf_page._set_default().col_width
    page_by = self._page_by()

    if page_by is None:
        col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
        return rtf_attrs._encode(df, col_widths)

    rows = []
    for section in page_by:
        # Only the (row, col) position is needed here; the level is unused.
        indices = [(row, col) for row, col, _level in section]
        if not indices:
            # Skip empty sections
            continue

        # One-row frame with one column per cell of the section.
        section_df = pd.DataFrame(
            {
                i: [BroadcastValue(value=df).iloc(row, col)]
                for i, (row, col) in enumerate(indices)
            }
        )

        # Attributes restricted to the cells of this section.
        section_attrs_dict = rtf_attrs._get_section_attributes(indices)
        section_attrs = TableAttributes(**section_attrs_dict)

        # Column widths are recomputed per section from its own relative widths.
        col_widths = Utils._col_widths(section_attrs.col_rel_width, col_total_width)
        rows.extend(section_attrs._encode(section_df, col_widths))

    return rows

250 

def _rtf_column_header_encode(
    self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
) -> MutableSequence[str] | None:
    """Encode one column-header table as RTF rows.

    Side effect: ``rtf_attrs.col_rel_width`` is set in place to equal
    relative widths (one per column of ``df``) when it is empty or unset.

    Args:
        df: DataFrame holding the header cells.
        rtf_attrs: Table attributes for the header; ``None`` disables it.

    Returns:
        ``None`` when ``rtf_attrs`` is ``None``; otherwise the result of
        ``rtf_attrs._encode`` for ``df`` with the computed column widths.
    """
    # Guard first (as ``_rtf_body_encode`` does) so a disabled header never
    # touches ``df`` or the page settings.
    if rtf_attrs is None:
        return None

    n_cols = df.shape[1]
    col_total_width = self.rtf_page._set_default().col_width

    # Fall back to equal relative widths before defaults are applied.
    rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * n_cols
    rtf_attrs = rtf_attrs._set_default()

    col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)

    return rtf_attrs._encode(df, col_widths)

266 

267 def _rtf_start_encode(self) -> str: 

268 return "{\\rtf1\\ansi\n\\deff0\\deflang1033" 

269 

def _rtf_font_table_encode(self) -> str:
    """Build the RTF font table group.

    Font styles and names come from ``Utils._font_type()`` (its ``"style"``
    and ``"name"`` entries) and are paired, in order, with the font
    references ``f0`` .. ``f9``; pairing stops at the shortest sequence.

    Returns:
        The ``fonttbl`` group: one brace-delimited entry per font, each
        followed by a newline, wrapped in the enclosing group braces.
    """
    font_types = Utils._font_type()
    font_rtf = [f"\\f{i}" for i in range(10)]
    font_style = font_types["style"]
    font_name = font_types["name"]

    # Literal braces must be doubled inside an f-string; each entry renders
    # as "{<ref><style>\fcharset161\fprq2 <name>;}" plus a newline.
    entries = [
        f"{{{rtf}{style}\\fcharset161\\fprq2 {name};}}\n"
        for rtf, style, name in zip(font_rtf, font_style, font_name)
    ]

    return "{\\fonttbl" + "".join(entries) + "}"

283 

def rtf_encode(self) -> str:
    """Generate the complete RTF code for the document.

    Side effects: fills in defaults on ``self.rtf_body`` (relative column
    widths, ``_set_default()``), may derive the first column header from the
    DataFrame column names, and rewrites the top/bottom border attributes of
    the first column header and of the body.

    NOTE(review): when there is no column header, row 0 of the body top
    border is first set from ``rtf_page.border_first`` and then set again
    from ``rtf_body.border_first`` further down — confirm this is intended.

    Returns:
        The RTF document: prolog, font table, page size, margins, title,
        column header rows and body rows joined by newlines, closed by
        ``}``. A missing title contributes an empty line instead of failing.
    """
    dim = self.df.shape

    # Set default values
    self.rtf_body.col_rel_width = self.rtf_body.col_rel_width or [1] * dim[1]
    self.rtf_body = self.rtf_body._set_default()

    # Title (None when no title is configured)
    rtf_title = self._rtf_title_encode(method="line")

    # Borders for the first/last row of the document and of the body,
    # broadcast to a single row spanning all columns.
    doc_border_top = BroadcastValue(
        value=self.rtf_page.border_first, dimension=(1, dim[1])
    ).to_dataframe()
    doc_border_bottom = BroadcastValue(
        value=self.rtf_page.border_last, dimension=(1, dim[1])
    ).to_dataframe()
    page_border_top = BroadcastValue(
        value=self.rtf_body.border_first, dimension=(1, dim[1])
    ).to_dataframe()
    page_border_bottom = BroadcastValue(
        value=self.rtf_body.border_last, dimension=(1, dim[1])
    ).to_dataframe()

    # Column header
    if not self.rtf_column_header:
        # No header (None or empty list): the body's first row carries the
        # document top border.
        rtf_column_header = []
        self.rtf_body.border_top = BroadcastValue(
            value=self.rtf_body.border_top, dimension=dim
        ).update_row(0, doc_border_top)
    else:
        if self.rtf_column_header[0].df is None and self.rtf_body.as_colheader:
            # Derive the header from the column names, leaving out the
            # page_by variables; only this first header is kept.
            columns = [
                col
                for col in self.df.columns
                if col not in (self.rtf_body.page_by or [])
            ]
            self.rtf_column_header[0].df = pd.DataFrame([columns])
            self.rtf_column_header = self.rtf_column_header[:1]

        # The first header row carries the document top border. Broadcast
        # the header's own border_top (not the header object itself).
        self.rtf_column_header[0].border_top = BroadcastValue(
            value=self.rtf_column_header[0].border_top, dimension=dim
        ).update_row(0, doc_border_top)

        rtf_column_header = [
            self._rtf_column_header_encode(df=header.df, rtf_attrs=header)
            for header in self.rtf_column_header
        ]

    # Body borders: first row gets the body top border; the last row gets
    # the body bottom border and then the document bottom border.
    self.rtf_body.border_top = BroadcastValue(
        value=self.rtf_body.border_top, dimension=dim
    ).update_row(0, page_border_top)
    self.rtf_body.border_bottom = BroadcastValue(
        value=self.rtf_body.border_bottom, dimension=dim
    ).update_row(dim[0] - 1, page_border_bottom)
    self.rtf_body.border_bottom = BroadcastValue(
        value=self.rtf_body.border_bottom, dimension=dim
    ).update_row(dim[0] - 1, doc_border_bottom)

    # Body
    rtf_body = self._rtf_body_encode(df=self.df, rtf_attrs=self.rtf_body)

    # Flatten the per-header row lists, skipping headers that encoded to None.
    header_lines = [
        line
        for encoded in rtf_column_header
        if encoded is not None
        for line in encoded
    ]

    return "\n".join(
        [
            self._rtf_start_encode(),
            self._rtf_font_table_encode(),
            "\n",
            self._rtf_page_encode(),
            self._rtf_page_margin_encode(),
            # str.join rejects None, so a missing title becomes "".
            rtf_title if rtf_title is not None else "",
            "\n",
            "\n".join(header_lines),
            "\n".join(rtf_body),
            "\n\n",
            "}",
        ]
    )

363 

def write_rtf(self, file_path: str) -> None:
    """Encode the document and save it as a `.rtf` file.

    The target path is printed to stdout first; the document is then encoded
    with ``rtf_encode`` and written in text mode as UTF-8, replacing any
    existing file at ``file_path``.

    Args:
        file_path: Destination path of the RTF file.
    """
    print(file_path)
    content = self.rtf_encode()
    with open(file_path, mode="w", encoding="utf-8") as handle:
        handle.write(content)