Coverage for src / rtflite / encoding / strategies.py: 84%
429 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 05:09 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 05:09 +0000
1"""Encoding strategies for different types of RTF documents."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING
6from ..services.grouping_service import grouping_service
7from ..type_guards import (
8 is_flat_header_list,
9 is_nested_header_list,
10 is_single_body,
11 is_single_header,
12)
14if TYPE_CHECKING:
15 from ..encode import RTFDocument
class EncodingStrategy(ABC):
    """Common interface implemented by every RTF encoding strategy.

    Subclasses provide :meth:`encode`, which turns a document into its RTF
    text. The class cannot be instantiated until ``encode`` is overridden.
    """

    @abstractmethod
    def encode(self, document: "RTFDocument") -> str:
        """Encode the document using this strategy.

        Args:
            document: The RTF document to encode

        Returns:
            Complete RTF string
        """
34class SinglePageStrategy(EncodingStrategy):
35 """Encoding strategy for single-page documents without pagination."""
37 def __init__(self):
38 from ..services import RTFEncodingService
39 from ..services.document_service import RTFDocumentService
40 from ..services.figure_service import RTFFigureService
42 self.encoding_service = RTFEncodingService()
43 self.document_service = RTFDocumentService()
44 self.figure_service = RTFFigureService()
46 def encode(self, document: "RTFDocument") -> str:
47 """Encode a single-page document with complete border and layout handling.
49 Args:
50 document: The RTF document to encode
52 Returns:
53 Complete RTF string
54 """
55 import polars as pl
57 from ..attributes import BroadcastValue
59 # Handle figure-only documents (no table)
60 if document.df is None:
61 return self._encode_figure_only_document_simple(document)
63 # Check if this is a multi-section document
64 if isinstance(document.df, list):
65 return self._encode_multi_section_document(document)
67 # Original single-page encoding logic for table documents
68 dim = document.df.shape
70 # Title
71 rtf_title = self.encoding_service.encode_title(
72 document.rtf_title, method="line"
73 )
75 # Page Border
76 doc_border_top_list = BroadcastValue(
77 value=document.rtf_page.border_first, dimension=(1, dim[1])
78 ).to_list()
79 doc_border_top = doc_border_top_list[0] if doc_border_top_list else None
80 doc_border_bottom_list = BroadcastValue(
81 value=document.rtf_page.border_last, dimension=(1, dim[1])
82 ).to_list()
83 doc_border_bottom = (
84 doc_border_bottom_list[0] if doc_border_bottom_list else None
85 )
86 page_border_top = None
87 page_border_bottom = None
88 if document.rtf_body is not None and is_single_body(document.rtf_body):
89 page_border_top_list = BroadcastValue(
90 value=document.rtf_body.border_first, dimension=(1, dim[1])
91 ).to_list()
92 page_border_top = page_border_top_list[0] if page_border_top_list else None
93 page_border_bottom_list = BroadcastValue(
94 value=document.rtf_body.border_last, dimension=(1, dim[1])
95 ).to_list()
96 page_border_bottom = (
97 page_border_bottom_list[0] if page_border_bottom_list else None
98 )
100 # Column header
101 if document.rtf_column_header is None:
102 rtf_column_header = ""
103 # Only update borders if DataFrame has rows
104 if dim[0] > 0:
105 document.rtf_body.border_top = BroadcastValue(
106 value=document.rtf_body.border_top, dimension=dim
107 ).update_row(0, doc_border_top)
108 else:
109 # Check if rtf_column_header is a list
110 header_to_check = None
111 if is_nested_header_list(document.rtf_column_header):
112 # Nested list case - get first section's first header
113 if (
114 document.rtf_column_header[0]
115 and len(document.rtf_column_header[0]) > 0
116 ):
117 header_to_check = document.rtf_column_header[0][0]
118 elif is_flat_header_list(document.rtf_column_header):
119 # Flat list case - get first header
120 if len(document.rtf_column_header) > 0:
121 header_to_check = document.rtf_column_header[0]
122 elif is_single_header(document.rtf_column_header): # type: ignore[arg-type]
123 header_to_check = document.rtf_column_header
125 if (
126 header_to_check is not None
127 and header_to_check.text is None
128 and is_single_body(document.rtf_body)
129 and document.rtf_body.as_colheader
130 ):
131 # Determine which columns to exclude from headers
132 excluded_columns = list(document.rtf_body.page_by or []) + list(
133 document.rtf_body.subline_by or []
134 )
135 columns = [
136 col for col in document.df.columns if col not in excluded_columns
137 ]
138 # Create DataFrame with explicit column names to ensure single row
139 header_df = pl.DataFrame(
140 [columns],
141 schema=[f"col_{i}" for i in range(len(columns))],
142 orient="row",
143 )
144 # Only assign if we have a valid flat header list
145 if (
146 is_flat_header_list(document.rtf_column_header)
147 and len(document.rtf_column_header) > 0
148 and document.rtf_column_header[0] is not None
149 ):
150 document.rtf_column_header[0].text = header_df # type: ignore[assignment]
152 # Adjust col_rel_width to match the processed columns
153 if excluded_columns:
154 original_cols = list(document.df.columns)
155 excluded_cols_set = set(excluded_columns)
156 processed_col_indices = [
157 i
158 for i, col in enumerate(original_cols)
159 if col not in excluded_cols_set
160 ]
162 # Ensure there are enough col_rel_width values for all
163 # original columns
164 if document.rtf_body.col_rel_width is not None and len(
165 document.rtf_body.col_rel_width
166 ) >= len(original_cols):
167 if (
168 is_flat_header_list(document.rtf_column_header)
169 and len(document.rtf_column_header) > 0
170 and document.rtf_column_header[0] is not None
171 ):
172 document.rtf_column_header[0].col_rel_width = [
173 document.rtf_body.col_rel_width[i]
174 for i in processed_col_indices
175 ]
176 else:
177 # Fallback: use equal widths if col_rel_width does not
178 # match or is None
179 if (
180 is_flat_header_list(document.rtf_column_header)
181 and len(document.rtf_column_header) > 0
182 and document.rtf_column_header[0] is not None
183 ):
184 document.rtf_column_header[0].col_rel_width = [1] * len(
185 columns
186 )
188 document.rtf_column_header = document.rtf_column_header[:1]
190 # Only update borders if DataFrame has rows
191 if (
192 dim[0] > 0
193 and is_flat_header_list(document.rtf_column_header)
194 and len(document.rtf_column_header) > 0
195 and document.rtf_column_header[0] is not None
196 ):
197 document.rtf_column_header[0].border_top = BroadcastValue(
198 value=document.rtf_column_header[0].border_top, dimension=dim
199 ).update_row(0, doc_border_top if doc_border_top is not None else [])
201 if is_nested_header_list(document.rtf_column_header):
202 # Handle nested list of headers
203 rtf_column_header = []
204 for section_headers in document.rtf_column_header:
205 if section_headers:
206 for header in section_headers:
207 if header:
208 rtf_column_header.append(
209 self.encoding_service.encode_column_header(
210 header.text, header, document.rtf_page.col_width
211 )
212 )
213 elif is_flat_header_list(document.rtf_column_header):
214 rtf_column_header = [
215 self.encoding_service.encode_column_header(
216 header.text if header else None,
217 header,
218 document.rtf_page.col_width,
219 )
220 for header in document.rtf_column_header
221 ]
222 elif is_single_header(document.rtf_column_header): # type: ignore[arg-type]
223 rtf_column_header = [
224 self.encoding_service.encode_column_header(
225 document.rtf_column_header.text,
226 document.rtf_column_header,
227 document.rtf_page.col_width,
228 )
229 ]
230 else:
231 rtf_column_header = []
233 # Only update borders if DataFrame has rows
234 if (
235 dim[0] > 0
236 and is_single_body(document.rtf_body)
237 and page_border_top is not None
238 ):
239 document.rtf_body.border_top = BroadcastValue(
240 value=document.rtf_body.border_top, dimension=dim
241 ).update_row(0, page_border_top)
243 # Bottom border last line update
244 if document.rtf_footnote is not None:
245 if page_border_bottom is not None:
246 document.rtf_footnote.border_bottom = BroadcastValue(
247 value=document.rtf_footnote.border_bottom, dimension=(1, 1)
248 ).update_row(0, [page_border_bottom[0]])
250 if doc_border_bottom is not None:
251 document.rtf_footnote.border_bottom = BroadcastValue(
252 value=document.rtf_footnote.border_bottom, dimension=(1, 1)
253 ).update_row(0, [doc_border_bottom[0]])
254 else:
255 # Only update borders if DataFrame has rows
256 if dim[0] > 0:
257 if page_border_bottom is not None and is_single_body(document.rtf_body):
258 document.rtf_body.border_bottom = BroadcastValue(
259 value=document.rtf_body.border_bottom, dimension=dim
260 ).update_row(dim[0] - 1, page_border_bottom)
262 if doc_border_bottom is not None and is_single_body(document.rtf_body):
263 document.rtf_body.border_bottom = BroadcastValue(
264 value=document.rtf_body.border_bottom, dimension=dim
265 ).update_row(dim[0] - 1, doc_border_bottom)
267 # Set document color context for accurate color index resolution
268 from ..services.color_service import color_service
270 color_service.set_document_context(document)
272 # Body
273 rtf_body = self.encoding_service.encode_body(
274 document, document.df, document.rtf_body, force_single_page=True
275 )
277 result = "\n".join(
278 [
279 item
280 for item in [
281 self.encoding_service.encode_document_start(),
282 self.encoding_service.encode_font_table(),
283 self.encoding_service.encode_color_table(document),
284 "\n",
285 self.encoding_service.encode_page_header(
286 document.rtf_page_header, method="line"
287 ),
288 self.encoding_service.encode_page_footer(
289 document.rtf_page_footer, method="line"
290 ),
291 self.encoding_service.encode_page_settings(document.rtf_page),
292 rtf_title,
293 "\n",
294 self.encoding_service.encode_subline(
295 document.rtf_subline, method="line"
296 ),
297 self.figure_service.encode_figure(document.rtf_figure)
298 if document.rtf_figure is not None
299 and document.rtf_figure.fig_pos == "before"
300 else None,
301 "\n".join(
302 header for sublist in rtf_column_header for header in sublist
303 )
304 if rtf_column_header
305 else None,
306 "\n".join(rtf_body),
307 "\n".join(
308 self.encoding_service.encode_footnote(
309 document.rtf_footnote,
310 page_number=1,
311 page_col_width=document.rtf_page.col_width,
312 )
313 )
314 if document.rtf_footnote is not None
315 else None,
316 "\n".join(
317 self.encoding_service.encode_source(
318 document.rtf_source,
319 page_number=1,
320 page_col_width=document.rtf_page.col_width,
321 )
322 )
323 if document.rtf_source is not None
324 else None,
325 self.figure_service.encode_figure(document.rtf_figure)
326 if document.rtf_figure is not None
327 and document.rtf_figure.fig_pos == "after"
328 else None,
329 "\n\n",
330 "}",
331 ]
332 if item is not None
333 ]
334 )
336 # Clear document context after encoding
337 color_service.clear_document_context()
339 return result
341 def _encode_multi_section_document(self, document: "RTFDocument") -> str:
342 """Encode a multi-section document where sections are concatenated row by row.
344 Args:
345 document: The RTF document with multiple df/rtf_body sections
347 Returns:
348 Complete RTF string
349 """
350 from ..attributes import BroadcastValue
352 # Calculate column counts for border management
353 if isinstance(document.df, list):
354 first_section_cols = document.df[0].shape[1] if document.df else 0
355 else:
356 first_section_cols = document.df.shape[1] if document.df is not None else 0
358 # Document structure components
359 rtf_title = self.encoding_service.encode_title(
360 document.rtf_title, method="line"
361 )
363 # Handle page borders (use first section for dimensions)
364 doc_border_top_list = BroadcastValue(
365 value=document.rtf_page.border_first, dimension=(1, first_section_cols)
366 ).to_list()
367 doc_border_top = doc_border_top_list[0] if doc_border_top_list else None
368 doc_border_bottom_list = BroadcastValue(
369 value=document.rtf_page.border_last, dimension=(1, first_section_cols)
370 ).to_list()
371 doc_border_bottom = (
372 doc_border_bottom_list[0] if doc_border_bottom_list else None
373 )
375 # Encode sections
376 all_section_content = []
377 is_nested_headers = is_nested_header_list(document.rtf_column_header)
379 df_list = (
380 document.df
381 if isinstance(document.df, list)
382 else [document.df]
383 if document.df is not None
384 else []
385 )
386 body_list = (
387 document.rtf_body
388 if isinstance(document.rtf_body, list)
389 else [document.rtf_body]
390 if document.rtf_body is not None
391 else []
392 )
394 for i, (section_df, section_body) in enumerate(
395 zip(df_list, body_list, strict=True)
396 ):
397 dim = section_df.shape
399 # Handle column headers for this section
400 section_headers: list[str] = []
401 if is_nested_headers:
402 # Nested format: [[header1], [None], [header3]]
403 if (
404 i < len(document.rtf_column_header)
405 and document.rtf_column_header[i]
406 ):
407 for header in document.rtf_column_header[i]:
408 if header is not None:
409 from ..input import RTFColumnHeader
411 # Ensure header is RTFColumnHeader, not tuple
412 if not isinstance(header, RTFColumnHeader):
413 continue
414 # Apply top border to first section's first header
415 if (
416 i == 0
417 and not section_headers
418 and doc_border_top is not None
419 ):
420 header.border_top = BroadcastValue(
421 value=header.border_top, dimension=dim
422 ).update_row(0, doc_border_top)
424 section_headers.append(
425 self.encoding_service.encode_column_header(
426 header.text, header, document.rtf_page.col_width
427 )
428 )
429 else:
430 # Flat format - only apply to first section
431 if i == 0:
432 headers_to_check = []
433 if is_flat_header_list(document.rtf_column_header):
434 headers_to_check = document.rtf_column_header
435 elif is_single_header(document.rtf_column_header): # type: ignore[arg-type]
436 headers_to_check = [document.rtf_column_header]
438 for header in headers_to_check:
439 if (
440 header is not None
441 and header.text is None
442 and section_body.as_colheader
443 ):
444 # Auto-generate headers from column names
445 columns = [
446 col
447 for col in section_df.columns
448 if col not in (section_body.page_by or [])
449 ]
450 import polars as pl
452 header_df = pl.DataFrame(
453 [columns],
454 schema=[f"col_{j}" for j in range(len(columns))],
455 orient="row",
456 )
457 header.text = header_df # type: ignore[assignment]
459 # Apply top border to first header
460 if (
461 not section_headers
462 and doc_border_top is not None
463 and header is not None
464 ):
465 header.border_top = BroadcastValue(
466 value=header.border_top, dimension=dim
467 ).update_row(
468 0, doc_border_top if doc_border_top is not None else []
469 )
471 if header is not None:
472 section_headers.append(
473 self.encoding_service.encode_column_header(
474 header.text, header, document.rtf_page.col_width
475 )
476 )
478 # Handle borders for section body
479 if i == 0 and not section_headers: # First section, no headers
480 # Apply top border to first row of first section
481 section_body.border_top = BroadcastValue(
482 value=section_body.border_top, dimension=dim
483 ).update_row(0, doc_border_top if doc_border_top is not None else [])
485 # Create a temporary document for this section to maintain compatibility
486 from copy import deepcopy
488 temp_document = deepcopy(document)
489 temp_document.df = section_df
490 temp_document.rtf_body = section_body
492 # Encode section body
493 section_body_content = self.encoding_service.encode_body(
494 temp_document, section_df, section_body
495 )
497 # Add section content
498 if section_headers:
499 all_section_content.extend(
500 [
501 "\n".join(
502 header for sublist in section_headers for header in sublist
503 )
504 ]
505 )
506 all_section_content.extend(section_body_content)
508 # Handle bottom borders on last section
509 if document.rtf_footnote is not None and doc_border_bottom is not None:
510 document.rtf_footnote.border_bottom = BroadcastValue(
511 value=document.rtf_footnote.border_bottom, dimension=(1, 1)
512 ).update_row(0, [doc_border_bottom[0]])
513 else:
514 # Apply bottom border to last section's last row
515 if isinstance(document.rtf_body, list) and isinstance(document.df, list):
516 last_section_body = document.rtf_body[-1]
517 last_section_dim = document.df[-1].shape
518 if last_section_dim[0] > 0 and doc_border_bottom is not None:
519 last_section_body.border_bottom = BroadcastValue(
520 value=last_section_body.border_bottom,
521 dimension=last_section_dim,
522 ).update_row(last_section_dim[0] - 1, doc_border_bottom)
524 return "\n".join(
525 [
526 item
527 for item in [
528 self.encoding_service.encode_document_start(),
529 self.encoding_service.encode_font_table(),
530 "\n",
531 self.encoding_service.encode_page_header(
532 document.rtf_page_header, method="line"
533 ),
534 self.encoding_service.encode_page_footer(
535 document.rtf_page_footer, method="line"
536 ),
537 self.encoding_service.encode_page_settings(document.rtf_page),
538 rtf_title,
539 "\n",
540 self.encoding_service.encode_subline(
541 document.rtf_subline, method="line"
542 ),
543 "\n".join(all_section_content),
544 "\n".join(
545 self.encoding_service.encode_footnote(
546 document.rtf_footnote,
547 page_number=1,
548 page_col_width=document.rtf_page.col_width,
549 )
550 )
551 if document.rtf_footnote is not None
552 else None,
553 "\n".join(
554 self.encoding_service.encode_source(
555 document.rtf_source,
556 page_number=1,
557 page_col_width=document.rtf_page.col_width,
558 )
559 )
560 if document.rtf_source is not None
561 else None,
562 "\n\n",
563 "}",
564 ]
565 if item is not None
566 ]
567 )
569 def _encode_figure_only_document_simple(self, document: "RTFDocument") -> str:
570 """Encode a figure-only document with simple page layout.
572 This handles figure-only documents with default page settings.
573 Multiple figures will have page breaks between them (handled by FigureService).
575 Args:
576 document: The RTF document with only figure content
578 Returns:
579 Complete RTF string
580 """
581 # Build RTF components for simple figure-only document
582 rtf_title = self.encoding_service.encode_title(
583 document.rtf_title, method="line"
584 )
586 # Assemble final RTF document
587 return "".join(
588 [
589 item
590 for item in [
591 self.encoding_service.encode_document_start(),
592 self.encoding_service.encode_font_table(),
593 self.encoding_service.encode_color_table(document),
594 "\n",
595 self.encoding_service.encode_page_header(
596 document.rtf_page_header, method="line"
597 ),
598 self.encoding_service.encode_page_footer(
599 document.rtf_page_footer, method="line"
600 ),
601 self.encoding_service.encode_page_settings(document.rtf_page),
602 rtf_title,
603 "\n",
604 self.encoding_service.encode_subline(
605 document.rtf_subline, method="line"
606 ),
607 # FigureService handles page breaks between multiple figures
608 self.figure_service.encode_figure(document.rtf_figure),
609 "\n".join(
610 self.encoding_service.encode_footnote(
611 document.rtf_footnote,
612 page_number=1,
613 page_col_width=document.rtf_page.col_width,
614 )
615 )
616 if document.rtf_footnote is not None
617 else None,
618 "\n".join(
619 self.encoding_service.encode_source(
620 document.rtf_source,
621 page_number=1,
622 page_col_width=document.rtf_page.col_width,
623 )
624 )
625 if document.rtf_source is not None
626 else None,
627 "\n\n",
628 "}",
629 ]
630 if item is not None
631 ]
632 )
635class PaginatedStrategy(EncodingStrategy):
636 """Encoding strategy for multi-page documents with pagination."""
638 def __init__(self):
639 from ..services import RTFEncodingService
640 from ..services.document_service import RTFDocumentService
641 from ..services.figure_service import RTFFigureService
643 self.encoding_service = RTFEncodingService()
644 self.document_service = RTFDocumentService()
645 self.figure_service = RTFFigureService()
647 def encode(self, document: "RTFDocument") -> str:
648 """Encode a paginated document with full pagination support.
650 Args:
651 document: The RTF document to encode
653 Returns:
654 Complete RTF string
655 """
656 from copy import deepcopy
658 import polars as pl
660 from ..attributes import BroadcastValue
661 from ..row import Utils
663 # Handle figure-only documents with multi-page behavior
664 if document.df is None:
665 return self._encode_figure_only_document_with_pagination(document)
667 # Get dimensions based on DataFrame type
668 if isinstance(document.df, list):
669 # For list of DataFrames, use first one's columns
670 dim = (
671 sum(df.shape[0] for df in document.df),
672 document.df[0].shape[1] if document.df else 0,
673 )
674 else:
675 dim = document.df.shape
677 # Set document color context for accurate color index resolution
678 from ..services.color_service import color_service
680 color_service.set_document_context(document)
682 # Prepare DataFrame for processing (remove subline_by columns, apply
683 # group_by if needed)
684 processed_df, original_df = (
685 self.encoding_service.prepare_dataframe_for_body_encoding(
686 document.df, document.rtf_body
687 )
688 )
690 # Validate subline_by formatting consistency before processing
691 if (
692 is_single_body(document.rtf_body)
693 and document.rtf_body.subline_by is not None
694 ):
695 import warnings
696 from typing import cast
698 subline_by_list = cast(list[str], document.rtf_body.subline_by)
699 formatting_warnings = (
700 grouping_service.validate_subline_formatting_consistency(
701 original_df, subline_by_list, document.rtf_body
702 )
703 )
704 for warning_msg in formatting_warnings:
705 warnings.warn(
706 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=3
707 )
709 # Get pagination instance and distribute content (use processed data
710 # for distribution)
711 _, distributor = self.document_service.create_pagination_instance(document)
712 col_total_width = document.rtf_page.col_width
713 if (
714 is_single_body(document.rtf_body)
715 and document.rtf_body.col_rel_width is not None
716 ):
717 col_widths = Utils._col_widths(
718 document.rtf_body.col_rel_width,
719 col_total_width if col_total_width is not None else 8.5,
720 )
721 else:
722 # Default to equal widths if body is not single
723 # Use processed_df column count (after page_by/subline_by columns removed)
724 col_widths = Utils._col_widths(
725 [1] * processed_df.shape[1], col_total_width if col_total_width is not None else 8.5
726 )
728 # Calculate additional rows per page for r2rtf compatibility
729 additional_rows = self.document_service.calculate_additional_rows_per_page(
730 document
731 )
733 # Use original DataFrame for pagination logic (to identify subline_by breaks)
734 # but processed DataFrame for the actual content
735 if is_single_body(document.rtf_body):
736 # Use original DataFrame for proper pagination distribution logic
737 pages = distributor.distribute_content(
738 df=original_df,
739 col_widths=col_widths,
740 page_by=document.rtf_body.page_by,
741 new_page=document.rtf_body.new_page,
742 pageby_header=document.rtf_body.pageby_header,
743 table_attrs=document.rtf_body,
744 additional_rows_per_page=additional_rows,
745 subline_by=document.rtf_body.subline_by,
746 )
747 else:
748 # Default pagination if body is not single
749 pages = distributor.distribute_content(
750 df=original_df,
751 col_widths=col_widths,
752 page_by=None,
753 new_page=None,
754 pageby_header=None,
755 table_attrs=None,
756 additional_rows_per_page=additional_rows,
757 subline_by=None,
758 )
760 # Replace page data with processed data (without subline_by columns)
761 for page_info in pages:
762 start_row = page_info["start_row"]
763 end_row = page_info["end_row"]
764 page_info["data"] = processed_df.slice(start_row, end_row - start_row + 1)
766 # Apply group_by processing to each page if needed
767 if is_single_body(document.rtf_body) and document.rtf_body.group_by:
768 # Calculate global page start indices for context restoration
769 page_start_indices = []
770 cumulative_rows = 0
771 for i, page_info in enumerate(pages):
772 if i > 0: # Skip first page (starts at 0)
773 page_start_indices.append(cumulative_rows)
774 cumulative_rows += len(page_info["data"])
776 # Process all pages together for proper group_by and page context
777 # restoration
778 all_page_data = []
779 for page_info in pages:
780 all_page_data.append(page_info["data"])
782 # Concatenate all page data
783 full_df = all_page_data[0]
784 for page_df in all_page_data[1:]:
785 full_df = full_df.vstack(page_df)
787 # Apply group_by suppression to the full dataset
788 from typing import cast
790 group_by_param = cast(list[str] | None, document.rtf_body.group_by)
791 suppressed_df = grouping_service.enhance_group_by(full_df, group_by_param)
793 # Apply page context restoration
794 from typing import cast
796 group_by_list2 = cast(list[str], document.rtf_body.group_by)
797 restored_df = grouping_service.restore_page_context(
798 suppressed_df, full_df, group_by_list2, page_start_indices
799 )
801 # Split the processed data back to pages
802 start_idx = 0
803 for page_info in pages:
804 page_rows = len(page_info["data"])
805 page_info["data"] = restored_df.slice(start_idx, page_rows)
806 start_idx += page_rows
808 # Prepare border settings
809 border_first_list = BroadcastValue(
810 value=document.rtf_page.border_first, dimension=(1, dim[1])
811 ).to_list()
812 _ = (
813 border_first_list[0] if border_first_list else None
814 ) # May be used for validation
815 border_last_list = BroadcastValue(
816 value=document.rtf_page.border_last, dimension=(1, dim[1])
817 ).to_list()
818 _ = (
819 border_last_list[0] if border_last_list else None
820 ) # May be used for validation
822 # Generate RTF for each page
823 page_contents = []
825 for page_info in pages:
826 page_elements = []
828 # Add page break before each page (except first)
829 if not page_info["is_first_page"]:
830 page_elements.append(
831 self.document_service.generate_page_break(document)
832 )
834 # Add title if it should appear on this page
835 if (
836 document.rtf_title
837 and document.rtf_title.text
838 and self.document_service.should_show_element_on_page(
839 document.rtf_page.page_title, page_info
840 )
841 ):
842 title_content = self.encoding_service.encode_title(
843 document.rtf_title, method="line"
844 )
845 if title_content:
846 page_elements.append(title_content)
847 page_elements.append("\n")
849 # Add subline if it should appear on this page
850 if (
851 document.rtf_subline
852 and document.rtf_subline.text
853 and self.document_service.should_show_element_on_page(
854 document.rtf_page.page_title, page_info
855 )
856 ):
857 subline_content = self.encoding_service.encode_subline(
858 document.rtf_subline, method="line"
859 )
860 if subline_content:
861 page_elements.append(subline_content)
863 # Add subline_by header paragraph if specified
864 if page_info.get("subline_header"):
865 subline_header_content = self._generate_subline_header(
866 page_info["subline_header"], document.rtf_body
867 )
868 if subline_header_content:
869 page_elements.append(subline_header_content)
871 # Add figures if they should appear on the first page
872 # and position is 'before'
873 if (
874 document.rtf_figure
875 and document.rtf_figure.figures
876 and document.rtf_figure.fig_pos == "before"
877 and page_info["is_first_page"]
878 ):
879 figure_content = self.figure_service.encode_figure(document.rtf_figure)
880 if figure_content:
881 page_elements.append(figure_content)
882 page_elements.append("\n")
884 # Add column headers if needed
885 if page_info["needs_header"] and document.rtf_column_header:
886 if (
887 is_flat_header_list(document.rtf_column_header)
888 and len(document.rtf_column_header) > 0
889 and document.rtf_column_header[0] is not None
890 and document.rtf_column_header[0].text is None
891 and is_single_body(document.rtf_body)
892 and document.rtf_body.as_colheader
893 ):
894 # Use processed page data columns (which already have
895 # subline_by columns removed)
896 page_df = page_info["data"]
897 columns = list(page_df.columns)
898 # Create DataFrame for text field (not assign list to text)
899 import polars as pl
901 header_df = pl.DataFrame(
902 [columns],
903 schema=[f"col_{i}" for i in range(len(columns))],
904 orient="row",
905 )
906 document.rtf_column_header[0].text = header_df # type: ignore[assignment]
908 # Adjust col_rel_width to match processed columns (without
909 # subline_by and page_by)
910 if (
911 is_single_body(document.rtf_body)
912 and (document.rtf_body.subline_by or document.rtf_body.page_by)
913 ):
914 original_cols = (
915 list(document.df.columns)
916 if isinstance(document.df, pl.DataFrame)
917 else []
918 )
919 # Collect columns that should be excluded
920 excluded_cols: set[str] = set()
921 if document.rtf_body.subline_by:
922 excluded_cols.update(document.rtf_body.subline_by)
923 if document.rtf_body.page_by:
924 excluded_cols.update(document.rtf_body.page_by)
926 processed_col_indices = [
927 i
928 for i, col in enumerate(original_cols)
929 if col not in excluded_cols
930 ]
932 # Ensure there are enough col_rel_width values for all
933 # original columns
934 if (
935 is_single_body(document.rtf_body)
936 and document.rtf_body.col_rel_width is not None
937 and len(document.rtf_body.col_rel_width)
938 >= len(original_cols)
939 and is_flat_header_list(document.rtf_column_header)
940 and len(document.rtf_column_header) > 0
941 and document.rtf_column_header[0] is not None
942 ):
943 document.rtf_column_header[0].col_rel_width = [
944 document.rtf_body.col_rel_width[i]
945 for i in processed_col_indices
946 ]
947 elif (
948 is_flat_header_list(document.rtf_column_header)
949 and len(document.rtf_column_header) > 0
950 and document.rtf_column_header[0] is not None
951 ):
952 # Fallback: use equal widths if col_rel_width doesn't match
953 document.rtf_column_header[0].col_rel_width = [1] * len(
954 columns
955 )
957 # Apply pagination borders to column headers
958 # Process each column header with proper borders
959 header_elements = []
960 headers_to_process = []
961 if is_nested_header_list(document.rtf_column_header):
962 # For nested headers, flatten them
963 for section_headers in document.rtf_column_header:
964 if section_headers:
965 headers_to_process.extend(section_headers)
966 elif is_flat_header_list(document.rtf_column_header):
967 headers_to_process = document.rtf_column_header
969 for i, header in enumerate(headers_to_process):
970 if header is None:
971 continue
972 header_copy = deepcopy(header)
974 # Remove page_by/subline_by columns from header to match body
975 import polars as pl
976 if isinstance(header_copy.text, pl.DataFrame):
977 columns_to_remove = set()
978 if document.rtf_body.page_by:
979 columns_to_remove.update(document.rtf_body.page_by)
980 if document.rtf_body.subline_by:
981 columns_to_remove.update(document.rtf_body.subline_by)
983 if columns_to_remove:
984 remaining_columns = [
985 col for col in header_copy.text.columns
986 if col not in columns_to_remove
987 ]
988 header_copy.text = header_copy.text.select(remaining_columns)
990 # Apply page-level borders to column headers (matching
991 # non-paginated behavior)
992 if (
993 page_info["is_first_page"]
994 and i == 0
995 and document.rtf_page.border_first
996 and header_copy.text is not None
997 ): # First header on first page
998 # Get dimensions based on text type
999 if isinstance(header_copy.text, pl.DataFrame):
1000 header_dims = header_copy.text.shape
1001 else:
1002 # For Sequence[str], assume single row
1003 header_dims = (
1004 1,
1005 len(header_copy.text) if header_copy.text else 0,
1006 )
1007 # Apply page border_first to top of first column header
1008 header_copy.border_top = BroadcastValue(
1009 value=header_copy.border_top, dimension=header_dims
1010 ).update_row(
1011 0, [document.rtf_page.border_first] * header_dims[1]
1012 )
1014 # Encode the header with modified borders
1015 # Use the header_copy to preserve border modifications
1016 header_rtf = self.encoding_service.encode_column_header(
1017 header_copy.text, header_copy, document.rtf_page.col_width
1018 )
1019 header_elements.extend(header_rtf)
1021 page_elements.extend(header_elements)
1023 # Add page_by spanning table row after headers if specified
1024 if page_info.get("pageby_header_info"):
1025 # Extract group values for spanning row text
1026 header_info = page_info["pageby_header_info"]
1027 if "group_values" in header_info:
1028 header_parts = [
1029 str(value)
1030 for value in header_info["group_values"].values()
1031 if value is not None
1032 ]
1033 if header_parts:
1034 header_text = ", ".join(header_parts)
1035 # Use shared encoding service method
1036 pageby_row_content = self.encoding_service.encode_spanning_row(
1037 text=header_text,
1038 page_width=document.rtf_page.col_width
1039 if document.rtf_page.col_width
1040 else 8.5,
1041 rtf_body_attrs=document.rtf_body,
1042 )
1043 page_elements.extend(pageby_row_content)
1045 # Add page content (table body) with proper border handling
1046 page_df = page_info["data"]
1048 # Apply pagination borders to the body attributes
1049 page_attrs = self.document_service.apply_pagination_borders(
1050 document, document.rtf_body, page_info, len(pages)
1051 )
1053 # Check if there are group boundaries within this page
1054 if page_info.get("group_boundaries"):
1055 # Handle mid-page group changes: insert spanning rows at boundaries
1056 group_boundaries = page_info["group_boundaries"]
1057 prev_row = 0
1059 for boundary in group_boundaries:
1060 page_relative_row = boundary["page_relative_row"]
1062 # Encode rows before this boundary
1063 if page_relative_row > prev_row:
1064 segment_df = page_df[prev_row:page_relative_row]
1065 segment_body = page_attrs._encode(segment_df, col_widths)
1066 page_elements.extend(segment_body)
1068 # Insert spanning row at boundary
1069 group_values = boundary["group_values"]
1070 header_parts = [
1071 str(value)
1072 for value in group_values.values()
1073 if value is not None
1074 ]
1075 if header_parts:
1076 header_text = ", ".join(header_parts)
1077 spanning_row = self.encoding_service.encode_spanning_row(
1078 text=header_text,
1079 page_width=document.rtf_page.col_width or 8.5,
1080 rtf_body_attrs=document.rtf_body,
1081 )
1082 page_elements.extend(spanning_row)
1084 prev_row = page_relative_row
1086 # Encode remaining rows after last boundary
1087 if prev_row < len(page_df):
1088 segment_df = page_df[prev_row:]
1090 # For the last segment on non-last pages, we need to ensure
1091 # the bottom border is applied correctly
1092 # The border was applied to page_df row indices, but we're now
1093 # encoding a segment, so we need to adjust
1094 if (
1095 not page_info["is_last_page"]
1096 and is_single_body(document.rtf_body)
1097 and document.rtf_body.border_last
1098 ):
1099 # Apply bottom border to the last row of this segment
1100 # This ensures proper table closing on middle pages
1101 import copy
1103 segment_attrs = copy.deepcopy(page_attrs)
1105 # Adjust border_bottom to apply to last row of segment
1106 last_segment_row = len(segment_df) - 1
1107 if segment_attrs.border_bottom:
1108 # Ensure border_bottom is sized correctly for segment
1109 border_style = (
1110 document.rtf_body.border_last[0][0]
1111 if isinstance(document.rtf_body.border_last, list)
1112 else document.rtf_body.border_last
1113 )
1114 # Set bottom border for all columns on last row
1115 for col_idx in range(len(segment_df.columns)):
1116 if last_segment_row < len(
1117 segment_attrs.border_bottom
1118 ):
1119 if col_idx < len(
1120 segment_attrs.border_bottom[last_segment_row]
1121 ):
1122 segment_attrs.border_bottom[last_segment_row][
1123 col_idx
1124 ] = border_style
1126 segment_body = segment_attrs._encode(segment_df, col_widths)
1127 else:
1128 segment_body = page_attrs._encode(segment_df, col_widths)
1130 page_elements.extend(segment_body)
1131 else:
1132 # No group boundaries: encode entire page as before
1133 page_body = page_attrs._encode(page_df, col_widths)
1134 page_elements.extend(page_body)
1136 # Add footnote if it should appear on this page
1137 if (
1138 document.rtf_footnote
1139 and document.rtf_footnote.text
1140 and self.document_service.should_show_element_on_page(
1141 document.rtf_page.page_footnote, page_info
1142 )
1143 ):
1144 footnote_content = self.encoding_service.encode_footnote(
1145 document.rtf_footnote,
1146 page_info["page_number"],
1147 document.rtf_page.col_width,
1148 )
1149 if footnote_content:
1150 page_elements.extend(footnote_content)
1152 # Add source if it should appear on this page
1153 if (
1154 document.rtf_source
1155 and document.rtf_source.text
1156 and self.document_service.should_show_element_on_page(
1157 document.rtf_page.page_source, page_info
1158 )
1159 ):
1160 source_content = self.encoding_service.encode_source(
1161 document.rtf_source,
1162 page_info["page_number"],
1163 document.rtf_page.col_width,
1164 )
1165 if source_content:
1166 page_elements.extend(source_content)
1168 # Add figures if they should appear on the last page and position is 'after'
1169 if (
1170 document.rtf_figure
1171 and document.rtf_figure.figures
1172 and document.rtf_figure.fig_pos == "after"
1173 and page_info["is_last_page"]
1174 ):
1175 figure_content = self.figure_service.encode_figure(document.rtf_figure)
1176 if figure_content:
1177 page_elements.append(figure_content)
1179 page_contents.extend(page_elements)
1181 # Build complete RTF document
1182 result = "\n".join(
1183 [
1184 item
1185 for item in [
1186 self.encoding_service.encode_document_start(),
1187 self.encoding_service.encode_font_table(),
1188 self.encoding_service.encode_color_table(document),
1189 "\n",
1190 self.encoding_service.encode_page_header(
1191 document.rtf_page_header, method="line"
1192 ),
1193 self.encoding_service.encode_page_footer(
1194 document.rtf_page_footer, method="line"
1195 ),
1196 self.encoding_service.encode_page_settings(document.rtf_page),
1197 "\n".join(page_contents),
1198 "\n\n",
1199 "}",
1200 ]
1201 if item is not None
1202 ]
1203 )
1205 # Clear document context after encoding
1206 color_service.clear_document_context()
1208 return result
def _encode_figure_only_document_with_pagination(
    self, document: "RTFDocument"
) -> str:
    """Render a table-less document, placing one figure on each page.

    Every figure read from ``document.rtf_figure.figures`` is emitted in
    order, with a ``\\page`` break between consecutive figures. Title,
    footnote and source are repeated according to the ``page_title``,
    ``page_footnote`` and ``page_source`` settings of ``document.rtf_page``
    ("all", "first" or "last"). The subline is emitted on the first page only.

    Args:
        document: The RTF document with only figure content

    Returns:
        Complete RTF string, or an empty string when the document has no
        figure component or no figures configured.
    """
    from copy import deepcopy

    from ..figure import rtf_read_figure

    figure_spec = document.rtf_figure
    if figure_spec is None or figure_spec.figures is None:
        return ""

    # Load every figure up front; the number of figures drives page count
    images, image_formats = rtf_read_figure(figure_spec.figures)
    final_index = len(images) - 1

    encoder = self.encoding_service
    title_rtf = encoder.encode_title(document.rtf_title, method="line")

    # Work on a private copy so the caller's footnote keeps its own
    # as_table setting; figure-only output uses as_table=False
    plain_footnote = document.rtf_footnote
    if plain_footnote is not None:
        plain_footnote = deepcopy(plain_footnote)
        plain_footnote.as_table = False

    def _shown(rule, on_first, on_last):
        # Translate an "all"/"first"/"last" setting into a per-page decision
        return (
            rule == "all"
            or (rule == "first" and on_first)
            or (rule == "last" and on_last)
        )

    # Document preamble followed by page header/footer and page settings
    chunks = [
        encoder.encode_document_start(),
        encoder.encode_font_table(),
        encoder.encode_color_table(document),
        "\n",
        encoder.encode_page_header(document.rtf_page_header, method="line"),
        encoder.encode_page_footer(document.rtf_page_footer, method="line"),
        encoder.encode_page_settings(document.rtf_page),
    ]

    for index, image in enumerate(images):
        on_first = index == 0
        on_last = index == final_index
        page_cfg = document.rtf_page

        # Title, repeated as configured
        if _shown(page_cfg.page_title, on_first, on_last):
            chunks += [title_rtf, "\n"]

        # Subline appears on the first page only
        if on_first:
            chunks.append(
                encoder.encode_subline(document.rtf_subline, method="line")
            )

        # The figure itself, sized per its position in the figure list
        figure_width = self.figure_service._get_dimension(
            figure_spec.fig_width, index
        )
        figure_height = self.figure_service._get_dimension(
            figure_spec.fig_height, index
        )
        chunks.append(
            self.figure_service._encode_single_figure(
                image,
                image_formats[index],
                figure_width,
                figure_height,
                figure_spec.fig_align,
            )
        )
        chunks.append("\\par ")

        # Footnote, repeated as configured
        if plain_footnote is not None and _shown(
            page_cfg.page_footnote, on_first, on_last
        ):
            footnote_rtf = "\n".join(
                encoder.encode_footnote(
                    plain_footnote,
                    page_number=index + 1,
                    page_col_width=page_cfg.col_width,
                )
            )
            if footnote_rtf:
                chunks.append(footnote_rtf)

        # Source, repeated as configured
        if document.rtf_source is not None and _shown(
            page_cfg.page_source, on_first, on_last
        ):
            source_rtf = "\n".join(
                encoder.encode_source(
                    document.rtf_source,
                    page_number=index + 1,
                    page_col_width=page_cfg.col_width,
                )
            )
            if source_rtf:
                chunks.append(source_rtf)

        # Separate figures with a page break; none after the final figure
        if not on_last:
            chunks.append("\\page ")

    # Close the RTF document group
    chunks += ["\n\n", "}"]

    # Encoders may return None for absent components; drop those pieces
    return "".join(chunk for chunk in chunks if chunk is not None)
1358 def _generate_subline_header(self, subline_header_info: dict, rtf_body) -> str:
1359 """Generate RTF paragraph for subline_by header.
1361 Args:
1362 subline_header_info: Dictionary with column values for the subline header
1363 rtf_body: RTFBody attributes for formatting
1365 Returns:
1366 RTF string for the subline paragraph
1367 """
1368 if not subline_header_info:
1369 return ""
1371 # Use the raw group values without column names
1372 if "group_values" in subline_header_info:
1373 # Extract just the values without column prefixes
1374 header_parts = []
1375 for _col, value in subline_header_info["group_values"].items():
1376 if value is not None:
1377 header_parts.append(str(value))
1379 if not header_parts:
1380 return ""
1382 header_text = ", ".join(header_parts)
1383 else:
1384 # Fallback for backward compatibility
1385 header_parts = []
1386 for col, value in subline_header_info.items():
1387 if value is not None and col not in ["group_by_columns", "header_text"]:
1388 header_parts.append(str(value))
1390 if not header_parts:
1391 return ""
1393 header_text = ", ".join(header_parts)
1395 # Create RTF paragraph with minimal spacing (no sb180/sa180 to eliminate
1396 # space between header and table)
1397 return (
1398 f"{{\\pard\\hyphpar\\fi0\\li0\\ri0\\ql\\fs18{{\\f0 {header_text}}}\\par}}"
1399 )