fix(occ): RowDelta uses partition-scoped conflict check instead of AlwaysTrue#983
fix(occ): RowDelta uses partition-scoped conflict check instead of AlwaysTrue#983mzzz-zzm wants to merge 1 commit intoapache:mainfrom
Conversation
laskoviymishka
left a comment
There was a problem hiding this comment.
Thanks for picking this up — the diagnosis on #978 looks right, and partition-scoped validation is definitely the right direction.
I’d like to hold this version before merge though, mostly because the new validator may be a bit too narrow today. The old AlwaysTrue path was conservative, but safe. This version can miss conflicts under partition-spec evolution, and possibly for partition values like UUID, decimal, binary, or fixed.
The good news is I think this can be fixed without changing the overall direction. The codebase already has validateAddedDataFilesMatchingFilter, which handles per-spec projection, manifest pruning, and type-aware partition evaluation. Could we express the equality-delete partitions as an OR-of-equalities filter and route through that helper? That should avoid the string-key comparison and reuse the existing validation path.
A few things I’d love to see before merge:
- add the #978 reproducer as a regression test
- add same-partition reject / different-partition allow tests
- add one UUID or decimal partition test
- add a partition-spec evolution test
- update the
RowDelta.validatecomments, since they still describe the old conservative behavior - split out the unrelated REST/S3/ancestry changes, so this PR stays focused
The empty-base ancestry fix looks useful too, just probably deserves its own PR.
Overall, I think this is the right line of work — I’d just rather make the new scoped validator as safe as the conservative one before we land it.
| sort.Ints(keys) | ||
| var buf []byte | ||
| for _, k := range keys { | ||
| buf = fmt.Appendf(buf, "%d=%v;", k, p[k]) |
There was a problem hiding this comment.
fmt.Appendf("%d=%v;", k, p[k]) is pretty fragile as an equality oracle here. map[int]any partition values can land as several Go types depending on construction path — uuid.UUID (= [16]byte) vs raw [16]byte, iceberg.DecimalLiteral vs *big.Rat, time.Time with monotonic-clock suffix, etc., see convertAvroValueToIcebergType in manifest.go:1756. Same logical value, different %v, missed match. Also = and ; aren't escaped, so a string value "1=a;2=b" collides with a different two-field tuple.
A silent miss here is worse than the bug being fixed — we'd accept a commit Java would reject. I'd either build an Or(And(...)) partition expression and call the existing validateAddedDataFilesMatchingFilter (which uses iceberg.Literal comparison and handles all of these), or normalize each known type to a canonical hashable form before keying.
| if e.Status() != iceberg.EntryStatusADDED || e.SnapshotID() != snap.SnapshotID { | ||
| continue | ||
| } | ||
| if _, ok := partSet[partitionTupleKey(e.DataFile().Partition())]; ok { |
There was a problem hiding this comment.
The lookup compares only (field_id, value) and never consults mf.PartitionSpecID(). After a partition-spec evolution (renamed field, identity → bucket, day → hour), the same logical row gets written under a different tuple — eq-delete bound to spec A might be {1: "hot"} while concurrent data under spec B is {1: 3} (bucketed) or {2: "hot"} (renamed). Tuples never match, real conflict missed.
The sibling helper at lines 343-434 handles this via buildPartitionProjection(specID, ...) keyed per spec id. I'd at minimum key on (specID, tuple) and use mf.PartitionSpecID() plus the eq-delete file's SpecID(). Routing through validateAddedDataFilesMatchingFilter with an OR-of-equalities filter gets it for free.
| // If any eq-delete file is unpartitioned (empty tuple), the delete | ||
| // could affect any row — fall back to the conservative AlwaysTrue check. | ||
| for _, p := range eqDeletePartitions { | ||
| if len(p) == 0 { |
There was a problem hiding this comment.
I'd hoist this fallback to the caller. The function name says "InPartitions", but a single empty input element silently turns it into AlwaysTrue across the whole table — leaky. Also "empty per-file partition tuple" is a noisy proxy for "unpartitioned table": an eq-delete file with an unset partition map on a partitioned table (easy to do via NewDataFileBuilder without partition data) silently nullifies the optimization. The right signal is the spec itself.
Move the decision into RowDelta.validate: if the table's only spec is unpartitioned, call the AlwaysTrue version directly; otherwise the partition-scoped one. Both paths become explicit.
| continue | ||
| } | ||
| if _, ok := partSet[partitionTupleKey(e.DataFile().Partition())]; ok { | ||
| return fmt.Errorf("%w: snapshot %d added data file %s in eq-delete partition", |
There was a problem hiding this comment.
The sibling on line 404 is "snapshot %d added data file %s matching filter %s" — includes the filter that triggered the match. This one says "in eq-delete partition" without saying which one. With multiple eq-delete files spanning multiple partitions, an operator triaging this can't tell which one conflicted without reading manifests by hand.
return fmt.Errorf("%w: snapshot %d added data file %s in partition %v overlapping eq-delete",
ErrConflictingDataFiles, snap.SnapshotID, e.DataFile().FilePath(), e.DataFile().Partition())| // Empty partition tuple → unpartitioned eq-delete → must fall back to AlwaysTrue. | ||
| // With no concurrent snapshots in ctx the AlwaysTrue path returns nil (no-op). | ||
| emptyPartition := map[int]any{} | ||
| require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx, []map[int]any{emptyPartition}, IsolationSerializable)) |
There was a problem hiding this comment.
This test doesn't actually exercise the fallback. newConflictContext(meta, meta, ...) produces ctx.concurrent = [] because base and current point at the same head, so the validator short-circuits at len(ctx.concurrent) == 0 before reaching the empty-tuple loop. The comment in the test admits it: "With no concurrent snapshots in ctx the AlwaysTrue path returns nil (no-op)" — it'd still pass if we deleted the fallback entirely.
I'd build a context with a real concurrent snapshot (mirror TestNewConflictContext_WriterHasNoBranchView — base = newConflictTestMetadata(t, nil), current = newConflictTestMetadata(t, &head)), then assert the fallback is reached, either by injecting a fake iceio.IO and observing the manifest fetch, or by an end-to-end fixture where AlwaysTrue would correctly flag a conflict the partition-scoped check would miss.
| // With no concurrent snapshots in ctx the AlwaysTrue path returns nil (no-op). | ||
| emptyPartition := map[int]any{} | ||
| require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx, []map[int]any{emptyPartition}, IsolationSerializable)) | ||
| } |
There was a problem hiding this comment.
The #978 reproducer (TestBugRepro_RowDeltaFalseConflictDifferentPartition from the issue body) is the test that would fail on main and pass on this branch — it's the regression guard for the actual contract of this PR, and it isn't here.
A few I'd want at minimum:
- the Bug(table): RowDelta.validate uses AlwaysTrue filter — false conflicts for concurrent appends to different partitions #978 reproducer as-is — different-partition allowed
- same-partition rejected — without this, a future change to
partitionTupleKeycould silently degrade the validator into a no-op with no signal - UUID or decimal partition column, same-partition rejected — would fail today against
%vkeying, that's the point - spec evolution — eq-delete bound to spec A, concurrent data under spec B (renamed field or transform change), will fail today
- a genuinely-unpartitioned table replacing the trivial-pass test above
The harness exists — conflict_validation_test.go builds metadata with concurrent snapshots and row_delta_test.go builds eq-delete files. They just need to be wired together.
| // falls back to AlwaysTrue — the equality delete could affect any row. | ||
| // | ||
| // Under IsolationSnapshot this validator is a no-op. | ||
| func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, eqDeletePartitions []map[int]any, level IsolationLevel) error { |
There was a problem hiding this comment.
Bigger-picture: I'd consider folding this into validateAddedDataFilesMatchingFilter rather than adding a sibling. Derive a BooleanExpression from eqDeletePartitions like Or(And(field_a == v_a, field_b == v_b), ...) and pass it to validateNoConflictingDataFiles(ctx, filter, level). That helper already does per-spec projection (spec-evolution correct), manifest-summary pruning (no full-manifest scan when summaries can't match), and type-aware partition evaluation via iceberg.Literal (no UUID/decimal/binary issues).
Resolves the comments on partitionTupleKey and spec-id awareness at the same time, and matches the pattern documented in this file's preamble ("there is one code path that pruning semantics flow through") and Java's MergingSnapshotProducer.validateNoNewDataFiles.
3971efe to
a414dce
Compare
…idate Replace partitionTupleKey + validateNoConflictingDataFilesInPartitions (old) with eqDeletePartitionsToFilter which builds an OR(AND(EqualTo(...))) BooleanExpression from the eq-delete DataFiles and routes it through validateNoConflictingDataFiles -> validateAddedDataFilesMatchingFilter. Benefits over the removed partitionTupleKey approach: - Type-safe: anyToLiteral uses a type switch over all iceberg.LiteralType values (bool, int32/64, float32/64, string, []byte, Date, Time, Timestamp, TimestampNano, Decimal, uuid.UUID) -- no fmt.Sprintf instability for UUID/decimal/binary partitions. - Spec-evolution correct: the filter is projected per each concurrent manifest's PartitionSpec via buildPartitionProjection, so renamed fields and transform changes do not produce false conflicts. - No key-injection: avoids the =;-separator collision risk in string-keyed maps. - Unpartitioned tables: RowDelta.validate checks PartitionSpec.NumFields() == 0 and passes AlwaysTrue directly; eqDeletePartitionsToFilter is only called for partitioned tables. Updates RowDelta.validate doc comment to describe the partition-scoped behavior. Adds 9 regression and validation tests: - anyToLiteral for all supported types and an unsupported type - Short-circuit under SnapshotIsolation and empty inputs - apache#978 reproducer: different-partition concurrent append is not rejected - Same-partition concurrent append IS rejected - UUID partition: same rejected, different allowed - Unpartitioned table AlwaysTrue fallback detects any concurrent append Fixes apache#978
a414dce to
2af6a47
Compare
Fixes #978
Problem
RowDelta.validatepassesiceberg.AlwaysTrue{}tovalidateNoConflictingDataFileswhenever equality-delete files are present. This means any concurrent append to any partition is treated as a conflict, even when it lands in a completely different partition from the equality-deletes. Under serializable isolation this causes spuriousErrConflictingDataFileserrors for workloads that write to multiple independent partitions concurrently.Fix
RowDeltanow collects the partition tuples of all equality-delete files it adds (eqDeletePartitions). A new validator,validateNoConflictingDataFilesInPartitions, checks concurrent data files only in those specific partition tuples:AlwaysTruecheck, preserving existing safety.Files changed
table/row_delta.go: collecteqDeletePartitionsinstead ofhasEqDeletestable/conflict_validation.go:validateNoConflictingDataFilesInPartitions+partitionTupleKeytable/partition_conflict_test.go: unit tests for both new functions