Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

"""Athena source module"""

import threading
import hashlib
import re
import traceback
from typing import Dict, Iterable, Optional, Set, Tuple

from pyathena.sqlalchemy.base import AthenaDialect
from sqlalchemy import text
from sqlalchemy.engine.reflection import Inspector

from metadata.clients.aws_client import AWSClient
Expand Down Expand Up @@ -81,7 +83,10 @@
ATHENA_TAG = "ATHENA TAG"
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"

ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props"
ICEBERG_TABLE_TYPE = "ICEBERG"
PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_]")

Check warning on line 87 in ingestion/src/metadata/ingestion/source/database/athena/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Use concise character class syntax '\W' instead of '[^A-Za-z0-9_]'.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ2_3cWTAoJmzSlDrw6g&open=AZ2_3cWTAoJmzSlDrw6g&pullRequest=27715
PROPERTY_NAME_REPLACEMENT = "__"
Comment thread
ulixius9 marked this conversation as resolved.
PROPERTY_NAME_MAX_LENGTH = 256

ATHENA_INTERVAL_TYPE_MAP = {
**dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE),
Expand Down Expand Up @@ -125,10 +130,8 @@
)
self.external_location_map = {}
self.schema_description_map = {}
self._thread_local = threading.local()
self.glue_client = None
self._processed_prop: Set[str] = set()
self._processed_prop_lock = threading.Lock()
self._string_property_type_ref = None

def prepare(self):
Expand Down Expand Up @@ -178,7 +181,7 @@
params = table.get("Parameters", {})
table_type = (
TableType.Iceberg
if params.get("table_type") == "ICEBERG"
if params.get("table_type") == ICEBERG_TABLE_TYPE
else TableType.External
)
results.append(
Expand Down Expand Up @@ -340,24 +343,12 @@
self, schema_name: str, table_name: str, inspector: Inspector
) -> str:
description = None
setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
try:
table_info: dict = inspector.get_table_comment(table_name, schema_name)
table_option = inspector.get_table_options(table_name, schema_name)
self.external_location_map[
(self.context.get().database, schema_name, table_name)
] = table_option.get("awsathena_location")
setattr(
self._thread_local,
ATHENA_TABLE_PROPS_CONTEXT_KEY,
{
prop_name: str(prop_value)
for prop_name, prop_value in (
table_option.get("awsathena_tblproperties") or {}
).items()
if prop_value is not None
},
)
# Catch any exception without breaking the ingestion
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
Expand Down Expand Up @@ -389,35 +380,69 @@
catalog_id=self.service_connection.catalogId,
)

def get_table_extensions(
    self, table_name: str, table_type: Optional[TableType] = None
) -> Optional[Dict[str, str]]:
    """Return Iceberg table properties to be stored as table extension data.

    Only Iceberg tables are handled: native properties are read from the
    Athena ``<table>$properties`` metatable, property names are sanitized
    to the character set allowed for custom property names, each distinct
    sanitized name is registered once as a string custom property, and the
    resulting ``{sanitized_name: value}`` mapping is returned.

    Returns None when the table is not Iceberg, the string property type
    reference is unavailable, or no usable properties were found.
    """
    # The string property type reference is resolved earlier in the run;
    # without it we cannot register custom properties at all.
    if not self._string_property_type_ref:
        return None
    if table_type != TableType.Iceberg:
        return None
    schema_name = self.context.get().database_schema
    tbl_properties = self._fetch_iceberg_properties(schema_name, table_name)
    if not tbl_properties:
        return None
    registered_properties = {}
    for prop_name, prop_value in tbl_properties.items():
        if prop_value is None or prop_value == "":
            continue
        # Custom property names only allow [A-Za-z0-9_]; anything else is
        # replaced. Over-long names are swapped for a stable md5 digest of
        # the original name so the key stays deterministic across runs.
        sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(
            PROPERTY_NAME_REPLACEMENT, prop_name
        )
        if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH:
            sanitized_name = hashlib.md5(
                prop_name.encode("utf-8"), usedforsecurity=False
            ).hexdigest()
        # Distinct original names can collapse to the same sanitized name
        # (e.g. "kpler.owner" and "kpler-owner" -> "kpler__owner"). Keep
        # the first value seen and surface the conflict instead of letting
        # iteration order silently decide which value survives.
        if sanitized_name in registered_properties:
            logger.warning(
                f"Sanitized property name collision: [{prop_name}] maps to "
                f"[{sanitized_name}] which is already used by another property "
                f"of table [{table_name}] - skipping."
            )
            continue
        if sanitized_name not in self._processed_prop:
            try:
                self.metadata.create_or_update_custom_property(
                    OMetaCustomProperties(
                        entity_type=Table,
                        createCustomPropertyRequest=CreateCustomPropertyRequest(
                            name=sanitized_name,
                            displayName=prop_name,
                            description=prop_name,
                            propertyType=self._string_property_type_ref,
                        ),
                    )
                )
                self._processed_prop.add(sanitized_name)
            except Exception as exc:  # pylint: disable=broad-except
                logger.warning(
                    f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}"
                )
                logger.debug(traceback.format_exc())
                continue
        registered_properties[sanitized_name] = prop_value
    return registered_properties or None

def _fetch_iceberg_properties(
    self, schema_name: str, table_name: str
) -> Dict[str, str]:
    """Read Iceberg native properties from Athena's `<table>$properties` metatable.

    Returns a mapping of property name -> stringified value; rows whose
    key or value is NULL are skipped. Any failure (e.g. the metatable does
    not exist for the table) is logged at debug level and an empty dict is
    returned so ingestion keeps going.
    """
    # Escape embedded double quotes (standard SQL identifier escaping) so
    # a quote inside a schema/table name cannot break out of the quoted
    # identifier and inject arbitrary SQL.
    safe_schema = schema_name.replace('"', '""')
    safe_table = table_name.replace('"', '""')
    query = text(
        f'SELECT key, value FROM "{safe_schema}"."{safe_table}$properties"'
    )
    try:
        with self.engine.connect() as conn:
            result = conn.execute(query)
            return {
                str(row[0]): str(row[1])
                for row in result
                if row[0] is not None and row[1] is not None
            }
    except Exception as exc:  # pylint: disable=broad-except
        logger.debug(
            f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}"
        )
        logger.debug(traceback.format_exc())
        return {}
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,9 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
by default there will be no location path
"""

def get_table_extensions(
    self, table_name: str, table_type: Optional[TableType] = None
):
    """
    Hook for source connectors that expose table-level extension
    (custom) properties; the base implementation provides none and
    implicitly returns None.
    """
Expand Down Expand Up @@ -647,7 +649,9 @@ def yield_table(
locationPath=self.get_location_path(
table_name=table_name, schema_name=schema_name
),
extension=self.get_table_extensions(table_name=table_name),
extension=self.get_table_extensions(
table_name=table_name, table_type=table_type
),
)

is_partitioned, partition_details = self.get_table_partition_details(
Expand Down
Loading
Loading