Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

"""Athena source module"""

import threading
import hashlib
import re
import traceback
from typing import Dict, Iterable, Optional, Set, Tuple # noqa: UP035
from typing import Iterable, Optional, Tuple # noqa: UP035

from pyathena.sqlalchemy.base import AthenaDialect
from sqlalchemy import text
from sqlalchemy.engine.reflection import Inspector

from metadata.clients.aws_client import AWSClient
Expand All @@ -40,6 +42,7 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName, Markdown
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.custom_properties import (
Expand Down Expand Up @@ -81,7 +84,10 @@
ATHENA_TAG = "ATHENA TAG"
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"

ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props"
ICEBERG_TABLE_TYPE = "ICEBERG"
PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_.\-]")
PROPERTY_NAME_REPLACEMENT = "__"
Comment thread
ulixius9 marked this conversation as resolved.
PROPERTY_NAME_MAX_LENGTH = 256

ATHENA_INTERVAL_TYPE_MAP = {
**dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE),
Expand Down Expand Up @@ -117,10 +123,8 @@ def __init__(
self.athena_lake_formation_client = AthenaLakeFormationClient(connection=self.service_connection)
self.external_location_map = {}
self.schema_description_map = {}
self._thread_local = threading.local()
self.glue_client = None
self._processed_prop: Set[str] = set() # noqa: UP006
self._processed_prop_lock = threading.Lock()
self._processed_prop: set[str] = set()
self._string_property_type_ref = None

def prepare(self):
Expand Down Expand Up @@ -160,7 +164,9 @@ def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAnd
for page in paginator.paginate(DatabaseName=schema_name):
for table in page.get("TableList", []):
params = table.get("Parameters", {})
table_type = TableType.Iceberg if params.get("table_type") == "ICEBERG" else TableType.External
table_type = (
TableType.Iceberg if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External
)
results.append(TableNameAndType(name=table["Name"], type_=table_type))
return results # noqa: TRY300
except Exception as exc:
Expand Down Expand Up @@ -307,22 +313,12 @@ def yield_table_tags(
# pylint: disable=arguments-differ
def get_table_description(self, schema_name: str, table_name: str, inspector: Inspector) -> str:
description = None
setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
try:
table_info: dict = inspector.get_table_comment(table_name, schema_name)
table_option = inspector.get_table_options(table_name, schema_name)
self.external_location_map[(self.context.get().database, schema_name, table_name)] = table_option.get(
"awsathena_location"
)
setattr(
self._thread_local,
ATHENA_TABLE_PROPS_CONTEXT_KEY,
{
prop_name: str(prop_value)
for prop_name, prop_value in (table_option.get("awsathena_tblproperties") or {}).items()
if prop_value is not None
},
)
# Catch any exception without breaking the ingestion
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
Expand Down Expand Up @@ -352,35 +348,56 @@ def _get_columns_internal(
catalog_id=self.service_connection.catalogId,
)

def get_table_extensions(self, table_name: str) -> Optional[Dict[str, str]]: # noqa: UP006, UP045
def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None:
if not getattr(self.source_config, "includeCustomProperties", False):
return None
if not self._string_property_type_ref:
return None
tbl_properties = getattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
if table_type != TableType.Iceberg:
return None
schema_name: str = getattr(self.context.get(), "database_schema", "")
tbl_properties = self._fetch_iceberg_properties(schema_name, table_name)
if not tbl_properties:
Comment thread
ulixius9 marked this conversation as resolved.
return None
registered_properties = {}
for prop_name, prop_value in tbl_properties.items():
with self._processed_prop_lock:
prop_already_registered = prop_name in self._processed_prop
if not prop_already_registered:
if not prop_value:
continue
sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name)
if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH:
sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest()
if sanitized_name not in self._processed_prop:
Comment thread
ulixius9 marked this conversation as resolved.
try:
self.metadata.create_or_update_custom_property(
self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult]
OMetaCustomProperties(
entity_type=Table,
createCustomPropertyRequest=CreateCustomPropertyRequest(
name=prop_name,
description=prop_name,
name=EntityName(sanitized_name),
displayName=prop_name,
description=Markdown(prop_name),
propertyType=self._string_property_type_ref,
customPropertyConfig=None,
),
)
)
with self._processed_prop_lock:
self._processed_prop.add(prop_name)
self._processed_prop.add(sanitized_name)
except Exception as exc:
logger.warning(
f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}"
)
logger.debug(traceback.format_exc())
continue
registered_properties[prop_name] = prop_value
registered_properties[sanitized_name] = prop_value
return registered_properties or None

def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]:
"""Read Iceberg native properties from Athena's `<table>$properties` metatable."""
query = text(f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"')
try:
with self.engine.connect() as conn:
result = conn.execute(query)
return {str(row[0]): str(row[1]) for row in result if row[0] is not None and row[1] is not None}
except Exception as exc:
logger.debug(f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}")
logger.debug(traceback.format_exc())
return {}
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@
for table_name in self.inspector.get_view_names(schema_name) or []
]

def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: # noqa: UP006, UP045

Check failure on line 356 in ingestion/src/metadata/ingestion/source/database/common_db_source.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ3Tcd0KfUmKhQCyPxsq&open=AZ3Tcd0KfUmKhQCyPxsq&pullRequest=27715
"""
Handle table and views.

Expand Down Expand Up @@ -489,7 +489,11 @@
by default there will be no location path
"""

def get_table_extensions(self, table_name: str):
def get_table_extensions(
self,
table_name: str, # pyright: ignore[reportUnusedParameter]
table_type: TableType | None = None, # pyright: ignore[reportUnusedParameter]
):
"""
Method to fetch the extensions of the table
"""
Expand Down Expand Up @@ -569,7 +573,7 @@
),
owners=self.get_owner_ref(table_name=table_name),
locationPath=self.get_location_path(table_name=table_name, schema_name=schema_name),
extension=self.get_table_extensions(table_name=table_name),
extension=self.get_table_extensions(table_name=table_name, table_type=table_type),
)

is_partitioned, partition_details = self.get_table_partition_details(
Expand Down
Loading
Loading