Skip to content

Commit fb7228a

Browse files
committed
final version of position_deletes
1 parent a26ca73 commit fb7228a

3 files changed

Lines changed: 61 additions & 78 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -889,17 +889,9 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs:
889889
return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
890890

891891

892-
def _read_delete_file(fs: FileSystem, data_file: DataFile) -> pa.Table:
893-
positinal_delete_schema = pa.schema(
894-
[
895-
pa.field("file_path", pa.string(), nullable=False),
896-
pa.field("pos", pa.int64(), nullable=False),
897-
pa.field("row", pa.int64(), nullable=True),
898-
]
899-
)
900-
892+
def _read_delete_file(fs: FileSystem, data_file: DataFile, schema: "pa.Schema") -> pa.Table:
901893
delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE})
902-
table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=positinal_delete_schema).to_table()
894+
table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=schema).to_table()
903895
return table
904896

905897

pyiceberg/table/inspect.py

Lines changed: 55 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
2121

2222
from pyiceberg.conversions import from_bytes
23-
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
23+
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
2424
from pyiceberg.partitioning import PartitionSpec
2525
from pyiceberg.table.snapshots import Snapshot, ancestors_of
2626
from pyiceberg.types import PrimitiveType
@@ -384,14 +384,35 @@ def _get_all_manifests_schema(self) -> "pa.Schema":
384384
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
385385
return all_manifests_schema
386386

387+
def _get_positional_file_schema(self) -> "pa.Schema":
388+
import pyarrow as pa
389+
390+
from pyiceberg.io.pyarrow import schema_to_pyarrow
391+
392+
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
393+
positinal_delete_schema = pa.schema(
394+
[
395+
pa.field("file_path", pa.string(), nullable=False),
396+
pa.field("pos", pa.int64(), nullable=False),
397+
pa.field("row", pa_row_struct, nullable=True),
398+
]
399+
)
400+
return positinal_delete_schema
401+
387402
def _get_positional_deletes_schema(self) -> "pa.Schema":
388403
import pyarrow as pa
389404

405+
from pyiceberg.io.pyarrow import schema_to_pyarrow
406+
407+
partition_record = self.tbl.metadata.specs_struct()
408+
pa_partition_struct = schema_to_pyarrow(partition_record)
409+
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
390410
positinal_delete_schema = pa.schema(
391411
[
392412
pa.field("file_path", pa.string(), nullable=False),
393413
pa.field("pos", pa.int64(), nullable=False),
394-
pa.field("row", pa.int64(), nullable=True),
414+
pa.field("row", pa_row_struct, nullable=True),
415+
pa.field("partition", pa_partition_struct, nullable=False),
395416
pa.field("spec_id", pa.int64(), nullable=True),
396417
pa.field("delete_file_path", pa.string(), nullable=False),
397418
]
@@ -467,23 +488,30 @@ def _partition_summaries_to_rows(
467488
schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
468489
)
469490

470-
# def _generate_positional_delete_table(self, manifest_list: ManifestFile) -> "pa.Table":
471-
# import pyarrow as pa
472-
# all_deletes = []
473-
# if manifest_list.content == ManifestContent.DELETES:
474-
# for manifest_entry in manifest_list.fetch_manifest_entry(self.tbl.io):
475-
# if manifest_entry.data_file.content == DataFileContent.POSITION_DELETES:
476-
# from pyiceberg.io.pyarrow import _read_delete_file
477-
# from pyiceberg.io.pyarrow import _fs_from_file_path
478-
# positional_delete = _read_delete_file(
479-
# _fs_from_file_path(self.tbl.io, manifest_entry.data_file.file_path),
480-
# manifest_entry.data_file)
481-
#
482-
# positional_delete = positional_delete.append_column("spec_id", pa.array(
483-
# [manifest_list.partition_spec_id] * len(positional_delete))).append_column("delete_file_path",pa.array([manifest_entry.data_file.file_path] * len(positional_delete)))
484-
#
485-
# all_deletes.append(positional_delete)
486-
# return pa.concat_tables(all_deletes)
491+
def _generate_positional_delete_table(self, manifest: ManifestFile, position_deletes_schema: "pa.Schema") -> "pa.Table":
492+
import pyarrow as pa
493+
494+
positional_deletes: List["pa.Table"] = []
495+
if manifest.content == ManifestContent.DELETES:
496+
for entry in manifest.fetch_manifest_entry(self.tbl.io):
497+
if entry.data_file.content == DataFileContent.POSITION_DELETES:
498+
from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file
499+
500+
positional_delete_file = _read_delete_file(
501+
_fs_from_file_path(self.tbl.io, entry.data_file.file_path),
502+
entry.data_file,
503+
self._get_positional_file_schema(),
504+
).to_pylist()
505+
for record in positional_delete_file:
506+
record["partition"] = entry.data_file.partition.__dict__
507+
record["spec_id"] = manifest.partition_spec_id
508+
record["delete_file_path"] = entry.data_file.file_path
509+
510+
positional_deletes.append(pa.Table.from_pylist(positional_delete_file, position_deletes_schema))
511+
512+
if not positional_deletes:
513+
return pa.Table.from_pylist([], position_deletes_schema)
514+
return pa.concat_tables(positional_deletes)
487515

488516
def manifests(self) -> "pa.Table":
489517
return self._generate_manifests_table(self.tbl.current_snapshot())
@@ -693,38 +721,15 @@ def all_manifests(self) -> "pa.Table":
693721
def position_deletes(self) -> "pa.Table":
694722
import pyarrow as pa
695723

696-
snapshots = self.tbl.snapshots()
697-
if not snapshots:
698-
return pa.Table.from_pylist([], schema=self._get_positional_deletes_schema())
724+
position_deletes_schema = self._get_positional_deletes_schema()
699725
current_snapshot = self.tbl.current_snapshot()
700726

701-
#
702-
# executor = ExecutorFactory.get_or_create()
703-
# positonal_deletes: Iterator["pa.Table"] = executor.map(
704-
# lambda manifest_list: self._generate_positional_delete_table(manifest_list),current_snapshot.manifests(self.tbl.io)
705-
# )
706-
# all_deletes = []
707-
positional_deletes = []
708-
709-
for manifest_list in current_snapshot.manifests(self.tbl.io):
710-
import pyarrow as pa
711-
if manifest_list.content == ManifestContent.DELETES:
712-
defaultSpecId = self.tbl.spec().spec_id
713-
for manifest_entry in manifest_list.fetch_manifest_entry(self.tbl.io):
714-
715-
if manifest_entry.data_file.content == DataFileContent.POSITION_DELETES:
716-
from pyiceberg.io.pyarrow import _read_delete_file
717-
from pyiceberg.io.pyarrow import _fs_from_file_path
718-
positional_delete = _read_delete_file(
719-
_fs_from_file_path(self.tbl.io, manifest_entry.data_file.file_path),
720-
manifest_entry.data_file)
721-
722-
positional_delete = positional_delete.append_column("spec_id", pa.array(
723-
[manifest_list.partition_spec_id] * len(positional_delete))).append_column("partition", pa.array(
724-
[self.] * len(positional_delete))).append_column(
725-
"delete_file_path", pa.array([manifest_entry.data_file.file_path] * len(positional_delete)))
726-
727-
positional_deletes.append(positional_delete)
728-
# return pa.concat_tables(all_deletes)
727+
if not current_snapshot:
728+
return pa.Table.from_pylist([], schema=position_deletes_schema)
729729

730+
executor = ExecutorFactory.get_or_create()
731+
positional_deletes: Iterator["pa.Table"] = executor.map(
732+
lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema),
733+
current_snapshot.manifests(self.tbl.io),
734+
)
730735
return pa.concat_tables(positional_deletes)

tests/integration/test_inspect_table.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo
941941

942942

943943
@pytest.mark.integration
944-
@pytest.mark.parametrize("format_version", [2])
944+
@pytest.mark.parametrize("format_version", [1, 2])
945945
def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
946946
from pandas.testing import assert_frame_equal
947947

@@ -964,8 +964,6 @@ def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalo
964964
)
965965
tbl = session_catalog.load_table(identifier)
966966

967-
# check all_manifests when there are no snapshots
968-
969967
spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")
970968

971969
spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")
@@ -977,21 +975,10 @@ def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalo
977975
tbl.refresh()
978976
df = tbl.inspect.position_deletes()
979977

980-
assert df.column_names == [
981-
"file_path",
982-
"pos",
983-
"row",
984-
"spec_id",
985-
"delete_file_path"
986-
]
978+
assert df.column_names == ["file_path", "pos", "row", "partition", "spec_id", "delete_file_path"]
987979

988-
int_cols = [
989-
"pos"
990-
]
991-
string_cols = [
992-
"file_path",
993-
"delete_file_path"
994-
]
980+
int_cols = ["pos"]
981+
string_cols = ["file_path", "delete_file_path"]
995982

996983
for column in int_cols:
997984
for value in df[column]:
@@ -1001,7 +988,6 @@ def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalo
1001988
for value in df[column]:
1002989
assert isinstance(value.as_py(), str)
1003990

1004-
new_df = spark.sql(f"select * from {identifier}.position_deletes").toPandas()
1005991
lhs = spark.table(f"{identifier}.position_deletes").toPandas()
1006992
rhs = df.to_pandas()
1007993
assert_frame_equal(lhs, rhs, check_dtype=False)

0 commit comments

Comments (0)