Commit 7ee20ea

fix comments of pr
1 parent fb7228a commit 7ee20ea

5 files changed

Lines changed: 154 additions & 35 deletions

mkdocs/docs/api.md

Lines changed: 61 additions & 0 deletions
@@ -971,6 +971,67 @@ readable_metrics: [
 
 To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.
 
+### Position deletes
+
+Inspect the positional delete files in the current snapshot of the table:
+
+```python
+table.inspect.position_deletes()
+```
+
+```python
+pyarrow.Table
+file_path: string not null
+pos: int64 not null
+row: struct<id: int32, data: large_string>
+  child 0, id: int32
+  child 1, data: large_string
+partition: struct<data: large_string> not null
+  child 0, data: large_string
+spec_id: int64
+delete_file_path: string not null
+----
+file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-1-acbf93b7-f760-4517-aa84-b9240902d3d2-0-00001.parquet"]]
+pos: [[],[],[],[0]]
+row: [
+  -- is_valid: all not null
+  -- child 0 type: int32
+[]
+  -- child 1 type: large_string
+[],
+  -- is_valid: all not null
+  -- child 0 type: int32
+[]
+  -- child 1 type: large_string
+[],
+  -- is_valid: all not null
+  -- child 0 type: int32
+[]
+  -- child 1 type: large_string
+[],
+  -- is_valid: [false]
+  -- child 0 type: int32
+[0]
+  -- child 1 type: large_string
+[""]]
+partition: [
+  -- is_valid: all not null
+  -- child 0 type: large_string
+[],
+  -- is_valid: all not null
+  -- child 0 type: large_string
+[],
+  -- is_valid: all not null
+  -- child 0 type: large_string
+[],
+  -- is_valid: all not null
+  -- child 0 type: large_string
+["a"]]
+spec_id: [[],[],[],[0]]
+delete_file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-5-bc7a1d8a-fefe-4277-b4ac-8f1dd7badb7a-00001-deletes.parquet"]]
+
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
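
The inspect result above is an ordinary `pyarrow.Table`, so standard PyArrow conversions apply directly. A minimal usage sketch, assuming a catalog named `default` and the example table from the sample output (both names are illustrative):

```python
from pyiceberg.catalog import load_catalog

# Illustrative catalog/table names, matching the sample output above.
catalog = load_catalog("default")
table = catalog.load_table("default.table_metadata_position_deletes")

deletes = table.inspect.position_deletes()

# Plain pyarrow.Table: convert rows to dicts and inspect them.
for rec in deletes.to_pylist():
    print(rec["file_path"], rec["pos"], rec["delete_file_path"])
```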

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 3 deletions
@@ -122,6 +122,7 @@
     DataFile,
     DataFileContent,
     FileFormat,
+    PositionDelete,
 )
 from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
 from pyiceberg.schema import (
@@ -889,10 +890,17 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs:
     return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
 
 
-def _read_delete_file(fs: FileSystem, data_file: DataFile, schema: "pa.Schema") -> pa.Table:
+def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionDelete]:
     delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE})
-    table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=schema).to_table()
-    return table
+    table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+    for batch in table.to_batches():
+        for i in range(len(batch)):
+            row = batch.column("row")[i].as_py() if "row" in batch.schema.names else None
+            yield PositionDelete(
+                file_path=batch.column("file_path")[i].as_py(),
+                pos=batch.column("pos")[i].as_py(),
+                row=row,  # optional: None when the delete file carries no row column
+            )
 
 
 def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
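
Review note: `_read_delete_file` now reads the Arrow table once, then walks its record batches and yields one `PositionDelete` per row, so callers iterate records instead of receiving a `pa.Table`. A self-contained sketch of that iteration pattern, using a hypothetical stand-in for pyiceberg's record type:

```python
from dataclasses import dataclass
from typing import Iterator, Optional

import pyarrow as pa


@dataclass
class PositionDeleteSketch:  # hypothetical stand-in for pyiceberg.manifest.PositionDelete
    file_path: str
    pos: int
    row: Optional[dict]


def rows_as_records(table: pa.Table) -> Iterator[PositionDeleteSketch]:
    # Walk record batches, then index each column per row, mirroring the
    # new _read_delete_file; "row" is an optional column in delete files.
    for batch in table.to_batches():
        for i in range(len(batch)):
            row = batch.column("row")[i].as_py() if "row" in batch.schema.names else None
            yield PositionDeleteSketch(
                file_path=batch.column("file_path")[i].as_py(),
                pos=batch.column("pos")[i].as_py(),
                row=row,
            )


deletes = pa.table({"file_path": ["s3://bucket/data.parquet"], "pos": [0]})
print(list(rows_as_records(deletes)))
```

The laziness only covers the Python-object conversion; `to_table()` still materializes the whole delete file in Arrow memory first.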

pyiceberg/manifest.py

Lines changed: 28 additions & 0 deletions
@@ -320,6 +320,34 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe
     )
 
 
+class PositionDelete(Record):
+    __slots__ = ("file_path", "pos", "row")
+    file_path: str
+    pos: int
+    row: Optional[Record]
+
+    def __setattr__(self, name: str, value: Any) -> None:
+        """Assign a key/value to a PositionDelete."""
+        super().__setattr__(name, value)
+
+    def __init__(self, file_path: str, pos: int, row: Optional[Record], *data: Any, **named_data: Any) -> None:
+        super().__init__(*data, **named_data)
+        self.file_path = file_path
+        self.pos = pos
+        self.row = row
+
+    def __hash__(self) -> int:
+        """Return the hash of the file path."""
+        return hash(self.file_path)
+
+    def __eq__(self, other: Any) -> bool:
+        """Compare the PositionDelete with another object.
+
+        If it is a PositionDelete, it will compare based on the file_path.
+        """
+        return self.file_path == other.file_path if isinstance(other, PositionDelete) else False
+
+
 class DataFile(Record):
     __slots__ = (
         "content",

pyiceberg/table/inspect.py

Lines changed: 23 additions & 32 deletions
@@ -384,30 +384,15 @@ def _get_all_manifests_schema(self) -> "pa.Schema":
         all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
         return all_manifests_schema
 
-    def _get_positional_file_schema(self) -> "pa.Schema":
-        import pyarrow as pa
-
-        from pyiceberg.io.pyarrow import schema_to_pyarrow
-
-        pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
-        positinal_delete_schema = pa.schema(
-            [
-                pa.field("file_path", pa.string(), nullable=False),
-                pa.field("pos", pa.int64(), nullable=False),
-                pa.field("row", pa_row_struct, nullable=True),
-            ]
-        )
-        return positinal_delete_schema
-
     def _get_positional_deletes_schema(self) -> "pa.Schema":
         import pyarrow as pa
 
         from pyiceberg.io.pyarrow import schema_to_pyarrow
 
-        partition_record = self.tbl.metadata.specs_struct()
-        pa_partition_struct = schema_to_pyarrow(partition_record)
+        partition_struct = self.tbl.metadata.spec_struct()
+        pa_partition_struct = schema_to_pyarrow(partition_struct)
         pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
-        positinal_delete_schema = pa.schema(
+        positional_delete_schema = pa.schema(
             [
                 pa.field("file_path", pa.string(), nullable=False),
                 pa.field("pos", pa.int64(), nullable=False),
@@ -417,7 +402,7 @@ def _get_positional_deletes_schema(self) -> "pa.Schema":
                 pa.field("delete_file_path", pa.string(), nullable=False),
             ]
         )
-        return positinal_delete_schema
+        return positional_delete_schema
 
     def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
         import pyarrow as pa
@@ -492,22 +477,28 @@ def _generate_positional_delete_table(self, manifest: ManifestFile, position_del
         import pyarrow as pa
 
         positional_deletes: List["pa.Table"] = []
+
         if manifest.content == ManifestContent.DELETES:
             for entry in manifest.fetch_manifest_entry(self.tbl.io):
                 if entry.data_file.content == DataFileContent.POSITION_DELETES:
                     from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file
 
                     positional_delete_file = _read_delete_file(
-                        _fs_from_file_path(self.tbl.io, entry.data_file.file_path),
-                        entry.data_file,
-                        self._get_positional_file_schema(),
-                    ).to_pylist()
+                        _fs_from_file_path(self.tbl.io, entry.data_file.file_path), entry.data_file
+                    )
+                    positional_deletes_records = []
                     for record in positional_delete_file:
-                        record["partition"] = entry.data_file.partition.__dict__
-                        record["spec_id"] = manifest.partition_spec_id
-                        record["delete_file_path"] = entry.data_file.file_path
-
-                    positional_deletes.append(pa.Table.from_pylist(positional_delete_file, position_deletes_schema))
+                        row = {
+                            "file_path": record.file_path,
+                            "pos": record.pos,
+                            "row": record.row,
+                            "partition": entry.data_file.partition.__dict__,
+                            "spec_id": manifest.partition_spec_id,
+                            "delete_file_path": entry.data_file.file_path,
+                        }
+                        positional_deletes_records.append(row)
+
+                    positional_deletes.append(pa.Table.from_pylist(positional_deletes_records, position_deletes_schema))
 
         if not positional_deletes:
             return pa.Table.from_pylist([], position_deletes_schema)
@@ -718,18 +709,18 @@ def all_manifests(self) -> "pa.Table":
         )
         return pa.concat_tables(manifests_by_snapshots)
 
-    def position_deletes(self) -> "pa.Table":
+    def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         import pyarrow as pa
 
+        snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot()
         position_deletes_schema = self._get_positional_deletes_schema()
-        current_snapshot = self.tbl.current_snapshot()
 
-        if not current_snapshot:
+        if not snapshot:
             return pa.Table.from_pylist([], schema=position_deletes_schema)
 
         executor = ExecutorFactory.get_or_create()
         positional_deletes: Iterator["pa.Table"] = executor.map(
             lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema),
-            current_snapshot.manifests(self.tbl.io),
+            snapshot.manifests(self.tbl.io),
         )
         return pa.concat_tables(positional_deletes)
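
With the new `snapshot_id` parameter, position deletes can be inspected for an older snapshot rather than only the current one. A hedged usage sketch, reusing the `table` from the earlier example and pyiceberg's standard `history()` snapshot log:

```python
# Pick the oldest snapshot from the table's snapshot log.
previous_snapshot_id = table.history()[0].snapshot_id

older = table.inspect.position_deletes(snapshot_id=previous_snapshot_id)
current = table.inspect.position_deletes()  # defaults to the current snapshot

print(older.num_rows, current.num_rows)
```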

pyiceberg/table/metadata.py

Lines changed: 31 additions & 0 deletions
@@ -279,6 +279,37 @@ def specs_struct(self) -> StructType:
 
         return StructType(*nested_fields)
 
+    def spec_struct(self, spec_id: Optional[int] = None) -> StructType:
+        """Produce the partition struct for a single spec_id, or for the latest spec if none is given.
+
+        The partition fields should be optional: partition fields may be added later,
+        in which case not all files would have the result field, and it may be null.
+
+        :return: A StructType that represents a PartitionSpec of the table for a specific spec_id or latest.
+        """
+        if spec_id is None:
+            spec = self.spec()
+        else:
+            specs = self.specs()
+            filtered_spec = list(filter(lambda spec: spec.spec_id == spec_id, specs.values()))
+            if not filtered_spec:
+                raise ValidationError(f"Spec with spec_id {spec_id} not found")
+            spec = filtered_spec[0]
+        # Collect all the fields
+        struct_fields = {field.field_id: field for field in spec.fields}
+
+        schema = self.schema()
+
+        nested_fields = []
+        # Sort them by field_id in order to get a deterministic output
+        for field_id in sorted(struct_fields):
+            field = struct_fields[field_id]
+            source_type = schema.find_type(field.source_id)
+            result_type = field.transform.result_type(source_type)
+            nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))
+
+        return StructType(*nested_fields)
+
     def new_snapshot_id(self) -> int:
         """Generate a new snapshot-id that's not in use."""
         snapshot_id = _generate_snapshot_id()
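
For contrast with the existing `specs_struct` (which combines fields across all specs), the new `spec_struct` resolves a single spec: the latest by default, or an explicit `spec_id`. A hedged sketch of both paths (the printed shape is illustrative):

```python
from pyiceberg.exceptions import ValidationError

# Latest spec by default; partition fields come back as optional.
print(table.metadata.spec_struct())

# An unknown spec_id raises ValidationError rather than returning empty.
try:
    table.metadata.spec_struct(spec_id=42)
except ValidationError as err:
    print(err)  # "Spec with spec_id 42 not found"
```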
