Skip to content

Commit 808e611

Browse files
Improvements
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7528be0 commit 808e611

7 files changed

Lines changed: 615 additions & 23 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -977,12 +977,12 @@ def replace_table_transaction(
977977

978978
iceberg_schema = self._convert_schema_if_needed(
979979
schema,
980-
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
980+
int(properties.get(TableProperties.FORMAT_VERSION, existing_metadata.format_version)), # type: ignore
981981
)
982982

983-
# Assign fresh schema IDs, reusing IDs from the existing schema by field name
983+
# Assign fresh schema IDs, reusing IDs from all existing schemas by field name
984984
fresh_schema, _ = assign_fresh_schema_ids_for_replace(
985-
iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id
985+
iceberg_schema, existing_metadata.schemas, existing_metadata.last_column_id
986986
)
987987

988988
# Assign fresh partition spec IDs, reusing IDs from existing specs

pyiceberg/schema.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,22 +1403,32 @@ def _get_and_increment(self, current_id: int) -> int:
14031403
return new_id
14041404

14051405

1406-
def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]:
1407-
"""Assign fresh IDs to a schema for a replace operation, reusing IDs from the base schema.
1406+
def assign_fresh_schema_ids_for_replace(schema: Schema, all_schemas: list[Schema], last_column_id: int) -> tuple[Schema, int]:
1407+
"""Assign fresh IDs to a schema for a replace operation, reusing IDs from existing schemas.
14081408
14091409
For each field in the new schema, if a field with the same full path name exists
1410-
in the base schema, its ID is reused. New fields get IDs starting from
1410+
in any of the existing schemas, its ID is reused. New fields get IDs starting from
14111411
last_column_id + 1.
14121412
14131413
Args:
14141414
schema: The new schema to assign IDs to.
1415-
base_schema: The existing table's schema (IDs are reused from here by name).
1415+
all_schemas: All schemas from the existing table metadata (IDs are reused from here by name).
14161416
last_column_id: The current table's last_column_id (new IDs start above this).
14171417
14181418
Returns:
14191419
A tuple of (fresh_schema, new_last_column_id).
14201420
"""
1421-
base_name_to_id = index_by_name(base_schema)
1421+
# N.B. We diverge from the Java implementation by using ALL historical schemas for
1422+
# field ID reuse, not just the current schema. Java's TypeUtil.assignFreshIds only
1423+
# uses the current schema as the base, so a replace A→B→A where A and B have
1424+
# disjoint fields would create a 3rd schema (field IDs from A are lost when B is
1425+
# current). By using all schemas, we can recover those IDs and correctly deduplicate
1426+
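# the schema on the way back. NOTE(review): this assumes a field name maps to the
1427+
# same ID in every historical schema. Iceberg does not guarantee that (a rename, or a drop followed by a re-add, gives the same name a different ID); on conflict the ID from the last schema in all_schemas wins.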
1428+
base_name_to_id: dict[str, int] = {}
1429+
for existing_schema in all_schemas:
1430+
base_name_to_id.update(index_by_name(existing_schema))
1431+
14221432
new_id_to_name = index_name_by_id(schema)
14231433

14241434
old_id_to_base_id: dict[int, int] = {}

pyiceberg/table/__init__.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,11 +1031,18 @@ def _initial_changes(
10311031
# Remove the main branch ref to clear the current snapshot
10321032
self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
10331033

1034-
# Set the new schema (always added since field IDs are freshly assigned)
1035-
self._updates += (
1036-
AddSchemaUpdate(schema_=new_schema),
1037-
SetCurrentSchemaUpdate(schema_id=-1),
1038-
)
1034+
# Set the new schema, reusing an existing schema if structurally identical.
1035+
# Schema.__eq__ compares fields and identifier_field_ids (ignoring schema_id),
1036+
# matching Java's sameSchema() behavior.
1037+
existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema)
1038+
if existing_schema_id is not None:
1039+
if existing_schema_id != table_metadata.current_schema_id:
1040+
self._updates += (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
1041+
else:
1042+
self._updates += (
1043+
AddSchemaUpdate(schema_=new_schema),
1044+
SetCurrentSchemaUpdate(schema_id=-1),
1045+
)
10391046

10401047
# Set the new partition spec.
10411048
# Only emit AddPartitionSpecUpdate + SetDefaultSpecUpdate(-1) when the spec
@@ -1072,6 +1079,14 @@ def _initial_changes(
10721079
if new_properties:
10731080
self._updates += (SetPropertiesUpdate(updates=new_properties),)
10741081

1082+
@staticmethod
def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None:
    """Return the schema_id of the first schema in ``table_metadata.schemas`` that equals ``schema``.

    Candidates are compared with ``==`` in the order they appear in
    ``table_metadata.schemas``. Returns None when no candidate compares equal.
    """
    matching_ids = (candidate.schema_id for candidate in table_metadata.schemas if candidate == schema)
    return next(matching_ids, None)
1089+
10751090
@staticmethod
10761091
def _find_matching_spec_id(table_metadata: TableMetadata, spec: PartitionSpec) -> int | None:
10771092
"""Find an existing partition spec with the same fields, returning its spec_id or None."""

tests/catalog/test_rest.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2891,6 +2891,12 @@ def test_replace_table_transaction_with_partition_spec(
28912891
assert spec_fields[0]["transform"] == "truncate[3]"
28922892
assert spec_fields[0]["name"] == "id_trunc"
28932893

2894+
# set-default-spec should also be present, pointing to the newly added spec
2895+
actions = [u["action"] for u in updates]
2896+
assert "set-default-spec" in actions
2897+
set_default_spec = next(u for u in updates if u["action"] == "set-default-spec")
2898+
assert set_default_spec["spec-id"] == -1
2899+
28942900

28952901
def test_replace_table_404(
28962902
rest_mock: Mocker,
@@ -2993,3 +2999,135 @@ def test_replace_table_transaction_same_location_no_set_location(
29932999
actions = [u["action"] for u in updates]
29943000
# set-location should NOT be present since location didn't change
29953001
assert "set-location" not in actions
3002+
3003+
3004+
def test_replace_table_transaction_same_schema_skips_add_schema(
    rest_mock: Mocker,
    example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
    example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
) -> None:
    """Replacing a table with an unchanged schema must not send add-schema or set-current-schema."""
    loaded_response = example_table_metadata_with_snapshot_v1_rest_json
    committed_response = example_table_metadata_no_snapshot_v1_rest_json
    # Both mocked responses must carry the same table UUID.
    committed_response["metadata"]["table-uuid"] = loaded_response["metadata"]["table-uuid"]

    # Mock the GET and the POST on the table endpoint.
    table_url = f"{TEST_URI}v1/namespaces/fokko/tables/fokko2"
    rest_mock.get(table_url, json=loaded_response, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.post(table_url, json=committed_response, status_code=200, request_headers=TEST_HEADERS)
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # Same fields as the schema of the existing table fixture (id: int, data: string).
    unchanged_schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="data", field_type=StringType(), required=False),
    )
    replace_txn = catalog.replace_table_transaction(identifier=("fokko", "fokko2"), schema=unchanged_schema)
    replace_txn.commit_transaction()

    # Inspect the updates carried by the last request the mock received.
    sent_updates = rest_mock.last_request.json()["updates"]
    sent_actions = [update["action"] for update in sent_updates]

    # No schema-related update is expected for an unchanged schema.
    assert "add-schema" not in sent_actions
    assert "set-current-schema" not in sent_actions

    # remove-snapshot-ref is the only update left.
    assert sent_actions == ["remove-snapshot-ref"]
3047+
3048+
3049+
def test_replace_table_transaction_different_schema_adds_schema(
    rest_mock: Mocker,
    example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
    example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
) -> None:
    """Replacing a table with a schema it has never had must send add-schema and set-current-schema."""
    loaded_response = example_table_metadata_with_snapshot_v1_rest_json
    committed_response = example_table_metadata_no_snapshot_v1_rest_json
    # Both mocked responses must carry the same table UUID.
    committed_response["metadata"]["table-uuid"] = loaded_response["metadata"]["table-uuid"]

    # Mock the GET and the POST on the table endpoint.
    table_url = f"{TEST_URI}v1/namespaces/fokko/tables/fokko2"
    rest_mock.get(table_url, json=loaded_response, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.post(table_url, json=committed_response, status_code=200, request_headers=TEST_HEADERS)
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # Differs from the existing table: new_col takes the place of data.
    changed_schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
        NestedField(field_id=3, name="new_col", field_type=BooleanType(), required=False),
    )
    replace_txn = catalog.replace_table_transaction(identifier=("fokko", "fokko2"), schema=changed_schema)
    replace_txn.commit_transaction()

    # Inspect the updates carried by the last request the mock received.
    sent_updates = rest_mock.last_request.json()["updates"]
    sent_actions = [update["action"] for update in sent_updates]

    # A schema change must produce both schema updates.
    assert "add-schema" in sent_actions
    assert "set-current-schema" in sent_actions

    # -1 refers to the schema added in the same request.
    set_current_schema = next(update for update in sent_updates if update["action"] == "set-current-schema")
    assert set_current_schema["schema-id"] == -1
3093+
3094+
3095+
def test_replace_table_transaction_with_sort_order(
    rest_mock: Mocker,
    example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
    example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
) -> None:
    """Replacing a table with an explicit sort order must send add-sort-order and set-default-sort-order."""
    loaded_response = example_table_metadata_with_snapshot_v1_rest_json
    committed_response = example_table_metadata_no_snapshot_v1_rest_json
    # Both mocked responses must carry the same table UUID.
    committed_response["metadata"]["table-uuid"] = loaded_response["metadata"]["table-uuid"]

    # Mock the GET and the POST on the table endpoint.
    table_url = f"{TEST_URI}v1/namespaces/fokko/tables/fokko2"
    rest_mock.get(table_url, json=loaded_response, status_code=200, request_headers=TEST_HEADERS)
    rest_mock.post(table_url, json=committed_response, status_code=200, request_headers=TEST_HEADERS)
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    replacement_schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="data", field_type=StringType(), required=False),
    )
    # Identity sort on source field 1.
    requested_sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform()))
    replace_txn = catalog.replace_table_transaction(
        identifier=("fokko", "fokko2"),
        schema=replacement_schema,
        sort_order=requested_sort_order,
    )
    replace_txn.commit_transaction()

    # Inspect the updates carried by the last request the mock received.
    sent_updates = rest_mock.last_request.json()["updates"]
    sent_actions = [update["action"] for update in sent_updates]

    # Both sort-order updates must be part of the commit.
    assert "add-sort-order" in sent_actions
    assert "set-default-sort-order" in sent_actions

0 commit comments

Comments
 (0)