Skip to content

Commit be355e5

Browse files
committed
Fix: datalake parse array type nested structure fields inside json file (#27798)
1 parent d121367 commit be355e5

4 files changed

Lines changed: 164 additions & 9 deletions

File tree

ingestion/src/metadata/ingestion/source/database/redshift/utils.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"""
1414
import re
1515
from collections import defaultdict
16+
from typing import Any
1617

1718
import sqlalchemy as sa
1819
from packaging.version import Version
@@ -49,6 +50,59 @@
4950
logger = ingestion_logger()
5051

5152

53+
def _redshift_initialize(self, connection):
    """
    Replacement for PGDialect/PGDialect_psycopg2 ``initialize`` that skips
    the PostgreSQL-only startup probes Redshift cannot answer
    (e.g., SHOW standard_conforming_strings).
    """
    from sqlalchemy.engine.default import DefaultDialect  # noqa: PLC0415

    DefaultDialect.initialize(self, connection)
    # Hard-code the capabilities PG would normally probe the server for;
    # Redshift supports none of them.
    for capability in (
        "_backslash_escapes",
        "supports_smallserial",
        "_supports_drop_index_concurrently",
        "supports_identity_columns",
        "_has_native_hstore",
    ):
        setattr(self, capability, False)
67+
68+
69+
def _load_domains(self, connection, schema: str | None = None, **kw: Any) -> dict:
70+
"""
71+
Override to return empty dict since Redshift does not support user-created
72+
domains and pg_catalog.pg_collation does not exist in Redshift, causing a
73+
ProgrammingError that aborts the transaction and breaks all subsequent queries.
74+
"""
75+
return {}
76+
77+
78+
def get_temp_table_names(self, connection, schema=None, **kw):
    """
    Redshift's ``pg_catalog.pg_class`` lacks the ``relpersistence`` column the
    inherited PGDialect query relies on; running it would raise a
    ProgrammingError that aborts the transaction and breaks all subsequent
    queries. Report no temporary tables instead.
    """
    return []
86+
87+
88+
def get_multi_columns(
89+
self,
90+
connection,
91+
schema: str | None = None,
92+
filter_names: Any | None = None,
93+
scope: Any | None = None,
94+
kind: Any | None = None,
95+
**kw: Any,
96+
):
97+
"""
98+
Override PGDialect's get_multi_columns to avoid querying
99+
pg_attribute.attcollation which does not exist in Redshift.
100+
Falls back to the default implementation that delegates to
101+
the already-overridden get_columns() method.
102+
"""
103+
return self._default_multi_reflect(self.get_columns, connection, **kw)
104+
105+
52106
# pylint: disable=protected-access
53107
@calculate_execution_time()
54108
@reflection.cache

ingestion/src/metadata/profiler/orm/converter/redshift/converter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
to an SQLAlchemy ORM class.
1515
"""
1616

17-
from typing import Dict, Set
17+
from typing import Dict, Set, cast # noqa: UP035
1818

1919
from sqlalchemy.sql.sqltypes import TypeEngine
2020

@@ -48,5 +48,5 @@ def map_sqa_to_om_types() -> Dict[TypeEngine, Set[DataType]]:
4848

4949
return {
5050
**CommonMapTypes.map_sqa_to_om_types(),
51-
GEOMETRY: {DataType.GEOMETRY},
51+
cast("TypeEngine", GEOMETRY): {DataType.GEOMETRY},
5252
}

ingestion/src/metadata/utils/datalake/datalake_utils.py

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@
3232
logger = utils_logger()
3333

3434

35+
class _ArrayOfStruct:
36+
"""Marker for a JSON value observed as a list of dicts. Carries the merged struct shape
37+
so downstream column construction can render it as ARRAY<STRUCT<...>>."""
38+
39+
__slots__ = ("struct",)
40+
41+
def __init__(self, struct: Dict): # noqa: UP006
42+
self.struct = struct
43+
44+
3545
def fetch_dataframe_generator(
3646
config_source,
3747
client,
@@ -288,6 +298,10 @@ def _get_columns(cls, data_frame: "DataFrame"):
288298
}
289299
if data_type == DataType.ARRAY:
290300
parsed_string["arrayDataType"] = DataType.UNKNOWN
301+
struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100])
302+
if struct_children:
303+
parsed_string["arrayDataType"] = DataType.STRUCT
304+
parsed_string["children"] = struct_children
291305

292306
if data_type == DataType.JSON:
293307
parsed_string["children"] = cls.get_children(
@@ -400,6 +414,11 @@ def unique_json_structure(cls, dicts: List[Dict]) -> Dict:
400414
result[key] = cls.unique_json_structure(
401415
[nested_json if isinstance(nested_json, dict) else {}, value]
402416
)
417+
elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
418+
merged_struct = cls.unique_json_structure(value)
419+
existing = result.get(key)
420+
existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {}
421+
result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct]))
403422
else:
404423
result[key] = value
405424
return result
@@ -414,15 +433,19 @@ def construct_json_column_children(cls, json_column: Dict) -> List[Dict]:
414433
children = []
415434
for key, value in json_column.items():
416435
column = {}
417-
type_ = type(value).__name__.lower()
418-
column["dataTypeDisplay"] = cls._data_formats.get(
419-
type_, DataType.UNKNOWN
420-
).value
421-
column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
422436
column["name"] = truncate_column_name(key)
423437
column["displayName"] = key
424-
if isinstance(value, dict):
425-
column["children"] = cls.construct_json_column_children(value)
438+
if isinstance(value, _ArrayOfStruct):
439+
column["dataType"] = DataType.ARRAY.value
440+
column["dataTypeDisplay"] = DataType.ARRAY.value
441+
column["arrayDataType"] = DataType.STRUCT
442+
column["children"] = cls.construct_json_column_children(value.struct)
443+
else:
444+
type_ = type(value).__name__.lower()
445+
column["dataTypeDisplay"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
446+
column["dataType"] = cls._data_formats.get(type_, DataType.UNKNOWN).value
447+
if isinstance(value, dict):
448+
column["children"] = cls.construct_json_column_children(value)
426449
children.append(column)
427450

428451
return children
@@ -451,6 +474,27 @@ def get_children(cls, json_column) -> List[Dict]:
451474

452475
return cls.construct_json_column_children(json_structure)
453476

477+
@classmethod
def _get_array_struct_children(cls, array_column: Any) -> List[Dict]:  # noqa: UP006
    """Infer the merged struct shape of an ARRAY column whose elements are dicts.

    JSON strings are decoded first; dict elements (and dicts nested in list
    elements) are pooled, merged into a single struct, and returned as
    children columns. Returns an empty list when no dict elements are found.
    """
    dict_elements = []
    for raw in array_column.values.tolist():
        element = raw
        if isinstance(element, str):
            try:
                element = json.loads(element)
            except (TypeError, ValueError):
                # Not valid JSON — skip this sample entirely
                continue
        if isinstance(element, dict):
            dict_elements.append(element)
        elif isinstance(element, list):
            dict_elements.extend(entry for entry in element if isinstance(entry, dict))
    if not dict_elements:
        return []
    return cls.construct_json_column_children(cls.unique_json_structure(dict_elements))
497+
454498

455499
# pylint: disable=import-outside-toplevel
456500
class ParquetDataFrameColumnParser:

ingestion/tests/unit/utils/test_datalake.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,63 @@ def test_construct_column(self):
167167
for el in zip(expected, actual):
168168
self.assertDictEqual(el[0], el[1])
169169

170+
def test_unique_json_structure_with_list_of_dicts(self):
    """list-of-dicts values are merged into a struct shape (e.g. Iceberg `schema.fields`)."""
    sample_data = [
        {
            "schema": {
                "fields": [
                    {"id": 1, "name": "customer_id", "type": "string"},
                    {"id": 2, "name": "customer_type_cd", "type": "string"},
                ]
            }
        }
    ]

    actual = GenericDataFrameColumnParser.unique_json_structure(sample_data)
    fields_value = actual["schema"]["fields"]

    # Local import: _ArrayOfStruct is a private helper of the parser module
    from metadata.utils.datalake.datalake_utils import _ArrayOfStruct

    # The list of dicts is wrapped in the marker; its struct carries the
    # union of the element keys (here both dicts share the same three keys)
    assert isinstance(fields_value, _ArrayOfStruct)
    assert set(fields_value.struct.keys()) == {"id", "name", "type"}
190+
191+
def test_unique_json_structure_merges_list_of_dicts_across_samples(self):
    """list-of-dicts values across multiple samples are unioned, not overwritten."""
    # Local import: _ArrayOfStruct is a private helper of the parser module
    from metadata.utils.datalake.datalake_utils import _ArrayOfStruct

    # Each sample contributes a different subset of struct keys
    sample_data = [
        {"schema": {"fields": [{"id": 1, "name": "customer_id", "type": "string"}]}},
        {"schema": {"fields": [{"id": 2, "required": False, "type": "string"}]}},
        {"schema": {"fields": [{"description": "ciam id"}]}},
    ]

    actual = GenericDataFrameColumnParser.unique_json_structure(sample_data)
    fields_value = actual["schema"]["fields"]

    # The marker's struct must hold the union of keys from all three samples
    assert isinstance(fields_value, _ArrayOfStruct)
    assert set(fields_value.struct.keys()) == {"id", "name", "type", "required", "description"}
206+
207+
def test_construct_column_with_array_of_struct(self):
    """list-of-dicts values render as ARRAY<STRUCT<...>> with children for the struct fields."""
    structure = {
        "schema": {
            "fields": [
                {"id": 1, "name": "customer_id", "type": "string"},
                {"id": 2, "name": "ciam_id", "type": "string"},
            ]
        }
    }
    # Run the full pipeline: merge the structure, then build column children
    merged = GenericDataFrameColumnParser.unique_json_structure([structure])
    children = GenericDataFrameColumnParser.construct_json_column_children(merged)

    schema_col = children[0]
    fields_col = next(c for c in schema_col["children"] if c["name"] == "fields")

    # The "fields" column must be rendered as ARRAY of STRUCT, with one
    # child column per struct key
    assert fields_col["dataType"] == DataType.ARRAY.value
    assert fields_col["arrayDataType"] == DataType.STRUCT
    assert {child["name"] for child in fields_col["children"]} == {"id", "name", "type"}
226+
170227
def test_create_column_object(self):
171228
"""test create column object fn"""
172229
formatted_column = GenericDataFrameColumnParser.construct_json_column_children(

0 commit comments

Comments
 (0)