Skip to content

Commit 9681ec3

Browse files
committed
refactor: update the test_replace.py to use a reusable _create_dummy_data_file()
1 parent d7e89db commit 9681ec3

1 file changed

Lines changed: 50 additions & 134 deletions

File tree

tests/table/test_replace.py

Lines changed: 50 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,28 @@
3131
from pyiceberg.typedef import Record
3232

3333

def _create_dummy_data_file(
    file_path: str,
    record_count: int,
    file_size_in_bytes: int = 1024,
    content: DataFileContent = DataFileContent.DATA,
    partition: Record | None = None,
    spec_id: int = 0,
) -> DataFile:
    """Build a minimal Parquet ``DataFile`` fixture for the replace tests.

    The file format is always ``FileFormat.PARQUET``.  When ``partition`` is
    not supplied, an empty ``Record`` is used (a fresh one per call, so
    instances never share partition state).  ``spec_id`` is assigned after
    construction because ``DataFile.from_args`` does not accept it.
    """
    data_file = DataFile.from_args(
        file_path=file_path,
        file_format=FileFormat.PARQUET,
        partition=Record() if partition is None else partition,
        record_count=record_count,
        file_size_in_bytes=file_size_in_bytes,
        content=content,
    )
    data_file.spec_id = spec_id
    return data_file
55+
3456
def test_replace_internally(catalog: Catalog) -> None:
3557
# Setup a basic table using the catalog fixture
3658
catalog.create_namespace("default")
@@ -40,37 +62,23 @@ def test_replace_internally(catalog: Catalog) -> None:
4062
)
4163

4264
# 1. File we will delete
43-
file_to_delete = DataFile.from_args(
65+
file_to_delete = _create_dummy_data_file(
4466
file_path="s3://bucket/test/data/deleted.parquet",
45-
file_format=FileFormat.PARQUET,
46-
partition=Record(),
4767
record_count=100,
48-
file_size_in_bytes=1024,
49-
content=DataFileContent.DATA,
5068
)
51-
file_to_delete.spec_id = 0
5269

5370
# 2. File we will leave completely untouched
54-
file_to_keep = DataFile.from_args(
71+
file_to_keep = _create_dummy_data_file(
5572
file_path="s3://bucket/test/data/kept.parquet",
56-
file_format=FileFormat.PARQUET,
57-
partition=Record(),
5873
record_count=50,
5974
file_size_in_bytes=512,
60-
content=DataFileContent.DATA,
6175
)
62-
file_to_keep.spec_id = 0
6376

6477
# 3. File we are adding as a replacement
65-
file_to_add = DataFile.from_args(
78+
file_to_add = _create_dummy_data_file(
6679
file_path="s3://bucket/test/data/added.parquet",
67-
file_format=FileFormat.PARQUET,
68-
partition=Record(),
6980
record_count=100,
70-
file_size_in_bytes=1024,
71-
content=DataFileContent.DATA,
7281
)
73-
file_to_add.spec_id = 0
7482

7583
# Initially append BOTH the file to delete and the file to keep
7684
with table.transaction() as tx:
@@ -152,35 +160,23 @@ def test_replace_reuses_unaffected_manifests(catalog: Catalog) -> None:
152160
schema=Schema(),
153161
)
154162

155-
file_a = DataFile.from_args(
163+
file_a = _create_dummy_data_file(
156164
file_path="s3://bucket/test/data/a.parquet",
157-
file_format=FileFormat.PARQUET,
158-
partition=Record(),
159165
record_count=10,
160166
file_size_in_bytes=100,
161-
content=DataFileContent.DATA,
162167
)
163-
file_a.spec_id = 0
164168

165-
file_b = DataFile.from_args(
169+
file_b = _create_dummy_data_file(
166170
file_path="s3://bucket/test/data/b.parquet",
167-
file_format=FileFormat.PARQUET,
168-
partition=Record(),
169171
record_count=10,
170172
file_size_in_bytes=100,
171-
content=DataFileContent.DATA,
172173
)
173-
file_b.spec_id = 0
174174

175-
file_c = DataFile.from_args(
175+
file_c = _create_dummy_data_file(
176176
file_path="s3://bucket/test/data/c.parquet",
177-
file_format=FileFormat.PARQUET,
178-
partition=Record(),
179177
record_count=10,
180178
file_size_in_bytes=100,
181-
content=DataFileContent.DATA,
182179
)
183-
file_c.spec_id = 0
184180

185181
# Commit 1: Append file A (Creates Manifest 1)
186182
with table.transaction() as tx:
@@ -260,25 +256,15 @@ def test_replace_missing_file_abort(catalog: Catalog) -> None:
260256
schema=Schema(),
261257
)
262258

263-
fake_data_file = DataFile.from_args(
259+
fake_data_file = _create_dummy_data_file(
264260
file_path="s3://bucket/test/data/does_not_exist.parquet",
265-
file_format=FileFormat.PARQUET,
266-
partition=Record(),
267261
record_count=100,
268-
file_size_in_bytes=1024,
269-
content=DataFileContent.DATA,
270262
)
271-
fake_data_file.spec_id = 0
272263

273-
new_data_file = DataFile.from_args(
264+
new_data_file = _create_dummy_data_file(
274265
file_path="s3://bucket/test/data/new.parquet",
275-
file_format=FileFormat.PARQUET,
276-
partition=Record(),
277266
record_count=100,
278-
file_size_in_bytes=1024,
279-
content=DataFileContent.DATA,
280267
)
281-
new_data_file.spec_id = 0
282268

283269
# Ensure it aborts when trying to replace a file that isn't in the table
284270
with pytest.raises(ValueError, match="Cannot delete files that are not present in the table"):
@@ -296,26 +282,16 @@ def test_replace_invariant_violation(catalog: Catalog) -> None:
296282
schema=Schema(),
297283
)
298284

299-
file_to_delete = DataFile.from_args(
285+
file_to_delete = _create_dummy_data_file(
300286
file_path="s3://bucket/test/data/deleted.parquet",
301-
file_format=FileFormat.PARQUET,
302-
partition=Record(),
303287
record_count=100,
304-
file_size_in_bytes=1024,
305-
content=DataFileContent.DATA,
306288
)
307-
file_to_delete.spec_id = 0
308289

309290
# Create a new file with MORE records than the one we are deleting
310-
too_many_records_file = DataFile.from_args(
291+
too_many_records_file = _create_dummy_data_file(
311292
file_path="s3://bucket/test/data/too_many.parquet",
312-
file_format=FileFormat.PARQUET,
313-
partition=Record(),
314293
record_count=101,
315-
file_size_in_bytes=1024,
316-
content=DataFileContent.DATA,
317294
)
318-
too_many_records_file.spec_id = 0
319295

320296
# Initially append to have something to replace
321297
with table.transaction() as tx:
@@ -339,26 +315,17 @@ def test_replace_allows_shrinking_for_soft_deletes(catalog: Catalog) -> None:
339315
)
340316

341317
# Old data file has 100 records
342-
file_to_delete = DataFile.from_args(
318+
file_to_delete = _create_dummy_data_file(
343319
file_path="s3://bucket/test/data/deleted.parquet",
344-
file_format=FileFormat.PARQUET,
345-
partition=Record(),
346320
record_count=100,
347-
file_size_in_bytes=1024,
348-
content=DataFileContent.DATA,
349321
)
350-
file_to_delete.spec_id = 0
351322

352323
# New data file only has 90 records (simulating 10 records were soft-deleted)
353-
shrunk_file_to_add = DataFile.from_args(
324+
shrunk_file_to_add = _create_dummy_data_file(
354325
file_path="s3://bucket/test/data/shrunk.parquet",
355-
file_format=FileFormat.PARQUET,
356-
partition=Record(),
357326
record_count=90,
358327
file_size_in_bytes=900,
359-
content=DataFileContent.DATA,
360328
)
361-
shrunk_file_to_add.spec_id = 0
362329

363330
# Initially append
364331
with table.transaction() as tx:
@@ -389,37 +356,26 @@ def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None:
389356
)
390357

391358
# 1. Data file we will replace
392-
file_a = DataFile.from_args(
359+
file_a = _create_dummy_data_file(
393360
file_path="s3://bucket/test/data/a.parquet",
394-
file_format=FileFormat.PARQUET,
395-
partition=Record(),
396361
record_count=10,
397362
file_size_in_bytes=100,
398-
content=DataFileContent.DATA,
399363
)
400-
file_a.spec_id = 0
401364

402365
# 2. A Position Delete file (representing row-level deletes)
403-
file_a_deletes = DataFile.from_args(
366+
file_a_deletes = _create_dummy_data_file(
404367
file_path="s3://bucket/test/data/a_deletes.parquet",
405-
file_format=FileFormat.PARQUET,
406-
partition=Record(),
407368
record_count=2,
408369
file_size_in_bytes=50,
409370
content=DataFileContent.POSITION_DELETES,
410371
)
411-
file_a_deletes.spec_id = 0
412372

413373
# 3. Data file we are adding as a replacement
414-
file_b = DataFile.from_args(
374+
file_b = _create_dummy_data_file(
415375
file_path="s3://bucket/test/data/b.parquet",
416-
file_format=FileFormat.PARQUET,
417-
partition=Record(),
418376
record_count=10,
419377
file_size_in_bytes=100,
420-
content=DataFileContent.DATA,
421378
)
422-
file_b.spec_id = 0
423379

424380
# Commit 1: Append the data file
425381
with table.transaction() as tx:
@@ -467,45 +423,27 @@ def test_replace_multiple_files(catalog: Catalog) -> None:
467423
schema=Schema(),
468424
)
469425

470-
file_1 = DataFile.from_args(
426+
file_1 = _create_dummy_data_file(
471427
file_path="s3://bucket/test/data/1.parquet",
472-
file_format=FileFormat.PARQUET,
473-
partition=Record(),
474428
record_count=100,
475-
file_size_in_bytes=1024,
476-
content=DataFileContent.DATA,
477429
)
478-
file_1.spec_id = 0
479430

480-
file_2 = DataFile.from_args(
431+
file_2 = _create_dummy_data_file(
481432
file_path="s3://bucket/test/data/2.parquet",
482-
file_format=FileFormat.PARQUET,
483-
partition=Record(),
484433
record_count=100,
485-
file_size_in_bytes=1024,
486-
content=DataFileContent.DATA,
487434
)
488-
file_2.spec_id = 0
489435

490-
file_1_new = DataFile.from_args(
436+
file_1_new = _create_dummy_data_file(
491437
file_path="s3://bucket/test/data/1_new.parquet",
492-
file_format=FileFormat.PARQUET,
493-
partition=Record(),
494438
record_count=50,
495439
file_size_in_bytes=512,
496-
content=DataFileContent.DATA,
497440
)
498-
file_1_new.spec_id = 0
499441

500-
file_2_new = DataFile.from_args(
442+
file_2_new = _create_dummy_data_file(
501443
file_path="s3://bucket/test/data/2_new.parquet",
502-
file_format=FileFormat.PARQUET,
503-
partition=Record(),
504444
record_count=50,
505445
file_size_in_bytes=512,
506-
content=DataFileContent.DATA,
507446
)
508-
file_2_new.spec_id = 0
509447

510448
# Append initial files
511449
with table.transaction() as tx:
@@ -550,26 +488,20 @@ def test_replace_partitioned_table(catalog: Catalog) -> None:
550488
)
551489

552490
# File in partition id=1
553-
file_part1 = DataFile.from_args(
491+
file_part1 = _create_dummy_data_file(
554492
file_path="s3://bucket/test/data/part1.parquet",
555-
file_format=FileFormat.PARQUET,
556493
partition=Record(1),
557494
record_count=100,
558-
file_size_in_bytes=1024,
559-
content=DataFileContent.DATA,
495+
spec_id=table.spec().spec_id,
560496
)
561-
file_part1.spec_id = table.spec().spec_id
562497

563498
# File in partition id=2
564-
file_part2 = DataFile.from_args(
499+
file_part2 = _create_dummy_data_file(
565500
file_path="s3://bucket/test/data/part2.parquet",
566-
file_format=FileFormat.PARQUET,
567501
partition=Record(2),
568502
record_count=100,
569-
file_size_in_bytes=1024,
570-
content=DataFileContent.DATA,
503+
spec_id=table.spec().spec_id,
571504
)
572-
file_part2.spec_id = table.spec().spec_id
573505

574506
# Add initial files
575507
with table.transaction() as tx:
@@ -578,15 +510,13 @@ def test_replace_partitioned_table(catalog: Catalog) -> None:
578510
append_snapshot.append_data_file(file_part2)
579511

580512
# Replace file in partition 1
581-
file_part1_new = DataFile.from_args(
513+
file_part1_new = _create_dummy_data_file(
582514
file_path="s3://bucket/test/data/part1_new.parquet",
583-
file_format=FileFormat.PARQUET,
584515
partition=Record(1),
585516
record_count=50,
586517
file_size_in_bytes=512,
587-
content=DataFileContent.DATA,
518+
spec_id=table.spec().spec_id,
588519
)
589-
file_part1_new.spec_id = table.spec().spec_id
590520

591521
with table.transaction() as tx:
592522
with tx.update_snapshot().replace() as rewrite:
@@ -609,15 +539,11 @@ def test_replace_no_op_on_non_empty_table(catalog: Catalog) -> None:
609539
schema=Schema(),
610540
)
611541

612-
file_a = DataFile.from_args(
542+
file_a = _create_dummy_data_file(
613543
file_path="s3://bucket/test/data/a.parquet",
614-
file_format=FileFormat.PARQUET,
615-
partition=Record(),
616544
record_count=10,
617545
file_size_in_bytes=100,
618-
content=DataFileContent.DATA,
619546
)
620-
file_a.spec_id = 0
621547

622548
# Commit 1: Append file A
623549
with table.transaction() as tx:
@@ -646,26 +572,16 @@ def test_replace_on_custom_branch(catalog: Catalog) -> None:
646572
)
647573

648574
# 1. File we will delete
649-
file_to_delete = DataFile.from_args(
575+
file_to_delete = _create_dummy_data_file(
650576
file_path="s3://bucket/test/data/deleted.parquet",
651-
file_format=FileFormat.PARQUET,
652-
partition=Record(),
653577
record_count=100,
654-
file_size_in_bytes=1024,
655-
content=DataFileContent.DATA,
656578
)
657-
file_to_delete.spec_id = 0
658579

659580
# 2. File we are adding as a replacement
660-
file_to_add = DataFile.from_args(
581+
file_to_add = _create_dummy_data_file(
661582
file_path="s3://bucket/test/data/added.parquet",
662-
file_format=FileFormat.PARQUET,
663-
partition=Record(),
664583
record_count=100,
665-
file_size_in_bytes=1024,
666-
content=DataFileContent.DATA,
667584
)
668-
file_to_add.spec_id = 0
669585

670586
# Initially append to have something to replace on main
671587
with table.transaction() as tx:

0 commit comments

Comments
 (0)