|
16 | 16 | # under the License. |
17 | 17 | # pylint: disable=redefined-outer-name,arguments-renamed,fixme |
18 | 18 | from tempfile import TemporaryDirectory |
| 19 | +from unittest import mock |
19 | 20 |
|
20 | 21 | import fastavro |
21 | 22 | import pytest |
22 | 23 |
|
| 24 | +import pyiceberg.manifest as manifest_module |
23 | 25 | from pyiceberg.avro.codecs import AvroCompressionCodec |
24 | 26 | from pyiceberg.io import load_file_io |
25 | 27 | from pyiceberg.io.pyarrow import PyArrowFileIO |
|
34 | 36 | PartitionFieldSummary, |
35 | 37 | _inherit_from_manifest, |
36 | 38 | _manifest_cache, |
| 39 | + _get_manifest_cache, |
37 | 40 | _manifests, |
| 41 | + clear_manifest_cache, |
38 | 42 | read_manifest_list, |
39 | 43 | write_manifest, |
40 | 44 | write_manifest_list, |
|
47 | 51 |
|
48 | 52 |
|
49 | 53 | @pytest.fixture(autouse=True) |
50 | | -def clear_global_manifests_cache() -> None: |
51 | | - # Clear the global cache before each test |
52 | | - _manifest_cache.clear() |
| 54 | +def reset_global_manifests_cache() -> None: |
| 55 | + # Reset cache state before each test so config is re-read |
| 56 | + manifest_module._manifest_cache_manager._cache = None |
| 57 | + manifest_module._manifest_cache_manager._initialized = False |
53 | 58 |
|
54 | 59 |
|
55 | 60 | def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None: |
@@ -805,9 +810,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: |
805 | 810 |
|
806 | 811 | # Verify cache size - should only have 3 unique ManifestFile objects |
807 | 812 | # instead of 1 + 2 + 3 = 6 objects as with the old approach |
808 | | - assert len(_manifest_cache) == 3, ( |
809 | | - f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}" |
810 | | - ) |
| 813 | + cache = _get_manifest_cache() |
| 814 | + assert cache is not None, "Manifest cache should be enabled for this test" |
| 815 | + assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}" |
811 | 816 |
|
812 | 817 |
|
813 | 818 | def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: |
@@ -880,9 +885,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: |
880 | 885 | # With the new approach, we should have exactly N objects |
881 | 886 |
|
882 | 887 | # Verify cache has exactly N unique entries |
883 | | - assert len(_manifest_cache) == num_manifests, ( |
| 888 | + cache = _get_manifest_cache() |
| 889 | + assert cache is not None, "Manifest cache should be enabled for this test" |
| 890 | + assert len(cache) == num_manifests, ( |
884 | 891 | f"Cache should contain exactly {num_manifests} ManifestFile objects, " |
885 | | - f"but has {len(_manifest_cache)}. " |
| 892 | + f"but has {len(cache)}. " |
886 | 893 | f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects." |
887 | 894 | ) |
888 | 895 |
|
@@ -973,3 +980,115 @@ def test_inherit_from_manifest_snapshot_id() -> None: |
973 | 980 | assert result.snapshot_id == 3051729675574597004 |
974 | 981 | assert result.sequence_number == 1 |
975 | 982 | assert result.file_sequence_number == 1 |
| 983 | +def test_clear_manifest_cache() -> None: |
| 984 | + """Test that clear_manifest_cache() clears cache entries while keeping cache enabled.""" |
| 985 | + io = PyArrowFileIO() |
| 986 | + |
| 987 | + with TemporaryDirectory() as tmp_dir: |
| 988 | + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) |
| 989 | + spec = UNPARTITIONED_PARTITION_SPEC |
| 990 | + |
| 991 | + # Create a manifest file |
| 992 | + manifest_path = f"{tmp_dir}/manifest.avro" |
| 993 | + with write_manifest( |
| 994 | + format_version=2, |
| 995 | + spec=spec, |
| 996 | + schema=schema, |
| 997 | + output_file=io.new_output(manifest_path), |
| 998 | + snapshot_id=1, |
| 999 | + avro_compression="zstandard", |
| 1000 | + ) as writer: |
| 1001 | + data_file = DataFile.from_args( |
| 1002 | + content=DataFileContent.DATA, |
| 1003 | + file_path=f"{tmp_dir}/data.parquet", |
| 1004 | + file_format=FileFormat.PARQUET, |
| 1005 | + partition=Record(), |
| 1006 | + record_count=100, |
| 1007 | + file_size_in_bytes=1000, |
| 1008 | + ) |
| 1009 | + writer.add_entry( |
| 1010 | + ManifestEntry.from_args( |
| 1011 | + status=ManifestEntryStatus.ADDED, |
| 1012 | + snapshot_id=1, |
| 1013 | + data_file=data_file, |
| 1014 | + ) |
| 1015 | + ) |
| 1016 | + manifest_file = writer.to_manifest_file() |
| 1017 | + |
| 1018 | + # Create a manifest list |
| 1019 | + list_path = f"{tmp_dir}/manifest-list.avro" |
| 1020 | + with write_manifest_list( |
| 1021 | + format_version=2, |
| 1022 | + output_file=io.new_output(list_path), |
| 1023 | + snapshot_id=1, |
| 1024 | + parent_snapshot_id=None, |
| 1025 | + sequence_number=1, |
| 1026 | + avro_compression="zstandard", |
| 1027 | + ) as list_writer: |
| 1028 | + list_writer.add_manifests([manifest_file]) |
| 1029 | + |
| 1030 | + # Populate the cache |
| 1031 | + _manifests(io, list_path) |
| 1032 | + |
| 1033 | + # Verify cache has entries |
| 1034 | + cache = _get_manifest_cache() |
| 1035 | + assert cache is not None, "Cache should be enabled" |
| 1036 | + assert len(cache) > 0, "Cache should have entries after reading manifests" |
| 1037 | + |
| 1038 | + # Clear the cache |
| 1039 | + clear_manifest_cache() |
| 1040 | + |
| 1041 | + # Verify cache is empty but still enabled |
| 1042 | + cache_after = _get_manifest_cache() |
| 1043 | + assert cache_after is not None, "Cache should still be enabled after clear" |
| 1044 | + assert len(cache_after) == 0, "Cache should be empty after clear" |
| 1045 | + |
| 1046 | + |
| 1047 | +@pytest.mark.parametrize( |
| 1048 | + "env_vars,expected_enabled,expected_size", |
| 1049 | + [ |
| 1050 | + ({}, True, 128), # defaults |
| 1051 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "true"}, True, 128), |
| 1052 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false"}, False, 128), |
| 1053 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, True, 64), |
| 1054 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "256"}, True, 256), |
| 1055 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false", "PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, False, 64), |
| 1056 | + ], |
| 1057 | +) |
| 1058 | +def test_manifest_cache_config_valid_values(env_vars: dict[str, str], expected_enabled: bool, expected_size: int) -> None: |
| 1059 | + """Test that valid config values are applied correctly.""" |
| 1060 | + import os |
| 1061 | + |
| 1062 | + with mock.patch.dict(os.environ, env_vars, clear=False): |
| 1063 | + # Reset cache state so config is re-read |
| 1064 | + manifest_module._manifest_cache_manager._cache = None |
| 1065 | + manifest_module._manifest_cache_manager._initialized = False |
| 1066 | + cache = _get_manifest_cache() |
| 1067 | + |
| 1068 | + if expected_enabled: |
| 1069 | + assert cache is not None, "Cache should be enabled" |
| 1070 | + assert cache.maxsize == expected_size, f"Cache size should be {expected_size}" |
| 1071 | + else: |
| 1072 | + assert cache is None, "Cache should be disabled" |
| 1073 | + |
| 1074 | + |
| 1075 | +@pytest.mark.parametrize( |
| 1076 | + "env_vars,expected_error_substring", |
| 1077 | + [ |
| 1078 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "maybe"}, "manifest.cache.enabled should be a boolean"), |
| 1079 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "invalid"}, "manifest.cache.enabled should be a boolean"), |
| 1080 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "abc"}, "manifest.cache.size should be a positive integer"), |
| 1081 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "0"}, "manifest.cache.size must be >= 1"), |
| 1082 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "-5"}, "manifest.cache.size must be >= 1"), |
| 1083 | + ], |
| 1084 | +) |
| 1085 | +def test_manifest_cache_config_invalid_values(env_vars: dict[str, str], expected_error_substring: str) -> None: |
| 1086 | + """Test that invalid config values raise ValueError with appropriate message.""" |
| 1087 | + import os |
| 1088 | + |
| 1089 | + with mock.patch.dict(os.environ, env_vars, clear=False): |
| 1090 | + # Reset cache state so config is re-read |
| 1091 | + manifest_module._manifest_cache_manager._cache = None |
| 1092 | + manifest_module._manifest_cache_manager._initialized = False |
| 1093 | + with pytest.raises(ValueError, match=expected_error_substring): |
| 1094 | + _get_manifest_cache() |
0 commit comments