Skip to content

Commit 5b935df

Browse files
Fix: datalake parsing of array-type nested structure fields inside JSON files (#27798)
1 parent 2c262b9 commit 5b935df

4 files changed

Lines changed: 121 additions & 9 deletions

File tree

ingestion/src/metadata/ingestion/source/database/redshift/utils.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import re
1616
from collections import defaultdict
17+
from typing import Any
1718

1819
import sqlalchemy as sa
1920
from packaging.version import Version
@@ -66,7 +67,7 @@ def _redshift_initialize(self, connection):
6667
self._has_native_hstore = False
6768

6869

69-
def _load_domains(self, connection, **kw):
70+
def _load_domains(self, connection, schema: str | None = None, **kw: Any) -> dict:
7071
"""
7172
Override to return empty dict since Redshift does not support user-created
7273
domains and pg_catalog.pg_collation does not exist in Redshift, causing a
@@ -85,7 +86,15 @@ def get_temp_table_names(self, connection, schema=None, **kw):
8586
return []
8687

8788

88-
def get_multi_columns(self, connection, **kw):
89+
def get_multi_columns(
90+
self,
91+
connection,
92+
schema: str | None = None,
93+
filter_names: Any | None = None,
94+
scope: Any | None = None,
95+
kind: Any | None = None,
96+
**kw: Any,
97+
):
8998
"""
9099
Override PGDialect's get_multi_columns to avoid querying
91100
pg_attribute.attcollation which does not exist in Redshift.

ingestion/src/metadata/profiler/orm/converter/redshift/converter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
to an SQLAlchemy ORM class.
1515
"""
1616

17-
from typing import Dict, Set # noqa: UP035
17+
from typing import Dict, Set, cast # noqa: UP035
1818

1919
from sqlalchemy.sql.sqltypes import TypeEngine
2020

@@ -45,5 +45,5 @@ def map_sqa_to_om_types() -> Dict[TypeEngine, Set[DataType]]: # noqa: UP006
4545

4646
return {
4747
**CommonMapTypes.map_sqa_to_om_types(),
48-
GEOMETRY: {DataType.GEOMETRY},
48+
cast("TypeEngine", GEOMETRY): {DataType.GEOMETRY},
4949
}

ingestion/src/metadata/utils/datalake/datalake_utils.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@
3333
logger = utils_logger()
3434

3535

36+
class _ArrayOfStruct:
37+
"""Marker for a JSON value observed as a list of dicts. Carries the merged struct shape
38+
so downstream column construction can render it as ARRAY<STRUCT<...>>."""
39+
40+
__slots__ = ("struct",)
41+
42+
def __init__(self, struct: Dict): # noqa: UP006
43+
self.struct = struct
44+
45+
3646
def fetch_dataframe_generator(
3747
config_source,
3848
client,
@@ -297,6 +307,10 @@ def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821
297307
}
298308
if data_type == DataType.ARRAY:
299309
parsed_string["arrayDataType"] = DataType.UNKNOWN
310+
struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100])
311+
if struct_children:
312+
parsed_string["arrayDataType"] = DataType.STRUCT
313+
parsed_string["children"] = struct_children
300314

301315
if data_type == DataType.JSON:
302316
parsed_string["children"] = cls.get_children(data_frame[column].dropna()[:100])
@@ -398,6 +412,11 @@ def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006
398412
result[key] = cls.unique_json_structure(
399413
[nested_json if isinstance(nested_json, dict) else {}, value]
400414
)
415+
elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
416+
merged_struct = cls.unique_json_structure(value)
417+
existing = result.get(key)
418+
existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {}
419+
result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct]))
401420
else:
402421
result[key] = value
403422
return result
@@ -412,13 +431,19 @@ def construct_json_column_children(cls, json_column: Dict) -> List[Dict]: # noq
412431
children = []
413432
for key, value in json_column.items():
414433
column = {}
415-
type_ = type(value).__name__.lower()
416-
column["dataTypeDisplay"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
417-
column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
418434
column["name"] = truncate_column_name(key)
419435
column["displayName"] = key
420-
if isinstance(value, dict):
421-
column["children"] = cls.construct_json_column_children(value)
436+
if isinstance(value, _ArrayOfStruct):
437+
column["dataType"] = DataType.ARRAY.value
438+
column["dataTypeDisplay"] = DataType.ARRAY.value
439+
column["arrayDataType"] = DataType.STRUCT
440+
column["children"] = cls.construct_json_column_children(value.struct)
441+
else:
442+
type_ = type(value).__name__.lower()
443+
column["dataTypeDisplay"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
444+
column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
445+
if isinstance(value, dict):
446+
column["children"] = cls.construct_json_column_children(value)
422447
children.append(column)
423448

424449
return children
@@ -446,6 +471,27 @@ def get_children(cls, json_column) -> List[Dict]: # noqa: UP006
446471

447472
return cls.construct_json_column_children(json_structure)
448473

474+
@classmethod
def _get_array_struct_children(cls, array_column: Any) -> List[Dict]:  # noqa: UP006
    """Infer the merged struct shape for an ARRAY column of dict elements.

    Collects every dict observed in the column (directly, inside nested
    lists, or embedded in JSON-encoded strings) and renders their merged
    shape as column children. Returns an empty list when no dict elements
    are found.
    """
    dict_elements = []
    for raw in array_column.values.tolist():
        if isinstance(raw, str):
            # Element may be a JSON-encoded string; skip it if undecodable.
            try:
                raw = json.loads(raw)  # noqa: PLW2901
            except (TypeError, ValueError):
                continue
        if isinstance(raw, dict):
            dict_elements.append(raw)
        elif isinstance(raw, list):
            dict_elements.extend(entry for entry in raw if isinstance(entry, dict))
    if not dict_elements:
        return []
    merged = cls.unique_json_structure(dict_elements)
    return cls.construct_json_column_children(merged)
494+
449495

450496
# pylint: disable=import-outside-toplevel
451497
class ParquetDataFrameColumnParser:

ingestion/tests/unit/utils/test_datalake.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,63 @@ def test_construct_column(self):
167167
for el in zip(expected, actual): # noqa: B905
168168
self.assertDictEqual(el[0], el[1])
169169

170+
def test_unique_json_structure_with_list_of_dicts(self):
    """A list-of-dicts value is merged into a struct shape (e.g. Iceberg `schema.fields`)."""
    from metadata.utils.datalake.datalake_utils import _ArrayOfStruct

    rows = [
        {
            "schema": {
                "fields": [
                    {"id": 1, "name": "customer_id", "type": "string"},
                    {"id": 2, "name": "customer_type_cd", "type": "string"},
                ]
            }
        }
    ]

    merged = GenericDataFrameColumnParser.unique_json_structure(rows)
    fields_value = merged["schema"]["fields"]

    assert isinstance(fields_value, _ArrayOfStruct)
    assert set(fields_value.struct.keys()) == {"id", "name", "type"}
190+
191+
def test_unique_json_structure_merges_list_of_dicts_across_samples(self):
    """Struct keys from list-of-dicts values in different samples are unioned, not overwritten."""
    from metadata.utils.datalake.datalake_utils import _ArrayOfStruct

    rows = [
        {"schema": {"fields": [{"id": 1, "name": "customer_id", "type": "string"}]}},
        {"schema": {"fields": [{"id": 2, "required": False, "type": "string"}]}},
        {"schema": {"fields": [{"description": "ciam id"}]}},
    ]

    merged = GenericDataFrameColumnParser.unique_json_structure(rows)
    fields_value = merged["schema"]["fields"]

    assert isinstance(fields_value, _ArrayOfStruct)
    expected_keys = {"id", "name", "type", "required", "description"}
    assert set(fields_value.struct.keys()) == expected_keys
206+
207+
def test_construct_column_with_array_of_struct(self):
    """A list-of-dicts value renders as ARRAY<STRUCT<...>> with one child per struct field."""
    structure = {
        "schema": {
            "fields": [
                {"id": 1, "name": "customer_id", "type": "string"},
                {"id": 2, "name": "ciam_id", "type": "string"},
            ]
        }
    }

    merged = GenericDataFrameColumnParser.unique_json_structure([structure])
    children = GenericDataFrameColumnParser.construct_json_column_children(merged)

    schema_col = children[0]
    fields_col = next(
        child for child in schema_col["children"] if child["name"] == "fields"
    )

    assert fields_col["dataType"] == DataType.ARRAY.value
    assert fields_col["arrayDataType"] == DataType.STRUCT
    assert {child["name"] for child in fields_col["children"]} == {"id", "name", "type"}
226+
170227
def test_create_column_object(self):
171228
"""test create column object fn"""
172229
formatted_column = GenericDataFrameColumnParser.construct_json_column_children(STRUCTURE)

0 commit comments

Comments
 (0)