Skip to content

Commit 3a5a4d5

Browse files
committed
add namespace-based DB service resolution for openLineage db_table lookups
1 parent ad1a2bd commit 3a5a4d5

4 files changed

Lines changed: 502 additions & 23 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 123 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
KinesisBrokerConfig,
3333
OpenLineageConnection,
3434
)
35+
from metadata.generated.schema.entity.services.databaseService import DatabaseService
3536
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
3637
StackTraceError,
3738
)
@@ -69,6 +70,11 @@
6970
get_or_create_pipeline_service,
7071
resolve_pipeline_service_type,
7172
)
73+
from metadata.ingestion.source.pipeline.openlineage.table_resolver import (
74+
extract_db_scheme_from_namespace,
75+
find_service_by_namespace_mapping,
76+
find_services_by_scheme,
77+
)
7278
from metadata.ingestion.source.pipeline.openlineage.utils import (
7379
FQNNotFoundException,
7480
message_to_open_lineage_event,
@@ -113,6 +119,9 @@ def create(
113119
def prepare(self):
114120
self._service_cache = {}
115121
self._current_pipeline_service = None
122+
# Memoizes entities fetched through _get_by_name_cached, keyed by
# "<EntityClassName>:<fqn>".
self._entity_cache: Dict[str, Any] = {}
123+
# Memoizes _resolve_db_services_for_namespace per OpenLineage namespace.
# A stored None means "no narrowing found"; callers then search every
# configured DB service.
self._namespace_to_service_cache: Dict[str, Optional[List[str]]] = {}
124+
# Built once here: maps each configured DB service name to the serviceType
# read from its DatabaseService entity (one get_by_name call per service).
self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map()
116125

117126
def close(self) -> None:
118127
self.metadata.compute_percentile(Pipeline, self.today)
@@ -214,7 +223,74 @@ def _get_topic_details(data: Dict) -> TopicDetails:
214223

215224
return TopicDetails(name=name, broker_hostname=broker_hostname)
216225

217-
def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
226+
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
227+
"""Wrapper around metadata.get_by_name with in-memory caching."""
228+
if not hasattr(self, "_entity_cache"):
229+
return self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
230+
key = f"{entity_class.__name__}:{fqn_str}"
231+
if key not in self._entity_cache:
232+
result = self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
233+
if result is not None:
234+
self._entity_cache[key] = result
235+
return result
236+
return self._entity_cache[key]
237+
238+
def _build_db_service_type_map(self):
239+
"""Build a map of {service_name: DatabaseServiceType} filtered to configured dbServiceNames."""
240+
type_map = {}
241+
for service_name in self.get_db_service_names():
242+
try:
243+
svc = self.metadata.get_by_name(DatabaseService, service_name)
244+
if svc and svc.serviceType:
245+
type_map[service_name] = svc.serviceType
246+
except Exception:
247+
logger.debug(f"Could not fetch DB service: {service_name}")
248+
return type_map
249+
250+
def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[str]]:
251+
"""
252+
Resolve which DB services to search for a given OL dataset namespace.
253+
254+
Resolution order:
255+
1. Check namespaceToServiceMapping config (exact then prefix match).
256+
2. Extract scheme from namespace, filter services by matching DB type.
257+
If exactly one match -> use it. If multiple -> log warning and return all.
258+
3. Return None -> caller falls back to all dbServiceNames.
259+
"""
260+
if not hasattr(self, "_namespace_to_service_cache"):
261+
return None
262+
263+
if namespace in self._namespace_to_service_cache:
264+
return self._namespace_to_service_cache[namespace]
265+
266+
result = None
267+
configured = set(self.get_db_service_names() or [])
268+
269+
mapping = self.service_connection.namespaceToServiceMapping or {}
270+
mapped_service = find_service_by_namespace_mapping(namespace, mapping)
271+
if mapped_service and mapped_service in configured:
272+
result = [mapped_service]
273+
else:
274+
# Auto-discover by extracting the DB scheme from the namespace URL
275+
db_scheme = extract_db_scheme_from_namespace(namespace)
276+
if db_scheme:
277+
matched = find_services_by_scheme(db_scheme, self._db_service_type_map)
278+
if len(matched) == 1:
279+
result = matched
280+
elif len(matched) > 1:
281+
logger.warning(
282+
f"Namespace '{namespace}' (scheme={db_scheme}) matches "
283+
f"multiple DB services: {matched}. Configure "
284+
f"'namespaceToServiceMapping' to disambiguate."
285+
)
286+
result = matched
287+
288+
self._namespace_to_service_cache[namespace] = result
289+
return result
290+
291+
def _get_table_fqn(
292+
self, table_details: TableDetails, namespace: Optional[str] = None
293+
) -> Optional[str]:
218294
if not self.get_db_service_names():
219295
if not self._db_service_names_warned:
220296
logger.warning(
@@ -224,16 +300,41 @@ def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
224300
)
225301
self._db_service_names_warned = True
226302
return None
303+
304+
resolved_services = self._resolve_db_services_for_namespace(namespace)
305+
227306
try:
228-
return self._get_table_fqn_from_om(table_details)
307+
return self._get_table_fqn_from_om(
308+
table_details, services=resolved_services
309+
)
229310
except FQNNotFoundException:
230311
try:
231-
schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
232-
312+
schema_fqn = self._get_schema_fqn_from_om(
313+
table_details.schema, services=resolved_services
314+
)
233315
return f"{schema_fqn}.{table_details.name}"
234316
except FQNNotFoundException:
235317
return None
236318

319+
def _get_table_fqn_from_om(
    self, table_details: TableDetails, services: Optional[List[str]] = None
) -> str:
    """
    Find the FQN of the Table entity in OM that matches ``table_details``.

    :param table_details: database, schema and table name to look for
    :param services: DB service names to search, in order; when empty or
        ``None`` every configured DB service is searched instead
    :return: the first non-empty FQN built for one of the searched services
    :raises FQNNotFoundException: when no searched service yields an FQN
    """
    candidates = services if services else self.get_db_service_names()

    for service_name in candidates:
        table_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=Table,
            service_name=service_name,
            database_name=table_details.database,
            schema_name=table_details.schema,
            table_name=table_details.name,
        )
        if not table_fqn:
            continue
        return table_fqn

    raise FQNNotFoundException(f"Table FQN not found for {table_details}")
337+
237338
def _build_broker_to_service_map(self) -> Dict[str, str]:
238339
"""
239340
Build a cache mapping broker hostnames to messaging service FQNs.
@@ -309,15 +410,18 @@ def _get_topic_entity(self, topic_details: TopicDetails) -> Optional[Topic]:
309410
logger.warning(f"Error finding topic for {topic_details.name}: {exc}")
310411
return None
311412

312-
def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]:
413+
def _get_schema_fqn_from_om(
414+
self, schema: str, services: Optional[List[str]] = None
415+
) -> Optional[str]:
313416
"""
314417
Based on partial schema name look for any matching DatabaseSchema object in open metadata.
315418
316419
:param schema: schema name
420+
:param services: optional list of service names to search
317421
:return: fully qualified name of a DatabaseSchema in Open Metadata
318422
"""
319423
result = None
320-
services = self.get_db_service_names()
424+
services = services or self.get_db_service_names()
321425

322426
for db_service in services:
323427
result = fqn.build(
@@ -469,7 +573,10 @@ def _build_ol_name_to_fqn_map(self, tables: List):
469573
entity_details = self._get_entity_details(table)
470574
if entity_details.entity_type != "table":
471575
continue
472-
table_fqn = self._get_table_fqn(entity_details.table_details)
576+
table_fqn = self._get_table_fqn(
577+
entity_details.table_details,
578+
namespace=table.get("namespace"),
579+
)
473580

474581
if table_fqn:
475582
result[OpenlineageSource._get_ol_table_name(table)] = table_fqn
@@ -505,7 +612,10 @@ def _get_column_lineage(
505612
if entity_details.entity_type != "table":
506613
continue
507614

508-
output_table_fqn = self._get_table_fqn(entity_details.table_details)
615+
output_table_fqn = self._get_table_fqn(
616+
entity_details.table_details,
617+
namespace=table.get("namespace"),
618+
)
509619
for field_name, field_spec in (
510620
table.get("facets", {})
511621
.get("columnLineage", {})
@@ -604,15 +714,16 @@ def yield_pipeline_lineage_details(
604714
if create_table_request:
605715
yield create_table_request
606716

607-
table_fqn = self._get_table_fqn(entity_details.table_details)
717+
table_fqn = self._get_table_fqn(
718+
entity_details.table_details,
719+
namespace=entity_data.get("namespace"),
720+
)
608721

609722
if table_fqn:
610723
entity_list.append(
611724
LineageNode(
612725
fqn=TableFQN(value=table_fqn),
613-
uuid=self.metadata.get_by_name(
614-
Table, table_fqn
615-
).id.root,
726+
uuid=self._get_by_name_cached(Table, table_fqn).id.root,
616727
node_type="table",
617728
)
618729
)
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
"""
13+
Resolves a database service name from an OpenLineage dataset namespace.
14+
15+
OpenLineage dataset namespaces carry the data store type in the URL scheme
16+
(e.g. ``mysql://host:3306/db``, ``redshift://cluster:5439/db``). This module
17+
uses that scheme to narrow lineage table lookups to services of the matching
18+
type, eliminating false matches when the same table name exists in multiple
19+
services.
20+
21+
Resolution order (per namespace):
22+
1. Explicit ``namespaceToServiceMapping`` config — exact then prefix match.
23+
2. Scheme-based auto-discovery — if exactly one configured DB service has the
24+
   matching type, use it. If multiple match, log a warning and search all of them.
25+
3. Caller falls back to existing suffix search across all ``dbServiceNames``.
26+
"""
27+
28+
from typing import Dict, List, Optional
29+
from urllib.parse import urlparse
30+
31+
from metadata.generated.schema.entity.services.databaseService import (
32+
DatabaseServiceType,
33+
)
34+
from metadata.utils.logger import ingestion_logger
35+
36+
logger = ingestion_logger()
37+
38+
# Maps OpenLineage dataset namespace URI schemes to OMD DatabaseServiceType.
# Keys are lower-case: extract_db_scheme_from_namespace lower-cases the scheme
# before it is looked up here. A scheme that is absent from this table is
# treated as "unrecognized" by find_services_by_scheme.
39+
# See: https://openlineage.io/docs/spec/naming/
40+
NAMESPACE_SCHEME_TO_SERVICE_TYPE: Dict[str, DatabaseServiceType] = {
41+
"awsathena": DatabaseServiceType.Athena,
42+
"bigquery": DatabaseServiceType.BigQuery,
43+
"cassandra": DatabaseServiceType.Cassandra,
44+
"db2": DatabaseServiceType.Db2,
45+
"hive": DatabaseServiceType.Hive,
46+
"mssql": DatabaseServiceType.Mssql,
47+
"mysql": DatabaseServiceType.Mysql,
48+
"oracle": DatabaseServiceType.Oracle,
49+
"postgres": DatabaseServiceType.Postgres,
50+
"redshift": DatabaseServiceType.Redshift,
51+
"snowflake": DatabaseServiceType.Snowflake,
52+
"sqlserver": DatabaseServiceType.Synapse,
53+
"teradata": DatabaseServiceType.Teradata,
54+
"trino": DatabaseServiceType.Trino,
55+
}
56+
57+
58+
def extract_db_scheme_from_namespace(namespace: str) -> Optional[str]:
    """
    Extract the lower-cased data-store scheme from an OpenLineage dataset namespace.

    URL-style namespaces yield their URL scheme. A namespace with no URL
    scheme is accepted only when it is itself a key of
    ``NAMESPACE_SCHEME_TO_SERVICE_TYPE`` (for example the bare ``bigquery``
    namespace); any other scheme-less value yields ``None``.

    >>> extract_db_scheme_from_namespace("mysql://host:3306/db")
    'mysql'
    >>> extract_db_scheme_from_namespace("REDSHIFT://cluster:5439/db")
    'redshift'
    >>> extract_db_scheme_from_namespace("bigquery")
    'bigquery'
    >>> extract_db_scheme_from_namespace("airflow") is None
    True

    :param namespace: OpenLineage dataset namespace
    :return: lower-case scheme, or ``None`` when none can be determined
    """
    if not namespace or not isinstance(namespace, str):
        return None

    try:
        scheme = urlparse(namespace).scheme
    except ValueError:
        # urlparse rejects malformed netlocs (e.g. an unbalanced "[")
        return None

    if scheme:
        return scheme.lower()

    # Some stores use a bare namespace with no "://" part; recognise those by
    # name so the matching table entry is reachable.
    bare = namespace.strip().lower()
    return bare if bare in NAMESPACE_SCHEME_TO_SERVICE_TYPE else None
74+
75+
76+
def find_service_by_namespace_mapping(
    namespace: str,
    mapping: Dict[str, str],
) -> Optional[str]:
    """
    Resolve a database service name through the user-configured
    ``namespaceToServiceMapping`` dict (namespace or namespace prefix → OMD
    service name).

    An entry keyed by the exact namespace always wins. Otherwise every
    non-empty key that the namespace starts with is a candidate, and the
    longest candidate is chosen.

    Example mapping::

        {
            "mysql://cluster-a:3306": "mysql-cluster-a",
            "mysql://cluster-a:3306/specific_db": "mysql-specific",
        }

    Namespace ``"mysql://cluster-a:3306/specific_db/table"`` resolves to
    ``"mysql-specific"`` because that key is the longest matching prefix.

    :param namespace: OpenLineage dataset namespace
    :param mapping: namespace (prefix) to service name
    :return: the mapped service name, or ``None`` when nothing matches
    """
    if not (namespace and mapping):
        return None

    if namespace in mapping:
        return mapping[namespace]

    # Collect every non-empty key that prefixes the namespace, then keep the
    # most specific (longest) one.
    prefixes = [key for key in mapping if namespace.startswith(key) and len(key) > 0]
    if not prefixes:
        return None

    return mapping[max(prefixes, key=len)]
117+
118+
119+
def find_services_by_scheme(
    scheme: str,
    db_service_type_map: Dict[str, DatabaseServiceType],
) -> List[str]:
    """
    Select, from a pre-built ``{service_name: DatabaseServiceType}`` map, the
    services that are plausible targets for the given namespace scheme.

    * Scheme listed in ``NAMESPACE_SCHEME_TO_SERVICE_TYPE``: only services
      whose type equals the mapped type are returned.
    * Scheme not listed: only services whose type does not appear anywhere
      in that table are returned, i.e. the custom / non-standard ones.

    :param scheme: lower-case namespace scheme
    :param db_service_type_map: service name to service type
    :return: matching service names in map order; may be empty, and may hold
        several names when more than one service shares the type
    """
    expected_type = NAMESPACE_SCHEME_TO_SERVICE_TYPE.get(scheme)

    if not expected_type:
        # Unrecognized scheme: keep only services of types the table does not cover.
        recognised_types = set(NAMESPACE_SCHEME_TO_SERVICE_TYPE.values())
        return [
            service_name
            for service_name, service_type in db_service_type_map.items()
            if service_type not in recognised_types
        ]

    return [
        service_name
        for service_name, service_type in db_service_type_map.items()
        if service_type == expected_type
    ]

0 commit comments

Comments
 (0)