Skip to content

Commit 4361d29

Browse files
Implementation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c6b8508 commit 4361d29

10 files changed

Lines changed: 1522 additions & 4 deletions

File tree

pyiceberg/catalog/__init__.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
4848
CommitTableResponse,
4949
CreateTableTransaction,
50+
ReplaceTableTransaction,
5051
StagedTable,
5152
Table,
5253
TableProperties,
@@ -442,6 +443,66 @@ def create_table_if_not_exists(
442443
except TableAlreadyExistsError:
443444
return self.load_table(identifier)
444445

@abstractmethod
def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Atomically replace a table's schema, spec, sort order, location, and properties.

    The table UUID and history (snapshots, schemas, specs, sort orders) are preserved.
    The current snapshot is cleared (main branch ref is removed).

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        Table: the replaced table instance.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
475+
@abstractmethod
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction.

    The transaction can be used to stage additional changes (schema evolution,
    partition evolution, etc.) before committing.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
505+
445506
@abstractmethod
446507
def load_table(self, identifier: str | Identifier) -> Table:
447508
"""Load the table's metadata and returns the table instance.
@@ -888,6 +949,28 @@ def create_table_transaction(
888949
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
889950
)
890951

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Default stub: atomic table replace is opt-in per catalog implementation."""
    # Catalogs that can perform an atomic replace override this method.
    raise NotImplementedError("replace_table is not yet supported for this catalog type")
962+
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Default stub: replace transactions are opt-in per catalog implementation."""
    # Catalogs that can stage an atomic replace override this method.
    raise NotImplementedError("replace_table_transaction is not yet supported for this catalog type")
973+
891974
def table_exists(self, identifier: str | Identifier) -> bool:
892975
try:
893976
self.load_table(identifier)

pyiceberg/catalog/noop.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from pyiceberg.table import (
2727
CommitTableResponse,
2828
CreateTableTransaction,
29+
ReplaceTableTransaction,
2930
Table,
3031
)
3132
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -64,6 +65,28 @@ def create_table_transaction(
6465
) -> CreateTableTransaction:
6566
raise NotImplementedError
6667

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Unsupported in the no-op catalog."""
    raise NotImplementedError
78+
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Unsupported in the no-op catalog."""
    raise NotImplementedError
89+
6790
def load_table(self, identifier: str | Identifier) -> Table:
6891
raise NotImplementedError
6992

pyiceberg/catalog/rest/__init__.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,19 @@
6767
FileIO,
6868
load_file_io,
6969
)
70-
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
71-
from pyiceberg.schema import Schema, assign_fresh_schema_ids
70+
from pyiceberg.partitioning import (
71+
UNPARTITIONED_PARTITION_SPEC,
72+
PartitionSpec,
73+
assign_fresh_partition_spec_ids,
74+
assign_fresh_partition_spec_ids_for_replace,
75+
)
76+
from pyiceberg.schema import Schema, assign_fresh_schema_ids, assign_fresh_schema_ids_for_replace
7277
from pyiceberg.table import (
7378
CommitTableRequest,
7479
CommitTableResponse,
7580
CreateTableTransaction,
7681
FileScanTask,
82+
ReplaceTableTransaction,
7783
StagedTable,
7884
Table,
7985
TableIdentifier,
@@ -937,6 +943,77 @@ def create_table_transaction(
937943
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
938944
return CreateTableTransaction(staged_table)
939945

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Replace the table in one step by staging a replace transaction and committing it.

    Convenience wrapper around ``replace_table_transaction``; see that method for
    the semantics of each argument.
    """
    return self.replace_table_transaction(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order,
        properties=properties,
    ).commit_transaction()
964+
@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Stage a replace of an existing table, preserving its UUID and metadata history.

    Loads the current table, rebinds the new schema/spec/sort-order to IDs that are
    consistent with the existing metadata (IDs are reused by name where possible),
    and returns an uncommitted ReplaceTableTransaction.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist (raised by load_table).
    """
    existing_table = self.load_table(identifier)
    existing_metadata = existing_table.metadata

    # Convert a pyarrow schema if needed; the target format version is taken from the
    # new properties when given, otherwise from the existing table metadata.
    iceberg_schema = self._convert_schema_if_needed(
        schema,
        int(properties.get(TableProperties.FORMAT_VERSION, existing_metadata.format_version)),  # type: ignore
    )

    # Assign fresh schema IDs, reusing IDs from the existing schema by field name
    fresh_schema, _ = assign_fresh_schema_ids_for_replace(
        iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id
    )

    # Assign fresh partition spec IDs, reusing IDs from existing specs
    fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec, iceberg_schema, fresh_schema, existing_metadata.partition_specs, existing_metadata.last_partition_id
    )

    # Assign fresh sort order IDs
    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

    # Use existing location if not specified
    resolved_location = location.rstrip("/") if location else existing_metadata.location

    # Create a StagedTable from the existing table; the transaction mutates this staged
    # copy and only the commit makes the replace visible to the catalog.
    staged_table = StagedTable(
        identifier=existing_table.name(),
        metadata=existing_metadata,
        metadata_location=existing_table.metadata_location,
        io=existing_table.io,
        catalog=self,
    )

    return ReplaceTableTransaction(
        table=staged_table,
        new_schema=fresh_schema,
        new_spec=fresh_partition_spec,
        new_sort_order=fresh_sort_order,
        new_location=resolved_location,
        new_properties=properties,
    )
1016+
9401017
@retry(**_RETRY_ARGS)
9411018
def create_view(
9421019
self,

pyiceberg/partitioning.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,73 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
335335
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
336336

337337

def assign_fresh_partition_spec_ids_for_replace(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    existing_specs: list[PartitionSpec],
    last_partition_id: int | None,
) -> tuple[PartitionSpec, int]:
    """Assign partition field IDs for a replace operation, reusing IDs from existing specs.

    A partition field whose (source_id, transform) pair already appears in any existing
    spec keeps that spec's partition field ID; otherwise a fresh ID is allocated above
    last_partition_id.

    Args:
        spec: The new partition spec to assign IDs to.
        old_schema: The schema that the new spec's source_ids reference.
        fresh_schema: The schema with freshly assigned field IDs.
        existing_specs: All partition specs from the existing table metadata.
        last_partition_id: The current table's last_partition_id.

    Returns:
        A tuple of (fresh_spec, new_last_partition_id).

    Raises:
        ValueError: If a source column cannot be resolved in either schema.
    """
    floor_id = last_partition_id if last_partition_id is not None else PARTITION_FIELD_ID_START - 1

    # Map (source_id, transform) -> partition field ID across every historical spec,
    # keeping the highest ID when the same pair occurs in more than one spec.
    known_field_ids: dict[tuple[int, str], int] = {}
    for prior_spec in existing_specs:
        for prior_field in prior_spec.fields:
            lookup = (prior_field.source_id, str(prior_field.transform))
            known_field_ids[lookup] = max(prior_field.field_id, known_field_ids.get(lookup, prior_field.field_id))

    highest_id = floor_id
    rebuilt_fields = []
    for new_field in spec.fields:
        # Resolve the source column by name so the field binds to the fresh schema's IDs.
        column_name = old_schema.find_column_name(new_field.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {new_field}")
        bound_field = fresh_schema.find_field(column_name)
        if bound_field is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")

        # NOTE(review): a fresh empty set is passed per field, so this does not check
        # for duplicate partition names across fields of this spec — confirm intended.
        validate_partition_name(new_field.name, new_field.transform, bound_field.field_id, fresh_schema, set())

        lookup = (bound_field.field_id, str(new_field.transform))
        assigned_id = known_field_ids.get(lookup)
        if assigned_id is None:
            highest_id += 1
            assigned_id = highest_id
            known_field_ids[lookup] = assigned_id

        rebuilt_fields.append(
            PartitionField(
                name=new_field.name,
                source_id=bound_field.field_id,
                field_id=assigned_id,
                transform=new_field.transform,
            )
        )

    return PartitionSpec(*rebuilt_fields, spec_id=INITIAL_PARTITION_SPEC_ID), max(highest_id, floor_id)
403+
404+
338405
T = TypeVar("T")
339406

340407

pyiceberg/schema.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,58 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType:
13801380
return primitive
13811381

13821382

class _SetFreshIDsForReplace(_SetFreshIDs):
    """Assign fresh IDs for a replace operation, reusing IDs from the base schema by field name.

    A field whose full name exists in the base schema keeps the base schema's ID;
    any other field receives a newly allocated ID above last_column_id.
    """

    def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) -> None:
        # Record of every old-ID -> assigned-ID decision, inspectable after the visit.
        self.old_id_to_new_id: dict[int, int] = {}
        self._old_id_to_base_id = old_id_to_base_id
        # Allocator for brand-new IDs, starting just above the table's last column ID.
        self.next_id_func = itertools.count(starting_id + 1).__next__

    def _get_and_increment(self, current_id: int) -> int:
        # Prefer the base schema's ID for a same-named field; otherwise mint a new one.
        try:
            new_id = self._old_id_to_base_id[current_id]
        except KeyError:
            new_id = self.next_id_func()
        self.old_id_to_new_id[current_id] = new_id
        return new_id
1404+
1405+
def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]:
    """Assign fresh IDs to a schema for a replace operation, reusing IDs from the base schema.

    A field whose full path name exists in the base schema keeps the base schema's ID;
    fields new to this schema receive IDs starting above last_column_id.

    Args:
        schema: The new schema to assign IDs to.
        base_schema: The existing table's current schema (IDs are reused from here by name).
        last_column_id: The current table's last_column_id (new IDs start above this).

    Returns:
        A tuple of (fresh_schema, new_last_column_id).
    """
    base_ids_by_name = index_by_name(base_schema)

    # For every field of the incoming schema that shares a full name with a base-schema
    # field, map its current ID to the base schema's ID so the visitor can reuse it.
    reuse_map: dict[int, int] = {
        incoming_id: base_ids_by_name[full_name]
        for incoming_id, full_name in index_name_by_id(schema).items()
        if full_name in base_ids_by_name
    }

    fresh_schema = pre_order_visit(schema, _SetFreshIDsForReplace(reuse_map, last_column_id))
    # last_column_id never decreases, even if the new schema drops high-ID fields.
    return fresh_schema, max(fresh_schema.highest_field_id, last_column_id)
1433+
1434+
13831435
# Implementation copied from Apache Iceberg repo.
13841436
def make_compatible_name(name: str) -> str:
13851437
"""Make a field name compatible with Avro specification.

0 commit comments

Comments
 (0)