Skip to content

Commit caef18a

Browse files
fix(datalake): cold-storage Iceberg dir detection and tighten JSON classification
1 parent b85aacc commit caef18a

3 files changed

Lines changed: 15 additions & 10 deletions

File tree

  • ingestion/src/metadata

ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,21 @@ def get_table_names(
127127
"""
128128
bucket = self._client.get_bucket(bucket_name)
129129
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006
130+
cold_iceberg_dirs: Set[str] = set() # noqa: UP006
130131

131132
for blob in bucket.list_blobs(prefix=prefix):
132-
if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
133+
is_cold = skip_cold_storage and self._should_skip_gcs_cold_storage(blob)
134+
if is_cold:
133135
logger.debug(
134136
f"Skipping cold storage object: {blob.name} (storage_class: {getattr(blob, 'storage_class', None)})"
135137
)
138+
match = self._ICEBERG_METADATA_RE.match(blob.name)
139+
if match:
140+
cold_iceberg_dirs.add(match.group(1))
136141
continue
137142
self._update_iceberg_entry(iceberg_tables, blob.name, blob.size)
138143

139-
iceberg_dirs = set(iceberg_tables.keys())
144+
iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs
140145
for _, metadata_blob_path, size in iceberg_tables.values():
141146
yield metadata_blob_path, size
142147

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,25 @@ def get_table_names(
8888
kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
8989

9090
iceberg_tables: Dict[str, Tuple[int, str, int | None]] = {} # noqa: UP006
91+
cold_iceberg_dirs: Set[str] = set() # noqa: UP006
9192

9293
for key in list_s3_objects(self._client, **kwargs):
9394
key_name = key["Key"]
9495
size = key.get("Size")
95-
if skip_cold_storage and self._should_skip_s3_cold_storage(key):
96+
is_cold = skip_cold_storage and self._should_skip_s3_cold_storage(key)
97+
if is_cold:
9698
logger.debug(
9799
f"Skipping cold storage object: {key_name} "
98100
f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, "
99101
f"ArchiveStatus: {key.get('ArchiveStatus', '')})"
100102
)
103+
match = self._ICEBERG_METADATA_RE.match(key_name)
104+
if match:
105+
cold_iceberg_dirs.add(match.group(1))
101106
continue
102107
self._update_iceberg_entry(iceberg_tables, key_name, size)
103108

104-
iceberg_dirs = set(iceberg_tables.keys())
109+
iceberg_dirs = set(iceberg_tables.keys()) | cold_iceberg_dirs
105110
for _, metadata_key, size in iceberg_tables.values():
106111
yield metadata_key, size
107112

ingestion/src/metadata/readers/dataframe/json.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,6 @@ def _read_json_object(
126126
and (
127127
data.get("$schema") is not None # JSON Schema files
128128
or data.get("format-version") is not None # Apache Iceberg table metadata
129-
or ( # Delta Lake / Iceberg schema structure
130-
isinstance(data.get("schema"), dict) and isinstance(data.get("schema", {}).get("fields"), list)
131-
)
132129
)
133130
else None
134131
)
@@ -158,9 +155,7 @@ def _is_json_lines(file_obj) -> bool:
158155
return False
159156
if obj.get("$schema") is not None:
160157
return False
161-
if obj.get("format-version") is not None:
162-
return False
163-
return not (isinstance(obj.get("schema"), dict) and isinstance(obj.get("schema", {}).get("fields"), list))
158+
return obj.get("format-version") is None
164159

165160
def _read_json_smart(
166161
self,

0 commit comments

Comments (0)