From b5d038cd69e3575833c079e757b32a56f98492ca Mon Sep 17 00:00:00 2001 From: jkukreja Date: Thu, 2 Apr 2026 18:48:27 -0400 Subject: [PATCH 1/9] add namespace-based DB service resolution for openLineage db_table lookups --- .../source/pipeline/openlineage/metadata.py | 135 +++++++++- .../pipeline/openlineage/table_resolver.py | 152 ++++++++++++ .../topology/pipeline/test_openlineage.py | 230 +++++++++++++++++- .../pipeline/openLineageConnection.json | 8 + 4 files changed, 502 insertions(+), 23 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 456290316f7c..4c494beb2c26 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -32,6 +32,7 @@ KinesisBrokerConfig, OpenLineageConnection, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -69,6 +70,11 @@ get_or_create_pipeline_service, resolve_pipeline_service_type, ) +from metadata.ingestion.source.pipeline.openlineage.table_resolver import ( + extract_db_scheme_from_namespace, + find_service_by_namespace_mapping, + find_services_by_scheme, +) from metadata.ingestion.source.pipeline.openlineage.utils import ( FQNNotFoundException, message_to_open_lineage_event, @@ -113,6 +119,9 @@ def create( def prepare(self): self._service_cache = {} self._current_pipeline_service = None + self._entity_cache: Dict[str, Any] = {} + self._namespace_to_service_cache: Dict[str, Optional[List[str]]] = {} + self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map() def close(self) -> None: self.metadata.compute_percentile(Pipeline, self.today) @@ 
-214,7 +223,74 @@ def _get_topic_details(data: Dict) -> TopicDetails: return TopicDetails(name=name, broker_hostname=broker_hostname) - def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]: + def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs): + """Wrapper around metadata.get_by_name with in-memory caching.""" + if not hasattr(self, "_entity_cache"): + return self.metadata.get_by_name(entity_class, fqn_str, **kwargs) + key = f"{entity_class.__name__}:{fqn_str}" + if key not in self._entity_cache: + result = self.metadata.get_by_name(entity_class, fqn_str, **kwargs) + if result is not None: + self._entity_cache[key] = result + return result + return self._entity_cache[key] + + def _build_db_service_type_map(self): + """Build a map of {service_name: DatabaseServiceType} filtered to configured dbServiceNames.""" + type_map = {} + for service_name in self.get_db_service_names(): + try: + svc = self.metadata.get_by_name(DatabaseService, service_name) + if svc and svc.serviceType: + type_map[service_name] = svc.serviceType + except Exception: + logger.debug(f"Could not fetch DB service: {service_name}") + return type_map + + def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[str]]: + """ + Resolve which DB services to search for a given OL dataset namespace. + + Resolution order: + 1. Check namespaceToServiceMapping config (exact then prefix match). + 2. Extract scheme from namespace, filter services by matching DB type. + If exactly one match -> use it. If multiple -> log warning and return all. + 3. Return None -> caller falls back to all dbServiceNames. 
+ """ + if not hasattr(self, "_namespace_to_service_cache"): + return None + + if namespace in self._namespace_to_service_cache: + return self._namespace_to_service_cache[namespace] + + result = None + configured = set(self.get_db_service_names() or []) + + mapping = self.service_connection.namespaceToServiceMapping or {} + mapped_service = find_service_by_namespace_mapping(namespace, mapping) + if mapped_service and mapped_service in configured: + result = [mapped_service] + else: + # Auto-discover by extracting the DB scheme from the namespace URL + db_scheme = extract_db_scheme_from_namespace(namespace) + if db_scheme: + matched = find_services_by_scheme(db_scheme, self._db_service_type_map) + if len(matched) == 1: + result = matched + elif len(matched) > 1: + logger.warning( + f"Namespace '{namespace}' (scheme={db_scheme}) matches " + f"multiple DB services: {matched}. Configure " + f"'namespaceToServiceMapping' to disambiguate." + ) + result = matched + + self._namespace_to_service_cache[namespace] = result + return result + + def _get_table_fqn( + self, table_details: TableDetails, namespace: Optional[str] = None + ) -> Optional[str]: if not self.get_db_service_names(): if not self._db_service_names_warned: logger.warning( @@ -224,16 +300,41 @@ def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]: ) self._db_service_names_warned = True return None + + resolved_services = self._resolve_db_services_for_namespace(namespace) + try: - return self._get_table_fqn_from_om(table_details) + return self._get_table_fqn_from_om( + table_details, services=resolved_services + ) except FQNNotFoundException: try: - schema_fqn = self._get_schema_fqn_from_om(table_details.schema) - + schema_fqn = self._get_schema_fqn_from_om( + table_details.schema, services=resolved_services + ) return f"{schema_fqn}.{table_details.name}" except FQNNotFoundException: return None + def _get_table_fqn_from_om( + self, table_details: TableDetails, services: Optional[List[str]] = 
None + ) -> str: + """ + Looks for matching Table entity in OM across all configured DB services. + """ + for db_service in services or self.get_db_service_names(): + result = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=db_service, + database_name=table_details.database, + schema_name=table_details.schema, + table_name=table_details.name, + ) + if result: + return result + raise FQNNotFoundException(f"Table FQN not found for {table_details}") + def _build_broker_to_service_map(self) -> Dict[str, str]: """ Build a cache mapping broker hostnames to messaging service FQNs. @@ -309,15 +410,18 @@ def _get_topic_entity(self, topic_details: TopicDetails) -> Optional[Topic]: logger.warning(f"Error finding topic for {topic_details.name}: {exc}") return None - def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]: + def _get_schema_fqn_from_om( + self, schema: str, services: Optional[List[str]] = None + ) -> Optional[str]: """ Based on partial schema name look for any matching DatabaseSchema object in open metadata. 
:param schema: schema name + :param services: optional list of service names to search :return: fully qualified name of a DatabaseSchema in Open Metadata """ result = None - services = self.get_db_service_names() + services = services or self.get_db_service_names() for db_service in services: result = fqn.build( @@ -469,7 +573,10 @@ def _build_ol_name_to_fqn_map(self, tables: List): entity_details = self._get_entity_details(table) if entity_details.entity_type != "table": continue - table_fqn = self._get_table_fqn(entity_details.table_details) + table_fqn = self._get_table_fqn( + entity_details.table_details, + namespace=table.get("namespace"), + ) if table_fqn: result[OpenlineageSource._get_ol_table_name(table)] = table_fqn @@ -505,7 +612,10 @@ def _get_column_lineage( if entity_details.entity_type != "table": continue - output_table_fqn = self._get_table_fqn(entity_details.table_details) + output_table_fqn = self._get_table_fqn( + entity_details.table_details, + namespace=table.get("namespace"), + ) for field_name, field_spec in ( table.get("facets", {}) .get("columnLineage", {}) @@ -604,15 +714,16 @@ def yield_pipeline_lineage_details( if create_table_request: yield create_table_request - table_fqn = self._get_table_fqn(entity_details.table_details) + table_fqn = self._get_table_fqn( + entity_details.table_details, + namespace=entity_data.get("namespace"), + ) if table_fqn: entity_list.append( LineageNode( fqn=TableFQN(value=table_fqn), - uuid=self.metadata.get_by_name( - Table, table_fqn - ).id.root, + uuid=self._get_by_name_cached(Table, table_fqn).id.root, node_type="table", ) ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py new file mode 100644 index 000000000000..b7f58b67aefd --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py @@ -0,0 +1,152 @@ +# Copyright 2025 Collate +# Licensed under 
the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Resolves a database service name from an OpenLineage dataset namespace. + +OpenLineage dataset namespaces carry the data store type in the URL scheme +(e.g. ``mysql://host:3306/db``, ``redshift://cluster:5439/db``). This module +uses that scheme to narrow lineage table lookups to services of the matching +type, eliminating false matches when the same table name exists in multiple +services. + +Resolution order (per namespace): +1. Explicit ``namespaceToServiceMapping`` config — exact then prefix match. +2. Scheme-based auto-discovery — if exactly one configured DB service has the + matching type, use it. If multiple match, log a warning and search all of them. +3. Caller falls back to existing suffix search across all ``dbServiceNames``. +""" + +from typing import Dict, List, Optional +from urllib.parse import urlparse + +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +# Maps OpenLineage dataset namespace URI schemes to OMD DatabaseServiceType. 
+# See: https://openlineage.io/docs/spec/naming/ +NAMESPACE_SCHEME_TO_SERVICE_TYPE: Dict[str, DatabaseServiceType] = { + "awsathena": DatabaseServiceType.Athena, + "bigquery": DatabaseServiceType.BigQuery, + "cassandra": DatabaseServiceType.Cassandra, + "db2": DatabaseServiceType.Db2, + "hive": DatabaseServiceType.Hive, + "mssql": DatabaseServiceType.Mssql, + "mysql": DatabaseServiceType.Mysql, + "oracle": DatabaseServiceType.Oracle, + "postgres": DatabaseServiceType.Postgres, + "redshift": DatabaseServiceType.Redshift, + "snowflake": DatabaseServiceType.Snowflake, + "sqlserver": DatabaseServiceType.Synapse, + "teradata": DatabaseServiceType.Teradata, + "trino": DatabaseServiceType.Trino, +} + + +def extract_db_scheme_from_namespace(namespace: str) -> Optional[str]: + """ + Extract the URL scheme from an OpenLineage dataset namespace. + + >>> extract_namespace_scheme("mysql://host:3306/db") + 'mysql' + >>> extract_namespace_scheme("redshift://cluster:5439/db") + 'redshift' + >>> extract_namespace_scheme("airflow") + None + """ + try: + scheme = urlparse(namespace).scheme + return scheme.lower() if scheme else None + except Exception: + return None + + +def find_service_by_namespace_mapping( + namespace: str, + mapping: Dict[str, str], +) -> Optional[str]: + """ + Look up a database service name from a user-configured + ``namespaceToServiceMapping`` dict (namespace-prefix → OMD service name). + + Resolution order: + 1. Exact match — ``mapping[namespace]`` + 2. Prefix match — namespace starts with a mapping key. + When multiple keys match, the longest key wins. + + Example mapping:: + + { + "mysql://cluster-a:3306": "mysql-cluster-a", + "mysql://cluster-a:3306/specific_db": "mysql-specific", + } + + With namespace ``"mysql://cluster-a:3306/specific_db/table"`` + → returns ``"mysql-specific"`` (longest prefix match). + + Returns the mapped service name, or ``None`` if no entry matches. 
+ """ + if not namespace or not mapping: + return None + + if namespace in mapping: + return mapping[namespace] + + # Prefix match: namespace starts with a mapping key. + # Pick the longest matching key to avoid ambiguity. + best_key = "" + best_service = None + for key, service_name in mapping.items(): + if namespace.startswith(key) and len(key) > len(best_key): + best_key = key + best_service = service_name + + return best_service + + +def find_services_by_scheme( + scheme: str, + db_service_type_map: Dict[str, DatabaseServiceType], +) -> List[str]: + """ + Filter a pre-built ``{service_name: DatabaseServiceType}`` map to only + those whose type matches the given URL scheme. + + If the scheme is recognized (present in ``NAMESPACE_SCHEME_TO_SERVICE_TYPE``), + returns only services of that exact type. + + If the scheme is **not** recognized, returns services whose + types are *not* in the known map — i.e. custom non-standard + services that are more likely to be the correct match. + + Returns a list of matching service names (may be empty or contain + multiple entries when several services of the same type are configured). 
+ """ + target_type = NAMESPACE_SCHEME_TO_SERVICE_TYPE.get(scheme) + + if target_type: + return [ + name + for name, svc_type in db_service_type_map.items() + if svc_type == target_type + ] + + known_types = set(NAMESPACE_SCHEME_TO_SERVICE_TYPE.values()) + + return [ + name + for name, svc_type in db_service_type_map.items() + if svc_type not in known_types + ] diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage.py b/ingestion/tests/unit/topology/pipeline/test_openlineage.py index 6b28cc50c9a2..96fdf1f0b535 100644 --- a/ingestion/tests/unit/topology/pipeline/test_openlineage.py +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage.py @@ -17,6 +17,9 @@ ConsumerOffsets1, SecurityProtocol, ) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) from metadata.generated.schema.entity.services.pipelineService import ( PipelineConnection, PipelineService, @@ -36,6 +39,7 @@ from metadata.ingestion.source.pipeline.openlineage.models import ( EntityDetails, OpenLineageEvent, + TableDetails, ) from metadata.ingestion.source.pipeline.openlineage.utils import ( FQNNotFoundException, @@ -385,7 +389,7 @@ def test_get_column_lineage_empty_inputs_outputs(self): def test_build_ol_name_to_fqn_map_with_valid_data(self, mock_get_table_fqn): # Mock _get_table_fqn to return a constructed FQN based on the provided table details mock_get_table_fqn.side_effect = ( - lambda table_details: f"database.schema.{table_details.name}" + lambda table_details, namespace=None: f"database.schema.{table_details.name}" ) tables = [ @@ -445,7 +449,7 @@ def test_get_column_lineage_valid_inputs_outputs( """Test with valid input and output lists.""" # Setup mock_get_table_fqn.side_effect = ( - lambda table_details: f"database.schema.{table_details.name}" + lambda table_details, namespace=None: f"database.schema.{table_details.name}" ) mock_build_map.return_value = { "s3a:/project-db/src_test1": "database.schema.input_table_1", @@ -516,7 
+520,7 @@ def test_get_column_lineage_normalizes_caps_columns_to_lowercase( ): """Test that CAPS column names from OL events are normalized to lowercase in column FQNs.""" mock_get_table_fqn.side_effect = ( - lambda table_details: f"database.schema.{table_details.name}" + lambda table_details, namespace=None: f"database.schema.{table_details.name}" ) mock_build_map.return_value = { "sqlserver:/host:1433/hk_schema.CASE_TEST_SOURCE": "database.schema.case_test_source", @@ -653,9 +657,10 @@ def test_get_pipelines_list(self): @patch( "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" ) - def test_yield_pipeline_lineage_details(self, mock_get_entity): + def test_yield_pipeline_lineage_details(self, mock_get_table_from_om): def t_fqn_build_side_effect( table_details, + services=None, ): return f"testService.shopify.{table_details.name}" @@ -687,7 +692,7 @@ def extract_lineage_details(pip_results): return table_lineage, col_lineage # Set up the side effect for the mock entity FQN builder - mock_get_entity.side_effect = t_fqn_build_side_effect + mock_get_table_from_om.side_effect = t_fqn_build_side_effect ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT) @@ -861,7 +866,9 @@ def test_get_pipelines_list_filters_out_abort_events(self, mock_consumer_class): @patch( "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" ) - def test_lineage_merge_start_with_data_running_without(self, mock_get_table_fqn): + def test_lineage_merge_start_with_data_running_without( + self, mock_get_table_from_om + ): """ Test that START event with lineage data followed by RUNNING event without lineage data does not overwrite existing lineage in the database. 
@@ -886,10 +893,10 @@ def test_lineage_merge_start_with_data_running_without(self, mock_get_table_fqn) running_event["outputs"] = [] # Mock table FQN lookup - def mock_fqn_side_effect(table_details): + def mock_fqn_side_effect(table_details, services=None): return f"testService.shopify.{table_details.name}" - mock_get_table_fqn.side_effect = mock_fqn_side_effect + mock_get_table_from_om.side_effect = mock_fqn_side_effect # Mock metadata.get_by_name for table lookups from_table_id = "69fc8906-4a4a-45ab-9a54-9cc2d399e10e" @@ -1111,10 +1118,10 @@ def test_get_pipelines_list_kinesis_empty_stream(self): @patch( "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" ) - def test_yield_pipeline_lineage_details_kinesis(self, mock_get_entity): + def test_yield_pipeline_lineage_details_kinesis(self, mock_get_table_from_om): """Test lineage extraction from a Kinesis-sourced event.""" - def t_fqn_build_side_effect(table_details): + def t_fqn_build_side_effect(table_details, services=None): return f"testService.shopify.{table_details.name}" def mock_get_uuid_by_name(entity, fqn): @@ -1125,7 +1132,7 @@ def mock_get_uuid_by_name(entity, fqn): else: return Mock(id=Mock(root="79fc8906-4a4a-45ab-9a54-9cc2d399e10e")) - mock_get_entity.side_effect = t_fqn_build_side_effect + mock_get_table_from_om.side_effect = t_fqn_build_side_effect self._build_mock_kinesis_client([FULL_OL_KAFKA_EVENT]) results = list(self.open_lineage_kinesis_source.get_pipelines_list()) @@ -1392,6 +1399,207 @@ def get_by_name(entity, fqn, **kwargs): self.assertEqual(edge.toEntity.id.root, table_id) self.assertEqual(edge.toEntity.type, "table") + def test_namespace_resolution_skips_when_service_not_in_configured_names(self): + """Edge case 1: When the namespace maps to a service that is NOT in get_db_service_names(), + resolution should fall through and the table should not be found. 
+ + Setup: Two DB services exist (mysql_prod, redshift_prod) but only redshift_prod + is in dbServiceNames. The event namespace is mysql://... which would map to mysql_prod, + but since mysql_prod is not in dbServiceNames, resolution must skip it. + The table exists only in mysql_prod, so the result should be None. + """ + source = self.open_lineage_source + + source._namespace_to_service_cache = {} + # Only redshift_prod is configured — mysql_prod is NOT in dbServiceNames + source.source_config.lineageInformation = LineageInformation( + dbServiceNames=["redshift_prod"] + ) + # _build_db_service_type_map only includes configured services + source._db_service_type_map = {"redshift_prod": DatabaseServiceType.Redshift} + + # namespaceToServiceMapping maps namespace to mysql_prod which is NOT configured + object.__setattr__( + source.service_connection, + "namespaceToServiceMapping", + {"mysql://mysql-host:3306": "mysql_prod"}, + ) + + table = TableDetails(name="user_stat", schema="analytics") + + # fqn.build returns None for redshift_prod (table doesn't exist there) + with patch("metadata.utils.fqn.build", return_value=None): + result = source._get_table_fqn( + table, namespace="mysql://mysql-host:3306/mydb" + ) + + # mysql_prod is not in dbServiceNames so mapping is ignored. + # Fallback scheme-based: redshift:// != mysql://, no match. + # Falls through to all dbServiceNames (redshift_prod) where table doesn't exist. + assert result is None + + def test_namespace_scheme_resolves_correct_service_among_different_types(self): + """Edge case 2: Same table name exists in both a MySQL and a Redshift service. + The namespace scheme (mysql:// vs redshift://) disambiguates which service to search. + + Setup: Two services configured — mysql_prod (Mysql) and redshift_prod (Redshift). + Both have a table analytics.user_stat. A mysql:// namespace should find only the + MySQL table FQN, and a redshift:// namespace should find only the Redshift one. 
+ """ + source = self.open_lineage_source + + source._namespace_to_service_cache = {} + source.source_config.lineageInformation = LineageInformation( + dbServiceNames=["mysql_prod", "redshift_prod"] + ) + source._db_service_type_map = { + "mysql_prod": DatabaseServiceType.Mysql, + "redshift_prod": DatabaseServiceType.Redshift, + } + + table = TableDetails(name="user_stat", schema="analytics") + + def mock_fqn_build( + metadata, + entity_type, + service_name, + database_name, + schema_name, + table_name, + **kwargs, + ): + if service_name == "mysql_prod": + return "mysql_prod.db.analytics.user_stat" + elif service_name == "redshift_prod": + return "redshift_prod.warehouse.analytics.user_stat" + return None + + with patch("metadata.utils.fqn.build", side_effect=mock_fqn_build): + # MySQL namespace -> scheme resolves to mysql_prod only + mysql_result = source._get_table_fqn( + table, namespace="mysql://mysql-host:3306/db" + ) + assert mysql_result == "mysql_prod.db.analytics.user_stat" + + # Clear cache for next lookup + source._namespace_to_service_cache = {} + + # Redshift namespace -> scheme resolves to redshift_prod only + redshift_result = source._get_table_fqn( + table, namespace="redshift://cluster:5439/warehouse" + ) + assert redshift_result == "redshift_prod.warehouse.analytics.user_stat" + + def test_namespace_mapping_config_disambiguates_same_type_services(self): + """Edge case 3: Two MySQL services (mysql_cluster_a, mysql_cluster_b) both have + a table analytics.user_stat. Scheme-based resolution returns both (ambiguous). + The namespaceToServiceMapping config disambiguates to the correct cluster. 
+ """ + source = self.open_lineage_source + + source._namespace_to_service_cache = {} + source.source_config.lineageInformation = LineageInformation( + dbServiceNames=["mysql_cluster_a", "mysql_cluster_b"] + ) + source._db_service_type_map = { + "mysql_cluster_a": DatabaseServiceType.Mysql, + "mysql_cluster_b": DatabaseServiceType.Mysql, + } + + # Config maps specific namespace prefixes to the correct cluster + object.__setattr__( + source.service_connection, + "namespaceToServiceMapping", + { + "mysql://cluster-a:3306": "mysql_cluster_a", + "mysql://cluster-b:3306": "mysql_cluster_b", + }, + ) + + table = TableDetails(name="user_stat", schema="analytics") + + def mock_fqn_build( + metadata, + entity_type, + service_name, + database_name, + schema_name, + table_name, + **kwargs, + ): + if service_name == "mysql_cluster_a": + return "mysql_cluster_a.db.analytics.user_stat" + elif service_name == "mysql_cluster_b": + return "mysql_cluster_b.db.analytics.user_stat" + return None + + with patch("metadata.utils.fqn.build", side_effect=mock_fqn_build): + # cluster-a namespace -> mapping resolves to mysql_cluster_a + result_a = source._get_table_fqn( + table, namespace="mysql://cluster-a:3306/db" + ) + assert result_a == "mysql_cluster_a.db.analytics.user_stat" + + source._namespace_to_service_cache = {} + + # cluster-b namespace -> mapping resolves to mysql_cluster_b + result_b = source._get_table_fqn( + table, namespace="mysql://cluster-b:3306/db" + ) + assert result_b == "mysql_cluster_b.db.analytics.user_stat" + + def test_namespace_scheme_resolves_known_vs_custom_db_type(self): + """Edge case 4: A MySQL service and a custom/unknown DB service both have the + same table analytics.user_stat. A mysql:// namespace should resolve to the MySQL + service only, while a custom://... namespace (unknown scheme) should resolve to + the custom service. + + find_services_by_scheme returns services with non-standard types for unknown schemes. 
+ """ + source = self.open_lineage_source + + source._namespace_to_service_cache = {} + source.source_config.lineageInformation = LineageInformation( + dbServiceNames=["mysql_prod", "custom_lakehouse"] + ) + source._db_service_type_map = { + "mysql_prod": DatabaseServiceType.Mysql, + "custom_lakehouse": "CustomDatabase", + } + + table = TableDetails(name="user_stat", schema="analytics") + + def mock_fqn_build( + metadata, + entity_type, + service_name, + database_name, + schema_name, + table_name, + **kwargs, + ): + if service_name == "mysql_prod": + return "mysql_prod.db.analytics.user_stat" + elif service_name == "custom_lakehouse": + return "custom_lakehouse.lake.analytics.user_stat" + return None + + with patch("metadata.utils.fqn.build", side_effect=mock_fqn_build): + # mysql:// namespace -> scheme matches Mysql -> resolves to mysql_prod only + mysql_result = source._get_table_fqn( + table, namespace="mysql://mysql-host:3306/db" + ) + assert mysql_result == "mysql_prod.db.analytics.user_stat" + + source._namespace_to_service_cache = {} + + # custom:// namespace (unknown scheme) -> find_services_by_scheme returns + # services whose type is NOT in the known scheme map, i.e. 
custom_lakehouse + custom_result = source._get_table_fqn( + table, namespace="custom://lakehouse-host:8080/lake" + ) + assert custom_result == "custom_lakehouse.lake.analytics.user_stat" + def test_yield_pipeline_lineage_topic_not_found_skips_gracefully(self): """When a Kafka topic input cannot be resolved (no matching messaging service), no lineage edge should be produced for that topic, even though the table output diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json index 3f28f335c171..e9bf9bacbb00 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/openLineageConnection.json @@ -178,6 +178,14 @@ } ] }, + "namespaceToServiceMapping": { + "title": "Namespace to Service Mapping", + "description": "Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. Used when multiple services of the same type exist. 
Example: 'mysql://cluster-a:3306' -> 'mysql-cluster-a'.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, "pipelineFilterPattern": { "description": "Regex exclude pipelines.", "$ref": "../../../../type/filterPattern.json#/definitions/filterPattern", From f524ecdce873842c158e36866f3a7deede15f9e2 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Thu, 2 Apr 2026 19:02:35 -0400 Subject: [PATCH 2/9] add additional tests --- .../pipeline/openlineage/table_resolver.py | 6 +-- .../topology/pipeline/test_openlineage.py | 51 +++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py index b7f58b67aefd..0207b1c16fe2 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py @@ -59,11 +59,11 @@ def extract_db_scheme_from_namespace(namespace: str) -> Optional[str]: """ Extract the URL scheme from an OpenLineage dataset namespace. 
- >>> extract_namespace_scheme("mysql://host:3306/db") + >>> extract_db_scheme_from_namespace("mysql://host:3306/db") 'mysql' - >>> extract_namespace_scheme("redshift://cluster:5439/db") + >>> extract_db_scheme_from_namespace("redshift://cluster:5439/db") 'redshift' - >>> extract_namespace_scheme("airflow") + >>> extract_db_scheme_from_namespace("airflow") None """ try: diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage.py b/ingestion/tests/unit/topology/pipeline/test_openlineage.py index 96fdf1f0b535..d071e02c66c4 100644 --- a/ingestion/tests/unit/topology/pipeline/test_openlineage.py +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage.py @@ -1600,6 +1600,57 @@ def mock_fqn_build( ) assert custom_result == "custom_lakehouse.lake.analytics.user_stat" + def test_unknown_scheme_matches_all_custom_services_returns_first_found(self): + """Edge case 5: Multiple custom/non-standard DB services configured. An unknown + namespace scheme (custom://) matches ALL of them since none are in the known + scheme map. _get_table_fqn_from_om returns whichever has the table first — + this is ambiguous. Users should use namespaceToServiceMapping to disambiguate. + + Verifies that a warning is logged when multiple custom services match. 
+ """ + source = self.open_lineage_source + + source._namespace_to_service_cache = {} + source.source_config.lineageInformation = LineageInformation( + dbServiceNames=["custom_lakehouse_a", "custom_lakehouse_b"] + ) + source._db_service_type_map = { + "custom_lakehouse_a": "CustomDatabaseA", + "custom_lakehouse_b": "CustomDatabaseB", + } + + table = TableDetails(name="user_stat", schema="analytics") + + def mock_fqn_build( + metadata, + entity_type, + service_name, + database_name, + schema_name, + table_name, + **kwargs, + ): + # Both custom services have the table + if service_name == "custom_lakehouse_a": + return "custom_lakehouse_a.lake.analytics.user_stat" + elif service_name == "custom_lakehouse_b": + return "custom_lakehouse_b.lake.analytics.user_stat" + return None + + with patch("metadata.utils.fqn.build", side_effect=mock_fqn_build): + import logging + + with self.assertLogs("metadata.Ingestion", level=logging.WARNING) as cm: + result = source._get_table_fqn( + table, namespace="custom://some-host:8080/lake" + ) + + # Returns the first match — ambiguous but functional + assert result == "custom_lakehouse_a.lake.analytics.user_stat" + + # Warning should advise using namespaceToServiceMapping + assert any("namespaceToServiceMapping" in msg for msg in cm.output) + def test_yield_pipeline_lineage_topic_not_found_skips_gracefully(self): """When a Kafka topic input cannot be resolved (no matching messaging service), no lineage edge should be produced for that topic, even though the table output From 01299052a0554e263c005998ab520b6ef1cec7cc Mon Sep 17 00:00:00 2001 From: jkukreja Date: Thu, 2 Apr 2026 19:14:50 -0400 Subject: [PATCH 3/9] Update generated TypeScript types --- .../ui/src/generated/api/automations/createWorkflow.ts | 6 ++++++ .../ui/src/generated/api/services/createPipelineService.ts | 6 ++++++ .../services/ingestionPipelines/createIngestionPipeline.ts | 6 ++++++ .../generated/entity/automations/testServiceConnection.ts | 6 ++++++ 
.../ui/src/generated/entity/automations/workflow.ts | 6 ++++++ .../services/connections/pipeline/openLineageConnection.ts | 6 ++++++ .../entity/services/connections/serviceConnection.ts | 6 ++++++ .../entity/services/ingestionPipelines/ingestionPipeline.ts | 6 ++++++ .../ui/src/generated/entity/services/pipelineService.ts | 6 ++++++ .../ui/src/generated/metadataIngestion/testSuitePipeline.ts | 6 ++++++ .../ui/src/generated/metadataIngestion/workflow.ts | 6 ++++++ 11 files changed, 66 insertions(+) diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts index c83372f378a9..b6ed6a1224af 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts @@ -1927,6 +1927,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts index 8545da4f1736..f9382e58b73c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts @@ -284,6 +284,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. 
*/ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 37c073b1cb43..a6bae56f488a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -4714,6 +4714,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts index 89bccd85b11b..577c45677b0b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts @@ -1809,6 +1809,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. 
*/ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts index 42c0710756e2..00d405389157 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts @@ -2462,6 +2462,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts index 390ec845fef7..6c334948bccf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts @@ -18,6 +18,12 @@ export interface OpenLineageConnection { * Event broker configuration. Choose between Kafka and Kinesis. 
*/ brokerConfig: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * Regex exclude pipelines. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts index 4120d36f35e8..618a29204304 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts @@ -1979,6 +1979,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 88578023b5dc..605540b87c4b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -5297,6 +5297,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. 
*/ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts index 6e982b303766..6a333a3bc97d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts @@ -406,6 +406,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts index 83ff56a0c6e1..ee73ef85e9d8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts @@ -2023,6 +2023,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. 
+ * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index 3bfb134f5acd..d7c5cedc2579 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -2068,6 +2068,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. 
+ */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ From 2a092bd8f7a212dc0e1f20409ce003744ba9599c Mon Sep 17 00:00:00 2001 From: jkukreja Date: Fri, 3 Apr 2026 01:46:38 -0400 Subject: [PATCH 4/9] raise error if multiple db_schemas are found --- .../source/pipeline/openlineage/metadata.py | 43 +++++++++++------- .../source/pipeline/openlineage/utils.py | 9 ++++ .../topology/pipeline/test_openlineage.py | 45 ++++++------------- 3 files changed, 49 insertions(+), 48 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 4c494beb2c26..3eaedb59b187 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -32,7 +32,9 @@ KinesisBrokerConfig, OpenLineageConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -76,6 +78,7 @@ find_services_by_scheme, ) from metadata.ingestion.source.pipeline.openlineage.utils import ( + AmbiguousServiceException, FQNNotFoundException, message_to_open_lineage_event, ) @@ -240,9 +243,12 @@ def _build_db_service_type_map(self): type_map = {} for service_name in self.get_db_service_names(): try: - svc = self.metadata.get_by_name(DatabaseService, service_name) - if svc and svc.serviceType: - type_map[service_name] = svc.serviceType + resp = self.metadata.client.get( + f"/services/databaseServices/name/{service_name}" + ) + svc_type_str = resp.get("serviceType") + if svc_type_str: + type_map[service_name] = DatabaseServiceType(svc_type_str) except Exception: logger.debug(f"Could not fetch DB service: 
{service_name}") return type_map @@ -278,12 +284,11 @@ def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[st if len(matched) == 1: result = matched elif len(matched) > 1: - logger.warning( + raise AmbiguousServiceException( f"Namespace '{namespace}' (scheme={db_scheme}) matches " f"multiple DB services: {matched}. Configure " f"'namespaceToServiceMapping' to disambiguate." ) - result = matched self._namespace_to_service_cache[namespace] = result return result @@ -301,20 +306,26 @@ def _get_table_fqn( self._db_service_names_warned = True return None - resolved_services = self._resolve_db_services_for_namespace(namespace) - try: - return self._get_table_fqn_from_om( - table_details, services=resolved_services - ) - except FQNNotFoundException: + resolved_services = self._resolve_db_services_for_namespace(namespace) + try: - schema_fqn = self._get_schema_fqn_from_om( - table_details.schema, services=resolved_services + return self._get_table_fqn_from_om( + table_details, services=resolved_services ) - return f"{schema_fqn}.{table_details.name}" except FQNNotFoundException: - return None + try: + schema_fqn = self._get_schema_fqn_from_om( + table_details.schema, services=resolved_services + ) + return f"{schema_fqn}.{table_details.name}" + except FQNNotFoundException: + return None + except Exception: + logger.warning( + f"Failed to get FQN for table {table_details.name}: {traceback.format_exc()}" + ) + return None def _get_table_fqn_from_om( self, table_details: TableDetails, services: Optional[List[str]] = None diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py index 460de42c6fc1..3f591f29f219 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py @@ -55,3 +55,12 @@ class FQNNotFoundException(Exception): """ pass + + +class 
AmbiguousServiceException(Exception): + """ + Raised when a dataset namespace matches multiple DB services of the same + type and cannot be unambiguously resolved. + """ + + pass diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage.py b/ingestion/tests/unit/topology/pipeline/test_openlineage.py index d071e02c66c4..ec986b9ab8fa 100644 --- a/ingestion/tests/unit/topology/pipeline/test_openlineage.py +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage.py @@ -1600,13 +1600,13 @@ def mock_fqn_build( ) assert custom_result == "custom_lakehouse.lake.analytics.user_stat" - def test_unknown_scheme_matches_all_custom_services_returns_first_found(self): + def test_unknown_scheme_matches_multiple_custom_services_logs_and_returns_none( + self, + ): """Edge case 5: Multiple custom/non-standard DB services configured. An unknown namespace scheme (custom://) matches ALL of them since none are in the known - scheme map. _get_table_fqn_from_om returns whichever has the table first — - this is ambiguous. Users should use namespaceToServiceMapping to disambiguate. - - Verifies that a warning is logged when multiple custom services match. + scheme map. AmbiguousServiceException is raised, caught in _get_table_fqn, + logged as a warning, and None is returned (lineage skipped for this entity). 
""" source = self.open_lineage_source @@ -1621,35 +1621,16 @@ def test_unknown_scheme_matches_all_custom_services_returns_first_found(self): table = TableDetails(name="user_stat", schema="analytics") - def mock_fqn_build( - metadata, - entity_type, - service_name, - database_name, - schema_name, - table_name, - **kwargs, - ): - # Both custom services have the table - if service_name == "custom_lakehouse_a": - return "custom_lakehouse_a.lake.analytics.user_stat" - elif service_name == "custom_lakehouse_b": - return "custom_lakehouse_b.lake.analytics.user_stat" - return None + import logging - with patch("metadata.utils.fqn.build", side_effect=mock_fqn_build): - import logging - - with self.assertLogs("metadata.Ingestion", level=logging.WARNING) as cm: - result = source._get_table_fqn( - table, namespace="custom://some-host:8080/lake" - ) - - # Returns the first match — ambiguous but functional - assert result == "custom_lakehouse_a.lake.analytics.user_stat" + with self.assertLogs("metadata.Ingestion", level=logging.WARNING) as cm: + result = source._get_table_fqn( + table, namespace="custom://some-host:8080/lake" + ) - # Warning should advise using namespaceToServiceMapping - assert any("namespaceToServiceMapping" in msg for msg in cm.output) + assert result is None + assert any("Failed to get FQN for table" in msg for msg in cm.output) + assert any("AmbiguousServiceException" in msg for msg in cm.output) def test_yield_pipeline_lineage_topic_not_found_skips_gracefully(self): """When a Kafka topic input cannot be resolved (no matching messaging service), From 0eb98744941c475e6b4665ea7cb5e9666c654e9f Mon Sep 17 00:00:00 2001 From: jkukreja Date: Fri, 3 Apr 2026 02:20:45 -0400 Subject: [PATCH 5/9] resolve gitar comments --- .../ingestion/source/pipeline/openlineage/metadata.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py 
b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 3eaedb59b187..ea4b3ba61e71 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -18,7 +18,7 @@ from collections import defaultdict from itertools import groupby, product from typing import Any, Dict, Iterable, List, Optional, Tuple -from urllib.parse import urlparse +from urllib.parse import quote, urlparse from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest @@ -244,7 +244,7 @@ def _build_db_service_type_map(self): for service_name in self.get_db_service_names(): try: resp = self.metadata.client.get( - f"/services/databaseServices/name/{service_name}" + f"/services/databaseServices/name/{quote(service_name, safe='')}" ) svc_type_str = resp.get("serviceType") if svc_type_str: From 8ab8bbc685e7c7fb996f2a1394bc83e2e5aa908d Mon Sep 17 00:00:00 2001 From: jkukreja Date: Fri, 3 Apr 2026 12:39:08 -0400 Subject: [PATCH 6/9] add lru cache for _entity_cache --- .../ingestion/source/pipeline/openlineage/metadata.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index ea4b3ba61e71..6dfa4d75b4e1 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -20,6 +20,8 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple from urllib.parse import quote, urlparse +from cachetools import LRUCache + from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest from 
metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -122,8 +124,8 @@ def create( def prepare(self): self._service_cache = {} self._current_pipeline_service = None - self._entity_cache: Dict[str, Any] = {} - self._namespace_to_service_cache: Dict[str, Optional[List[str]]] = {} + self._entity_cache: LRUCache = LRUCache(maxsize=10000) + self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000) self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map() def close(self) -> None: @@ -290,7 +292,8 @@ def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[st f"'namespaceToServiceMapping' to disambiguate." ) - self._namespace_to_service_cache[namespace] = result + if result is not None: + self._namespace_to_service_cache[namespace] = result return result def _get_table_fqn( From f1bf3326ebe25875d66be7e0147e7e73f65bcf76 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Sat, 4 Apr 2026 12:51:00 -0400 Subject: [PATCH 7/9] resolve gitar issues --- .../source/pipeline/openlineage/metadata.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 6dfa4d75b4e1..f93de2a832d1 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -278,7 +278,13 @@ def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[st mapped_service = find_service_by_namespace_mapping(namespace, mapping) if mapped_service and mapped_service in configured: result = [mapped_service] - else: + elif mapped_service: + logger.warning( + f"Namespace mapping resolved '{namespace}' to service " + f"'{mapped_service}', but it is not in the configured " + f"dbServiceNames. Falling back to scheme-based resolution." 
+ ) + if not result: # Auto-discover by extracting the DB scheme from the namespace URL db_scheme = extract_db_scheme_from_namespace(namespace) if db_scheme: @@ -734,13 +740,15 @@ def yield_pipeline_lineage_details( ) if table_fqn: - entity_list.append( - LineageNode( - fqn=TableFQN(value=table_fqn), - uuid=self._get_by_name_cached(Table, table_fqn).id.root, - node_type="table", + table_entity = self._get_by_name_cached(Table, table_fqn) + if table_entity: + entity_list.append( + LineageNode( + fqn=TableFQN(value=table_fqn), + uuid=table_entity.id.root, + node_type="table", + ) ) - ) elif entity_details.entity_type == "topic": topic_entity = self._get_topic_entity(entity_details.topic_details) From 5d6fbabda075d10f7ad8ddf32d7c79dbf5064881 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Tue, 7 Apr 2026 14:23:10 -0400 Subject: [PATCH 8/9] move ambiguous exception to per table --- .../source/pipeline/openlineage/metadata.py | 19 ++++---- .../topology/pipeline/test_openlineage.py | 43 +++++++++++++------ 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index f93de2a832d1..b376c9364a1e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -289,14 +289,8 @@ def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[st db_scheme = extract_db_scheme_from_namespace(namespace) if db_scheme: matched = find_services_by_scheme(db_scheme, self._db_service_type_map) - if len(matched) == 1: + if matched: result = matched - elif len(matched) > 1: - raise AmbiguousServiceException( - f"Namespace '{namespace}' (scheme={db_scheme}) matches " - f"multiple DB services: {matched}. Configure " - f"'namespaceToServiceMapping' to disambiguate." 
- ) if result is not None: self._namespace_to_service_cache[namespace] = result @@ -341,7 +335,9 @@ def _get_table_fqn_from_om( ) -> str: """ Looks for matching Table entity in OM across all configured DB services. + Raises AmbiguousServiceException if the table exists in multiple services. """ + found = [] for db_service in services or self.get_db_service_names(): result = fqn.build( metadata=self.metadata, @@ -352,7 +348,14 @@ def _get_table_fqn_from_om( table_name=table_details.name, ) if result: - return result + found.append(result) + if len(found) == 1: + return found[0] + if len(found) > 1: + raise AmbiguousServiceException( + f"Table '{table_details.name}' found in multiple services: " + f"{found}. Configure 'namespaceToServiceMapping' to disambiguate." + ) raise FQNNotFoundException(f"Table FQN not found for {table_details}") def _build_broker_to_service_map(self) -> Dict[str, str]: diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage.py b/ingestion/tests/unit/topology/pipeline/test_openlineage.py index ec986b9ab8fa..a86782356b18 100644 --- a/ingestion/tests/unit/topology/pipeline/test_openlineage.py +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage.py @@ -6,6 +6,8 @@ from unittest.mock import MagicMock, Mock, patch from uuid import UUID +from cachetools import LRUCache + from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import Pipeline, Task @@ -1600,33 +1602,46 @@ def mock_fqn_build( ) assert custom_result == "custom_lakehouse.lake.analytics.user_stat" - def test_unknown_scheme_matches_multiple_custom_services_logs_and_returns_none( - self, - ): - """Edge case 5: Multiple custom/non-standard DB services configured. An unknown - namespace scheme (custom://) matches ALL of them since none are in the known - scheme map. 
AmbiguousServiceException is raised, caught in _get_table_fqn, + def test_table_found_in_multiple_services_raises_ambiguous(self): + """When the same table exists in multiple DB services, + AmbiguousServiceException is raised, caught in _get_table_fqn, logged as a warning, and None is returned (lineage skipped for this entity). """ source = self.open_lineage_source - source._namespace_to_service_cache = {} + source._namespace_to_service_cache = LRUCache(maxsize=10000) source.source_config.lineageInformation = LineageInformation( - dbServiceNames=["custom_lakehouse_a", "custom_lakehouse_b"] + dbServiceNames=["mysql_a", "mysql_b"] ) source._db_service_type_map = { - "custom_lakehouse_a": "CustomDatabaseA", - "custom_lakehouse_b": "CustomDatabaseB", + "mysql_a": DatabaseServiceType.Mysql, + "mysql_b": DatabaseServiceType.Mysql, } table = TableDetails(name="user_stat", schema="analytics") + def mock_fqn_build( + metadata, + entity_type, + service_name, + database_name, + schema_name, + table_name, + **kwargs, + ): + if service_name == "mysql_a": + return "mysql_a.db.analytics.user_stat" + elif service_name == "mysql_b": + return "mysql_b.db.analytics.user_stat" + return None + import logging - with self.assertLogs("metadata.Ingestion", level=logging.WARNING) as cm: - result = source._get_table_fqn( - table, namespace="custom://some-host:8080/lake" - ) + with patch("metadata.utils.fqn.build", side_effect=mock_fqn_build): + with self.assertLogs("metadata.Ingestion", level=logging.WARNING) as cm: + result = source._get_table_fqn( + table, namespace="mysql://some-host:3306/db" + ) assert result is None assert any("Failed to get FQN for table" in msg for msg in cm.output) From d37b01a5c01e36db57222cea314ac5e86b6f3647 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Tue, 7 Apr 2026 16:37:35 -0400 Subject: [PATCH 9/9] resolve code review comments --- .../source/pipeline/openlineage/metadata.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git 
a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index b376c9364a1e..aaedd9fbad04 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -335,8 +335,10 @@ def _get_table_fqn_from_om( ) -> str: """ Looks for matching Table entity in OM across all configured DB services. - Raises AmbiguousServiceException if the table exists in multiple services. + Raises AmbiguousServiceException if the table exists in multiple services + of the same scheme-resolved type. """ + resolved = services is not None found = [] for db_service in services or self.get_db_service_names(): result = fqn.build( @@ -348,14 +350,16 @@ def _get_table_fqn_from_om( table_name=table_details.name, ) if result: + if not resolved: + return result found.append(result) - if len(found) == 1: + if len(found) > 1: + raise AmbiguousServiceException( + f"Table '{table_details.name}' found in multiple services: " + f"{found}. Configure 'namespaceToServiceMapping' to disambiguate." + ) + if found: return found[0] - if len(found) > 1: - raise AmbiguousServiceException( - f"Table '{table_details.name}' found in multiple services: " - f"{found}. Configure 'namespaceToServiceMapping' to disambiguate." - ) raise FQNNotFoundException(f"Table FQN not found for {table_details}") def _build_broker_to_service_map(self) -> Dict[str, str]: