Commit 8931ed0

Equality deletes
1 parent 794ec02 commit 8931ed0

7 files changed

Lines changed: 515 additions & 50 deletions

pyiceberg/io/pyarrow.py

Lines changed: 104 additions & 20 deletions
@@ -1136,6 +1136,13 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
         raise ValueError(f"Delete file format not supported: {data_file.file_format}")


+def _read_equality_deletes(io: FileIO, delete_file: DataFile) -> pa.Table:
+    arrow_format = _get_file_format(delete_file.file_format, pre_buffer=True, buffer_size=ONE_MEGABYTE)
+    with io.new_input(delete_file.file_path).open() as fi:
+        fragment = arrow_format.make_fragment(fi)
+        return ds.Scanner.from_fragment(fragment=fragment).to_table()
+
+
 def _combine_positional_deletes(positional_deletes: list[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array:
     if len(positional_deletes) == 1:
         all_chunks = positional_deletes[0]
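
Note: `_read_equality_deletes` follows the same pattern as `_read_deletes` above: wrap the opened input in an Arrow dataset fragment and materialize the whole delete file as a table. A minimal standalone sketch of that pattern against a local Parquet file (the path and data here are made up for illustration, not from the commit):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# A tiny stand-in for an equality delete file: each row names a deleted "id".
pq.write_table(pa.table({"id": [1, 3]}), "/tmp/eq_deletes.parquet")

parquet_format = ds.ParquetFileFormat()
with open("/tmp/eq_deletes.parquet", "rb") as fi:
    fragment = parquet_format.make_fragment(fi)
    table = ds.Scanner.from_fragment(fragment=fragment).to_table()

print(table.to_pydict())  # {'id': [1, 3]}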
@@ -1609,6 +1616,7 @@ def _task_to_record_batches(
     table_schema: Schema,
     projected_field_ids: set[int],
     positional_deletes: list[ChunkedArray] | None,
+    equality_deletes: list[tuple[set[int], pa.Table]] | None,
     case_sensitive: bool,
     name_mapping: NameMapping | None = None,
     partition_spec: PartitionSpec | None = None,
@@ -1643,14 +1651,20 @@ def _task_to_record_batches(
         bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
         pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema)

-    file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
+    # Ensure equality delete columns are also projected
+    all_projected_field_ids = projected_field_ids.copy()
+    if equality_deletes:
+        for eq_ids, _ in equality_deletes:
+            all_projected_field_ids.update(eq_ids)
+
+    file_project_schema = prune_columns(file_schema, all_projected_field_ids, select_full_types=False)

     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
         schema=physical_schema,
         # This will push down the query to Arrow.
-        # But in case there are positional deletes, we have to apply them first
-        filter=pyarrow_filter if not positional_deletes else None,
+        # But in case there are positional or equality deletes, we have to apply them first
+        filter=pyarrow_filter if not positional_deletes and not equality_deletes else None,
         columns=[col.name for col in file_project_schema.columns],
     )

@@ -1666,6 +1680,38 @@ def _task_to_record_batches(
             indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
             current_batch = current_batch.take(indices)

+        if current_batch.num_rows > 0 and equality_deletes:
+            for eq_ids, eq_delete_table in equality_deletes:
+                try:
+                    eq_file_schema = pyarrow_to_schema(
+                        eq_delete_table.schema,
+                        name_mapping=name_mapping,
+                        format_version=format_version,
+                    )
+
+                    rename_map = {}
+                    for field_id in eq_ids:
+                        file_name = eq_file_schema.find_column_name(field_id)
+                        current_name = table_schema.find_column_name(field_id)
+                        if file_name != current_name:
+                            rename_map[file_name] = current_name
+
+                    if rename_map:
+                        eq_delete_table = eq_delete_table.rename_columns(
+                            [rename_map.get(name, name) for name in eq_delete_table.column_names]
+                        )
+
+                    join_keys = [table_schema.find_column_name(field_id) for field_id in eq_ids]
+                    current_table = pa.Table.from_batches([current_batch])
+                    current_table = current_table.join(eq_delete_table, keys=join_keys, join_type="left anti")
+
+                    if current_table.num_rows == 0:
+                        current_batch = current_table.to_batches()[0]
+                        break
+                    current_batch = current_table.to_batches()[0]
+                except (ValueError, ResolveError):
+                    continue
+
         # skip empty batches
         if current_batch.num_rows == 0:
             continue
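
Note: the row-level deletion above is a PyArrow left anti join: any row of the data batch whose join-key columns match a row of the equality-delete table is dropped. A self-contained sketch of the technique with toy data (column names are illustrative):

import pyarrow as pa

data = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
eq_deletes = pa.table({"id": [1, 3]})  # rows with these ids are deleted

# "left anti" keeps only rows of `data` that have no match in `eq_deletes`.
remaining = data.join(eq_deletes, keys=["id"], join_type="left anti")
print(remaining.to_pydict())  # {'id': [2], 'name': ['b']}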
@@ -1691,23 +1737,57 @@ def _task_to_record_batches(
     )


-def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
-    deletes_per_file: dict[str, list[ChunkedArray]] = {}
+def _read_all_delete_files(
+    io: FileIO, tasks: Iterable[FileScanTask]
+) -> tuple[dict[str, list[ChunkedArray]], dict[str, list[tuple[set[int], pa.Table]]]]:
+    pos_deletes_per_file: dict[str, list[ChunkedArray]] = {}
+    eq_deletes_per_file: dict[str, list[tuple[set[int], pa.Table]]] = {}
+
     unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
     if len(unique_deletes) > 0:
+        unique_pos_deletes = {d for d in unique_deletes if d.content == DataFileContent.POSITION_DELETES}
+        unique_eq_deletes = {d for d in unique_deletes if d.content == DataFileContent.EQUALITY_DELETES}
+
         executor = ExecutorFactory.get_or_create()
-        deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(
-            lambda args: _read_deletes(*args),
-            [(io, delete_file) for delete_file in unique_deletes],
-        )
-        for delete in deletes_per_files:
-            for file, arr in delete.items():
-                if file in deletes_per_file:
-                    deletes_per_file[file].append(arr)
-                else:
-                    deletes_per_file[file] = [arr]

-    return deletes_per_file
+        if len(unique_pos_deletes) > 0:
+            pos_deletes: Iterator[dict[str, ChunkedArray]] = executor.map(
+                lambda args: _read_deletes(*args),
+                [(io, delete_file) for delete_file in unique_pos_deletes],
+            )
+            for delete in pos_deletes:
+                for file, arr in delete.items():
+                    if file in pos_deletes_per_file:
+                        pos_deletes_per_file[file].append(arr)
+                    else:
+                        pos_deletes_per_file[file] = [arr]
+
+        if len(unique_eq_deletes) > 0:
+            # We map each unique eq delete file location to its loaded table and its equality IDs
+            eq_deletes_tables: dict[str, tuple[set[int], pa.Table]] = dict(
+                zip(
+                    [d.file_path for d in unique_eq_deletes],
+                    zip(
+                        [set(d.equality_ids) if d.equality_ids else set() for d in unique_eq_deletes],
+                        executor.map(
+                            lambda args: _read_equality_deletes(*args),
+                            [(io, d) for d in unique_eq_deletes],
+                        ),
+                        strict=True,
+                    ),
+                    strict=True,
+                )
+            )
+
+            # Map eq deletes to each task's data file path
+            for task in tasks:
+                eq_deletes_for_task = [
+                    eq_deletes_tables[d.file_path] for d in task.delete_files if d.content == DataFileContent.EQUALITY_DELETES
+                ]
+                if eq_deletes_for_task:
+                    eq_deletes_per_file[task.file.file_path] = eq_deletes_for_task
+
+    return pos_deletes_per_file, eq_deletes_per_file


 class ArrowScan:
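
Note: the nested `zip(..., strict=True)` above builds a map from each delete file's path to an (equality field IDs, loaded table) pair; `strict=True` (Python 3.10+) raises if the inputs ever differ in length. The same shape with plain data (paths, IDs, and columns are made up):

import pyarrow as pa

paths = ["s3://bucket/deletes/d1.parquet", "s3://bucket/deletes/d2.parquet"]
equality_ids = [{1}, {1, 2}]  # the field IDs each delete file matches on
tables = [pa.table({"id": [7]}), pa.table({"id": [8], "region": ["eu"]})]

eq_deletes_tables = dict(zip(paths, zip(equality_ids, tables, strict=True), strict=True))
# eq_deletes_tables["s3://bucket/deletes/d1.parquet"] == ({1}, <table with id=7>)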
@@ -1807,7 +1887,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
+        pos_deletes_per_file, eq_deletes_per_file = _read_all_delete_files(self._io, tasks)

         total_row_count = 0
         executor = ExecutorFactory.get_or_create()
@@ -1816,7 +1896,7 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
             # Materialize the iterator here to ensure execution happens within the executor.
             # Otherwise, the iterator would be lazily consumed later (in the main thread),
             # defeating the purpose of using executor.map.
-            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            return list(self._record_batches_from_scan_tasks_and_deletes([task], pos_deletes_per_file, eq_deletes_per_file))

         limit_reached = False
         for batches in executor.map(batches_for_task, tasks):
@@ -1836,7 +1916,10 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
                 break

     def _record_batches_from_scan_tasks_and_deletes(
-        self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]]
+        self,
+        tasks: Iterable[FileScanTask],
+        pos_deletes_per_file: dict[str, list[ChunkedArray]],
+        eq_deletes_per_file: dict[str, list[pa.Table]],
     ) -> Iterator[pa.RecordBatch]:
         total_row_count = 0
         for task in tasks:
@@ -1849,7 +1932,8 @@ def _record_batches_from_scan_tasks_and_deletes(
                 self._projected_schema,
                 self._table_metadata.schema(),
                 self._projected_field_ids,
-                deletes_per_file.get(task.file.file_path),
+                pos_deletes_per_file.get(task.file.file_path),
+                eq_deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
                 self._table_metadata.specs().get(task.file.spec_id),

pyiceberg/table/__init__.py

Lines changed: 2 additions & 8 deletions
@@ -641,7 +641,7 @@ def delete(
             self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
             == TableProperties.DELETE_MODE_MERGE_ON_READ
         ):
-            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write", stacklevel=2)
+            raise NotImplementedError("Merge on read is not yet supported")

         if isinstance(delete_filter, str):
             delete_filter = _parse_row_filter(delete_filter)
@@ -1833,16 +1833,12 @@ def from_rest_response(
         Raises:
            NotImplementedError: If equality delete files are encountered.
        """
-        from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile
-
         data_file = _rest_file_to_data_file(rest_task.data_file)

         resolved_deletes: set[DataFile] = set()
         if rest_task.delete_file_references:
             for idx in rest_task.delete_file_references:
                 delete_file = delete_files[idx]
-                if isinstance(delete_file, RESTEqualityDeleteFile):
-                    raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {delete_file.file_path}")
                 resolved_deletes.add(_rest_file_to_data_file(delete_file))

         return FileScanTask(
@@ -2067,10 +2063,8 @@ def _plan_files_local(self) -> Iterable[FileScanTask]:
             data_file = manifest_entry.data_file
             if data_file.content == DataFileContent.DATA:
                 data_entries.append(manifest_entry)
-            elif data_file.content == DataFileContent.POSITION_DELETES:
+            elif data_file.content in {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}:
                 delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
-            elif data_file.content == DataFileContent.EQUALITY_DELETES:
-                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
             else:
                 raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
         return [

pyiceberg/table/delete_file_index.py

Lines changed: 39 additions & 20 deletions
@@ -20,7 +20,7 @@

 from pyiceberg.expressions import EqualTo
 from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
-from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry
+from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestEntry
 from pyiceberg.typedef import Record

 PATH_FIELD_ID = 2147483546
@@ -59,6 +59,10 @@ def referenced_delete_files(self) -> list[DataFile]:
         return [data_file for data_file, _ in self._files]


+class EqualityDeletes(PositionDeletes):
+    """Collects equality delete files and indexes them by sequence number."""
+
+
 def _has_path_bounds(delete_file: DataFile) -> bool:
     lower = delete_file.lower_bounds
     upper = delete_file.upper_bounds
@@ -103,26 +107,32 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record]


 class DeleteFileIndex:
-    """Indexes position delete files by partition and by exact data file path."""
+    """Indexes position and equality delete files by partition and by exact data file path."""

     def __init__(self) -> None:
-        self._by_partition: dict[tuple[int, Record], PositionDeletes] = {}
-        self._by_path: dict[str, PositionDeletes] = {}
+        self._pos_by_partition: dict[tuple[int, Record], PositionDeletes] = {}
+        self._pos_by_path: dict[str, PositionDeletes] = {}
+        self._eq_by_partition: dict[tuple[int, Record], EqualityDeletes] = {}

     def is_empty(self) -> bool:
-        return not self._by_partition and not self._by_path
+        return not self._pos_by_partition and not self._pos_by_path and not self._eq_by_partition

     def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
         delete_file = manifest_entry.data_file
         seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER
-        target_path = _referenced_data_file_path(delete_file)

-        if target_path:
-            deletes = self._by_path.setdefault(target_path, PositionDeletes())
-            deletes.add(delete_file, seq)
-        else:
+        if delete_file.content == DataFileContent.POSITION_DELETES:
+            target_path = _referenced_data_file_path(delete_file)
+            if target_path:
+                deletes = self._pos_by_path.setdefault(target_path, PositionDeletes())
+                deletes.add(delete_file, seq)
+            else:
+                key = _partition_key(delete_file.spec_id or 0, partition_key)
+                deletes = self._pos_by_partition.setdefault(key, PositionDeletes())
+                deletes.add(delete_file, seq)
+        elif delete_file.content == DataFileContent.EQUALITY_DELETES:
             key = _partition_key(delete_file.spec_id or 0, partition_key)
-            deletes = self._by_partition.setdefault(key, PositionDeletes())
+            deletes = self._eq_by_partition.setdefault(key, EqualityDeletes())
             deletes.add(delete_file, seq)

     def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
@@ -131,27 +141,36 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record

         deletes: set[DataFile] = set()
         spec_id = data_file.spec_id or 0
-
         key = _partition_key(spec_id, partition_key)
-        partition_deletes = self._by_partition.get(key)
-        if partition_deletes:
-            for delete_file in partition_deletes.filter_by_seq(seq_num):
+
+        # Add position deletes
+        partition_pos_deletes = self._pos_by_partition.get(key)
+        if partition_pos_deletes:
+            for delete_file in partition_pos_deletes.filter_by_seq(seq_num):
                 if _applies_to_data_file(delete_file, data_file):
                     deletes.add(delete_file)

-        path_deletes = self._by_path.get(data_file.file_path)
-        if path_deletes:
-            deletes.update(path_deletes.filter_by_seq(seq_num))
+        path_pos_deletes = self._pos_by_path.get(data_file.file_path)
+        if path_pos_deletes:
+            deletes.update(path_pos_deletes.filter_by_seq(seq_num))
+
+        # Add equality deletes
+        partition_eq_deletes = self._eq_by_partition.get(key)
+        if partition_eq_deletes:
+            deletes.update(partition_eq_deletes.filter_by_seq(seq_num))

         return deletes

     def referenced_delete_files(self) -> list[DataFile]:
         data_files: list[DataFile] = []

-        for deletes in self._by_partition.values():
+        for deletes in self._pos_by_partition.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        for deletes in self._pos_by_path.values():
             data_files.extend(deletes.referenced_delete_files())

-        for deletes in self._by_path.values():
+        for deletes in self._eq_by_partition.values():
             data_files.extend(deletes.referenced_delete_files())

         return data_files
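
Note: `filter_by_seq` (inherited from PositionDeletes) narrows candidates by sequence number. Per the Iceberg spec, an equality delete applies only to data files with a strictly smaller data sequence number, while a position delete also applies at the same sequence number. A simplified illustration of that spec rule, outside pyiceberg's types:

def applies(delete_seq: int, data_seq: int, *, is_equality: bool) -> bool:
    # Equality deletes must be strictly newer than the data they delete;
    # position deletes may share the data file's sequence number.
    return data_seq < delete_seq if is_equality else data_seq <= delete_seq

assert applies(5, 4, is_equality=True)      # older data row: deleted
assert not applies(5, 5, is_equality=True)  # same sequence: kept
assert applies(5, 5, is_equality=False)     # position delete still applies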

pyiceberg/table/update/snapshot.py

Lines changed: 2 additions & 0 deletions
@@ -145,6 +145,8 @@ def _validate_target_branch(self, branch: str | None) -> str | None:
         return branch

     def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
+        if data_file.content == DataFileContent.EQUALITY_DELETES:
+            raise NotImplementedError(f"PyIceberg does not support writing {data_file.content}")
         self._added_data_files.append(data_file)
         return self
