
Commit 134c1eb

feat(datalake): add GCS/S3 Iceberg table ingestion support (#22644)
1 parent a1e7086 commit 134c1eb

7 files changed

Lines changed: 635 additions & 32 deletions


ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py

Lines changed: 50 additions & 9 deletions
@@ -14,9 +14,10 @@
 """
 
 import os
+import re
 from copy import deepcopy
 from functools import partial
-from typing import Callable, Iterable, List, Optional, Set, Tuple  # noqa: UP035
+from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple  # noqa: UP035
 
 from google.cloud import storage
 
@@ -107,21 +108,61 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str]
         for bucket in self._client.list_buckets():
             yield bucket.name
 
+    _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")
+
+    @staticmethod
+    def _should_skip_gcs_cold_storage(blob) -> bool:
+        storage_class = getattr(blob, "storage_class", None)
+        return bool(storage_class and storage_class in GCS_COLD_STORAGE_CLASSES)
+
+    def _classify_gcs_blob(
+        self,
+        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],  # noqa: UP006, UP045
+        regular_files: List[Tuple[str, Optional[int]]],  # noqa: UP006, UP045
+        blob,
+    ) -> None:
+        match = self._ICEBERG_METADATA_RE.match(blob.name)
+        if match:
+            table_dir, version = match.group(1), int(match.group(2))
+            existing = iceberg_tables.get(table_dir)
+            if existing is None or version > existing[0]:
+                iceberg_tables[table_dir] = (version, blob.name, blob.size)
+        else:
+            regular_files.append((blob.name, blob.size))
+
     def get_table_names(
         self,
         bucket_name: str,
         prefix: Optional[str],  # noqa: UP045
         skip_cold_storage: bool = False,
     ) -> Iterable[Tuple[str, Optional[int]]]:  # noqa: UP006, UP045
+        """
+        Lists tables in a GCS bucket using a single pass.
+
+        Iceberg table directories are identified by blobs matching
+        ``<table_dir>/metadata/v<N>.metadata.json``. Only the blob with the
+        highest integer version is yielded per table directory. Regular files
+        not under any Iceberg table directory are also yielded.
+        """
         bucket = self._client.get_bucket(bucket_name)
-
-        for key in bucket.list_blobs(prefix=prefix):
-            if skip_cold_storage:
-                storage_class = getattr(key, "storage_class", None)
-                if storage_class and storage_class in GCS_COLD_STORAGE_CLASSES:
-                    logger.debug(f"Skipping cold storage object: {key.name} (storage_class: {storage_class})")
-                    continue
-            yield key.name, key.size
+        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}  # noqa: UP006
+        regular_files: List[Tuple[str, Optional[int]]] = []  # noqa: UP006
+
+        for blob in bucket.list_blobs(prefix=prefix):
+            if skip_cold_storage and self._should_skip_gcs_cold_storage(blob):
+                logger.debug(
+                    f"Skipping cold storage object: {blob.name} "
+                    f"(storage_class: {getattr(blob, 'storage_class', None)})"
+                )
+                continue
+            self._classify_gcs_blob(iceberg_tables, regular_files, blob)
+
+        iceberg_dirs = set(iceberg_tables.keys())
+        for _, metadata_blob_path, size in iceberg_tables.values():
+            yield metadata_blob_path, size
+        for file_path, size in regular_files:
+            if not any(file_path.startswith(d + "/") for d in iceberg_dirs):
+                yield file_path, size
 
     def close(self, service_connection):
         os.environ.pop("GOOGLE_CLOUD_PROJECT", "")
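The listing above is now a single pass: Iceberg metadata blobs are deduplicated per table directory (keeping only the highest version), and data files that live under a recognized Iceberg directory are suppressed. Below is a minimal sketch of that selection logic, run against hypothetical object names rather than real GCS blobs.

# Minimal sketch of the single-pass classification above; the object names
# and sizes are hypothetical stand-ins for a GCS blob listing.
import re
from typing import Dict, List, Optional, Tuple

ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")

def classify(listing: List[Tuple[str, Optional[int]]]) -> List[Tuple[str, Optional[int]]]:
    iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}
    regular_files: List[Tuple[str, Optional[int]]] = []
    for name, size in listing:
        match = ICEBERG_METADATA_RE.match(name)
        if match:
            table_dir, version = match.group(1), int(match.group(2))
            existing = iceberg_tables.get(table_dir)
            # Keep only the highest metadata version per table directory.
            if existing is None or version > existing[0]:
                iceberg_tables[table_dir] = (version, name, size)
        else:
            regular_files.append((name, size))
    iceberg_dirs = set(iceberg_tables)
    results = [(path, size) for _, path, size in iceberg_tables.values()]
    # Drop regular files that sit under a recognized Iceberg table directory.
    results += [
        (path, size)
        for path, size in regular_files
        if not any(path.startswith(d + "/") for d in iceberg_dirs)
    ]
    return results

# Only v3.metadata.json and the standalone CSV survive.
print(classify([
    ("warehouse/orders/metadata/v1.metadata.json", 10),
    ("warehouse/orders/metadata/v3.metadata.json", 12),
    ("warehouse/orders/data/part-0000.parquet", 2048),
    ("exports/customers.csv", 512),
]))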

ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py

Lines changed: 58 additions & 16 deletions
@@ -13,8 +13,9 @@
 Datalake S3 Client
 """
 
+import re
 from functools import partial
-from typing import Callable, Iterable, Optional, Set, Tuple  # noqa: UP035
+from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple  # noqa: UP035
 
 from metadata.clients.aws_client import AWSClient
 from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
@@ -61,31 +62,72 @@ def get_database_schema_names(self, bucket_name: Optional[str]) -> Iterable[str]
         for bucket in self._client.list_buckets()["Buckets"]:
             yield bucket["Name"]
 
+    _ICEBERG_METADATA_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")
+
+    @staticmethod
+    def _should_skip_s3_cold_storage(key: dict) -> bool:
+        storage_class = key.get("StorageClass", "STANDARD")
+        archive_status = key.get("ArchiveStatus", "")
+        return storage_class in S3_COLD_STORAGE_CLASSES or archive_status in {
+            "ARCHIVE_ACCESS",
+            "DEEP_ARCHIVE_ACCESS",
+        }
+
+    def _classify_s3_object(
+        self,
+        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]],  # noqa: UP006, UP045
+        regular_files: List[Tuple[str, Optional[int]]],  # noqa: UP006, UP045
+        key_name: str,
+        size: Optional[int],  # noqa: UP045
+    ) -> None:
+        match = self._ICEBERG_METADATA_RE.match(key_name)
+        if match:
+            table_dir, version = match.group(1), int(match.group(2))
+            existing = iceberg_tables.get(table_dir)
+            if existing is None or version > existing[0]:
+                iceberg_tables[table_dir] = (version, key_name, size)
+        else:
+            regular_files.append((key_name, size))
+
     def get_table_names(
         self,
         bucket_name: str,
         prefix: Optional[str],  # noqa: UP045
         skip_cold_storage: bool = False,
     ) -> Iterable[Tuple[str, Optional[int]]]:  # noqa: UP006, UP045
-        kwargs = {"Bucket": bucket_name}
-
+        """
+        Lists tables in an S3 bucket using a single pass.
+
+        Iceberg table directories are identified by objects matching
+        ``<table_dir>/metadata/v<N>.metadata.json``. Only the object with the
+        highest integer version is yielded per table directory. Regular files
+        not under any Iceberg table directory are also yielded.
+        """
+        kwargs: Dict[str, str] = {"Bucket": bucket_name}  # noqa: UP006
         if prefix:
             kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
 
+        iceberg_tables: Dict[str, Tuple[int, str, Optional[int]]] = {}  # noqa: UP006
+        regular_files: List[Tuple[str, Optional[int]]] = []  # noqa: UP006
+
         for key in list_s3_objects(self._client, **kwargs):
-            if skip_cold_storage:
-                storage_class = key.get("StorageClass", "STANDARD")
-                archive_status = key.get("ArchiveStatus", "")
-                if storage_class in S3_COLD_STORAGE_CLASSES or archive_status in {
-                    "ARCHIVE_ACCESS",
-                    "DEEP_ARCHIVE_ACCESS",
-                }:
-                    logger.debug(
-                        f"Skipping cold storage object: {key['Key']} "
-                        f"(StorageClass: {storage_class}, ArchiveStatus: {archive_status})"
-                    )
-                    continue
-            yield key["Key"], key.get("Size")
+            key_name = key["Key"]
+            size = key.get("Size")
+            if skip_cold_storage and self._should_skip_s3_cold_storage(key):
+                logger.debug(
+                    f"Skipping cold storage object: {key_name} "
+                    f"(StorageClass: {key.get('StorageClass', 'STANDARD')}, "
+                    f"ArchiveStatus: {key.get('ArchiveStatus', '')})"
+                )
+                continue
+            self._classify_s3_object(iceberg_tables, regular_files, key_name, size)
+
+        iceberg_dirs = set(iceberg_tables.keys())
+        for _, metadata_key, size in iceberg_tables.values():
+            yield metadata_key, size
+        for file_path, size in regular_files:
+            if not any(file_path.startswith(d + "/") for d in iceberg_dirs):
+                yield file_path, size
 
     def get_folders_prefix(self, bucket_name: str, prefix: Optional[str]) -> Iterable[str]:  # noqa: UP045
         for page in self._client.get_paginator("list_objects_v2").paginate(
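The S3 client mirrors the GCS classification and factors the cold-storage check into _should_skip_s3_cold_storage, which inspects an object's StorageClass and ArchiveStatus. A small sketch of that predicate on hand-written list_objects_v2-style entries; the cold-storage class set below is a hypothetical stand-in for the module-level S3_COLD_STORAGE_CLASSES constant.

# Hedged sketch: the class set is assumed here for illustration only.
S3_COLD_STORAGE_CLASSES = {"GLACIER", "DEEP_ARCHIVE"}

def should_skip_s3_cold_storage(key: dict) -> bool:
    # Cold if the storage class is archival, or if the object is an
    # Intelligent-Tiering object currently in an archive access tier.
    storage_class = key.get("StorageClass", "STANDARD")
    archive_status = key.get("ArchiveStatus", "")
    return storage_class in S3_COLD_STORAGE_CLASSES or archive_status in {
        "ARCHIVE_ACCESS",
        "DEEP_ARCHIVE_ACCESS",
    }

print(should_skip_s3_cold_storage({"Key": "a.csv"}))                                     # False
print(should_skip_s3_cold_storage({"Key": "b.csv", "StorageClass": "DEEP_ARCHIVE"}))     # True
print(should_skip_s3_cold_storage({"Key": "c.csv", "ArchiveStatus": "ARCHIVE_ACCESS"}))  # True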

ingestion/src/metadata/ingestion/source/database/datalake/metadata.py

Lines changed: 19 additions & 6 deletions
@@ -67,6 +67,7 @@
     DataFrameColumnParser,
     fetch_dataframe_first_chunk,
     get_file_format_type,
+    get_iceberg_table_name_from_metadata_path,
 )
 from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
 from metadata.utils.logger import ingestion_logger
@@ -201,7 +202,7 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab
 
     def get_tables_name_and_type(  # pylint: disable=too-many-branches
         self,
-    ) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int]]]:  # noqa: UP006, UP045
+    ) -> Iterable[Tuple[str, TableType, SupportedTypes, Optional[int], str]]:  # noqa: UP006, UP045
         """
         Handle table and views.
 
@@ -238,26 +239,37 @@ def get_tables_name_and_type(  # pylint: disable=too-many-branches
                     logger.debug(f"Object filtered due to unsupported file type: {key_name}")
                     continue
 
-                yield table_name, TableType.Regular, file_extension, file_size
+                table_type = (
+                    TableType.Iceberg
+                    if get_iceberg_table_name_from_metadata_path(key_name) is not None
+                    else TableType.Regular
+                )
+                yield table_name, table_type, file_extension, file_size, key_name
 
     def yield_table(
         self,
-        table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int]],  # noqa: UP006, UP045
+        table_name_and_type: Tuple[str, TableType, SupportedTypes, Optional[int], str],  # noqa: UP006, UP045
     ) -> Iterable[Either[CreateTableRequest]]:
         """
         From topology.
         Prepare a table request and pass it to the sink.
         Uses first chunk only for schema inference to avoid loading entire file.
         """
-        table_name, table_type, table_extension, file_size = table_name_and_type
+        (
+            table_name,
+            table_type,
+            table_extension,
+            file_size,
+            fetch_key,
+        ) = table_name_and_type
         schema_name = self.context.get().database_schema
         try:
             table_constraints = None
             data_frame, raw_data = fetch_dataframe_first_chunk(
                 config_source=self.config_source,
                 client=self.client.client,
                 file_fqn=DatalakeTableSchemaWrapper(
-                    key=table_name,
+                    key=fetch_key,
                     bucket_name=schema_name,
                     file_extension=table_extension,
                     file_size=file_size,
@@ -326,7 +338,8 @@ def standardize_table_name(
         schema: str,
         table: str,  # pylint: disable=unused-argument
     ) -> str:
-        return table
+        iceberg_name = get_iceberg_table_name_from_metadata_path(table)
+        return iceberg_name if iceberg_name is not None else table
 
     def filter_dl_table(self, table_name: str):
         """Filters Datalake Tables based on filterPattern"""

ingestion/src/metadata/readers/dataframe/json.py

Lines changed: 14 additions & 1 deletion
@@ -120,7 +120,20 @@ def _read_json_object(
 
         content = content.decode(UTF_8, errors="ignore") if isinstance(content, bytes) else content
         data = json.loads(content)
-        raw_data = content if isinstance(data, dict) and data.get("$schema") else None
+        raw_data = (
+            content
+            if isinstance(data, dict)
+            and (
+                data.get("$schema") is not None  # JSON Schema files
+                or data.get("format-version")
+                is not None  # Apache Iceberg table metadata
+                or (  # Delta Lake / Iceberg schema structure
+                    isinstance(data.get("schema"), dict)
+                    and isinstance(data.get("schema", {}).get("fields"), list)
+                )
+            )
+            else None
+        )
         data = [data] if isinstance(data, dict) else data
 
         def chunk_generator():
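raw_data is now kept not only for JSON Schema documents but also for Apache Iceberg table metadata (identified by format-version) and for payloads carrying a schema object with a fields list, so the column parser can work from the declared schema instead of sampled rows. The same condition, restated as a standalone predicate over hand-written payloads:

# Standalone restatement of the raw_data condition above; sample payloads are hypothetical.
def keeps_raw_data(data) -> bool:
    return isinstance(data, dict) and (
        data.get("$schema") is not None              # JSON Schema files
        or data.get("format-version") is not None    # Apache Iceberg table metadata
        or (                                         # Delta Lake / Iceberg schema structure
            isinstance(data.get("schema"), dict)
            and isinstance(data.get("schema", {}).get("fields"), list)
        )
    )

print(keeps_raw_data({"format-version": 2, "schema": {"fields": []}}))        # True
print(keeps_raw_data({"$schema": "http://json-schema.org/draft-07/schema"}))  # True
print(keeps_raw_data([{"col1": "val1", "col2": 42}]))                         # False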

ingestion/src/metadata/utils/datalake/datalake_utils.py

Lines changed: 20 additions & 0 deletions
@@ -17,6 +17,7 @@
 import ast
 import json
 import random
+import re
 import traceback
 from typing import Any, Dict, List, Optional, Union, cast  # noqa: UP035
 
@@ -149,6 +150,25 @@ def fetch_dataframe_first_chunk(
     return None
 
 
+_ICEBERG_METADATA_PATH_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$")
+
+
+def get_iceberg_table_name_from_metadata_path(metadata_path: str) -> Optional[str]:
+    """
+    Extracts the Iceberg table directory name from a metadata file path.
+
+    Examples:
+        "warehouse/orders/metadata/v2.metadata.json" -> "orders"
+        "my_prefix/sales/metadata/v1.metadata.json" -> "sales"
+        "simple/metadata/v3.metadata.json" -> "simple"
+        "data/orders.json" -> None
+
+    Returns None if the path does not match the Iceberg metadata pattern.
+    """
+    match = _ICEBERG_METADATA_PATH_RE.search(metadata_path)
+    return match.group(1) if match else None
+
+
 def get_file_format_type(key_name, metadata_entry=None):
     for supported_types in SupportedTypes:
         if key_name.lower().endswith(supported_types.value.lower()):
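Note that the two patterns introduced in this commit intentionally differ: the client-side regex captures the full table directory (used to suppress data files beneath it), while this helper captures only the leaf directory name that becomes the table's display name. A quick contrast on a hypothetical path:

# Contrast of the two regexes in this commit on a hypothetical metadata path.
import re

CLIENT_RE = re.compile(r"^(.*)/metadata/v(\d+)\.metadata\.json$")   # gcs.py / s3.py
UTILS_RE = re.compile(r"([^/]+)/metadata/v\d+\.metadata\.json$")    # datalake_utils.py

path = "warehouse/orders/metadata/v2.metadata.json"
print(CLIENT_RE.match(path).group(1))  # warehouse/orders  (table directory)
print(UTILS_RE.search(path).group(1))  # orders            (display name)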

ingestion/tests/unit/readers/test_json_reader.py

Lines changed: 80 additions & 0 deletions
@@ -261,6 +261,86 @@ def test_empty_json_lines(self):
         total_rows = sum(len(chunk) for chunk in chunks)
         self.assertEqual(total_rows, 2)
 
+    def test_raw_data_set_for_iceberg_metadata(self):
+        iceberg_metadata = json.dumps(
+            {
+                "format-version": 2,
+                "table-uuid": "abc-123",
+                "location": "gs://bucket/warehouse/orders",
+                "schema": {
+                    "type": "struct",
+                    "schema-id": 0,
+                    "fields": [
+                        {"id": 1, "name": "id", "type": "long", "required": True},
+                        {"id": 2, "name": "name", "type": "string", "required": False},
+                    ],
+                },
+            }
+        ).encode("utf-8")
+
+        _, raw_data = JSONDataFrameReader._read_json_object(iceberg_metadata)
+
+        assert raw_data is not None
+
+    def test_iceberg_columns_parsed_correctly(self):
+        from metadata.utils.datalake.datalake_utils import JsonDataFrameColumnParser
+
+        iceberg_metadata = json.dumps(
+            {
+                "format-version": 2,
+                "table-uuid": "abc-123",
+                "location": "gs://bucket/warehouse/orders",
+                "schema": {
+                    "type": "struct",
+                    "schema-id": 0,
+                    "fields": [
+                        {"id": 1, "name": "id", "type": "long", "required": True},
+                        {"id": 2, "name": "name", "type": "string", "required": False},
+                    ],
+                },
+            }
+        ).encode("utf-8")
+
+        _, raw_data = JSONDataFrameReader._read_json_object(iceberg_metadata)
+        assert raw_data is not None
+
+        import pandas as pd
+
+        from metadata.generated.schema.entity.data.table import DataType
+
+        empty_df = pd.DataFrame()
+        parser = JsonDataFrameColumnParser(data_frame=empty_df, raw_data=raw_data)
+        columns = parser.get_columns()
+
+        assert len(columns) == 2
+        column_names = [col.name.root for col in columns]
+        assert "id" in column_names
+        assert "name" in column_names
+
+        id_col = next(col for col in columns if col.name.root == "id")
+        name_col = next(col for col in columns if col.name.root == "name")
+        assert id_col.dataType in {DataType.INT, DataType.BIGINT, DataType.LONG}
+        assert name_col.dataType in {DataType.STRING, DataType.VARCHAR, DataType.TEXT}
+
+    def test_raw_data_none_for_regular_json(self):
+        regular_json = json.dumps([{"col1": "val1", "col2": 42}]).encode("utf-8")
+
+        _, raw_data = JSONDataFrameReader._read_json_object(regular_json)
+
+        assert raw_data is None
+
+    def test_raw_data_set_for_json_schema(self):
+        json_schema = json.dumps(
+            {
+                "$schema": "http://json-schema.org/draft-07/schema",
+                "properties": {"id": {"type": "integer"}},
+            }
+        ).encode("utf-8")
+
+        _, raw_data = JSONDataFrameReader._read_json_object(json_schema)
+
+        assert raw_data is not None
+
 
 if __name__ == "__main__":
     unittest.main()
