Skip to content

Commit d939b67

Browse files
committed
test: update test_replace_internally to add sequence number check along with test_replace_multiple_files, test_replace_partitioned_table, and test_replace_no_op_on_non_empty_table
1 parent c8162a8 commit d939b67

1 file changed

Lines changed: 179 additions & 0 deletions

File tree

tests/table/test_replace.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def test_replace_internally(catalog: Catalog) -> None:
141141
assert len(existing_entries) == 1
142142
assert existing_entries[0].data_file.file_path == file_to_keep.file_path
143143
assert existing_entries[0].snapshot_id == old_snapshot_id
144+
assert existing_entries[0].sequence_number == old_sequence_number
144145

145146

146147
def test_replace_reuses_unaffected_manifests(catalog: Catalog) -> None:
@@ -456,3 +457,181 @@ def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None:
456457
manifest_paths_after = [m.manifest_path for m in manifests_after]
457458

458459
assert delete_manifest_path in manifest_paths_after
460+
461+
462+
def test_replace_multiple_files(catalog: Catalog) -> None:
    """Replacing two data files with two new ones in a single replace
    operation is reflected in the snapshot summary counts."""
    catalog.create_namespace("default")
    table = catalog.create_table(
        identifier="default.test_replace_multiple",
        schema=Schema(),
    )

    def _data_file(path: str, records: int, size: int) -> DataFile:
        # Build an unpartitioned DATA file pinned to spec 0.
        df = DataFile.from_args(
            file_path=path,
            file_format=FileFormat.PARQUET,
            partition=Record(),
            record_count=records,
            file_size_in_bytes=size,
            content=DataFileContent.DATA,
        )
        df.spec_id = 0
        return df

    original_a = _data_file("s3://bucket/test/data/1.parquet", 100, 1024)
    original_b = _data_file("s3://bucket/test/data/2.parquet", 100, 1024)
    rewritten_a = _data_file("s3://bucket/test/data/1_new.parquet", 50, 512)
    rewritten_b = _data_file("s3://bucket/test/data/2_new.parquet", 50, 512)

    # Seed the table with the two original files.
    with table.transaction() as tx:
        with tx.update_snapshot().fast_append() as append_snapshot:
            append_snapshot.append_data_file(original_a)
            append_snapshot.append_data_file(original_b)

    # Swap both originals for their rewritten counterparts in one replace.
    with table.transaction() as tx:
        with tx.update_snapshot().replace() as rewrite:
            rewrite.delete_data_file(original_a)
            rewrite.delete_data_file(original_b)
            rewrite.append_data_file(rewritten_a)
            rewrite.append_data_file(rewritten_b)

    snapshot = cast(Snapshot, table.current_snapshot())
    summary = cast(Summary, snapshot.summary)

    # 2 files of 50 records added, 2 files of 100 records removed.
    assert summary["added-data-files"] == "2"
    assert summary["deleted-data-files"] == "2"
    assert summary["added-records"] == "100"
    assert summary["deleted-records"] == "200"
    assert summary["total-records"] == "100"
534+
def test_replace_partitioned_table(catalog: Catalog) -> None:
    """Replace only the file in partition id=1; partition id=2 is untouched."""
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.transforms import IdentityTransform
    from pyiceberg.types import IntegerType, NestedField, StringType

    catalog.create_namespace("default")
    table = catalog.create_table(
        identifier="default.test_replace_partitioned",
        schema=Schema(
            NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
        partition_spec=PartitionSpec(
            PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="id")
        ),
    )

    def _partitioned_file(path: str, partition_value: int, records: int, size: int) -> DataFile:
        # Build a DATA file in the given identity partition, bound to the
        # table's current partition spec.
        df = DataFile.from_args(
            file_path=path,
            file_format=FileFormat.PARQUET,
            partition=Record(partition_value),
            record_count=records,
            file_size_in_bytes=size,
            content=DataFileContent.DATA,
        )
        df.spec_id = table.spec().spec_id
        return df

    in_partition_1 = _partitioned_file("s3://bucket/test/data/part1.parquet", 1, 100, 1024)
    in_partition_2 = _partitioned_file("s3://bucket/test/data/part2.parquet", 2, 100, 1024)

    # Seed both partitions.
    with table.transaction() as tx:
        with tx.update_snapshot().fast_append() as append_snapshot:
            append_snapshot.append_data_file(in_partition_1)
            append_snapshot.append_data_file(in_partition_2)

    # Rewrite only partition id=1.
    replacement = _partitioned_file("s3://bucket/test/data/part1_new.parquet", 1, 50, 512)

    with table.transaction() as tx:
        with tx.update_snapshot().replace() as rewrite:
            rewrite.delete_data_file(in_partition_1)
            rewrite.append_data_file(replacement)

    snapshot = cast(Snapshot, table.current_snapshot())
    summary = cast(Summary, snapshot.summary)

    # 100 records survive in partition 2; 50 rewritten records in partition 1.
    assert summary["added-data-files"] == "1"
    assert summary["deleted-data-files"] == "1"
    assert summary["total-records"] == "150"
def test_replace_no_op_on_non_empty_table(catalog: Catalog) -> None:
    """An empty replace on a table that already has data must not create a
    new snapshot or add a history entry."""
    catalog.create_namespace("default")
    table = catalog.create_table(
        identifier="default.test_replace_noop_nonempty",
        schema=Schema(),
    )

    seeded_file = DataFile.from_args(
        file_path="s3://bucket/test/data/a.parquet",
        file_format=FileFormat.PARQUET,
        partition=Record(),
        record_count=10,
        file_size_in_bytes=100,
        content=DataFileContent.DATA,
    )
    seeded_file.spec_id = 0

    # Commit 1: seed the table so it is non-empty.
    with table.transaction() as tx:
        with tx.update_snapshot().fast_append() as append_snapshot:
            append_snapshot.append_data_file(seeded_file)

    snapshot_before = table.current_snapshot()
    assert snapshot_before is not None

    # A replace that neither deletes nor appends anything.
    with table.transaction() as tx:
        with tx.update_snapshot().replace():
            pass

    # Table state is unchanged: same snapshot, single history entry.
    assert table.current_snapshot() == snapshot_before
    assert len(table.history()) == 1

0 commit comments

Comments
 (0)