Coverage for src/rtflite/services/encoding_service.py: 72%
177 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-25 22:35 +0000
1"""RTF encoding service that handles document component encoding."""
3from collections.abc import Sequence
5from .grouping_service import grouping_service
8class RTFEncodingService:
9 """Service class that handles RTF component encoding operations.
11 This class extracts encoding logic from RTFDocument to improve separation
12 of concerns and enable better testing and maintainability.
13 """
15 def __init__(self):
16 from ..rtf import RTFSyntaxGenerator
18 self.syntax = RTFSyntaxGenerator()
20 def encode_document_start(self) -> str:
21 """Encode RTF document start."""
22 return "{\\rtf1\\ansi\n\\deff0\\deflang1033"
24 def encode_font_table(self) -> str:
25 """Encode RTF font table."""
26 return self.syntax.generate_font_table()
28 def encode_color_table(
29 self, document=None, used_colors: Sequence[str] | None = None
30 ) -> str:
31 """Encode RTF color table with comprehensive 657-color support.
33 Args:
34 document: RTF document to analyze for color usage (preferred)
35 used_colors: List of color names used in document. If None and document provided, colors are auto-detected.
37 Returns:
38 RTF color table string (empty if no colors beyond black/"" are used)
39 """
40 if document is not None and used_colors is None:
41 # Auto-detect colors from document
42 from ..services.color_service import color_service
44 used_colors = color_service.collect_document_colors(document)
46 return self.syntax.generate_color_table(used_colors)
48 def encode_page_settings(self, page_config) -> str:
49 """Encode RTF page settings.
51 Args:
52 page_config: RTFPage configuration object
54 Returns:
55 RTF page settings string
56 """
57 return self.syntax.generate_page_settings(
58 page_config.width,
59 page_config.height,
60 page_config.margin,
61 page_config.orientation,
62 )
64 def encode_page_header(self, header_config, method: str = "line") -> str:
65 """Encode page header component.
67 Args:
68 header_config: RTFPageHeader configuration
69 method: Encoding method
71 Returns:
72 RTF header string
73 """
74 if header_config is None or not header_config.text:
75 return ""
77 # Use the existing text encoding method
78 result = header_config._encode_text(text=header_config.text, method=method)
80 return f"{{\\header{result}}}"
82 def encode_page_footer(self, footer_config, method: str = "line") -> str:
83 """Encode page footer component.
85 Args:
86 footer_config: RTFPageFooter configuration
87 method: Encoding method
89 Returns:
90 RTF footer string
91 """
92 if footer_config is None or not footer_config.text:
93 return ""
95 # Use the existing text encoding method
96 result = footer_config._encode_text(text=footer_config.text, method=method)
97 return f"{{\\footer{result}}}"
99 def encode_title(self, title_config, method: str = "line") -> str:
100 """Encode title component.
102 Args:
103 title_config: RTFTitle configuration
104 method: Encoding method
106 Returns:
107 RTF title string
108 """
109 if not title_config or not title_config.text:
110 return ""
112 # Use the existing text encoding method
113 return title_config._encode_text(text=title_config.text, method=method)
115 def encode_subline(self, subline_config, method: str = "line") -> str:
116 """Encode subline component.
118 Args:
119 subline_config: RTFSubline configuration
120 method: Encoding method
122 Returns:
123 RTF subline string
124 """
125 if subline_config is None or not subline_config.text:
126 return ""
128 # Use the existing text encoding method
129 return subline_config._encode_text(text=subline_config.text, method=method)
131 def encode_footnote(
132 self,
133 footnote_config,
134 page_number: int | None = None,
135 page_col_width: float | None = None,
136 ) -> Sequence[str]:
137 """Encode footnote component with advanced formatting.
139 Args:
140 footnote_config: RTFFootnote configuration
141 page_number: Page number for footnote
142 page_col_width: Page column width for calculations
144 Returns:
145 List of RTF footnote strings
146 """
147 if footnote_config is None:
148 return []
150 rtf_attrs = footnote_config
152 # Apply page-specific border if set
153 if (
154 hasattr(rtf_attrs, "_page_border_style")
155 and page_number is not None
156 and page_number in rtf_attrs._page_border_style
157 ):
158 border_style = rtf_attrs._page_border_style[page_number]
159 # Create a copy with modified border
160 rtf_attrs = rtf_attrs.model_copy()
161 rtf_attrs.border_bottom = [[border_style]]
163 # Check if footnote should be rendered as table or paragraph
164 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table:
165 # Render as paragraph (plain text)
166 if isinstance(rtf_attrs.text, list):
167 text_list = rtf_attrs.text
168 else:
169 text_list = [rtf_attrs.text] if rtf_attrs.text else []
171 # Use TextAttributes._encode_text method directly for paragraph rendering
172 return rtf_attrs._encode_text(text_list, method="paragraph")
173 else:
174 # Render as table (default behavior)
175 if page_col_width is not None:
176 from ..row import Utils
178 col_total_width = page_col_width
179 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
181 # Create DataFrame from text string
182 import polars as pl
184 df = pl.DataFrame([[rtf_attrs.text]])
185 return rtf_attrs._encode(df, col_widths)
186 else:
187 # Fallback without column width calculations
188 import polars as pl
190 df = pl.DataFrame([[rtf_attrs.text]])
191 return rtf_attrs._encode(df)
193 def encode_source(
194 self,
195 source_config,
196 page_number: int | None = None,
197 page_col_width: float | None = None,
198 ) -> Sequence[str]:
199 """Encode source component with advanced formatting.
201 Args:
202 source_config: RTFSource configuration
203 page_number: Page number for source
204 page_col_width: Page column width for calculations
206 Returns:
207 List of RTF source strings
208 """
209 if source_config is None:
210 return []
212 rtf_attrs = source_config
214 # Apply page-specific border if set
215 if (
216 hasattr(rtf_attrs, "_page_border_style")
217 and page_number is not None
218 and page_number in rtf_attrs._page_border_style
219 ):
220 border_style = rtf_attrs._page_border_style[page_number]
221 # Create a copy with modified border
222 rtf_attrs = rtf_attrs.model_copy()
223 rtf_attrs.border_bottom = [[border_style]]
225 # Check if source should be rendered as table or paragraph
226 if hasattr(rtf_attrs, "as_table") and not rtf_attrs.as_table:
227 # Render as paragraph (plain text)
228 if isinstance(rtf_attrs.text, list):
229 text_list = rtf_attrs.text
230 else:
231 text_list = [rtf_attrs.text] if rtf_attrs.text else []
233 # Use TextAttributes._encode_text method directly for paragraph rendering
234 return rtf_attrs._encode_text(text_list, method="paragraph")
235 else:
236 # Render as table (default behavior)
237 if page_col_width is not None:
238 from ..row import Utils
240 col_total_width = page_col_width
241 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
243 # Create DataFrame from text string
244 import polars as pl
246 df = pl.DataFrame([[rtf_attrs.text]])
247 return rtf_attrs._encode(df, col_widths)
248 else:
249 # Fallback without column width calculations
250 import polars as pl
252 df = pl.DataFrame([[rtf_attrs.text]])
253 return rtf_attrs._encode(df)
255 def prepare_dataframe_for_body_encoding(self, df, rtf_attrs):
256 """Prepare DataFrame for body encoding with group_by processing and column removal.
258 Args:
259 df: Input DataFrame
260 rtf_attrs: RTFBody attributes
262 Returns:
263 Tuple of (processed_df, original_df) where processed_df has transformations applied
264 """
265 original_df = df.clone()
266 processed_df = df.clone()
268 # Remove subline_by columns from the processed DataFrame
269 if rtf_attrs.subline_by is not None:
270 columns_to_remove = set(rtf_attrs.subline_by)
271 remaining_columns = [
272 col for col in processed_df.columns if col not in columns_to_remove
273 ]
274 processed_df = processed_df.select(remaining_columns)
276 # Note: group_by suppression is handled in the pagination strategy
277 # for documents that need pagination. For non-paginated documents,
278 # group_by is handled separately in encode_body method.
280 return processed_df, original_df
282 def encode_body(
283 self, document, df, rtf_attrs, force_single_page=False
284 ) -> Sequence[str] | None:
285 """Encode table body component with full pagination support.
287 Args:
288 document: RTFDocument instance for accessing pagination logic
289 df: DataFrame containing table data
290 rtf_attrs: RTFBody attributes
292 Returns:
293 List of RTF body strings
294 """
295 if rtf_attrs is None:
296 return None
298 # Initialize dimensions and widths
299 from ..row import Utils
300 from .document_service import RTFDocumentService
302 document_service = RTFDocumentService()
303 col_total_width = document.rtf_page.col_width
304 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
306 # Validate data sorting for all grouping parameters
307 if any([rtf_attrs.group_by, rtf_attrs.page_by, rtf_attrs.subline_by]):
308 grouping_service.validate_data_sorting(
309 df,
310 group_by=rtf_attrs.group_by,
311 page_by=rtf_attrs.page_by,
312 subline_by=rtf_attrs.subline_by,
313 )
315 # Validate subline_by formatting consistency and issue warnings
316 if rtf_attrs.subline_by is not None:
317 import warnings
319 formatting_warnings = (
320 grouping_service.validate_subline_formatting_consistency(
321 df, rtf_attrs.subline_by, rtf_attrs
322 )
323 )
324 for warning_msg in formatting_warnings:
325 warnings.warn(
326 f"subline_by formatting: {warning_msg}", UserWarning, stacklevel=2
327 )
329 # Apply group_by and subline_by processing if specified
330 processed_df, original_df = self.prepare_dataframe_for_body_encoding(
331 df, rtf_attrs
332 )
334 # Check if pagination is needed (unless forced to single page)
335 if not force_single_page and document_service.needs_pagination(document):
336 return self._encode_body_paginated(
337 document, processed_df, rtf_attrs, col_widths
338 )
340 # Handle existing page_by grouping (non-paginated)
341 page_by = document_service.process_page_by(document)
342 if page_by is None:
343 # Note: subline_by documents should use pagination, so this path should not be reached for them
344 # Apply group_by processing for non-paginated documents
345 if rtf_attrs.group_by is not None:
346 processed_df = grouping_service.enhance_group_by(
347 processed_df, rtf_attrs.group_by
348 )
349 return rtf_attrs._encode(processed_df, col_widths)
351 rows: list[str] = []
352 for section in page_by:
353 # Skip empty sections
354 indices = [(row, col) for row, col, level in section]
355 if not indices:
356 continue
358 # Create DataFrame for current section
359 import polars as pl
361 from ..attributes import BroadcastValue
363 section_df = pl.DataFrame(
364 {
365 str(i): [
366 BroadcastValue(value=processed_df, dimension=None).iloc(
367 row, col
368 )
369 ]
370 for i, (row, col) in enumerate(indices)
371 }
372 )
374 # Collect all text and table attributes
375 from ..input import TableAttributes
377 section_attrs_dict = rtf_attrs._get_section_attributes(indices)
378 section_attrs = TableAttributes(**section_attrs_dict)
380 # Calculate column widths and encode section
381 if section_attrs.col_rel_width is None:
382 # Default to equal widths if not specified
383 section_attrs.col_rel_width = [1.0] * len(indices)
384 section_col_widths = Utils._col_widths(
385 section_attrs.col_rel_width, col_total_width
386 )
387 rows.extend(section_attrs._encode(section_df, section_col_widths))
389 return rows
391 def _encode_body_paginated(
392 self, document, df, rtf_attrs, col_widths
393 ) -> Sequence[str]:
394 """Encode body content with pagination support."""
395 from .document_service import RTFDocumentService
397 document_service = RTFDocumentService()
398 _, distributor = document_service.create_pagination_instance(document)
400 # Distribute content across pages (r2rtf compatible)
401 additional_rows = document_service.calculate_additional_rows_per_page(document)
402 pages = distributor.distribute_content(
403 df=df,
404 col_widths=col_widths,
405 table_attrs=rtf_attrs,
406 additional_rows_per_page=additional_rows,
407 )
409 # Generate RTF for each page
410 all_rows = []
411 for page_num, page_content in enumerate(pages, 1):
412 page_rows = []
414 # Add page header content
415 if page_content.get("headers"):
416 for header_content in page_content["headers"]:
417 header_text = header_content.get("text", "")
418 if header_text:
419 page_rows.append(header_text)
421 # Add table data
422 page_data = page_content.get("data")
423 if page_data is not None:
424 # Check if it's a DataFrame or a list
425 if hasattr(page_data, "is_empty"):
426 # It's a DataFrame
427 if not page_data.is_empty():
428 page_rows.extend(page_data)
429 else:
430 # It's a list or other iterable
431 if page_data:
432 page_rows.extend(page_data)
434 # Add footer content
435 if page_content.get("footers"):
436 for footer_content in page_content["footers"]:
437 footer_text = footer_content.get("text", "")
438 if footer_text:
439 page_rows.append(footer_text)
441 # Add page break between pages (except last page)
442 if page_num < len(pages):
443 page_rows.append(document_service.generate_page_break(document))
445 all_rows.extend(page_rows)
447 return all_rows
449 def encode_column_header(
450 self, df, rtf_attrs, page_col_width: float
451 ) -> Sequence[str] | None:
452 """Encode column header component with column width support.
454 Args:
455 df: DataFrame containing header data
456 rtf_attrs: RTFColumnHeader attributes
457 page_col_width: Page column width for calculations
459 Returns:
460 List of RTF header strings
461 """
462 if rtf_attrs is None:
463 return None
465 dim = df.shape
467 rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * dim[1]
468 rtf_attrs = rtf_attrs._set_default()
470 from ..row import Utils
472 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, page_col_width)
474 return rtf_attrs._encode(df, col_widths)
476 def encode_page_break(self, page_config, page_margin_encode_func) -> str:
477 """Generate proper RTF page break sequence matching r2rtf format.
479 Args:
480 page_config: RTFPage configuration
481 page_margin_encode_func: Function to encode page margins
483 Returns:
484 RTF page break string
485 """
486 from ..core import RTFConstants
488 page_setup = (
489 f"\\paperw{int(page_config.width * RTFConstants.TWIPS_PER_INCH)}"
490 f"\\paperh{int(page_config.height * RTFConstants.TWIPS_PER_INCH)}\n\n"
491 f"{page_margin_encode_func()}\n"
492 )
494 return f"{{\\pard\\fs2\\par}}\\page{{\\pard\\fs2\\par}}\n{page_setup}"
496 def encode_page_margin(self, page_config) -> str:
497 """Define RTF margin settings.
499 Args:
500 page_config: RTFPage configuration with margin settings
502 Returns:
503 RTF margin settings string
504 """
505 from ..row import Utils
507 margin_codes = [
508 "\\margl",
509 "\\margr",
510 "\\margt",
511 "\\margb",
512 "\\headery",
513 "\\footery",
514 ]
515 margins = [Utils._inch_to_twip(m) for m in page_config.margin]
516 margin = "".join(
517 f"{code}{margin}" for code, margin in zip(margin_codes, margins)
518 )
519 return margin + "\n"