
feat: Add merge() to Table and Transaction #7

Open
AyushPatel101 wants to merge 1 commit into develop from feat/merge-rows

Conversation

Collaborator

@AyushPatel101 AyushPatel101 commented Apr 28, 2026

Context

Our ETL pipelines use a delete-insert merge pattern on Iceberg tables - delete existing rows by key, insert new rows. Today this requires separate table.delete() + table.append() calls (two Iceberg commits). A crash between them causes data loss.

upsert() doesn't fit because it enforces uniqueness on both source and target. Our tables have multiple rows per key combination by design.

What

table.merge(df, join_cols=["date_id", "account_number"])

Atomic delete-insert by join columns in a single OVERWRITE snapshot. Uses per-column In filters for file pruning (O(sum) not O(product)), reads all rows from candidate files (copy-on-write), anti-joins against source keys, and commits via _OverwriteFiles.

check_duplicate_keys

Optional source-side data quality check:

table.merge(df, join_cols=["date_id"], check_duplicate_keys=True)

Raises if the source df has duplicate key tuples. This is a data quality guard - merge() produces correct results with duplicate keys (deletes all matching target rows, inserts all source rows). The flag catches upstream bugs where the source accidentally contains duplicates.
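
For reference, the check amounts to comparing distinct key tuples against total rows in the source. A rough sketch (not the PR's exact code; assumes a pyarrow source df):

import pyarrow as pa

def source_has_duplicate_keys(df: pa.Table, join_cols: list[str]) -> bool:
    # Distinct key tuples vs. total rows: any gap means the source has duplicate keys.
    distinct_keys = df.select(join_cols).group_by(join_cols).aggregate([])
    return distinct_keys.num_rows < df.num_rows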

This is NOT the same as Spark's write.merge.cardinality-check. Spark's check prevents ambiguous row-level updates (WHEN MATCHED THEN UPDATE) where one target row would receive conflicting values from multiple source rows. Our merge() does delete-insert, not update-in-place - there's no ambiguity to resolve, so Spark's check doesn't apply.

Comparison

|                   | upsert()              | merge()                                    | Spark MERGE INTO                           |
|-------------------|-----------------------|--------------------------------------------|--------------------------------------------|
| Source duplicates | Always rejected       | Allowed (unless check_duplicate_keys=True) | Configurable (only for UPDATE, not INSERT) |
| Target duplicates | Always rejected       | Allowed                                    | Allowed                                    |
| Row comparison    | Yes (skips unchanged) | No (replaces all matching)                 | Depends on WHEN clauses                    |
| Snapshots         | Up to 3               | 1 (single OVERWRITE)                       | 1                                          |
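
In API terms (a hedged illustration using the call shapes from the examples above):

table.upsert(df, join_cols=["date_id", "account_number"])   # raises if keys repeat on either side
table.merge(df, join_cols=["date_id", "account_number"])    # duplicates allowed by design
table.merge(df, join_cols=["date_id", "account_number"],
            check_duplicate_keys=True)                      # opt-in source-side guard only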

How other engines handle this

  • Spark: MERGE INTO supports multiple clause types (UPDATE, DELETE, INSERT) with per-clause conditions. The cardinality check only applies to WHEN MATCHED THEN UPDATE to prevent ambiguous updates. Does not apply to WHEN NOT MATCHED THEN INSERT. Our merge() is the delete-insert equivalent - no UPDATE path, so no cardinality ambiguity.
  • Flink: Equality delete files + data files via RowDelta. No uniqueness enforced (Flink Writes).
  • Iceberg spec: "uniqueness of rows by this identifier is not guaranteed or required by Iceberg" (spec).

@AyushPatel101 AyushPatel101 self-assigned this Apr 28, 2026
Comment thread pyiceberg/table/__init__.py Outdated
@AyushPatel101 AyushPatel101 force-pushed the feat/merge-rows branch 13 times, most recently from f474c1b to 560dad4 Compare April 29, 2026 17:34
Collaborator Author

AyushPatel101 commented Apr 29, 2026

How it works

Before: create_match_filter + overwrite()

PyIceberg already had a way to do delete-insert by key, used internally by upsert():

  1. Call create_match_filter(df, join_cols) to build a filter expression that matches every key tuple in the source.
  2. Pass that filter to overwrite(), which deletes matching target rows and inserts the source.
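
In code, that pre-existing path looks roughly like this (a sketch of the two steps above; the import path for create_match_filter is assumed):

from pyiceberg.table.upsert_util import create_match_filter

match_filter = create_match_filter(df, join_cols)    # one Or of per-row And(EqualTo, ...) branches
table.overwrite(df, overwrite_filter=match_filter)   # delete matching target rows, insert the source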

The filter shape is the problem. For a source df with two rows and join_cols=["account", "date"]:

| account | date       |
|---------|------------|
| A1      | 2025-01-01 |
| A2      | 2025-01-02 |

create_match_filter produces:

Or(
    And(EqualTo("account", "A1"), EqualTo("date", "2025-01-01")),
    And(EqualTo("account", "A2"), EqualTo("date", "2025-01-02")),
)

One And branch per source row, one EqualTo leaf per join column inside each branch, all wrapped in a single top-level Or. So 5,000 source rows with 4 join columns produces 5,000 And branches × 4 EqualTo leaves = 20,000 leaf nodes under one giant Or. Manifest evaluation walks this tree for every data file. At small source sizes the cost is negligible. At moderate sizes evaluation gets slow. At production sizes the process is killed during manifest pruning before it can finish.

After: merge()

Same copy-on-write strategy as Spark's MERGE INTO, with a smaller filter. For the same two-row example as above, merge() builds:

And(
    In("account", ["A1", "A2"]),
    In("date", ["2025-01-01", "2025-01-02"]),
)

One In per join column listing that column's distinct values, combined with a single And. The full algorithm:

  1. Per-column In filters for file pruning - O(sum of cardinalities) instead of O(product). The filter matches the cross product {A1, A2} × {2025-01-01, 2025-01-02} = 4 key tuples, vs the 2 actual source keys. This is fine: the In filter is only used at the file level by plan_files() to decide which data files to read. It does not decide which rows get replaced.
  2. Read all rows from candidate files - copy-on-write requires the full file contents, not just the matching rows. Read with row_filter=ALWAYS_TRUE so no rows are pre-filtered.
  3. Anti-join against source keys (this is where row-level correctness is enforced) - the anti-join uses exact key tuples from the source df, not the per-column In lists. For the example above, the anti-join key set is {(A1, 2025-01-01), (A2, 2025-01-02)} - exactly 2 tuples, not 4. So a target row with (A2, 2025-01-01) is kept (its key is not in the source tuple set), even though the file containing it was a candidate. Concatenated with the source produces the new file content.
  4. Single OVERWRITE snapshot via _OverwriteFiles - delete the old data files, append the rewritten content, commit atomically.
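
Step 1's per-column filter can be built straight from the source with PyIceberg's expression classes. A sketch of the idea (not the PR's actual helper):

from functools import reduce

import pyarrow as pa
from pyiceberg.expressions import And, In

def per_column_in_filter(df: pa.Table, join_cols: list[str]):
    # One In per join column over that column's distinct values, And-ed together.
    parts = [In(col, df[col].unique().to_pylist()) for col in join_cols]
    return reduce(And, parts)  # a single In is returned unwrapped when there is only one join column

For the two-row example above this yields exactly the And(In, In) expression shown earlier.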

The over-approximation in step 1 only costs us extra file I/O (we read some files we did not strictly need to). It never causes wrong rows to be replaced, because step 3 always re-checks against the exact source tuples. Final table state is identical to what create_match_filter + overwrite() would have produced.
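
A self-contained pyarrow illustration of step 3, using the rows from the example above plus one made-up target row whose key is in the cross product but not in the source:

import pyarrow as pa

join_cols = ["account", "date"]

source = pa.table({"account": ["A1", "A2"],
                   "date":    ["2025-01-01", "2025-01-02"],
                   "value":   [10, 20]})

# Rows read back from candidate files (step 2). The last row's key (A2, 2025-01-01)
# matches the per-column In filter but is not an actual source key.
candidate_rows = pa.table({"account": ["A1", "A2", "A2"],
                           "date":    ["2025-01-01", "2025-01-02", "2025-01-01"],
                           "value":   [1, 2, 3]})

source_keys = source.select(join_cols).group_by(join_cols).aggregate([])
kept = candidate_rows.join(source_keys, keys=join_cols, join_type="left anti")
new_content = pa.concat_tables([kept, source])

print(kept.to_pydict())      # only the (A2, 2025-01-01) row survives the anti-join
print(new_content.num_rows)  # 3: one preserved target row + two source rows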

For the 5,000-row × 4-column case: the In filter has 4 nodes total (one per column), each holding up to 5,000 values. Compare to create_match_filter's 20,000+ tree nodes.

Benchmark

End-to-end: filter construction + file I/O + snapshot commit. Target: 25,200 rows (252 partitions × 100 rows).

| Approach                          | Source rows | Time   |
|-----------------------------------|-------------|--------|
| create_match_filter + overwrite() | 100         | 243 ms |
| create_match_filter + overwrite() | 5,000       | killed |
| merge()                           | 100         | 76 ms  |
| merge()                           | 5,000       | 64 ms  |

At 5,000 source rows, create_match_filter builds a 20,000-node Or(And(EqualTo, ...)) tree and the process gets killed during manifest evaluation. merge() handles the same input in 64 ms because the per-column In filter is O(sum) not O(product).

For reference, create_match_filter alone (without the overwrite step) takes 242 ms and allocates 6.7 MB at 5,000 source rows - the cost is in the filter, not the I/O.

Benchmark tests live in tests/benchmark/test_merge_filter.py.

@AyushPatel101 AyushPatel101 force-pushed the feat/merge-rows branch 5 times, most recently from a5c7ee2 to e7d36ef Compare April 30, 2026 20:58
Comment thread pyiceberg/table/__init__.py Outdated
# against deduplicated target keys), then check for duplicates.
if check_duplicate_keys:
    unique_target_keys = target_data.select(join_cols).group_by(join_cols).aggregate([])
    source_matching_target = source_keys.join(unique_target_keys, keys=join_cols, join_type="left semi")
Collaborator

I'm not sure about the semantics of this. It's OK to have duplicate keys if you're adding new rows, but not if you're replacing rows? It seems like an unnecessary filter.

I agree that does seem unnecessary.

Collaborator Author

Okay, I think I fixed this now to only validate the incoming df for uniqueness on the join cols.

@AyushPatel101 AyushPatel101 force-pushed the feat/merge-rows branch 2 times, most recently from 93651f2 to 684ced7 Compare May 5, 2026 17:13
Atomic delete-insert merge by join columns using per-column In
filters for file pruning and in-memory anti-join for row-level
correctness, committed as a single OVERWRITE snapshot.

Unlike upsert(), does not enforce uniqueness on source or target.
@AyushPatel101 AyushPatel101 force-pushed the feat/merge-rows branch 2 times, most recently from 296d242 to ea9065e Compare May 5, 2026 22:22
