Coverage for src / rtflite / encoding / renderer.py: 86%

163 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-08 04:50 +0000

1from copy import deepcopy 

2from typing import Any 

3 

4import polars as pl 

5 

6from ..attributes import BroadcastValue 

7from ..pagination.strategies.base import PageContext 

8from ..services import RTFEncodingService 

9from ..services.document_service import RTFDocumentService 

10from ..services.figure_service import RTFFigureService 

11from ..type_guards import ( 

12 is_flat_header_list, 

13 is_nested_header_list, 

14 is_single_body, 

15 is_single_header, 

16) 

17 

18 

class PageRenderer:
    """Render a single ``PageContext`` into RTF string chunks.

    The renderer owns one encoding, one document and one figure service and
    delegates all RTF generation to them; it only decides which components
    appear on a page and in which order.
    """

    def __init__(self) -> None:
        """Create the service objects used while rendering."""
        self.encoding_service = RTFEncodingService()
        self.document_service = RTFDocumentService()
        self.figure_service = RTFFigureService()

    def render(self, document: Any, page: PageContext) -> list[str]:
        """Render a single page to RTF.

        Components are emitted in a fixed order: page break, title, subline,
        subline header, leading figures, column headers, page-by spanning
        rows, body, footnote, source and trailing figures.

        Args:
            document: Document object exposing the ``rtf_*`` components,
                ``rtf_page`` settings and the source ``df``.
            page: Context describing the page being rendered.

        Returns:
            The RTF chunks for this page, in output order.
        """
        page_elements = []

        # 1. Page Break (except first page)
        if not page.is_first_page:
            page_elements.append(self.document_service.generate_page_break(document))

        # 2. Title
        if (
            document.rtf_title
            and document.rtf_title.text
            and self._should_show(document.rtf_page.page_title, page)
        ):
            title_content = self.encoding_service.encode_title(
                document.rtf_title, method="line"
            )
            if title_content:
                page_elements.append(title_content)
                page_elements.append("\n")

        # 3. Subline (visibility deliberately follows the page_title rule)
        if (
            document.rtf_subline
            and document.rtf_subline.text
            and self._should_show(document.rtf_page.page_title, page)
        ):
            subline_content = self.encoding_service.encode_subline(
                document.rtf_subline, method="line"
            )
            if subline_content:
                page_elements.append(subline_content)

        # 4. Subline Header (from Strategy)
        if page.subline_header:
            subline_header_content = self._generate_subline_header(page.subline_header)
            if subline_header_content:
                page_elements.append(subline_header_content)

        # 5. Figures (Position: Before) - first page only
        if (
            document.rtf_figure
            and document.rtf_figure.figures
            and document.rtf_figure.fig_pos == "before"
            and page.is_first_page
        ):
            figure_content = self.figure_service.encode_figure(document.rtf_figure)
            if figure_content:
                page_elements.append(figure_content)
                page_elements.append("\n")

        # 6. Column Headers
        if page.needs_header and document.rtf_column_header:
            page_elements.extend(self._render_column_headers(document, page))

        # 7. Page By Spanning Row (Header)
        if (
            self._spanning_rows_enabled(document)
            and page.pageby_header_info
            and "group_values" in page.pageby_header_info
        ):
            for col_name, val in page.pageby_header_info["group_values"].items():
                if val is None:
                    continue
                page_elements.extend(
                    self._encode_group_spanning_row(document, col_name, val)
                )

        # 8. Body (with potential internal group boundaries)
        page_elements.extend(self._render_body(document, page))

        # 9. Footnotes
        if (
            document.rtf_footnote
            and document.rtf_footnote.text
            and self._should_show(document.rtf_page.page_footnote, page)
        ):
            footnote_content = self.encoding_service.encode_footnote(
                document.rtf_footnote,
                page.page_number,
                document.rtf_page.col_width,
                # Border override from the processor, if one was recorded.
                border_style=page.component_borders.get("footnote"),
            )
            if footnote_content:
                page_elements.extend(footnote_content)

        # 10. Sources
        if (
            document.rtf_source
            and document.rtf_source.text
            and self._should_show(document.rtf_page.page_source, page)
        ):
            source_content = self.encoding_service.encode_source(
                document.rtf_source,
                page.page_number,
                document.rtf_page.col_width,
                # Border override from the processor, if one was recorded.
                border_style=page.component_borders.get("source"),
            )
            if source_content:
                page_elements.extend(source_content)

        # 11. Figures (Position: After) - last page only
        if (
            document.rtf_figure
            and document.rtf_figure.figures
            and document.rtf_figure.fig_pos == "after"
            and page.is_last_page
        ):
            figure_content = self.figure_service.encode_figure(document.rtf_figure)
            if figure_content:
                page_elements.append(figure_content)

        return page_elements

    def _should_show(self, location: str, page: PageContext) -> bool:
        """Decide whether a component with placement ``location`` shows on ``page``.

        ``"all"`` always shows, ``"first"`` and ``"last"`` follow the page's
        ``is_first_page`` / ``is_last_page`` flags, and any other value hides
        the component.
        """
        if location == "all":
            return True
        if location == "first":
            return page.is_first_page
        if location == "last":
            return page.is_last_page
        return False

    def _format_group_header(self, info: dict) -> str:
        """Join the non-``None`` ``info["group_values"]`` values with ``", "``.

        Returns an empty string when ``info`` has no ``"group_values"`` key.
        """
        if "group_values" not in info:
            return ""
        return ", ".join(
            str(value) for value in info["group_values"].values() if value is not None
        )

    def _generate_subline_header(self, info: dict) -> str:
        """Build the RTF paragraph for a subline header, or ``""`` if empty."""
        text = self._format_group_header(info)
        if not text:
            return ""
        # NOTE(review): the text is inserted verbatim; RTF control characters
        # in group values are not escaped here - confirm this is handled upstream.
        return rf"{{\pard\hyphpar\fi0\li0\ri0\ql\fs18{{\f0 {text}}}\par}}"

    @staticmethod
    def _spanning_rows_enabled(document: Any) -> bool:
        """Return whether page-by values are rendered as spanning rows.

        True when ``document.rtf_body`` is a single body and it is not the
        case that ``new_page`` is set while ``pageby_row`` equals ``"column"``.
        """
        body = document.rtf_body
        return is_single_body(body) and (
            not body.new_page or body.pageby_row != "column"
        )

    def _encode_group_spanning_row(
        self, document: Any, col_name: str, value: Any
    ) -> list[str]:
        """Encode one page-by group value as a spanning row.

        The column's position in ``document.df`` selects the body attributes
        to apply; index 0 is used when ``document.df`` is not a polars
        DataFrame or does not contain ``col_name``.
        """
        col_idx = 0
        if isinstance(document.df, pl.DataFrame):
            try:
                col_idx = document.df.columns.index(col_name)
            except ValueError:
                # Unknown column: deliberately fall back to the first column.
                col_idx = 0

        return self.encoding_service.encode_spanning_row(
            text=str(value),
            page_width=document.rtf_page.col_width or 8.5,
            rtf_body_attrs=document.rtf_body,
            col_idx=col_idx,
        )

    def _render_column_headers(self, document: Any, page: PageContext) -> list[str]:
        """Encode every column header row of ``document`` for ``page``.

        Nested, flat and single header definitions are normalised to one flat
        sequence. Each header is deep-copied before modification so the
        document's own header objects are never mutated.
        """
        header_elements = []
        headers_to_process = []

        if is_nested_header_list(document.rtf_column_header):
            for section in document.rtf_column_header:
                if section:
                    headers_to_process.extend(section)
        elif is_flat_header_list(document.rtf_column_header):
            headers_to_process = document.rtf_column_header
        elif is_single_header(document.rtf_column_header):
            headers_to_process = [document.rtf_column_header]

        for i, header in enumerate(headers_to_process):
            if header is None:
                continue
            header_copy = deepcopy(header)

            # Auto-populate header text from the page's columns when the text
            # is missing and as_colheader is set. page.data is used, so the
            # header is assumed to match the current page's columns.
            if (
                header_copy.text is None
                and is_single_body(document.rtf_body)
                and document.rtf_body.as_colheader
            ):
                page_df = page.data
                if isinstance(page_df, pl.DataFrame):
                    columns = list(page_df.columns)
                    header_df = pl.DataFrame(
                        [columns],
                        schema=[f"col_{j}" for j in range(len(columns))],
                        orient="row",
                    )
                    header_copy.text = header_df  # type: ignore[assignment]

            # Apply top border for first page/first header
            if (
                page.is_first_page
                and i == 0
                and document.rtf_page.border_first
                and header_copy.text is not None
            ):
                if isinstance(header_copy.text, pl.DataFrame):
                    dims = header_copy.text.shape
                else:
                    dims = (1, len(header_copy.text) if header_copy.text else 0)

                header_copy.border_top = BroadcastValue(
                    value=header_copy.border_top, dimension=dims
                ).update_row(0, [document.rtf_page.border_first] * dims[1])

            header_rtf = self.encoding_service.encode_column_header(
                header_copy.text, header_copy, document.rtf_page.col_width
            )
            header_elements.extend(header_rtf)

        return header_elements

    def _render_body(self, document: Any, page: PageContext) -> list[str]:
        """Encode the table body of ``page``.

        Without group boundaries the whole page frame is encoded in one call.
        Otherwise the frame is split at each boundary and a spanning row is
        inserted for every page-by column whose value changed, plus every
        column after it in ``page_by`` order.
        """
        page_attrs = page.final_body_attrs or page.table_attrs or document.rtf_body
        page_df = page.data
        col_widths = page.col_widths

        elements: list[str] = []

        if not (self._spanning_rows_enabled(document) and page.group_boundaries):
            # Simple body render
            elements.extend(page_attrs._encode(page_df, col_widths, row_offset=0))
            return elements

        # Seed the tracked values from the page header so a group already
        # announced at the top of the page is not repeated.
        last_values = {}
        if page.pageby_header_info and "group_values" in page.pageby_header_info:
            last_values = page.pageby_header_info["group_values"].copy()

        # Iterate in page_by order so the column hierarchy is respected.
        page_by_cols = document.rtf_body.page_by or []

        prev_row = 0
        for boundary in page.group_boundaries:
            page_rel_row = boundary["page_relative_row"]

            if page_rel_row > prev_row:
                segment = page_df[prev_row:page_rel_row]
                # Attributes are already finalized, so the internal _encode
                # is used with the segment's offset within the page.
                elements.extend(
                    page_attrs._encode(segment, col_widths, row_offset=prev_row)
                )

            # Spanning Row (Nested)
            if "group_values" in boundary:
                new_values = boundary["group_values"]
                force_render = False

                for col_name in page_by_cols:
                    val = new_values.get(col_name)
                    if val is None:
                        continue

                    # Once a higher level changed, every lower level must be
                    # rendered too, even if its own value is unchanged.
                    if force_render or str(val) != str(last_values.get(col_name)):
                        force_render = True
                        elements.extend(
                            self._encode_group_spanning_row(document, col_name, val)
                        )

                # Update state
                last_values.update(new_values)

            prev_row = page_rel_row

        if prev_row < len(page_df):
            segment = page_df[prev_row:]
            elements.extend(
                page_attrs._encode(segment, col_widths, row_offset=prev_row)
            )

        return elements