Coverage for src / rtflite / services / encoding_service.py: 65%
216 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 05:09 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 05:09 +0000
1"""RTF encoding service that handles document component encoding."""
3from collections.abc import Sequence
5from .grouping_service import grouping_service
class RTFEncodingService:
    """Service class that handles RTF component encoding operations.

    This class extracts encoding logic from RTFDocument to improve separation
    of concerns and enable better testing and maintainability.
    """

    def __init__(self):
        """Create the service together with its RTF syntax generator."""
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import - TODO confirm).
        from ..rtf import RTFSyntaxGenerator

        # Generator used by the font table, color table and page settings
        # encoders of this class.
        self.syntax = RTFSyntaxGenerator()
20 def encode_spanning_row(
21 self,
22 text: str,
23 page_width: float,
24 rtf_body_attrs=None,
25 ) -> Sequence[str]:
26 """Generate a spanning table row (single cell spanning full width).
28 This is used for page_by group headers that span across all columns.
29 Works for both single-page and paginated documents.
31 Args:
32 text: Text to display in the spanning row
33 page_width: Total page width in inches
34 rtf_body_attrs: RTFBody attributes for styling (optional)
36 Returns:
37 List of RTF strings for the spanning row
38 """
39 from ..row import Border, Cell, Row, TextContent
41 # Use body attributes if provided, otherwise use defaults
42 if rtf_body_attrs:
43 font = rtf_body_attrs.text_font[0][0] if rtf_body_attrs.text_font else 0
44 size = (
45 rtf_body_attrs.text_font_size[0][0]
46 if rtf_body_attrs.text_font_size
47 else 18
48 )
49 text_format = (
50 rtf_body_attrs.text_format[0][0] if rtf_body_attrs.text_format else ""
51 )
52 color = rtf_body_attrs.text_color[0][0] if rtf_body_attrs.text_color else ""
53 bg_color = (
54 rtf_body_attrs.text_background_color[0][0]
55 if rtf_body_attrs.text_background_color
56 else ""
57 )
58 justification = (
59 rtf_body_attrs.text_justification[0][0]
60 if rtf_body_attrs.text_justification
61 else "c"
62 )
63 border_left = (
64 rtf_body_attrs.border_left[0][0]
65 if rtf_body_attrs.border_left
66 else "single"
67 )
68 border_right = (
69 rtf_body_attrs.border_right[0][0]
70 if rtf_body_attrs.border_right
71 else "single"
72 )
73 border_top = (
74 rtf_body_attrs.border_top[0][0]
75 if rtf_body_attrs.border_top
76 else "single"
77 )
78 border_bottom = (
79 rtf_body_attrs.border_bottom[0][0]
80 if rtf_body_attrs.border_bottom
81 else "single"
82 )
83 v_just = (
84 rtf_body_attrs.cell_vertical_justification[0][0]
85 if rtf_body_attrs.cell_vertical_justification
86 else "b"
87 )
88 cell_just = (
89 rtf_body_attrs.cell_justification[0][0]
90 if rtf_body_attrs.cell_justification
91 else "c"
92 )
93 else:
94 font = 0
95 size = 18
96 text_format = ""
97 color = ""
98 bg_color = ""
99 justification = "c"
100 border_left = "single"
101 border_right = "single"
102 border_top = "single"
103 border_bottom = "single"
104 v_just = "b"
105 cell_just = "c"
107 # Create spanning cell
108 cell = Cell(
109 text=TextContent(
110 text=text,
111 font=font,
112 size=size,
113 format=text_format,
114 color=color,
115 background_color=bg_color,
116 justification=justification,
117 indent_first=0,
118 indent_left=0,
119 indent_right=0,
120 space=0, # No line spacing
121 space_before=15,
122 space_after=15,
123 convert=False,
124 hyphenation=True,
125 ),
126 width=page_width,
127 border_left=Border(style=border_left),
128 border_right=Border(style=border_right),
129 border_top=Border(style=border_top),
130 border_bottom=Border(style=border_bottom),
131 vertical_justification=v_just,
132 )
134 # Create row with single spanning cell
135 row = Row(row_cells=[cell], justification=cell_just, height=0)
137 return row._as_rtf()
139 def encode_document_start(self) -> str:
140 """Encode RTF document start."""
141 return "{\\rtf1\\ansi\n\\deff0\\deflang1033"
143 def encode_font_table(self) -> str:
144 """Encode RTF font table."""
145 return self.syntax.generate_font_table()
147 def encode_color_table(
148 self, document=None, used_colors: Sequence[str] | None = None
149 ) -> str:
150 """Encode RTF color table with comprehensive 657-color support.
152 Args:
153 document: RTF document to analyze for color usage (preferred)
154 used_colors: Color names used in the document. If None and a
155 document is provided, colors are auto-detected.
157 Returns:
158 RTF color table string (empty if no colors beyond black/"" are used)
159 """
160 if document is not None and used_colors is None:
161 # Auto-detect colors from document
162 from ..services.color_service import color_service
164 used_colors = color_service.collect_document_colors(document)
166 return self.syntax.generate_color_table(used_colors)
168 def encode_page_settings(self, page_config) -> str:
169 """Encode RTF page settings.
171 Args:
172 page_config: RTFPage configuration object
174 Returns:
175 RTF page settings string
176 """
177 return self.syntax.generate_page_settings(
178 page_config.width,
179 page_config.height,
180 page_config.margin,
181 page_config.orientation,
182 )
184 def encode_page_header(self, header_config, method: str = "line") -> str:
185 """Encode page header component.
187 Args:
188 header_config: RTFPageHeader configuration
189 method: Encoding method
191 Returns:
192 RTF header string
193 """
194 if header_config is None or not header_config.text:
195 return ""
197 # Use the existing text encoding method
198 result = header_config._encode_text(text=header_config.text, method=method)
200 return f"{{\\header{result}}}"
202 def encode_page_footer(self, footer_config, method: str = "line") -> str:
203 """Encode page footer component.
205 Args:
206 footer_config: RTFPageFooter configuration
207 method: Encoding method
209 Returns:
210 RTF footer string
211 """
212 if footer_config is None or not footer_config.text:
213 return ""
215 # Use the existing text encoding method
216 result = footer_config._encode_text(text=footer_config.text, method=method)
217 return f"{{\\footer{result}}}"
219 def encode_title(self, title_config, method: str = "line") -> str:
220 """Encode title component.
222 Args:
223 title_config: RTFTitle configuration
224 method: Encoding method
226 Returns:
227 RTF title string
228 """
229 if not title_config or not title_config.text:
230 return ""
232 # Use the existing text encoding method
233 return title_config._encode_text(text=title_config.text, method=method)
235 def encode_subline(self, subline_config, method: str = "line") -> str:
236 """Encode subline component.
238 Args:
239 subline_config: RTFSubline configuration
240 method: Encoding method
242 Returns:
243 RTF subline string
244 """
245 if subline_config is None or not subline_config.text:
246 return ""
248 # Use the existing text encoding method
249 return subline_config._encode_text(text=subline_config.text, method=method)
251 def encode_footnote(
252 self,
253 footnote_config,
254 page_number: int | None = None,
255 page_col_width: float | None = None,
256 ) -> Sequence[str]:
257 """Encode footnote component with advanced formatting.
259 Args:
260 footnote_config: RTFFootnote configuration
261 page_number: Page number for footnote
262 page_col_width: Page column width for calculations
264 Returns:
265 List of RTF footnote strings
266 """
267 if footnote_config is None:
268 return []
270 rtf_attrs = footnote_config
272 # Apply page-specific border if set
273 if (
274 hasattr(rtf_attrs, "_page_border_style")
275 and page_number is not None
276 and page_number in rtf_attrs._page_border_style
277 ):
278 border_style = rtf_attrs._page_border_style[page_number]
279 # Create a copy with modified border
280 rtf_attrs = rtf_attrs.model_copy()
281 rtf_attrs.border_bottom = [[border_style]]
283 # Check if footnote should be rendered as table or paragraph
284 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table:
285 # Render as paragraph (plain text)
286 if isinstance(rtf_attrs.text, list):
287 text_list = rtf_attrs.text
288 else:
289 text_list = [rtf_attrs.text] if rtf_attrs.text else []
291 # Use TextAttributes._encode_text method directly for paragraph rendering
292 return rtf_attrs._encode_text(text_list, method="paragraph")
293 else:
294 # Render as table (default behavior)
295 if page_col_width is not None:
296 from ..row import Utils
298 col_total_width = page_col_width
299 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
301 # Create DataFrame from text string
302 import polars as pl
304 df = pl.DataFrame([[rtf_attrs.text]])
305 return rtf_attrs._encode(df, col_widths)
306 else:
307 # Fallback without column width calculations
308 import polars as pl
310 df = pl.DataFrame([[rtf_attrs.text]])
311 return rtf_attrs._encode(df)
313 def encode_source(
314 self,
315 source_config,
316 page_number: int | None = None,
317 page_col_width: float | None = None,
318 ) -> Sequence[str]:
319 """Encode source component with advanced formatting.
321 Args:
322 source_config: RTFSource configuration
323 page_number: Page number for source
324 page_col_width: Page column width for calculations
326 Returns:
327 List of RTF source strings
328 """
329 if source_config is None:
330 return []
332 rtf_attrs = source_config
334 # Apply page-specific border if set
335 if (
336 hasattr(rtf_attrs, "_page_border_style")
337 and page_number is not None
338 and page_number in rtf_attrs._page_border_style
339 ):
340 border_style = rtf_attrs._page_border_style[page_number]
341 # Create a copy with modified border
342 rtf_attrs = rtf_attrs.model_copy()
343 rtf_attrs.border_bottom = [[border_style]]
345 # Check if source should be rendered as table or paragraph
346 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table:
347 # Render as paragraph (plain text)
348 if isinstance(rtf_attrs.text, list):
349 text_list = rtf_attrs.text
350 else:
351 text_list = [rtf_attrs.text] if rtf_attrs.text else []
353 # Use TextAttributes._encode_text method directly for paragraph rendering
354 return rtf_attrs._encode_text(text_list, method="paragraph")
355 else:
356 # Render as table (default behavior)
357 if page_col_width is not None:
358 from ..row import Utils
360 col_total_width = page_col_width
361 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
363 # Create DataFrame from text string
364 import polars as pl
366 df = pl.DataFrame([[rtf_attrs.text]])
367 return rtf_attrs._encode(df, col_widths)
368 else:
369 # Fallback without column width calculations
370 import polars as pl
372 df = pl.DataFrame([[rtf_attrs.text]])
373 return rtf_attrs._encode(df)
375 def prepare_dataframe_for_body_encoding(self, df, rtf_attrs):
376 """Prepare DataFrame for body encoding with group_by and column removal.
378 Args:
379 df: Input DataFrame
380 rtf_attrs: RTFBody attributes
382 Returns:
383 Tuple of (processed_df, original_df) where processed_df has
384 transformations applied
385 """
386 original_df = df.clone()
387 processed_df = df.clone()
389 # Collect columns to remove
390 columns_to_remove = set()
392 # Remove subline_by columns from the processed DataFrame
393 if rtf_attrs.subline_by is not None:
394 columns_to_remove.update(rtf_attrs.subline_by)
396 # Remove page_by columns from table display
397 # page_by columns are shown as spanning rows, not as table columns
398 # The new_page flag only controls whether to force page breaks at group boundaries
399 if rtf_attrs.page_by is not None:
400 columns_to_remove.update(rtf_attrs.page_by)
402 # Apply column removal if any columns need to be removed
403 if columns_to_remove:
404 remaining_columns = [
405 col for col in processed_df.columns if col not in columns_to_remove
406 ]
407 processed_df = processed_df.select(remaining_columns)
409 # Update col_rel_width to match the new column count
410 # Find indices of removed columns to remove corresponding width entries
411 if rtf_attrs.col_rel_width is not None:
412 if len(rtf_attrs.col_rel_width) == len(original_df.columns):
413 removed_indices = [
414 i
415 for i, col in enumerate(original_df.columns)
416 if col in columns_to_remove
417 ]
418 # Create new col_rel_width with removed column widths excluded
419 new_col_rel_width = [
420 width
421 for i, width in enumerate(rtf_attrs.col_rel_width)
422 if i not in removed_indices
423 ]
424 # Update rtf_attrs with new col_rel_width
425 rtf_attrs.col_rel_width = new_col_rel_width
427 # Note: group_by suppression is handled in the pagination strategy
428 # for documents that need pagination. For non-paginated documents,
429 # group_by is handled separately in encode_body method.
431 return processed_df, original_df
433 def encode_body(
434 self, document, df, rtf_attrs, force_single_page=False
435 ) -> Sequence[str] | None:
436 """Encode table body component with full pagination support.
438 Args:
439 document: RTFDocument instance for accessing pagination logic
440 df: DataFrame containing table data
441 rtf_attrs: RTFBody attributes
443 Returns:
444 List of RTF body strings
445 """
446 if rtf_attrs is None:
447 return None
449 # Initialize dimensions and widths
450 from ..row import Utils
451 from .document_service import RTFDocumentService
453 document_service = RTFDocumentService()
454 col_total_width = document.rtf_page.col_width
456 # Validate data sorting for all grouping parameters
457 if any([rtf_attrs.group_by, rtf_attrs.page_by, rtf_attrs.subline_by]):
458 grouping_service.validate_data_sorting(
459 df,
460 group_by=rtf_attrs.group_by,
461 page_by=rtf_attrs.page_by,
462 subline_by=rtf_attrs.subline_by,
463 )
465 # Validate subline_by formatting consistency and issue warnings
466 if rtf_attrs.subline_by is not None:
467 import warnings
469 formatting_warnings = (
470 grouping_service.validate_subline_formatting_consistency(
471 df, rtf_attrs.subline_by, rtf_attrs
472 )
473 )
474 for warning_msg in formatting_warnings:
475 warnings.warn(
476 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=2
477 )
479 # Apply group_by and subline_by processing if specified
480 processed_df, original_df = self.prepare_dataframe_for_body_encoding(
481 df, rtf_attrs
482 )
484 # Calculate col_widths AFTER prepare_dataframe_for_body_encoding()
485 # because that method may modify col_rel_width when removing columns (page_by, subline_by)
486 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
488 # Check if pagination is needed (unless forced to single page)
489 if not force_single_page and document_service.needs_pagination(document):
490 return self._encode_body_paginated(
491 document, processed_df, rtf_attrs, col_widths
492 )
494 # Handle existing page_by grouping (non-paginated)
495 page_by = document_service.process_page_by(document)
496 if page_by is None:
497 # Note: subline_by documents should use pagination, so this path
498 # should not be reached for them
499 # Apply group_by processing for non-paginated documents
500 if rtf_attrs.group_by is not None:
501 processed_df = grouping_service.enhance_group_by(
502 processed_df, rtf_attrs.group_by
503 )
504 return rtf_attrs._encode(processed_df, col_widths)
506 rows: list[str] = []
507 for section in page_by:
508 # Skip empty sections
509 indices = [(row, col) for row, col, level in section]
510 if not indices:
511 continue
513 # Create DataFrame for current section
514 import polars as pl
516 from ..attributes import BroadcastValue
518 section_df = pl.DataFrame(
519 {
520 str(i): [
521 BroadcastValue(value=processed_df, dimension=None).iloc(
522 row, col
523 )
524 ]
525 for i, (row, col) in enumerate(indices)
526 }
527 )
529 # Collect all text and table attributes
530 from ..input import TableAttributes
532 section_attrs_dict = rtf_attrs._get_section_attributes(indices)
533 section_attrs = TableAttributes(**section_attrs_dict)
535 # Calculate column widths and encode section
536 if section_attrs.col_rel_width is None:
537 # Default to equal widths if not specified
538 section_attrs.col_rel_width = [1.0] * len(indices)
539 section_col_widths = Utils._col_widths(
540 section_attrs.col_rel_width, col_total_width
541 )
542 rows.extend(section_attrs._encode(section_df, section_col_widths))
544 return rows
546 def _encode_body_paginated(
547 self, document, df, rtf_attrs, col_widths
548 ) -> Sequence[str]:
549 """Encode body content with pagination support."""
550 from .document_service import RTFDocumentService
552 document_service = RTFDocumentService()
553 _, distributor = document_service.create_pagination_instance(document)
555 # Distribute content across pages (r2rtf compatible)
556 additional_rows = document_service.calculate_additional_rows_per_page(document)
557 pages = distributor.distribute_content(
558 df=df,
559 col_widths=col_widths,
560 table_attrs=rtf_attrs,
561 additional_rows_per_page=additional_rows,
562 )
564 # Generate RTF for each page
565 all_rows = []
566 for page_num, page_content in enumerate(pages, 1):
567 page_rows = []
569 # Add page header content
570 if page_content.get("headers"):
571 for header_content in page_content["headers"]:
572 header_text = header_content.get("text", "")
573 if header_text:
574 page_rows.append(header_text)
576 # Add table data
577 page_data = page_content.get("data")
578 if page_data is not None:
579 # Check if it's a DataFrame or a list
580 if hasattr(page_data, "is_empty"):
581 # It's a DataFrame
582 if not page_data.is_empty():
583 page_rows.extend(page_data)
584 else:
585 # It's a list or other iterable
586 if page_data:
587 page_rows.extend(page_data)
589 # Add footer content
590 if page_content.get("footers"):
591 for footer_content in page_content["footers"]:
592 footer_text = footer_content.get("text", "")
593 if footer_text:
594 page_rows.append(footer_text)
596 # Add page break between pages (except last page)
597 if page_num < len(pages):
598 page_rows.append(document_service.generate_page_break(document))
600 all_rows.extend(page_rows)
602 return all_rows
604 def encode_column_header(
605 self, df, rtf_attrs, page_col_width: float
606 ) -> Sequence[str] | None:
607 """Encode column header component with column width support.
609 Args:
610 df: DataFrame containing header data
611 rtf_attrs: RTFColumnHeader attributes
612 page_col_width: Page column width for calculations
614 Returns:
615 List of RTF header strings
616 """
617 if rtf_attrs is None:
618 return None
620 dim = df.shape
622 rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * dim[1]
623 rtf_attrs = rtf_attrs._set_default()
625 from ..row import Utils
627 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, page_col_width)
629 return rtf_attrs._encode(df, col_widths)
631 def encode_page_break(self, page_config, page_margin_encode_func) -> str:
632 """Generate proper RTF page break sequence matching r2rtf format.
634 Args:
635 page_config: RTFPage configuration
636 page_margin_encode_func: Function to encode page margins
638 Returns:
639 RTF page break string
640 """
641 from ..core import RTFConstants
643 page_setup = (
644 f"\\paperw{int(page_config.width * RTFConstants.TWIPS_PER_INCH)}"
645 f"\\paperh{int(page_config.height * RTFConstants.TWIPS_PER_INCH)}\n\n"
646 f"{page_margin_encode_func()}\n"
647 )
649 return f"{{\\pard\\fs2\\par}}\\page{{\\pard\\fs2\\par}}\n{page_setup}"
651 def encode_page_margin(self, page_config) -> str:
652 """Define RTF margin settings.
654 Args:
655 page_config: RTFPage configuration with margin settings
657 Returns:
658 RTF margin settings string
659 """
660 from ..row import Utils
662 margin_codes = [
663 "\\margl",
664 "\\margr",
665 "\\margt",
666 "\\margb",
667 "\\headery",
668 "\\footery",
669 ]
670 margins = [Utils._inch_to_twip(m) for m in page_config.margin]
671 margin = "".join(
672 f"{code}{margin}"
673 for code, margin in zip(margin_codes, margins, strict=True)
674 )
675 return margin + "\n"