Skip to content

Commit ba89dc7

Browse files
committed
add namespace-based DB service resolution for openLineage db_table lookups
1 parent ad1a2bd commit ba89dc7

3 files changed

Lines changed: 523 additions & 23 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 141 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
KinesisBrokerConfig,
3333
OpenLineageConnection,
3434
)
35+
from metadata.generated.schema.entity.services.databaseService import DatabaseService
3536
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
3637
StackTraceError,
3738
)
@@ -69,6 +70,11 @@
6970
get_or_create_pipeline_service,
7071
resolve_pipeline_service_type,
7172
)
73+
from metadata.ingestion.source.pipeline.openlineage.table_resolver import (
74+
extract_db_scheme_from_namespace,
75+
find_service_by_namespace_mapping,
76+
find_services_by_scheme,
77+
)
7278
from metadata.ingestion.source.pipeline.openlineage.utils import (
7379
FQNNotFoundException,
7480
message_to_open_lineage_event,
@@ -113,6 +119,9 @@ def create(
113119
def prepare(self):
    """Initialise the per-run caches used while processing OpenLineage events."""
    self._service_cache = {}
    self._current_pipeline_service = None
    # Entities fetched through _get_by_name_cached, keyed by "<ClassName>:<fqn>".
    self._entity_cache: Dict[str, Any] = {}
    # Memoised result of _resolve_db_services_for_namespace per dataset namespace;
    # a stored None means "no narrowing, search every configured DB service".
    self._namespace_to_service_cache: Dict[str, Optional[List[str]]] = {}
    # service name -> the service's ``serviceType`` value (not a plain string),
    # as returned by _build_db_service_type_map.
    self._db_service_type_map: Dict[str, Any] = self._build_db_service_type_map()
116125

117126
def close(self) -> None:
118127
self.metadata.compute_percentile(Pipeline, self.today)
@@ -214,7 +223,94 @@ def _get_topic_details(data: Dict) -> TopicDetails:
214223

215224
return TopicDetails(name=name, broker_hostname=broker_hostname)
216225

217-
def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
226+
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
227+
"""Wrapper around metadata.get_by_name with in-memory caching."""
228+
if not hasattr(self, "_entity_cache"):
229+
return self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
230+
key = f"{entity_class.__name__}:{fqn_str}"
231+
if key not in self._entity_cache:
232+
self._entity_cache[key] = self.metadata.get_by_name(
233+
entity_class, fqn_str, **kwargs
234+
)
235+
return self._entity_cache[key]
236+
237+
def _build_db_service_type_map(self):
238+
"""Build a map of {service_name: DatabaseServiceType} filtered to configured dbServiceNames."""
239+
type_map = {}
240+
for service_name in self.get_db_service_names():
241+
try:
242+
svc = self.metadata.get_by_name(DatabaseService, service_name)
243+
if svc and svc.serviceType:
244+
type_map[service_name] = svc.serviceType
245+
except Exception:
246+
logger.debug(f"Could not fetch DB service: {service_name}")
247+
return type_map
248+
249+
def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[str]]:
250+
"""
251+
Resolve which DB services to search for a given OL dataset namespace.
252+
253+
Resolution order:
254+
1. Check namespaceToServiceMapping config (exact then prefix match).
255+
2. Extract scheme from namespace, filter services by matching DB type.
256+
If exactly one match -> use it. If multiple -> log warning and return all.
257+
3. Return None -> caller falls back to all dbServiceNames.
258+
"""
259+
if not hasattr(self, "_namespace_to_service_cache"):
260+
return None
261+
262+
if namespace in self._namespace_to_service_cache:
263+
return self._namespace_to_service_cache[namespace]
264+
265+
result = None
266+
configured = set(self.get_db_service_names() or [])
267+
268+
mapping = (
269+
getattr(self.service_connection, "namespaceToServiceMapping", None) or {}
270+
)
271+
mapped_service = find_service_by_namespace_mapping(namespace, mapping)
272+
if mapped_service and mapped_service in configured:
273+
result = [mapped_service]
274+
else:
275+
# Auto-discover by extracting the DB scheme from the namespace URL
276+
db_scheme = extract_db_scheme_from_namespace(namespace)
277+
if db_scheme:
278+
matched = find_services_by_scheme(db_scheme, self._db_service_type_map)
279+
if len(matched) == 1:
280+
result = matched
281+
elif len(matched) > 1:
282+
logger.warning(
283+
f"Namespace '{namespace}' (scheme={db_scheme}) matches "
284+
f"multiple DB services: {matched}. Configure "
285+
f"'namespaceToServiceMapping' to disambiguate."
286+
)
287+
result = matched
288+
289+
self._namespace_to_service_cache[namespace] = result
290+
return result
291+
292+
def _find_table_fqn(
    self, table_details: TableDetails, services: Optional[List[str]] = None
) -> str:
    """
    Return the FQN of the first Table entity that matches ``table_details``.

    Services are probed in order. ``services`` takes precedence; when it is
    empty or ``None`` the configured DB service names are searched instead.

    :param table_details: database / schema / table name of the table to find
    :param services: optional list of DB service names restricting the search
    :return: the first non-empty result of ``fqn.build``
    :raises FQNNotFoundException: when none of the searched services yields an FQN
    """
    search_services = services or self.get_db_service_names()

    # Lazy generator: fqn.build stops being called once a service matches.
    candidates = (
        fqn.build(
            metadata=self.metadata,
            entity_type=Table,
            service_name=db_service,
            database_name=table_details.database,
            schema_name=table_details.schema,
            table_name=table_details.name,
        )
        for db_service in search_services
    )
    table_fqn = next((candidate for candidate in candidates if candidate), None)
    if not table_fqn:
        raise FQNNotFoundException(
            f"Table FQN not found for {table_details} in services {search_services}"
        )
    return table_fqn
310+
311+
def _get_table_fqn(
312+
self, table_details: TableDetails, namespace: Optional[str] = None
313+
) -> Optional[str]:
218314
if not self.get_db_service_names():
219315
if not self._db_service_names_warned:
220316
logger.warning(
@@ -224,16 +320,39 @@ def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
224320
)
225321
self._db_service_names_warned = True
226322
return None
323+
324+
resolved_services = self._resolve_db_services_for_namespace(namespace)
325+
227326
try:
228-
return self._get_table_fqn_from_om(table_details)
327+
return self._find_table_fqn(table_details, services=resolved_services)
229328
except FQNNotFoundException:
230329
try:
231-
schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
232-
330+
schema_fqn = self._get_schema_fqn_from_om(
331+
table_details.schema, services=resolved_services
332+
)
233333
return f"{schema_fqn}.{table_details.name}"
234334
except FQNNotFoundException:
235335
return None
236336

337+
def _get_table_fqn_from_om(
    self, table_details: TableDetails, services: Optional[List[str]] = None
) -> str:
    """
    Look for a matching Table entity in OM and return its FQN.

    Thin alias of ``_find_table_fqn`` so there is a single implementation of
    the per-service lookup loop.

    :param table_details: database / schema / table name of the table to find
    :param services: optional list of DB service names to search; when empty
        or ``None`` all configured DB services are searched
    :return: fully qualified name of the first matching table
    :raises FQNNotFoundException: when no searched service contains the table
    """
    return self._find_table_fqn(table_details, services=services)
355+
237356
def _build_broker_to_service_map(self) -> Dict[str, str]:
238357
"""
239358
Build a cache mapping broker hostnames to messaging service FQNs.
@@ -309,15 +428,18 @@ def _get_topic_entity(self, topic_details: TopicDetails) -> Optional[Topic]:
309428
logger.warning(f"Error finding topic for {topic_details.name}: {exc}")
310429
return None
311430

312-
def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]:
431+
def _get_schema_fqn_from_om(
432+
self, schema: str, services: Optional[List[str]] = None
433+
) -> Optional[str]:
313434
"""
314435
Based on partial schema name look for any matching DatabaseSchema object in open metadata.
315436
316437
:param schema: schema name
438+
:param services: optional list of service names to search
317439
:return: fully qualified name of a DatabaseSchema in Open Metadata
318440
"""
319441
result = None
320-
services = self.get_db_service_names()
442+
services = services or self.get_db_service_names()
321443

322444
for db_service in services:
323445
result = fqn.build(
@@ -469,7 +591,10 @@ def _build_ol_name_to_fqn_map(self, tables: List):
469591
entity_details = self._get_entity_details(table)
470592
if entity_details.entity_type != "table":
471593
continue
472-
table_fqn = self._get_table_fqn(entity_details.table_details)
594+
table_fqn = self._get_table_fqn(
595+
entity_details.table_details,
596+
namespace=table.get("namespace"),
597+
)
473598

474599
if table_fqn:
475600
result[OpenlineageSource._get_ol_table_name(table)] = table_fqn
@@ -505,7 +630,10 @@ def _get_column_lineage(
505630
if entity_details.entity_type != "table":
506631
continue
507632

508-
output_table_fqn = self._get_table_fqn(entity_details.table_details)
633+
output_table_fqn = self._get_table_fqn(
634+
entity_details.table_details,
635+
namespace=table.get("namespace"),
636+
)
509637
for field_name, field_spec in (
510638
table.get("facets", {})
511639
.get("columnLineage", {})
@@ -604,15 +732,16 @@ def yield_pipeline_lineage_details(
604732
if create_table_request:
605733
yield create_table_request
606734

607-
table_fqn = self._get_table_fqn(entity_details.table_details)
735+
table_fqn = self._get_table_fqn(
736+
entity_details.table_details,
737+
namespace=entity_data.get("namespace"),
738+
)
608739

609740
if table_fqn:
610741
entity_list.append(
611742
LineageNode(
612743
fqn=TableFQN(value=table_fqn),
613-
uuid=self.metadata.get_by_name(
614-
Table, table_fqn
615-
).id.root,
744+
uuid=self._get_by_name_cached(Table, table_fqn).id.root,
616745
node_type="table",
617746
)
618747
)
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
"""
13+
Resolves a database service name from an OpenLineage dataset namespace.
14+
15+
OpenLineage dataset namespaces carry the data store type in the URL scheme
16+
(e.g. ``mysql://host:3306/db``, ``redshift://cluster:5439/db``). This module
17+
uses that scheme to narrow lineage table lookups to services of the matching
18+
type, eliminating false matches when the same table name exists in multiple
19+
services.
20+
21+
Resolution order (per namespace):
22+
1. Explicit ``namespaceToServiceMapping`` config — exact then prefix match.
23+
2. Scheme-based auto-discovery — if exactly one configured DB service has the
24+
matching type, use it. If multiple match, log a warning and search all of them.
25+
3. Caller falls back to existing suffix search across all ``dbServiceNames``.
26+
"""
27+
28+
from typing import Dict, List, Optional
29+
from urllib.parse import urlparse
30+
31+
from metadata.generated.schema.entity.services.databaseService import (
32+
DatabaseServiceType,
33+
)
34+
from metadata.utils.logger import ingestion_logger
35+
36+
logger = ingestion_logger()
37+
38+
# Maps OpenLineage dataset namespace URI schemes to OMD DatabaseServiceType.
39+
# See: https://openlineage.io/docs/spec/naming/
40+
NAMESPACE_SCHEME_TO_SERVICE_TYPE: Dict[str, DatabaseServiceType] = {
41+
"awsathena": DatabaseServiceType.Athena,
42+
"bigquery": DatabaseServiceType.BigQuery,
43+
"cassandra": DatabaseServiceType.Cassandra,
44+
"db2": DatabaseServiceType.Db2,
45+
"hive": DatabaseServiceType.Hive,
46+
"mssql": DatabaseServiceType.Mssql,
47+
"mysql": DatabaseServiceType.Mysql,
48+
"oracle": DatabaseServiceType.Oracle,
49+
"postgres": DatabaseServiceType.Postgres,
50+
"redshift": DatabaseServiceType.Redshift,
51+
"snowflake": DatabaseServiceType.Snowflake,
52+
"sqlserver": DatabaseServiceType.Synapse,
53+
"teradata": DatabaseServiceType.Teradata,
54+
"trino": DatabaseServiceType.Trino,
55+
}
56+
57+
58+
def extract_db_scheme_from_namespace(namespace: str) -> Optional[str]:
    """
    Extract the lower-cased URL scheme from an OpenLineage dataset namespace.

    Returns ``None`` when the namespace is empty, is not a string, cannot be
    parsed as a URL, or carries no scheme.

    >>> extract_db_scheme_from_namespace("mysql://host:3306/db")
    'mysql'
    >>> extract_db_scheme_from_namespace("Redshift://cluster:5439/db")
    'redshift'
    >>> extract_db_scheme_from_namespace("airflow") is None
    True
    """
    if not namespace or not isinstance(namespace, str):
        return None
    try:
        scheme = urlparse(namespace).scheme
    except ValueError:
        # urlparse rejects malformed authorities (e.g. an unbalanced "[").
        return None
    return scheme.lower() or None
74+
75+
76+
def find_service_by_namespace_mapping(
    namespace: str,
    mapping: Dict[str, str],
) -> Optional[str]:
    """
    Look up a database service name from a user-configured
    ``namespaceToServiceMapping`` dict (namespace-prefix -> OMD service name).

    Resolution order:
      1. Exact match: ``mapping[namespace]``.
      2. Keys that are a prefix of the namespace; the longest (most specific)
         key wins, independent of the order of entries in the mapping.
      3. Keys that the namespace is a prefix of; the shortest (closest) key
         wins, with ties resolved by mapping order.

    Example mapping::

        {
            "mysql://cluster-a:3306": "mysql-cluster-a",
            "redshift://prod.us-east-1:5439": "redshift-prod",
        }

    With namespace ``"mysql://cluster-a:3306/mydb"`` this returns
    ``"mysql-cluster-a"`` (the namespace starts with the key).

    :param namespace: OpenLineage dataset namespace to resolve
    :param mapping: namespace (prefix) -> service name
    :return: the mapped service name, or ``None`` if no entry matches
    """
    if not namespace or not mapping:
        return None

    if namespace in mapping:
        return mapping[namespace]

    # Prefer the most specific configured prefix so a broad entry cannot
    # shadow a narrower one that happens to come later in the mapping.
    prefix_keys = [key for key in mapping if namespace.startswith(key)]
    if prefix_keys:
        return mapping[max(prefix_keys, key=len)]

    # Reverse direction: the namespace is shorter than the configured key.
    extended_keys = [key for key in mapping if key.startswith(namespace)]
    if extended_keys:
        return mapping[min(extended_keys, key=len)]

    return None
112+
113+
114+
def find_services_by_scheme(
    scheme: str,
    db_service_type_map: Dict[str, DatabaseServiceType],
) -> List[str]:
    """
    Select the service names from ``db_service_type_map`` that fit a URL scheme.

    * Scheme present in ``NAMESPACE_SCHEME_TO_SERVICE_TYPE``: keep only the
      services whose type equals the mapped type.
    * Scheme not present: keep only the services whose type is *not* one of
      the mapped types, i.e. the custom / non-standard services.

    :param scheme: URL scheme taken from an OpenLineage dataset namespace
    :param db_service_type_map: pre-built ``{service_name: service type}`` map
    :return: matching service names in map order; may be empty or hold
        several names when more than one service qualifies
    """
    target_type = NAMESPACE_SCHEME_TO_SERVICE_TYPE.get(scheme)

    if target_type:

        def qualifies(svc_type) -> bool:
            return svc_type == target_type

    else:
        known_types = set(NAMESPACE_SCHEME_TO_SERVICE_TYPE.values())

        def qualifies(svc_type) -> bool:
            return svc_type not in known_types

    selected = []
    for name, svc_type in db_service_type_map.items():
        if qualifies(svc_type):
            selected.append(name)
    return selected

0 commit comments

Comments
 (0)