Skip to content

Commit 06af05a

Browse files
committed
Make upsert work for non-join complex column types - skip column comparison
1 parent f16f8b3 commit 06af05a

2 files changed

Lines changed: 33 additions & 4 deletions

File tree

pyiceberg/table/upsert_util.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,5 +80,30 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
8080
# https://github.com/apache/arrow/issues/45557
8181
).cast(target_table.schema)
8282
except pa.ArrowInvalid:
83-
# When we are not able to compare, just update all rows from source table
84-
return source_table.cast(target_table.schema)
83+
# When we are not able to compare (e.g. due to unsupported types),
84+
# fall back to selecting only rows in the source table that already exist in the target (matched on the join keys).
85+
# See: https://github.com/apache/arrow/issues/35785
86+
87+
MARKER_COLUMN_NAME = "__from_target"
88+
89+
assert MARKER_COLUMN_NAME not in join_cols_set
90+
91+
# Step 1: Prepare source index with the join keys only (the marker is added on the target side)
92+
# Cast to target table schema, so we can do the join
93+
source_index = source_table.cast(target_table.schema).select(join_cols_set)
94+
95+
# Step 2: Prepare target index with join keys and a marker
96+
target_index = target_table.select(join_cols_set).append_column(
97+
MARKER_COLUMN_NAME, pa.array([True] * len(target_table), pa.bool_())
98+
)
99+
100+
# Step 3: Perform a left outer join to find which rows from source exist in target
101+
joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
102+
103+
# Step 4: Create a boolean mask for rows that DO exist in the target
104+
# i.e., where '__from_target' is NOT null after the join
105+
to_update_mask = pc.invert(pc.is_null(joined[MARKER_COLUMN_NAME]))
106+
107+
# Step 5: Filter source table using the mask (keep only rows that should be updated),
108+
# NOTE: the filtered result is returned with the source schema; no cast to the target schema (e.g. large_string → string) is applied here
109+
return source_table.filter(to_update_mask)

tests/table/test_upsert.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,11 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
564564
{
565565
"id": 1,
566566
"nested_type": {"sub1": "bla1", "sub2": "bla"},
567-
}
567+
},
568+
{
569+
"id": 2,
570+
"nested_type": {"sub1": "bla1", "sub2": "bla"},
571+
},
568572
],
569573
schema=arrow_schema,
570574
)
@@ -574,4 +578,4 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
574578
# Row needs to be updated even though it's not changed.
575579
# When pyarrow isn't able to compare rows, just update everything
576580
assert upd.rows_updated == 1
577-
assert upd.rows_inserted == 0
581+
assert upd.rows_inserted == 1

0 commit comments

Comments
 (0)