Coverage for src/rtflite/encoding/strategies.py: 83%
381 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 16:35 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 16:35 +0000
1"""Encoding strategies for different types of RTF documents."""
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING
6from ..services.grouping_service import grouping_service
7from ..type_guards import (
8 is_flat_header_list,
9 is_list_body,
10 is_list_header,
11 is_nested_header_list,
12 is_single_body,
13 is_single_header,
14)
16if TYPE_CHECKING:
17 from ..encode import RTFDocument
class EncodingStrategy(ABC):
    """Common interface implemented by every RTF encoding strategy."""

    @abstractmethod
    def encode(self, document: "RTFDocument") -> str:
        """Render ``document`` as RTF text.

        Concrete strategies must override this method; the base
        implementation does nothing.

        Args:
            document: The RTF document to encode.

        Returns:
            The complete RTF string.
        """
class SinglePageStrategy(EncodingStrategy):
    """Encoding strategy for single-page documents without pagination.

    ``encode`` dispatches on ``document.df``: ``None`` is handled as a
    figure-only document, a list as a multi-section document, and anything
    else as a single table.
    """

    def __init__(self):
        """Instantiate the encoding, document and figure services."""
        # Imported lazily, as in the rest of this module.
        from ..services import RTFEncodingService
        from ..services.document_service import RTFDocumentService
        from ..services.figure_service import RTFFigureService

        self.encoding_service = RTFEncodingService()
        self.document_service = RTFDocumentService()
        self.figure_service = RTFFigureService()

    def encode(self, document: "RTFDocument") -> str:
        """Encode a single-page document with complete border and layout handling.

        Border attributes on ``document`` (body, first column header, footnote)
        are updated in place, and when header text is generated from the column
        names ``document.rtf_column_header`` is truncated to its first entry.

        Args:
            document: The RTF document to encode

        Returns:
            Complete RTF string
        """
        from ..attributes import BroadcastValue

        # Figure-only documents (no table)
        if document.df is None:
            return self._encode_figure_only_document_simple(document)

        # A list of DataFrames means one section per DataFrame
        if isinstance(document.df, list):
            return self._encode_multi_section_document(document)

        dim = document.df.shape
        n_rows, n_cols = dim

        rtf_title = self.encoding_service.encode_title(
            document.rtf_title, method="line"
        )

        # Page-level borders, broadcast to one value per column
        doc_border_top = self._first_broadcast_row(
            document.rtf_page.border_first, n_cols
        )
        doc_border_bottom = self._first_broadcast_row(
            document.rtf_page.border_last, n_cols
        )

        # Body-level first/last borders are only read from a single (non-list) body
        page_border_top = None
        page_border_bottom = None
        if document.rtf_body is not None and not isinstance(document.rtf_body, list):
            page_border_top = self._first_broadcast_row(
                document.rtf_body.border_first, n_cols
            )
            page_border_bottom = self._first_broadcast_row(
                document.rtf_body.border_last, n_cols
            )

        # Column headers
        if document.rtf_column_header is None:
            rtf_column_header = []
            # Without headers the page top border goes on the first body row;
            # only possible when the DataFrame has rows.
            if n_rows > 0:
                document.rtf_body.border_top = BroadcastValue(
                    value=document.rtf_body.border_top, dimension=dim
                ).update_row(0, doc_border_top)
        else:
            self._fill_header_text_from_columns(document)

            # Looked up after the helper above, which may have replaced the list
            first_header = self._first_flat_header(document)
            if n_rows > 0 and first_header is not None:
                first_header.border_top = BroadcastValue(
                    value=first_header.border_top, dimension=dim
                ).update_row(0, doc_border_top if doc_border_top is not None else [])

            rtf_column_header = self._encode_column_headers(document)

        # Body top border on the first data row
        if (
            n_rows > 0
            and page_border_top is not None
            and is_single_body(document.rtf_body)
        ):
            document.rtf_body.border_top = BroadcastValue(
                value=document.rtf_body.border_top, dimension=dim
            ).update_row(0, page_border_top)

        # Bottom border: on the footnote when there is one, otherwise on the
        # last body row. Body border is applied first, then the page border.
        if document.rtf_footnote is not None:
            for border in (page_border_bottom, doc_border_bottom):
                if border is not None:
                    document.rtf_footnote.border_bottom = BroadcastValue(
                        value=document.rtf_footnote.border_bottom, dimension=(1, 1)
                    ).update_row(0, [border[0]])
        elif n_rows > 0:
            for border in (page_border_bottom, doc_border_bottom):
                if border is not None and is_single_body(document.rtf_body):
                    document.rtf_body.border_bottom = BroadcastValue(
                        value=document.rtf_body.border_bottom, dimension=dim
                    ).update_row(n_rows - 1, border)

        from ..services.color_service import color_service

        # Set document color context for accurate color index resolution
        color_service.set_document_context(document)
        try:
            rtf_body = self.encoding_service.encode_body(
                document, document.df, document.rtf_body, force_single_page=True
            )

            encoder = self.encoding_service
            figure = document.rtf_figure
            components = [
                encoder.encode_document_start(),
                encoder.encode_font_table(),
                encoder.encode_color_table(document),
                "\n",
                encoder.encode_page_header(document.rtf_page_header, method="line"),
                encoder.encode_page_footer(document.rtf_page_footer, method="line"),
                encoder.encode_page_settings(document.rtf_page),
                rtf_title,
                "\n",
                encoder.encode_subline(document.rtf_subline, method="line"),
                (
                    self.figure_service.encode_figure(figure)
                    if figure is not None and figure.fig_pos == "before"
                    else None
                ),
                (
                    "\n".join(line for lines in rtf_column_header for line in lines)
                    if rtf_column_header
                    else None
                ),
                "\n".join(rtf_body),
                (
                    "\n".join(
                        encoder.encode_footnote(
                            document.rtf_footnote,
                            page_number=1,
                            page_col_width=document.rtf_page.col_width,
                        )
                    )
                    if document.rtf_footnote is not None
                    else None
                ),
                (
                    "\n".join(
                        encoder.encode_source(
                            document.rtf_source,
                            page_number=1,
                            page_col_width=document.rtf_page.col_width,
                        )
                    )
                    if document.rtf_source is not None
                    else None
                ),
                (
                    self.figure_service.encode_figure(figure)
                    if figure is not None and figure.fig_pos == "after"
                    else None
                ),
                "\n\n",
                "}",
            ]
            return "\n".join(part for part in components if part is not None)
        finally:
            # Always clear the context, even when encoding raises, so it does
            # not leak into the next document.
            color_service.clear_document_context()

    @staticmethod
    def _first_broadcast_row(value, n_cols):
        """Broadcast ``value`` to a single row of ``n_cols`` cells.

        Returns:
            The first row of the broadcast list, or ``None`` when the list is
            empty.
        """
        from ..attributes import BroadcastValue

        rows = BroadcastValue(value=value, dimension=(1, n_cols)).to_list()
        return rows[0] if rows else None

    @staticmethod
    def _first_flat_header(document: "RTFDocument"):
        """Return the first entry of a flat header list, or ``None``.

        ``None`` is returned when ``document.rtf_column_header`` is not a flat
        header list, is empty, or its first entry is ``None``.
        """
        headers = document.rtf_column_header
        if is_flat_header_list(headers) and len(headers) > 0:
            return headers[0]
        return None

    def _fill_header_text_from_columns(self, document: "RTFDocument") -> None:
        """Generate header text from the DataFrame column names when requested.

        Applies only when the first header has no text and the single body has
        ``as_colheader`` set. ``page_by`` and ``subline_by`` columns are left
        out of the generated header, and the header list is then cut down to
        its first entry.
        """
        import polars as pl

        headers = document.rtf_column_header
        header_to_check = None
        if is_nested_header_list(headers):
            # Nested list case - first section's first header
            if headers[0]:
                header_to_check = headers[0][0]
        elif is_flat_header_list(headers):
            # Flat list case - first header
            if len(headers) > 0:
                header_to_check = headers[0]
        elif is_single_header(headers):  # type: ignore[arg-type]
            header_to_check = headers

        if header_to_check is None or header_to_check.text is not None:
            return
        body = document.rtf_body
        if not (is_single_body(body) and body.as_colheader):
            return

        excluded_columns = list(body.page_by or []) + list(body.subline_by or [])
        columns = [col for col in document.df.columns if col not in excluded_columns]
        # Explicit schema so the column names land in a single row
        header_df = pl.DataFrame(
            [columns],
            schema=[f"col_{i}" for i in range(len(columns))],
            orient="row",
        )

        # Only a flat header list receives the generated text and widths
        first_header = self._first_flat_header(document)
        if first_header is not None:
            first_header.text = header_df  # type: ignore[assignment]

            if excluded_columns:
                original_cols = list(document.df.columns)
                rel_width = body.col_rel_width
                if rel_width is not None and len(rel_width) >= len(original_cols):
                    # Keep the widths of the columns that stay in the header
                    excluded = set(excluded_columns)
                    first_header.col_rel_width = [
                        rel_width[i]
                        for i, col in enumerate(original_cols)
                        if col not in excluded
                    ]
                else:
                    # Fallback: equal widths when col_rel_width is missing/short
                    first_header.col_rel_width = [1] * len(columns)

        document.rtf_column_header = document.rtf_column_header[:1]

    def _encode_column_headers(self, document: "RTFDocument") -> list:
        """Encode every column header of ``document``.

        Returns:
            One ``encode_column_header`` result per header; an empty list when
            ``rtf_column_header`` matches none of the known header shapes.
        """
        headers = document.rtf_column_header
        col_width = document.rtf_page.col_width
        encode_header = self.encoding_service.encode_column_header

        if is_nested_header_list(headers):
            encoded = []
            for section in headers:
                if not section:
                    continue
                for header in section:
                    if header:
                        encoded.append(encode_header(header.text, header, col_width))
            return encoded
        if is_flat_header_list(headers):
            return [
                encode_header(header.text if header else None, header, col_width)
                for header in headers
            ]
        if is_single_header(headers):  # type: ignore[arg-type]
            return [encode_header(headers.text, headers, col_width)]
        return []

    def _encode_multi_section_document(self, document: "RTFDocument") -> str:
        """Encode a multi-section document where sections are concatenated row by row.

        Each ``(df, rtf_body)`` pair is encoded as its own section. The page top
        border goes on the first header (or first body row) of the first
        section. The page bottom border goes on the footnote when there is one,
        otherwise on the last row of the last section.

        Args:
            document: The RTF document with multiple df/rtf_body sections

        Returns:
            Complete RTF string
        """
        from copy import deepcopy

        import polars as pl

        from ..attributes import BroadcastValue
        from ..input import RTFColumnHeader

        if isinstance(document.df, list):
            df_list = document.df
        elif document.df is not None:
            df_list = [document.df]
        else:
            df_list = []

        if isinstance(document.rtf_body, list):
            body_list = document.rtf_body
        elif document.rtf_body is not None:
            body_list = [document.rtf_body]
        else:
            body_list = []

        # Page borders are sized from the first section
        first_section_cols = df_list[0].shape[1] if df_list else 0

        rtf_title = self.encoding_service.encode_title(
            document.rtf_title, method="line"
        )

        doc_border_top = self._first_broadcast_row(
            document.rtf_page.border_first, first_section_cols
        )
        doc_border_bottom = self._first_broadcast_row(
            document.rtf_page.border_last, first_section_cols
        )

        footnote_takes_bottom_border = (
            document.rtf_footnote is not None and doc_border_bottom is not None
        )

        # Bottom border on the last section's last row. This must happen before
        # the section loop: the loop encodes each body, so a border set after it
        # would never reach the output.
        if (
            not footnote_takes_bottom_border
            and doc_border_bottom is not None
            and isinstance(document.rtf_body, list)
            and isinstance(document.df, list)
            and document.rtf_body
            and document.df
        ):
            last_section_body = document.rtf_body[-1]
            last_section_dim = document.df[-1].shape
            if last_section_dim[0] > 0:
                last_section_body.border_bottom = BroadcastValue(
                    value=last_section_body.border_bottom,
                    dimension=last_section_dim,
                ).update_row(last_section_dim[0] - 1, doc_border_bottom)

        all_section_content = []
        is_nested_headers = is_nested_header_list(document.rtf_column_header)
        encode_header = self.encoding_service.encode_column_header

        for i, (section_df, section_body) in enumerate(zip(df_list, body_list)):
            dim = section_df.shape

            # Each entry is one encode_column_header result (an iterable of lines)
            section_headers = []
            if is_nested_headers:
                # Nested format: [[header1], [None], [header3]]
                nested = document.rtf_column_header
                section_group = nested[i] if i < len(nested) else None
                for header in section_group or []:
                    # Skips None as well as anything that is not a header object
                    if not isinstance(header, RTFColumnHeader):
                        continue
                    # Top border on the first section's first header
                    if i == 0 and not section_headers and doc_border_top is not None:
                        header.border_top = BroadcastValue(
                            value=header.border_top, dimension=dim
                        ).update_row(0, doc_border_top)
                    section_headers.append(
                        encode_header(header.text, header, document.rtf_page.col_width)
                    )
            elif i == 0:
                # Flat format - headers only apply to the first section
                if is_flat_header_list(document.rtf_column_header):
                    headers_to_check = document.rtf_column_header
                elif is_single_header(document.rtf_column_header):  # type: ignore[arg-type]
                    headers_to_check = [document.rtf_column_header]
                else:
                    headers_to_check = []

                for header in headers_to_check:
                    if header is None:
                        continue

                    if header.text is None and section_body.as_colheader:
                        # Auto-generate the header from the column names
                        page_by = section_body.page_by or []
                        columns = [
                            col for col in section_df.columns if col not in page_by
                        ]
                        header.text = pl.DataFrame(  # type: ignore[assignment]
                            [columns],
                            schema=[f"col_{j}" for j in range(len(columns))],
                            orient="row",
                        )

                    # Top border on the first header
                    if not section_headers and doc_border_top is not None:
                        header.border_top = BroadcastValue(
                            value=header.border_top, dimension=dim
                        ).update_row(0, doc_border_top)

                    section_headers.append(
                        encode_header(header.text, header, document.rtf_page.col_width)
                    )

            # First section without headers: top border on its first body row
            if i == 0 and not section_headers:
                section_body.border_top = BroadcastValue(
                    value=section_body.border_top, dimension=dim
                ).update_row(0, doc_border_top if doc_border_top is not None else [])

            # encode_body expects a document, so hand it a copy narrowed to
            # this section.
            temp_document = deepcopy(document)
            temp_document.df = section_df
            temp_document.rtf_body = section_body

            section_body_content = self.encoding_service.encode_body(
                temp_document, section_df, section_body
            )

            if section_headers:
                all_section_content.append(
                    "\n".join(line for lines in section_headers for line in lines)
                )
            all_section_content.extend(section_body_content)

        # The footnote is encoded below, so its border can be set here
        if footnote_takes_bottom_border:
            document.rtf_footnote.border_bottom = BroadcastValue(
                value=document.rtf_footnote.border_bottom, dimension=(1, 1)
            ).update_row(0, [doc_border_bottom[0]])

        # NOTE(review): unlike encode() and the figure-only path, no color table
        # is emitted here and no color context is set - confirm this is intended.
        encoder = self.encoding_service
        components = [
            encoder.encode_document_start(),
            encoder.encode_font_table(),
            "\n",
            encoder.encode_page_header(document.rtf_page_header, method="line"),
            encoder.encode_page_footer(document.rtf_page_footer, method="line"),
            encoder.encode_page_settings(document.rtf_page),
            rtf_title,
            "\n",
            encoder.encode_subline(document.rtf_subline, method="line"),
            "\n".join(all_section_content),
            (
                "\n".join(
                    encoder.encode_footnote(
                        document.rtf_footnote,
                        page_number=1,
                        page_col_width=document.rtf_page.col_width,
                    )
                )
                if document.rtf_footnote is not None
                else None
            ),
            (
                "\n".join(
                    encoder.encode_source(
                        document.rtf_source,
                        page_number=1,
                        page_col_width=document.rtf_page.col_width,
                    )
                )
                if document.rtf_source is not None
                else None
            ),
            "\n\n",
            "}",
        ]
        return "\n".join(part for part in components if part is not None)

    def _encode_figure_only_document_simple(self, document: "RTFDocument") -> str:
        """Encode a figure-only document with simple page layout.

        The components are concatenated without a separator, unlike the table
        paths which join with newlines. Per the original author's note, page
        breaks between multiple figures are handled by the figure service.

        Args:
            document: The RTF document with only figure content

        Returns:
            Complete RTF string
        """
        encoder = self.encoding_service
        rtf_title = encoder.encode_title(document.rtf_title, method="line")

        components = [
            encoder.encode_document_start(),
            encoder.encode_font_table(),
            encoder.encode_color_table(document),
            "\n",
            encoder.encode_page_header(document.rtf_page_header, method="line"),
            encoder.encode_page_footer(document.rtf_page_footer, method="line"),
            encoder.encode_page_settings(document.rtf_page),
            rtf_title,
            "\n",
            encoder.encode_subline(document.rtf_subline, method="line"),
            self.figure_service.encode_figure(document.rtf_figure),
            (
                "\n".join(
                    encoder.encode_footnote(
                        document.rtf_footnote,
                        page_number=1,
                        page_col_width=document.rtf_page.col_width,
                    )
                )
                if document.rtf_footnote is not None
                else None
            ),
            (
                "\n".join(
                    encoder.encode_source(
                        document.rtf_source,
                        page_number=1,
                        page_col_width=document.rtf_page.col_width,
                    )
                )
                if document.rtf_source is not None
                else None
            ),
            "\n\n",
            "}",
        ]
        return "".join(part for part in components if part is not None)
632class PaginatedStrategy(EncodingStrategy):
633 """Encoding strategy for multi-page documents with pagination."""
635 def __init__(self):
636 from ..services import RTFEncodingService
637 from ..services.document_service import RTFDocumentService
638 from ..services.figure_service import RTFFigureService
640 self.encoding_service = RTFEncodingService()
641 self.document_service = RTFDocumentService()
642 self.figure_service = RTFFigureService()
644 def encode(self, document: "RTFDocument") -> str:
645 """Encode a paginated document with full pagination support.
647 Args:
648 document: The RTF document to encode
650 Returns:
651 Complete RTF string
652 """
653 from copy import deepcopy
655 import polars as pl
657 from ..attributes import BroadcastValue
658 from ..row import Utils
660 # Handle figure-only documents with multi-page behavior
661 if document.df is None:
662 return self._encode_figure_only_document_with_pagination(document)
664 # Get dimensions based on DataFrame type
665 if isinstance(document.df, list):
666 # For list of DataFrames, use first one's columns
667 dim = (
668 sum(df.shape[0] for df in document.df),
669 document.df[0].shape[1] if document.df else 0,
670 )
671 else:
672 dim = document.df.shape
674 # Set document color context for accurate color index resolution
675 from ..services.color_service import color_service
677 color_service.set_document_context(document)
679 # Prepare DataFrame for processing (remove subline_by columns, apply group_by if needed)
680 processed_df, original_df = (
681 self.encoding_service.prepare_dataframe_for_body_encoding(
682 document.df, document.rtf_body
683 )
684 )
686 # Validate subline_by formatting consistency before processing
687 if (
688 is_single_body(document.rtf_body)
689 and document.rtf_body.subline_by is not None
690 ):
691 import warnings
692 from typing import cast
694 subline_by_list = cast(list[str], document.rtf_body.subline_by)
695 formatting_warnings = (
696 grouping_service.validate_subline_formatting_consistency(
697 original_df, subline_by_list, document.rtf_body
698 )
699 )
700 for warning_msg in formatting_warnings:
701 warnings.warn(
702 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=3
703 )
705 # Get pagination instance and distribute content (use processed data for distribution)
706 _, distributor = self.document_service.create_pagination_instance(document)
707 col_total_width = document.rtf_page.col_width
708 if (
709 is_single_body(document.rtf_body)
710 and document.rtf_body.col_rel_width is not None
711 ):
712 col_widths = Utils._col_widths(
713 document.rtf_body.col_rel_width,
714 col_total_width if col_total_width is not None else 8.5,
715 )
716 else:
717 # Default to equal widths if body is not single
718 col_widths = Utils._col_widths(
719 [1] * dim[1], col_total_width if col_total_width is not None else 8.5
720 )
722 # Calculate additional rows per page for r2rtf compatibility
723 additional_rows = self.document_service.calculate_additional_rows_per_page(
724 document
725 )
727 # Use original DataFrame for pagination logic (to identify subline_by breaks)
728 # but processed DataFrame for the actual content
729 if is_single_body(document.rtf_body):
730 pages = distributor.distribute_content(
731 df=original_df, # Use original DataFrame for proper pagination distribution logic
732 col_widths=col_widths,
733 page_by=document.rtf_body.page_by,
734 new_page=document.rtf_body.new_page,
735 pageby_header=document.rtf_body.pageby_header,
736 table_attrs=document.rtf_body,
737 additional_rows_per_page=additional_rows,
738 subline_by=document.rtf_body.subline_by,
739 )
740 else:
741 # Default pagination if body is not single
742 pages = distributor.distribute_content(
743 df=original_df,
744 col_widths=col_widths,
745 page_by=None,
746 new_page=None,
747 pageby_header=None,
748 table_attrs=None,
749 additional_rows_per_page=additional_rows,
750 subline_by=None,
751 )
753 # Replace page data with processed data (without subline_by columns)
754 for i, page_info in enumerate(pages):
755 start_row = page_info["start_row"]
756 end_row = page_info["end_row"]
757 page_info["data"] = processed_df.slice(start_row, end_row - start_row + 1)
759 # Apply group_by processing to each page if needed
760 if is_single_body(document.rtf_body) and document.rtf_body.group_by:
761 # Calculate global page start indices for context restoration
762 page_start_indices = []
763 cumulative_rows = 0
764 for i, page_info in enumerate(pages):
765 if i > 0: # Skip first page (starts at 0)
766 page_start_indices.append(cumulative_rows)
767 cumulative_rows += len(page_info["data"])
769 # Process all pages together for proper group_by and page context restoration
770 all_page_data = []
771 for page_info in pages:
772 all_page_data.append(page_info["data"])
774 # Concatenate all page data
775 full_df = all_page_data[0]
776 for page_df in all_page_data[1:]:
777 full_df = full_df.vstack(page_df)
779 # Apply group_by suppression to the full dataset
780 from typing import cast
782 group_by_param = cast(list[str] | None, document.rtf_body.group_by)
783 suppressed_df = grouping_service.enhance_group_by(full_df, group_by_param)
785 # Apply page context restoration
786 from typing import cast
788 group_by_list2 = cast(list[str], document.rtf_body.group_by)
789 restored_df = grouping_service.restore_page_context(
790 suppressed_df, full_df, group_by_list2, page_start_indices
791 )
793 # Split the processed data back to pages
794 start_idx = 0
795 for page_info in pages:
796 page_rows = len(page_info["data"])
797 page_info["data"] = restored_df.slice(start_idx, page_rows)
798 start_idx += page_rows
800 # Prepare border settings
801 border_first_list = BroadcastValue(
802 value=document.rtf_page.border_first, dimension=(1, dim[1])
803 ).to_list()
804 _ = (
805 border_first_list[0] if border_first_list else None
806 ) # May be used for validation
807 border_last_list = BroadcastValue(
808 value=document.rtf_page.border_last, dimension=(1, dim[1])
809 ).to_list()
810 _ = (
811 border_last_list[0] if border_last_list else None
812 ) # May be used for validation
814 # Generate RTF for each page
815 page_contents = []
817 for page_info in pages:
818 page_elements = []
820 # Add page break before each page (except first)
821 if not page_info["is_first_page"]:
822 page_elements.append(
823 self.document_service.generate_page_break(document)
824 )
826 # Add title if it should appear on this page
827 if (
828 document.rtf_title
829 and document.rtf_title.text
830 and self.document_service.should_show_element_on_page(
831 document.rtf_page.page_title, page_info
832 )
833 ):
834 title_content = self.encoding_service.encode_title(
835 document.rtf_title, method="line"
836 )
837 if title_content:
838 page_elements.append(title_content)
839 page_elements.append("\n")
841 # Add subline if it should appear on this page
842 if (
843 document.rtf_subline
844 and document.rtf_subline.text
845 and self.document_service.should_show_element_on_page(
846 document.rtf_page.page_title, page_info
847 )
848 ):
849 subline_content = self.encoding_service.encode_subline(
850 document.rtf_subline, method="line"
851 )
852 if subline_content:
853 page_elements.append(subline_content)
855 # Add subline_by header paragraph if specified
856 if page_info.get("subline_header"):
857 subline_header_content = self._generate_subline_header(
858 page_info["subline_header"], document.rtf_body
859 )
860 if subline_header_content:
861 page_elements.append(subline_header_content)
863 # Add figures if they should appear on the first page and position is 'before'
864 if (
865 document.rtf_figure
866 and document.rtf_figure.figures
867 and document.rtf_figure.fig_pos == "before"
868 and page_info["is_first_page"]
869 ):
870 figure_content = self.figure_service.encode_figure(document.rtf_figure)
871 if figure_content:
872 page_elements.append(figure_content)
873 page_elements.append("\n")
875 # Add column headers if needed
876 if page_info["needs_header"] and document.rtf_column_header:
877 if (
878 is_flat_header_list(document.rtf_column_header)
879 and len(document.rtf_column_header) > 0
880 and document.rtf_column_header[0] is not None
881 and document.rtf_column_header[0].text is None
882 and is_single_body(document.rtf_body)
883 and document.rtf_body.as_colheader
884 ):
885 # Use the processed page data columns (which already have subline_by columns removed)
886 page_df = page_info["data"]
887 columns = list(page_df.columns)
888 # Create DataFrame for text field (not assign list to text)
889 import polars as pl
891 header_df = pl.DataFrame(
892 [columns],
893 schema=[f"col_{i}" for i in range(len(columns))],
894 orient="row",
895 )
896 document.rtf_column_header[0].text = header_df # type: ignore[assignment]
898 # Adjust col_rel_width to match the processed columns (without subline_by)
899 if (
900 is_single_body(document.rtf_body)
901 and document.rtf_body.subline_by
902 ):
903 original_cols = (
904 list(document.df.columns)
905 if isinstance(document.df, pl.DataFrame)
906 else []
907 )
908 subline_cols = set(document.rtf_body.subline_by)
909 processed_col_indices = [
910 i
911 for i, col in enumerate(original_cols)
912 if col not in subline_cols
913 ]
915 # Ensure we have enough col_rel_width values for all original columns
916 if (
917 is_single_body(document.rtf_body)
918 and document.rtf_body.col_rel_width is not None
919 and len(document.rtf_body.col_rel_width)
920 >= len(original_cols)
921 and is_flat_header_list(document.rtf_column_header)
922 and len(document.rtf_column_header) > 0
923 and document.rtf_column_header[0] is not None
924 ):
925 document.rtf_column_header[0].col_rel_width = [
926 document.rtf_body.col_rel_width[i]
927 for i in processed_col_indices
928 ]
929 elif (
930 is_flat_header_list(document.rtf_column_header)
931 and len(document.rtf_column_header) > 0
932 and document.rtf_column_header[0] is not None
933 ):
934 # Fallback: use equal widths if col_rel_width doesn't match
935 document.rtf_column_header[0].col_rel_width = [1] * len(
936 columns
937 )
939 # Apply pagination borders to column headers
940 # Process each column header with proper borders
941 header_elements = []
942 headers_to_process = []
943 if is_nested_header_list(document.rtf_column_header):
944 # For nested headers, flatten them
945 for section_headers in document.rtf_column_header:
946 if section_headers:
947 headers_to_process.extend(section_headers)
948 elif is_flat_header_list(document.rtf_column_header):
949 headers_to_process = document.rtf_column_header
951 for i, header in enumerate(headers_to_process):
952 if header is None:
953 continue
954 header_copy = deepcopy(header)
956 # Apply page-level borders to column headers (matching non-paginated behavior)
957 if (
958 page_info["is_first_page"] and i == 0
959 ): # First header on first page
960 if (
961 document.rtf_page.border_first
962 and header_copy.text is not None
963 ):
964 # Get dimensions based on text type
965 import polars as pl
967 if isinstance(header_copy.text, pl.DataFrame):
968 header_dims = header_copy.text.shape
969 else:
970 # For Sequence[str], assume single row
971 header_dims = (
972 1,
973 len(header_copy.text) if header_copy.text else 0,
974 )
975 # Apply page border_first to top of first column header
976 header_copy.border_top = BroadcastValue(
977 value=header_copy.border_top, dimension=header_dims
978 ).update_row(
979 0, [document.rtf_page.border_first] * header_dims[1]
980 )
982 # Encode the header with modified borders
983 # Use the header_copy to preserve border modifications
984 header_rtf = self.encoding_service.encode_column_header(
985 header_copy.text, header_copy, document.rtf_page.col_width
986 )
987 header_elements.extend(header_rtf)
989 page_elements.extend(header_elements)
991 # Add page content (table body) with proper border handling
992 page_df = page_info["data"]
994 # Apply pagination borders to the body attributes
995 page_attrs = self.document_service.apply_pagination_borders(
996 document, document.rtf_body, page_info, len(pages)
997 )
999 # Encode page content with modified borders
1000 page_body = page_attrs._encode(page_df, col_widths)
1001 page_elements.extend(page_body)
1003 # Add footnote if it should appear on this page
1004 if (
1005 document.rtf_footnote
1006 and document.rtf_footnote.text
1007 and self.document_service.should_show_element_on_page(
1008 document.rtf_page.page_footnote, page_info
1009 )
1010 ):
1011 footnote_content = self.encoding_service.encode_footnote(
1012 document.rtf_footnote,
1013 page_info["page_number"],
1014 document.rtf_page.col_width,
1015 )
1016 if footnote_content:
1017 page_elements.extend(footnote_content)
1019 # Add source if it should appear on this page
1020 if (
1021 document.rtf_source
1022 and document.rtf_source.text
1023 and self.document_service.should_show_element_on_page(
1024 document.rtf_page.page_source, page_info
1025 )
1026 ):
1027 source_content = self.encoding_service.encode_source(
1028 document.rtf_source,
1029 page_info["page_number"],
1030 document.rtf_page.col_width,
1031 )
1032 if source_content:
1033 page_elements.extend(source_content)
1035 # Add figures if they should appear on the last page and position is 'after'
1036 if (
1037 document.rtf_figure
1038 and document.rtf_figure.figures
1039 and document.rtf_figure.fig_pos == "after"
1040 and page_info["is_last_page"]
1041 ):
1042 figure_content = self.figure_service.encode_figure(document.rtf_figure)
1043 if figure_content:
1044 page_elements.append(figure_content)
1046 page_contents.extend(page_elements)
1048 # Build complete RTF document
1049 result = "\n".join(
1050 [
1051 item
1052 for item in [
1053 self.encoding_service.encode_document_start(),
1054 self.encoding_service.encode_font_table(),
1055 self.encoding_service.encode_color_table(document),
1056 "\n",
1057 self.encoding_service.encode_page_header(
1058 document.rtf_page_header, method="line"
1059 ),
1060 self.encoding_service.encode_page_footer(
1061 document.rtf_page_footer, method="line"
1062 ),
1063 self.encoding_service.encode_page_settings(document.rtf_page),
1064 "\n".join(page_contents),
1065 "\n\n",
1066 "}",
1067 ]
1068 if item is not None
1069 ]
1070 )
1072 # Clear document context after encoding
1073 color_service.clear_document_context()
1075 return result
def _encode_figure_only_document_with_pagination(
    self, document: "RTFDocument"
) -> str:
    """Render a figure-only document as one RTF page per figure.

    The document preamble (document start, font table, color table, page
    header/footer and page settings) is emitted once. Every figure is then
    written in turn, followed by ``\\par``, with a ``\\page`` break between
    consecutive figures. The title, footnote and source are repeated
    according to ``rtf_page.page_title`` / ``page_footnote`` /
    ``page_source``; the values checked are ``"all"``, ``"first"`` and
    ``"last"``. The subline is written on the first page only.

    Args:
        document: The RTF document with only figure content

    Returns:
        Complete RTF string, or an empty string when the document has no
        figure component or no figures attached to it.
    """
    from copy import deepcopy

    from ..figure import rtf_read_figure

    figure_spec = document.rtf_figure
    if figure_spec is None or figure_spec.figures is None:
        return ""

    # The number of loaded figures determines the number of pages.
    images, image_formats = rtf_read_figure(figure_spec.figures)
    last_index = len(images) - 1

    encoder = self.encoding_service
    page_cfg = document.rtf_page

    # The title is encoded once and reused on every page that shows it.
    title_rtf = encoder.encode_title(document.rtf_title, method="line")

    # Work on a copy so the caller's footnote component is not mutated when
    # as_table is switched off for the figure-only layout.
    footnote = document.rtf_footnote
    if footnote is not None:
        footnote = deepcopy(footnote)
        footnote.as_table = False

    def appears_on(setting, first, last):
        """Tell whether an element with this page setting is shown here."""
        return (
            setting == "all"
            or (setting == "first" and first)
            or (setting == "last" and last)
        )

    # Preamble; entries that encode to None are dropped when joining.
    chunks = [
        encoder.encode_document_start(),
        encoder.encode_font_table(),
        encoder.encode_color_table(document),
        "\n",
        encoder.encode_page_header(document.rtf_page_header, method="line"),
        encoder.encode_page_footer(document.rtf_page_footer, method="line"),
        encoder.encode_page_settings(page_cfg),
    ]

    for index, image in enumerate(images):
        on_first = index == 0
        on_last = index == last_index
        page_number = index + 1

        if appears_on(page_cfg.page_title, on_first, on_last):
            chunks.append(title_rtf)
            chunks.append("\n")

        # The subline is never repeated beyond the first page.
        if on_first:
            chunks.append(
                encoder.encode_subline(document.rtf_subline, method="line")
            )

        # Width and height are resolved per figure index.
        fig_width = self.figure_service._get_dimension(
            figure_spec.fig_width, index
        )
        fig_height = self.figure_service._get_dimension(
            figure_spec.fig_height, index
        )
        chunks.append(
            self.figure_service._encode_single_figure(
                image,
                image_formats[index],
                fig_width,
                fig_height,
                figure_spec.fig_align,
            )
        )
        chunks.append("\\par ")

        if footnote is not None and appears_on(
            page_cfg.page_footnote, on_first, on_last
        ):
            footnote_rtf = "\n".join(
                encoder.encode_footnote(
                    footnote,
                    page_number=page_number,
                    page_col_width=page_cfg.col_width,
                )
            )
            if footnote_rtf:
                chunks.append(footnote_rtf)

        if document.rtf_source is not None and appears_on(
            page_cfg.page_source, on_first, on_last
        ):
            source_rtf = "\n".join(
                encoder.encode_source(
                    document.rtf_source,
                    page_number=page_number,
                    page_col_width=page_cfg.col_width,
                )
            )
            if source_rtf:
                chunks.append(source_rtf)

        # Page breaks separate figures; none follows the final figure.
        if not on_last:
            chunks.append("\\page ")

    # Close the RTF document group.
    chunks.extend(("\n\n", "}"))

    return "".join(chunk for chunk in chunks if chunk is not None)
1225 def _generate_subline_header(self, subline_header_info: dict, rtf_body) -> str:
1226 """Generate RTF paragraph for subline_by header.
1228 Args:
1229 subline_header_info: Dictionary with column values for the subline header
1230 rtf_body: RTFBody attributes for formatting
1232 Returns:
1233 RTF string for the subline paragraph
1234 """
1235 if not subline_header_info:
1236 return ""
1238 # Use the raw group values without column names
1239 if "group_values" in subline_header_info:
1240 # Extract just the values without column prefixes
1241 header_parts = []
1242 for col, value in subline_header_info["group_values"].items():
1243 if value is not None:
1244 header_parts.append(str(value))
1246 if not header_parts:
1247 return ""
1249 header_text = ", ".join(header_parts)
1250 else:
1251 # Fallback for backward compatibility
1252 header_parts = []
1253 for col, value in subline_header_info.items():
1254 if value is not None and col not in ["group_by_columns", "header_text"]:
1255 header_parts.append(str(value))
1257 if not header_parts:
1258 return ""
1260 header_text = ", ".join(header_parts)
1262 # Create RTF paragraph with minimal spacing (no sb180/sa180 to eliminate space between header and table)
1263 return (
1264 f"{{\\pard\\hyphpar\\fi0\\li0\\ri0\\ql\\fs18{{\\f0 {header_text}}}\\par}}"
1265 )