
Commit aa8694b

Babargeruh authored and committed
feat: add mor delete file index support
1 parent 58e5ad6 commit aa8694b

8 files changed

Lines changed: 2133 additions & 141 deletions

pyiceberg/io/pyarrow.py

Lines changed: 145 additions & 16 deletions
@@ -978,18 +978,30 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
         raise ValueError(f"Unsupported file format: {file_format}")
 
 
-def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+def _construct_fragment(io: FileIO, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
+    with io.new_input(data_file.file_path).open() as fi:
+        return _get_file_format(
+            data_file.file_format, **file_format_kwargs
+        ).make_fragment(fi)
+
+def _read_deletes(io: FileIO, data_file: DataFile) -> Union[Dict[str, pa.ChunkedArray], pa.Table]:
     if data_file.file_format == FileFormat.PARQUET:
-        with io.new_input(data_file.file_path).open() as fi:
-            delete_fragment = _get_file_format(
-                data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
-            ).make_fragment(fi)
-        table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
-        table = table.unify_dictionaries()
-        return {
-            file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
-            for file in table.column("file_path").chunks[0].dictionary
-        }
+        delete_fragment = _construct_fragment(
+            io,
+            data_file,
+            file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE},
+        )
+        table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+        if data_file.content == DataFileContent.POSITION_DELETES:
+            table = table.unify_dictionaries()
+            return {
+                file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
+                for file in table.column("file_path").chunks[0].dictionary
+            }
+        elif data_file.content == DataFileContent.EQUALITY_DELETES:
+            return table
+        else:
+            raise ValueError(f"Unsupported delete file content: {data_file.content}")
     elif data_file.file_format == FileFormat.PUFFIN:
         with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
@@ -1445,7 +1457,7 @@ def _task_to_record_batches(
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
     projected_field_ids: Set[int],
-    positional_deletes: Optional[List[ChunkedArray]],
+    deletes: Optional[List[Union[pa.ChunkedArray, pa.Table]]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
    partition_spec: Optional[PartitionSpec] = None,
@@ -1479,9 +1491,18 @@ def _task_to_record_batches(
         schema=physical_schema,
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
-        filter=pyarrow_filter if not positional_deletes else None,
+        filter=pyarrow_filter if not deletes else None,
         columns=[col.name for col in file_project_schema.columns],
     )
+    positional_deletes = []
+    combined_eq_deletes = []
+    if deletes:
+        positional_deletes = [d for d in deletes if isinstance(d, pa.ChunkedArray)]
+        equality_deletes = [d for d in deletes if isinstance(d, pa.Table)]
+        if equality_deletes:
+            task_eq_files = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
+            # Group and combine equality deletes
+            combined_eq_deletes = group_equality_deletes(task_eq_files, equality_deletes)
 
     next_index = 0
     batches = fragment_scanner.to_batches()
@@ -1499,6 +1520,17 @@ def _task_to_record_batches(
         if current_batch.num_rows == 0:
             continue
 
+        if combined_eq_deletes:
+            table = pa.Table.from_batches([current_batch])
+            for equality_ids, combined_table in combined_eq_deletes:
+                table = _apply_equality_deletes(table, combined_table, equality_ids, file_schema)
+                if table.num_rows == 0:
+                    break
+            if table.num_rows > 0:
+                current_batch = table.combine_chunks().to_batches()[0]
+            else:
+                continue
+
         # Apply the user filter
         if pyarrow_filter is not None:
             # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
@@ -1529,9 +1561,16 @@
 
 
 def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
-    deletes_per_file: Dict[str, List[ChunkedArray]] = {}
-    unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
-    if len(unique_deletes) > 0:
+    deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]] = {}
+
+    # Position Deletes
+    unique_deletes = {
+        df
+        for task in tasks
+        for df in task.delete_files
+        if df.content == DataFileContent.POSITION_DELETES and df.file_format != FileFormat.PUFFIN
+    }
+    if unique_deletes:
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
             lambda args: _read_deletes(*args),
@@ -1543,7 +1582,44 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
                     deletes_per_file[file].append(arr)
                 else:
                     deletes_per_file[file] = [arr]
+    # Deletion Vectors
+    deletion_vectors = {
+        df
+        for task in tasks
+        for df in task.delete_files
+        if df.content == DataFileContent.POSITION_DELETES and df.file_format == FileFormat.PUFFIN
+    }
+    if deletion_vectors:
+        executor = ExecutorFactory.get_or_create()
+        dv_results = executor.map(
+            lambda args: _read_deletes(*args),
+            [(_fs_from_file_path(io, delete_file.file_path), delete_file) for delete_file in deletion_vectors],
+        )
+        for delete in dv_results:
+            for file, arr in delete.items():
+                # Deletion vectors replace all position deletes for a file
+                deletes_per_file[file] = [arr]
+
+    # Equality Deletes
+    equality_delete_tasks = []
+    for task in tasks:
+        equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
+        if equality_deletes:
+            for delete_file in equality_deletes:
+                equality_delete_tasks.append((task.file.file_path, delete_file))
+
+    if equality_delete_tasks:
+        executor = ExecutorFactory.get_or_create()
 
+        # Processing equality delete tasks in parallel like position deletes
+        equality_delete_results = executor.map(
+            lambda args: (args[0], _read_deletes(_fs_from_file_path(io, args[1].file_path), args[1])),
+            equality_delete_tasks,
+        )
+        for file_path, equality_delete_table in equality_delete_results:
+            if file_path not in deletes_per_file:
+                deletes_per_file[file_path] = []
+            deletes_per_file[file_path].append(equality_delete_table)
     return deletes_per_file
 
 
@@ -2799,3 +2875,56 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
     field_array = arrow_table[path_parts[0]]
     # Navigate into the struct using the remaining path parts
     return pc.struct_field(field_array, path_parts[1:])
+
+
+def group_equality_deletes(
+    task_eq_files: List[DataFile], equality_delete_tables: List[pa.Table]
+) -> List[Tuple[List[int], pa.Table]]:
+    """Group equality delete tables by their equality IDs."""
+    equality_delete_groups: Dict[frozenset[int], List[Tuple[List[int], pa.Table]]] = {}
+
+    for delete_file, delete_table in zip(task_eq_files, equality_delete_tables):
+        if delete_file.equality_ids:
+            key = frozenset(delete_file.equality_ids)
+
+            # Add to the appropriate group
+            if key not in equality_delete_groups:
+                equality_delete_groups[key] = []
+            equality_delete_groups[key].append((delete_file.equality_ids, delete_table))
+
+    # Combine tables with the same equality IDs
+    combined_deletes = []
+    for items in equality_delete_groups.values():
+        # Use the original equality IDs from the first item
+        original_ids = items[0][0]
+        tables = [item[1] for item in items]
+
+        if tables:
+            combined_table = pa.concat_tables(tables)
+            combined_deletes.append((original_ids, combined_table))
+
+    return combined_deletes
+
+
+def _apply_equality_deletes(
+    data_table: pa.Table, delete_table: pa.Table, equality_ids: List[int], data_schema: Optional[Schema]
+) -> pa.Table:
+    """Apply equality deletes to a data table.
+
+    Filter out rows of the data table that match the conditions in the equality delete table.
+    Args:
+        data_table: A PyArrow table which has data to filter
+        delete_table: A PyArrow table containing the equality deletes
+        equality_ids: A list of field IDs to use for equality comparison
+        data_schema: The schema of the PyArrow table
+    Returns:
+        A filtered PyArrow table with matching rows removed
+    """
+    if len(delete_table) == 0:
+        return data_table
+    if data_schema is None:
+        raise ValueError("Schema is required for applying equality deletes")
+    equality_columns = [data_schema.find_field(fid).name for fid in equality_ids]
+    # Use PyArrow's join function with left anti join type
+    result = data_table.join(delete_table.select(equality_columns), keys=equality_columns, join_type="left anti")
+    return result
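
Note on the equality-delete path above: the heavy lifting is a PyArrow left anti join on the columns named by the delete file's equality field IDs, which keeps only the data rows that have no match in the delete table. A minimal standalone sketch of that technique (the table contents and column names here are illustrative, not taken from the commit):

    import pyarrow as pa

    # Rows read from a data file (columns are made up for the example).
    data = pa.table({"id": [1, 2, 3, 4], "category": ["a", "b", "a", "c"]})
    # Equality delete rows: any data row whose "id" matches one of these is removed.
    deletes = pa.table({"id": [2, 4]})

    equality_columns = ["id"]
    # A left anti join keeps only data rows with no match in the delete table,
    # mirroring what _apply_equality_deletes does after resolving field IDs to column names.
    remaining = data.join(deletes.select(equality_columns), keys=equality_columns, join_type="left anti")
    print(remaining.sort_by("id").to_pydict())
    # {'id': [1, 3], 'category': ['a', 'a']}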

pyiceberg/table/__init__.py

Lines changed: 14 additions & 27 deletions
@@ -41,7 +41,6 @@
 )
 
 from pydantic import Field
-from sortedcontainers import SortedList
 
 import pyiceberg.expressions.parser as parser
 from pyiceberg.expressions import (
@@ -64,7 +63,6 @@
 )
 from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.manifest import (
-    POSITIONAL_DELETE_SCHEMA,
     DataFile,
     DataFileContent,
     ManifestContent,
@@ -78,6 +76,7 @@
     PartitionSpec,
 )
 from pyiceberg.schema import Schema
+from pyiceberg.table.delete_file_index import DeleteFileIndex
 from pyiceberg.table.inspect import InspectTable
 from pyiceberg.table.locations import LocationProvider, load_location_provider
 from pyiceberg.table.metadata import (
@@ -1793,29 +1792,20 @@ def _min_sequence_number(manifests: List[ManifestFile]) -> int:
     return INITIAL_SEQUENCE_NUMBER
 
 
-def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
-    """Check if the delete file is relevant for the data file.
-
-    Using the column metrics to see if the filename is in the lower and upper bound.
+def _match_deletes_to_data_file(data_entry: ManifestEntry, delete_file_index: DeleteFileIndex) -> Set[DataFile]:
+    """Check if delete files are relevant for the data file.
 
     Args:
-        data_entry (ManifestEntry): The manifest entry path of the datafile.
-        positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
+        data_entry (ManifestEntry): The manifest entry of the data file.
+        delete_file_index (DeleteFileIndex): Index containing all delete files.
 
     Returns:
-        A set of files that are relevant for the data file.
+        A set of delete files that are relevant for the data file.
     """
-    relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]
-
-    if len(relevant_entries) > 0:
-        evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
-        return {
-            positional_delete_entry.data_file
-            for positional_delete_entry in relevant_entries
-            if evaluator.eval(positional_delete_entry.data_file)
-        }
-    else:
-        return set()
+    candidate_deletes = delete_file_index.for_data_file(
+        data_entry.sequence_number or 0, data_entry.data_file, partition_key=data_entry.data_file.partition
+    )
+    return set(candidate_deletes)
 
 
 class DataScan(TableScan):
@@ -1921,7 +1911,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
         min_sequence_number = _min_sequence_number(manifests)
 
         data_entries: List[ManifestEntry] = []
-        positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
+        delete_file_index = DeleteFileIndex(self.table_metadata.schema(), self.table_metadata.specs())
 
         executor = ExecutorFactory.get_or_create()
         for manifest_entry in chain(
@@ -1942,19 +1932,16 @@
             data_file = manifest_entry.data_file
             if data_file.content == DataFileContent.DATA:
                 data_entries.append(manifest_entry)
-            elif data_file.content == DataFileContent.POSITION_DELETES:
-                positional_delete_entries.add(manifest_entry)
-            elif data_file.content == DataFileContent.EQUALITY_DELETES:
-                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
+            elif data_file.content in (DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES):
+                delete_file_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
             else:
                 raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
-
         return [
             FileScanTask(
                 data_entry.data_file,
                 delete_files=_match_deletes_to_data_file(
                     data_entry,
-                    positional_delete_entries,
+                    delete_file_index,
                 ),
                 residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
                     data_entry.data_file.partition
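
Note on the DeleteFileIndex used above: the class is imported from pyiceberg/table/delete_file_index.py, a module presumably added elsewhere in this commit (the file is not shown in this excerpt). Conceptually, for_data_file has to honor Iceberg's sequence-number rules: a position delete applies to a data file whose data sequence number is less than or equal to the delete's, while an equality delete applies only to data files with a strictly smaller sequence number. A simplified, hypothetical sketch of just that rule, not the actual index (the real lookup also prunes by partition and, for position deletes, by file_path bounds):

    from typing import List, Tuple

    from pyiceberg.manifest import DataFile, DataFileContent

    def _applies_to(delete_seq: int, content: DataFileContent, data_seq: int) -> bool:
        # Position deletes can be committed together with the data they delete.
        if content == DataFileContent.POSITION_DELETES:
            return delete_seq >= data_seq
        # Equality deletes only apply to data committed strictly earlier.
        if content == DataFileContent.EQUALITY_DELETES:
            return delete_seq > data_seq
        return False

    def candidate_deletes(indexed: List[Tuple[int, DataFileContent, DataFile]], data_seq: int) -> List[DataFile]:
        # Hypothetical stand-in for DeleteFileIndex.for_data_file, keeping only the sequence-number check.
        return [delete_file for seq, content, delete_file in indexed if _applies_to(seq, content, data_seq)]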

0 commit comments
