Skip to content

Commit 9adfd5b

Browse files
committed
Enable V3 manifest writing and row-lineage snapshot commits
1 parent 9687d08 commit 9adfd5b

10 files changed

Lines changed: 244 additions & 36 deletions

File tree

pyiceberg/manifest.py

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454

5555
UNASSIGNED_SEQ = -1
5656
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
57-
DEFAULT_READ_VERSION: Literal[2] = 2
57+
DEFAULT_READ_VERSION: Literal[3] = 3
5858

5959
INITIAL_SEQUENCE_NUMBER = 0
6060

@@ -852,6 +852,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None:
852852
def key_metadata(self) -> bytes | None:
853853
return self._data[14]
854854

855+
@property
856+
def first_row_id(self) -> int | None:
857+
return self._data[15] if len(self._data) > 15 else None
858+
859+
@first_row_id.setter
860+
def first_row_id(self, value: int | None) -> None:
861+
if len(self._data) <= 15:
862+
self._data.append(value)
863+
else:
864+
self._data[15] = value
865+
855866
def has_added_files(self) -> bool:
856867
return self.added_files_count is None or self.added_files_count > 0
857868

@@ -1240,6 +1251,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
12401251
return entry
12411252

12421253

1254+
class ManifestWriterV3(ManifestWriterV2):
1255+
@property
1256+
def version(self) -> TableVersion:
1257+
return 3
1258+
1259+
12431260
def write_manifest(
12441261
format_version: TableVersion,
12451262
spec: PartitionSpec,
@@ -1252,6 +1269,8 @@ def write_manifest(
12521269
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
12531270
elif format_version == 2:
12541271
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
1272+
elif format_version == 3:
1273+
return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression)
12551274
else:
12561275
raise ValueError(f"Cannot write manifest for table version: {format_version}")
12571276

@@ -1295,6 +1314,10 @@ def __exit__(
12951314
@abstractmethod
12961315
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...
12971316

1317+
@property
1318+
def next_row_id(self) -> int | None:
1319+
return None
1320+
12981321
def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter:
12991322
self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
13001323
return self
@@ -1351,9 +1374,7 @@ def __init__(
13511374
self._commit_snapshot_id = snapshot_id
13521375
self._sequence_number = sequence_number
13531376

1354-
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
1355-
wrapped_manifest_file = copy(manifest_file)
1356-
1377+
def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile:
13571378
if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
13581379
# if the sequence number is being assigned here, then the manifest must be created by the current operation.
13591380
# To validate this, check that the snapshot id matches the current commit
@@ -1374,6 +1395,59 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
13741395
wrapped_manifest_file.min_sequence_number = self._sequence_number
13751396
return wrapped_manifest_file
13761397

1398+
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
1399+
return self._prepare_manifest_for_commit(copy(manifest_file))
1400+
1401+
1402+
class ManifestListWriterV3(ManifestListWriterV2):
1403+
_next_row_id: int
1404+
1405+
def __init__(
1406+
self,
1407+
output_file: OutputFile,
1408+
snapshot_id: int,
1409+
parent_snapshot_id: int | None,
1410+
sequence_number: int,
1411+
snapshot_first_row_id: int,
1412+
compression: AvroCompressionCodec,
1413+
):
1414+
super().__init__(
1415+
output_file=output_file,
1416+
snapshot_id=snapshot_id,
1417+
parent_snapshot_id=parent_snapshot_id,
1418+
sequence_number=sequence_number,
1419+
compression=compression,
1420+
)
1421+
self._format_version = 3
1422+
self._meta = {
1423+
"snapshot-id": str(snapshot_id),
1424+
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
1425+
"sequence-number": str(sequence_number),
1426+
"first-row-id": str(snapshot_first_row_id),
1427+
"format-version": "3",
1428+
AVRO_CODEC_KEY: compression,
1429+
}
1430+
self._next_row_id = snapshot_first_row_id
1431+
1432+
@property
1433+
def next_row_id(self) -> int | None:
1434+
return self._next_row_id
1435+
1436+
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
1437+
wrapped_manifest_file = self._prepare_manifest_for_commit(copy(manifest_file))
1438+
1439+
if wrapped_manifest_file.content == ManifestContent.DATA and wrapped_manifest_file.first_row_id is None:
1440+
if wrapped_manifest_file.existing_rows_count is None or wrapped_manifest_file.added_rows_count is None:
1441+
raise ValueError(
1442+
"Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: "
1443+
f"{wrapped_manifest_file.manifest_path}"
1444+
)
1445+
1446+
wrapped_manifest_file.first_row_id = self._next_row_id
1447+
self._next_row_id += wrapped_manifest_file.existing_rows_count + wrapped_manifest_file.added_rows_count
1448+
1449+
return wrapped_manifest_file
1450+
13771451

13781452
def write_manifest_list(
13791453
format_version: TableVersion,
@@ -1382,12 +1456,26 @@ def write_manifest_list(
13821456
parent_snapshot_id: int | None,
13831457
sequence_number: int | None,
13841458
avro_compression: AvroCompressionCodec,
1459+
snapshot_first_row_id: int | None = None,
13851460
) -> ManifestListWriter:
13861461
if format_version == 1:
13871462
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
13881463
elif format_version == 2:
13891464
if sequence_number is None:
13901465
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
13911466
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
1467+
elif format_version == 3:
1468+
if sequence_number is None:
1469+
raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}")
1470+
if snapshot_first_row_id is None:
1471+
raise ValueError(f"snapshot_first_row_id is required for V3 tables: {snapshot_first_row_id}")
1472+
return ManifestListWriterV3(
1473+
output_file=output_file,
1474+
snapshot_id=snapshot_id,
1475+
parent_snapshot_id=parent_snapshot_id,
1476+
sequence_number=sequence_number,
1477+
snapshot_first_row_id=snapshot_first_row_id,
1478+
compression=avro_compression,
1479+
)
13921480
else:
13931481
raise ValueError(f"Cannot write manifest list for table version: {format_version}")

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
335335
Returns:
336336
The alter table builder.
337337
"""
338-
if format_version not in {1, 2}:
338+
if format_version not in {1, 2, 3}:
339339
raise ValueError(f"Unsupported table format version: {format_version}")
340340

341341
if format_version < self.table_metadata.format_version:

pyiceberg/table/metadata.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
INITIAL_SPEC_ID = 0
6767
DEFAULT_SCHEMA_ID = 0
6868

69-
SUPPORTED_TABLE_FORMAT_VERSION = 2
69+
SUPPORTED_TABLE_FORMAT_VERSION = 3
7070

7171

7272
def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]:
@@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata:
574574
next_row_id: int | None = Field(alias="next-row-id", default=None)
575575
"""A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""
576576

577-
def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str:
578-
raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")
579-
580577

581578
TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")]
582579

@@ -645,6 +642,7 @@ def new_table_metadata(
645642
properties=properties,
646643
last_partition_id=fresh_partition_spec.last_assigned_field_id,
647644
table_uuid=table_uuid,
645+
next_row_id=0,
648646
)
649647
else:
650648
raise ValidationError(f"Unknown format version: {format_version}")

pyiceberg/table/update/__init__.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from pyiceberg.exceptions import CommitFailedException
2929
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
3030
from pyiceberg.schema import Schema
31-
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
31+
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3
3232
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
3333
from pyiceberg.table.snapshots import (
3434
MetadataLogEntry,
@@ -320,9 +320,17 @@ def _(
320320
return base_metadata
321321

322322
updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version})
323+
updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata)
324+
325+
if (
326+
isinstance(updated_metadata, TableMetadataV3)
327+
and base_metadata.format_version < 3
328+
and updated_metadata.next_row_id is None
329+
):
330+
updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0})
323331

324332
context.add_update(update)
325-
return TableMetadataUtil._construct_without_validation(updated_metadata)
333+
return updated_metadata
326334

327335

328336
@_apply_table_update.register(SetPropertiesUpdate)

pyiceberg/table/update/snapshot.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
276276

277277
def _commit(self) -> UpdatesAndRequirements:
278278
new_manifests = self._manifests()
279-
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
279+
table_metadata = self._transaction.table_metadata
280+
next_sequence_number = table_metadata.next_sequence_number()
280281

281282
summary = self._summary(self.snapshot_properties)
282283
file_name = _new_manifest_list_file_name(
@@ -287,29 +288,41 @@ def _commit(self) -> UpdatesAndRequirements:
287288
location_provider = self._transaction._table.location_provider()
288289
manifest_list_file_path = location_provider.new_metadata_location(file_name)
289290

291+
snapshot_first_row_id: int | None = None
292+
if table_metadata.format_version >= 3:
293+
snapshot_first_row_id = table_metadata.next_row_id
294+
if snapshot_first_row_id is None:
295+
raise ValueError("Cannot commit to a v3 table without next-row-id")
296+
290297
with write_manifest_list(
291-
format_version=self._transaction.table_metadata.format_version,
298+
format_version=table_metadata.format_version,
292299
output_file=self._io.new_output(manifest_list_file_path),
293300
snapshot_id=self._snapshot_id,
294301
parent_snapshot_id=self._parent_snapshot_id,
295302
sequence_number=next_sequence_number,
296303
avro_compression=self._compression,
304+
snapshot_first_row_id=snapshot_first_row_id,
297305
) as writer:
298306
writer.add_manifests(new_manifests)
299307

300-
first_row_id: int | None = None
308+
added_rows: int | None = None
309+
if table_metadata.format_version >= 3:
310+
writer_next_row_id = writer.next_row_id
311+
if writer_next_row_id is None or snapshot_first_row_id is None:
312+
raise ValueError("Cannot determine assigned rows for a v3 snapshot commit")
313+
added_rows = writer_next_row_id - snapshot_first_row_id
301314

302-
if self._transaction.table_metadata.format_version >= 3:
303-
first_row_id = self._transaction.table_metadata.next_row_id
315+
first_row_id: int | None = snapshot_first_row_id
304316

305317
snapshot = Snapshot(
306318
snapshot_id=self._snapshot_id,
307319
parent_snapshot_id=self._parent_snapshot_id,
308320
manifest_list=manifest_list_file_path,
309321
sequence_number=next_sequence_number,
310322
summary=summary,
311-
schema_id=self._transaction.table_metadata.current_schema_id,
323+
schema_id=table_metadata.current_schema_id,
312324
first_row_id=first_row_id,
325+
added_rows=added_rows,
313326
)
314327

315328
add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot)

tests/integration/test_reads.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -968,10 +968,10 @@ def test_upgrade_table_version(catalog: Catalog) -> None:
968968
transaction.upgrade_table_version(format_version=1)
969969
assert "Cannot downgrade v2 table to v1" in str(e.value)
970970

971-
with pytest.raises(ValueError) as e:
972-
with table_test_table_version.transaction() as transaction:
973-
transaction.upgrade_table_version(format_version=3)
974-
assert "Unsupported table format version: 3" in str(e.value)
971+
with table_test_table_version.transaction() as transaction:
972+
transaction.upgrade_table_version(format_version=3)
973+
974+
assert table_test_table_version.format_version == 3
975975

976976

977977
@pytest.mark.integration

tests/integration/test_writes/test_writes.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2336,10 +2336,10 @@ def test_nanosecond_support_on_catalog(
23362336

23372337
_create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions)
23382338

2339-
with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
2340-
catalog.create_table(
2341-
"ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
2342-
)
2339+
nanosecond_table = catalog.create_table(
2340+
"ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
2341+
)
2342+
assert nanosecond_table.format_version == 3
23432343

23442344
with pytest.raises(
23452345
UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]")
@@ -2495,7 +2495,6 @@ def test_stage_only_overwrite_files(
24952495
assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot]
24962496

24972497

2498-
@pytest.mark.skip("V3 writer support is not enabled.")
24992498
@pytest.mark.integration
25002499
def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None:
25012500
"""Test writing to a v3 table and reading with Spark."""
@@ -2528,6 +2527,11 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat
25282527

25292528
tbl.append(test_data)
25302529

2530+
current_snapshot = tbl.current_snapshot()
2531+
assert current_snapshot is not None
2532+
assert current_snapshot.first_row_id == initial_next_row_id
2533+
assert current_snapshot.added_rows == len(test_data)
2534+
25312535
assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
25322536
"Expected next_row_id to be incremented by the number of added rows"
25332537
)

tests/table/test_init.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
Table,
4242
TableIdentifier,
4343
)
44-
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
44+
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2, TableMetadataV3, _generate_snapshot_id
4545
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
4646
from pyiceberg.table.snapshots import (
4747
MetadataLogEntry,
@@ -81,6 +81,7 @@
8181
SetPropertiesUpdate,
8282
SetSnapshotRefUpdate,
8383
SetStatisticsUpdate,
84+
UpgradeFormatVersionUpdate,
8485
_apply_table_update,
8586
_TableMetadataUpdateContext,
8687
update_table_metadata,
@@ -942,6 +943,14 @@ def test_update_metadata_update_sort_order_invalid(table_v2: Table) -> None:
942943
update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=invalid_order_id),))
943944

944945

946+
def test_upgrade_format_version_to_v3_initializes_next_row_id(table_v2: Table) -> None:
947+
new_metadata = update_table_metadata(table_v2.metadata, (UpgradeFormatVersionUpdate(format_version=3),))
948+
949+
assert isinstance(new_metadata, TableMetadataV3)
950+
assert new_metadata.format_version == 3
951+
assert new_metadata.next_row_id == 0
952+
953+
945954
def test_update_metadata_with_multiple_updates(table_v1: Table) -> None:
946955
base_metadata = table_v1.metadata
947956
transaction = table_v1.transaction()

tests/table/test_metadata.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,13 @@ def test_serialize_v2(example_table_metadata_v2: dict[str, Any]) -> None:
184184

185185

186186
def test_serialize_v3(example_table_metadata_v3: dict[str, Any]) -> None:
187-
# Writing will be part of https://github.com/apache/iceberg-python/issues/1551
188-
189-
with pytest.raises(NotImplementedError) as exc_info:
190-
_ = TableMetadataV3(**example_table_metadata_v3).model_dump_json()
187+
table_metadata = TableMetadataV3(**example_table_metadata_v3)
188+
table_metadata_json = table_metadata.model_dump_json()
189+
parsed = json.loads(table_metadata_json)
191190

192-
assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value)
191+
assert parsed["format-version"] == 3
192+
assert parsed["next-row-id"] == 1
193+
assert TableMetadataV3(**parsed) == table_metadata
193194

194195

195196
def test_migrate_v1_schemas(example_table_metadata_v1: dict[str, Any]) -> None:
@@ -837,6 +838,7 @@ def test_new_table_metadata_with_v3_schema() -> None:
837838
default_sort_order_id=1,
838839
refs={},
839840
format_version=3,
841+
next_row_id=0,
840842
)
841843

842844
assert actual.model_dump() == expected.model_dump()

0 commit comments

Comments
 (0)