Skip to content

Commit 5ee30e5

Browse files
ulixius9claude
andcommitted
fix(athena): ingest Iceberg table properties from $properties metatable
Glue Parameters only carry Iceberg catalog pointers (table_type, metadata_location); they don't surface native Iceberg properties such as write.parquet.compression-codec or user-set keys (e.g. kpler.*) written by PyIceberg/Spark/Airflow. Those properties live inside metadata.json and are exposed via Athena's <table>$properties metatable.

- Switch get_table_extensions to query $properties for Iceberg tables; skip non-Iceberg tables to avoid wasted Athena queries
- Sanitise property names (every character other than an ASCII letter, digit or underscore → __) and preserve the original name as displayName
- Replace sanitised names longer than 256 chars with the MD5 hex digest of the original name
- Skip null and empty-string values
- Plumb table_type through get_table_extensions; remove the thread-local props context and the ineffective processed-prop lock (idempotent registration is fine under the GIL)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5d9dbfa commit 5ee30e5

3 files changed

Lines changed: 462 additions & 58 deletions

File tree

ingestion/src/metadata/ingestion/source/database/athena/metadata.py

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111

1212
"""Athena source module"""
1313

14-
import threading
14+
import hashlib
15+
import re
1516
import traceback
1617
from typing import Dict, Iterable, Optional, Set, Tuple
1718

1819
from pyathena.sqlalchemy.base import AthenaDialect
20+
from sqlalchemy import text
1921
from sqlalchemy.engine.reflection import Inspector
2022

2123
from metadata.clients.aws_client import AWSClient
@@ -81,7 +83,10 @@
8183
ATHENA_TAG = "ATHENA TAG"
8284
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"
8385

84-
ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props"
86+
ICEBERG_TABLE_TYPE = "ICEBERG"
87+
PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_]")
88+
PROPERTY_NAME_REPLACEMENT = "__"
89+
PROPERTY_NAME_MAX_LENGTH = 256
8590

8691
ATHENA_INTERVAL_TYPE_MAP = {
8792
**dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE),
@@ -125,10 +130,8 @@ def __init__(
125130
)
126131
self.external_location_map = {}
127132
self.schema_description_map = {}
128-
self._thread_local = threading.local()
129133
self.glue_client = None
130134
self._processed_prop: Set[str] = set()
131-
self._processed_prop_lock = threading.Lock()
132135
self._string_property_type_ref = None
133136

134137
def prepare(self):
@@ -178,7 +181,7 @@ def query_table_names_and_types(
178181
params = table.get("Parameters", {})
179182
table_type = (
180183
TableType.Iceberg
181-
if params.get("table_type") == "ICEBERG"
184+
if params.get("table_type") == ICEBERG_TABLE_TYPE
182185
else TableType.External
183186
)
184187
results.append(
@@ -340,24 +343,12 @@ def get_table_description(
340343
self, schema_name: str, table_name: str, inspector: Inspector
341344
) -> str:
342345
description = None
343-
setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
344346
try:
345347
table_info: dict = inspector.get_table_comment(table_name, schema_name)
346348
table_option = inspector.get_table_options(table_name, schema_name)
347349
self.external_location_map[
348350
(self.context.get().database, schema_name, table_name)
349351
] = table_option.get("awsathena_location")
350-
setattr(
351-
self._thread_local,
352-
ATHENA_TABLE_PROPS_CONTEXT_KEY,
353-
{
354-
prop_name: str(prop_value)
355-
for prop_name, prop_value in (
356-
table_option.get("awsathena_tblproperties") or {}
357-
).items()
358-
if prop_value is not None
359-
},
360-
)
361352
# Catch any exception without breaking the ingestion
362353
except Exception as exc: # pylint: disable=broad-except
363354
logger.debug(traceback.format_exc())
@@ -389,35 +380,69 @@ def _get_columns_internal(
389380
catalog_id=self.service_connection.catalogId,
390381
)
391382

392-
def get_table_extensions(
    self, table_name: str, table_type: Optional[TableType] = None
) -> Optional[Dict[str, str]]:
    """Build the custom-property extension for an Iceberg table.

    Reads the table's Iceberg properties, makes sure a string custom
    property is registered on the Table entity for each of them, and returns
    the values keyed by the registered (sanitised) property name.

    Args:
        table_name: Table being processed; the schema is taken from the
            current ingestion context.
        table_type: Type of the table. Anything other than
            ``TableType.Iceberg`` yields ``None`` without querying Athena.

    Returns:
        Mapping of sanitised property name to value, or ``None`` when the
        string property type is unavailable, the table is not Iceberg, or no
        property could be registered.
    """
    if not self._string_property_type_ref or table_type != TableType.Iceberg:
        return None

    current_schema = self.context.get().database_schema
    iceberg_props = self._fetch_iceberg_properties(current_schema, table_name)
    if not iceberg_props:
        return None

    extensions = {}
    for prop_name, raw_value in iceberg_props.items():
        # Null and empty values carry no information worth storing.
        if raw_value is None or raw_value == "":
            continue

        # Every character outside [A-Za-z0-9_] becomes the replacement token.
        safe_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(
            PROPERTY_NAME_REPLACEMENT, prop_name
        )
        if len(safe_name) > PROPERTY_NAME_MAX_LENGTH:
            # Over-long names are swapped for a digest of the original name.
            digest = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False)
            safe_name = digest.hexdigest()

        if safe_name not in self._processed_prop:
            try:
                property_request = CreateCustomPropertyRequest(
                    name=safe_name,
                    displayName=prop_name,
                    description=prop_name,
                    propertyType=self._string_property_type_ref,
                )
                self.metadata.create_or_update_custom_property(
                    OMetaCustomProperties(
                        entity_type=Table,
                        createCustomPropertyRequest=property_request,
                    )
                )
                # Remember the name so later tables skip the registration call.
                self._processed_prop.add(safe_name)
            except Exception as exc:
                logger.warning(
                    f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}"
                )
                logger.debug(traceback.format_exc())
                # An unregistered property must not be sent with the table.
                continue

        extensions[safe_name] = raw_value

    return extensions or None
427+
428+
def _fetch_iceberg_properties(
    self, schema_name: str, table_name: str
) -> Dict[str, str]:
    """Read Iceberg native properties from Athena's `<table>$properties` metatable.

    Args:
        schema_name: Athena database (schema) that contains the table.
        table_name: Iceberg table whose properties are read.

    Returns:
        Property keys mapped to their values, both converted to ``str``.
        Rows with a null key or value are dropped. An empty dict is returned
        when the query fails, so a failure never breaks the ingestion.
    """
    # Identifiers cannot be bound as parameters, so they are embedded in the
    # statement. Double any embedded double quote so a name containing `"`
    # stays inside its quoted identifier instead of terminating it.
    quoted_schema = schema_name.replace('"', '""')
    quoted_table = table_name.replace('"', '""')
    query = text(
        f'SELECT key, value FROM "{quoted_schema}"."{quoted_table}$properties"'
    )
    try:
        with self.engine.connect() as conn:
            result = conn.execute(query)
            # Materialise the rows while the connection is still open.
            return {
                str(row[0]): str(row[1])
                for row in result
                if row[0] is not None and row[1] is not None
            }
    # Catch any exception without breaking the ingestion
    except Exception as exc:  # pylint: disable=broad-except
        logger.debug(
            f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}"
        )
        logger.debug(traceback.format_exc())
        return {}

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,9 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
559559
by default there will be no location path
560560
"""
561561

562-
def get_table_extensions(self, table_name: str):
562+
def get_table_extensions(
563+
self, table_name: str, table_type: Optional[TableType] = None
564+
):
563565
"""
564566
Method to fetch the extensions of the table
565567
"""
@@ -647,7 +649,9 @@ def yield_table(
647649
locationPath=self.get_location_path(
648650
table_name=table_name, schema_name=schema_name
649651
),
650-
extension=self.get_table_extensions(table_name=table_name),
652+
extension=self.get_table_extensions(
653+
table_name=table_name, table_type=table_type
654+
),
651655
)
652656

653657
is_partitioned, partition_details = self.get_table_partition_details(

0 commit comments

Comments
 (0)