Coverage for src/rtflite/encode.py: 55%
210 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 05:03 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 05:03 +0000
1from collections.abc import MutableSequence
3import pandas as pd
4from pydantic import BaseModel, ConfigDict, Field, model_validator
6from .input import (
7 BroadcastValue,
8 RTFBody,
9 RTFColumnHeader,
10 RTFFootnote,
11 RTFPage,
12 RTFPageFooter,
13 RTFPageHeader,
14 RTFSource,
15 RTFSubline,
16 RTFTitle,
17 TableAttributes,
18)
19from .row import Utils
class RTFDocument(BaseModel):
    """Pydantic model bundling a DataFrame with the parts of an RTF document.

    Each field holds the settings object for one part of the output. The page
    header, subline, footnote, source and page footer default to ``None``; the
    page, title, column header list and body default to freshly constructed
    settings objects.
    """

    # Required so that a pandas DataFrame can be used as a field type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # --- Input data -------------------------------------------------------
    df: pd.DataFrame = Field(
        ...,
        description="The DataFrame containing the data for the RTF document.",
    )

    # --- Page-level components --------------------------------------------
    rtf_page: RTFPage = Field(
        default_factory=lambda: RTFPage(),
        description="Page settings including size, orientation and margins",
    )
    rtf_page_header: RTFPageHeader | None = Field(
        default=None,
        description="Text to appear in the header of each page",
    )

    # --- Text above the table ---------------------------------------------
    rtf_title: RTFTitle | None = Field(
        default_factory=lambda: RTFTitle(),
        description="Title section settings including text and formatting",
    )
    rtf_subline: RTFSubline | None = Field(
        default=None,
        description="Subject line text to appear below the title",
    )

    # --- Table ------------------------------------------------------------
    rtf_column_header: list[RTFColumnHeader] = Field(
        default_factory=lambda: [RTFColumnHeader()],
        description="Column header settings",
    )
    rtf_body: RTFBody | None = Field(
        default_factory=lambda: RTFBody(),
        description="Table body section settings including column widths and formatting",
    )

    # --- Text below the table ---------------------------------------------
    rtf_footnote: RTFFootnote | None = Field(
        default=None,
        description="Footnote text to appear at bottom of document",
    )
    rtf_source: RTFSource | None = Field(
        default=None,
        description="Data source citation text",
    )
    rtf_page_footer: RTFPageFooter | None = Field(
        default=None,
        description="Text to appear in the footer of each page",
    )
60 @model_validator(mode="after")
61 def validate_column_names(self):
62 columns = self.df.columns.tolist()
64 if self.rtf_body.group_by is not None:
65 for column in self.rtf_body.group_by:
66 if column not in columns:
67 raise ValueError(f"`group_by` column {column} not found in `df`")
69 if self.rtf_body.page_by is not None:
70 for column in self.rtf_body.page_by:
71 if column not in columns:
72 raise ValueError(f"`page_by` column {column} not found in `df`")
74 if self.rtf_body.subline_by is not None:
75 for column in self.rtf_body.subline_by:
76 if column not in columns:
77 raise ValueError(f"`subline_by` column {column} not found in `df`")
79 return self
81 def __init__(self, **data):
82 super().__init__(**data)
83 dim = self.df.shape
84 # Set default values
85 self.rtf_body.col_rel_width = self.rtf_body.col_rel_width or [1] * dim[1]
86 self._table_space = int(
87 Utils._inch_to_twip(self.rtf_page.width - self.rtf_page.col_width) / 2
88 )
90 if self.rtf_subline is not None:
91 if self.rtf_subline.text_indent_reference == "table":
92 self.rtf_subline.text_space_before = (
93 self._table_space + self.rtf_subline.text_space_before
94 )
95 self.rtf_subline.text_space_after = (
96 self._table_space + self.rtf_subline.text_space_after
97 )
99 if self.rtf_page_header is not None:
100 if self.rtf_page_header.text_indent_reference == "table":
101 self.rtf_page_header.text_space_before = (
102 self._table_space + self.rtf_page_header.text_space_before
103 )
104 self.rtf_page_header.text_space_after = (
105 self._table_space + self.rtf_page_header.text_space_after
106 )
108 if self.rtf_page_footer is not None:
109 if self.rtf_page_footer.text_indent_reference == "table":
110 self.rtf_page_footer.text_space_before = (
111 self._table_space + self.rtf_page_footer.text_space_before
112 )
113 self.rtf_page_footer.text_space_after = (
114 self._table_space + self.rtf_page_footer.text_space_after
115 )
117 def _rtf_page_encode(self) -> str:
118 """Define RTF page settings"""
119 page_size = [
120 f"\\paperw{Utils._inch_to_twip(self.rtf_page.width)}",
121 f"\\paperh{Utils._inch_to_twip(self.rtf_page.height)}",
122 ]
123 page_size = "".join(page_size)
125 if self.rtf_page.orientation == "landscape":
126 page_size += "\\landscape\n"
127 else:
128 page_size += "\n"
130 # Add page footer if exists
131 # if self.rtf_page.page_footer:
132 # footer = ["{\\footer", self._rtf_paragraph(self.rtf_page.page_footer), "}"]
133 # page_size = "\n".join(footer + [page_size])
135 # Add page header if exists
136 # if self.rtf_page.page_header:
137 # header = ["{\\header", self._rtf_paragraph(self.rtf_page.page_header), "}"]
138 # page_size = "\n".join(header + [page_size])
140 return page_size
142 def _rtf_page_margin_encode(self) -> str:
143 """Define RTF margin settings"""
144 margin_codes = [
145 "\\margl",
146 "\\margr",
147 "\\margt",
148 "\\margb",
149 "\\headery",
150 "\\footery",
151 ]
152 margins = [Utils._inch_to_twip(m) for m in self.rtf_page.margin]
153 margin = "".join(
154 f"{code}{margin}" for code, margin in zip(margin_codes, margins)
155 )
156 return margin + "\n"
158 def _rtf_page_header_encode(self, method: str) -> str:
159 """Convert the RTF page header into RTF syntax using the Text class."""
160 if not self.rtf_page_header:
161 return None
163 return self.rtf_page_header._encode(
164 text=self.rtf_page_header.text, method=method
165 )
167 def _rtf_page_header_encode(self, method: str) -> str:
168 """Convert the RTF page header into RTF syntax using the Text class."""
169 if self.rtf_page_header is None:
170 return None
172 encode = self.rtf_page_header._encode(
173 text=self.rtf_page_header.text, method=method
174 )
175 return f"{ \\header{encode}} "
177 def _rtf_page_footer_encode(self, method: str) -> str:
178 """Convert the RTF page footer into RTF syntax using the Text class."""
179 if self.rtf_page_footer is None:
180 return None
182 encode = self.rtf_page_footer._encode(
183 text=self.rtf_page_footer.text, method=method
184 )
185 return f"{ \\footer{encode}} "
187 def _rtf_title_encode(self, method: str) -> str:
188 """Convert the RTF title into RTF syntax using the Text class."""
189 if not self.rtf_title:
190 return None
192 return self.rtf_title._encode(text=self.rtf_title.text, method=method)
194 def _rtf_subline_encode(self, method: str) -> str:
195 """Convert the RTF subline into RTF syntax using the Text class."""
196 if self.rtf_subline is None:
197 return None
199 encode = self.rtf_subline._encode(text=self.rtf_subline.text, method=method)
200 return encode
202 def _page_by(self) -> list[list[tuple[int, int, int]]]:
203 """Create components for page_by format.
205 This method organizes data into sections based on the page_by grouping variables.
207 Returns:
208 A list of sections, where each section is a list of tuples (row_idx, col_idx, level).
209 Each tuple represents:
210 - row_idx: The row index in the dataframe
211 - col_idx: The column index in the dataframe
212 - level: The nesting level of the section header.
214 """
215 # obtain input data
216 data = self.df.to_dict("records")
217 var = self.rtf_body.page_by
219 # obtain column names and dimensions
220 columns = list(data[0].keys())
221 dim = (len(data), len(columns))
223 if var is None:
224 return None
226 def get_column_index(column_name: str) -> int:
227 """Get the index of a column in the column list."""
228 return columns.index(column_name)
230 def get_matching_rows(group_values: dict) -> list[int]:
231 """Get row indices that match the group values."""
232 return [
233 i
234 for i, row in enumerate(data)
235 if all(row[k] == v for k, v in group_values.items())
236 ]
238 def get_unique_combinations(variables: list[str]) -> list[dict]:
239 """Get unique combinations of values for the specified variables."""
240 seen = set()
241 unique = []
242 for row in data:
243 key = tuple(row[v] for v in variables)
244 if key not in seen:
245 seen.add(key)
246 unique.append({v: row[v] for v in variables})
247 return unique
249 output = []
250 prev_values = {v: None for v in var}
252 # Process each unique combination of grouping variables
253 for group in get_unique_combinations(var):
254 indices = get_matching_rows(group)
256 # Handle headers for each level
257 for level, var_name in enumerate(var):
258 current_val = group[var_name]
260 need_header = False
261 if level == len(var) - 1:
262 need_header = True
263 else:
264 for l in range(level + 1):
265 if group[var[l]] != prev_values[var[l]]:
266 need_header = True
267 break
269 if need_header:
270 col_idx = get_column_index(var_name)
271 # Add level information as third element in tuple
272 output.append([(indices[0], col_idx, level)])
274 prev_values[var_name] = current_val
276 # Handle data rows
277 for index in indices:
278 output.append(
279 [
280 (index, j, len(var))
281 for j in range(len(columns))
282 if columns[j] not in var
283 ]
284 )
286 return output
288 def _rtf_footnote_encode(self) -> str:
289 """Convert the RTF footnote into RTF syntax using the Text class."""
290 rtf_attrs = self.rtf_footnote
292 if rtf_attrs is None:
293 return None
295 col_total_width = self.rtf_page.col_width
296 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
297 return rtf_attrs._encode(rtf_attrs.text, col_widths)
299 def _rtf_source_encode(self) -> str:
300 """Convert the RTF source into RTF syntax using the Text class."""
301 rtf_attrs = self.rtf_source
303 if rtf_attrs is None:
304 return None
306 col_total_width = self.rtf_page.col_width
307 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
308 return rtf_attrs._encode(rtf_attrs.text, col_widths)
310 def _rtf_body_encode(
311 self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
312 ) -> MutableSequence[str]:
313 """Convert the RTF table into RTF syntax using the Cell class.
315 Args:
316 df: Input DataFrame to encode
317 rtf_attrs: Table attributes for styling
319 Returns:
320 List of RTF-encoded strings representing table rows
321 """
322 if rtf_attrs is None:
323 return None
325 # Initialize dimensions and widths
326 dim = df.shape
327 col_total_width = self.rtf_page.col_width
328 page_by = self._page_by()
330 if page_by is None:
331 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
332 return rtf_attrs._encode(df, col_widths)
334 rows = []
335 for section in page_by:
336 # Skip empty sections
337 indices = [(row, col) for row, col, level in section]
338 if not indices:
339 continue
341 # Create DataFrame for current section
342 section_df = pd.DataFrame(
343 {
344 i: [BroadcastValue(value=df).iloc(row, col)]
345 for i, (row, col) in enumerate(indices)
346 }
347 )
349 # Collect all text and table attributes
350 section_attrs_dict = rtf_attrs._get_section_attributes(indices)
351 section_attrs = TableAttributes(**section_attrs_dict)
353 # Calculate column widths and encode section
354 col_widths = Utils._col_widths(section_attrs.col_rel_width, col_total_width)
355 rows.extend(section_attrs._encode(section_df, col_widths))
357 return rows
359 def _rtf_column_header_encode(
360 self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
361 ) -> MutableSequence[str]:
362 dim = df.shape
363 col_total_width = self.rtf_page.col_width
365 if rtf_attrs is None:
366 return None
368 rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * dim[1]
369 rtf_attrs = rtf_attrs._set_default()
371 col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
373 return rtf_attrs._encode(df, col_widths)
375 def _rtf_start_encode(self) -> str:
376 return "{\\rtf1\\ansi\n\\deff0\\deflang1033"
378 def _rtf_font_table_encode(self) -> str:
379 """Define RTF fonts"""
380 font_types = Utils._font_type()
381 font_rtf = [f"\\f{i}" for i in range(10)]
382 font_style = font_types["style"]
383 font_name = font_types["name"]
384 font_charset = font_types["charset"]
386 font_table = "{\\fonttbl"
387 for rtf, style, name, charset in zip(
388 font_rtf, font_style, font_name, font_charset
389 ):
390 font_table += f"{ {rtf}{style}{charset}\\fprq2 {name};} \n"
391 font_table += "}"
393 return font_table
# rtf_encode: assemble the complete RTF document text for this model.
#
# The pieces listed in the final join (RTF start, font table, page size,
# margins, page header, page footer, title, subline, column headers, body,
# footnote, source, closing brace) are joined with newlines; pieces that are
# None are dropped.
#
# The method assigns to self.rtf_column_header, self.rtf_body.border_top /
# border_bottom and self.rtf_footnote.border_bottom, so calling it modifies
# the model.
#
# NOTE(review): this listing has lost the original indentation, so which
# statements sit inside the if/else branches below (for example source lines
# 430, 432, 441, 451 and 459) cannot be confirmed from here. Check the real
# file before restructuring this method.
395 def rtf_encode(self) -> str:
396 """Generate RTF code"""
397 dim = self.df.shape
399 # Title
400 rtf_title = self._rtf_title_encode(method="line")
402 # Page Border
# Each border setting is broadcast to shape (1, number of df columns) and the
# first element of to_list() is kept.
403 doc_border_top = BroadcastValue(
404 value=self.rtf_page.border_first, dimension=(1, dim[1])
405 ).to_list()[0]
406 doc_border_bottom = BroadcastValue(
407 value=self.rtf_page.border_last, dimension=(1, dim[1])
408 ).to_list()[0]
409 page_border_top = BroadcastValue(
410 value=self.rtf_body.border_first, dimension=(1, dim[1])
411 ).to_list()[0]
412 page_border_bottom = BroadcastValue(
413 value=self.rtf_body.border_last, dimension=(1, dim[1])
414 ).to_list()[0]
416 # Column header
417 if self.rtf_column_header is None:
418 rtf_column_header = ""
419 self.rtf_body.border_top = BroadcastValue(
420 value=self.rtf_body.border_top, dimension=dim
421 ).update_row(0, doc_border_top)
422 else:
# When the first header has no text and rtf_body.as_colheader is truthy, the
# header text is taken from the df column names, leaving out page_by columns.
423 if self.rtf_column_header[0].text is None and self.rtf_body.as_colheader:
424 columns = [
425 col
426 for col in self.df.columns
427 if col not in (self.rtf_body.page_by or [])
428 ]
429 self.rtf_column_header[0].text = pd.DataFrame([columns])
430 self.rtf_column_header = self.rtf_column_header[:1]
432 self.rtf_column_header[0].border_top = BroadcastValue(
433 value=self.rtf_column_header[0].border_top, dimension=dim
434 ).update_row(0, doc_border_top)
# One encoded result per header entry; each is flattened in the final join.
436 rtf_column_header = [
437 self._rtf_column_header_encode(df=header.text, rtf_attrs=header)
438 for header in self.rtf_column_header
439 ]
441 self.rtf_body.border_top = BroadcastValue(
442 value=self.rtf_body.border_top, dimension=dim
443 ).update_row(0, page_border_top)
445 # Bottom border last line update
# Row 0 of the footnote border (or row dim[0] - 1 of the body border) is
# updated first with the body-level value and then with the page-level value.
446 if self.rtf_footnote is not None:
447 self.rtf_footnote.border_bottom = BroadcastValue(
448 value=self.rtf_footnote.border_bottom, dimension=(1, 1)
449 ).update_row(0, page_border_bottom[0])
451 self.rtf_footnote.border_bottom = BroadcastValue(
452 value=self.rtf_footnote.border_bottom, dimension=(1, 1)
453 ).update_row(0, doc_border_bottom[0])
454 else:
455 self.rtf_body.border_bottom = BroadcastValue(
456 value=self.rtf_body.border_bottom, dimension=dim
457 ).update_row(dim[0] - 1, page_border_bottom)
459 self.rtf_body.border_bottom = BroadcastValue(
460 value=self.rtf_body.border_bottom, dimension=dim
461 ).update_row(dim[0] - 1, doc_border_bottom)
463 # Body
464 rtf_body = self._rtf_body_encode(df=self.df, rtf_attrs=self.rtf_body)
# Join every piece that is not None with a newline. The literal "\n" items add
# extra blank lines between parts.
466 return "\n".join(
467 [
468 item
469 for item in [
470 self._rtf_start_encode(),
471 self._rtf_font_table_encode(),
472 "\n",
473 self._rtf_page_encode(),
474 self._rtf_page_margin_encode(),
475 self._rtf_page_header_encode(method="line"),
476 self._rtf_page_footer_encode(method="line"),
477 rtf_title,
478 "\n",
479 self._rtf_subline_encode(method="line"),
480 "\n".join(
481 header for sublist in rtf_column_header for header in sublist
482 )
483 if rtf_column_header
484 else None,
485 "\n".join(rtf_body),
486 "\n".join(self._rtf_footnote_encode())
487 if self.rtf_footnote is not None
488 else None,
489 "\n".join(self._rtf_source_encode())
490 if self.rtf_source is not None
491 else None,
492 "\n\n",
493 "}",
494 ]
495 if item is not None
496 ]
497 )
499 def write_rtf(self, file_path: str) -> None:
500 """Write the RTF code into a `.rtf` file."""
501 print(file_path)
502 rtf_code = self.rtf_encode()
503 with open(file_path, "w", encoding="utf-8") as f:
504 f.write(rtf_code)