diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 456290316f7c..aaedd9fbad04 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -18,7 +18,9 @@ from collections import defaultdict from itertools import groupby, product from typing import Any, Dict, Iterable, List, Optional, Tuple -from urllib.parse import urlparse +from urllib.parse import quote, urlparse + +from cachetools import LRUCache from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest @@ -32,6 +34,9 @@ KinesisBrokerConfig, OpenLineageConnection, ) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -69,7 +74,13 @@ get_or_create_pipeline_service, resolve_pipeline_service_type, ) +from metadata.ingestion.source.pipeline.openlineage.table_resolver import ( + extract_db_scheme_from_namespace, + find_service_by_namespace_mapping, + find_services_by_scheme, +) from metadata.ingestion.source.pipeline.openlineage.utils import ( + AmbiguousServiceException, FQNNotFoundException, message_to_open_lineage_event, ) @@ -113,6 +124,9 @@ def create( def prepare(self): self._service_cache = {} self._current_pipeline_service = None + self._entity_cache: LRUCache = LRUCache(maxsize=10000) + self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000) + self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map() def close(self) -> None: self.metadata.compute_percentile(Pipeline, self.today) @@ -214,7 +228,77 @@ def _get_topic_details(data: Dict) -> TopicDetails: return TopicDetails(name=name, 
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
    """
    Fetch an entity through ``self.metadata.get_by_name`` with in-memory caching.

    The cache key is built from the entity class name, the FQN and any extra
    keyword arguments, so two lookups that forward different keyword
    arguments (for example a different ``fields`` list) never share an entry.
    Calls without keyword arguments keep the plain ``"<Class>:<fqn>"`` key.

    ``None`` results are not cached: a later lookup for the same key calls
    ``get_by_name`` again.

    :param entity_class: entity type forwarded to ``get_by_name``
    :param fqn_str: fully qualified name to look up
    :param kwargs: forwarded unchanged to ``get_by_name``
    :return: the value returned by ``get_by_name`` (possibly ``None``)
    """
    cache = getattr(self, "_entity_cache", None)
    if cache is None:
        # The cache attribute is created in prepare(); without it, just delegate.
        return self.metadata.get_by_name(entity_class, fqn_str, **kwargs)

    key = f"{entity_class.__name__}:{fqn_str}"
    if kwargs:
        # Keyword names are unique, so sorting never has to compare the values.
        key = f"{key}:{sorted(kwargs.items())!r}"

    if key in cache:
        return cache[key]

    entity = self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
    if entity is not None:
        cache[key] = entity
    return entity


def _build_db_service_type_map(self) -> Dict[str, Any]:
    """
    Build a ``{service_name: DatabaseServiceType}`` map for the configured dbServiceNames.

    Each configured service is fetched by name from the
    ``/services/databaseServices/name/<name>`` endpoint and its ``serviceType``
    is converted to ``DatabaseServiceType``. This is best effort: a service
    whose lookup or conversion fails is logged at debug level and left out of
    the map instead of aborting the whole build.

    :return: map of service name to service type; empty when no DB service
        names are configured
    """
    type_map = {}
    # get_db_service_names() may yield nothing; treat that as "no services".
    for service_name in self.get_db_service_names() or []:
        try:
            resp = self.metadata.client.get(
                f"/services/databaseServices/name/{quote(service_name, safe='')}"
            )
            svc_type_str = resp.get("serviceType") if resp else None
            if svc_type_str:
                type_map[service_name] = DatabaseServiceType(svc_type_str)
        except Exception:
            # Deliberately broad: one unreachable or oddly typed service must
            # not prevent the remaining services from being mapped.
            logger.debug(f"Could not fetch DB service: {service_name}")
            logger.debug(traceback.format_exc())
    return type_map
def _resolve_db_services_for_namespace(
    self, namespace: Optional[str]
) -> Optional[List[str]]:
    """
    Resolve which DB services to search for a given OL dataset namespace.

    Resolution order:
    1. ``namespaceToServiceMapping`` config (see
       ``find_service_by_namespace_mapping``). The mapped service is used only
       when it is one of the configured dbServiceNames; otherwise a warning is
       logged and resolution continues with step 2.
    2. Extract the scheme from the namespace and keep the services that
       ``find_services_by_scheme`` returns for it. All matches are returned,
       whether that is one service or several.
    3. Return None -> caller falls back to all dbServiceNames.

    The outcome, including "nothing resolved", is cached per namespace, so the
    mapping warning is emitted once per namespace rather than on every lookup.

    :param namespace: OpenLineage dataset namespace; may be missing
    :return: service names to search, or None when nothing could be resolved
    """
    cache = getattr(self, "_namespace_to_service_cache", None)
    if cache is None or not namespace:
        # No cache yet, or no namespace on the dataset: nothing to resolve.
        return None

    if namespace in cache:
        return cache[namespace]

    result = None
    configured = set(self.get_db_service_names() or [])

    mapping = self.service_connection.namespaceToServiceMapping or {}
    mapped_service = find_service_by_namespace_mapping(namespace, mapping)
    if mapped_service and mapped_service in configured:
        result = [mapped_service]
    else:
        if mapped_service:
            logger.warning(
                f"Namespace mapping resolved '{namespace}' to service "
                f"'{mapped_service}', but it is not in the configured "
                f"dbServiceNames. Falling back to scheme-based resolution."
            )
        # Auto-discover by extracting the DB scheme from the namespace URL
        db_scheme = extract_db_scheme_from_namespace(namespace)
        if db_scheme:
            # An empty match list means "unresolved", same as no scheme at all.
            result = (
                find_services_by_scheme(db_scheme, self._db_service_type_map) or None
            )

    cache[namespace] = result
    return result
def _get_table_fqn_from_om(
    self, table_details: TableDetails, services: Optional[List[str]] = None
) -> str:
    """
    Look for a matching Table entity in OM and return its FQN.

    Two modes:
    - ``services`` is a non-empty list: only those services are searched, and
      every one of them is checked so that a table present in more than one
      is reported as ambiguous.
    - ``services`` is ``None`` or empty: all configured DB services are
      searched and the first match wins.

    :param table_details: database / schema / table name to look up
    :param services: optional list of service names to restrict the search to
    :return: fully qualified name of the matching table
    :raises AmbiguousServiceException: the table exists in more than one of
        the given ``services``
    :raises FQNNotFoundException: no searched service contains the table
    """
    # An empty list carries no resolution result, so it must behave like None:
    # search everything with first-match semantics, no ambiguity check.
    scoped = bool(services)
    candidates = services if scoped else self.get_db_service_names()

    found = []
    for db_service in candidates:
        table_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=Table,
            service_name=db_service,
            database_name=table_details.database,
            schema_name=table_details.schema,
            table_name=table_details.name,
        )
        if not table_fqn:
            continue
        if not scoped:
            return table_fqn
        found.append(table_fqn)

    if len(found) > 1:
        raise AmbiguousServiceException(
            f"Table '{table_details.name}' found in multiple services: "
            f"{found}. Configure 'namespaceToServiceMapping' to disambiguate."
        )
    if found:
        return found[0]
    raise FQNNotFoundException(f"Table FQN not found for {table_details}")
:param schema: schema name + :param services: optional list of service names to search :return: fully qualified name of a DatabaseSchema in Open Metadata """ result = None - services = self.get_db_service_names() + services = services or self.get_db_service_names() for db_service in services: result = fqn.build( @@ -469,7 +600,10 @@ def _build_ol_name_to_fqn_map(self, tables: List): entity_details = self._get_entity_details(table) if entity_details.entity_type != "table": continue - table_fqn = self._get_table_fqn(entity_details.table_details) + table_fqn = self._get_table_fqn( + entity_details.table_details, + namespace=table.get("namespace"), + ) if table_fqn: result[OpenlineageSource._get_ol_table_name(table)] = table_fqn @@ -505,7 +639,10 @@ def _get_column_lineage( if entity_details.entity_type != "table": continue - output_table_fqn = self._get_table_fqn(entity_details.table_details) + output_table_fqn = self._get_table_fqn( + entity_details.table_details, + namespace=table.get("namespace"), + ) for field_name, field_spec in ( table.get("facets", {}) .get("columnLineage", {}) @@ -604,18 +741,21 @@ def yield_pipeline_lineage_details( if create_table_request: yield create_table_request - table_fqn = self._get_table_fqn(entity_details.table_details) + table_fqn = self._get_table_fqn( + entity_details.table_details, + namespace=entity_data.get("namespace"), + ) if table_fqn: - entity_list.append( - LineageNode( - fqn=TableFQN(value=table_fqn), - uuid=self.metadata.get_by_name( - Table, table_fqn - ).id.root, - node_type="table", + table_entity = self._get_by_name_cached(Table, table_fqn) + if table_entity: + entity_list.append( + LineageNode( + fqn=TableFQN(value=table_fqn), + uuid=table_entity.id.root, + node_type="table", + ) ) - ) elif entity_details.entity_type == "topic": topic_entity = self._get_topic_entity(entity_details.topic_details) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py 
b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py new file mode 100644 index 000000000000..0207b1c16fe2 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/table_resolver.py @@ -0,0 +1,152 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Resolves a database service name from an OpenLineage dataset namespace. + +OpenLineage dataset namespaces carry the data store type in the URL scheme +(e.g. ``mysql://host:3306/db``, ``redshift://cluster:5439/db``). This module +uses that scheme to narrow lineage table lookups to services of the matching +type, eliminating false matches when the same table name exists in multiple +services. + +Resolution order (per namespace): +1. Explicit ``namespaceToServiceMapping`` config — exact then prefix match. +2. Scheme-based auto-discovery — search only the configured DB services whose + type matches the scheme; a table found in several of them is rejected as ambiguous. +3. Caller falls back to existing suffix search across all ``dbServiceNames``. +""" + +from typing import Dict, List, Optional +from urllib.parse import urlparse + +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +# Maps OpenLineage dataset namespace URI schemes to OMD DatabaseServiceType. 
def extract_db_scheme_from_namespace(namespace: str) -> Optional[str]:
    """
    Extract the URL scheme from an OpenLineage dataset namespace.

    Only namespaces written as ``<scheme>://...`` yield a scheme. A bare
    ``host:port`` string is rejected up front: ``urlparse`` would otherwise
    report the host as the scheme, and the caller would treat it as an
    unrecognized data store type.

    >>> extract_db_scheme_from_namespace("mysql://host:3306/db")
    'mysql'
    >>> extract_db_scheme_from_namespace("redshift://cluster:5439/db")
    'redshift'
    >>> extract_db_scheme_from_namespace("airflow") is None
    True
    >>> extract_db_scheme_from_namespace("localhost:3306") is None
    True

    :param namespace: dataset namespace from the OpenLineage event
    :return: lower-cased scheme, or ``None`` when the namespace has none
    """
    if not isinstance(namespace, str) or "://" not in namespace:
        return None
    try:
        scheme = urlparse(namespace).scheme
    except ValueError:
        # urlparse rejects malformed authorities such as an unclosed "[".
        return None
    return scheme.lower() or None
def find_service_by_namespace_mapping(
    namespace: str,
    mapping: Dict[str, str],
) -> Optional[str]:
    """
    Look up a database service name from a user-configured
    ``namespaceToServiceMapping`` dict (namespace-prefix → OMD service name).

    Resolution order:
    1. Exact match — ``mapping[namespace]``
    2. Prefix match — namespace starts with a mapping key *and* the key ends
       on a URL component boundary: either the key itself ends with ``/`` or
       the next namespace character is one of ``/ : ? #``.
       When multiple keys match, the longest key wins.

    The boundary rule keeps ``mysql://db:3306`` from capturing
    ``mysql://db:33060`` and ``mysql://db1`` from capturing ``mysql://db10``.

    Example mapping::

        {
            "mysql://cluster-a:3306": "mysql-cluster-a",
            "mysql://cluster-a:3306/specific_db": "mysql-specific",
        }

    With namespace ``"mysql://cluster-a:3306/specific_db/table"``
    → returns ``"mysql-specific"`` (longest prefix match).

    :param namespace: dataset namespace from the OpenLineage event
    :param mapping: configured namespace (or prefix) to service name map
    :return: the mapped service name, or ``None`` if no entry matches
    """
    if not namespace or not mapping:
        return None

    if namespace in mapping:
        return mapping[namespace]

    boundary_chars = ("/", ":", "?", "#")

    def ends_on_boundary(key: str) -> bool:
        # Reached only for strict prefixes, so namespace[len(key)] exists;
        # the slice keeps it safe regardless.
        return key.endswith("/") or namespace[len(key) : len(key) + 1] in boundary_chars

    candidates = [
        key
        for key in mapping
        if key and namespace.startswith(key) and ends_on_boundary(key)
    ]
    if not candidates:
        return None
    return mapping[max(candidates, key=len)]
def find_services_by_scheme(
    scheme: str,
    db_service_type_map: Dict[str, "DatabaseServiceType"],
    scheme_to_type: Optional[Dict[str, "DatabaseServiceType"]] = None,
) -> List[str]:
    """
    Filter a pre-built ``{service_name: DatabaseServiceType}`` map to only
    those whose type matches the given URL scheme.

    The scheme is stripped and lower-cased before the lookup.

    If the scheme is recognized (present in the scheme table), returns only
    services of that exact type.

    If the scheme is **not** recognized, returns services whose types are
    *not* in the scheme table — i.e. services of types the table knows
    nothing about.

    :param scheme: URL scheme taken from the dataset namespace
    :param db_service_type_map: service name to service type
    :param scheme_to_type: scheme table to use; defaults to
        ``NAMESPACE_SCHEME_TO_SERVICE_TYPE``
    :return: matching service names, in ``db_service_type_map`` order (may be
        empty or contain multiple entries when several services of the same
        type are configured)
    """
    if scheme_to_type is None:
        scheme_to_type = NAMESPACE_SCHEME_TO_SERVICE_TYPE

    service_types = db_service_type_map or {}
    normalized_scheme = (scheme or "").strip().lower()
    target_type = scheme_to_type.get(normalized_scheme)

    if target_type is not None:
        return [
            name for name, svc_type in service_types.items() if svc_type == target_type
        ]

    known_types = set(scheme_to_type.values())
    return [
        name for name, svc_type in service_types.items() if svc_type not in known_types
    ]
OpenLineageEvent, + TableDetails, ) from metadata.ingestion.source.pipeline.openlineage.utils import ( FQNNotFoundException, @@ -385,7 +391,7 @@ def test_get_column_lineage_empty_inputs_outputs(self): def test_build_ol_name_to_fqn_map_with_valid_data(self, mock_get_table_fqn): # Mock _get_table_fqn to return a constructed FQN based on the provided table details mock_get_table_fqn.side_effect = ( - lambda table_details: f"database.schema.{table_details.name}" + lambda table_details, namespace=None: f"database.schema.{table_details.name}" ) tables = [ @@ -445,7 +451,7 @@ def test_get_column_lineage_valid_inputs_outputs( """Test with valid input and output lists.""" # Setup mock_get_table_fqn.side_effect = ( - lambda table_details: f"database.schema.{table_details.name}" + lambda table_details, namespace=None: f"database.schema.{table_details.name}" ) mock_build_map.return_value = { "s3a:/project-db/src_test1": "database.schema.input_table_1", @@ -516,7 +522,7 @@ def test_get_column_lineage_normalizes_caps_columns_to_lowercase( ): """Test that CAPS column names from OL events are normalized to lowercase in column FQNs.""" mock_get_table_fqn.side_effect = ( - lambda table_details: f"database.schema.{table_details.name}" + lambda table_details, namespace=None: f"database.schema.{table_details.name}" ) mock_build_map.return_value = { "sqlserver:/host:1433/hk_schema.CASE_TEST_SOURCE": "database.schema.case_test_source", @@ -653,9 +659,10 @@ def test_get_pipelines_list(self): @patch( "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" ) - def test_yield_pipeline_lineage_details(self, mock_get_entity): + def test_yield_pipeline_lineage_details(self, mock_get_table_from_om): def t_fqn_build_side_effect( table_details, + services=None, ): return f"testService.shopify.{table_details.name}" @@ -687,7 +694,7 @@ def extract_lineage_details(pip_results): return table_lineage, col_lineage # Set up the side effect for the mock entity 
FQN builder - mock_get_entity.side_effect = t_fqn_build_side_effect + mock_get_table_from_om.side_effect = t_fqn_build_side_effect ol_event = self.read_openlineage_event_from_kafka(FULL_OL_KAFKA_EVENT) @@ -861,7 +868,9 @@ def test_get_pipelines_list_filters_out_abort_events(self, mock_consumer_class): @patch( "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" ) - def test_lineage_merge_start_with_data_running_without(self, mock_get_table_fqn): + def test_lineage_merge_start_with_data_running_without( + self, mock_get_table_from_om + ): """ Test that START event with lineage data followed by RUNNING event without lineage data does not overwrite existing lineage in the database. @@ -886,10 +895,10 @@ def test_lineage_merge_start_with_data_running_without(self, mock_get_table_fqn) running_event["outputs"] = [] # Mock table FQN lookup - def mock_fqn_side_effect(table_details): + def mock_fqn_side_effect(table_details, services=None): return f"testService.shopify.{table_details.name}" - mock_get_table_fqn.side_effect = mock_fqn_side_effect + mock_get_table_from_om.side_effect = mock_fqn_side_effect # Mock metadata.get_by_name for table lookups from_table_id = "69fc8906-4a4a-45ab-9a54-9cc2d399e10e" @@ -1111,10 +1120,10 @@ def test_get_pipelines_list_kinesis_empty_stream(self): @patch( "metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om" ) - def test_yield_pipeline_lineage_details_kinesis(self, mock_get_entity): + def test_yield_pipeline_lineage_details_kinesis(self, mock_get_table_from_om): """Test lineage extraction from a Kinesis-sourced event.""" - def t_fqn_build_side_effect(table_details): + def t_fqn_build_side_effect(table_details, services=None): return f"testService.shopify.{table_details.name}" def mock_get_uuid_by_name(entity, fqn): @@ -1125,7 +1134,7 @@ def mock_get_uuid_by_name(entity, fqn): else: return Mock(id=Mock(root="79fc8906-4a4a-45ab-9a54-9cc2d399e10e")) - 
def _configure_namespace_resolution(self, service_types, mapping=None):
    """
    Prepare the shared OpenLineage source for a namespace-resolution test.

    Resets the namespace cache, sets ``dbServiceNames`` and the service type
    map from ``service_types`` and always (re)assigns
    ``namespaceToServiceMapping``, so a mapping installed by one test cannot
    influence another.

    :param service_types: ``{service_name: service type}`` to configure
    :param mapping: value for ``namespaceToServiceMapping`` (``None`` = unset)
    :return: the configured source
    """
    source = self.open_lineage_source
    source._namespace_to_service_cache = LRUCache(maxsize=10000)
    source.source_config.lineageInformation = LineageInformation(
        dbServiceNames=list(service_types)
    )
    source._db_service_type_map = dict(service_types)
    # object.__setattr__ is what the connection object needs for this field
    # to be assigned directly — presumably it bypasses model validation.
    object.__setattr__(source.service_connection, "namespaceToServiceMapping", mapping)
    return source


def _fqn_build_returning(self, fqn_by_service):
    """
    Build a stand-in for ``fqn.build`` that answers per service.

    :param fqn_by_service: ``{service_name: table FQN}``; services that are
        not listed resolve to ``None`` (table not found)
    :return: callable with the keyword signature the source uses
    """

    def build(
        metadata,
        entity_type,
        service_name,
        database_name,
        schema_name,
        table_name,
        **kwargs,
    ):
        return fqn_by_service.get(service_name)

    return build


def test_namespace_resolution_skips_when_service_not_in_configured_names(self):
    """Edge case 1: the namespace maps to a service that is NOT in dbServiceNames.

    Only redshift_prod is configured. The mysql://... namespace is mapped to
    mysql_prod, which is not configured, so the mapping is ignored. No
    configured service has the Mysql type either, so the lookup falls back to
    all dbServiceNames (redshift_prod), where the table does not exist.
    """
    source = self._configure_namespace_resolution(
        {"redshift_prod": DatabaseServiceType.Redshift},
        mapping={"mysql://mysql-host:3306": "mysql_prod"},
    )
    table = TableDetails(name="user_stat", schema="analytics")

    # fqn.build returns None for redshift_prod (table doesn't exist there)
    with patch("metadata.utils.fqn.build", return_value=None):
        result = source._get_table_fqn(table, namespace="mysql://mysql-host:3306/mydb")

    self.assertIsNone(result)


def test_namespace_scheme_resolves_correct_service_among_different_types(self):
    """Edge case 2: the same table exists in a MySQL and a Redshift service.

    The namespace scheme (mysql:// vs redshift://) selects which service is
    searched, so each namespace yields the FQN from its own service.
    """
    source = self._configure_namespace_resolution(
        {
            "mysql_prod": DatabaseServiceType.Mysql,
            "redshift_prod": DatabaseServiceType.Redshift,
        }
    )
    table = TableDetails(name="user_stat", schema="analytics")
    fqn_build = self._fqn_build_returning(
        {
            "mysql_prod": "mysql_prod.db.analytics.user_stat",
            "redshift_prod": "redshift_prod.warehouse.analytics.user_stat",
        }
    )

    with patch("metadata.utils.fqn.build", side_effect=fqn_build):
        mysql_result = source._get_table_fqn(table, namespace="mysql://mysql-host:3306/db")
        redshift_result = source._get_table_fqn(
            table, namespace="redshift://cluster:5439/warehouse"
        )

    self.assertEqual(mysql_result, "mysql_prod.db.analytics.user_stat")
    self.assertEqual(redshift_result, "redshift_prod.warehouse.analytics.user_stat")


def test_namespace_mapping_config_disambiguates_same_type_services(self):
    """Edge case 3: two MySQL services both contain the table.

    Scheme-based resolution alone would return both services; the
    namespaceToServiceMapping config picks the right cluster per namespace.
    """
    source = self._configure_namespace_resolution(
        {
            "mysql_cluster_a": DatabaseServiceType.Mysql,
            "mysql_cluster_b": DatabaseServiceType.Mysql,
        },
        mapping={
            "mysql://cluster-a:3306": "mysql_cluster_a",
            "mysql://cluster-b:3306": "mysql_cluster_b",
        },
    )
    table = TableDetails(name="user_stat", schema="analytics")
    fqn_build = self._fqn_build_returning(
        {
            "mysql_cluster_a": "mysql_cluster_a.db.analytics.user_stat",
            "mysql_cluster_b": "mysql_cluster_b.db.analytics.user_stat",
        }
    )

    with patch("metadata.utils.fqn.build", side_effect=fqn_build):
        result_a = source._get_table_fqn(table, namespace="mysql://cluster-a:3306/db")
        result_b = source._get_table_fqn(table, namespace="mysql://cluster-b:3306/db")

    self.assertEqual(result_a, "mysql_cluster_a.db.analytics.user_stat")
    self.assertEqual(result_b, "mysql_cluster_b.db.analytics.user_stat")


def test_namespace_scheme_resolves_known_vs_custom_db_type(self):
    """Edge case 4: a MySQL service and a custom-typed service share a table.

    A mysql:// namespace resolves to the MySQL service only. A custom://
    namespace has an unrecognized scheme, for which find_services_by_scheme
    returns the services whose type is not in the known scheme map, i.e. the
    custom service.
    """
    source = self._configure_namespace_resolution(
        {
            "mysql_prod": DatabaseServiceType.Mysql,
            "custom_lakehouse": "CustomDatabase",
        }
    )
    table = TableDetails(name="user_stat", schema="analytics")
    fqn_build = self._fqn_build_returning(
        {
            "mysql_prod": "mysql_prod.db.analytics.user_stat",
            "custom_lakehouse": "custom_lakehouse.lake.analytics.user_stat",
        }
    )

    with patch("metadata.utils.fqn.build", side_effect=fqn_build):
        mysql_result = source._get_table_fqn(table, namespace="mysql://mysql-host:3306/db")
        custom_result = source._get_table_fqn(
            table, namespace="custom://lakehouse-host:8080/lake"
        )

    self.assertEqual(mysql_result, "mysql_prod.db.analytics.user_stat")
    self.assertEqual(custom_result, "custom_lakehouse.lake.analytics.user_stat")


def test_table_found_in_multiple_services_raises_ambiguous(self):
    """The same table in several scheme-matched services is ambiguous.

    AmbiguousServiceException is raised by the table lookup, caught in
    _get_table_fqn, logged as a warning, and None is returned (lineage is
    skipped for this entity).
    """
    source = self._configure_namespace_resolution(
        {
            "mysql_a": DatabaseServiceType.Mysql,
            "mysql_b": DatabaseServiceType.Mysql,
        }
    )
    table = TableDetails(name="user_stat", schema="analytics")
    fqn_build = self._fqn_build_returning(
        {
            "mysql_a": "mysql_a.db.analytics.user_stat",
            "mysql_b": "mysql_b.db.analytics.user_stat",
        }
    )

    with patch("metadata.utils.fqn.build", side_effect=fqn_build):
        with self.assertLogs("metadata.Ingestion", level="WARNING") as cm:
            result = source._get_table_fqn(table, namespace="mysql://some-host:3306/db")

    self.assertIsNone(result)
    self.assertTrue(any("Failed to get FQN for table" in msg for msg in cm.output))
    self.assertTrue(any("AmbiguousServiceException" in msg for msg in cm.output))
dataset namespaces (or prefixes) to OpenMetadata database service names. Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> 'mysql-cluster-a'.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, "pipelineFilterPattern": { "description": "Regex exclude pipelines.", "$ref": "../../../../type/filterPattern.json#/definitions/filterPattern", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts index c83372f378a9..b6ed6a1224af 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/automations/createWorkflow.ts @@ -1927,6 +1927,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts index 8545da4f1736..f9382e58b73c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/createPipelineService.ts @@ -284,6 +284,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. 
+ * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 37c073b1cb43..a6bae56f488a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -4714,6 +4714,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts index 89bccd85b11b..577c45677b0b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/testServiceConnection.ts @@ -1809,6 +1809,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. 
+ * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts index 42c0710756e2..00d405389157 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts @@ -2462,6 +2462,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts index 390ec845fef7..6c334948bccf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/openLineageConnection.ts @@ -18,6 +18,12 @@ export interface OpenLineageConnection { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. 
Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * Regex exclude pipelines. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts index 4120d36f35e8..618a29204304 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/serviceConnection.ts @@ -1979,6 +1979,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 88578023b5dc..605540b87c4b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -5297,6 +5297,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. 
+ */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts index 6e982b303766..6a333a3bc97d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/pipelineService.ts @@ -406,6 +406,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts index 83ff56a0c6e1..ee73ef85e9d8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/testSuitePipeline.ts @@ -2023,6 +2023,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. 
+ */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index 3bfb134f5acd..d7c5cedc2579 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -2068,6 +2068,12 @@ export interface ConfigObject { * Event broker configuration. Choose between Kafka and Kinesis. */ brokerConfig?: BrokerConfiguration; + /** + * Map OpenLineage dataset namespaces (or prefixes) to OpenMetadata database service names. + * Used when multiple services of the same type exist. Example: 'mysql://cluster-a:3306' -> + * 'mysql-cluster-a'. + */ + namespaceToServiceMapping?: { [key: string]: string }; /** * We support username/password or No Authentication */