Skip to content

Commit fe42adc

Browse files
ulixius9harshsoni2024
authored andcommitted
fix(athena): ingest Iceberg table properties from $properties metatable (#27715)
1 parent 90a9274 commit fe42adc

8 files changed

Lines changed: 501 additions & 70 deletions

File tree

ingestion/src/metadata/ingestion/source/database/athena/metadata.py

Lines changed: 61 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111

1212
"""Athena source module"""
1313

14-
import threading
14+
import hashlib
15+
import re
1516
import traceback
16-
from typing import Dict, Iterable, Optional, Set, Tuple
17+
from typing import Iterable, Optional, Tuple # noqa: UP035
1718

1819
from pyathena.sqlalchemy.base import AthenaDialect
20+
from sqlalchemy import text
1921
from sqlalchemy.engine.reflection import Inspector
2022

2123
from metadata.clients.aws_client import AWSClient
@@ -40,6 +42,7 @@
4042
from metadata.generated.schema.metadataIngestion.workflow import (
4143
Source as WorkflowSource,
4244
)
45+
from metadata.generated.schema.type.basic import EntityName, Markdown
4346
from metadata.ingestion.api.models import Either
4447
from metadata.ingestion.api.steps import InvalidSourceException
4548
from metadata.ingestion.models.custom_properties import (
@@ -81,7 +84,10 @@
8184
ATHENA_TAG = "ATHENA TAG"
8285
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"
8386

84-
ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props"
87+
ICEBERG_TABLE_TYPE = "ICEBERG"
88+
PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_.\-]")
89+
PROPERTY_NAME_REPLACEMENT = "__"
90+
PROPERTY_NAME_MAX_LENGTH = 256
8591

8692
ATHENA_INTERVAL_TYPE_MAP = {
8793
**dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE),
@@ -125,10 +131,8 @@ def __init__(
125131
)
126132
self.external_location_map = {}
127133
self.schema_description_map = {}
128-
self._thread_local = threading.local()
129134
self.glue_client = None
130-
self._processed_prop: Set[str] = set()
131-
self._processed_prop_lock = threading.Lock()
135+
self._processed_prop: set[str] = set()
132136
self._string_property_type_ref = None
133137

134138
def prepare(self):
@@ -162,11 +166,23 @@ def prepare(self):
162166
def get_schema_description(self, schema_name: str) -> Optional[str]:
163167
return self.schema_description_map.get(schema_name)
164168

165-
def query_table_names_and_types(
166-
self, schema_name: str
167-
) -> Iterable[TableNameAndType]:
168-
"""Return tables as external"""
169-
169+
def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAndType]:
170+
"""Return tables with proper type detection using a single Glue API pass."""
171+
if self.glue_client:
172+
try:
173+
results = []
174+
paginator = self.glue_client.get_paginator("get_tables")
175+
for page in paginator.paginate(DatabaseName=schema_name):
176+
for table in page.get("TableList", []):
177+
params = table.get("Parameters", {})
178+
table_type = (
179+
TableType.Iceberg if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External
180+
)
181+
results.append(TableNameAndType(name=table["Name"], type_=table_type))
182+
return results # noqa: TRY300
183+
except Exception as exc:
184+
logger.debug(traceback.format_exc())
185+
logger.warning(f"Failed to fetch Glue table metadata for schema [{schema_name}]: {exc}")
170186
return [
171187
TableNameAndType(name=name, type_=TableType.External)
172188
for name in self.inspector.get_table_names(schema_name)
@@ -316,23 +332,11 @@ def get_table_description(
316332
self, schema_name: str, table_name: str, inspector: Inspector
317333
) -> str:
318334
description = None
319-
setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
320335
try:
321336
table_info: dict = inspector.get_table_comment(table_name, schema_name)
322337
table_option = inspector.get_table_options(table_name, schema_name)
323-
self.external_location_map[
324-
(self.context.get().database, schema_name, table_name)
325-
] = table_option.get("awsathena_location")
326-
setattr(
327-
self._thread_local,
328-
ATHENA_TABLE_PROPS_CONTEXT_KEY,
329-
{
330-
prop_name: str(prop_value)
331-
for prop_name, prop_value in (
332-
table_option.get("awsathena_tblproperties") or {}
333-
).items()
334-
if prop_value is not None
335-
},
338+
self.external_location_map[(self.context.get().database, schema_name, table_name)] = table_option.get(
339+
"awsathena_location"
336340
)
337341
# Catch any exception without breaking the ingestion
338342
except Exception as exc: # pylint: disable=broad-except
@@ -364,35 +368,56 @@ def _get_columns_internal(
364368
glue_client=self.glue_client,
365369
)
366370

367-
def get_table_extensions(self, table_name: str) -> Optional[Dict[str, str]]:
371+
def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None:
372+
if not getattr(self.source_config, "includeCustomProperties", False):
373+
return None
368374
if not self._string_property_type_ref:
369375
return None
370-
tbl_properties = getattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
376+
if table_type != TableType.Iceberg:
377+
return None
378+
schema_name: str = getattr(self.context.get(), "database_schema", "")
379+
tbl_properties = self._fetch_iceberg_properties(schema_name, table_name)
371380
if not tbl_properties:
372381
return None
373382
registered_properties = {}
374383
for prop_name, prop_value in tbl_properties.items():
375-
with self._processed_prop_lock:
376-
prop_already_registered = prop_name in self._processed_prop
377-
if not prop_already_registered:
384+
if not prop_value:
385+
continue
386+
sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name)
387+
if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH:
388+
sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest()
389+
if sanitized_name not in self._processed_prop:
378390
try:
379-
self.metadata.create_or_update_custom_property(
391+
self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult]
380392
OMetaCustomProperties(
381393
entity_type=Table,
382394
createCustomPropertyRequest=CreateCustomPropertyRequest(
383-
name=prop_name,
384-
description=prop_name,
395+
name=EntityName(sanitized_name),
396+
displayName=prop_name,
397+
description=Markdown(prop_name),
385398
propertyType=self._string_property_type_ref,
399+
customPropertyConfig=None,
386400
),
387401
)
388402
)
389-
with self._processed_prop_lock:
390-
self._processed_prop.add(prop_name)
403+
self._processed_prop.add(sanitized_name)
391404
except Exception as exc:
392405
logger.warning(
393406
f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}"
394407
)
395408
logger.debug(traceback.format_exc())
396409
continue
397-
registered_properties[prop_name] = prop_value
410+
registered_properties[sanitized_name] = prop_value
398411
return registered_properties or None
412+
413+
def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]:
414+
"""Read Iceberg native properties from Athena's `<table>$properties` metatable."""
415+
query = text(f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"')
416+
try:
417+
with self.engine.connect() as conn:
418+
result = conn.execute(query)
419+
return {str(row[0]): str(row[1]) for row in result if row[0] is not None and row[1] is not None}
420+
except Exception as exc:
421+
logger.debug(f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}")
422+
logger.debug(traceback.format_exc())
423+
return {}

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,11 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
533533
by default there will be no location path
534534
"""
535535

536-
def get_table_extensions(self, table_name: str):
536+
def get_table_extensions(
537+
self,
538+
table_name: str, # pyright: ignore[reportUnusedParameter]
539+
table_type: TableType | None = None, # pyright: ignore[reportUnusedParameter]
540+
):
537541
"""
538542
Method to fetch the extensions of the table
539543
"""
@@ -618,10 +622,8 @@ def yield_table(
618622
table_type=table_type,
619623
),
620624
owners=self.get_owner_ref(table_name=table_name),
621-
locationPath=self.get_location_path(
622-
table_name=table_name, schema_name=schema_name
623-
),
624-
extension=self.get_table_extensions(table_name=table_name),
625+
locationPath=self.get_location_path(table_name=table_name, schema_name=schema_name),
626+
extension=self.get_table_extensions(table_name=table_name, table_type=table_type),
625627
)
626628

627629
is_partitioned, partition_details = self.get_table_partition_details(

0 commit comments

Comments
 (0)