Skip to content

Commit 77a9c02

Browse files
fix(datalake): reduce cognitive complexity and fix union type to pass SonarQube quality gate
1 parent: be1ea8f · commit: 77a9c02

1 file changed

Lines changed: 52 additions & 44 deletions

File tree

ingestion/src/metadata/utils/datalake/datalake_utils.py

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import random
2020
import re
2121
import traceback
22-
from typing import Any, Dict, List, Optional, Union, cast # noqa: UP035
22+
from typing import Any, Dict, List, Optional, cast # noqa: UP035
2323

2424
from metadata.generated.schema.entity.data.table import Column, DataType
2525
from metadata.ingestion.source.database.column_helpers import truncate_column_name
@@ -246,7 +246,7 @@ def create(
246246

247247
@staticmethod
248248
def _get_data_frame(
249-
data_frame: Union[List["DataFrame"], "DataFrame"], # noqa: F821, UP006
249+
data_frame: list[Any] | Any,
250250
sample: bool,
251251
shuffle: bool, # noqa: F821, RUF100
252252
):
@@ -301,6 +301,36 @@ def get_columns(self):
301301
"""
302302
return self._get_columns(self.data_frame)
303303

304+
@classmethod
305+
def _parse_column(cls, data_frame: "DataFrame", column: str) -> Optional[Column]: # noqa: F821, UP045
306+
# use String by default
307+
data_type = DataType.STRING
308+
try:
309+
if hasattr(data_frame[column], "dtypes"):
310+
data_type = cls.fetch_col_types(data_frame, column_name=column)
311+
312+
parsed_string = {
313+
"dataTypeDisplay": data_type.value,
314+
"dataType": data_type,
315+
"name": truncate_column_name(column),
316+
"displayName": column,
317+
}
318+
if data_type == DataType.ARRAY:
319+
parsed_string["arrayDataType"] = DataType.UNKNOWN
320+
struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100])
321+
if struct_children:
322+
parsed_string["arrayDataType"] = DataType.STRUCT
323+
parsed_string["children"] = struct_children
324+
325+
if data_type == DataType.JSON:
326+
parsed_string["children"] = cls.get_children(data_frame[column].dropna()[:100])
327+
328+
return Column(**parsed_string)
329+
except Exception as exc:
330+
logger.debug(traceback.format_exc())
331+
logger.warning(f"Unexpected exception parsing column [{column}]: {exc}")
332+
return None
333+
304334
@classmethod
305335
def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821
306336
"""
@@ -311,34 +341,10 @@ def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821
311341
"""
312342
cols = []
313343
if hasattr(data_frame, "columns"):
314-
df_columns = list(data_frame.columns)
315-
for column in df_columns:
316-
# use String by default
317-
data_type = DataType.STRING
318-
try:
319-
if hasattr(data_frame[column], "dtypes"):
320-
data_type = cls.fetch_col_types(data_frame, column_name=column)
321-
322-
parsed_string = {
323-
"dataTypeDisplay": data_type.value,
324-
"dataType": data_type,
325-
"name": truncate_column_name(column),
326-
"displayName": column,
327-
}
328-
if data_type == DataType.ARRAY:
329-
parsed_string["arrayDataType"] = DataType.UNKNOWN
330-
struct_children = cls._get_array_struct_children(data_frame[column].dropna()[:100])
331-
if struct_children:
332-
parsed_string["arrayDataType"] = DataType.STRUCT
333-
parsed_string["children"] = struct_children
334-
335-
if data_type == DataType.JSON:
336-
parsed_string["children"] = cls.get_children(data_frame[column].dropna()[:100])
337-
338-
cols.append(Column(**parsed_string))
339-
except Exception as exc:
340-
logger.debug(traceback.format_exc())
341-
logger.warning(f"Unexpected exception parsing column [{column}]: {exc}")
344+
for column in list(data_frame.columns):
345+
parsed_col = cls._parse_column(data_frame, column)
346+
if parsed_col:
347+
cols.append(parsed_col)
342348
return cols
343349

344350
@classmethod
@@ -413,6 +419,21 @@ def fetch_col_types(cls, data_frame, column_name):
413419
logger.debug(traceback.format_exc())
414420
return data_type or DataType.STRING
415421

422+
@classmethod
423+
def _process_unique_json_key(cls, result: Dict, key: str, value: Any) -> None: # noqa: UP006
424+
if isinstance(value, dict):
425+
nested_json = result.get(key, {})
426+
# `isinstance(nested_json, dict)` if for a key we first see a non dict value
427+
# but then see a dict value later, we will consider the key to be a dict.
428+
result[key] = cls.unique_json_structure([nested_json if isinstance(nested_json, dict) else {}, value])
429+
elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
430+
merged_struct = cls.unique_json_structure(value)
431+
existing = result.get(key)
432+
existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {}
433+
result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct]))
434+
else:
435+
result[key] = value
436+
416437
@classmethod
417438
def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006
418439
"""Given a sample of `n` json objects, return a json object that represents the unique
@@ -425,20 +446,7 @@ def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006
425446
result = {}
426447
for dict_ in dicts:
427448
for key, value in dict_.items():
428-
if isinstance(value, dict):
429-
nested_json = result.get(key, {})
430-
# `isinstance(nested_json, dict)` if for a key we first see a non dict value
431-
# but then see a dict value later, we will consider the key to be a dict.
432-
result[key] = cls.unique_json_structure(
433-
[nested_json if isinstance(nested_json, dict) else {}, value]
434-
)
435-
elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
436-
merged_struct = cls.unique_json_structure(value)
437-
existing = result.get(key)
438-
existing_struct = existing.struct if isinstance(existing, _ArrayOfStruct) else {}
439-
result[key] = _ArrayOfStruct(cls.unique_json_structure([existing_struct, merged_struct]))
440-
else:
441-
result[key] = value
449+
cls._process_unique_json_key(result, key, value)
442450
return result
443451

444452
@classmethod

0 commit comments

Comments (0)