
Commit b85aacc

fix(datalake): hybrid two-pass listing, ruff checkstyle, and Copilot findings
- Hybrid two-pass: non-Iceberg buckets (the common case) skip the regex/set overhead in pass 2; Iceberg buckets apply the directory filter as before (see the sketch below)
- Move _ICEBERG_METADATA_RE and _update_iceberg_entry to DatalakeBaseClient (DRY)
- Fix _is_json_lines false positive for minified Iceberg/Delta metadata dicts
- Fix sys.modules stub to use setdefault (avoids overwriting real google packages)
- Fix ruff: SIM103, TRY300 in json.py; RUF013, RUF059 in test file
1 parent 8082b4e commit b85aacc
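
A rough sketch of the pass-2 shape described in the message above, using hypothetical names (the real implementations are in the gcs.py and s3.py diffs below):

    import re
    from typing import Iterable, Iterator, Optional, Set, Tuple

    def second_pass(
        objects: Iterable[Tuple[str, Optional[int]]],  # (name, size) pairs from a listing
        iceberg_re: re.Pattern,
        iceberg_dirs: Set[str],
    ) -> Iterator[Tuple[str, Optional[int]]]:
        if not iceberg_dirs:
            # Common case: pass 1 found no Iceberg tables, so pass 2 skips the
            # per-object regex match and directory-prefix scan entirely.
            for name, size in objects:
                yield name, size
        else:
            # Iceberg case: drop metadata files and anything under a detected
            # Iceberg table directory, as before.
            for name, size in objects:
                if not iceberg_re.match(name) and not any(
                    name.startswith(d + "/") for d in iceberg_dirs
                ):
                    yield name, size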

6 files changed

Lines changed: 55 additions & 85 deletions


ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py

Lines changed: 0 additions & 1 deletion
@@ -42,7 +42,6 @@ def _update_iceberg_entry(
             iceberg_tables[table_dir] = (version, name, size)
         return True
 
-
     def __init__(self, client: Any, session: Any = None, **kwargs):
         self._client = client
         self._session = session
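
For context, pass 1 keeps one entry per Iceberg table directory, and a newer vN.metadata.json replaces an older one. A minimal sketch of that bookkeeping under those assumptions (hypothetical regex and function name; the real logic is DatalakeBaseClient._update_iceberg_entry above):

    import re
    from typing import Dict, Optional, Tuple

    # Hypothetical pattern: capture the table directory and metadata version.
    _META_RE = re.compile(r"(?P<dir>.+)/metadata/v(?P<version>\d+)\.metadata\.json$")

    def update_iceberg_entry(
        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],
        name: str,
        size: Optional[int],
    ) -> bool:
        match = _META_RE.search(name)
        if not match:
            return False  # not an Iceberg metadata file
        table_dir = match.group("dir")
        version = int(match.group("version"))
        current = iceberg_tables.get(table_dir)
        if current is None or version > current[0]:
            # Keep only the highest metadata version seen for this table.
            iceberg_tables[table_dir] = (version, name, size)
        return True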

ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 14 additions & 9 deletions
@@ -126,13 +126,12 @@ def get_table_names(
         files without accumulation, keeping memory overhead at O(1) per object.
         """
         bucket = self._client.get_bucket(bucket_name)
-        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}  # noqa: UP006
+        iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {}  # noqa: UP006
 
         for blob in bucket.list_blobs(prefix=prefix):
             if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
                 logger.debug(
-                    f"Skipping cold storage object: {blob.name} "
-                    f"(storage_class: {getattr(blob, 'storage_class', None)})"
+                    f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})"
                 )
                 continue
             self._update_iceberg_entry(iceberg_tables, blob.name, blob.size)
@@ -141,13 +140,19 @@ def get_table_names(
         for _, metadata_blob_path, size in iceberg_tables.values():
             yield metadata_blob_path, size
 
-        for blob in bucket.list_blobs(prefix=prefix):
-            if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
-                continue
-            if not self._ICEBERG_METADATA_RE.match(blob.name) and not any(
-                blob.name.startswith(d + "/") for d in iceberg_dirs
-            ):
+        if not iceberg_dirs:
+            for blob in bucket.list_blobs(prefix=prefix):
+                if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
+                    continue
                 yield blob.name, blob.size
+        else:
+            for blob in bucket.list_blobs(prefix=prefix):
+                if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
+                    continue
+                if not self._ICEBERG_METADATA_RE.match(blob.name) and not any(
+                    blob.name.startswith(d + "/") for d in iceberg_dirs
+                ):
+                    yield blob.name, blob.size
 
     def close(self, service_connection):
         os.environ.pop("GOOGLE_CLOUD_PROJECT", "")
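
On the docstring's O(1) claim: both passes consume the bucket listing as a stream, so the only state carried between passes is the per-table-directory dict from pass 1. A self-contained sketch of the contrast (constructed stand-in listing, not code from this repo):

    from typing import Iterator, Optional, Tuple

    def fake_listing() -> Iterator[Tuple[str, Optional[int]]]:
        # Stand-in for bucket.list_blobs() / list_s3_objects(); yields (name, size).
        for i in range(3):
            yield f"data/part-{i}.parquet", 1024

    # Buffering alternative the design avoids: O(n) in the number of objects.
    buffered = list(fake_listing())

    # Streaming approach used here: each object is handled and dropped immediately.
    for name, size in fake_listing():
        pass  # per-object work happens here; nothing is retained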

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 17 additions & 9 deletions
@@ -87,7 +87,7 @@ def get_table_names(
         if prefix:
             kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
 
-        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}  # noqa: UP006
+        iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {}  # noqa: UP006
 
         for key in list_s3_objects(self._client, **kwargs):
             key_name = key["Key"]
@@ -105,15 +105,23 @@ def get_table_names(
         for _, metadata_key, size in iceberg_tables.values():
             yield metadata_key, size
 
-        for key in list_s3_objects(self._client, **kwargs):
-            key_name = key["Key"]
-            size = key.get("Size")
-            if skip_cold_storage and self._should_skip_s3_cold_storage(key):
-                continue
-            if not self._ICEBERG_METADATA_RE.match(key_name) and not any(
-                key_name.startswith(d + "/") for d in iceberg_dirs
-            ):
+        if not iceberg_dirs:
+            for key in list_s3_objects(self._client, **kwargs):
+                key_name = key["Key"]
+                size = key.get("Size")
+                if skip_cold_storage and self._should_skip_s3_cold_storage(key):
+                    continue
                 yield key_name, size
+        else:
+            for key in list_s3_objects(self._client, **kwargs):
+                key_name = key["Key"]
+                size = key.get("Size")
+                if skip_cold_storage and self._should_skip_s3_cold_storage(key):
+                    continue
+                if not self._ICEBERG_METADATA_RE.match(key_name) and not any(
+                    key_name.startswith(d + "/") for d in iceberg_dirs
+                ):
+                    yield key_name, size
 
     def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]:  # noqa: UP045
         for page in self._client.get_paginator("list_objects_v2").paginate(

ingestion/src/metadata/readers/dataframe/json.py

Lines changed: 6 additions & 11 deletions
@@ -125,11 +125,9 @@ def _read_json_object(
             if isinstance(data, dict)
             and (
                 data.get("$schema") is not None  # JSON Schema files
-                or data.get("format-version")
-                is not None  # Apache Iceberg table metadata
+                or data.get("format-version") is not None  # Apache Iceberg table metadata
                 or (  # Delta Lake / Iceberg schema structure
-                    isinstance(data.get("schema"), dict)
-                    and isinstance(data.get("schema", {}).get("fields"), list)
+                    isinstance(data.get("schema"), dict) and isinstance(data.get("schema", {}).get("fields"), list)
                 )
             )
             else None
@@ -153,19 +151,16 @@ def _is_json_lines(file_obj) -> bool:
             return True
         try:
             obj = json.loads(first_line)
+        except json.JSONDecodeError:
+            return False
+        else:
             if not isinstance(obj, dict):
                 return False
             if obj.get("$schema") is not None:
                 return False
             if obj.get("format-version") is not None:
                 return False
-            if isinstance(obj.get("schema"), dict) and isinstance(
-                obj.get("schema", {}).get("fields"), list
-            ):
-                return False
-            return True
-        except json.JSONDecodeError:
-            return False
+            return not (isinstance(obj.get("schema"), dict) and isinstance(obj.get("schema", {}).get("fields"), list))
 
     def _read_json_smart(
         self,
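
To illustrate the false positive this fixes: a minified Iceberg/Delta metadata file is a single line of valid JSON, so a naive first-line probe would classify it as JSON Lines. A small self-contained check (constructed example, not repo code):

    import json

    # Minified Delta/Iceberg-style metadata: one line, valid JSON, but not
    # JSON Lines table data.
    first_line = '{"schema": {"fields": [{"name": "id", "type": "long"}]}}'

    obj = json.loads(first_line)
    is_metadata = isinstance(obj.get("schema"), dict) and isinstance(
        obj.get("schema", {}).get("fields"), list
    )
    print(is_metadata)  # True -> _is_json_lines now returns False for this input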

ingestion/src/metadata/utils/datalake/datalake_utils.py

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ def fetch_dataframe_first_chunk(
 _ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$")
 
 
-def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> Optional[str]:
+def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> str | None:
     """
     Extracts the Iceberg table directory name from a metadata file path.
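
For reference, what _ICEBERG_METADATA_PATH_RE extracts — a quick standalone check, assuming the function searches the path with this pattern:

    import re

    _ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$")

    match = _ICEBERG_METADATA_PATH_RE.search("warehouse/orders/metadata/v2.metadata.json")
    print(match.group(1) if match else None)  # -> "orders"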

ingestion/tests/unit/source/database/test_iceberg_discovery.py

Lines changed: 17 additions & 54 deletions
@@ -12,6 +12,7 @@
 """
 Tests for Iceberg table directory detection in DatalakeGcsClient and DatalakeS3Client.
 """
+
 import sys
 import types
 from unittest.mock import MagicMock, patch
@@ -21,9 +22,7 @@
 # present, which prevents breaking other tests or masking integration issues.
 _google_mod = sys.modules.setdefault("google", types.ModuleType("google"))
 _gcloud_mod = sys.modules.setdefault("google.cloud", types.ModuleType("google.cloud"))
-_storage_mod = sys.modules.setdefault(
-    "google.cloud.storage", types.ModuleType("google.cloud.storage")
-)
+_storage_mod = sys.modules.setdefault("google.cloud.storage", types.ModuleType("google.cloud.storage"))
 if not hasattr(_storage_mod, "Client"):
     _storage_mod.Client = MagicMock
 if not hasattr(_google_mod, "cloud"):
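
Why setdefault matters here: it returns the module that is already registered when one exists and only installs the stub otherwise, so a real google package imported earlier is never clobbered. A standalone illustration (constructed example):

    import sys
    import types

    stub = types.ModuleType("google")
    registered = sys.modules.setdefault("google", stub)
    # If a real google package was imported earlier, `registered` is that package
    # and the stub is discarded; `sys.modules["google"] = stub` would clobber it.
    print(registered is stub)  # False whenever the real package was already imported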
@@ -39,9 +38,7 @@
 )
 
 
-def _make_blob(
-    name: str, size: int = 1024, storage_class: str = "STANDARD"
-) -> MagicMock:
+def _make_blob(name: str, size: int = 1024, storage_class: str = "STANDARD") -> MagicMock:
     blob = MagicMock()
     blob.name = name
     blob.size = size
@@ -54,9 +51,7 @@ def _make_gcs_client(blobs: list) -> DatalakeGcsClient:
     mock_bucket = MagicMock()
     mock_storage_client.get_bucket.return_value = mock_bucket
     mock_bucket.list_blobs.return_value = blobs
-    mock_bucket.get_blob.side_effect = lambda name: next(
-        (b for b in blobs if b.name == name), None
-    )
+    mock_bucket.get_blob.side_effect = lambda name: next((b for b in blobs if b.name == name), None)
     client = DatalakeGcsClient.__new__(DatalakeGcsClient)
     client._client = mock_storage_client
     client._temp_credentials_file_path_list = []
@@ -166,17 +161,15 @@ def test_gcs_iceberg_version_comparison_v10(self):
 
 
 class TestS3IcebergDiscovery:
-    def _make_s3_client(self, keys: list, sizes: dict = None) -> DatalakeS3Client:
+    def _make_s3_client(self, keys: list, sizes: dict | None = None) -> DatalakeS3Client:
         """Helper: create a DatalakeS3Client backed by a mocked boto3 client."""
         mock_boto_client = MagicMock()
         client = DatalakeS3Client.__new__(DatalakeS3Client)
         client._client = mock_boto_client
         client._session = None
         self._mock_boto_client = mock_boto_client
         sizes = sizes or {}
-        self._s3_objects = [
-            {"Key": k, "Size": sizes.get(k, 1024)} for k in keys
-        ]
+        self._s3_objects = [{"Key": k, "Size": sizes.get(k, 1024)} for k in keys]
         return client
 
     def test_s3_iceberg_table_detected(self):
@@ -275,39 +268,19 @@ def test_iceberg_table_name_extracted_correctly(self):
             get_iceberg_table_name_from_metadata_path,
         )
 
-        assert (
-            get_iceberg_table_name_from_metadata_path(
-                "warehouse/orders/metadata/v2.metadata.json"
-            )
-            == "orders"
-        )
-        assert (
-            get_iceberg_table_name_from_metadata_path(
-                "my_prefix/sales/metadata/v1.metadata.json"
-            )
-            == "sales"
-        )
-        assert (
-            get_iceberg_table_name_from_metadata_path(
-                "simple/metadata/v3.metadata.json"
-            )
-            == "simple"
-        )
+        assert get_iceberg_table_name_from_metadata_path("warehouse/orders/metadata/v2.metadata.json") == "orders"
+        assert get_iceberg_table_name_from_metadata_path("my_prefix/sales/metadata/v1.metadata.json") == "sales"
+        assert get_iceberg_table_name_from_metadata_path("simple/metadata/v3.metadata.json") == "simple"
 
     def test_non_iceberg_path_returns_none(self):
         from metadata.utils.datalake.datalake_utils import (
             get_iceberg_table_name_from_metadata_path,
         )
 
         assert get_iceberg_table_name_from_metadata_path("data/orders.json") is None
-        assert (
-            get_iceberg_table_name_from_metadata_path("warehouse/orders.json") is None
-        )
+        assert get_iceberg_table_name_from_metadata_path("warehouse/orders.json") is None
         assert get_iceberg_table_name_from_metadata_path("metadata/v1.json") is None
-        assert (
-            get_iceberg_table_name_from_metadata_path("orders/metadata/snapshot.avro")
-            is None
-        )
+        assert get_iceberg_table_name_from_metadata_path("orders/metadata/snapshot.avro") is None
 
     def test_table_type_iceberg_for_metadata_files(self):
         from metadata.generated.schema.entity.data.table import TableType
@@ -317,9 +290,7 @@ def test_table_type_iceberg_for_metadata_files(self):
 
         key_name = "warehouse/orders/metadata/v1.metadata.json"
         table_type = (
-            TableType.Iceberg
-            if get_iceberg_table_name_from_metadata_path(key_name) is not None
-            else TableType.Regular
+            TableType.Iceberg if get_iceberg_table_name_from_metadata_path(key_name) is not None else TableType.Regular
         )
         assert table_type == TableType.Iceberg
 
@@ -335,9 +306,7 @@ def test_table_type_regular_for_normal_files(self):
             if get_iceberg_table_name_from_metadata_path(key_name) is not None
             else TableType.Regular
         )
-        assert (
-            table_type == TableType.Regular
-        ), f"Expected Regular for {key_name}, got {table_type}"
+        assert table_type == TableType.Regular, f"Expected Regular for {key_name}, got {table_type}"
 
 
 class TestSlice4FetchKeyCorrectness:
@@ -375,7 +344,7 @@ def test_yield_table_uses_metadata_path_not_display_name(self):
             file_size,
             original_key,
         )
-        table_name, table_type, table_extension, t_file_size, fetch_key = tuple_5
+        table_name, _table_type, table_extension, t_file_size, fetch_key = tuple_5
 
         wrapper = DatalakeTableSchemaWrapper(
             key=fetch_key,
@@ -384,12 +353,8 @@ def test_yield_table_uses_metadata_path_not_display_name(self):
             file_size=t_file_size,
         )
 
-        assert (
-            wrapper.key == original_key
-        ), f"fetch key should be original blob path, got {wrapper.key!r}"
-        assert (
-            wrapper.key != display_name
-        ), f"fetch key must NOT be the display name '{display_name}'"
+        assert wrapper.key == original_key, f"fetch key should be original blob path, got {wrapper.key!r}"
+        assert wrapper.key != display_name, f"fetch key must NOT be the display name '{display_name}'"
         assert table_name == display_name
 
     def test_non_iceberg_fetch_key_equals_table_name(self):
@@ -406,9 +371,7 @@ def test_non_iceberg_fetch_key_equals_table_name(self):
         )
 
         key_name = "data/orders.parquet"
-        table_name = (
-            key_name  # standardize_table_name returns unchanged for non-Iceberg
-        )
+        table_name = key_name  # standardize_table_name returns unchanged for non-Iceberg
 
         assert get_iceberg_table_name_from_metadata_path(key_name) is None

0 commit comments
