Coverage for src/rtflite/pagination/core.py: 91%

103 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 16:35 +0000

1from collections.abc import Sequence 

2from typing import Any 

3 

4import polars as pl 

5from pydantic import BaseModel, ConfigDict, Field 

6 

7from ..attributes import TableAttributes 

8from ..fonts_mapping import FontName, FontNumber 

9from ..strwidth import get_string_width 

10 

11 

class RTFPagination(BaseModel):
    """Core pagination logic and calculations for RTF documents"""

    # Allow field types that pydantic cannot validate natively.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    page_width: float = Field(..., description="Page width in inches")
    page_height: float = Field(..., description="Page height in inches")
    margin: Sequence[float] = Field(
        ..., description="Page margins [left, right, top, bottom, header, footer]"
    )
    nrow: int = Field(..., description="Maximum rows per page")
    orientation: str = Field(..., description="Page orientation")

    def calculate_available_space(self) -> dict[str, float]:
        """Return the usable content area plus header and footer allowances.

        The content width is the page width minus the left and right
        margins; the content height is the page height minus the top and
        bottom margins. The header and footer margins are passed through
        unchanged.

        Returns:
            Dictionary with the keys ``content_width``, ``content_height``,
            ``header_space`` and ``footer_space``.
        """
        # Margin layout: [left, right, top, bottom, header, footer]
        m = self.margin
        return {
            "content_width": self.page_width - m[0] - m[1],
            "content_height": self.page_height - m[2] - m[3],
            "header_space": m[4],
            "footer_space": m[5],
        }

42 

43 

class PageBreakCalculator(BaseModel):
    """Calculates where page breaks should occur based on content and constraints"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")

    def calculate_content_rows(
        self,
        df: pl.DataFrame,
        col_widths: list[float],
        table_attrs: TableAttributes | None = None,
        font_size: float = 9,
    ) -> list[int]:
        """Calculate how many rows each content row will occupy when rendered

        For every data row, each measured cell is converted to ``str``, its
        rendered width is obtained from ``get_string_width`` and divided by
        the column width to estimate the number of wrapped lines. The result
        for a row is the largest estimate among its cells, and never less
        than the line count implied by the row's ``cell_height`` (when
        ``table_attrs`` provides one).

        Args:
            df: DataFrame containing the content
            col_widths: Width of each column in inches. Only columns that
                have both a width and a DataFrame column are measured.
                Non-positive widths cannot wrap text and contribute a single
                line.
            table_attrs: Table attributes containing cell height and font size info
            font_size: Default font size in points, used when ``table_attrs``
                does not supply ``text_font_size``

        Returns:
            List of row counts for each data row (each entry is >= 1)
        """
        if df.height == 0:
            return []

        dim = df.shape

        def broadcast(attr_name: str) -> Any:
            """Broadcast one table attribute to the table shape, if present."""
            if table_attrs and hasattr(table_attrs, attr_name):
                from ..attributes import BroadcastValue

                return BroadcastValue(
                    value=getattr(table_attrs, attr_name), dimension=dim
                )
            return None

        # Pair every measured column with its width once; zip() stops at the
        # shorter of the two, so surplus widths or columns are ignored.
        columns = [(df[name], width) for name, width in zip(df.columns, col_widths)]

        # The broadcast helpers depend only on the table attributes and the
        # table shape, so build them once instead of once per cell.
        # NOTE(review): assumes BroadcastValue.iloc is a pure lookup - confirm.
        font_sizes = broadcast("text_font_size") if columns else None
        fonts = broadcast("text_font") if columns else None
        cell_heights = broadcast("cell_height")

        # Assuming default line height of ~0.15 inches
        approx_line_height = 0.15

        row_counts: list[int] = []
        for row_idx in range(df.height):
            max_lines_in_row = 1

            for col_idx, (series, col_width) in enumerate(columns):
                cell_value = str(series[row_idx])

                # Get actual font size from table attributes if available
                actual_font_size = (
                    font_sizes.iloc(row_idx, col_idx)
                    if font_sizes is not None
                    else font_size
                )

                # Default to font number 1 (Times New Roman) unless the
                # attributes supply a FontName (str) or a FontNumber (1-10).
                actual_font: FontName | FontNumber = 1
                if fonts is not None:
                    font_value = fonts.iloc(row_idx, col_idx)
                    if isinstance(font_value, str) or (
                        isinstance(font_value, int) and 1 <= font_value <= 10
                    ):
                        actual_font = font_value  # type: ignore[assignment]

                # Calculate how many lines this text will need
                text_width = get_string_width(
                    cell_value,
                    font=actual_font,
                    font_size=actual_font_size,  # type: ignore[arg-type]
                )

                # A negative width already collapses to one line through the
                # max(1, ...) clamp; treat a zero width the same way instead
                # of raising ZeroDivisionError.
                if col_width > 0:
                    lines_needed = max(1, int(text_width / col_width) + 1)
                    max_lines_in_row = max(max_lines_in_row, lines_needed)

            # Account for cell height if specified in table attributes
            cell_height_lines = 1
            if cell_heights is not None:
                cell_height = cell_heights.iloc(row_idx, 0)
                # Convert cell height from inches to approximate line count
                cell_height_lines = max(1, int(cell_height / approx_line_height))

            row_counts.append(max(max_lines_in_row, cell_height_lines))

        return row_counts

    def find_page_breaks(
        self,
        df: pl.DataFrame,
        col_widths: list[float],
        page_by: list[str] | None = None,
        new_page: bool = False,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
    ) -> list[tuple[int, int]]:
        """Find optimal page break positions (r2rtf compatible)

        Rows are accumulated onto the current page until the next row would
        exceed the per-page data budget, at which point a new page starts at
        that row. When both ``page_by`` and ``new_page`` are set, a page also
        ends after any row whose ``page_by`` values differ from those of the
        following row.

        Args:
            df: DataFrame to paginate
            col_widths: Column widths in inches
            page_by: Columns to group by for page breaks
            new_page: Whether to force new pages between groups
            table_attrs: Table attributes for accurate row calculation
            additional_rows_per_page: Additional rows per page (headers, footnotes, sources)

        Returns:
            List of (start_row, end_row) tuples for each page; both indices
            are inclusive. Empty when ``df`` has no rows.
        """
        if df.height == 0:
            return []

        row_counts = self.calculate_content_rows(df, col_widths, table_attrs)

        # Calculate available rows for data (r2rtf compatible)
        # In r2rtf, nrow includes ALL rows (headers, data, footnotes, sources)
        # Always leave room for at least one data row per page.
        available_data_rows_per_page = max(
            1, self.pagination.nrow - additional_rows_per_page
        )

        page_breaks: list[tuple[int, int]] = []
        current_page_start = 0
        current_page_rows = 0
        last_row = df.height - 1

        for row_idx, row_height in enumerate(row_counts):
            # Check if adding this row would exceed the page limit
            if current_page_rows + row_height > available_data_rows_per_page:
                # Close the current page before this row. Skip the append
                # when the page is empty (e.g. right after a forced group
                # break, or when the very first row alone exceeds the budget).
                if current_page_start < row_idx:
                    page_breaks.append((current_page_start, row_idx - 1))
                current_page_start = row_idx
                current_page_rows = row_height
            else:
                current_page_rows += row_height

            # Handle group-based page breaks
            if page_by and new_page and row_idx < last_row:
                current_group = {col: df[col][row_idx] for col in page_by}
                next_group = {col: df[col][row_idx + 1] for col in page_by}

                if current_group != next_group:
                    # Force page break between groups
                    page_breaks.append((current_page_start, row_idx))
                    current_page_start = row_idx + 1
                    current_page_rows = 0

        # Add final page
        if current_page_start < df.height:
            page_breaks.append((current_page_start, last_row))

        return page_breaks

196 

197 

class ContentDistributor(BaseModel):
    """Manages content distribution across multiple pages"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pagination: RTFPagination = Field(..., description="Pagination configuration")
    calculator: PageBreakCalculator = Field(..., description="Page break calculator")

    def distribute_content(
        self,
        df: pl.DataFrame,
        col_widths: list[float],
        page_by: list[str] | None = None,
        new_page: bool = False,
        pageby_header: bool = True,
        table_attrs: TableAttributes | None = None,
        additional_rows_per_page: int = 0,
        subline_by: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Split a DataFrame into per-page chunks (r2rtf compatible)

        Page boundaries come from ``self.calculator.find_page_breaks``; each
        boundary pair is turned into a dictionary describing that page.

        Args:
            df: DataFrame to distribute
            col_widths: Column widths in inches
            page_by: Columns to group by
            new_page: Force new pages between groups
            pageby_header: Repeat headers on new pages
            table_attrs: Table attributes for accurate calculations
            additional_rows_per_page: Additional rows per page (headers, footnotes, sources)
            subline_by: Columns to create subline headers by; when given it
                replaces ``page_by`` and forces ``new_page=True``

        Returns:
            One dictionary per page with the keys ``page_number``,
            ``total_pages``, ``data``, ``start_row``, ``end_row``,
            ``is_first_page``, ``is_last_page``, ``needs_header`` and
            ``col_widths``, plus ``subline_header`` when ``subline_by`` is set
        """
        # subline_by takes over the grouping and always starts a new page
        if subline_by:
            page_by, new_page = subline_by, True

        page_breaks = self.calculator.find_page_breaks(
            df, col_widths, page_by, new_page, table_attrs, additional_rows_per_page
        )
        total_pages = len(page_breaks)

        pages: list[dict[str, Any]] = []
        for page_number, (start_row, end_row) in enumerate(page_breaks, start=1):
            on_first_page = page_number == 1

            page_info: dict[str, Any] = {
                "page_number": page_number,
                "total_pages": total_pages,
                # end_row is inclusive, hence the +1 on the slice bound
                "data": df[start_row : end_row + 1],
                "start_row": start_row,
                "end_row": end_row,
                "is_first_page": on_first_page,
                "is_last_page": page_number == total_pages,
                "needs_header": pageby_header or on_first_page,
                "col_widths": col_widths,
            }

            # Attach the subline header taken from the page's first row
            if subline_by:
                page_info["subline_header"] = self.get_group_headers(
                    df, subline_by, start_row
                )

            pages.append(page_info)

        return pages

    def get_group_headers(
        self, df: pl.DataFrame, page_by: list[str], start_row: int
    ) -> dict[str, Any]:
        """Describe the group that a page starts with

        Args:
            df: Original DataFrame
            page_by: Grouping columns
            start_row: Starting row for this page

        Returns:
            Empty dictionary when ``page_by`` is empty or ``start_row`` lies
            past the last row; otherwise a dictionary with the keys
            ``group_by_columns``, ``group_values`` and ``header_text``
        """
        if not page_by or start_row >= df.height:
            return {}

        # Value of every grouping column at the page's first row
        group_values = {col: df[col][start_row] for col in page_by}
        header_text = " | ".join(
            f"{col}: {val}" for col, val in group_values.items()
        )

        return {
            "group_by_columns": page_by,
            "group_values": group_values,
            "header_text": header_text,
        }