Skip to content

Commit c7a22b4

Browse files
ulixius9 authored and jaya6400 committed
fix(athena): ingest Iceberg table properties from $properties metatable (open-metadata#27715)
1 parent db7a5f3 commit c7a22b4

8 files changed

Lines changed: 484 additions & 55 deletions

File tree

ingestion/src/metadata/ingestion/source/database/athena/metadata.py

Lines changed: 45 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,13 @@
1111

1212
"""Athena source module"""
1313

14-
import threading
14+
import hashlib
15+
import re
1516
import traceback
16-
from typing import Dict, Iterable, Optional, Set, Tuple # noqa: UP035
17+
from typing import Iterable, Optional, Tuple # noqa: UP035
1718

1819
from pyathena.sqlalchemy.base import AthenaDialect
20+
from sqlalchemy import text
1921
from sqlalchemy.engine.reflection import Inspector
2022

2123
from metadata.clients.aws_client import AWSClient
@@ -40,6 +42,7 @@
4042
from metadata.generated.schema.metadataIngestion.workflow import (
4143
Source as WorkflowSource,
4244
)
45+
from metadata.generated.schema.type.basic import EntityName, Markdown
4346
from metadata.ingestion.api.models import Either
4447
from metadata.ingestion.api.steps import InvalidSourceException
4548
from metadata.ingestion.models.custom_properties import (
@@ -81,7 +84,10 @@
8184
ATHENA_TAG = "ATHENA TAG"
8285
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"
8386

84-
ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props"
87+
ICEBERG_TABLE_TYPE = "ICEBERG"
88+
PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_.\-]")
89+
PROPERTY_NAME_REPLACEMENT = "__"
90+
PROPERTY_NAME_MAX_LENGTH = 256
8591

8692
ATHENA_INTERVAL_TYPE_MAP = {
8793
**dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE),
@@ -117,10 +123,8 @@ def __init__(
117123
self.athena_lake_formation_client = AthenaLakeFormationClient(connection=self.service_connection)
118124
self.external_location_map = {}
119125
self.schema_description_map = {}
120-
self._thread_local = threading.local()
121126
self.glue_client = None
122-
self._processed_prop: Set[str] = set() # noqa: UP006
123-
self._processed_prop_lock = threading.Lock()
127+
self._processed_prop: set[str] = set()
124128
self._string_property_type_ref = None
125129

126130
def prepare(self):
@@ -160,7 +164,9 @@ def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAnd
160164
for page in paginator.paginate(DatabaseName=schema_name):
161165
for table in page.get("TableList", []):
162166
params = table.get("Parameters", {})
163-
table_type = TableType.Iceberg if params.get("table_type") == "ICEBERG" else TableType.External
167+
table_type = (
168+
TableType.Iceberg if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External
169+
)
164170
results.append(TableNameAndType(name=table["Name"], type_=table_type))
165171
return results # noqa: TRY300
166172
except Exception as exc:
@@ -307,22 +313,12 @@ def yield_table_tags(
307313
# pylint: disable=arguments-differ
308314
def get_table_description(self, schema_name: str, table_name: str, inspector: Inspector) -> str:
309315
description = None
310-
setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
311316
try:
312317
table_info: dict = inspector.get_table_comment(table_name, schema_name)
313318
table_option = inspector.get_table_options(table_name, schema_name)
314319
self.external_location_map[(self.context.get().database, schema_name, table_name)] = table_option.get(
315320
"awsathena_location"
316321
)
317-
setattr(
318-
self._thread_local,
319-
ATHENA_TABLE_PROPS_CONTEXT_KEY,
320-
{
321-
prop_name: str(prop_value)
322-
for prop_name, prop_value in (table_option.get("awsathena_tblproperties") or {}).items()
323-
if prop_value is not None
324-
},
325-
)
326322
# Catch any exception without breaking the ingestion
327323
except Exception as exc: # pylint: disable=broad-except
328324
logger.debug(traceback.format_exc())
@@ -352,35 +348,56 @@ def _get_columns_internal(
352348
catalog_id=self.service_connection.catalogId,
353349
)
354350

355-
def get_table_extensions(self, table_name: str) -> Optional[Dict[str, str]]: # noqa: UP006, UP045
351+
def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None:
352+
if not getattr(self.source_config, "includeCustomProperties", False):
353+
return None
356354
if not self._string_property_type_ref:
357355
return None
358-
tbl_properties = getattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
356+
if table_type != TableType.Iceberg:
357+
return None
358+
schema_name: str = getattr(self.context.get(), "database_schema", "")
359+
tbl_properties = self._fetch_iceberg_properties(schema_name, table_name)
359360
if not tbl_properties:
360361
return None
361362
registered_properties = {}
362363
for prop_name, prop_value in tbl_properties.items():
363-
with self._processed_prop_lock:
364-
prop_already_registered = prop_name in self._processed_prop
365-
if not prop_already_registered:
364+
if not prop_value:
365+
continue
366+
sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name)
367+
if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH:
368+
sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest()
369+
if sanitized_name not in self._processed_prop:
366370
try:
367-
self.metadata.create_or_update_custom_property(
371+
self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult]
368372
OMetaCustomProperties(
369373
entity_type=Table,
370374
createCustomPropertyRequest=CreateCustomPropertyRequest(
371-
name=prop_name,
372-
description=prop_name,
375+
name=EntityName(sanitized_name),
376+
displayName=prop_name,
377+
description=Markdown(prop_name),
373378
propertyType=self._string_property_type_ref,
379+
customPropertyConfig=None,
374380
),
375381
)
376382
)
377-
with self._processed_prop_lock:
378-
self._processed_prop.add(prop_name)
383+
self._processed_prop.add(sanitized_name)
379384
except Exception as exc:
380385
logger.warning(
381386
f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}"
382387
)
383388
logger.debug(traceback.format_exc())
384389
continue
385-
registered_properties[prop_name] = prop_value
390+
registered_properties[sanitized_name] = prop_value
386391
return registered_properties or None
392+
393+
def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]:
    """Read Iceberg native properties from Athena's `<table>$properties` metatable.

    Args:
        schema_name: Name of the schema that contains the table.
        table_name: Name of the table whose `$properties` metatable is queried.

    Returns:
        Mapping of property key to property value, both converted with `str`.
        Rows whose key or value is NULL are dropped. An empty dict is returned
        when the query fails for any reason (the failure is logged at debug level).
    """
    # Identifiers cannot be passed as bind parameters, so they are embedded in
    # the statement text. Double any embedded double quote so that a name
    # containing `"` stays inside its quoted identifier instead of terminating it.
    quoted_schema = schema_name.replace('"', '""')
    quoted_table = f"{table_name}$properties".replace('"', '""')
    query = text(f'SELECT key, value FROM "{quoted_schema}"."{quoted_table}"')
    try:
        with self.engine.connect() as conn:
            result = conn.execute(query)
            return {str(row[0]): str(row[1]) for row in result if row[0] is not None and row[1] is not None}
    # Deliberately best-effort: a table whose properties cannot be read must not break ingestion.
    except Exception as exc:
        logger.debug(f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}")
        logger.debug(traceback.format_exc())
        return {}

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -489,7 +489,11 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
489489
by default there will be no location path
490490
"""
491491

492-
def get_table_extensions(self, table_name: str):
492+
def get_table_extensions(
493+
self,
494+
table_name: str, # pyright: ignore[reportUnusedParameter]
495+
table_type: TableType | None = None, # pyright: ignore[reportUnusedParameter]
496+
):
493497
"""
494498
Method to fetch the extensions of the table
495499
"""
@@ -569,7 +573,7 @@ def yield_table(self, table_name_and_type: Tuple[str, TableType]) -> Iterable[Ei
569573
),
570574
owners=self.get_owner_ref(table_name=table_name),
571575
locationPath=self.get_location_path(table_name=table_name, schema_name=schema_name),
572-
extension=self.get_table_extensions(table_name=table_name),
576+
extension=self.get_table_extensions(table_name=table_name, table_type=table_type),
573577
)
574578

575579
is_partitioned, partition_details = self.get_table_partition_details(

0 commit comments

Comments
 (0)