Coverage for src / rtflite / services / document_service.py: 89%

195 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-28 05:09 +0000

1"""RTF Document Service - handles all document-level operations.""" 

2 

3from collections.abc import Mapping, Sequence 

4from typing import Any 

5 

6 

7class RTFDocumentService: 

8 """Service for handling RTF document operations including pagination and layout.""" 

9 

10 def __init__(self): 

11 from .encoding_service import RTFEncodingService 

12 

13 self.encoding_service = RTFEncodingService() 

14 

15 def calculate_additional_rows_per_page(self, document) -> int: 

16 """Calculate additional rows needed per page for headers, footnotes, sources.""" 

17 additional_rows = 0 

18 

19 # Count subline_by header (appears on each page) 

20 if document.rtf_body.subline_by: 

21 additional_rows += 1 # Each subline_by header consumes 1 row 

22 

23 # Count column headers (repeat on each page) 

24 if document.rtf_column_header: 

25 # Handle nested column headers for multi-section documents 

26 if isinstance(document.rtf_column_header[0], list): 

27 # Nested format: count all non-None headers across all sections 

28 for section_headers in document.rtf_column_header: 

29 if section_headers: # Skip [None] sections 

30 for header in section_headers: 

31 if header and header.text is not None: 

32 additional_rows += 1 

33 else: 

34 # Flat format: original logic 

35 for header in document.rtf_column_header: 

36 if header.text is not None: 

37 additional_rows += 1 

38 

39 # Count footnote rows 

40 if document.rtf_footnote and document.rtf_footnote.text: 

41 additional_rows += 1 

42 

43 # Count source rows 

44 if document.rtf_source and document.rtf_source.text: 

45 additional_rows += 1 

46 

47 return additional_rows 

48 

49 def needs_pagination(self, document) -> bool: 

50 """Check if document needs pagination based on content size and page limits.""" 

51 

52 # Multiple figures always need pagination (each figure on separate page) 

53 if document.rtf_figure and document.rtf_figure.figures: 

54 # Check if multiple figures are provided 

55 figures = document.rtf_figure.figures 

56 if isinstance(figures, (list, tuple)) and len(figures) > 1: 

57 return True 

58 

59 # Figure-only documents don't need pagination beyond multi-figure handling above 

60 if document.df is None: 

61 return False 

62 

63 # Handle multi-section documents 

64 if isinstance(document.df, list): 

65 # Check if any section needs pagination 

66 for body in document.rtf_body: 

67 # Use PaginatedStrategy when page_by or subline_by is set 

68 # (page_by requires spanning row logic, which is in PaginatedStrategy) 

69 if body.page_by or body.subline_by: 

70 return True 

71 # For now, multi-section documents use single page strategy 

72 return False 

73 else: 

74 # Single section document 

75 # Use PaginatedStrategy when page_by or subline_by is set 

76 if document.rtf_body.page_by or document.rtf_body.subline_by: 

77 return True 

78 

79 # Create pagination instance to calculate rows needed 

80 from ..pagination import PageBreakCalculator, RTFPagination 

81 

82 pagination = RTFPagination( 

83 page_width=document.rtf_page.width, 

84 page_height=document.rtf_page.height, 

85 margin=document.rtf_page.margin, 

86 nrow=document.rtf_page.nrow, 

87 orientation=document.rtf_page.orientation, 

88 ) 

89 

90 calculator = PageBreakCalculator(pagination=pagination) 

91 from ..row import Utils 

92 

93 col_total_width = document.rtf_page.col_width 

94 

95 # Handle multi-section vs single section for column widths 

96 if isinstance(document.df, list): 

97 # Use first section for pagination calculation 

98 col_widths = Utils._col_widths( 

99 document.rtf_body[0].col_rel_width, col_total_width 

100 ) 

101 # Calculate rows needed for all sections combined 

102 total_content_rows: list[Any] = [] 

103 for df, body in zip(document.df, document.rtf_body, strict=True): 

104 section_col_widths = Utils._col_widths( 

105 body.col_rel_width, col_total_width 

106 ) 

107 section_content_rows = calculator.calculate_content_rows( 

108 df, section_col_widths, body 

109 ) 

110 total_content_rows.extend(section_content_rows) 

111 content_rows = total_content_rows 

112 else: 

113 col_widths = Utils._col_widths( 

114 document.rtf_body.col_rel_width, col_total_width 

115 ) 

116 # Calculate rows needed for data content only 

117 content_rows = list( 

118 calculator.calculate_content_rows( 

119 document.df, col_widths, document.rtf_body 

120 ) 

121 ) 

122 

123 # Calculate additional rows per page 

124 additional_rows_per_page = self.calculate_additional_rows_per_page(document) 

125 

126 # Calculate how many data rows can fit per page 

127 data_rows = sum(content_rows) 

128 available_data_rows_per_page = max( 

129 1, document.rtf_page.nrow - additional_rows_per_page 

130 ) 

131 

132 # If we can't fit even the additional components, we definitely need pagination 

133 if additional_rows_per_page >= document.rtf_page.nrow: 

134 return True 

135 

136 # Check if data rows exceed what can fit on a single page 

137 return data_rows > available_data_rows_per_page 

138 

139 def create_pagination_instance(self, document) -> tuple: 

140 """Create pagination and content distributor instances.""" 

141 from ..pagination import ContentDistributor, PageBreakCalculator, RTFPagination 

142 

143 pagination = RTFPagination( 

144 page_width=document.rtf_page.width, 

145 page_height=document.rtf_page.height, 

146 margin=document.rtf_page.margin, 

147 nrow=document.rtf_page.nrow, 

148 orientation=document.rtf_page.orientation, 

149 ) 

150 

151 calculator = PageBreakCalculator(pagination=pagination) 

152 distributor = ContentDistributor(pagination=pagination, calculator=calculator) 

153 

154 return pagination, distributor 

155 

156 def generate_page_break(self, document) -> str: 

157 """Generate proper RTF page break sequence.""" 

158 return self.encoding_service.encode_page_break( 

159 document.rtf_page, 

160 lambda: self.encoding_service.encode_page_margin(document.rtf_page), 

161 ) 

162 

163 def should_show_element_on_page( 

164 self, element_location: str, page_info: dict 

165 ) -> bool: 

166 """Determine if an element should be shown on a specific page.""" 

167 if element_location == "all": 

168 return True 

169 elif element_location == "first": 

170 return page_info["is_first_page"] 

171 elif element_location == "last": 

172 return page_info["is_last_page"] 

173 else: 

174 return False 

175 

176 def process_page_by( 

177 self, document 

178 ) -> Sequence[Sequence[tuple[int, int, int]]] | None: 

179 """Create components for page_by format.""" 

180 # Obtain input data 

181 data = document.df.to_dicts() 

182 var = document.rtf_body.page_by 

183 

184 # Handle empty DataFrame 

185 if len(data) == 0: 

186 return None 

187 

188 # Obtain column names and dimensions 

189 columns = list(data[0].keys()) 

190 

191 if var is None: 

192 return None 

193 

194 def get_column_index(column_name: str) -> int: 

195 """Get the index of a column in the column list.""" 

196 return columns.index(column_name) 

197 

198 def get_matching_rows(group_values: Mapping) -> Sequence[int]: 

199 """Get row indices that match the group values.""" 

200 return [ 

201 i 

202 for i, row in enumerate(data) 

203 if all(row[k] == v for k, v in group_values.items()) 

204 ] 

205 

206 def get_unique_combinations(variables: Sequence[str]) -> Sequence[Mapping]: 

207 """Get unique combinations of values for the specified variables.""" 

208 seen = set() 

209 unique = [] 

210 for row in data: 

211 key = tuple(row[v] for v in variables) 

212 if key not in seen: 

213 seen.add(key) 

214 unique.append({v: row[v] for v in variables}) 

215 return unique 

216 

217 output = [] 

218 prev_values = {v: None for v in var} 

219 

220 # Process each unique combination of grouping variables 

221 for group in get_unique_combinations(var): 

222 indices = get_matching_rows(group) 

223 

224 # Handle headers for each level 

225 for level, var_name in enumerate(var): 

226 current_val = group[var_name] 

227 

228 need_header = False 

229 if level == len(var) - 1: 

230 need_header = True 

231 else: 

232 for lvl in range(level + 1): 

233 if group[var[lvl]] != prev_values[var[lvl]]: 

234 need_header = True 

235 break 

236 

237 if need_header and current_val != "-----": 

238 col_idx = get_column_index(var_name) 

239 # Add level information as third element in tuple 

240 output.append([(indices[0], col_idx, level)]) 

241 

242 prev_values[var_name] = current_val 

243 

244 # Handle data rows 

245 for index in indices: 

246 output.append( 

247 [ 

248 (index, j, len(var)) 

249 for j in range(len(columns)) 

250 if columns[j] not in var 

251 ] 

252 ) 

253 

254 return output 

255 

256 def apply_pagination_borders( 

257 self, document, rtf_attrs, page_info: dict, total_pages: int 

258 ): 

259 """Apply proper borders for paginated context following r2rtf design: 

260 

261 rtf_page.border_first/last: Controls borders for the entire table 

262 rtf_body.border_first/last: Controls borders for each page 

263 rtf_body.border_top/bottom: Controls borders for individual cells 

264 

265 Logic: 

266 - First page, first row: apply rtf_page.border_first (overrides 

267 rtf_body.border_first) 

268 - Last page, last row: apply rtf_page.border_last (overrides 

269 rtf_body.border_last) 

270 - Non-first pages, first row: apply rtf_body.border_first 

271 - Non-last pages, last row: apply rtf_body.border_last 

272 - All other rows: use existing border_top/bottom from rtf_body 

273 """ 

274 from copy import deepcopy 

275 

276 # Create a deep copy of the attributes to avoid modifying the original 

277 page_attrs = deepcopy(rtf_attrs) 

278 page_df_height = page_info["data"].height 

279 page_df_width = page_info["data"].width 

280 page_shape = (page_df_height, page_df_width) 

281 

282 if page_df_height == 0: 

283 return page_attrs 

284 

285 # Clear border_first and border_last from being broadcast to all rows 

286 # These should only apply to specific rows based on pagination logic 

287 if hasattr(page_attrs, "border_first") and page_attrs.border_first: 

288 # Don't use border_first in pagination - it's handled separately 

289 page_attrs.border_first = None 

290 

291 if hasattr(page_attrs, "border_last") and page_attrs.border_last: 

292 # Don't use border_last in pagination - it's handled separately 

293 page_attrs.border_last = None 

294 

295 # Ensure border_top and border_bottom are properly sized for this page 

296 if not page_attrs.border_top: 

297 page_attrs.border_top = [ 

298 [""] * page_df_width for _ in range(page_df_height) 

299 ] 

300 if not page_attrs.border_bottom: 

301 page_attrs.border_bottom = [ 

302 [""] * page_df_width for _ in range(page_df_height) 

303 ] 

304 

305 # Apply borders based on page position 

306 # For first page: only apply rtf_page.border_first to table body 

307 # if NO column headers 

308 has_column_headers = ( 

309 document.rtf_column_header and len(document.rtf_column_header) > 0 

310 ) 

311 if ( 

312 page_info["is_first_page"] 

313 and not has_column_headers 

314 and document.rtf_page.border_first 

315 ): 

316 # Apply border to all cells in the first row 

317 for col_idx in range(page_df_width): 

318 page_attrs = self._apply_border_to_cell( 

319 page_attrs, 

320 0, 

321 col_idx, 

322 "top", 

323 document.rtf_page.border_first, 

324 page_shape, 

325 ) 

326 

327 # For first page with column headers: ensure consistent border style 

328 if ( 

329 page_info["is_first_page"] 

330 and has_column_headers 

331 and document.rtf_body.border_first 

332 ): 

333 # Apply same border style as non-first pages to maintain consistency 

334 border_style = ( 

335 document.rtf_body.border_first[0][0] 

336 if isinstance(document.rtf_body.border_first, list) 

337 else document.rtf_body.border_first 

338 ) 

339 # Apply single border style to first data row (same as other pages) 

340 for col_idx in range(page_df_width): 

341 page_attrs = self._apply_border_to_cell( 

342 page_attrs, 0, col_idx, "top", border_style, page_shape 

343 ) 

344 

345 # Apply page-level borders for non-first/last pages 

346 if not page_info["is_first_page"] and document.rtf_body.border_first: 

347 # Apply border_first to first row of non-first pages 

348 border_style = ( 

349 document.rtf_body.border_first[0][0] 

350 if isinstance(document.rtf_body.border_first, list) 

351 else document.rtf_body.border_first 

352 ) 

353 for col_idx in range(page_df_width): 

354 page_attrs = self._apply_border_to_cell( 

355 page_attrs, 0, col_idx, "top", border_style, page_shape 

356 ) 

357 

358 # Check if footnotes or sources will appear on this page 

359 has_footnote_on_page = ( 

360 document.rtf_footnote 

361 and document.rtf_footnote.text 

362 and self.should_show_element_on_page( 

363 document.rtf_page.page_footnote, page_info 

364 ) 

365 ) 

366 has_source_on_page = ( 

367 document.rtf_source 

368 and document.rtf_source.text 

369 and self.should_show_element_on_page( 

370 document.rtf_page.page_source, page_info 

371 ) 

372 ) 

373 

374 # Determine if footnotes/sources appear as tables on the last page 

375 # This is crucial for border placement when components are set to "first" only 

376 footnote_as_table_on_last = ( 

377 document.rtf_footnote 

378 and document.rtf_footnote.text 

379 and getattr(document.rtf_footnote, "as_table", True) 

380 and document.rtf_page.page_footnote in ("last", "all") 

381 ) 

382 source_as_table_on_last = ( 

383 document.rtf_source 

384 and document.rtf_source.text 

385 and getattr(document.rtf_source, "as_table", False) 

386 and document.rtf_page.page_source in ("last", "all") 

387 ) 

388 

389 # Apply border logic based on page position and footnote/source presence 

390 if not page_info["is_last_page"]: 

391 # Non-last pages: apply single border after footnote/source, or 

392 # after data if no footnote/source 

393 if document.rtf_body.border_last: 

394 border_style = ( 

395 document.rtf_body.border_last[0][0] 

396 if isinstance(document.rtf_body.border_last, list) 

397 else document.rtf_body.border_last 

398 ) 

399 

400 if not (has_footnote_on_page or has_source_on_page): 

401 # No footnote/source: apply border to last data row 

402 for col_idx in range(page_df_width): 

403 page_attrs = self._apply_border_to_cell( 

404 page_attrs, 

405 page_df_height - 1, 

406 col_idx, 

407 "bottom", 

408 border_style, 

409 page_shape, 

410 ) 

411 else: 

412 # Has footnote/source: apply border_last from RTFBody 

413 self._apply_footnote_source_borders( 

414 document, page_info, border_style, is_last_page=False 

415 ) 

416 

417 else: # is_last_page 

418 # Last page: check if we should apply border to data or footnote/source 

419 if document.rtf_page.border_last: 

420 # Check if this page contains the absolute last row 

421 total_rows = document.df.height 

422 is_absolute_last_row = page_info["end_row"] == total_rows - 1 

423 

424 if is_absolute_last_row: 

425 # If footnotes/sources are set to "first" only and appear as tables, 

426 # they won't be on the last page, so apply border to last data row 

427 if not (footnote_as_table_on_last or source_as_table_on_last): 

428 # No footnote/source on last page: apply border to last data row 

429 last_row_idx = page_df_height - 1 

430 for col_idx in range(page_df_width): 

431 page_attrs = self._apply_border_to_cell( 

432 page_attrs, 

433 last_row_idx, 

434 col_idx, 

435 "bottom", 

436 document.rtf_page.border_last, 

437 page_shape, 

438 ) 

439 else: 

440 # Has footnote/source on last page: set border for 

441 # footnote/source 

442 self._apply_footnote_source_borders( 

443 document, 

444 page_info, 

445 document.rtf_page.border_last, 

446 is_last_page=True, 

447 ) 

448 

449 return page_attrs 

450 

451 def _apply_footnote_source_borders( 

452 self, document, page_info: dict, border_style: str, is_last_page: bool 

453 ): 

454 """Apply borders to footnote and source components based on page position.""" 

455 # Determine which component should get the border 

456 has_footnote = ( 

457 document.rtf_footnote 

458 and document.rtf_footnote.text 

459 and self.should_show_element_on_page( 

460 document.rtf_page.page_footnote, page_info 

461 ) 

462 ) 

463 has_source = ( 

464 document.rtf_source 

465 and document.rtf_source.text 

466 and self.should_show_element_on_page( 

467 document.rtf_page.page_source, page_info 

468 ) 

469 ) 

470 

471 # Apply border to components based on as_table setting 

472 # Priority: Source with as_table=True > Footnote with as_table=True > 

473 # any component 

474 target_component = None 

475 

476 # Extract as_table values (now stored as booleans) 

477 footnote_as_table = None 

478 if has_footnote: 

479 footnote_as_table = getattr(document.rtf_footnote, "as_table", True) 

480 

481 source_as_table = None 

482 if has_source: 

483 source_as_table = getattr(document.rtf_source, "as_table", False) 

484 

485 if has_source and source_as_table: 

486 # Source is rendered as table: prioritize source for borders 

487 target_component = ("source", document.rtf_source) 

488 elif has_footnote and footnote_as_table: 

489 # Footnote is rendered as table: use footnote for borders 

490 target_component = ("footnote", document.rtf_footnote) 

491 # Note: Removed fallback to plain text components - borders should only 

492 # apply to components rendered as tables (as_table=True) 

493 

494 if target_component: 

495 component_name, component = target_component 

496 if not hasattr(component, "_page_border_style"): 

497 component._page_border_style = {} 

498 component._page_border_style[page_info["page_number"]] = border_style 

499 

500 def _apply_border_to_cell( 

501 self, 

502 page_attrs, 

503 row_idx: int, 

504 col_idx: int, 

505 border_side: str, 

506 border_style: str, 

507 page_shape: tuple, 

508 ): 

509 """Apply specified border style to a specific cell using BroadcastValue""" 

510 from ..attributes import BroadcastValue 

511 

512 border_attr = f"border_{border_side}" 

513 

514 if not hasattr(page_attrs, border_attr): 

515 return page_attrs 

516 

517 # Get current border values 

518 current_borders = getattr(page_attrs, border_attr) 

519 

520 # Create BroadcastValue to expand borders to page shape 

521 border_broadcast = BroadcastValue(value=current_borders, dimension=page_shape) 

522 

523 # Update the specific cell 

524 border_broadcast.update_cell(row_idx, col_idx, border_style) 

525 

526 # Update the attribute with the expanded value 

527 setattr(page_attrs, border_attr, border_broadcast.value) 

528 return page_attrs