diff --git a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py index e0dc051fadc0..4246031c6197 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/metadata.py @@ -11,11 +11,13 @@ """Athena source module""" -import threading +import hashlib +import re import traceback -from typing import Dict, Iterable, Optional, Set, Tuple # noqa: UP035 +from typing import Iterable, Optional, Tuple # noqa: UP035 from pyathena.sqlalchemy.base import AthenaDialect +from sqlalchemy import text from sqlalchemy.engine.reflection import Inspector from metadata.clients.aws_client import AWSClient @@ -40,6 +42,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.basic import EntityName, Markdown from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.custom_properties import ( @@ -81,7 +84,10 @@ ATHENA_TAG = "ATHENA TAG" ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION" -ATHENA_TABLE_PROPS_CONTEXT_KEY = "_athena_current_tbl_props" +ICEBERG_TABLE_TYPE = "ICEBERG" +PROPERTY_NAME_INVALID_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9_.\-]") +PROPERTY_NAME_REPLACEMENT = "__" +PROPERTY_NAME_MAX_LENGTH = 256 ATHENA_INTERVAL_TYPE_MAP = { **dict.fromkeys(["enum", "string", "VARCHAR"], PartitionIntervalTypes.COLUMN_VALUE), @@ -117,10 +123,8 @@ def __init__( self.athena_lake_formation_client = AthenaLakeFormationClient(connection=self.service_connection) self.external_location_map = {} self.schema_description_map = {} - self._thread_local = threading.local() self.glue_client = None - self._processed_prop: Set[str] = set() # noqa: UP006 - self._processed_prop_lock = threading.Lock() + self._processed_prop: set[str] = set() self._string_property_type_ref = None def prepare(self): @@ -160,7 +164,9 @@ def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAnd for page in paginator.paginate(DatabaseName=schema_name): for table in page.get("TableList", []): params = table.get("Parameters", {}) - table_type = TableType.Iceberg if params.get("table_type") == "ICEBERG" else TableType.External + table_type = ( + TableType.Iceberg if params.get("table_type") == ICEBERG_TABLE_TYPE else TableType.External + ) results.append(TableNameAndType(name=table["Name"], type_=table_type)) return results # noqa: TRY300 except Exception as exc: @@ -307,22 +313,12 @@ def yield_table_tags( # pylint: disable=arguments-differ def get_table_description(self, schema_name: str, table_name: str, inspector: Inspector) -> str: description = None - setattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {}) try: table_info: dict = inspector.get_table_comment(table_name, schema_name) table_option = inspector.get_table_options(table_name, schema_name) self.external_location_map[(self.context.get().database, schema_name, table_name)] = table_option.get( "awsathena_location" ) - setattr( - self._thread_local, - ATHENA_TABLE_PROPS_CONTEXT_KEY, - { - prop_name: str(prop_value) - for prop_name, prop_value in (table_option.get("awsathena_tblproperties") or {}).items() - if prop_value is not None - }, - ) # Catch any exception without breaking the ingestion except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) @@ -352,35 +348,56 @@ def _get_columns_internal( catalog_id=self.service_connection.catalogId, ) - def get_table_extensions(self, table_name: str) -> Optional[Dict[str, str]]: # noqa: UP006, UP045 + def get_table_extensions(self, table_name: str, table_type: TableType | None = None) -> dict[str, str] | None: + if not getattr(self.source_config, "includeCustomProperties", False): + return None if not self._string_property_type_ref: return None - tbl_properties = getattr(self._thread_local, ATHENA_TABLE_PROPS_CONTEXT_KEY, {}) + if table_type != TableType.Iceberg: + return None + schema_name: str = getattr(self.context.get(), "database_schema", "") + tbl_properties = self._fetch_iceberg_properties(schema_name, table_name) if not tbl_properties: return None registered_properties = {} for prop_name, prop_value in tbl_properties.items(): - with self._processed_prop_lock: - prop_already_registered = prop_name in self._processed_prop - if not prop_already_registered: + if not prop_value: + continue + sanitized_name = PROPERTY_NAME_INVALID_CHARS_PATTERN.sub(PROPERTY_NAME_REPLACEMENT, prop_name) + if len(sanitized_name) > PROPERTY_NAME_MAX_LENGTH: + sanitized_name = hashlib.md5(prop_name.encode("utf-8"), usedforsecurity=False).hexdigest() + if sanitized_name not in self._processed_prop: try: - self.metadata.create_or_update_custom_property( + self.metadata.create_or_update_custom_property( # pyright: ignore[reportUnknownMemberType, reportUnusedCallResult] OMetaCustomProperties( entity_type=Table, createCustomPropertyRequest=CreateCustomPropertyRequest( - name=prop_name, - description=prop_name, + name=EntityName(sanitized_name), + displayName=prop_name, + description=Markdown(prop_name), propertyType=self._string_property_type_ref, + customPropertyConfig=None, ), ) ) - with self._processed_prop_lock: - self._processed_prop.add(prop_name) + self._processed_prop.add(sanitized_name) except Exception as exc: logger.warning( f"Failed to register custom property [{prop_name}] for Athena table properties: {exc}" ) logger.debug(traceback.format_exc()) continue - registered_properties[prop_name] = prop_value + registered_properties[sanitized_name] = prop_value return registered_properties or None + + def _fetch_iceberg_properties(self, schema_name: str, table_name: str) -> dict[str, str]: + """Read Iceberg native properties from Athena's `$properties` metatable.""" + query = text(f'SELECT key, value FROM "{schema_name}"."{table_name}$properties"') + try: + with self.engine.connect() as conn: + result = conn.execute(query) + return {str(row[0]): str(row[1]) for row in result if row[0] is not None and row[1] is not None} + except Exception as exc: + logger.debug(f"Unable to read Iceberg $properties for [{schema_name}.{table_name}]: {exc}") + logger.debug(traceback.format_exc()) + return {} diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index b6a0319eec75..7e53acc89828 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -489,7 +489,11 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]: by default there will be no location path """ - def get_table_extensions(self, table_name: str): + def get_table_extensions( + self, + table_name: str, # pyright: ignore[reportUnusedParameter] + table_type: TableType | None = None, # pyright: ignore[reportUnusedParameter] + ): """ Method to fetch the extensions of the table """ @@ -569,7 +573,7 @@ def yield_table(self, table_name_and_type: Tuple[str, TableType]) -> Iterable[Ei ), owners=self.get_owner_ref(table_name=table_name), locationPath=self.get_location_path(table_name=table_name, schema_name=schema_name), - extension=self.get_table_extensions(table_name=table_name), + extension=self.get_table_extensions(table_name=table_name, table_type=table_type), ) is_partitioned, partition_details = self.get_table_partition_details( diff --git a/ingestion/tests/unit/topology/database/test_athena.py b/ingestion/tests/unit/topology/database/test_athena.py index 591a8ade432b..7dce30c7b332 100644 --- a/ingestion/tests/unit/topology/database/test_athena.py +++ b/ingestion/tests/unit/topology/database/test_athena.py @@ -12,11 +12,13 @@ Test athena source """ +import hashlib import unittest from datetime import datetime from unittest.mock import MagicMock, patch from uuid import UUID +import pytest from pydantic import AnyUrl from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -322,31 +324,6 @@ def test_column_lineage(self): ) assert column_lineage == EXPECTED_COLUMN_LINEAGE - def test_get_table_extensions_returns_none_without_type_ref(self): - self.athena_source._string_property_type_ref = None - assert self.athena_source.get_table_extensions(MOCK_TABLE_NAME) is None - - def test_get_table_extensions_returns_properties_from_description(self): - from metadata.generated.schema.type.customProperty import PropertyType - - self.athena_source._string_property_type_ref = PropertyType( - EntityReference(id=UUID("00000000-0000-0000-0000-000000000001"), type="type") - ) - mock_inspector = MagicMock() - mock_inspector.get_table_comment.return_value = {"text": "desc"} - mock_inspector.get_table_options.return_value = { - "awsathena_location": "s3://bucket/path", - "awsathena_tblproperties": {"prop_key": "prop_value", "null_prop": None}, - } - self.athena_source.get_table_description(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME, mock_inspector) - - with patch.object(self.athena_source, "metadata") as mock_metadata: - result = self.athena_source.get_table_extensions(MOCK_TABLE_NAME) - - assert result == {"prop_key": "prop_value"} - assert "null_prop" not in result - mock_metadata.create_or_update_custom_property.assert_called_once() - SUBMISSION_DT = datetime(2024, 1, 2, 10, 0, 0) COMPLETION_DT = datetime(2024, 1, 2, 10, 5, 0) @@ -396,3 +373,404 @@ def test_end_time_falls_back_to_submission_when_completion_missing(self): assert len(results) == 1 assert len(results[0].queries) == 1 assert results[0].queries[0].endTime == SUBMISSION_DT.isoformat(" ", "seconds") + + +@pytest.fixture +def athena_source(): + """A minimally-wired AthenaSource with context populated and a dummy type ref.""" + from metadata.generated.schema.type.customProperty import PropertyType + + config = OpenMetadataWorkflowConfig.model_validate(mock_athena_config) + with patch( + "metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection", + return_value=False, + ): + source = AthenaSource.create( + mock_athena_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + + source.context.get().__dict__["database_schema"] = MOCK_DATABASE_SCHEMA.name.root + source.context.get().__dict__["database_service"] = MOCK_DATABASE_SERVICE.name.root + source.context.get().__dict__["database"] = MOCK_DATABASE.name.root + source._string_property_type_ref = PropertyType( + EntityReference(id=UUID("00000000-0000-0000-0000-000000000001"), type="type") + ) + source.source_config.includeCustomProperties = True + return source + + +def _mock_query_rows(source, rows): + """Wire source.engine.connect() as a context manager yielding the given rows.""" + mock_engine = MagicMock() + mock_engine.connect.return_value.__enter__.return_value.execute.return_value = rows + source.engine = mock_engine + return mock_engine + + +def _get_request(mock_metadata, call_index=0): + """Pull the CreateCustomPropertyRequest from a create_or_update_custom_property call.""" + return mock_metadata.create_or_update_custom_property.call_args_list[call_index].args[0].createCustomPropertyRequest + + +class TestGetTableExtensionsEarlyExits: + """Cover the early-return branches of get_table_extensions.""" + + def test_returns_none_when_include_custom_properties_disabled(self, athena_source): + athena_source.source_config.includeCustomProperties = False + with patch.object(athena_source, "_fetch_iceberg_properties") as mock_fetch: + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + assert result is None + mock_fetch.assert_not_called() + + def test_returns_none_without_type_ref(self, athena_source): + athena_source._string_property_type_ref = None + assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None + + def test_returns_none_for_external_table(self, athena_source): + assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.External) is None + + def test_returns_none_for_regular_table(self, athena_source): + assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Regular) is None + + def test_returns_none_when_table_type_is_none(self, athena_source): + assert athena_source.get_table_extensions(MOCK_TABLE_NAME) is None + + def test_returns_none_when_query_yields_no_properties(self, athena_source): + with patch.object(athena_source, "_fetch_iceberg_properties", return_value={}): + assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None + + def test_returns_none_when_all_values_filtered_out(self, athena_source): + props = {"k1": None, "k2": ""} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata"), + ): + assert athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) is None + + +class TestGetTableExtensionsSanitization: + """Property name sanitization and display-name preservation.""" + + def test_dot_is_preserved(self, athena_source): + props = {"myprop.owner": "team-a"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"myprop.owner": "team-a"} + request = _get_request(mock_metadata) + assert request.name.root == "myprop.owner" + assert request.displayName == "myprop.owner" + + def test_hyphen_is_preserved(self, athena_source): + props = {"myprop-owner": "x"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"myprop-owner": "x"} + request = _get_request(mock_metadata) + assert request.name.root == "myprop-owner" + + def test_allowed_punctuation_combined_preserved(self, athena_source): + """Dots and hyphens together are allowed — name passes through untouched.""" + props = {"myprop.airflow-dag-id": "scrape-dag"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"myprop.airflow-dag-id": "scrape-dag"} + request = _get_request(mock_metadata) + assert request.name.root == "myprop.airflow-dag-id" + assert request.displayName == "myprop.airflow-dag-id" + + def test_other_special_chars_still_replaced(self, athena_source): + """Everything outside [A-Za-z0-9_.-] gets replaced with __.""" + props = {"myprop/airflow:dag id@prod": "v"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"myprop__airflow__dag__id__prod": "v"} + request = _get_request(mock_metadata) + assert request.displayName == "myprop/airflow:dag id@prod" + + def test_mixed_allowed_and_disallowed_chars(self, athena_source): + """Allowed chars (. -) stay; disallowed chars (/ space) get replaced.""" + props = {"myprop.data/type-v1 beta": "v"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata"), + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"myprop.data__type-v1__beta": "v"} + + def test_already_valid_name_unchanged(self, athena_source): + props = {"simple_key": "value"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"simple_key": "value"} + request = _get_request(mock_metadata) + assert request.name.root == "simple_key" + assert request.displayName == "simple_key" + + def test_alphanumeric_and_underscore_preserved(self, athena_source): + props = {"abc123_XYZ": "v"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata"), + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + assert result == {"abc123_XYZ": "v"} + + def test_sanitized_name_at_256_chars_not_hashed(self, athena_source): + name = "a" * 256 + props = {name: "value"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {name: "value"} + request = _get_request(mock_metadata) + assert request.displayName == name + + def test_long_sanitized_name_is_md5_hashed(self, athena_source): + original = "myprop." + ("a" * 260) + props = {original: "value"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + expected_hash = hashlib.md5(original.encode("utf-8"), usedforsecurity=False).hexdigest() + assert result == {expected_hash: "value"} + request = _get_request(mock_metadata) + assert request.name.root == expected_hash + assert request.displayName == original + + def test_hashed_name_is_stable_for_same_input(self, athena_source): + """Same long original name must always map to the same hash.""" + original = "x." + ("b" * 300) + props_first = {original: "v1"} + props_second = {original: "v2"} + + with ( + patch.object( + athena_source, + "_fetch_iceberg_properties", + side_effect=[props_first, props_second], + ), + patch.object(athena_source, "metadata"), + ): + r1 = athena_source.get_table_extensions("t1", TableType.Iceberg) + r2 = athena_source.get_table_extensions("t2", TableType.Iceberg) + + assert list(r1.keys()) == list(r2.keys()) + + +class TestGetTableExtensionsValueFiltering: + """Filter out null and empty-string property values.""" + + def test_skips_none_valued_property(self, athena_source): + props = {"k1": "v1", "k2": None} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"k1": "v1"} + assert mock_metadata.create_or_update_custom_property.call_count == 1 + + def test_skips_empty_string_valued_property(self, athena_source): + props = {"k1": "v1", "k2": ""} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata"), + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + assert result == {"k1": "v1"} + + def test_keeps_string_zero(self, athena_source): + """'0' is falsy-ish in some checks but is a legitimate value.""" + props = {"k": "0"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata"), + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + assert result == {"k": "0"} + + def test_keeps_whitespace_value(self, athena_source): + """A single space is not an empty string and should pass through.""" + props = {"k": " "} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata"), + ): + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + assert result == {"k": " "} + + +class TestGetTableExtensionsDedup: + """_processed_prop prevents redundant custom-property registration.""" + + def test_same_prop_across_tables_registered_once(self, athena_source): + props = {"shared_key": "v"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + athena_source.get_table_extensions("tbl1", TableType.Iceberg) + athena_source.get_table_extensions("tbl2", TableType.Iceberg) + + assert mock_metadata.create_or_update_custom_property.call_count == 1 + assert "shared_key" in athena_source._processed_prop + + def test_distinct_props_each_registered_once(self, athena_source): + with ( + patch.object( + athena_source, + "_fetch_iceberg_properties", + side_effect=[{"k1": "a"}, {"k2": "b"}], + ), + patch.object(athena_source, "metadata") as mock_metadata, + ): + athena_source.get_table_extensions("tbl1", TableType.Iceberg) + athena_source.get_table_extensions("tbl2", TableType.Iceberg) + + assert mock_metadata.create_or_update_custom_property.call_count == 2 + + def test_registration_failure_does_not_mark_prop_processed(self, athena_source): + """A failed registration must not be cached — so a retry on the next table can succeed.""" + props = {"k1": "v1"} + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + mock_metadata.create_or_update_custom_property.side_effect = Exception("boom") + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result is None + assert "k1" not in athena_source._processed_prop + + def test_registration_failure_for_one_prop_does_not_block_others(self, athena_source): + """Registration errors on one prop don't prevent others from being returned.""" + props = {"bad_prop": "x", "good_prop": "y"} + call_flag = {"first": True} + + def side_effect(_): + if call_flag["first"]: + call_flag["first"] = False + raise Exception("boom") # noqa: TRY002 + return + + with ( + patch.object(athena_source, "_fetch_iceberg_properties", return_value=props), + patch.object(athena_source, "metadata") as mock_metadata, + ): + mock_metadata.create_or_update_custom_property.side_effect = side_effect + result = athena_source.get_table_extensions(MOCK_TABLE_NAME, TableType.Iceberg) + + assert result == {"good_prop": "y"} + + +class TestFetchIcebergProperties: + """Unit tests for the $properties query helper.""" + + def test_returns_properties_from_query(self, athena_source): + _mock_query_rows( + athena_source, + [("myprop.owner", "team-a"), ("myprop.source", "ex")], + ) + + result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME) + assert result == {"myprop.owner": "team-a", "myprop.source": "ex"} + + def test_returns_empty_dict_on_exception(self, athena_source): + mock_engine = MagicMock() + mock_engine.connect.side_effect = Exception("connection refused") + athena_source.engine = mock_engine + + result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME) + assert result == {} + + def test_filters_null_key_and_null_value_rows(self, athena_source): + _mock_query_rows( + athena_source, + [ + ("k1", "v1"), + (None, "no_key"), + ("k2", None), + ("k3", "v3"), + ], + ) + + result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME) + assert result == {"k1": "v1", "k3": "v3"} + + def test_query_targets_dollar_properties_metatable(self, athena_source): + mock_engine = _mock_query_rows(athena_source, []) + + athena_source._fetch_iceberg_properties("my_schema", "my_table") + + execute_call = mock_engine.connect.return_value.__enter__.return_value.execute + executed_sql = str(execute_call.call_args.args[0]) + assert "my_schema" in executed_sql + assert "my_table$properties" in executed_sql + assert "key" in executed_sql + assert "value" in executed_sql + + def test_values_are_coerced_to_string(self, athena_source): + _mock_query_rows(athena_source, [("k_int", 42), ("k_bool", True)]) + + result = athena_source._fetch_iceberg_properties(MOCK_DATABASE_SCHEMA.name.root, MOCK_TABLE_NAME) + assert result == {"k_int": "42", "k_bool": "True"} + + +class TestQueryTableNamesAndTypesIcebergConstant: + """Iceberg detection uses the shared ICEBERG_TABLE_TYPE constant.""" + + def test_constant_value_matches_glue_parameter(self): + from metadata.ingestion.source.database.athena.metadata import ( + ICEBERG_TABLE_TYPE, + ) + + assert ICEBERG_TABLE_TYPE == "ICEBERG" + + +class TestIncludeCustomPropertiesSchema: + """The includeCustomProperties config flag defaults to False.""" + + def test_default_is_false(self): + from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, + ) + + pipeline = DatabaseServiceMetadataPipeline() + assert pipeline.includeCustomProperties is False + + def test_can_be_enabled(self): + from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, + ) + + pipeline = DatabaseServiceMetadataPipeline(includeCustomProperties=True) + assert pipeline.includeCustomProperties is True diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index b0428852ded5..f8dc855707bf 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -90,6 +90,12 @@ "default": true, "title": "Include Tags" }, + "includeCustomProperties": { + "description": "Optional configuration to toggle the ingestion of source-specific custom properties (e.g. Iceberg table properties) onto the entity extension. When disabled, no custom property definitions are registered and no extension values are set.", + "type": "boolean", + "default": false, + "title": "Include Custom Properties" + }, "includeOwners":{ "title": "Include Owners", "description": "Set the 'Include Owners' toggle to control whether to include owners to the ingested entity if the owner email matches with a user stored in the OM server as part of metadata ingestion. If the ingested entity already exists and has an owner, the owner will not be overwritten.", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 331710b15137..549836e170cf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -287,6 +287,12 @@ export interface Pipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 3f4f856fb42e..a2cbc9e911bc 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -986,6 +986,12 @@ export interface Pipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts index 534532887a14..d02a01a8af39 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceMetadataPipeline.ts @@ -24,6 +24,12 @@ export interface DatabaseServiceMetadataPipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index 0e00f44f8445..d29712a508a8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -5272,6 +5272,12 @@ export interface Pipeline { * schema information. */ extractJsonSchema?: boolean; + /** + * Optional configuration to toggle the ingestion of source-specific custom properties (e.g. + * Iceberg table properties) onto the entity extension. When disabled, no custom property + * definitions are registered and no extension values are set. + */ + includeCustomProperties?: boolean; /** * Optional configuration to toggle the DDL Statements ingestion. */