Skip to content

Commit 35bab25

Browse files
committed
read manifest with pyiceberg-core
1 parent 1ddc5d3 commit 35bab25

4 files changed

Lines changed: 74 additions & 27 deletions

File tree

pyiceberg/manifest.py

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -853,18 +853,47 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
853853
Returns:
854854
An Iterator of manifest entries.
855855
"""
856-
input_file = io.new_input(self.manifest_path)
857-
with AvroFile[ManifestEntry](
858-
input_file,
859-
MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
860-
read_types={-1: ManifestEntry, 2: DataFile},
861-
read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
862-
) as reader:
863-
return [
864-
_inherit_from_manifest(entry, self)
865-
for entry in reader
866-
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
867-
]
856+
from pyiceberg_core import manifest
857+
858+
bs = io.new_input(self.manifest_path).open().read()
859+
manifest = manifest.read_manifest_entries(bs)
860+
861+
# TODO: Don't convert the types
862+
# but this is the easiest for now until we
863+
# have the write part in there as well
864+
def _convert_entry(entry: Any) -> ManifestEntry:
865+
data_file = DataFile(
866+
DataFileContent(entry.data_file.content),
867+
entry.data_file.file_path,
868+
FileFormat(entry.data_file.file_format),
869+
[p.value() if p is not None else None for p in entry.data_file.partition],
870+
entry.data_file.record_count,
871+
entry.data_file.file_size_in_bytes,
872+
entry.data_file.column_sizes,
873+
entry.data_file.value_counts,
874+
entry.data_file.null_value_counts,
875+
entry.data_file.nan_value_counts,
876+
entry.data_file.lower_bounds,
877+
entry.data_file.upper_bounds,
878+
entry.data_file.key_metadata,
879+
entry.data_file.split_offsets,
880+
entry.data_file.equality_ids,
881+
entry.data_file.sort_order_id,
882+
)
883+
884+
return ManifestEntry(
885+
ManifestEntryStatus(entry.status),
886+
entry.snapshot_id,
887+
entry.sequence_number,
888+
entry.file_sequence_number,
889+
data_file,
890+
)
891+
892+
return [
893+
_inherit_from_manifest(_convert_entry(entry), self)
894+
for entry in manifest.entries()
895+
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
896+
]
868897

869898
def __eq__(self, other: Any) -> bool:
870899
"""Return the equality of two instances of the ManifestFile class."""
@@ -925,12 +954,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
925954

926955
# in v1 tables, the sequence number is not persisted and can be safely defaulted to 0
927956
# in v2 tables, the sequence number should be inherited iff the entry status is ADDED
928-
if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
957+
if entry.sequence_number is None or (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
929958
entry.sequence_number = manifest.sequence_number
930959

931960
# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
932961
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
933-
if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
962+
if entry.file_sequence_number is None or (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
934963
# Only available in V2, always 0 in V1
935964
entry.file_sequence_number = manifest.sequence_number
936965

tests/avro/test_reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_
112112
{
113113
"field-id": 1001,
114114
"default": None,
115-
"name": "tpep_pickup_datetime",
115+
"name": "tpep_pickup_day",
116116
"type": ["null", {"type": "int", "logicalType": "date"}],
117117
},
118118
],

tests/conftest.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,7 +1178,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
11781178
"data_file": {
11791179
"file_path": "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
11801180
"file_format": "PARQUET",
1181-
"partition": {"VendorID": 1, "tpep_pickup_datetime": 1925},
1181+
"partition": {"VendorID": 1, "tpep_pickup_day": 1925},
11821182
"record_count": 19513,
11831183
"file_size_in_bytes": 388872,
11841184
"block_size_in_bytes": 67108864,
@@ -1298,7 +1298,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
12981298
"data_file": {
12991299
"file_path": "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
13001300
"file_format": "PARQUET",
1301-
"partition": {"VendorID": 1, "tpep_pickup_datetime": None},
1301+
"partition": {"VendorID": 1, "tpep_pickup_day": None},
13021302
"record_count": 95050,
13031303
"file_size_in_bytes": 1265950,
13041304
"block_size_in_bytes": 67108864,
@@ -1383,7 +1383,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
13831383
{"key": 3, "value": b"\x01\x00\x00\x00\x00\x00\x00\x00"},
13841384
{"key": 4, "value": b"\x00\x00\x00\x00"},
13851385
{"key": 5, "value": b"\x01\x00\x00\x00"},
1386-
{"key": 6, "value": b"N"},
1386+
{"key": 6, "value": b"\x01\x00\x00\x00"},
13871387
{"key": 7, "value": b"\x01\x00\x00\x00"},
13881388
{"key": 8, "value": b"\x01\x00\x00\x00"},
13891389
{"key": 9, "value": b"\x01\x00\x00\x00"},
@@ -1403,7 +1403,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
14031403
{"key": 3, "value": b"\x06\x00\x00\x00\x00\x00\x00\x00"},
14041404
{"key": 4, "value": b"\x06\x00\x00\x00"},
14051405
{"key": 5, "value": b"c\x00\x00\x00"},
1406-
{"key": 6, "value": b"Y"},
1406+
{"key": 6, "value": b"c\x00\x00\x00"},
14071407
{"key": 7, "value": b"\t\x01\x00\x00"},
14081408
{"key": 8, "value": b"\t\x01\x00\x00"},
14091409
{"key": 9, "value": b"\x04\x00\x00\x00"},
@@ -1677,7 +1677,7 @@ def avro_schema_manifest_entry() -> Dict[str, Any]:
16771677
{
16781678
"field-id": 1001,
16791679
"default": None,
1680-
"name": "tpep_pickup_datetime",
1680+
"name": "tpep_pickup_day",
16811681
"type": ["null", {"type": "int", "logicalType": "date"}],
16821682
},
16831683
],
@@ -1863,7 +1863,25 @@ def simple_map() -> MapType:
18631863
@pytest.fixture(scope="session")
18641864
def test_schema() -> Schema:
18651865
return Schema(
1866-
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False)
1866+
NestedField(1, "VendorID", IntegerType(), False),
1867+
NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
1868+
NestedField(3, "tpep_dropoff_datetime", TimestampType(), False),
1869+
NestedField(4, "passenger_count", LongType(), False),
1870+
NestedField(5, "trip_distance", DoubleType(), False),
1871+
NestedField(6, "RatecodeID", DoubleType(), False),
1872+
NestedField(7, "store_and_fwd_flag", StringType(), False),
1873+
NestedField(8, "PULocationID", IntegerType(), False),
1874+
NestedField(9, "DOLocationID", IntegerType(), False),
1875+
NestedField(10, "payment_type", LongType(), False),
1876+
NestedField(11, "fare_amount", DoubleType(), False),
1877+
NestedField(12, "extra", DoubleType(), False),
1878+
NestedField(13, "mta_tax", DoubleType(), False),
1879+
NestedField(14, "tip_amount", DoubleType(), False),
1880+
NestedField(15, "tolls_amount", DoubleType(), False),
1881+
NestedField(16, "improvement_surcharge", DoubleType(), False),
1882+
NestedField(17, "total_amount", DoubleType(), False),
1883+
NestedField(18, "congestion_surcharge", DoubleType(), False),
1884+
NestedField(19, "Airport_fee", DoubleType(), False),
18671885
)
18681886

18691887

@@ -1969,7 +1987,7 @@ def iceberg_manifest_entry_schema() -> Schema:
19691987
),
19701988
NestedField(
19711989
field_id=1001,
1972-
name="tpep_pickup_datetime",
1990+
name="tpep_pickup_day",
19731991
field_type=DateType(),
19741992
required=False,
19751993
),

tests/utils/test_manifest.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
4242
from pyiceberg.schema import Schema
4343
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
44-
from pyiceberg.typedef import Record, TableVersion
44+
from pyiceberg.typedef import TableVersion
4545
from pyiceberg.types import IntegerType, NestedField
4646

4747

@@ -85,7 +85,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
8585
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
8686
)
8787
assert data_file.file_format == FileFormat.PARQUET
88-
assert repr(data_file.partition) == "Record[1, 1925]"
88+
assert data_file.partition == [1, 1925]
8989
assert data_file.record_count == 19513
9090
assert data_file.file_size_in_bytes == 388872
9191
assert data_file.column_sizes == {
@@ -184,7 +184,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
184184
}
185185
assert data_file.key_metadata is None
186186
assert data_file.split_offsets == [4]
187-
assert data_file.equality_ids is None
187+
assert data_file.equality_ids == []
188188
assert data_file.sort_order_id == 0
189189

190190

@@ -422,7 +422,7 @@ def test_write_manifest(
422422
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
423423
)
424424
assert data_file.file_format == FileFormat.PARQUET
425-
assert data_file.partition == Record(1, 1925)
425+
assert data_file.partition == [1, 1925]
426426
assert data_file.record_count == 19513
427427
assert data_file.file_size_in_bytes == 388872
428428
assert data_file.column_sizes == {
@@ -521,7 +521,7 @@ def test_write_manifest(
521521
}
522522
assert data_file.key_metadata is None
523523
assert data_file.split_offsets == [4]
524-
assert data_file.equality_ids is None
524+
assert data_file.equality_ids == []
525525
assert data_file.sort_order_id == 0
526526

527527

0 commit comments

Comments
 (0)