Skip to content

Commit 3ddef45

Browse files
committed
Add transaction tests to catalog tests
1 parent 8013545 commit 3ddef45

1 file changed

Lines changed: 133 additions & 1 deletion

File tree

tests/integration/test_catalog.py

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@
2626
from pyiceberg.catalog.rest import RestCatalog
2727
from pyiceberg.catalog.sql import SqlCatalog
2828
from pyiceberg.exceptions import (
29+
CommitFailedException,
2930
NamespaceAlreadyExistsError,
3031
NamespaceNotEmptyError,
3132
NoSuchNamespaceError,
3233
NoSuchTableError,
3334
TableAlreadyExistsError,
3435
)
3536
from pyiceberg.io import WAREHOUSE
36-
from pyiceberg.schema import Schema
37+
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
38+
from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema
39+
from pyiceberg.table.metadata import INITIAL_SPEC_ID
40+
from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder
41+
from pyiceberg.transforms import IdentityTransform
42+
from pyiceberg.types import IntegerType, LongType, UUIDType
3743
from tests.conftest import clean_up
3844

3945

@@ -218,6 +224,132 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa
218224
assert test_catalog.table_exists((database_name, table_name)) is True
219225

220226

227+
@pytest.mark.integration
228+
@pytest.mark.parametrize("test_catalog", CATALOGS)
229+
def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
230+
identifier = (database_name, table_name)
231+
232+
test_catalog.create_namespace(database_name)
233+
table = test_catalog.create_table(identifier, test_schema)
234+
assert test_catalog.table_exists(identifier)
235+
236+
expected_schema: Schema = Schema()
237+
expected_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC
238+
239+
with table.transaction() as transaction:
240+
with transaction.update_schema() as update_schema:
241+
update_schema.add_column("new_col", IntegerType())
242+
expected_schema = update_schema._apply()
243+
244+
with transaction.update_spec() as update_spec:
245+
update_spec.add_field("new_col", IdentityTransform())
246+
expected_spec = update_spec._apply()
247+
248+
table = test_catalog.load_table(identifier)
249+
assert table.schema().as_struct() == expected_schema.as_struct()
250+
assert table.spec().fields == expected_spec.fields
251+
252+
253+
@pytest.mark.integration
254+
@pytest.mark.parametrize("test_catalog", CATALOGS)
255+
def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
256+
if isinstance(test_catalog, HiveCatalog):
257+
pytest.skip("HiveCatalog fails in this test, need to investigate")
258+
259+
identifier = (database_name, table_name)
260+
261+
test_catalog.create_namespace(database_name)
262+
table = test_catalog.create_table(identifier, test_schema)
263+
assert test_catalog.table_exists(identifier)
264+
265+
original_update = table.update_schema().add_column("new_col", LongType())
266+
267+
# Update schema concurrently so that the original update fails
268+
concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
269+
expected_schema = concurrent_update._apply()
270+
concurrent_update.commit()
271+
272+
with pytest.raises(CommitFailedException):
273+
original_update.commit()
274+
275+
table = test_catalog.load_table(identifier)
276+
assert table.schema().as_struct() == expected_schema.as_struct()
277+
278+
279+
@pytest.mark.integration
280+
@pytest.mark.parametrize("test_catalog", CATALOGS)
281+
def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
282+
identifier = (database_name, table_name)
283+
284+
test_catalog.create_namespace(database_name)
285+
table_transaction = test_catalog.create_table_transaction(identifier, test_schema)
286+
assert not test_catalog.table_exists(identifier)
287+
288+
table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
289+
assert not test_catalog.table_exists(identifier)
290+
291+
table_transaction.commit_transaction()
292+
assert test_catalog.table_exists(identifier)
293+
294+
table = test_catalog.load_table(identifier)
295+
assert table.schema().find_type("new_col").is_primitive
296+
297+
298+
@pytest.mark.integration
299+
@pytest.mark.parametrize("test_catalog", CATALOGS)
300+
def test_create_table_transaction_multiple_schemas(
301+
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str
302+
) -> None:
303+
identifier = (database_name, table_name)
304+
305+
test_catalog.create_namespace(database_name)
306+
table_transaction = test_catalog.create_table_transaction(
307+
identifier=identifier,
308+
schema=test_schema,
309+
partition_spec=test_partition_spec,
310+
sort_order=SortOrder(SortField(source_id=1)),
311+
)
312+
assert not test_catalog.table_exists(identifier)
313+
314+
table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
315+
assert not test_catalog.table_exists(identifier)
316+
317+
table_transaction.update_schema().add_column("new_col_1", UUIDType()).commit()
318+
assert not test_catalog.table_exists(identifier)
319+
320+
table_transaction.update_spec().add_field("new_col", IdentityTransform()).commit()
321+
assert not test_catalog.table_exists(identifier)
322+
323+
# TODO: test replace sort order when available
324+
325+
expected_schema = table_transaction.update_schema()._apply()
326+
expected_spec = table_transaction.update_spec()._apply()
327+
328+
table_transaction.commit_transaction()
329+
assert test_catalog.table_exists(identifier)
330+
331+
table = test_catalog.load_table(identifier)
332+
assert table.schema().as_struct() == expected_schema.as_struct()
333+
assert table.schema().schema_id == INITIAL_SCHEMA_ID + 2
334+
assert table.spec().fields == expected_spec.fields
335+
assert table.spec().spec_id == INITIAL_SPEC_ID + 1
336+
assert table.sort_order().order_id == INITIAL_SORT_ORDER_ID
337+
338+
339+
@pytest.mark.integration
340+
@pytest.mark.parametrize("test_catalog", CATALOGS)
341+
def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
342+
identifier = (database_name, table_name)
343+
344+
test_catalog.create_namespace(database_name)
345+
table = test_catalog.create_table_transaction(identifier=identifier, schema=test_schema)
346+
assert not test_catalog.table_exists(identifier)
347+
348+
test_catalog.create_table(identifier, test_schema)
349+
with pytest.raises(CommitFailedException):
350+
table.commit_transaction()
351+
352+
221353
@pytest.mark.integration
222354
@pytest.mark.parametrize("test_catalog", CATALOGS)
223355
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:

0 commit comments

Comments
 (0)