Commit 8082b4e

fix(datalake): address Copilot + gitar-bot findings on Iceberg ingestion
- Fix _is_json_lines false positive: minified single-line Iceberg/Delta metadata dicts were classified as JSONL, bypassing the raw_data gate entirely. Now all three detection conditions ($schema, format-version, schema.fields) are checked.
- Move _ICEBERG_METADATA_RE and _update_iceberg_entry to DatalakeBaseClient to eliminate regex/classification duplication between the GCS and S3 clients (DRY).
- Replace the single-pass O(N)-memory approach with two-pass streaming: pass 1 builds only the iceberg_tables dict (O(tables)); pass 2 streams regular files without accumulation (O(1) per object).
- Fix the sys.modules stub in test_iceberg_discovery.py: use setdefault for all three google module entries to avoid overwriting real installed packages.
1 parent 3604a30 commit 8082b4e
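
To see why the first fix matters: the opening line of a minified Iceberg metadata file is itself one valid JSON dict, so the old dict-without-$schema test accepted it as JSON Lines. A minimal sketch of the failing input (field values are illustrative, not a full Iceberg metadata document):

import json

# Illustrative first line of a minified Iceberg table-metadata file; real
# files carry many more fields. "format-version" and the embedded "schema"
# with a "fields" list are the markers the new check rejects on.
first_line = (
    '{"format-version": 2, "table-uuid": "00000000-0000-0000-0000-000000000000", '
    '"schema": {"type": "struct", "fields": [{"id": 1, "name": "id"}]}}'
)

obj = json.loads(first_line)
# The old heuristic: any dict without a "$schema" key counted as a JSONL record.
old_is_jsonl = isinstance(obj, dict) and not obj.get("$schema")
print(old_is_jsonl)  # True: the metadata file was misclassified as JSON Lines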

5 files changed

Lines changed: 77 additions & 68 deletions


ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py

Lines changed: 24 additions & 1 deletion
@@ -13,13 +13,36 @@
 Datalake Base Client
 """
 
+import re
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Iterable, Optional, Tuple  # noqa: UP035
+from typing import Any, Callable, Dict, Iterable, Optional, Tuple  # noqa: UP035
 
 
 class DatalakeBaseClient(ABC):
     """Base DL client implementation"""
 
+    _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")
+
+    def _update_iceberg_entry(
+        self,
+        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],  # noqa: UP006, UP045
+        name: str,
+        size: Optional[int],  # noqa: UP045
+    ) -> bool:
+        """
+        If name matches the Iceberg metadata pattern, update iceberg_tables with
+        the highest-version entry and return True. Otherwise return False.
+        """
+        match = self._ICEBERG_METADATA_RE.match(name)
+        if not match:
+            return False
+        table_dir, version = match.group(1), int(match.group(2))
+        existing = iceberg_tables.get(table_dir)
+        if existing is None or version > existing[0]:
+            iceberg_tables[table_dir] = (version, name, size)
+        return True
+
+
     def __init__(self, client: Any, session: Any = None, **kwargs):
         self._client = client
         self._session = session
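
The promoted helper's highest-version-wins contract can be exercised in isolation. A standalone sketch (the logic is copied from the method above, since DatalakeBaseClient itself is abstract; the paths are made up):

import re
from typing import Dict, Optional, Tuple

# Standalone copy of _update_iceberg_entry, for illustration only.
ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")

def update_iceberg_entry(
    iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],
    name: str,
    size: Optional[int],
) -> bool:
    match = ICEBERG_METADATA_RE.match(name)
    if not match:
        return False
    table_dir, version = match.group(1), int(match.group(2))
    existing = iceberg_tables.get(table_dir)
    if existing is None or version > existing[0]:
        iceberg_tables[table_dir] = (version, name, size)
    return True

tables: Dict[str, Tuple[int, str, Optional[int]]] = {}
update_iceberg_entry(tables, "db/orders/metadata/v1.metadata.json", 100)
update_iceberg_entry(tables, "db/orders/metadata/v3.metadata.json", 120)
update_iceberg_entry(tables, "db/orders/metadata/v2.metadata.json", 110)  # stale, ignored
print(tables)  # {'db/orders': (3, 'db/orders/metadata/v3.metadata.json', 120)}
assert not update_iceberg_entry(tables, "db/orders/data/part-0.parquet", 5)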

ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 13 additions & 28 deletions
@@ -14,7 +14,6 @@
 """
 
 import os
-import re
 from copy import deepcopy
 from functools import partial
 from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple  # noqa: UP035
@@ -108,45 +107,26 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str]
         for bucket in self._client.list_buckets():
             yield bucket.name
 
-    _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")
-
     @staticmethod
     def _should_skip_gcs_cold_storage(blob) -> bool:
         storage_class = getattr(blob, "storage_class", None)
         return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES)
 
-    def _classify_gcs_blob(
-        self,
-        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],  # noqa: UP006, UP045
-        regular_files: List[Tuple[str, Optional[int]]],  # noqa: UP006, UP045
-        blob,
-    ) -> None:
-        match = self._ICEBERG_METADATA_RE.match(blob.name)
-        if match:
-            table_dir, version = match.group(1), int(match.group(2))
-            existing = iceberg_tables.get(table_dir)
-            if existing is None or version > existing[0]:
-                iceberg_tables[table_dir] = (version, blob.name, blob.size)
-        else:
-            regular_files.append((blob.name, blob.size))
-
     def get_table_names(
         self,
         bucket_name: str,
         prefix: Optional[str],  # noqa: UP045
         skip_cold_storage: bool = False,
     ) -> Iterable[Tuple[str, Optional[int]]]:  # noqa: UP006, UP045
         """
-        Lists tables in a GCS bucket using a single pass.
+        Lists tables in a GCS bucket using a two-pass approach.
 
-        Iceberg table directories are identified by blobs matching
-        ``<table_dir>/metadata/v<N>.metadata.json``. Only the blob with the
-        highest integer version is yielded per table directory. Regular files
-        not under any Iceberg table directory are also yielded.
+        Pass 1 collects only the Iceberg table dict (memory proportional to the
+        number of Iceberg tables, which is always small). Pass 2 streams regular
+        files without accumulation, keeping memory overhead at O(1) per object.
         """
         bucket = self._client.get_bucket(bucket_name)
         iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}  # noqa: UP006
-        regular_files: List[Tuple[str, Optional[int]]] = []  # noqa: UP006
 
         for blob in bucket.list_blobs(prefix=prefix):
             if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
@@ -155,14 +135,19 @@ def get_table_names(
                     f"(storage_class: {getattr(blob, 'storage_class', None)})"
                 )
                 continue
-            self._classify_gcs_blob(iceberg_tables, regular_files, blob)
+            self._update_iceberg_entry(iceberg_tables, blob.name, blob.size)
 
         iceberg_dirs = set(iceberg_tables.keys())
         for _, metadata_blob_path, size in iceberg_tables.values():
             yield metadata_blob_path, size
-        for file_path, size in regular_files:
-            if not any(file_path.startswith(d + "/") for d in iceberg_dirs):
-                yield file_path, size
+
+        for blob in bucket.list_blobs(prefix=prefix):
+            if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
+                continue
+            if not self._ICEBERG_METADATA_RE.match(blob.name) and not any(
+                blob.name.startswith(d + "/") for d in iceberg_dirs
+            ):
+                yield blob.name, blob.size
 
     def close(self, service_connection):
         os.environ.pop("GOOGLE_CLOUD_PROJECT", "")
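
The combined effect of the two passes is easiest to see over plain names. A sketch with a hypothetical blob listing and no GCS client, reusing the same regex:

import re
from typing import Dict, Optional, Tuple

ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")

# Hypothetical listing: one Iceberg table plus a loose CSV.
blobs = [
    ("db/orders/metadata/v1.metadata.json", 100),
    ("db/orders/metadata/v2.metadata.json", 120),
    ("db/orders/data/part-0.parquet", 4096),
    ("exports/customers.csv", 512),
]

# Pass 1: remember only the highest metadata version per table directory.
iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}
for name, size in blobs:
    m = ICEBERG_METADATA_RE.match(name)
    if m:
        table_dir, version = m.group(1), int(m.group(2))
        existing = iceberg_tables.get(table_dir)
        if existing is None or version > existing[0]:
            iceberg_tables[table_dir] = (version, name, size)

iceberg_dirs = set(iceberg_tables)
results = [(path, size) for _, path, size in iceberg_tables.values()]

# Pass 2: stream everything that is neither metadata nor under a table directory.
for name, size in blobs:
    if not ICEBERG_METADATA_RE.match(name) and not any(
        name.startswith(d + "/") for d in iceberg_dirs
    ):
        results.append((name, size))

print(results)
# [('db/orders/metadata/v2.metadata.json', 120), ('exports/customers.csv', 512)]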

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 16 additions & 30 deletions
@@ -13,9 +13,8 @@
 Datalake S3 Client
 """
 
-import re
 from functools import partial
-from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple  # noqa: UP035
+from typing import Callable, Dict, Iterable, Optional, Set, Tuple  # noqa: UP035
 
 from metadata.clients.aws_client import AWSClient
 from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
@@ -62,8 +61,6 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str]
         for bucket in self._client.list_buckets()["Buckets"]:
             yield bucket["Name"]
 
-    _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")
-
     @staticmethod
     def _should_skip_s3_cold_storage(key: dict) -> bool:
         storage_class = key.get("StorageClass", "STANDARD")
@@ -73,42 +70,24 @@ def _should_skip_s3_cold_storage(key: dict) -> bool:
             "DEEP_ARCHIVE_ACCESS",
         }
 
-    def _classify_s3_object(
-        self,
-        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],  # noqa: UP006, UP045
-        regular_files: List[Tuple[str, Optional[int]]],  # noqa: UP006, UP045
-        key_name: str,
-        size: Optional[int],  # noqa: UP045
-    ) -> None:
-        match = self._ICEBERG_METADATA_RE.match(key_name)
-        if match:
-            table_dir, version = match.group(1), int(match.group(2))
-            existing = iceberg_tables.get(table_dir)
-            if existing is None or version > existing[0]:
-                iceberg_tables[table_dir] = (version, key_name, size)
-        else:
-            regular_files.append((key_name, size))
-
     def get_table_names(
         self,
         bucket_name: str,
         prefix: Optional[str],  # noqa: UP045
         skip_cold_storage: bool = False,
     ) -> Iterable[Tuple[str, Optional[int]]]:  # noqa: UP006, UP045
         """
-        Lists tables in an S3 bucket using a single pass.
+        Lists tables in an S3 bucket using a two-pass approach.
 
-        Iceberg table directories are identified by objects matching
-        ``<table_dir>/metadata/v<N>.metadata.json``. Only the object with the
-        highest integer version is yielded per table directory. Regular files
-        not under any Iceberg table directory are also yielded.
+        Pass 1 collects only the Iceberg table dict (memory proportional to the
+        number of Iceberg tables, which is always small). Pass 2 streams regular
+        files without accumulation, keeping memory overhead at O(1) per object.
        """
         kwargs: Dict[str, str] = {"Bucket": bucket_name}  # noqa: UP006
         if prefix:
             kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
 
         iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}  # noqa: UP006
-        regular_files: List[Tuple[str, Optional[int]]] = []  # noqa: UP006
 
         for key in list_s3_objects(self._client, **kwargs):
             key_name = key["Key"]
@@ -120,14 +99,21 @@ def get_table_names(
                     f"ArchiveStatus: {key.get('ArchiveStatus', '')})"
                 )
                 continue
-            self._classify_s3_object(iceberg_tables, regular_files, key_name, size)
+            self._update_iceberg_entry(iceberg_tables, key_name, size)
 
         iceberg_dirs = set(iceberg_tables.keys())
         for _, metadata_key, size in iceberg_tables.values():
             yield metadata_key, size
-        for file_path, size in regular_files:
-            if not any(file_path.startswith(d + "/") for d in iceberg_dirs):
-                yield file_path, size
+
+        for key in list_s3_objects(self._client, **kwargs):
+            key_name = key["Key"]
+            size = key.get("Size")
+            if skip_cold_storage and self._should_skip_s3_cold_storage(key):
+                continue
+            if not self._ICEBERG_METADATA_RE.match(key_name) and not any(
+                key_name.startswith(d + "/") for d in iceberg_dirs
+            ):
+                yield key_name, size
 
     def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]:  # noqa: UP045
         for page in self._client.get_paginator("list_objects_v2").paginate(
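
One trade-off implicit in this design: list_s3_objects is consumed twice, so the bucket is paginated through ListObjectsV2 twice, buying O(1) memory at the cost of a second round of API calls. A sketch of the pass-2 filter over raw key dicts (hypothetical keys, with a single GLACIER check standing in for the client's full cold-storage set):

import re

ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")

# Hypothetical ListObjectsV2 contents, shaped like the per-key dicts boto3 returns.
keys = [
    {"Key": "db/orders/metadata/v2.metadata.json", "Size": 120, "StorageClass": "STANDARD"},
    {"Key": "db/orders/data/part-0.parquet", "Size": 4096, "StorageClass": "STANDARD"},
    {"Key": "archive/old.csv", "Size": 512, "StorageClass": "GLACIER"},
    {"Key": "exports/customers.csv", "Size": 256, "StorageClass": "STANDARD"},
]
iceberg_dirs = {"db/orders"}  # produced by pass 1

for key in keys:
    key_name, size = key["Key"], key.get("Size")
    # GLACIER stands in for the real cold-storage check here.
    if key.get("StorageClass", "STANDARD") == "GLACIER":
        continue
    if not ICEBERG_METADATA_RE.match(key_name) and not any(
        key_name.startswith(d + "/") for d in iceberg_dirs
    ):
        print(key_name, size)  # only exports/customers.csv 256 is emitted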

ingestion/src/metadata/readers/dataframe/json.py

Lines changed: 11 additions & 1 deletion
@@ -153,7 +153,17 @@ def _is_json_lines(file_obj) -> bool:
             return True
     try:
         obj = json.loads(first_line)
-        return isinstance(obj, dict) and not obj.get("$schema")
+        if not isinstance(obj, dict):
+            return False
+        if obj.get("$schema") is not None:
+            return False
+        if obj.get("format-version") is not None:
+            return False
+        if isinstance(obj.get("schema"), dict) and isinstance(
+            obj.get("schema", {}).get("fields"), list
+        ):
+            return False
+        return True
     except json.JSONDecodeError:
         return False
 
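
Extracted for illustration, the patched condition chain behaves as follows on three representative first lines (this is a standalone copy working on a string, not the module's actual _is_json_lines, which takes a file object):

import json

def is_json_lines_first_line(first_line: str) -> bool:
    """Patched condition chain, standalone for illustration."""
    try:
        obj = json.loads(first_line)
    except json.JSONDecodeError:
        return False
    if not isinstance(obj, dict):
        return False
    if obj.get("$schema") is not None:
        return False  # JSON Schema document
    if obj.get("format-version") is not None:
        return False  # Iceberg/Delta table metadata
    if isinstance(obj.get("schema"), dict) and isinstance(
        obj.get("schema", {}).get("fields"), list
    ):
        return False  # embedded table schema, not a data record
    return True

print(is_json_lines_first_line('{"id": 1, "name": "ada"}'))  # True
print(is_json_lines_first_line('{"format-version": 2}'))  # False
print(is_json_lines_first_line('{"$schema": "http://json-schema.org/schema#"}'))  # False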

ingestion/tests/unit/source/database/test_iceberg_discovery.py

Lines changed: 13 additions & 8 deletions
@@ -17,14 +17,19 @@
 from unittest.mock import MagicMock, patch
 
 # Stub google.cloud.storage so this test file runs without the google-cloud-storage
-# package being installed. The logic under test (_get_iceberg_tables, get_table_names)
-# only interacts with the storage client through our own mock objects.
-_gcloud_mod = types.ModuleType("google.cloud")
-_storage_mod = types.ModuleType("google.cloud.storage")
-_storage_mod.Client = MagicMock
-sys.modules.setdefault("google", types.ModuleType("google"))
-sys.modules["google.cloud"] = _gcloud_mod
-sys.modules["google.cloud.storage"] = _storage_mod
+# package being installed. setdefault preserves the real module if it is already
+# present, which prevents breaking other tests or masking integration issues.
+_google_mod = sys.modules.setdefault("google", types.ModuleType("google"))
+_gcloud_mod = sys.modules.setdefault("google.cloud", types.ModuleType("google.cloud"))
+_storage_mod = sys.modules.setdefault(
+    "google.cloud.storage", types.ModuleType("google.cloud.storage")
+)
+if not hasattr(_storage_mod, "Client"):
+    _storage_mod.Client = MagicMock
+if not hasattr(_google_mod, "cloud"):
+    _google_mod.cloud = _gcloud_mod
+if not hasattr(_gcloud_mod, "storage"):
+    _gcloud_mod.storage = _storage_mod
 
 from metadata.ingestion.source.database.datalake.clients.gcs import (  # noqa: E402
     DatalakeGcsClient,
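
The safety of this pattern is plain dict semantics on sys.modules: setdefault returns whatever is already registered and only installs the stub when the slot is empty, and the hasattr guards do the same for attributes. A minimal demonstration with throwaway module names:

import sys
import types

real = types.ModuleType("demo.pkg")
sys.modules["demo.pkg"] = real

# setdefault returns the module that is already registered, not the new stub...
returned = sys.modules.setdefault("demo.pkg", types.ModuleType("demo.pkg"))
assert returned is real

# ...and only installs the stub when nothing is registered yet.
stub = sys.modules.setdefault("demo.other", types.ModuleType("demo.other"))
assert sys.modules["demo.other"] is stub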
