feat: Add merge() to Table and Transaction #7
force-pushed from f474c1b to 560dad4
How it works
Before:
| account | date |
|---|---|
| A1 | 2025-01-01 |
| A2 | 2025-01-02 |
create_match_filter produces:
Or(
And(EqualTo("account", "A1"), EqualTo("date", "2025-01-01")),
And(EqualTo("account", "A2"), EqualTo("date", "2025-01-02")),
)
One And branch per source row, one EqualTo leaf per join column inside each branch, all wrapped in a single top-level Or. So 5,000 source rows with 4 join columns produce 5,000 And branches × 4 EqualTo leaves = 20,000 leaf nodes under one giant Or. Manifest evaluation walks this tree for every data file. At small source sizes the cost is negligible. At moderate sizes evaluation gets slow. At production sizes the process is killed during manifest pruning before it can finish.
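For illustration, a minimal sketch of how such an Or-of-Ands filter can be assembled from pyiceberg expression objects; build_or_of_ands is a hypothetical helper written for this example, not the actual create_match_filter implementation:

```python
# Hypothetical helper showing the Or(And(EqualTo, ...)) shape described above.
from functools import reduce
from pyiceberg.expressions import And, EqualTo, Or

def build_or_of_ands(rows: list[dict], join_cols: list[str]):
    # One And branch per source row, one EqualTo leaf per join column.
    branches = [
        reduce(And, [EqualTo(col, row[col]) for col in join_cols])
        for row in rows
    ]
    # All branches wrapped in a single top-level Or: leaf count = rows x join cols.
    return reduce(Or, branches)
```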
After: merge()
Same copy-on-write strategy as Spark's MERGE INTO, with a smaller filter. For the same two-row example as above, merge() builds:
And(
In("account", ["A1", "A2"]),
In("date", ["2025-01-01", "2025-01-02"]),
)
One In per join column listing that column's distinct values, combined with a single And. The full algorithm:
1. Per-column In filters for file pruning - O(sum of cardinalities) instead of O(product). The filter matches the cross product {A1, A2} × {2025-01-01, 2025-01-02} = 4 key tuples, vs the 2 actual source keys. This is fine: the In filter is only used at the file level by plan_files() to decide which data files to read. It does not decide which rows get replaced (see the sketch after this list).
2. Read all rows from candidate files - copy-on-write requires the full file contents, not just the matching rows. Read with row_filter=ALWAYS_TRUE so no rows are pre-filtered.
3. Anti-join against source keys (this is where row-level correctness is enforced) - the anti-join uses exact key tuples from the source df, not the per-column In lists. For the example above, the anti-join key set is {(A1, 2025-01-01), (A2, 2025-01-02)} - exactly 2 tuples, not 4. So a target row with (A2, 2025-01-01) is kept (its key is not in the source tuple set), even though the file containing it was a candidate. Concatenating the kept rows with the source rows produces the new file content.
4. Single OVERWRITE snapshot via _OverwriteFiles - delete the old data files, append the rewritten content, commit atomically.
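A sketch of step 1 under the same assumptions (a pyarrow source table; build_pruning_filter is an illustrative name, not the PR's internal function):

```python
# Illustrative only: one In per join column over that column's distinct
# values, combined under a single And.
from functools import reduce
import pyarrow as pa
from pyiceberg.expressions import And, In

def build_pruning_filter(source: pa.Table, join_cols: list[str]):
    per_column = [
        In(col, set(source.column(col).to_pylist())) for col in join_cols
    ]
    # For the two-row example:
    # And(In("account", {"A1", "A2"}), In("date", {"2025-01-01", "2025-01-02"}))
    return reduce(And, per_column)
```

The resulting expression is only consumed by file planning (e.g. as the row_filter of the scan whose plan_files() decides which data files to read); row-level filtering is left entirely to the anti-join in step 3.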
The over-approximation in step 1 only costs us extra file I/O (we read some files we did not strictly need to). It never causes wrong rows to be replaced, because step 3 always re-checks against the exact source tuples. Final table state is identical to what create_match_filter + overwrite() would have produced.
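A minimal pyarrow sketch of step 3, assuming the candidate-file rows and the source share a schema (rewrite_candidate_rows is a hypothetical name for this example):

```python
# Keep target rows whose exact key tuple is NOT present in the source,
# then append the source rows to form the rewritten file content.
import pyarrow as pa

def rewrite_candidate_rows(target_rows: pa.Table, source: pa.Table, join_cols: list[str]) -> pa.Table:
    source_keys = source.select(join_cols)
    # "left anti" join on the full key tuple: a target row with
    # (A2, 2025-01-01) survives, even though A2 and 2025-01-01 each
    # appear in the per-column In lists used for file pruning.
    kept = target_rows.join(source_keys, keys=join_cols, join_type="left anti")
    return pa.concat_tables([kept, source])
```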
For the 5,000-row × 4-column case: the In filter has 4 nodes total (one per column), each holding up to 5,000 values. Compare to create_match_filter's 20,000+ tree nodes.
Benchmark
End-to-end: filter construction + file I/O + snapshot commit. Target: 25,200 rows (252 partitions x 100 rows).
| Approach | Source rows | Time |
|---|---|---|
| create_match_filter + overwrite() | 100 | 243 ms |
| create_match_filter + overwrite() | 5,000 | killed |
| merge() | 100 | 76 ms |
| merge() | 5,000 | 64 ms |
At 5,000 source rows, create_match_filter builds a 20,000-node Or(And(EqualTo, ...)) tree and the process gets killed during manifest evaluation. merge() handles the same input in 64 ms because the per-column In filter is O(sum) not O(product).
For reference, create_match_filter alone (without the overwrite step) takes 242 ms and 6.7 MB at 5,000 source rows - the cost is in the filter, not the I/O.
Benchmark tests live in tests/benchmark/test_merge_filter.py.
force-pushed from a5c7ee2 to e7d36ef
# against deduplicated target keys), then check for duplicates.
if check_duplicate_keys:
    unique_target_keys = target_data.select(join_cols).group_by(join_cols).aggregate([])
    source_matching_target = source_keys.join(unique_target_keys, keys=join_cols, join_type="left semi")
I'm not sure about the semantics of this. It's OK to have duplicate keys if you're adding new rows, but not if you're replacing rows? It seems like an unnecessary filter.
Okay, I think I fixed this now - it only validates the incoming df, checking uniqueness on the join cols.
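A sketch of what a source-only check can look like with pyarrow (assumed shape for illustration, not necessarily the exact code that landed):

```python
# Raise if the incoming source df has duplicate key tuples on the join columns.
import pyarrow as pa
import pyarrow.compute as pc

def assert_unique_source_keys(source: pa.Table, join_cols: list[str]) -> None:
    # Count rows per key tuple; any count > 1 means the source has duplicates.
    counts = source.select(join_cols).group_by(join_cols).aggregate([([], "count_all")])
    duplicates = counts.filter(pc.greater(counts.column("count_all"), 1))
    if duplicates.num_rows > 0:
        raise ValueError(
            f"source has {duplicates.num_rows} duplicate key tuple(s) on {join_cols}"
        )
```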
force-pushed from 93651f2 to 684ced7
Atomic delete-insert merge by join columns using per-column In filters for file pruning and in-memory anti-join for row-level correctness, committed as a single OVERWRITE snapshot. Unlike upsert(), does not enforce uniqueness on source or target.
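A hypothetical call shape based on this description - the catalog and table names and the exact parameter names are assumptions for illustration, not the PR's confirmed signature:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("etl.transactions")  # illustrative table name

source_df = pa.table({
    "account": ["A1", "A2"],
    "date": ["2025-01-01", "2025-01-02"],
    "amount": [10.0, 20.0],
})

# Delete-insert by key in one OVERWRITE snapshot; duplicate keys are allowed
# on both sides unless the optional source-side check is enabled.
tbl.merge(source_df, join_cols=["account", "date"], check_duplicate_keys=True)
```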
force-pushed from 296d242 to ea9065e
Context
Our ETL pipelines use a delete-insert merge pattern on Iceberg tables - delete existing rows by key, insert new rows. Today this requires separate table.delete() + table.append() calls (two Iceberg commits). A crash between them causes data loss. upsert() doesn't fit because it enforces uniqueness on both source and target. Our tables have multiple rows per key combination by design.
What
Atomic delete-insert by join columns in a single OVERWRITE snapshot. Uses per-column In filters for file pruning (O(sum) not O(product)), reads all rows from candidate files (copy-on-write), anti-joins against source keys, and commits via _OverwriteFiles.
check_duplicate_keys
Optional source-side data quality check:
Raises if the source df has duplicate key tuples. This is a data quality guard - merge() produces correct results with duplicate keys (deletes all matching target rows, inserts all source rows). The flag catches upstream bugs where the source accidentally contains duplicates.
This is NOT the same as Spark's write.merge.cardinality-check. Spark's check prevents ambiguous row-level updates (WHEN MATCHED THEN UPDATE) where one target row would receive conflicting values from multiple source rows. Our merge() does delete-insert, not update-in-place - there's no ambiguity to resolve, so Spark's check doesn't apply.
Comparison
(Comparison table of upsert(), merge(), Spark MERGE INTO, and merge() with check_duplicate_keys=True.)
How other engines handle this
- Spark: MERGE INTO supports multiple clause types (UPDATE, DELETE, INSERT) with per-clause conditions. The cardinality check only applies to WHEN MATCHED THEN UPDATE to prevent ambiguous updates. It does not apply to WHEN NOT MATCHED THEN INSERT. Our merge() is the delete-insert equivalent - no UPDATE path, so no cardinality ambiguity.
- Flink: RowDelta. No uniqueness enforced (Flink Writes).