Skip to content

Commit 6f83140

Browse files
fix(datalake): reduce SonarQube cognitive complexity to pass quality gate
1 parent caef18a commit 6f83140

2 files changed

Lines changed: 80 additions & 64 deletions

File tree

  • ingestion/src/metadata/ingestion/source/database/datalake/clients

ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,43 @@ def _should_skip_gcs_cold_storage(blob) -> bool:
112112
storage_class = getattr(blob, "storage_class", None)
113113
return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES)
114114

115+
def _discover_iceberg_dirs(
116+
self,
117+
bucket,
118+
prefix: Optional[str], # noqa: UP045
119+
skip_cold_storage: bool,
120+
) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006
121+
"""Pass 1: discover Iceberg table directories and return (iceberg_tables, iceberg_dirs)."""
122+
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006
123+
cold_iceberg_dirs: Set[str] = set() # noqa: UP006
124+
125+
for blob in bucket.list_blobs(prefix=prefix):
126+
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
127+
match = self._ICEBERG_METADATA_RE.match(blob.name)
128+
if match:
129+
cold_iceberg_dirs.add(match.group(1))
130+
continue
131+
self._update_iceberg_entry(iceberg_tables, blob.name, blob.size)
132+
133+
return iceberg_tables, set(iceberg_tables.keys()) | cold_iceberg_dirs
134+
135+
def _yield_regular_files(
136+
self,
137+
bucket,
138+
prefix: Optional[str], # noqa: UP045
139+
skip_cold_storage: bool,
140+
iceberg_dirs: Set[str], # noqa: UP006
141+
) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045
142+
"""Pass 2: stream regular files, skipping Iceberg directory contents."""
143+
for blob in bucket.list_blobs(prefix=prefix):
144+
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
145+
continue
146+
if iceberg_dirs and (
147+
self._ICEBERG_METADATA_RE.match(blob.name) or any(blob.name.startswith(d + "/") for d in iceberg_dirs)
148+
):
149+
continue
150+
yield blob.name, blob.size
151+
115152
def get_table_names(
116153
self,
117154
bucket_name: str,
@@ -126,38 +163,12 @@ def get_table_names(
126163
files without accumulation, keeping memory overhead at O(1) per object.
127164
"""
128165
bucket = self._client.get_bucket(bucket_name)
129-
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006
130-
cold_iceberg_dirs: Set[str] = set() # noqa: UP006
166+
iceberg_tables, iceberg_dirs = self._discover_iceberg_dirs(bucket, prefix, skip_cold_storage)
131167

132-
for blob in bucket.list_blobs(prefix=prefix):
133-
is_cold = skip_cold_storage and self._should_skip_gcs_cold_storage(blob)
134-
if is_cold:
135-
logger.debug(
136-
f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})"
137-
)
138-
match = self._ICEBERG_METADATA_RE.match(blob.name)
139-
if match:
140-
cold_iceberg_dirs.add(match.group(1))
141-
continue
142-
self._update_iceberg_entry(iceberg_tables, blob.name, blob.size)
143-
144-
iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs
145168
for _, metadata_blob_path, size in iceberg_tables.values():
146169
yield metadata_blob_path, size
147170

148-
if not iceberg_dirs:
149-
for blob in bucket.list_blobs(prefix=prefix):
150-
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
151-
continue
152-
yield blob.name, blob.size
153-
else:
154-
for blob in bucket.list_blobs(prefix=prefix):
155-
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
156-
continue
157-
if not self._ICEBERG_METADATA_RE.match(blob.name) and not any(
158-
blob.name.startswith(d + "/") for d in iceberg_dirs
159-
):
160-
yield blob.name, blob.size
171+
yield from self._yield_regular_files(bucket, prefix, skip_cold_storage, iceberg_dirs)
161172

162173
def close(self, service_connection):
163174
os.environ.pop("GOOGLE_CLOUD_PROJECT", "")

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,45 @@ def _should_skip_s3_cold_storage(key: dict) -> bool:
7070
"DEEP_ARCHIVE_ACCESS",
7171
}
7272

73+
def _discover_iceberg_dirs(
74+
self,
75+
skip_cold_storage: bool,
76+
**kwargs: str,
77+
) -> Tuple[Dict[str, Tuple[int, str, int | None]], Set[str]]: # noqa: UP006
78+
"""Pass 1: discover Iceberg table directories and return (iceberg_tables, iceberg_dirs)."""
79+
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006
80+
cold_iceberg_dirs: Set[str] = set() # noqa: UP006
81+
82+
for key in list_s3_objects(self._client, **kwargs):
83+
key_name = key["Key"]
84+
size = key.get("Size")
85+
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
86+
match = self._ICEBERG_METADATA_RE.match(key_name)
87+
if match:
88+
cold_iceberg_dirs.add(match.group(1))
89+
continue
90+
self._update_iceberg_entry(iceberg_tables, key_name, size)
91+
92+
return iceberg_tables, set(iceberg_tables.keys()) | cold_iceberg_dirs
93+
94+
def _yield_regular_files(
95+
self,
96+
skip_cold_storage: bool,
97+
iceberg_dirs: Set[str], # noqa: UP006
98+
**kwargs: str,
99+
) -> Iterable[Tuple[str, Optional[int]]]: # noqa: UP006, UP045
100+
"""Pass 2: stream regular files, skipping Iceberg directory contents."""
101+
for key in list_s3_objects(self._client, **kwargs):
102+
key_name = key["Key"]
103+
size = key.get("Size")
104+
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
105+
continue
106+
if iceberg_dirs and (
107+
self._ICEBERG_METADATA_RE.match(key_name) or any(key_name.startswith(d + "/") for d in iceberg_dirs)
108+
):
109+
continue
110+
yield key_name, size
111+
73112
def get_table_names(
74113
self,
75114
bucket_name: str,
@@ -87,46 +126,12 @@ def get_table_names(
87126
if prefix:
88127
kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
89128

90-
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006
91-
cold_iceberg_dirs: Set[str] = set() # noqa: UP006
129+
iceberg_tables, iceberg_dirs = self._discover_iceberg_dirs(skip_cold_storage, **kwargs)
92130

93-
for key in list_s3_objects(self._client, **kwargs):
94-
key_name = key["Key"]
95-
size = key.get("Size")
96-
is_cold = skip_cold_storage and self._should_skip_s3_cold_storage(key)
97-
if is_cold:
98-
logger.debug(
99-
f"Skipping cold storage object: {key_name} "
100-
f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, "
101-
f"ArchiveStatus: {key.get('ArchiveStatus', '')})"
102-
)
103-
match = self._ICEBERG_METADATA_RE.match(key_name)
104-
if match:
105-
cold_iceberg_dirs.add(match.group(1))
106-
continue
107-
self._update_iceberg_entry(iceberg_tables, key_name, size)
108-
109-
iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs
110131
for _, metadata_key, size in iceberg_tables.values():
111132
yield metadata_key, size
112133

113-
if not iceberg_dirs:
114-
for key in list_s3_objects(self._client, **kwargs):
115-
key_name = key["Key"]
116-
size = key.get("Size")
117-
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
118-
continue
119-
yield key_name, size
120-
else:
121-
for key in list_s3_objects(self._client, **kwargs):
122-
key_name = key["Key"]
123-
size = key.get("Size")
124-
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
125-
continue
126-
if not self._ICEBERG_METADATA_RE.match(key_name) and not any(
127-
key_name.startswith(d + "/") for d in iceberg_dirs
128-
):
129-
yield key_name, size
134+
yield from self._yield_regular_files(skip_cold_storage, iceberg_dirs, **kwargs)
130135

131136
def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]: # noqa: UP045
132137
for page in self._client.get_paginator("list_objects_v2").paginate(

0 commit comments

Comments
 (0)