Coverage for src / rtflite / encoding / unified_encoder.py: 95%
194 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 04:50 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 04:50 +0000
1from __future__ import annotations
3from typing import Any
5import polars as pl
7from rtflite import RTFDocument
9from ..attributes import BroadcastValue
10from ..input import RTFBody
11from ..pagination.processor import PageFeatureProcessor
12from ..pagination.strategies import PageContext, PaginationContext, StrategyRegistry
13from ..pagination.strategies.defaults import DefaultPaginationStrategy
14from ..pagination.strategies.grouping import PageByStrategy, SublineStrategy
15from ..row import Utils
16from ..services import RTFEncodingService
17from ..services.color_service import color_service
18from ..services.document_service import RTFDocumentService
19from ..services.figure_service import RTFFigureService
20from ..services.grouping_service import grouping_service
21from ..type_guards import is_single_body
22from .base import EncodingStrategy
23from .renderer import PageRenderer
class UnifiedRTFEncoder(EncodingStrategy):
    """Unified RTF Encoder using the strategy pattern for pagination and rendering.

    ``encode`` dispatches on ``document.df``: ``None`` goes to the figure-only
    path, a list goes to the multi-section path, and anything else runs the
    single-section pipeline (prepare -> paginate -> process -> render).
    """

    # Total column width used when ``rtf_page.col_width`` is None.
    _FALLBACK_COL_TOTAL_WIDTH = 8.5

    def __init__(self):
        """Create the services used by the pipeline and register strategies."""
        self.encoding_service = RTFEncodingService()
        self.document_service = RTFDocumentService()
        self.figure_service = RTFFigureService()
        self.feature_processor = PageFeatureProcessor()
        self.renderer = PageRenderer()

        # Register strategies (if not already registered elsewhere).
        # Ideally this happens at app startup, but for now we ensure they are
        # available whenever an encoder is constructed.
        StrategyRegistry.register("default", DefaultPaginationStrategy)
        StrategyRegistry.register("page_by", PageByStrategy)
        StrategyRegistry.register("subline", SublineStrategy)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _shown_on_page(setting: Any, is_first: bool, is_last: bool) -> bool:
        """Return True if a component with this page setting appears on the page.

        ``"all"`` always matches; ``"first"`` and ``"last"`` match only the
        first and last page respectively. Any other value never matches.
        """
        return (
            setting == "all"
            or (setting == "first" and is_first)
            or (setting == "last" and is_last)
        )

    @staticmethod
    def _reslice_pages(pages, frame) -> None:
        """Replace each page's data with the matching consecutive slice of *frame*.

        Each page keeps the row count (``page.data.height``) it had on entry;
        slices are taken back to back, starting at row 0.
        """
        offset = 0
        for page in pages:
            row_count = page.data.height
            page.data = frame.slice(offset, row_count)
            offset += row_count

    @staticmethod
    def _as_list(value: Any) -> list:
        """Return *value* if it is a list, ``[]`` for None, else ``[value]``."""
        if isinstance(value, list):
            return value
        if value is None:
            return []
        return [value]

    def _assemble_document(self, document: Any, body_chunks: list[str]) -> str:
        """Join the document preamble, the rendered body and the closing brace.

        Args:
            document: Document supplying the color table, page header, page
                footer and page settings.
            body_chunks: Already-rendered RTF chunks for the body.

        Returns:
            The pieces joined with newlines; pieces that are ``None`` are skipped.
        """
        pieces = [
            self.encoding_service.encode_document_start(),
            self.encoding_service.encode_font_table(),
            self.encoding_service.encode_color_table(document),
            "\n",
            self.encoding_service.encode_page_header(
                document.rtf_page_header, method="line"
            ),
            self.encoding_service.encode_page_footer(
                document.rtf_page_footer, method="line"
            ),
            self.encoding_service.encode_page_settings(document.rtf_page),
            "\n".join(body_chunks),
            "\n\n",
            "}",
        ]
        return "\n".join(piece for piece in pieces if piece is not None)

    # ------------------------------------------------------------------
    # Single-section pipeline
    # ------------------------------------------------------------------

    def _encode_body_section(
        self, document: RTFDocument, df: Any, rtf_body: Any
    ) -> list[str]:
        """Encode a single body section using the unified pipeline.

        Args:
            document: The RTF document context
            df: DataFrame for this section
            rtf_body: RTFBody attributes for this section

        Returns:
            List of RTF strings (rendered pages/rows)
        """
        # A. Prepare data
        processed_df, original_df, processed_attrs = (
            self.encoding_service.prepare_dataframe_for_body_encoding(df, rtf_body)
        )

        # B. Select strategy: subline_by takes precedence over page_by.
        strategy_name = "default"
        if is_single_body(rtf_body):
            if rtf_body.subline_by:
                strategy_name = "subline"
            elif rtf_body.page_by:
                strategy_name = "page_by"
        strategy = StrategyRegistry.get(strategy_name)()

        # C. Prepare context
        total_width = document.rtf_page.col_width
        if total_width is None:
            total_width = self._FALLBACK_COL_TOTAL_WIDTH

        if is_single_body(rtf_body) and processed_attrs.col_rel_width:
            rel_widths = processed_attrs.col_rel_width
        else:
            # No relative widths available: give every column equal weight.
            rel_widths = [1] * processed_df.shape[1]
        col_widths = Utils._col_widths(rel_widths, total_width)

        additional_rows = self.document_service.calculate_additional_rows_per_page(
            document
        )

        # Ensure we are working with a DataFrame and RTFBody for single
        # section encoding.
        if isinstance(original_df, pl.DataFrame) and isinstance(rtf_body, RTFBody):
            # Calculate removed column indices. We assume columns are removed,
            # not reordered significantly enough to break this simple name
            # check for the purpose of the pagination context.
            removed_column_indices: list[int] = []
            if processed_df.shape[1] < original_df.shape[1]:
                kept_columns = set(processed_df.columns)
                removed_column_indices = [
                    idx
                    for idx, name in enumerate(original_df.columns)
                    if name not in kept_columns
                ]

            pagination_ctx = PaginationContext(
                df=original_df,  # Use original DF for context
                rtf_body=rtf_body,
                rtf_page=document.rtf_page,
                col_widths=col_widths,
                table_attrs=processed_attrs,
                additional_rows_per_page=additional_rows,
                removed_column_indices=removed_column_indices,
            )
        else:
            # Fallback for unexpected types in this context.
            # Should not happen given is_single_body checks usually.
            pagination_ctx = PaginationContext(
                df=processed_df,
                rtf_body=processed_attrs,  # Best effort fallback
                rtf_page=document.rtf_page,
                col_widths=col_widths,
                table_attrs=processed_attrs,
                additional_rows_per_page=additional_rows,
            )

        # D. Paginate
        pages = strategy.paginate(pagination_ctx)

        # Handle case where no pages are generated (e.g. empty dataframe)
        if not pages:
            # Create empty page to ensure document structure (title, etc.)
            # is rendered.
            pages = [
                PageContext(
                    page_number=1,
                    total_pages=1,
                    data=processed_df,
                    is_first_page=True,
                    is_last_page=True,
                    col_widths=col_widths,
                    needs_header=True,
                    table_attrs=processed_attrs,
                )
            ]

        # Post-pagination fixup
        if is_single_body(rtf_body):
            self._apply_data_post_processing(pages, processed_df, rtf_body)

        # E. Process & render pages
        # Note: PageRenderer handles page breaks at the start of non-first
        # pages, so none are added here to avoid double breaks.
        section_rtf_chunks: list[str] = []
        for page in pages:
            # Process features (borders, etc.), then render.
            processed_page = self.feature_processor.process(document, page)
            section_rtf_chunks.extend(self.renderer.render(document, processed_page))

        return section_rtf_chunks

    def encode(self, document: Any) -> str:
        """Encode the document using the unified pipeline.

        Args:
            document: The document to encode. ``document.df`` selects the path:
                ``None`` -> figure-only, ``list`` -> multi-section, otherwise
                the standard single-section pipeline.

        Returns:
            The complete RTF string.
        """
        # 1. Figure-only handling
        if document.df is None:
            return self._encode_figure_only(document)

        # 2. Multi-section handling
        if isinstance(document.df, list):
            return self._encode_multi_section(document)

        # 3. Standard pipeline
        color_service.set_document_context(document)
        try:
            page_rtf_chunks = self._encode_body_section(
                document, document.df, document.rtf_body
            )
            # F. Assembly
            return self._assemble_document(document, page_rtf_chunks)
        finally:
            # Always release the document context, even when encoding fails,
            # so it cannot leak into a later encode() call.
            color_service.clear_document_context()

    def _apply_data_post_processing(self, pages, processed_df, rtf_body):
        """Sync page data with processed dataframe and handle group_by restoration.

        Args:
            pages: Page contexts produced by pagination; ``page.data`` is
                replaced in place.
            processed_df: The processed dataframe the pages are re-sliced from.
            rtf_body: Body attributes; only ``group_by`` is read here.
        """
        # 1. Replace data slices.
        # We assume the pagination strategy preserved the row order and counts
        # matching the processed_df (which corresponds to the original df
        # structure minus excluded columns).
        self._reslice_pages(pages, processed_df)

        # 2. Re-implementation of group_by logic
        if not rtf_body.group_by:
            return

        # Collect the starting row index of every page after the first, for
        # context restoration.
        page_start_indices = []
        cumulative = 0
        for page_idx, page in enumerate(pages):
            if page_idx > 0:
                page_start_indices.append(cumulative)
            cumulative += page.data.height

        suppressed = grouping_service.enhance_group_by(processed_df, rtf_body.group_by)
        restored = grouping_service.restore_page_context(
            suppressed, processed_df, rtf_body.group_by, page_start_indices
        )
        self._reslice_pages(pages, restored)

    # ------------------------------------------------------------------
    # Figure-only documents
    # ------------------------------------------------------------------

    def _encode_figure_only(self, document: RTFDocument) -> str:
        """Encode a figure-only document.

        Each figure is emitted followed by ``\\par`` and, except after the last
        figure, ``\\page``. Title, footnote and source are emitted per page
        according to the ``page_title`` / ``page_footnote`` / ``page_source``
        settings; the subline is emitted on the first page only.

        Args:
            document: The RTF document whose ``rtf_figure`` holds the figures.

        Returns:
            The complete RTF string, or ``""`` when there are no figures.
        """
        from copy import deepcopy

        from ..figure import rtf_read_figure

        if not document.rtf_figure or not document.rtf_figure.figures:
            return ""

        figs, formats = rtf_read_figure(document.rtf_figure.figures)
        figure_count = len(figs)

        # Pre-calculate shared elements
        title = self.encoding_service.encode_title(document.rtf_title, method="line")

        # For figure-only documents, footnote should be as_table=False.
        # Work on a deep copy so the caller's component is not modified.
        footnote_component = document.rtf_footnote
        if footnote_component is not None:
            footnote_component = deepcopy(footnote_component)
            footnote_component.as_table = False

        page = document.rtf_page

        # Build the preamble
        parts = [
            self.encoding_service.encode_document_start(),
            self.encoding_service.encode_font_table(),
            self.encoding_service.encode_color_table(document),
            "\n",
            self.encoding_service.encode_page_header(
                document.rtf_page_header, method="line"
            ),
            self.encoding_service.encode_page_footer(
                document.rtf_page_footer, method="line"
            ),
            self.encoding_service.encode_page_settings(page),
        ]

        for idx in range(figure_count):
            is_first = idx == 0
            is_last = idx == figure_count - 1

            # Title
            if self._shown_on_page(page.page_title, is_first, is_last):
                parts.append(title)
                parts.append("\n")

            # Subline
            if is_first and document.rtf_subline:
                parts.append(
                    self.encoding_service.encode_subline(
                        document.rtf_subline, method="line"
                    )
                )

            # Figure
            width = self.figure_service._get_dimension(
                document.rtf_figure.fig_width, idx
            )
            height = self.figure_service._get_dimension(
                document.rtf_figure.fig_height, idx
            )
            parts.append(
                self.figure_service._encode_single_figure(
                    figs[idx],
                    formats[idx],
                    width,
                    height,
                    document.rtf_figure.fig_align,
                )
            )
            parts.append(r"\par ")

            # Footnote based on page settings
            if footnote_component is not None and self._shown_on_page(
                page.page_footnote, is_first, is_last
            ):
                footnote_content = "\n".join(
                    self.encoding_service.encode_footnote(
                        footnote_component,
                        page_number=idx + 1,
                        page_col_width=page.col_width,
                    )
                )
                if footnote_content:
                    parts.append(footnote_content)

            # Source based on page settings
            if document.rtf_source is not None and self._shown_on_page(
                page.page_source, is_first, is_last
            ):
                source_content = "\n".join(
                    self.encoding_service.encode_source(
                        document.rtf_source,
                        page_number=idx + 1,
                        page_col_width=page.col_width,
                    )
                )
                if source_content:
                    parts.append(source_content)

            if not is_last:
                parts.append(r"\page ")

        parts.append("\n\n}")
        # Falsy parts (None or empty strings) are dropped before joining.
        return "".join([part for part in parts if part])

    # ------------------------------------------------------------------
    # Multi-section documents
    # ------------------------------------------------------------------

    def _encode_multi_section(self, document: RTFDocument) -> str:
        """Encode a multi-section document where sections are concatenated row by row.

        Every section is encoded through ``_encode_body_section`` against a
        per-section copy of the document whose borders, title, subline, page
        header/footer, footnote and source are adjusted for its position.

        NOTE(review): unlike ``encode``'s standard pipeline, this path does not
        set or clear the ``color_service`` document context - confirm that is
        intended.

        Args:
            document: The RTF document with multiple df/rtf_body sections

        Returns:
            Complete RTF string

        Raises:
            ValueError: If the number of dataframes and bodies differ
                (raised by ``zip(..., strict=True)``).
        """
        from ..type_guards import is_nested_header_list

        df_list = self._as_list(document.df)
        body_list = self._as_list(document.rtf_body)
        last_index = len(df_list) - 1

        # Column count of the first section, for border management.
        first_section_cols = df_list[0].shape[1] if df_list else 0

        # Document structure components:
        # rtf_title is handled per section via temp_document and the renderer,
        # so it is not pre-calculated here.

        # Handle page borders (use first section for dimensions).
        doc_border_bottom_list = BroadcastValue(
            value=document.rtf_page.border_last, dimension=(1, first_section_cols)
        ).to_list()
        doc_border_bottom = (
            doc_border_bottom_list[0] if doc_border_bottom_list else None
        )

        def copy_or_none(component):
            """Return ``component.model_copy()`` or None if it is falsy."""
            return component.model_copy() if component else None

        # Encode sections
        all_section_content: list[str] = []
        is_nested_headers = is_nested_header_list(document.rtf_column_header)

        for i, (section_df, section_body) in enumerate(
            zip(df_list, body_list, strict=True)
        ):
            # Determine column headers for this section
            section_headers_obj = None
            if is_nested_headers:
                if isinstance(document.rtf_column_header, list) and i < len(
                    document.rtf_column_header
                ):
                    section_headers_obj = document.rtf_column_header[i]
            elif i == 0:
                # Flat format - only apply to first section
                section_headers_obj = document.rtf_column_header

            # Create a temporary document for this section.
            # We need to adjust page borders:
            # - border_first only applies to the first section
            # - border_last only applies to the last section
            section_page = document.rtf_page.model_copy()
            if i > 0:
                section_page.border_first = None
            if i < last_index:
                section_page.border_last = None

            # Handle component visibility across sections.
            # Use model_copy to avoid modifying original document components.
            section_title = copy_or_none(document.rtf_title)
            section_footnote = copy_or_none(document.rtf_footnote)
            section_source = copy_or_none(document.rtf_source)
            section_subline = copy_or_none(document.rtf_subline)
            section_page_header = copy_or_none(document.rtf_page_header)
            section_page_footer = copy_or_none(document.rtf_page_footer)

            # Title: if "first", only show on first section.
            # Also suppress if this section continues on the same page
            # (new_page=False).
            if i > 0:
                continues_page = not section_body.new_page

                if document.rtf_page.page_title == "first" or continues_page:
                    if section_title:
                        section_title.text = None
                    if section_subline:
                        section_subline.text = None

                # Suppress page header/footer for continuous sections
                if continues_page:
                    if section_page_header:
                        section_page_header.text = None
                    if section_page_footer:
                        section_page_footer.text = None

            # Footnote/Source: if "last", only show on last section.
            # NOTE(review): the original also computed whether the *next*
            # section continues on the same page but never used the result;
            # confirm whether continuous sections should suppress these too.
            if i < last_index:
                if document.rtf_page.page_footnote == "last" and section_footnote:
                    section_footnote.text = None
                if document.rtf_page.page_source == "last" and section_source:
                    section_source.text = None

            # Use model_copy to safely create a new instance with updated fields
            temp_document = document.model_copy(
                update={
                    "df": section_df,
                    "rtf_body": section_body,
                    "rtf_column_header": section_headers_obj,
                    "rtf_page": section_page,
                    "rtf_title": section_title,
                    "rtf_subline": section_subline,
                    "rtf_page_header": section_page_header,
                    "rtf_page_footer": section_page_footer,
                    "rtf_footnote": section_footnote,
                    "rtf_source": section_source,
                }
            )

            # Encode section body (headers will be handled by PageRenderer)
            all_section_content.extend(
                self._encode_body_section(temp_document, section_df, section_body)
            )

        # Handle bottom borders on last section.
        # NOTE(review): this runs after every section has been rendered and
        # writes to the caller's document components, so it looks like it does
        # not influence the chunks collected above - confirm intended ordering.
        if document.rtf_footnote is not None and doc_border_bottom is not None:
            document.rtf_footnote.border_bottom = BroadcastValue(
                value=document.rtf_footnote.border_bottom, dimension=(1, 1)
            ).update_row(0, [doc_border_bottom[0]])
        elif isinstance(document.rtf_body, list) and isinstance(document.df, list):
            # Apply bottom border to last section's last row
            last_section_body = document.rtf_body[-1]
            last_section_dim = document.df[-1].shape
            if last_section_dim[0] > 0 and doc_border_bottom is not None:
                last_section_body.border_bottom = BroadcastValue(
                    value=last_section_body.border_bottom,
                    dimension=last_section_dim,
                ).update_row(last_section_dim[0] - 1, doc_border_bottom)

        return self._assemble_document(document, all_section_content)