From 5ee30e53241a008c7a4403724a83a7dd693b5917 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Fri, 24 Apr 2026 18:41:27 +0530 Subject: [PATCH 1/8] fix(athena): ingest Iceberg table properties from $properties metatable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Glue Parameters only carry Iceberg catalog pointers (table_type, metadata_location) — they don't surface native Iceberg properties like write.parquet.compression-codec or any user-set keys (e.g. kpler.*) written by PyIceberg/Spark/Airflow. Those live inside metadata.json and are exposed via Athena's $properties metatable. - Switch get_table_extensions to query $properties for Iceberg tables; skip non-Iceberg tables to avoid wasted Athena queries - Sanitise property names (non-alphanumeric/underscore → __), preserve the original name as displayName - MD5-hash sanitised names longer than 256 chars - Skip null and empty-string values - Plumb table_type through get_table_extensions; remove the thread-local props context and the ineffective processed-prop lock (idempotent registration is fine under GIL) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../source/database/athena/metadata.py | 77 ++-- .../source/database/common_db_source.py | 8 +- .../unit/topology/database/test_athena.py | 435 ++++++++++++++++-- 3 files changed, 462 insertions(+), 58 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index 80a9c4fd7ab2..4faca461d1ea 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -11,11 +11,13 @@ """Athena source module""" -import threading +import hashlib +import re import traceback from typing import Dict, Iterable, Optional, Set, Tuple from pyathena.sqlalchemy.base import AthenaDialect +from sqlalchemy import text from sqlalchemy.engine.reflection import Inspector from metadata.clients.aws_client import AWSClient @@ -81,7 +83,10 @@ ATHENA_TAG = "ATHENA TAG" ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION" -ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props" +ICEBERG_TABLE_TYPE = "ICEBERG" +PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_]") +PROPERTY_NAME_REPLACEMENT = "__" +PROPERTY_NAME_MAX_LENGTH = 256 ATHENA_INTERVAL_TYPE_MAP = { **dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE), @@ -125,10 +130,8 @@ def __init__( ) self.external_location_map = {} self.schema_description_map = {} - self._thread_local = threading.local() self.glue_client = None self._processed_prop: Set[str] = set() - self._processed_prop_lock = threading.Lock() self._string_property_type_ref = None def prepare(self): @@ -178,7 +181,7 @@ def query_table_names_and_types( params = table.get("Parameters", {}) table_type = ( TableType.Iceberg - if params.get("table_type") == "ICEBERG" + if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External ) results.append( @@ -340,24 +343,12 @@ def get_table_description( self, schema_name: str, table_name: str, inspector: Inspector ) -> str: description = None - setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {}) try: table_info: dict = inspector.get_table_comment(table_name, schema_name) table_option = inspector.get_table_options(table_name, schema_name) self.external_location_map[ (self.context.get().database, schema_name, table_name) ] = table_option.get("awsathena_location") - setattr( - self._thread_local, - ATHENA_TABLE_PROPS_CONTEXT_KEY, - { - prop_name: str(prop_value) - for prop_name, prop_value in ( - table_option.get("awsathena_tblproperties") or {} - ).items() - if prop_value is not None - }, - ) # Catch any exception without breaking the ingestion except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) @@ -389,35 +380,69 @@ def _get_columns_internal( catalog_id=self.service_connection.catalogId, ) - def get_table_extensions(self, table_name: str) -> Optional[Dict[str, str]]: + def get_table_extensions( + self, table_name: str, table_type: Optional[TableType] = None + ) -> Optional[Dict[str, str]]: if not self._string_property_type_ref: return None - tbl_properties = getattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {}) + if table_type != TableType.Iceberg: + return None + schema_name = self.context.get().database_schema + tbl_properties = self._fetch_iceberg_properties(schema_name, table_name) if not tbl_properties: return None registered_properties = {} for prop_name, prop_value in tbl_properties.items(): - with self._processed_prop_lock: - prop_already_registered = prop_name in self._processed_prop - if not prop_already_registered: + if prop_value is None or prop_value == "": + continue + sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub( + PROPERTY_NAME_REPLACEMENT, prop_name + ) + if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH: + sanitized_name = hashlib.md5( + prop_name.encode("utf-8"), usedforsecurity=False + ).hexdigest() + if sanitized_name not in self._processed_prop: try: self.metadata.create_or_update_custom_property( OMetaCustomProperties( entity_type=Table, createCustomPropertyRequest=CreateCustomPropertyRequest( - name=prop_name, + name=sanitized_name, + displayName=prop_name, description=prop_name, propertyType=self._string_property_type_ref, ), ) ) - with self._processed_prop_lock: - self._processed_prop.add(prop_name) + self._processed_prop.add(sanitized_name) except Exception as exc: logger.warning( f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}" ) logger.debug(traceback.format_exc()) continue - registered_properties[prop_name] = prop_value + registered_properties[sanitized_name] = prop_value return registered_properties or None + + def _fetch_iceberg_properties( + self, schema_name: str, table_name: str + ) -> Dict[str, str]: + """Read Iceberg native properties from Athena's `
$properties` metatable.""" + query = text( + f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"' + ) + try: + with self.engine.connect() as conn: + result = conn.execute(query) + return { + str(row[0]): str(row[1]) + for row in result + if row[0] is not None and row[1] is not None + } + except Exception as exc: + logger.debug( + f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}" + ) + logger.debug(traceback.format_exc()) + return {} diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 471c986845fa..8a0c66460f49 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -559,7 +559,9 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]: by default there will be no location path """ - def get_table_extensions(self, table_name: str): + def get_table_extensions( + self, table_name: str, table_type: Optional[TableType] = None + ): """ Method to fetch the extensions of the table """ @@ -647,7 +649,9 @@ def yield_table( locationPath=self.get_location_path( table_name=table_name, schema_name=schema_name ), - extension=self.get_table_extensions(table_name=table_name), + extension=self.get_table_extensions( + table_name=table_name, table_type=table_type + ), ) is_partitioned, partition_details = self.get_table_partition_details( diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py index a72de60334e5..bc7e74346329 100644 --- a/ingestion/tests/unit/topology/database/test_athena.py +++ b/ingestion/tests/unit/topology/database/test_athena.py @@ -12,11 +12,13 @@ Test athena source """ +import hashlib import unittest from datetime import datetime from unittest.mock import MagicMock, patch from uuid import UUID +import pytest from pydantic import AnyUrl from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -349,36 +351,6 @@ def test_column_lineage(self): ) assert column_lineage == EXPECTED_COLUMN_LINEAGE - def test_get_table_extensions_returns_none_without_type_ref(self): - self.athena_source._string_property_type_ref = None - assert self.athena_source.get_table_extensions(MOCK_TABLE_NAME) is None - - def test_get_table_extensions_returns_properties_from_description(self): - from metadata.generated.schema.type.customProperty import PropertyType - - self.athena_source._string_property_type_ref = PropertyType( - EntityReference( - id=UUID("00000000-0000-0000-0000-000000000001"), type="type" - ) - ) - mock_inspector = MagicMock() - mock_inspector.get_table_comment.return_value = {"text": "desc"} - mock_inspector.get_table_options.return_value = { - "awsathena_location": "s3://bucket/path", - "awsathena_tblproperties": {"prop_key": "prop_value", "null_prop": None}, - } - self.athena_source.get_table_description( - MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME, mock_inspector - ) - - with patch.object(self.athena_source, "metadata") as mock_metadata: - result = self.athena_source.get_table_extensions(MOCK_TABLE_NAME) - - assert result == {"prop_key": "prop_value"} - assert "null_prop" not in result - mock_metadata.create_or_update_custom_property.assert_called_once() - - SUBMISSION_DT = datetime(2024, 1, 2, 10, 0, 0) COMPLETION_DT = datetime(2024, 1, 2, 10, 5, 0) @@ -427,3 +399,406 @@ def test_end_time_falls_back_to_submission_when_completion_missing(self): assert len(results) == 1 assert len(results[0].queries) == 1 assert results[0].queries[0].endTime == SUBMISSION_DT.isoformat(" ", "seconds") + + +@pytest.fixture +def athena_source(): + """A minimally-wired AthenaSource with context populated and a dummy type ref.""" + from metadata.generated.schema.type.customProperty import PropertyType + + config = OpenMetadataWorkflowConfig.model_validate(mock_athena_config) + with patch( + "metadata.ingestion.source.database.database_service." + "DatabaseServiceSource.test_connection", + return_value=False, + ): + source = AthenaSource.create( + mock_athena_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + + source.context.get().__dict__[ + "database_schema" + ] = MOCK_DATABASE_SCHEMA.name.root + source.context.get().__dict__[ + "database_service" + ] = MOCK_DATABASE_SERVICE.name.root + source.context.get().__dict__["database"] = MOCK_DATABASE.name.root + source._string_property_type_ref = PropertyType( + EntityReference( + id=UUID("00000000-0000-0000-0000-000000000001"), type="type" + ) + ) + return source + + +def _mock_query_rows(source, rows): + """Wire source.engine.connect() as a context manager yielding the given rows.""" + mock_engine = MagicMock() + mock_engine.connect.return_value.__enter__.return_value.execute.return_value = rows + source.engine = mock_engine + return mock_engine + + +def _get_request(mock_metadata, call_index=0): + """Pull the CreateCustomPropertyRequest from a create_or_update_custom_property call.""" + return ( + mock_metadata.create_or_update_custom_property.call_args_list[call_index] + .args[0] + .createCustomPropertyRequest + ) + + +class TestGetTableExtensionsEarlyExits: + """Cover the early-return branches of get_table_extensions.""" + + def test_returns_none_without_type_ref(self, athena_source): + athena_source._string_property_type_ref = None + assert ( + athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + is None + ) + + def test_returns_none_for_external_table(self, athena_source): + assert ( + athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.External) + is None + ) + + def test_returns_none_for_regular_table(self, athena_source): + assert ( + athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Regular) + is None + ) + + def test_returns_none_when_table_type_is_none(self, athena_source): + assert athena_source.get_table_extensions(MOCK_TABLE_NAME) is None + + def test_returns_none_when_query_yields_no_properties(self, athena_source): + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value={} + ): + assert ( + athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + is None + ) + + def test_returns_none_when_all_values_filtered_out(self, athena_source): + props = {"k1": None, "k2": ""} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + assert ( + athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + is None + ) + + +class TestGetTableExtensionsSanitization: + """Property name sanitization and display-name preservation.""" + + def test_replaces_dot_with_double_underscore(self, athena_source): + props = {"kpler.owner": "team-a"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"kpler__owner": "team-a"} + request = _get_request(mock_metadata) + assert request.name.root == "kpler__owner" + assert request.displayName == "kpler.owner" + + def test_replaces_hyphen_with_double_underscore(self, athena_source): + props = {"kpler-owner": "x"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + assert result == {"kpler__owner": "x"} + + def test_replaces_each_special_char_independently(self, athena_source): + props = {"kpler.airflow-dag-id": "scrape-dag"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"kpler__airflow__dag__id": "scrape-dag"} + request = _get_request(mock_metadata) + assert request.displayName == "kpler.airflow-dag-id" + + def test_already_valid_name_unchanged(self, athena_source): + props = {"simple_key": "value"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"simple_key": "value"} + request = _get_request(mock_metadata) + assert request.name.root == "simple_key" + assert request.displayName == "simple_key" + + def test_alphanumeric_and_underscore_preserved(self, athena_source): + props = {"abc123_XYZ": "v"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + assert result == {"abc123_XYZ": "v"} + + def test_sanitized_name_at_256_chars_not_hashed(self, athena_source): + name = "a" * 256 + props = {name: "value"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {name: "value"} + request = _get_request(mock_metadata) + assert request.displayName == name + + def test_long_sanitized_name_is_md5_hashed(self, athena_source): + original = "kpler." + ("a" * 260) + props = {original: "value"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + expected_hash = hashlib.md5( + original.encode("utf-8"), usedforsecurity=False + ).hexdigest() + assert result == {expected_hash: "value"} + request = _get_request(mock_metadata) + assert request.name.root == expected_hash + assert request.displayName == original + + def test_hashed_name_is_stable_for_same_input(self, athena_source): + """Same long original name must always map to the same hash.""" + original = "x." + ("b" * 300) + props_first = {original: "v1"} + props_second = {original: "v2"} + + with patch.object( + athena_source, + "_fetch_iceberg_properties", + side_effect=[props_first, props_second], + ), patch.object(athena_source, "metadata"): + r1 = athena_source.get_table_extensions( + "t1", TableType.Iceberg + ) + r2 = athena_source.get_table_extensions( + "t2", TableType.Iceberg + ) + + assert list(r1.keys()) == list(r2.keys()) + + +class TestGetTableExtensionsValueFiltering: + """Filter out null and empty-string property values.""" + + def test_skips_none_valued_property(self, athena_source): + props = {"k1": "v1", "k2": None} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"k1": "v1"} + assert mock_metadata.create_or_update_custom_property.call_count == 1 + + def test_skips_empty_string_valued_property(self, athena_source): + props = {"k1": "v1", "k2": ""} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + assert result == {"k1": "v1"} + + def test_keeps_string_zero(self, athena_source): + """'0' is falsy-ish in some checks but is a legitimate value.""" + props = {"k": "0"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + assert result == {"k": "0"} + + def test_keeps_whitespace_value(self, athena_source): + """A single space is not an empty string and should pass through.""" + props = {"k": " "} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + assert result == {"k": " "} + + +class TestGetTableExtensionsDedup: + """_processed_prop prevents redundant custom-property registration.""" + + def test_same_prop_across_tables_registered_once(self, athena_source): + props = {"shared_key": "v"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + athena_source.get_table_extensions("tbl1", TableType.Iceberg) + athena_source.get_table_extensions("tbl2", TableType.Iceberg) + + assert mock_metadata.create_or_update_custom_property.call_count == 1 + assert "shared_key" in athena_source._processed_prop + + def test_distinct_props_each_registered_once(self, athena_source): + with patch.object( + athena_source, + "_fetch_iceberg_properties", + side_effect=[{"k1": "a"}, {"k2": "b"}], + ), patch.object(athena_source, "metadata") as mock_metadata: + athena_source.get_table_extensions("tbl1", TableType.Iceberg) + athena_source.get_table_extensions("tbl2", TableType.Iceberg) + + assert mock_metadata.create_or_update_custom_property.call_count == 2 + + def test_registration_failure_does_not_mark_prop_processed( + self, athena_source + ): + """A failed registration must not be cached — so a retry on the next table can succeed.""" + props = {"k1": "v1"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + mock_metadata.create_or_update_custom_property.side_effect = Exception( + "boom" + ) + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result is None + assert "k1" not in athena_source._processed_prop + + def test_registration_failure_for_one_prop_does_not_block_others( + self, athena_source + ): + """Registration errors on one prop don't prevent others from being returned.""" + props = {"bad_prop": "x", "good_prop": "y"} + call_flag = {"first": True} + + def side_effect(_): + if call_flag["first"]: + call_flag["first"] = False + raise Exception("boom") + return None + + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + mock_metadata.create_or_update_custom_property.side_effect = side_effect + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"good_prop": "y"} + + +class TestFetchIcebergProperties: + """Unit tests for the $properties query helper.""" + + def test_returns_properties_from_query(self, athena_source): + _mock_query_rows( + athena_source, + [("kpler.owner", "team-a"), ("kpler.source", "ex")], + ) + + result = athena_source._fetch_iceberg_properties( + MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME + ) + assert result == {"kpler.owner": "team-a", "kpler.source": "ex"} + + def test_returns_empty_dict_on_exception(self, athena_source): + mock_engine = MagicMock() + mock_engine.connect.side_effect = Exception("connection refused") + athena_source.engine = mock_engine + + result = athena_source._fetch_iceberg_properties( + MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME + ) + assert result == {} + + def test_filters_null_key_and_null_value_rows(self, athena_source): + _mock_query_rows( + athena_source, + [ + ("k1", "v1"), + (None, "no_key"), + ("k2", None), + ("k3", "v3"), + ], + ) + + result = athena_source._fetch_iceberg_properties( + MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME + ) + assert result == {"k1": "v1", "k3": "v3"} + + def test_query_targets_dollar_properties_metatable(self, athena_source): + mock_engine = _mock_query_rows(athena_source, []) + + athena_source._fetch_iceberg_properties("my_schema", "my_table") + + execute_call = ( + mock_engine.connect.return_value.__enter__.return_value.execute + ) + executed_sql = str(execute_call.call_args.args[0]) + assert "my_schema" in executed_sql + assert "my_table$properties" in executed_sql + assert "key" in executed_sql + assert "value" in executed_sql + + def test_values_are_coerced_to_string(self, athena_source): + _mock_query_rows(athena_source, [("k_int", 42), ("k_bool", True)]) + + result = athena_source._fetch_iceberg_properties( + MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME + ) + assert result == {"k_int": "42", "k_bool": "True"} + + +class TestQueryTableNamesAndTypesIcebergConstant: + """Iceberg detection uses the shared ICEBERG_TABLE_TYPE constant.""" + + def test_constant_value_matches_glue_parameter(self): + from metadata.ingestion.source.database.athena.metadata import ( + ICEBERG_TABLE_TYPE, + ) + + assert ICEBERG_TABLE_TYPE == "ICEBERG" From b19b62332a9cfac119a154ea25e9416c0fd1288a Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Fri, 24 Apr 2026 18:43:26 +0530 Subject: [PATCH 2/8] pyformat --- .../unit/topology/database/test_athena.py | 41 +++++-------------- 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py index bc7e74346329..bb1b341a6173 100644 --- a/ingestion/tests/unit/topology/database/test_athena.py +++ b/ingestion/tests/unit/topology/database/test_athena.py @@ -351,6 +351,7 @@ def test_column_lineage(self): ) assert column_lineage == EXPECTED_COLUMN_LINEAGE + SUBMISSION_DT = datetime(2024, 1, 2, 10, 0, 0) COMPLETION_DT = datetime(2024, 1, 2, 10, 5, 0) @@ -417,17 +418,11 @@ def athena_source(): config.workflowConfig.openMetadataServerConfig, ) - source.context.get().__dict__[ - "database_schema" - ] = MOCK_DATABASE_SCHEMA.name.root - source.context.get().__dict__[ - "database_service" - ] = MOCK_DATABASE_SERVICE.name.root + source.context.get().__dict__["database_schema"] = MOCK_DATABASE_SCHEMA.name.root + source.context.get().__dict__["database_service"] = MOCK_DATABASE_SERVICE.name.root source.context.get().__dict__["database"] = MOCK_DATABASE.name.root source._string_property_type_ref = PropertyType( - EntityReference( - id=UUID("00000000-0000-0000-0000-000000000001"), type="type" - ) + EntityReference(id=UUID("00000000-0000-0000-0000-000000000001"), type="type") ) return source @@ -475,13 +470,9 @@ def test_returns_none_when_table_type_is_none(self, athena_source): assert athena_source.get_table_extensions(MOCK_TABLE_NAME) is None def test_returns_none_when_query_yields_no_properties(self, athena_source): - with patch.object( - athena_source, "_fetch_iceberg_properties", return_value={} - ): + with patch.object(athena_source, "_fetch_iceberg_properties", return_value={}): assert ( - athena_source.get_table_extensions( - MOCK_TABLE_NAME, TableType.Iceberg - ) + athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None ) @@ -491,9 +482,7 @@ def test_returns_none_when_all_values_filtered_out(self, athena_source): athena_source, "_fetch_iceberg_properties", return_value=props ), patch.object(athena_source, "metadata"): assert ( - athena_source.get_table_extensions( - MOCK_TABLE_NAME, TableType.Iceberg - ) + athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None ) @@ -605,12 +594,8 @@ def test_hashed_name_is_stable_for_same_input(self, athena_source): "_fetch_iceberg_properties", side_effect=[props_first, props_second], ), patch.object(athena_source, "metadata"): - r1 = athena_source.get_table_extensions( - "t1", TableType.Iceberg - ) - r2 = athena_source.get_table_extensions( - "t2", TableType.Iceberg - ) + r1 = athena_source.get_table_extensions("t1", TableType.Iceberg) + r2 = athena_source.get_table_extensions("t2", TableType.Iceberg) assert list(r1.keys()) == list(r2.keys()) @@ -688,9 +673,7 @@ def test_distinct_props_each_registered_once(self, athena_source): assert mock_metadata.create_or_update_custom_property.call_count == 2 - def test_registration_failure_does_not_mark_prop_processed( - self, athena_source - ): + def test_registration_failure_does_not_mark_prop_processed(self, athena_source): """A failed registration must not be cached — so a retry on the next table can succeed.""" props = {"k1": "v1"} with patch.object( @@ -775,9 +758,7 @@ def test_query_targets_dollar_properties_metatable(self, athena_source): athena_source._fetch_iceberg_properties("my_schema", "my_table") - execute_call = ( - mock_engine.connect.return_value.__enter__.return_value.execute - ) + execute_call = mock_engine.connect.return_value.__enter__.return_value.execute executed_sql = str(execute_call.call_args.args[0]) assert "my_schema" in executed_sql assert "my_table$properties" in executed_sql From 83bdabeebd8d7771cde4329e368c6effe87b54e8 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Mon, 27 Apr 2026 14:44:02 +0530 Subject: [PATCH 3/8] add includeCustomProperties, and allow in the property name --- .../source/database/athena/metadata.py | 4 +- .../unit/topology/database/test_athena.py | 81 +++++++++++++++++-- .../databaseServiceMetadataPipeline.json | 6 ++ .../createIngestionPipeline.ts | 6 ++ .../ingestionPipelines/ingestionPipeline.ts | 6 ++ .../databaseServiceMetadataPipeline.ts | 6 ++ .../generated/metadataIngestion/workflow.ts | 6 ++ 7 files changed, 106 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index 4faca461d1ea..11668a07c8ed 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -84,7 +84,7 @@ ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION" ICEBERG_TABLE_TYPE = "ICEBERG" -PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_]") +PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_.\-]") PROPERTY_NAME_REPLACEMENT = "__" PROPERTY_NAME_MAX_LENGTH = 256 @@ -383,6 +383,8 @@ def _get_columns_internal( def get_table_extensions( self, table_name: str, table_type: Optional[TableType] = None ) -> Optional[Dict[str, str]]: + if not getattr(self.source_config,"includeCustomProperties",False): + return None if not self._string_property_type_ref: return None if table_type != TableType.Iceberg: diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py index bb1b341a6173..0cf4bd8bcb73 100644 --- a/ingestion/tests/unit/topology/database/test_athena.py +++ b/ingestion/tests/unit/topology/database/test_athena.py @@ -424,6 +424,7 @@ def athena_source(): source._string_property_type_ref = PropertyType( EntityReference(id=UUID("00000000-0000-0000-0000-000000000001"), type="type") ) + source.source_config.includeCustomProperties = True return source @@ -447,6 +448,19 @@ def _get_request(mock_metadata, call_index=0): class TestGetTableExtensionsEarlyExits: """Cover the early-return branches of get_table_extensions.""" + def test_returns_none_when_include_custom_properties_disabled( + self, athena_source + ): + athena_source.source_config.includeCustomProperties = False + with patch.object( + athena_source, "_fetch_iceberg_properties" + ) as mock_fetch: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + assert result is None + mock_fetch.assert_not_called() + def test_returns_none_without_type_ref(self, athena_source): athena_source._string_property_type_ref = None assert ( @@ -490,7 +504,7 @@ def test_returns_none_when_all_values_filtered_out(self, athena_source): class TestGetTableExtensionsSanitization: """Property name sanitization and display-name preservation.""" - def test_replaces_dot_with_double_underscore(self, athena_source): + def test_dot_is_preserved(self, athena_source): props = {"kpler.owner": "team-a"} with patch.object( athena_source, "_fetch_iceberg_properties", return_value=props @@ -499,22 +513,26 @@ def test_replaces_dot_with_double_underscore(self, athena_source): MOCK_TABLE_NAME, TableType.Iceberg ) - assert result == {"kpler__owner": "team-a"} + assert result == {"kpler.owner": "team-a"} request = _get_request(mock_metadata) - assert request.name.root == "kpler__owner" + assert request.name.root == "kpler.owner" assert request.displayName == "kpler.owner" - def test_replaces_hyphen_with_double_underscore(self, athena_source): + def test_hyphen_is_preserved(self, athena_source): props = {"kpler-owner": "x"} with patch.object( athena_source, "_fetch_iceberg_properties", return_value=props - ), patch.object(athena_source, "metadata"): + ), patch.object(athena_source, "metadata") as mock_metadata: result = athena_source.get_table_extensions( MOCK_TABLE_NAME, TableType.Iceberg ) - assert result == {"kpler__owner": "x"} - def test_replaces_each_special_char_independently(self, athena_source): + assert result == {"kpler-owner": "x"} + request = _get_request(mock_metadata) + assert request.name.root == "kpler-owner" + + def test_allowed_punctuation_combined_preserved(self, athena_source): + """Dots and hyphens together are allowed — name passes through untouched.""" props = {"kpler.airflow-dag-id": "scrape-dag"} with patch.object( athena_source, "_fetch_iceberg_properties", return_value=props @@ -523,10 +541,37 @@ def test_replaces_each_special_char_independently(self, athena_source): MOCK_TABLE_NAME, TableType.Iceberg ) - assert result == {"kpler__airflow__dag__id": "scrape-dag"} + assert result == {"kpler.airflow-dag-id": "scrape-dag"} request = _get_request(mock_metadata) + assert request.name.root == "kpler.airflow-dag-id" assert request.displayName == "kpler.airflow-dag-id" + def test_other_special_chars_still_replaced(self, athena_source): + """Everything outside [A-Za-z0-9_.-] gets replaced with __.""" + props = {"kpler/airflow:dag id@prod": "v"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata") as mock_metadata: + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"kpler__airflow__dag__id__prod": "v"} + request = _get_request(mock_metadata) + assert request.displayName == "kpler/airflow:dag id@prod" + + def test_mixed_allowed_and_disallowed_chars(self, athena_source): + """Allowed chars (. -) stay; disallowed chars (/ space) get replaced.""" + props = {"kpler.data/type-v1 beta": "v"} + with patch.object( + athena_source, "_fetch_iceberg_properties", return_value=props + ), patch.object(athena_source, "metadata"): + result = athena_source.get_table_extensions( + MOCK_TABLE_NAME, TableType.Iceberg + ) + + assert result == {"kpler.data__type-v1__beta": "v"} + def test_already_valid_name_unchanged(self, athena_source): props = {"simple_key": "value"} with patch.object( @@ -783,3 +828,23 @@ def test_constant_value_matches_glue_parameter(self): ) assert ICEBERG_TABLE_TYPE == "ICEBERG" + + +class TestIncludeCustomPropertiesSchema: + """The includeCustomProperties config flag defaults to False.""" + + def test_default_is_false(self): + from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, + ) + + pipeline = DatabaseServiceMetadataPipeline() + assert pipeline.includeCustomProperties is False + + def test_can_be_enabled(self): + from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, + ) + + pipeline = DatabaseServiceMetadataPipeline(includeCustomProperties=True) + assert pipeline.includeCustomProperties is True diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index b0428852ded5..f8dc855707bf 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -90,6 +90,12 @@ "default": true, "title": "Include Tags" }, + "includeCustomProperties": { + "description": "Optional configuration to toggle the ingestion of source-specific custom properties (e.g. Iceberg table properties) onto the entity extension. When disabled, no custom property definitions are registered and no extension values are set.", + "type": "boolean", + "default": false, + "title": "Include Custom Properties" + }, "includeOwners":{ "title": "Include Owners", "description": "Set the 'Include Owners' toggle to control whether to include owners to the ingested entity if the owner email matches with a user stored in the OM server as part of metadata ingestion. If the ingested entity already exists and has an owner, the owner will not be overwritten.", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index a15ca90b2856..e02dfd973520 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -285,6 +285,12 @@ export interface Pipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 406c685c0ff7..af1141e2d086 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -979,6 +979,12 @@ export interface Pipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts index 534532887a14..d02a01a8af39 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts @@ -24,6 +24,12 @@ export interface DatabaseServiceMetadataPipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index c2490c28cbb4..65e635061ed9 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -5226,6 +5226,12 @@ export interface Pipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ From 3a0f75825bae323530421f7e537d22183f4407e5 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Mon, 27 Apr 2026 19:11:12 +0530 Subject: [PATCH 4/8] fix tests --- .../unit/topology/database/test_athena.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py index aa3885ea2168..1e67df1376de 100644 --- a/ingestion/tests/unit/topology/database/test_athena.py +++ b/ingestion/tests/unit/topology/database/test_athena.py @@ -453,67 +453,67 @@ class TestGetTableExtensionsSanitization: """Property name sanitization and display-name preservation.""" def test_dot_is_preserved(self, athena_source): - props = {"kpler.owner": "team-a"} + props = {"myprop.owner": "team-a"} with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), patch.object(athena_source, "metadata") as mock_metadata, ): result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) - assert result == {"kpler.owner": "team-a"} + assert result == {"myprop.owner": "team-a"} request = _get_request(mock_metadata) - assert request.name.root == "kpler.owner" - assert request.displayName == "kpler.owner" + assert request.name.root == "myprop.owner" + assert request.displayName == "myprop.owner" def test_hyphen_is_preserved(self, athena_source): - props = {"kpler-owner": "x"} + props = {"myprop-owner": "x"} with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), patch.object(athena_source, "metadata") as mock_metadata, ): result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) - assert result == {"kpler-owner": "x"} + assert result == {"myprop-owner": "x"} request = _get_request(mock_metadata) - assert request.name.root == "kpler-owner" + assert request.name.root == "myprop-owner" def test_allowed_punctuation_combined_preserved(self, athena_source): """Dots and hyphens together are allowed — name passes through untouched.""" - props = {"kpler.airflow-dag-id": "scrape-dag"} + props = {"myprop.airflow-dag-id": "scrape-dag"} with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), patch.object(athena_source, "metadata") as mock_metadata, ): result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) - assert result == {"kpler.airflow-dag-id": "scrape-dag"} + assert result == {"myprop.airflow-dag-id": "scrape-dag"} request = _get_request(mock_metadata) - assert request.name.root == "kpler.airflow-dag-id" - assert request.displayName == "kpler.airflow-dag-id" + assert request.name.root == "myprop.airflow-dag-id" + assert request.displayName == "myprop.airflow-dag-id" def test_other_special_chars_still_replaced(self, athena_source): """Everything outside [A-Za-z0-9_.-] gets replaced with __.""" - props = {"kpler/airflow:dag id@prod": "v"} + props = {"myprop/airflow:dag id@prod": "v"} with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), patch.object(athena_source, "metadata") as mock_metadata, ): result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) - assert result == {"kpler__airflow__dag__id__prod": "v"} + assert result == {"myprop__airflow__dag__id__prod": "v"} request = _get_request(mock_metadata) - assert request.displayName == "kpler/airflow:dag id@prod" + assert request.displayName == "myprop/airflow:dag id@prod" def test_mixed_allowed_and_disallowed_chars(self, athena_source): """Allowed chars (. -) stay; disallowed chars (/ space) get replaced.""" - props = {"kpler.data/type-v1 beta": "v"} + props = {"myprop.data/type-v1 beta": "v"} with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), patch.object(athena_source, "metadata"), ): result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) - assert result == {"kpler.data__type-v1__beta": "v"} + assert result == {"myprop.data__type-v1__beta": "v"} def test_already_valid_name_unchanged(self, athena_source): props = {"simple_key": "value"} @@ -551,7 +551,7 @@ def test_sanitized_name_at_256_chars_not_hashed(self, athena_source): assert request.displayName == name def test_long_sanitized_name_is_md5_hashed(self, athena_source): - original = "kpler." + ("a" * 260) + original = "myprop." + ("a" * 260) props = {original: "value"} with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), @@ -698,11 +698,11 @@ class TestFetchIcebergProperties: def test_returns_properties_from_query(self, athena_source): _mock_query_rows( athena_source, - [("kpler.owner", "team-a"), ("kpler.source", "ex")], + [("myprop.owner", "team-a"), ("myprop.source", "ex")], ) result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME) - assert result == {"kpler.owner": "team-a", "kpler.source": "ex"} + assert result == {"myprop.owner": "team-a", "myprop.source": "ex"} def test_returns_empty_dict_on_exception(self, athena_source): mock_engine = MagicMock() From ce35e7a80b3e6888d80e153759b393e2a240fda5 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 28 Apr 2026 13:10:16 +0530 Subject: [PATCH 5/8] format --- .../metadata/ingestion/source/database/athena/metadata.py | 8 ++++---- .../ingestion/source/database/common_db_source.py | 2 +- ingestion/tests/unit/topology/database/test_athena.py | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index 73e1f8ba849b..101df4faf99d 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -14,7 +14,7 @@ import hashlib import re import traceback -from typing import Dict, Iterable, Optional, Set, Tuple # noqa: UP035 +from typing import Iterable, Optional, Tuple # noqa: UP035 from pyathena.sqlalchemy.base import AthenaDialect from sqlalchemy import text @@ -123,7 +123,7 @@ def __init__( self.external_location_map = {} self.schema_description_map = {} self.glue_client = None - self._processed_prop: Set[str] = set() + self._processed_prop: set[str] = set() self._string_property_type_ref = None def prepare(self): @@ -347,7 +347,7 @@ def _get_columns_internal( catalog_id=self.service_connection.catalogId, ) - def get_table_extensions(self, table_name: str, table_type: Optional[TableType] = None) -> Optional[Dict[str, str]]: + def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None: if not getattr(self.source_config, "includeCustomProperties", False): return None if not self._string_property_type_ref: @@ -388,7 +388,7 @@ def get_table_extensions(self, table_name: str, table_type: Optional[TableType] registered_properties[sanitized_name] = prop_value return registered_properties or None - def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> Dict[str, str]: + def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]: """Read Iceberg native properties from Athena's `
$properties` metatable.""" query = text(f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"') try: diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 028d0edc8a14..3640bdd28daf 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -489,7 +489,7 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]: by default there will be no location path """ - def get_table_extensions(self, table_name: str, table_type: Optional[TableType] = None): + def get_table_extensions(self, table_name: str, table_type: TableType | None = None): """ Method to fetch the extensions of the table """ diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py index 8a64cd795f8e..7dce30c7b332 100644 --- a/ingestion/tests/unit/topology/database/test_athena.py +++ b/ingestion/tests/unit/topology/database/test_athena.py @@ -679,8 +679,8 @@ def test_registration_failure_for_one_prop_does_not_block_others(self, athena_so def side_effect(_): if call_flag["first"]: call_flag["first"] = False - raise Exception("boom") - return None + raise Exception("boom") # noqa: TRY002 + return with ( patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), From e711265c44c7d6e750ec1a272e36051caaa9f2dd Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 28 Apr 2026 14:01:09 +0530 Subject: [PATCH 6/8] fix static --- .../src/metadata/ingestion/source/database/athena/metadata.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index 101df4faf99d..a66797efb632 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -354,7 +354,7 @@ def get_table_extensions(self, table_name: str, table_type: TableType | None = N return None if table_type != TableType.Iceberg: return None - schema_name = self.context.get().database_schema + schema_name: str = getattr(self.context.get(), "database_schema", "") tbl_properties = self._fetch_iceberg_properties(schema_name, table_name) if not tbl_properties: return None @@ -375,6 +375,7 @@ def get_table_extensions(self, table_name: str, table_type: TableType | None = N displayName=prop_name, description=prop_name, propertyType=self._string_property_type_ref, + customPropertyConfig=None, ), ) ) From a1e35f0f24a0a378b1146643afce0762d82c6f61 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 28 Apr 2026 16:32:04 +0530 Subject: [PATCH 7/8] fix static check --- .../ingestion/source/database/athena/metadata.py | 9 +++++---- .../ingestion/source/database/common_db_source.py | 6 +++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index a66797efb632..b362ef272303 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -42,6 +42,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.basic import EntityName, Markdown from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.custom_properties import ( @@ -360,20 +361,20 @@ def get_table_extensions(self, table_name: str, table_type: TableType | None = N return None registered_properties = {} for prop_name, prop_value in tbl_properties.items(): - if prop_value is None or prop_value == "": + if not prop_value: continue sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name) if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH: sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest() if sanitized_name not in self._processed_prop: try: - self.metadata.create_or_update_custom_property( + _ = self.metadata.create_or_update_custom_property( OMetaCustomProperties( entity_type=Table, createCustomPropertyRequest=CreateCustomPropertyRequest( - name=sanitized_name, + name=EntityName(sanitized_name), displayName=prop_name, - description=prop_name, + description=Markdown(prop_name), propertyType=self._string_property_type_ref, customPropertyConfig=None, ), diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 3640bdd28daf..7e53acc89828 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -489,7 +489,11 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]: by default there will be no location path """ - def get_table_extensions(self, table_name: str, table_type: TableType | None = None): + def get_table_extensions( + self, + table_name: str, # pyright: ignore[reportUnusedParameter] + table_type: TableType | None = None, # pyright: ignore[reportUnusedParameter] + ): """ Method to fetch the extensions of the table """ From 586b7e73c5c01351368c6779c9365e9bcf1a6486 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 28 Apr 2026 16:38:11 +0530 Subject: [PATCH 8/8] fix static check --- .../src/metadata/ingestion/source/database/athena/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index b362ef272303..4246031c6197 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -368,7 +368,7 @@ def get_table_extensions(self, table_name: str, table_type: TableType | None = N sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest() if sanitized_name not in self._processed_prop: try: - _ = self.metadata.create_or_update_custom_property( + self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult] OMetaCustomProperties( entity_type=Table, createCustomPropertyRequest=CreateCustomPropertyRequest(