Skip to content

Commit 75ccd92

Browse files
committed
fix upsert with null values
1 parent 96e6d54 commit 75ccd92

2 files changed

Lines changed: 46 additions & 1 deletion

File tree

pyiceberg/table/upsert_util.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,16 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
6565
# When the target table is empty, there is nothing to update :)
6666
return source_table.schema.empty_table()
6767

68-
diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
68+
diff_expr = functools.reduce(
69+
operator.or_,
70+
[
71+
pc.or_kleene(
72+
pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
73+
pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
74+
)
75+
for col in non_key_cols
76+
],
77+
)
6978

7079
return (
7180
source_table

tests/table/test_upsert.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,3 +509,39 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
509509
ValueError, match="Join columns could not be found, please set identifier-field-ids or pass in explicitly."
510510
):
511511
tbl.upsert(df)
512+
513+
514+
def test_upsert_with_nulls(catalog: Catalog) -> None:
515+
identifier = "default.test_upsert_with_nulls"
516+
_drop_table(catalog, identifier)
517+
518+
schema = pa.schema(
519+
[
520+
("foo", pa.string()),
521+
("bar", pa.int32()),
522+
("baz", pa.bool_()),
523+
]
524+
)
525+
526+
# create table with null value
527+
table = catalog.create_table(identifier, schema)
528+
data_with_null = pa.Table.from_pylist(
529+
[
530+
{"foo": "apple", "bar": None, "baz": False},
531+
],
532+
schema=schema,
533+
)
534+
table.append(data_with_null)
535+
assert table.scan().to_arrow()["bar"].is_null()
536+
537+
# upsert table with non-null value
538+
data_without_null = pa.Table.from_pylist(
539+
[
540+
{"foo": "apple", "bar": 7, "baz": False},
541+
],
542+
schema=schema,
543+
)
544+
upd = table.upsert(data_without_null, join_cols=["foo"])
545+
assert upd.rows_updated == 1
546+
assert upd.rows_inserted == 0
547+
assert table.scan().to_arrow() == data_without_null

0 commit comments

Comments
 (0)