@@ -532,3 +532,39 @@ def test_transaction(catalog: Catalog) -> None:
532532 df = table .scan ().to_arrow ()
533533
534534 assert df_before_transaction == df
535+
536+
537+ def test_transaction_multiple_upserts (catalog : Catalog ) -> None :
538+ identifier = "default.test_multi_upsert"
539+ _drop_table (catalog , identifier )
540+
541+ schema = Schema (
542+ NestedField (1 , "id" , IntegerType (), required = True ),
543+ NestedField (2 , "name" , StringType (), required = True ),
544+ identifier_field_ids = [1 ],
545+ )
546+
547+ tbl = catalog .create_table (identifier , schema = schema )
548+
549+ # Define exact schema: required int32 and required string
550+ arrow_schema = pa .schema ([
551+ pa .field ("id" , pa .int32 (), nullable = False ),
552+ pa .field ("name" , pa .string (), nullable = False ),
553+ ])
554+
555+ tbl .append (pa .Table .from_pylist ([{"id" : 1 , "name" : "Alice" }], schema = arrow_schema ))
556+
557+ df = pa .Table .from_pylist ([{"id" : 2 , "name" : "Bob" }, {"id" : 1 , "name" : "Alicia" }], schema = arrow_schema )
558+
559+ with tbl .transaction () as txn :
560+ # This should read the uncommitted changes?
561+ txn .upsert (df , join_cols = ["id" ])
562+
563+ txn .upsert (df , join_cols = ["id" ])
564+
565+ result = tbl .scan ().to_arrow ().to_pylist ()
566+ assert sorted (result , key = lambda x : x ["id" ]) == [
567+ {"id" : 1 , "name" : "Alicia" },
568+ {"id" : 2 , "name" : "Bob" },
569+ ]
570+
0 commit comments