@@ -1674,6 +1674,10 @@ def to_polars(self) -> pl.DataFrame:
16741674class FileBasedScan (AbstractTableScan , ABC ):
16751675 """A base class for table scans that plan FileScanTasks."""
16761676
1677+ @cached_property
1678+ def _manifest_group_planner (self ) -> ManifestGroupPlanner :
1679+ return ManifestGroupPlanner (self )
1680+
16771681 @abstractmethod
16781682 def plan_files (self ) -> Iterable [FileScanTask ]: ...
16791683
@@ -1918,14 +1922,12 @@ def plan_files(self) -> Iterable[FileScanTask]:
19181922 if not snapshot :
19191923 return iter ([])
19201924
1921- return ManifestGroup (
1922- manifests = snapshot .manifests (self .io ),
1923- io = self .io ,
1924- table_metadata = self .table_metadata ,
1925- parsed_row_filter = self .row_filter ,
1926- case_sensitive = self .case_sensitive ,
1927- options = self .options ,
1928- ).plan_files ()
1925+ return self ._manifest_group_planner .plan_files (manifests = snapshot .manifests (self .io ))
1926+
1927+ # TODO: Document motivation and un-caching
1928+ @property
1929+ def partition_filters (self ) -> KeyDefaultDict [int , BooleanExpression ]:
1930+ return self ._manifest_group_planner .partition_filters
19291931
19301932
19311933A = TypeVar ("A" , bound = "IncrementalScan" , covariant = True )
@@ -2051,16 +2053,11 @@ def plan_files(self) -> Iterable[FileScanTask]:
20512053 if manifest_file .content == ManifestContent .DATA and manifest_file .added_snapshot_id in append_snapshot_ids
20522054 }
20532055
2054- return ManifestGroup (
2056+ return self . _manifest_group_planner . plan_files (
20552057 manifests = list (manifests ),
2056- io = self .io ,
2057- table_metadata = self .table_metadata ,
2058- parsed_row_filter = self .row_filter ,
2059- case_sensitive = self .case_sensitive ,
2060- options = self .options ,
20612058 manifest_entry_filter = lambda manifest_entry : manifest_entry .snapshot_id in append_snapshot_ids
20622059 and manifest_entry .status == ManifestEntryStatus .ADDED ,
2063- ). plan_files ()
2060+ )
20642061
20652062 def _validate_and_resolve_snapshots (self ) -> tuple [int , int ]:
20662063 current_snapshot = self .table_metadata .current_snapshot ()
@@ -2108,42 +2105,31 @@ def _appends_between(
21082105 ]
21092106
21102107
2111- class ManifestGroup :
2112- manifests : List [ManifestFile ]
2108+ class ManifestGroupPlanner :
21132109 io : FileIO
21142110 table_metadata : TableMetadata
2115- parsed_row_filter : BooleanExpression
2111+ row_filter : BooleanExpression
21162112 case_sensitive : bool
21172113 options : Properties
2118- manifest_entry_filter : Callable [[ManifestEntry ], bool ]
21192114
2120- def __init__ (
2115+ def __init__ (self , scan : AbstractTableScan ):
2116+ self .io = scan .io
2117+ self .table_metadata = scan .table_metadata
2118+ self .row_filter = scan .row_filter
2119+ self .case_sensitive = scan .case_sensitive
2120+ self .options = scan .options
2121+
2122+ def plan_files (
21212123 self ,
21222124 manifests : List [ManifestFile ],
2123- io : FileIO ,
2124- table_metadata : TableMetadata ,
2125- parsed_row_filter : BooleanExpression ,
2126- case_sensitive : bool ,
2127- options : Properties ,
21282125 manifest_entry_filter : Callable [[ManifestEntry ], bool ] = lambda _ : True ,
2129- ):
2130- self .manifests = manifests
2131- self .io = io
2132- self .table_metadata = table_metadata
2133- self .parsed_row_filter = parsed_row_filter
2134- self .case_sensitive = case_sensitive
2135- self .options = options
2136- self .manifest_entry_filter = manifest_entry_filter
2137-
2138- def plan_files (self ) -> Iterable [FileScanTask ]:
2126+ ) -> Iterable [FileScanTask ]:
21392127 # step 1: filter manifests using partition summaries
21402128 # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
21412129
21422130 manifest_evaluators : Dict [int , Callable [[ManifestFile ], bool ]] = KeyDefaultDict (self ._build_manifest_evaluator )
21432131 manifests = [
2144- manifest_file
2145- for manifest_file in self .manifests
2146- if manifest_evaluators [manifest_file .partition_spec_id ](manifest_file )
2132+ manifest_file for manifest_file in manifests if manifest_evaluators [manifest_file .partition_spec_id ](manifest_file )
21472133 ]
21482134
21492135 residual_evaluators : Dict [int , Callable [[DataFile ], ResidualEvaluator ]] = KeyDefaultDict (self ._build_residual_evaluator )
@@ -2174,7 +2160,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
21742160 ],
21752161 )
21762162 ):
2177- if not self . manifest_entry_filter (manifest_entry ):
2163+ if not manifest_entry_filter (manifest_entry ):
21782164 continue
21792165
21802166 data_file = manifest_entry .data_file
@@ -2201,25 +2187,23 @@ def plan_files(self) -> Iterable[FileScanTask]:
22012187 for data_entry in data_entries
22022188 ]
22032189
2204- def _build_partition_projection (self , spec_id : int ) -> BooleanExpression :
2205- project = inclusive_projection (self .table_metadata .schema (), self .table_metadata .specs ()[spec_id ], self .case_sensitive )
2206- return project (self .parsed_row_filter )
2207-
2208- # TODO: Document that this method was removed
2209- # TODO: Or probably: Don't move it and think. We should cache on the scan classes themselves, not here
22102190 @cached_property
2211- def _partition_filters (self ) -> KeyDefaultDict [int , BooleanExpression ]:
2191+ def partition_filters (self ) -> KeyDefaultDict [int , BooleanExpression ]:
22122192 return KeyDefaultDict (self ._build_partition_projection )
22132193
2194+ def _build_partition_projection (self , spec_id : int ) -> BooleanExpression :
2195+ project = inclusive_projection (self .table_metadata .schema (), self .table_metadata .specs ()[spec_id ], self .case_sensitive )
2196+ return project (self .row_filter )
2197+
22142198 def _build_manifest_evaluator (self , spec_id : int ) -> Callable [[ManifestFile ], bool ]:
22152199 spec = self .table_metadata .specs ()[spec_id ]
2216- return manifest_evaluator (spec , self .table_metadata .schema (), self ._partition_filters [spec_id ], self .case_sensitive )
2200+ return manifest_evaluator (spec , self .table_metadata .schema (), self .partition_filters [spec_id ], self .case_sensitive )
22172201
22182202 def _build_partition_evaluator (self , spec_id : int ) -> Callable [[DataFile ], bool ]:
22192203 spec = self .table_metadata .specs ()[spec_id ]
22202204 partition_type = spec .partition_type (self .table_metadata .schema ())
22212205 partition_schema = Schema (* partition_type .fields )
2222- partition_expr = self ._partition_filters [spec_id ]
2206+ partition_expr = self .partition_filters [spec_id ]
22232207
22242208 # The lambda created here is run in multiple threads.
22252209 # So we avoid creating _EvaluatorExpression methods bound to a single
@@ -2235,7 +2219,7 @@ def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
22352219 # shared instance across multiple threads.
22362220 return lambda data_file : _InclusiveMetricsEvaluator (
22372221 schema ,
2238- self .parsed_row_filter ,
2222+ self .row_filter ,
22392223 self .case_sensitive ,
22402224 include_empty_files ,
22412225 ).eval (data_file )
@@ -2253,13 +2237,13 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
22532237 return lambda datafile : (
22542238 residual_evaluator_of (
22552239 spec = spec ,
2256- expr = self .parsed_row_filter ,
2240+ expr = self .row_filter ,
22572241 case_sensitive = self .case_sensitive ,
22582242 schema = self .table_metadata .schema (),
22592243 )
22602244 )
22612245
2262- # TODO: Document that this method was removed from DataScan and it was made static
2246+ # TODO: Document that this method was was made static
22632247 @staticmethod
22642248 def _check_sequence_number (min_sequence_number : int , manifest : ManifestFile ) -> bool :
22652249 """Ensure that no manifests are loaded that contain deletes that are older than the data.
0 commit comments