Skip to content

Commit 4d7924e

Browse files
Implementation
1 parent 7528be0 commit 4d7924e

6 files changed

Lines changed: 568 additions & 16 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,7 @@ def replace_table_transaction(
977977

978978
iceberg_schema = self._convert_schema_if_needed(
979979
schema,
980-
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
980+
int(properties.get(TableProperties.FORMAT_VERSION, existing_metadata.format_version)), # type: ignore
981981
)
982982

983983
# Assign fresh schema IDs, reusing IDs from the existing schema by field name

pyiceberg/schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1412,7 +1412,7 @@ def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, las
14121412
14131413
Args:
14141414
schema: The new schema to assign IDs to.
1415-
base_schema: The existing table's schema (IDs are reused from here by name).
1415+
base_schema: The existing table's current schema (IDs are reused from here by name).
14161416
last_column_id: The current table's last_column_id (new IDs start above this).
14171417
14181418
Returns:

pyiceberg/table/__init__.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,16 +1031,19 @@ def _initial_changes(
10311031
# Remove the main branch ref to clear the current snapshot
10321032
self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
10331033

1034-
# Set the new schema (always added since field IDs are freshly assigned)
1035-
self._updates += (
1036-
AddSchemaUpdate(schema_=new_schema),
1037-
SetCurrentSchemaUpdate(schema_id=-1),
1038-
)
1034+
# Reuse an existing schema if structurally identical (ignoring schema_id).
1035+
existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema)
1036+
if existing_schema_id is not None:
1037+
if existing_schema_id != table_metadata.current_schema_id:
1038+
self._updates += (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
1039+
else:
1040+
self._updates += (
1041+
AddSchemaUpdate(schema_=new_schema),
1042+
SetCurrentSchemaUpdate(schema_id=-1),
1043+
)
10391044

1040-
# Set the new partition spec.
10411045
# Only emit AddPartitionSpecUpdate + SetDefaultSpecUpdate(-1) when the spec
1042-
# is genuinely new. If an identical spec already exists, use its concrete ID
1043-
# to avoid "no spec has been added" errors from the server.
1046+
# is new. If an identical spec already exists, use its concrete ID.
10441047
effective_spec = UNPARTITIONED_PARTITION_SPEC if new_spec.is_unpartitioned() else new_spec
10451048
existing_spec_id = self._find_matching_spec_id(table_metadata, effective_spec)
10461049
if existing_spec_id is not None:
@@ -1072,6 +1075,14 @@ def _initial_changes(
10721075
if new_properties:
10731076
self._updates += (SetPropertiesUpdate(updates=new_properties),)
10741077

1078+
@staticmethod
1079+
def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None:
1080+
"""Find an existing schema structurally equal to the given one, returning its schema_id or None."""
1081+
for existing in table_metadata.schemas:
1082+
if existing == schema:
1083+
return existing.schema_id
1084+
return None
1085+
10751086
@staticmethod
10761087
def _find_matching_spec_id(table_metadata: TableMetadata, spec: PartitionSpec) -> int | None:
10771088
"""Find an existing partition spec with the same fields, returning its spec_id or None."""

tests/catalog/test_rest.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2891,6 +2891,12 @@ def test_replace_table_transaction_with_partition_spec(
28912891
assert spec_fields[0]["transform"] == "truncate[3]"
28922892
assert spec_fields[0]["name"] == "id_trunc"
28932893

2894+
# set-default-spec should also be present, pointing to the newly added spec
2895+
actions = [u["action"] for u in updates]
2896+
assert "set-default-spec" in actions
2897+
set_default_spec = next(u for u in updates if u["action"] == "set-default-spec")
2898+
assert set_default_spec["spec-id"] == -1
2899+
28942900

28952901
def test_replace_table_404(
28962902
rest_mock: Mocker,
@@ -2993,3 +2999,135 @@ def test_replace_table_transaction_same_location_no_set_location(
29932999
actions = [u["action"] for u in updates]
29943000
# set-location should NOT be present since location didn't change
29953001
assert "set-location" not in actions
3002+
3003+
3004+
def test_replace_table_transaction_same_schema_skips_add_schema(
3005+
rest_mock: Mocker,
3006+
example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
3007+
example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
3008+
) -> None:
3009+
"""Test that replacing with the same schema skips add-schema and set-current-schema."""
3010+
table_uuid = example_table_metadata_with_snapshot_v1_rest_json["metadata"]["table-uuid"]
3011+
example_table_metadata_no_snapshot_v1_rest_json["metadata"]["table-uuid"] = table_uuid
3012+
3013+
rest_mock.get(
3014+
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
3015+
json=example_table_metadata_with_snapshot_v1_rest_json,
3016+
status_code=200,
3017+
request_headers=TEST_HEADERS,
3018+
)
3019+
rest_mock.post(
3020+
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
3021+
json=example_table_metadata_no_snapshot_v1_rest_json,
3022+
status_code=200,
3023+
request_headers=TEST_HEADERS,
3024+
)
3025+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3026+
3027+
# Use the exact same schema as the existing table (id: int, data: string)
3028+
same_schema = Schema(
3029+
NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
3030+
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
3031+
)
3032+
txn = catalog.replace_table_transaction(
3033+
identifier=("fokko", "fokko2"),
3034+
schema=same_schema,
3035+
)
3036+
txn.commit_transaction()
3037+
3038+
updates = rest_mock.last_request.json()["updates"]
3039+
actions = [u["action"] for u in updates]
3040+
3041+
# Since the schema is unchanged, add-schema and set-current-schema should be skipped
3042+
assert "add-schema" not in actions
3043+
assert "set-current-schema" not in actions
3044+
3045+
# The only update should be remove-snapshot-ref
3046+
assert actions == ["remove-snapshot-ref"]
3047+
3048+
3049+
def test_replace_table_transaction_different_schema_adds_schema(
3050+
rest_mock: Mocker,
3051+
example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
3052+
example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
3053+
) -> None:
3054+
"""Test that replacing with a genuinely new schema includes add-schema and set-current-schema."""
3055+
table_uuid = example_table_metadata_with_snapshot_v1_rest_json["metadata"]["table-uuid"]
3056+
example_table_metadata_no_snapshot_v1_rest_json["metadata"]["table-uuid"] = table_uuid
3057+
3058+
rest_mock.get(
3059+
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
3060+
json=example_table_metadata_with_snapshot_v1_rest_json,
3061+
status_code=200,
3062+
request_headers=TEST_HEADERS,
3063+
)
3064+
rest_mock.post(
3065+
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
3066+
json=example_table_metadata_no_snapshot_v1_rest_json,
3067+
status_code=200,
3068+
request_headers=TEST_HEADERS,
3069+
)
3070+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3071+
3072+
# A new schema with a different field (new_col instead of data)
3073+
new_schema = Schema(
3074+
NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
3075+
NestedField(field_id=3, name="new_col", field_type=BooleanType(), required=False),
3076+
)
3077+
txn = catalog.replace_table_transaction(
3078+
identifier=("fokko", "fokko2"),
3079+
schema=new_schema,
3080+
)
3081+
txn.commit_transaction()
3082+
3083+
updates = rest_mock.last_request.json()["updates"]
3084+
actions = [u["action"] for u in updates]
3085+
3086+
# Since the schema is different, add-schema and set-current-schema must be present
3087+
assert "add-schema" in actions
3088+
assert "set-current-schema" in actions
3089+
3090+
# set-current-schema should reference -1 (the last added schema)
3091+
set_schema = next(u for u in updates if u["action"] == "set-current-schema")
3092+
assert set_schema["schema-id"] == -1
3093+
3094+
3095+
def test_replace_table_transaction_with_sort_order(
3096+
rest_mock: Mocker,
3097+
example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any],
3098+
example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any],
3099+
) -> None:
3100+
"""Test that replacing with a custom sort order includes add-sort-order and set-default-sort-order."""
3101+
table_uuid = example_table_metadata_with_snapshot_v1_rest_json["metadata"]["table-uuid"]
3102+
example_table_metadata_no_snapshot_v1_rest_json["metadata"]["table-uuid"] = table_uuid
3103+
3104+
rest_mock.get(
3105+
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
3106+
json=example_table_metadata_with_snapshot_v1_rest_json,
3107+
status_code=200,
3108+
request_headers=TEST_HEADERS,
3109+
)
3110+
rest_mock.post(
3111+
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
3112+
json=example_table_metadata_no_snapshot_v1_rest_json,
3113+
status_code=200,
3114+
request_headers=TEST_HEADERS,
3115+
)
3116+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3117+
3118+
txn = catalog.replace_table_transaction(
3119+
identifier=("fokko", "fokko2"),
3120+
schema=Schema(
3121+
NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
3122+
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
3123+
),
3124+
sort_order=SortOrder(SortField(source_id=1, transform=IdentityTransform())),
3125+
)
3126+
txn.commit_transaction()
3127+
3128+
updates = rest_mock.last_request.json()["updates"]
3129+
actions = [u["action"] for u in updates]
3130+
3131+
# Should include add-sort-order and set-default-sort-order
3132+
assert "add-sort-order" in actions
3133+
assert "set-default-sort-order" in actions

0 commit comments

Comments
 (0)