Coverage for src/rtflite/encode.py: 63%
146 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-03 15:40 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-03 15:40 +0000
1from collections.abc import MutableSequence
3import pandas as pd
4from pydantic import BaseModel, ConfigDict, Field
6from .input import (
7 BroadcastValue,
8 RTFBody,
9 RTFColumnHeader,
10 RTFPage,
11 RTFTitle,
12 TableAttributes,
13)
14from .row import Border, Cell, Row, TextContent, Utils
15from .strwidth import get_string_width
class RTFDocument(BaseModel):
    """Pydantic model that renders a DataFrame plus layout settings as RTF text.

    ``rtf_encode`` builds the complete RTF string from the stored settings and
    ``write_rtf`` saves that string to a file.

    Note:
        The encoders store resolved defaults and broadcast borders back onto
        the instance (``rtf_page``, ``rtf_title``, ``rtf_body`` and
        ``rtf_column_header``), so encoding mutates the model.
        ``rtf_page_header``, ``rtf_subline``, ``rtf_footnote``, ``rtf_source``
        and ``rtf_page_footer`` are declared but are not read by any method
        of this class.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    df: pd.DataFrame = Field(
        ..., description="The DataFrame containing the data for the RTF document."
    )
    rtf_page: RTFPage = Field(
        default_factory=lambda: RTFPage(),
        description="Page settings including size, orientation and margins",
    )
    rtf_page_header: str | None = Field(
        default=None, description="Text to appear in the header of each page"
    )
    rtf_title: RTFTitle | None = Field(
        default_factory=lambda: RTFTitle(),
        description="Title section settings including text and formatting",
    )
    rtf_subline: str | None = Field(
        default=None, description="Subject line text to appear below the title"
    )
    rtf_column_header: list[RTFColumnHeader] = Field(
        default_factory=lambda: [RTFColumnHeader()],
        description="Column header settings",
    )
    rtf_body: RTFBody | None = Field(
        default_factory=lambda: RTFBody(),
        description="Table body section settings including column widths and formatting",
    )
    rtf_footnote: str | None = Field(
        default=None, description="Footnote text to appear at bottom of document"
    )
    rtf_source: str | None = Field(
        default=None, description="Data source citation text"
    )
    rtf_page_footer: str | None = Field(
        default=None, description="Text to appear in the footer of each page"
    )

    def _rtf_page_encode(self) -> str:
        """Return the RTF paper-size line for the current page settings.

        Page defaults are resolved with ``RTFPage._set_default`` and stored
        back on ``self.rtf_page``. The returned string always ends with a
        newline and includes the landscape control word when the page
        orientation is ``"landscape"``.
        """
        self.rtf_page = self.rtf_page._set_default()

        page_size = (
            f"\\paperw{Utils._inch_to_twip(self.rtf_page.width)}"
            f"\\paperh{Utils._inch_to_twip(self.rtf_page.height)}"
        )
        if self.rtf_page.orientation == "landscape":
            page_size += "\\landscape"

        # TODO: page header/footer groups are not emitted yet.
        return page_size + "\n"

    def _rtf_page_margin_encode(self) -> str:
        """Return the RTF margin line for the current page settings.

        Each entry of ``self.rtf_page.margin`` is converted with
        ``Utils._inch_to_twip`` and paired, in order, with the control words
        listed below. Pairing stops at the shorter of the two sequences.
        """
        self.rtf_page = self.rtf_page._set_default()

        margin_codes = (
            "\\margl",
            "\\margr",
            "\\margt",
            "\\margb",
            "\\headery",
            "\\footery",
        )
        margins = [Utils._inch_to_twip(m) for m in self.rtf_page.margin]
        margin = "".join(
            f"{code}{value}" for code, value in zip(margin_codes, margins)
        )
        return margin + "\n"

    def _rtf_title_encode(self, method: str) -> str | None:
        """Encode the title section, or return ``None`` when there is no title.

        Args:
            method: Forwarded unchanged to ``RTFTitle._encode``.

        Returns:
            Whatever ``RTFTitle._encode`` returns for the title text, or
            ``None`` when ``self.rtf_title`` is falsy.
        """
        if not self.rtf_title:
            return None

        self.rtf_title = self.rtf_title._set_default()
        return self.rtf_title._encode(text=self.rtf_title.text, method=method)

    def _page_by(self) -> list[list[tuple[int, int, int]]] | None:
        """Lay out header and data sections for the ``page_by`` grouping.

        Rows are grouped by their values in the ``page_by`` columns, in order
        of first appearance. For every group the header cells come first,
        followed by one section per data row.

        Returns:
            ``None`` when ``self.rtf_body.page_by`` is ``None``. Otherwise a
            list of sections, each a list of ``(row_idx, col_idx, level)``
            tuples:

            - header sections hold a single tuple pointing at the first row
              of the group and the grouping column, with ``level`` equal to
              the position of that column in ``page_by``;
            - data sections hold one tuple per non-grouping column, with
              ``level`` equal to ``len(page_by)``.
        """
        var = self.rtf_body.page_by
        # Check first so an empty DataFrame without grouping does not fail.
        if var is None:
            return None

        data = self.df.to_dict("records")
        columns = list(data[0].keys()) if data else list(self.df.columns)

        # One pass: map each combination of grouping values to its row
        # indices. Dicts keep insertion order, so groups stay in first-seen
        # order.
        groups: dict[tuple, list[int]] = {}
        for row_idx, row in enumerate(data):
            groups.setdefault(tuple(row[v] for v in var), []).append(row_idx)

        n_levels = len(var)
        deepest = n_levels - 1
        data_cols = [j for j, name in enumerate(columns) if name not in var]

        output = []
        prev_values = dict.fromkeys(var)
        for key, indices in groups.items():
            group = dict(zip(var, key))

            # A header is emitted for the deepest level always, and for any
            # other level when its own value or the value of one of its
            # parent levels differs from the previous group.
            changed = False
            for level, var_name in enumerate(var):
                if not changed and level != deepest:
                    changed = bool(group[var_name] != prev_values[var_name])
                if changed or level == deepest:
                    output.append(
                        [(indices[0], columns.index(var_name), level)]
                    )
            prev_values = group

            # Data rows: every column that is not a grouping variable.
            output.extend(
                [(index, j, n_levels) for j in data_cols] for index in indices
            )

        return output

    def _rtf_body_encode(
        self, df: pd.DataFrame, rtf_attrs: TableAttributes | None
    ) -> MutableSequence[str] | None:
        """Encode the table body as RTF rows.

        Args:
            df: Input DataFrame to encode.
            rtf_attrs: Table attributes for styling.

        Returns:
            ``None`` when ``rtf_attrs`` is ``None``. Without ``page_by`` the
            result of ``rtf_attrs._encode`` for the whole frame; otherwise
            the concatenated encodings of every section from ``_page_by``.
        """
        if rtf_attrs is None:
            return None

        col_total_width = self.rtf_page._set_default().col_width
        page_by = self._page_by()

        if page_by is None:
            col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
            return rtf_attrs._encode(df, col_widths)

        # Built once; every cell lookup below reads from the same wrapper.
        source = BroadcastValue(value=df)

        rows = []
        for section in page_by:
            indices = [(row, col) for row, col, _level in section]
            # Skip empty sections
            if not indices:
                continue

            # One-row DataFrame holding the cells of the current section
            section_df = pd.DataFrame(
                {
                    i: [source.iloc(row, col)]
                    for i, (row, col) in enumerate(indices)
                }
            )

            # Attributes restricted to the cells of this section
            section_attrs_dict = rtf_attrs._get_section_attributes(indices)
            section_attrs = TableAttributes(**section_attrs_dict)

            col_widths = Utils._col_widths(section_attrs.col_rel_width, col_total_width)
            rows.extend(section_attrs._encode(section_df, col_widths))

        return rows

    def _rtf_column_header_encode(
        self, df: pd.DataFrame | None, rtf_attrs: TableAttributes | None
    ) -> MutableSequence[str] | None:
        """Encode one column-header definition as RTF rows.

        Args:
            df: Header cells; its column count sets the default relative
                widths.
            rtf_attrs: Header attributes. ``col_rel_width`` is filled in
                place with equal widths when it is empty.

        Returns:
            ``None`` when either argument is ``None`` (nothing to render),
            otherwise the result of ``rtf_attrs._encode``.
        """
        if rtf_attrs is None or df is None:
            return None

        col_total_width = self.rtf_page._set_default().col_width

        rtf_attrs.col_rel_width = rtf_attrs.col_rel_width or [1] * df.shape[1]
        rtf_attrs = rtf_attrs._set_default()

        col_widths = Utils._col_widths(rtf_attrs.col_rel_width, col_total_width)
        return rtf_attrs._encode(df, col_widths)

    def _rtf_start_encode(self) -> str:
        """Return the fixed opening of the RTF document."""
        return "{\\rtf1\\ansi\n\\deff0\\deflang1033"

    def _rtf_font_table_encode(self) -> str:
        """Return the RTF font table built from ``Utils._font_type()``.

        At most ten fonts are written, numbered from 0 in the order given by
        the ``"style"`` and ``"name"`` entries of ``Utils._font_type()``.
        """
        font_types = Utils._font_type()

        # Literal braces are doubled inside the f-string; every entry is one
        # "{...;}" group on its own line.
        entries = [
            f"{{\\f{i}{style}\\fcharset161\\fprq2 {name};}}\n"
            for i, style, name in zip(
                range(10), font_types["style"], font_types["name"]
            )
        ]
        return "{\\fonttbl" + "".join(entries) + "}"

    def rtf_encode(self) -> str:
        """Generate the RTF code for the whole document.

        Returns:
            The document start, font table, page and margin settings, title,
            column headers and body, joined with newlines and closed with
            ``"}"``.
        """
        dim = self.df.shape

        # Set default values
        self.rtf_body.col_rel_width = self.rtf_body.col_rel_width or [1] * dim[1]
        self.rtf_body = self.rtf_body._set_default()

        # Title (None when the document has no title)
        rtf_title = self._rtf_title_encode(method="line")

        # Page Border
        doc_border_top = BroadcastValue(
            value=self.rtf_page.border_first, dimension=(1, dim[1])
        ).to_dataframe()
        doc_border_bottom = BroadcastValue(
            value=self.rtf_page.border_last, dimension=(1, dim[1])
        ).to_dataframe()
        page_border_top = BroadcastValue(
            value=self.rtf_body.border_first, dimension=(1, dim[1])
        ).to_dataframe()
        page_border_bottom = BroadcastValue(
            value=self.rtf_body.border_last, dimension=(1, dim[1])
        ).to_dataframe()

        # Column header
        if self.rtf_column_header is None:
            rtf_column_header = []
            self.rtf_body.border_top = BroadcastValue(
                value=self.rtf_body.border_top, dimension=dim
            ).update_row(0, doc_border_top)
        else:
            if self.rtf_column_header[0].df is None and self.rtf_body.as_colheader:
                # Use the DataFrame column names (minus grouping columns) as
                # the only header row.
                columns = [
                    col
                    for col in self.df.columns
                    if col not in (self.rtf_body.page_by or [])
                ]
                self.rtf_column_header[0].df = pd.DataFrame([columns])
                self.rtf_column_header = self.rtf_column_header[:1]

            # NOTE(review): the whole header object is passed as ``value``
            # here, whereas the body code below passes ``border_top``;
            # confirm whether ``.border_top`` was intended.
            self.rtf_column_header[0].border_top = BroadcastValue(
                value=self.rtf_column_header[0], dimension=dim
            ).update_row(0, doc_border_top)

            rtf_column_header = [
                self._rtf_column_header_encode(df=header.df, rtf_attrs=header)
                for header in self.rtf_column_header
            ]

        self.rtf_body.border_top = BroadcastValue(
            value=self.rtf_body.border_top, dimension=dim
        ).update_row(0, page_border_top)
        # NOTE(review): both updates below target the last row; presumably
        # the document border is meant to win. Confirm against
        # BroadcastValue.update_row.
        self.rtf_body.border_bottom = BroadcastValue(
            value=self.rtf_body.border_bottom, dimension=dim
        ).update_row(dim[0] - 1, page_border_bottom)
        self.rtf_body.border_bottom = BroadcastValue(
            value=self.rtf_body.border_bottom, dimension=dim
        ).update_row(dim[0] - 1, doc_border_bottom)

        # Body
        rtf_body = self._rtf_body_encode(df=self.df, rtf_attrs=self.rtf_body)

        return "\n".join(
            [
                self._rtf_start_encode(),
                self._rtf_font_table_encode(),
                "\n",
                self._rtf_page_encode(),
                self._rtf_page_margin_encode(),
                # A missing title contributes an empty line instead of
                # breaking the join.
                rtf_title or "",
                "\n",
                "\n".join(
                    header
                    for sublist in rtf_column_header
                    # Headers without a data frame encode to None; skip them.
                    if sublist is not None
                    for header in sublist
                ),
                "\n".join(rtf_body),
                "\n\n",
                "}",
            ]
        )

    def write_rtf(self, file_path: str) -> None:
        """Write the RTF code into a `.rtf` file.

        Args:
            file_path: Destination path; the file is written as UTF-8 text
                and overwritten if it already exists.
        """
        rtf_code = self.rtf_encode()
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(rtf_code)