diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py
index e0dc051fadc0..4246031c6197 100644
--- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py
@@ -11,11 +11,13 @@
"""Athena source module"""
-import threading
+import hashlib
+import re
import traceback
-from typing import Dict, Iterable, Optional, Set, Tuple # noqa: UP035
+from typing import Iterable, Optional, Tuple # noqa: UP035
from pyathena.sqlalchemy.base import AthenaDialect
+from sqlalchemy import text
from sqlalchemy.engine.reflection import Inspector
from metadata.clients.aws_client import AWSClient
@@ -40,6 +42,7 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
+from metadata.generated.schema.type.basic import EntityName, Markdown
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.custom_properties import (
@@ -81,7 +84,10 @@
ATHENA_TAG = "ATHENA TAG"
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"
-ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props"
+ICEBERG_TABLE_TYPE = "ICEBERG"
+PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_.\-]")
+PROPERTY_NAME_REPLACEMENT = "__"
+PROPERTY_NAME_MAX_LENGTH = 256
ATHENA_INTERVAL_TYPE_MAP = {
**dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE),
@@ -117,10 +123,8 @@ def __init__(
self.athena_lake_formation_client = AthenaLakeFormationClient(connection=self.service_connection)
self.external_location_map = {}
self.schema_description_map = {}
- self._thread_local = threading.local()
self.glue_client = None
- self._processed_prop: Set[str] = set() # noqa: UP006
- self._processed_prop_lock = threading.Lock()
+ self._processed_prop: set[str] = set()
self._string_property_type_ref = None
def prepare(self):
@@ -160,7 +164,9 @@ def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAnd
for page in paginator.paginate(DatabaseName=schema_name):
for table in page.get("TableList", []):
params = table.get("Parameters", {})
- table_type = TableType.Iceberg if params.get("table_type") == "ICEBERG" else TableType.External
+ table_type = (
+ TableType.Iceberg if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External
+ )
results.append(TableNameAndType(name=table["Name"], type_=table_type))
return results # noqa: TRY300
except Exception as exc:
@@ -307,22 +313,12 @@ def yield_table_tags(
# pylint: disable=arguments-differ
def get_table_description(self, schema_name: str, table_name: str, inspector: Inspector) -> str:
description = None
- setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
try:
table_info: dict = inspector.get_table_comment(table_name, schema_name)
table_option = inspector.get_table_options(table_name, schema_name)
self.external_location_map[(self.context.get().database, schema_name, table_name)] = table_option.get(
"awsathena_location"
)
- setattr(
- self._thread_local,
- ATHENA_TABLE_PROPS_CONTEXT_KEY,
- {
- prop_name: str(prop_value)
- for prop_name, prop_value in (table_option.get("awsathena_tblproperties") or {}).items()
- if prop_value is not None
- },
- )
# Catch any exception without breaking the ingestion
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
@@ -352,35 +348,56 @@ def _get_columns_internal(
catalog_id=self.service_connection.catalogId,
)
- def get_table_extensions(self, table_name: str) -> Optional[Dict[str, str]]: # noqa: UP006, UP045
+ def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None:
+ if not getattr(self.source_config, "includeCustomProperties", False):
+ return None
if not self._string_property_type_ref:
return None
- tbl_properties = getattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {})
+ if table_type != TableType.Iceberg:
+ return None
+ schema_name: str = getattr(self.context.get(), "database_schema", "")
+ tbl_properties = self._fetch_iceberg_properties(schema_name, table_name)
if not tbl_properties:
return None
registered_properties = {}
for prop_name, prop_value in tbl_properties.items():
- with self._processed_prop_lock:
- prop_already_registered = prop_name in self._processed_prop
- if not prop_already_registered:
+ if not prop_value:
+ continue
+ sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name)
+ if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH:
+ sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest()
+ if sanitized_name not in self._processed_prop:
try:
- self.metadata.create_or_update_custom_property(
+ self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult]
OMetaCustomProperties(
entity_type=Table,
createCustomPropertyRequest=CreateCustomPropertyRequest(
- name=prop_name,
- description=prop_name,
+ name=EntityName(sanitized_name),
+ displayName=prop_name,
+ description=Markdown(prop_name),
propertyType=self._string_property_type_ref,
+ customPropertyConfig=None,
),
)
)
- with self._processed_prop_lock:
- self._processed_prop.add(prop_name)
+ self._processed_prop.add(sanitized_name)
except Exception as exc:
logger.warning(
f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}"
)
logger.debug(traceback.format_exc())
continue
- registered_properties[prop_name] = prop_value
+ registered_properties[sanitized_name] = prop_value
return registered_properties or None
+
+ def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]:
+        """Read Iceberg native properties from Athena's `$properties` metatable."""
+ query = text(f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"')
+ try:
+ with self.engine.connect() as conn:
+ result = conn.execute(query)
+ return {str(row[0]): str(row[1]) for row in result if row[0] is not None and row[1] is not None}
+ except Exception as exc:
+ logger.debug(f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}")
+ logger.debug(traceback.format_exc())
+ return {}
diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
index b6a0319eec75..7e53acc89828 100644
--- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
@@ -489,7 +489,11 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
by default there will be no location path
"""
- def get_table_extensions(self, table_name: str):
+ def get_table_extensions(
+ self,
+ table_name: str, # pyright: ignore[reportUnusedParameter]
+ table_type: TableType | None = None, # pyright: ignore[reportUnusedParameter]
+ ):
"""
Method to fetch the extensions of the table
"""
@@ -569,7 +573,7 @@ def yield_table(self, table_name_and_type: Tuple[str, TableType]) -> Iterable[Ei
),
owners=self.get_owner_ref(table_name=table_name),
locationPath=self.get_location_path(table_name=table_name, schema_name=schema_name),
- extension=self.get_table_extensions(table_name=table_name),
+ extension=self.get_table_extensions(table_name=table_name, table_type=table_type),
)
is_partitioned, partition_details = self.get_table_partition_details(
diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py
index 591a8ade432b..7dce30c7b332 100644
--- a/ingestion/tests/unit/topology/database/test_athena.py
+++ b/ingestion/tests/unit/topology/database/test_athena.py
@@ -12,11 +12,13 @@
Test athena source
"""
+import hashlib
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from uuid import UUID
+import pytest
from pydantic import AnyUrl
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
@@ -322,31 +324,6 @@ def test_column_lineage(self):
)
assert column_lineage == EXPECTED_COLUMN_LINEAGE
- def test_get_table_extensions_returns_none_without_type_ref(self):
- self.athena_source._string_property_type_ref = None
- assert self.athena_source.get_table_extensions(MOCK_TABLE_NAME) is None
-
- def test_get_table_extensions_returns_properties_from_description(self):
- from metadata.generated.schema.type.customProperty import PropertyType
-
- self.athena_source._string_property_type_ref = PropertyType(
- EntityReference(id=UUID("00000000-0000-0000-0000-000000000001"), type="type")
- )
- mock_inspector = MagicMock()
- mock_inspector.get_table_comment.return_value = {"text": "desc"}
- mock_inspector.get_table_options.return_value = {
- "awsathena_location": "s3://bucket/path",
- "awsathena_tblproperties": {"prop_key": "prop_value", "null_prop": None},
- }
- self.athena_source.get_table_description(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME, mock_inspector)
-
- with patch.object(self.athena_source, "metadata") as mock_metadata:
- result = self.athena_source.get_table_extensions(MOCK_TABLE_NAME)
-
- assert result == {"prop_key": "prop_value"}
- assert "null_prop" not in result
- mock_metadata.create_or_update_custom_property.assert_called_once()
-
SUBMISSION_DT = datetime(2024, 1, 2, 10, 0, 0)
COMPLETION_DT = datetime(2024, 1, 2, 10, 5, 0)
@@ -396,3 +373,404 @@ def test_end_time_falls_back_to_submission_when_completion_missing(self):
assert len(results) == 1
assert len(results[0].queries) == 1
assert results[0].queries[0].endTime == SUBMISSION_DT.isoformat(" ", "seconds")
+
+
+@pytest.fixture
+def athena_source():
+ """A minimally-wired AthenaSource with context populated and a dummy type ref."""
+ from metadata.generated.schema.type.customProperty import PropertyType
+
+ config = OpenMetadataWorkflowConfig.model_validate(mock_athena_config)
+ with patch(
+ "metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection",
+ return_value=False,
+ ):
+ source = AthenaSource.create(
+ mock_athena_config["source"],
+ config.workflowConfig.openMetadataServerConfig,
+ )
+
+ source.context.get().__dict__["database_schema"] = MOCK_DATABASE_SCHEMA.name.root
+ source.context.get().__dict__["database_service"] = MOCK_DATABASE_SERVICE.name.root
+ source.context.get().__dict__["database"] = MOCK_DATABASE.name.root
+ source._string_property_type_ref = PropertyType(
+ EntityReference(id=UUID("00000000-0000-0000-0000-000000000001"), type="type")
+ )
+ source.source_config.includeCustomProperties = True
+ return source
+
+
+def _mock_query_rows(source, rows):
+ """Wire source.engine.connect() as a context manager yielding the given rows."""
+ mock_engine = MagicMock()
+ mock_engine.connect.return_value.__enter__.return_value.execute.return_value = rows
+ source.engine = mock_engine
+ return mock_engine
+
+
+def _get_request(mock_metadata, call_index=0):
+ """Pull the CreateCustomPropertyRequest from a create_or_update_custom_property call."""
+ return mock_metadata.create_or_update_custom_property.call_args_list[call_index].args[0].createCustomPropertyRequest
+
+
+class TestGetTableExtensionsEarlyExits:
+ """Cover the early-return branches of get_table_extensions."""
+
+ def test_returns_none_when_include_custom_properties_disabled(self, athena_source):
+ athena_source.source_config.includeCustomProperties = False
+ with patch.object(athena_source, "_fetch_iceberg_properties") as mock_fetch:
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+ assert result is None
+ mock_fetch.assert_not_called()
+
+ def test_returns_none_without_type_ref(self, athena_source):
+ athena_source._string_property_type_ref = None
+ assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None
+
+ def test_returns_none_for_external_table(self, athena_source):
+ assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.External) is None
+
+ def test_returns_none_for_regular_table(self, athena_source):
+ assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Regular) is None
+
+ def test_returns_none_when_table_type_is_none(self, athena_source):
+ assert athena_source.get_table_extensions(MOCK_TABLE_NAME) is None
+
+ def test_returns_none_when_query_yields_no_properties(self, athena_source):
+ with patch.object(athena_source, "_fetch_iceberg_properties", return_value={}):
+ assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None
+
+ def test_returns_none_when_all_values_filtered_out(self, athena_source):
+ props = {"k1": None, "k2": ""}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata"),
+ ):
+ assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None
+
+
+class TestGetTableExtensionsSanitization:
+ """Property name sanitization and display-name preservation."""
+
+ def test_dot_is_preserved(self, athena_source):
+ props = {"myprop.owner": "team-a"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"myprop.owner": "team-a"}
+ request = _get_request(mock_metadata)
+ assert request.name.root == "myprop.owner"
+ assert request.displayName == "myprop.owner"
+
+ def test_hyphen_is_preserved(self, athena_source):
+ props = {"myprop-owner": "x"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"myprop-owner": "x"}
+ request = _get_request(mock_metadata)
+ assert request.name.root == "myprop-owner"
+
+ def test_allowed_punctuation_combined_preserved(self, athena_source):
+ """Dots and hyphens together are allowed — name passes through untouched."""
+ props = {"myprop.airflow-dag-id": "scrape-dag"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"myprop.airflow-dag-id": "scrape-dag"}
+ request = _get_request(mock_metadata)
+ assert request.name.root == "myprop.airflow-dag-id"
+ assert request.displayName == "myprop.airflow-dag-id"
+
+ def test_other_special_chars_still_replaced(self, athena_source):
+ """Everything outside [A-Za-z0-9_.-] gets replaced with __."""
+ props = {"myprop/airflow:dag id@prod": "v"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"myprop__airflow__dag__id__prod": "v"}
+ request = _get_request(mock_metadata)
+ assert request.displayName == "myprop/airflow:dag id@prod"
+
+ def test_mixed_allowed_and_disallowed_chars(self, athena_source):
+ """Allowed chars (. -) stay; disallowed chars (/ space) get replaced."""
+ props = {"myprop.data/type-v1 beta": "v"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata"),
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"myprop.data__type-v1__beta": "v"}
+
+ def test_already_valid_name_unchanged(self, athena_source):
+ props = {"simple_key": "value"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"simple_key": "value"}
+ request = _get_request(mock_metadata)
+ assert request.name.root == "simple_key"
+ assert request.displayName == "simple_key"
+
+ def test_alphanumeric_and_underscore_preserved(self, athena_source):
+ props = {"abc123_XYZ": "v"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata"),
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+ assert result == {"abc123_XYZ": "v"}
+
+ def test_sanitized_name_at_256_chars_not_hashed(self, athena_source):
+ name = "a" * 256
+ props = {name: "value"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {name: "value"}
+ request = _get_request(mock_metadata)
+ assert request.displayName == name
+
+ def test_long_sanitized_name_is_md5_hashed(self, athena_source):
+ original = "myprop." + ("a" * 260)
+ props = {original: "value"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ expected_hash = hashlib.md5(original.encode("utf-8"), usedforsecurity=False).hexdigest()
+ assert result == {expected_hash: "value"}
+ request = _get_request(mock_metadata)
+ assert request.name.root == expected_hash
+ assert request.displayName == original
+
+ def test_hashed_name_is_stable_for_same_input(self, athena_source):
+ """Same long original name must always map to the same hash."""
+ original = "x." + ("b" * 300)
+ props_first = {original: "v1"}
+ props_second = {original: "v2"}
+
+ with (
+ patch.object(
+ athena_source,
+ "_fetch_iceberg_properties",
+ side_effect=[props_first, props_second],
+ ),
+ patch.object(athena_source, "metadata"),
+ ):
+ r1 = athena_source.get_table_extensions("t1", TableType.Iceberg)
+ r2 = athena_source.get_table_extensions("t2", TableType.Iceberg)
+
+ assert list(r1.keys()) == list(r2.keys())
+
+
+class TestGetTableExtensionsValueFiltering:
+ """Filter out null and empty-string property values."""
+
+ def test_skips_none_valued_property(self, athena_source):
+ props = {"k1": "v1", "k2": None}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"k1": "v1"}
+ assert mock_metadata.create_or_update_custom_property.call_count == 1
+
+ def test_skips_empty_string_valued_property(self, athena_source):
+ props = {"k1": "v1", "k2": ""}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata"),
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+ assert result == {"k1": "v1"}
+
+ def test_keeps_string_zero(self, athena_source):
+ """'0' is falsy-ish in some checks but is a legitimate value."""
+ props = {"k": "0"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata"),
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+ assert result == {"k": "0"}
+
+ def test_keeps_whitespace_value(self, athena_source):
+ """A single space is not an empty string and should pass through."""
+ props = {"k": " "}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata"),
+ ):
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+ assert result == {"k": " "}
+
+
+class TestGetTableExtensionsDedup:
+ """_processed_prop prevents redundant custom-property registration."""
+
+ def test_same_prop_across_tables_registered_once(self, athena_source):
+ props = {"shared_key": "v"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ athena_source.get_table_extensions("tbl1", TableType.Iceberg)
+ athena_source.get_table_extensions("tbl2", TableType.Iceberg)
+
+ assert mock_metadata.create_or_update_custom_property.call_count == 1
+ assert "shared_key" in athena_source._processed_prop
+
+ def test_distinct_props_each_registered_once(self, athena_source):
+ with (
+ patch.object(
+ athena_source,
+ "_fetch_iceberg_properties",
+ side_effect=[{"k1": "a"}, {"k2": "b"}],
+ ),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ athena_source.get_table_extensions("tbl1", TableType.Iceberg)
+ athena_source.get_table_extensions("tbl2", TableType.Iceberg)
+
+ assert mock_metadata.create_or_update_custom_property.call_count == 2
+
+ def test_registration_failure_does_not_mark_prop_processed(self, athena_source):
+ """A failed registration must not be cached — so a retry on the next table can succeed."""
+ props = {"k1": "v1"}
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ mock_metadata.create_or_update_custom_property.side_effect = Exception("boom")
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result is None
+ assert "k1" not in athena_source._processed_prop
+
+ def test_registration_failure_for_one_prop_does_not_block_others(self, athena_source):
+ """Registration errors on one prop don't prevent others from being returned."""
+ props = {"bad_prop": "x", "good_prop": "y"}
+ call_flag = {"first": True}
+
+ def side_effect(_):
+ if call_flag["first"]:
+ call_flag["first"] = False
+ raise Exception("boom") # noqa: TRY002
+ return
+
+ with (
+ patch.object(athena_source, "_fetch_iceberg_properties", return_value=props),
+ patch.object(athena_source, "metadata") as mock_metadata,
+ ):
+ mock_metadata.create_or_update_custom_property.side_effect = side_effect
+ result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg)
+
+ assert result == {"good_prop": "y"}
+
+
+class TestFetchIcebergProperties:
+ """Unit tests for the $properties query helper."""
+
+ def test_returns_properties_from_query(self, athena_source):
+ _mock_query_rows(
+ athena_source,
+ [("myprop.owner", "team-a"), ("myprop.source", "ex")],
+ )
+
+ result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME)
+ assert result == {"myprop.owner": "team-a", "myprop.source": "ex"}
+
+ def test_returns_empty_dict_on_exception(self, athena_source):
+ mock_engine = MagicMock()
+ mock_engine.connect.side_effect = Exception("connection refused")
+ athena_source.engine = mock_engine
+
+ result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME)
+ assert result == {}
+
+ def test_filters_null_key_and_null_value_rows(self, athena_source):
+ _mock_query_rows(
+ athena_source,
+ [
+ ("k1", "v1"),
+ (None, "no_key"),
+ ("k2", None),
+ ("k3", "v3"),
+ ],
+ )
+
+ result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME)
+ assert result == {"k1": "v1", "k3": "v3"}
+
+ def test_query_targets_dollar_properties_metatable(self, athena_source):
+ mock_engine = _mock_query_rows(athena_source, [])
+
+ athena_source._fetch_iceberg_properties("my_schema", "my_table")
+
+ execute_call = mock_engine.connect.return_value.__enter__.return_value.execute
+ executed_sql = str(execute_call.call_args.args[0])
+ assert "my_schema" in executed_sql
+ assert "my_table$properties" in executed_sql
+ assert "key" in executed_sql
+ assert "value" in executed_sql
+
+ def test_values_are_coerced_to_string(self, athena_source):
+ _mock_query_rows(athena_source, [("k_int", 42), ("k_bool", True)])
+
+ result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME)
+ assert result == {"k_int": "42", "k_bool": "True"}
+
+
+class TestQueryTableNamesAndTypesIcebergConstant:
+ """Iceberg detection uses the shared ICEBERG_TABLE_TYPE constant."""
+
+ def test_constant_value_matches_glue_parameter(self):
+ from metadata.ingestion.source.database.athena.metadata import (
+ ICEBERG_TABLE_TYPE,
+ )
+
+ assert ICEBERG_TABLE_TYPE == "ICEBERG"
+
+
+class TestIncludeCustomPropertiesSchema:
+ """The includeCustomProperties config flag defaults to False."""
+
+ def test_default_is_false(self):
+ from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+ DatabaseServiceMetadataPipeline,
+ )
+
+ pipeline = DatabaseServiceMetadataPipeline()
+ assert pipeline.includeCustomProperties is False
+
+ def test_can_be_enabled(self):
+ from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+ DatabaseServiceMetadataPipeline,
+ )
+
+ pipeline = DatabaseServiceMetadataPipeline(includeCustomProperties=True)
+ assert pipeline.includeCustomProperties is True
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
index b0428852ded5..f8dc855707bf 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
@@ -90,6 +90,12 @@
"default": true,
"title": "Include Tags"
},
+ "includeCustomProperties": {
+ "description": "Optional configuration to toggle the ingestion of source-specific custom properties (e.g. Iceberg table properties) onto the entity extension. When disabled, no custom property definitions are registered and no extension values are set.",
+ "type": "boolean",
+ "default": false,
+ "title": "Include Custom Properties"
+ },
"includeOwners":{
"title": "Include Owners",
"description": "Set the 'Include Owners' toggle to control whether to include owners to the ingested entity if the owner email matches with a user stored in the OM server as part of metadata ingestion. If the ingested entity already exists and has an owner, the owner will not be overwritten.",
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts
index 331710b15137..549836e170cf 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts
@@ -287,6 +287,12 @@ export interface Pipeline {
* schema information.
*/
extractJsonSchema?: boolean;
+ /**
+ * Optional configuration to toggle the ingestion of source-specific custom properties (e.g.
+ * Iceberg table properties) onto the entity extension. When disabled, no custom property
+ * definitions are registered and no extension values are set.
+ */
+ includeCustomProperties?: boolean;
/**
* Optional configuration to toggle the DDL Statements ingestion.
*/
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts
index 3f4f856fb42e..a2cbc9e911bc 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts
@@ -986,6 +986,12 @@ export interface Pipeline {
* schema information.
*/
extractJsonSchema?: boolean;
+ /**
+ * Optional configuration to toggle the ingestion of source-specific custom properties (e.g.
+ * Iceberg table properties) onto the entity extension. When disabled, no custom property
+ * definitions are registered and no extension values are set.
+ */
+ includeCustomProperties?: boolean;
/**
* Optional configuration to toggle the DDL Statements ingestion.
*/
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts
index 534532887a14..d02a01a8af39 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts
@@ -24,6 +24,12 @@ export interface DatabaseServiceMetadataPipeline {
* schema information.
*/
extractJsonSchema?: boolean;
+ /**
+ * Optional configuration to toggle the ingestion of source-specific custom properties (e.g.
+ * Iceberg table properties) onto the entity extension. When disabled, no custom property
+ * definitions are registered and no extension values are set.
+ */
+ includeCustomProperties?: boolean;
/**
* Optional configuration to toggle the DDL Statements ingestion.
*/
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts
index 0e00f44f8445..d29712a508a8 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts
@@ -5272,6 +5272,12 @@ export interface Pipeline {
* schema information.
*/
extractJsonSchema?: boolean;
+ /**
+ * Optional configuration to toggle the ingestion of source-specific custom properties (e.g.
+ * Iceberg table properties) onto the entity extension. When disabled, no custom property
+ * definitions are registered and no extension values are set.
+ */
+ includeCustomProperties?: boolean;
/**
* Optional configuration to toggle the DDL Statements ingestion.
*/