Skip to content

Commit a414dce

Browse files
author
masa hoashi
committed
fix(table): use filter-based partition conflict check in RowDelta.validate
Replace partitionTupleKey + validateNoConflictingDataFilesInPartitions (old) with eqDeletePartitionsToFilter, which builds an OR(AND(EqualTo(...))) BooleanExpression from the eq-delete DataFiles and routes it through validateNoConflictingDataFiles → validateAddedDataFilesMatchingFilter.

Benefits over the removed partitionTupleKey approach:
- Type-safe: anyToLiteral uses a type switch over all iceberg.LiteralType values (bool, int32/64, float32/64, string, []byte, Date, Time, Timestamp, TimestampNano, Decimal, uuid.UUID), so there is no fmt.Sprintf instability for UUID/decimal/binary partitions.
- Spec-evolution correct: the filter is projected per each concurrent manifest's PartitionSpec via buildPartitionProjection, so renamed fields and transform changes do not produce false conflicts.
- No key injection: avoids the "=" / ";" separator collision risk in string-keyed maps.
- Unpartitioned tables: RowDelta.validate now explicitly checks PartitionSpec.NumFields() == 0 and passes AlwaysTrue directly, rather than relying on an implicit fallback.

Adds 9 regression and validation tests:
- anyToLiteral for all supported types and an unsupported type
- Short-circuit under snapshot isolation (IsolationSnapshot) and on empty inputs
- #978 reproducer: a different-partition concurrent append is not rejected
- A same-partition concurrent append IS rejected
- UUID partition: same partition rejected, different partition allowed
- Unpartitioned table: the AlwaysTrue fallback detects any concurrent append

Fixes #978
1 parent 440647e commit a414dce

3 files changed

Lines changed: 643 additions & 10 deletions

File tree

table/conflict_validation.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353

5454
"github.com/apache/iceberg-go"
5555
iceio "github.com/apache/iceberg-go/io"
56+
"github.com/google/uuid"
5657
)
5758

5859
// IsolationLevel controls how strictly a commit rejects concurrent
@@ -426,6 +427,159 @@ func validateNoConflictingDataFiles(ctx *conflictContext, filter iceberg.Boolean
426427
return validateAddedDataFilesMatchingFilter(ctx, filter)
427428
}
428429

430+
// validateNoConflictingDataFilesInPartitions is like
431+
// validateNoConflictingDataFiles but scoped to the partitions touched
432+
// by equality-delete files. It builds an OR-of-equalities filter from
433+
// the provided partition tuples and delegates to
434+
// validateAddedDataFilesMatchingFilter, which performs per-spec
435+
// projection, manifest-summary pruning, and type-aware evaluation via
436+
// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
437+
// and future partition types, and correct across partition-spec
438+
// evolution because each concurrent manifest is projected against its
439+
// own spec ID.
440+
//
441+
// Callers are responsible for ensuring the table is partitioned
442+
// (i.e. at least one partition field exists) before calling this
443+
// function. For unpartitioned tables, call
444+
// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
445+
// directly.
446+
//
447+
// Under IsolationSnapshot this validator is a no-op.
448+
func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
449+
if level != IsolationSerializable {
450+
return nil
451+
}
452+
453+
if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
454+
return nil
455+
}
456+
457+
filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
458+
if err != nil {
459+
return fmt.Errorf("building partition conflict filter: %w", err)
460+
}
461+
462+
return validateNoConflictingDataFiles(ctx, filter, level)
463+
}
464+
465+
// eqDeletePartitionsToFilter converts equality-delete data files into an
466+
// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
467+
// to validateAddedDataFilesMatchingFilter.
468+
//
469+
// For each eq-delete file it resolves each partition field ID to the source
470+
// schema field name via the file's partition spec, then builds an EqualTo
471+
// predicate using Reference(sourceFieldName). Multiple fields within one
472+
// partition are AND-ed; multiple eq-delete files are OR-ed.
473+
//
474+
// The resulting expression is projected per-concurrent-manifest's spec ID
475+
// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
476+
// ensuring correct conflict detection even after partition-spec evolution.
477+
//
478+
// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
479+
// caller falls back to the conservative full-table scan. Callers should
480+
// normally guard against calling this function for unpartitioned tables (see
481+
// RowDelta.validate).
482+
func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata) (iceberg.BooleanExpression, error) {
483+
terms := make([]iceberg.BooleanExpression, 0, len(files))
484+
for _, f := range files {
485+
p := f.Partition()
486+
if len(p) == 0 {
487+
return iceberg.AlwaysTrue{}, nil
488+
}
489+
490+
spec := meta.PartitionSpecByID(int(f.SpecID()))
491+
if spec == nil {
492+
return nil, fmt.Errorf("partition spec ID %d not found in metadata", f.SpecID())
493+
}
494+
495+
// Build partition field ID → PartitionField lookup for this spec.
496+
partFieldByID := make(map[int]iceberg.PartitionField, spec.NumFields())
497+
for _, pf := range spec.Fields() {
498+
partFieldByID[pf.FieldID] = pf
499+
}
500+
501+
// Sort partition field IDs for deterministic expression order.
502+
fieldIDs := make([]int, 0, len(p))
503+
for id := range p {
504+
fieldIDs = append(fieldIDs, id)
505+
}
506+
sort.Ints(fieldIDs)
507+
508+
conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
509+
for _, partFieldID := range fieldIDs {
510+
pf, ok := partFieldByID[partFieldID]
511+
if !ok {
512+
return nil, fmt.Errorf("partition field ID %d not found in spec %d", partFieldID, f.SpecID())
513+
}
514+
515+
// Resolve to source schema field to obtain the Reference name.
516+
sourceField, ok := meta.CurrentSchema().FindFieldByID(pf.SourceID())
517+
if !ok {
518+
return nil, fmt.Errorf("source field ID %d (partition field %q) not found in schema", pf.SourceID(), pf.Name)
519+
}
520+
521+
lit, err := anyToLiteral(p[partFieldID])
522+
if err != nil {
523+
return nil, fmt.Errorf("partition field %q: %w", sourceField.Name, err)
524+
}
525+
526+
conjuncts = append(conjuncts, iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), lit))
527+
}
528+
529+
if len(conjuncts) == 1 {
530+
terms = append(terms, conjuncts[0])
531+
} else {
532+
terms = append(terms, iceberg.NewAnd(conjuncts[0], conjuncts[1], conjuncts[2:]...))
533+
}
534+
}
535+
536+
if len(terms) == 0 {
537+
return iceberg.AlwaysTrue{}, nil
538+
}
539+
540+
if len(terms) == 1 {
541+
return terms[0], nil
542+
}
543+
544+
return iceberg.NewOr(terms[0], terms[1], terms[2:]...), nil
545+
}
546+
547+
// anyToLiteral converts a dynamically-typed partition value (as
548+
// stored in iceberg.DataFile.Partition()) to an iceberg.Literal.
549+
// The supported types mirror the iceberg.LiteralType constraint.
550+
func anyToLiteral(v any) (iceberg.Literal, error) {
551+
switch val := v.(type) {
552+
case bool:
553+
return iceberg.NewLiteral(val), nil
554+
case int32:
555+
return iceberg.NewLiteral(val), nil
556+
case int64:
557+
return iceberg.NewLiteral(val), nil
558+
case float32:
559+
return iceberg.NewLiteral(val), nil
560+
case float64:
561+
return iceberg.NewLiteral(val), nil
562+
case string:
563+
return iceberg.NewLiteral(val), nil
564+
case []byte:
565+
return iceberg.NewLiteral(val), nil
566+
case iceberg.Date:
567+
return iceberg.NewLiteral(val), nil
568+
case iceberg.Time:
569+
return iceberg.NewLiteral(val), nil
570+
case iceberg.Timestamp:
571+
return iceberg.NewLiteral(val), nil
572+
case iceberg.TimestampNano:
573+
return iceberg.NewLiteral(val), nil
574+
case iceberg.Decimal:
575+
return iceberg.NewLiteral(val), nil
576+
case uuid.UUID:
577+
return iceberg.NewLiteral(val), nil
578+
default:
579+
return nil, fmt.Errorf("unsupported partition value type %T", v)
580+
}
581+
}
582+
429583
// validateNoNewDeletesForRewrittenFiles rejects the commit if any
430584
// concurrent snapshot added delete files that would be lost when the
431585
// committer's rewrite replaces a data file.

0 commit comments

Comments
 (0)