
Commit 4e6e1c6

fix(datalake): restore cold storage skip logging and remove duplicate tests per review
1 parent: 949389a

3 files changed

Lines changed: 8 additions & 38 deletions


ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 3 additions & 0 deletions
@@ -124,6 +124,9 @@ def _discover_iceberg_dirs(
 
         for blob in bucket.list_blobs(prefix=prefix):
             if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
+                logger.debug(
+                    f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})"
+                )
                 match = self._ICEBERG_METADATA_RE.match(blob.name)
                 if match:
                     cold_iceberg_dirs.add(match.group(1))
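
The _should_skip_gcs_cold_storage helper referenced above is not part of this diff. A minimal, hypothetical sketch of what such a check could look like, assuming it simply compares blob.storage_class against the GCS cold tiers (COLDLINE, ARCHIVE), is:

# Hypothetical sketch only -- the real helper lives elsewhere in gcs.py and may differ.
GCS_COLD_STORAGE_CLASSES = {"COLDLINE", "ARCHIVE"}

def should_skip_gcs_cold_storage(blob) -> bool:
    """Return True when the blob's storage class marks it as cold storage."""
    return getattr(blob, "storage_class", None) in GCS_COLD_STORAGE_CLASSES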

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 5 additions & 0 deletions
@@ -83,6 +83,11 @@ def _discover_iceberg_dirs(
             key_name = key["Key"]
             size = key.get("Size")
             if skip_cold_storage and self._should_skip_s3_cold_storage(key):
+                logger.debug(
+                    f"Skipping cold storage object: {key_name} "
+                    f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, "
+                    f"ArchiveStatus: {key.get('ArchiveStatus', '')})"
+                )
                 match = self._ICEBERG_METADATA_RE.match(key_name)
                 if match:
                     cold_iceberg_dirs.add(match.group(1))
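
Similarly, _should_skip_s3_cold_storage is referenced but not shown in this diff. A hedged sketch, assuming it keys off the same StorageClass and ArchiveStatus fields that the new log line reports (the exact class and status sets below are an assumption, not taken from the source), might be:

# Hypothetical sketch only -- the real helper lives elsewhere in s3.py and may differ.
S3_COLD_STORAGE_CLASSES = {"GLACIER", "DEEP_ARCHIVE"}
S3_ARCHIVED_STATUSES = {"ARCHIVE_ACCESS", "DEEP_ARCHIVE_ACCESS"}

def should_skip_s3_cold_storage(key: dict) -> bool:
    """Return True for Glacier-class objects or objects already archived by Intelligent-Tiering."""
    storage_class = key.get("StorageClass", "STANDARD")
    archive_status = key.get("ArchiveStatus", "")
    return storage_class in S3_COLD_STORAGE_CLASSES or archive_status in S3_ARCHIVED_STATUSES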

ingestion/tests/unit/source/database/test_iceberg_discovery.py

Lines changed: 0 additions & 38 deletions
@@ -76,22 +76,6 @@ def test_gcs_iceberg_table_detected(self):
         assert name == "warehouse/orders/metadata/v2.metadata.json"
         assert size == 600
 
-    def test_gcs_iceberg_yields_one_table_per_directory(self):
-        blobs = [
-            _make_blob("warehouse/orders/metadata/v1.metadata.json", size=500),
-            _make_blob("warehouse/orders/metadata/v2.metadata.json", size=600),
-            _make_blob("warehouse/orders/data/00000-0-abc.parquet", size=8192),
-            _make_blob("warehouse/orders/data/00001-0-def.parquet", size=9216),
-        ]
-        client = _make_gcs_client(blobs)
-
-        results = list(client.get_table_names("my-bucket", prefix="warehouse"))
-
-        assert len(results) == 1
-        name, size = results[0]
-        assert name == "warehouse/orders/metadata/v2.metadata.json"
-        assert size == 600
-
     def test_gcs_multiple_iceberg_tables(self):
         blobs = [
             _make_blob("warehouse/orders/metadata/v1.metadata.json", size=400),
@@ -195,28 +179,6 @@ def test_s3_iceberg_table_detected(self):
         assert name == "warehouse/orders/metadata/v2.metadata.json"
         assert size == 600
 
-    def test_s3_iceberg_yields_one_table_per_directory(self):
-        keys = [
-            "warehouse/orders/metadata/v1.metadata.json",
-            "warehouse/orders/metadata/v2.metadata.json",
-            "warehouse/orders/data/00000-0-abc.parquet",
-        ]
-        client = self._make_s3_client(
-            keys,
-            sizes={"warehouse/orders/metadata/v2.metadata.json": 600},
-        )
-
-        with patch(
-            "metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects",
-            return_value=self._s3_objects,
-        ):
-            results = list(client.get_table_names("my-bucket", prefix="warehouse"))
-
-        assert len(results) == 1
-        name, size = results[0]
-        assert name == "warehouse/orders/metadata/v2.metadata.json"
-        assert size == 600
-
     def test_s3_fallback_for_non_iceberg(self):
         keys = [
             "data/orders.csv",
