@@ -105,9 +105,7 @@ def expire_snapshots_older_than(self, timestamp_ms: int) -> None:
 
         if snapshots_to_expire:
             with self.tbl.transaction() as txn:
-                from pyiceberg.table.update import RemoveSnapshotsUpdate
-
-                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))
+                self.expire_snapshots_by_ids(snapshots_to_expire)
 
     def expire_snapshots_older_than_with_retention(
         self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
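This hunk, and the two matching hunks below, replace the inline `RemoveSnapshotsUpdate` application with a call to `expire_snapshots_by_ids`. That helper is defined elsewhere in this class and is not shown in the diff; based purely on the lines removed here, a minimal sketch of what it presumably does:

```python
# Hypothetical sketch of expire_snapshots_by_ids, inferred from the removed lines above.
# The actual helper lives elsewhere in this class and may differ in detail.
from typing import List

from pyiceberg.table.update import RemoveSnapshotsUpdate


def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> None:
    """Remove the given snapshot IDs in a single metadata transaction."""
    with self.tbl.transaction() as txn:
        txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshot_ids),))
```

Centralising the update in one method avoids importing `RemoveSnapshotsUpdate` at three separate call sites.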
@@ -125,9 +123,7 @@ def expire_snapshots_older_than_with_retention(
 
         if snapshots_to_expire:
             with self.tbl.transaction() as txn:
-                from pyiceberg.table.update import RemoveSnapshotsUpdate
-
-                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))
+                self.expire_snapshots_by_ids(snapshots_to_expire)
 
     def retain_last_n_snapshots(self, n: int) -> None:
         """Keep only the last N snapshots, expiring all others.
@@ -163,9 +159,7 @@ def retain_last_n_snapshots(self, n: int) -> None:
 
         if snapshots_to_expire:
             with self.tbl.transaction() as txn:
-                from pyiceberg.table.update import RemoveSnapshotsUpdate
-
-                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))
+                self.expire_snapshots_by_ids(snapshots_to_expire)
 
     def _get_snapshots_to_expire_with_retention(
         self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
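For orientation, a minimal usage sketch of the three retention entry points touched above. The `maintenance` name is illustrative only (it stands for an instance of the class these methods are defined on); the method names and parameters come from this file:

```python
import time

# 'maintenance' is a placeholder for an instance of the maintenance class these
# methods belong to; how it is constructed is out of scope for this sketch.
one_week_ago_ms = int(time.time() * 1000) - 7 * 24 * 60 * 60 * 1000

# Expire every snapshot older than one week.
maintenance.expire_snapshots_older_than(one_week_ago_ms)

# Same cutoff, but always keep a minimum number of recent snapshots.
maintenance.expire_snapshots_older_than_with_retention(
    one_week_ago_ms, retain_last_n=5, min_snapshots_to_keep=3
)

# Keep only the 10 most recent snapshots, regardless of age.
maintenance.retain_last_n_snapshots(10)
```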
@@ -298,101 +292,108 @@ def _get_protected_snapshot_ids(self, table_metadata: TableMetadata) -> Set[int]
                 protected_ids.add(ref.snapshot_id)
         return protected_ids
 
-    def _get_all_datafiles(
-        self,
-        scan_all_snapshots: bool = False,
-        target_file_path: Optional[str] = None,
-        parallel: bool = True,
-    ) -> List[DataFile]:
-        """Collect all DataFiles in the table, optionally filtering by file path."""
+    def _get_all_datafiles(self) -> List[DataFile]:
+        """Collect all DataFiles in the table, scanning all partitions."""
         datafiles: List[DataFile] = []
 
         def process_manifest(manifest: ManifestFile) -> list[DataFile]:
             found: list[DataFile] = []
             for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
                 if hasattr(entry, "data_file"):
-                    df = entry.data_file
-                    if target_file_path is None or df.file_path == target_file_path:
-                        found.append(df)
+                    found.append(entry.data_file)
             return found
 
-        if scan_all_snapshots:
-            manifests = []
-            for snapshot in self.tbl.snapshots():
-                manifests.extend(snapshot.manifests(io=self.tbl.io))
-            if parallel:
-                with ThreadPoolExecutor() as executor:
-                    results = executor.map(process_manifest, manifests)
-                    for res in results:
-                        datafiles.extend(res)
-            else:
-                for manifest in manifests:
-                    datafiles.extend(process_manifest(manifest))
-        else:
-            # Only current snapshot
-            for chunk in self.tbl.inspect.data_files().to_pylist():
-                file_path = chunk.get("file_path")
-                partition: dict[str, Any] = dict(chunk.get("partition", {}) or {})
-                if target_file_path is None or file_path == target_file_path:
-                    datafiles.append(DataFile(file_path=file_path, partition=partition))
+        # Scan all snapshots
+        manifests = []
+        for snapshot in self.tbl.snapshots():
+            manifests.extend(snapshot.manifests(io=self.tbl.io))
+        with ThreadPoolExecutor() as executor:
+            results = executor.map(process_manifest, manifests)
+            for res in results:
+                datafiles.extend(res)
+
+        return datafiles
+
+    def _get_all_datafiles_with_context(self) -> List[tuple[DataFile, str, int]]:
+        """Collect all DataFiles in the table, scanning all partitions, with manifest context."""
+        datafiles: List[tuple[DataFile, str, int]] = []
+
+        def process_manifest(manifest: ManifestFile) -> list[tuple[DataFile, str, int]]:
+            found: list[tuple[DataFile, str, int]] = []
+            for idx, entry in enumerate(manifest.fetch_manifest_entry(io=self.tbl.io)):
+                if hasattr(entry, "data_file"):
+                    found.append((entry.data_file, getattr(manifest, 'manifest_path', str(manifest)), idx))
+            return found
+
+        # Scan all snapshots
+        manifests = []
+        for snapshot in self.tbl.snapshots():
+            manifests.extend(snapshot.manifests(io=self.tbl.io))
+        with ThreadPoolExecutor() as executor:
+            results = executor.map(process_manifest, manifests)
+            for res in results:
+                datafiles.extend(res)
+
         return datafiles
 
-    def deduplicate_data_files(
-        self,
-        scan_all_partitions: bool = True,
-        scan_all_snapshots: bool = False,
-        to_remove: Optional[List[Union[DataFile, str]]] = None,
-        parallel: bool = True,
-    ) -> List[DataFile]:
+    def _detect_duplicates(self, all_datafiles_with_context: List[tuple[DataFile, str, int]]) -> List[DataFile]:
+        """Detect duplicate data files based on file name and extension."""
+        seen = {}
+        processed_entries = set()
+        duplicates = []
+
+        for df, manifest_path, entry_idx in all_datafiles_with_context:
+            # Extract file name and extension
+            file_name_with_extension = df.file_path.split("/")[-1]
+            entry_key = (manifest_path, entry_idx)
+
+            if file_name_with_extension in seen:
+                if entry_key not in processed_entries:
+                    duplicates.append(df)
+                    processed_entries.add(entry_key)
+            else:
+                seen[file_name_with_extension] = (df, manifest_path, entry_idx)
+
+        return duplicates
+
+    def deduplicate_data_files(self) -> List[DataFile]:
         """
         Remove duplicate data files from an Iceberg table.
 
-        Args:
-            scan_all_partitions: If True, scan all partitions for duplicates (uses file_path+partition as key).
-            scan_all_snapshots: If True, scan all snapshots for duplicates, otherwise only current snapshot.
-            to_remove: List of DataFile objects or file path strings to remove. If None, auto-detect duplicates.
-            parallel: If True, parallelize manifest traversal.
-
         Returns:
             List of removed DataFile objects.
         """
         removed: List[DataFile] = []
 
-        # Determine what to remove
-        if to_remove is None:
-            # Auto-detect duplicates
-            all_datafiles = self._get_all_datafiles(scan_all_snapshots=scan_all_snapshots, parallel=parallel)
-            seen = {}
-            duplicates = []
-            for df in all_datafiles:
-                partition: dict[str, Any] = df.partition.to_dict() if hasattr(df.partition, "to_dict") else {}
-                if scan_all_partitions:
-                    key = (df.file_path, tuple(sorted(partition.items())) if partition else ())
-                else:
-                    key = (df.file_path, ())  # Add an empty tuple for partition when scan_all_partitions is False
-                if key in seen:
-                    duplicates.append(df)
-                else:
-                    seen[key] = df
-            to_remove = duplicates  # type: ignore[assignment]
-
-        # Normalize to DataFile objects
-        normalized_to_remove: List[DataFile] = []
-        all_datafiles = self._get_all_datafiles(scan_all_snapshots=scan_all_snapshots, parallel=parallel)
-        for item in to_remove or []:
-            if isinstance(item, DataFile):
-                normalized_to_remove.append(item)
-            elif isinstance(item, str):
-                # Remove all DataFiles with this file_path
-                for df in all_datafiles:
-                    if df.file_path == item:
-                        normalized_to_remove.append(df)
-            else:
-                raise ValueError(f"Unsupported type in to_remove: {type(item)}")
+        # Collect all data files
+        all_datafiles_with_context = self._get_all_datafiles_with_context()
+
+        # Detect duplicates
+        duplicates = self._detect_duplicates(all_datafiles_with_context)
 
         # Remove the DataFiles
-        for df in normalized_to_remove:
-            self.tbl.transaction().update_snapshot().overwrite().delete_data_file(df).commit()
+        for df in duplicates:
+            self.tbl.transaction().update_snapshot().overwrite().delete_data_file(df)
             removed.append(df)
 
         return removed
+
+    def _detect_duplicates(self, all_datafiles_with_context: List[tuple[DataFile, str, int]]) -> List[DataFile]:
+        """Detect duplicate data files based on file path and partition."""
+        seen = {}
+        processed_entries = set()
+        duplicates = []
+
+        for df, manifest_path, entry_idx in all_datafiles_with_context:
+            partition: dict[str, Any] = df.partition.to_dict() if hasattr(df.partition, "to_dict") else {}
+            key = (df.file_path, tuple(sorted(partition.items())) if partition else ())
+            entry_key = (manifest_path, entry_idx)
+
+            if key in seen:
+                if entry_key not in processed_entries:
+                    duplicates.append(df)
+                    processed_entries.add(entry_key)
+            else:
+                seen[key] = (df, manifest_path, entry_idx)
+
+        return duplicates
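Both `_detect_duplicates` variants added in this diff follow the same pattern: iterate over `(DataFile, manifest_path, entry_index)` tuples, remember the first occurrence of a key, and report later occurrences, using the manifest entry as a guard against reporting the same entry twice. A standalone sketch of the file-name-based variant, with plain tuples standing in for real `DataFile` entries:

```python
# Standalone illustration of the file-name-based duplicate detection shown above.
# Plain (file_path, manifest_path, entry_idx) tuples stand in for real DataFile entries.
entries = [
    ("s3://bucket/tbl/data/a.parquet", "manifest-1.avro", 0),
    ("s3://bucket/tbl/data/b.parquet", "manifest-1.avro", 1),
    ("s3://other-prefix/tbl/data/a.parquet", "manifest-2.avro", 0),  # same file name, different path
]

seen = {}
processed_entries = set()
duplicates = []

for file_path, manifest_path, entry_idx in entries:
    file_name = file_path.split("/")[-1]       # key on the bare file name
    entry_key = (manifest_path, entry_idx)     # identifies one manifest entry
    if file_name in seen:
        if entry_key not in processed_entries:
            duplicates.append(file_path)
            processed_entries.add(entry_key)
    else:
        seen[file_name] = (file_path, manifest_path, entry_idx)

print(duplicates)  # ['s3://other-prefix/tbl/data/a.parquet']
```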