Skip to content

Commit d2c58b2

Browse files
committed
fix schemas and partition specs to be according the snapshot and not latest and make
1 parent 7ee20ea commit d2c58b2

3 files changed

Lines changed: 22 additions & 16 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -904,14 +904,15 @@ def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionD
904904

905905

906906
def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
907-
delete_fragment = _construct_fragment(
908-
fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
909-
)
910-
table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
911-
table = table.unify_dictionaries()
907+
deletes_by_file: Dict[str, List[int]] = {}
908+
for delete in _read_delete_file(fs, data_file):
909+
if delete.file_path not in deletes_by_file:
910+
deletes_by_file[delete.file_path] = []
911+
deletes_by_file[delete.file_path].append(delete.pos)
912+
913+
# Convert lists of positions to ChunkedArrays
912914
return {
913-
file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
914-
for file in table.column("file_path").chunks[0].dictionary
915+
file_path: pa.chunked_array([pa.array(positions, type=pa.int64())]) for file_path, positions in deletes_by_file.items()
915916
}
916917

917918

pyiceberg/table/inspect.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from pyiceberg.conversions import from_bytes
2323
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
2424
from pyiceberg.partitioning import PartitionSpec
25+
from pyiceberg.schema import Schema
2526
from pyiceberg.table.snapshots import Snapshot, ancestors_of
2627
from pyiceberg.types import PrimitiveType
2728
from pyiceberg.utils.concurrent import ExecutorFactory
@@ -384,14 +385,16 @@ def _get_all_manifests_schema(self) -> "pa.Schema":
384385
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
385386
return all_manifests_schema
386387

387-
def _get_positional_deletes_schema(self) -> "pa.Schema":
388+
def _get_positional_deletes_schema(self, schema: Optional[Schema] = None, spec_id: Optional[int] = None) -> "pa.Schema":
388389
import pyarrow as pa
389390

390391
from pyiceberg.io.pyarrow import schema_to_pyarrow
391392

392-
partition_struct = self.tbl.metadata.spec_struct()
393+
schema = schema or self.tbl.metadata.schema()
394+
395+
partition_struct = self.tbl.metadata.spec_struct(spec_id=spec_id)
393396
pa_partition_struct = schema_to_pyarrow(partition_struct)
394-
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
397+
pa_row_struct = schema_to_pyarrow(schema.as_struct())
395398
positional_delete_schema = pa.schema(
396399
[
397400
pa.field("file_path", pa.string(), nullable=False),
@@ -473,11 +476,13 @@ def _partition_summaries_to_rows(
473476
schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
474477
)
475478

476-
def _generate_positional_delete_table(self, manifest: ManifestFile, position_deletes_schema: "pa.Schema") -> "pa.Table":
479+
def _generate_positional_delete_table(self, manifest: ManifestFile, schema: Schema) -> "pa.Table":
477480
import pyarrow as pa
478481

479482
positional_deletes: List["pa.Table"] = []
480483

484+
position_deletes_schema = self._get_positional_deletes_schema(schema=schema, spec_id=manifest.partition_spec_id)
485+
481486
if manifest.content == ManifestContent.DELETES:
482487
for entry in manifest.fetch_manifest_entry(self.tbl.io):
483488
if entry.data_file.content == DataFileContent.POSITION_DELETES:
@@ -713,14 +718,14 @@ def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table":
713718
import pyarrow as pa
714719

715720
snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot()
716-
position_deletes_schema = self._get_positional_deletes_schema()
717-
718721
if not snapshot:
719-
return pa.Table.from_pylist([], schema=position_deletes_schema)
722+
schema = self._get_positional_deletes_schema()
723+
return pa.Table.from_pylist([], schema=schema)
720724

725+
schemas = self.tbl.schemas()
721726
executor = ExecutorFactory.get_or_create()
722727
positional_deletes: Iterator["pa.Table"] = executor.map(
723-
lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema),
728+
lambda manifest: self._generate_positional_delete_table(manifest, schema=schemas[snapshot.schema_id]),
724729
snapshot.manifests(self.tbl.io),
725730
)
726731
return pa.concat_tables(positional_deletes)

pyiceberg/table/metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ def specs_struct(self) -> StructType:
280280
return StructType(*nested_fields)
281281

282282
def spec_struct(self, spec_id: Optional[int] = None) -> StructType:
283-
"""Produce for a spec_id a struct of PartitionSpecs.
283+
"""Produce for a spec_id a struct of PartitionSpecs.
284284
285285
The partition fields should be optional: Partition fields may be added later,
286286
in which case not all files would have the result field, and it may be null.

0 commit comments

Comments
 (0)