Skip to content

Commit 470d0de

Browse files
jsingh-yelp authored and TeddyCr committed
bug(openlineage): add namespace-based DB service resolution for db_table lookups (#27005)
* add namespace-based DB service resolution for openLineage db_table lookups * add additional tests * Update generated TypeScript types * raise error if multiple db_schemas are found * resolve gitar comments * add lru cache for _entity_cache * resolve gitar issues * move ambiguous exception to per table * resolve code review comments --------- Co-authored-by: Teddy <teddy.crepineau@gmail.com>
1 parent 00ded0e commit 470d0de

16 files changed

Lines changed: 662 additions & 32 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 161 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,9 @@
1818
from collections import defaultdict
1919
from itertools import groupby, product
2020
from typing import Any, Dict, Iterable, List, Optional, Tuple
21-
from urllib.parse import urlparse
21+
from urllib.parse import quote, urlparse
22+
23+
from cachetools import LRUCache
2224

2325
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
2426
from metadata.generated.schema.api.data.createTable import CreateTableRequest
@@ -32,6 +34,9 @@
3234
KinesisBrokerConfig,
3335
OpenLineageConnection,
3436
)
37+
from metadata.generated.schema.entity.services.databaseService import (
38+
DatabaseServiceType,
39+
)
3540
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
3641
StackTraceError,
3742
)
@@ -69,7 +74,13 @@
6974
get_or_create_pipeline_service,
7075
resolve_pipeline_service_type,
7176
)
77+
from metadata.ingestion.source.pipeline.openlineage.table_resolver import (
78+
extract_db_scheme_from_namespace,
79+
find_service_by_namespace_mapping,
80+
find_services_by_scheme,
81+
)
7282
from metadata.ingestion.source.pipeline.openlineage.utils import (
83+
AmbiguousServiceException,
7384
FQNNotFoundException,
7485
message_to_open_lineage_event,
7586
)
@@ -113,6 +124,9 @@ def create(
113124
def prepare(self):
    """Initialise the per-run caches used during lineage resolution.

    Attributes set here:

    - ``_service_cache``: starts as an empty dict.
    - ``_current_pipeline_service``: reset to ``None``.
    - ``_entity_cache``: LRU cache (max 10000 entries) consulted by
      ``_get_by_name_cached``.
    - ``_namespace_to_service_cache``: LRU cache (max 10000 entries)
      consulted by ``_resolve_db_services_for_namespace``.
    - ``_db_service_type_map``: ``{service_name: DatabaseServiceType}``
      as returned by ``_build_db_service_type_map``.
    """
    self._service_cache = {}
    self._current_pipeline_service = None
    self._entity_cache: LRUCache = LRUCache(maxsize=10000)
    self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000)
    # Values are DatabaseServiceType enum members, not plain strings.
    self._db_service_type_map: Dict[
        str, DatabaseServiceType
    ] = self._build_db_service_type_map()
116130

117131
def close(self) -> None:
118132
self.metadata.compute_percentile(Pipeline, self.today)
@@ -214,7 +228,77 @@ def _get_topic_details(data: Dict) -> TopicDetails:
214228

215229
return TopicDetails(name=name, broker_hostname=broker_hostname)
216230

217-
def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
231+
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
    """Wrapper around metadata.get_by_name with in-memory caching.

    :param entity_class: entity class to look up; its ``__name__`` is part
        of the cache key
    :param fqn_str: fully qualified name passed to ``get_by_name``
    :param kwargs: forwarded unchanged to ``get_by_name``; they are also
        part of the cache key, so lookups made with different keyword
        arguments never share a cached entity
    :return: whatever ``get_by_name`` returned (``None`` results are not
        cached, so a later call retries the lookup)
    """
    cache = getattr(self, "_entity_cache", None)
    if cache is None:
        # prepare() has not created the cache: plain pass-through.
        return self.metadata.get_by_name(entity_class, fqn_str, **kwargs)

    key = f"{entity_class.__name__}:{fqn_str}"
    if kwargs:
        # Keyword names are unique, so sorting the items never compares the
        # (possibly unorderable / unhashable) values; repr() makes the
        # values safe to embed in a string key.
        key = f"{key}:{sorted(kwargs.items())!r}"

    try:
        return cache[key]
    except KeyError:
        pass

    result = self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
    if result is not None:
        cache[key] = result
    return result
242+
243+
def _build_db_service_type_map(self) -> "Dict[str, DatabaseServiceType]":
    """Build a map of {service_name: DatabaseServiceType} filtered to configured dbServiceNames.

    Each configured service name is fetched from
    ``/services/databaseServices/name/<url-quoted name>`` and its
    ``serviceType`` is converted to a ``DatabaseServiceType``.

    Best effort: a service that cannot be fetched, or whose type cannot be
    converted, is logged at debug level and left out of the map.

    :return: mapping of service name to its database service type; empty
        when no DB service names are configured
    """
    type_map = {}
    # get_db_service_names() may yield nothing; treat that as "no services".
    for service_name in self.get_db_service_names() or []:
        try:
            resp = self.metadata.client.get(
                f"/services/databaseServices/name/{quote(service_name, safe='')}"
            )
            svc_type_str = resp.get("serviceType")
            if svc_type_str:
                type_map[service_name] = DatabaseServiceType(svc_type_str)
        except Exception as exc:
            # Keep the reason in the log so a skipped service can be diagnosed.
            logger.debug(f"Could not fetch DB service: {service_name} ({exc})")
    return type_map
257+
258+
def _resolve_db_services_for_namespace(
    self, namespace: Optional[str]
) -> Optional[List[str]]:
    """
    Resolve which DB services to search for a given OL dataset namespace.

    Resolution order:
    1. Look the namespace up in the namespaceToServiceMapping config via
       find_service_by_namespace_mapping. The mapped service is used only
       if it is one of the configured dbServiceNames; otherwise a warning
       is logged and resolution continues with step 2.
    2. Extract the DB scheme from the namespace and return every service
       whose type matches it. Several services may be returned; ambiguity
       is detected later, per table, by _get_table_fqn_from_om.
    3. Return None -> caller falls back to all dbServiceNames.

    A missing or empty namespace goes straight to step 3. Non-None results
    are cached per namespace; None results are not cached.

    :param namespace: OpenLineage dataset namespace, may be None or empty
    :return: list of DB service names to search, or None if unresolved
    """
    cache = getattr(self, "_namespace_to_service_cache", None)
    if cache is None or not namespace:
        # No cache (prepare() not run) or nothing to resolve from.
        return None

    try:
        return cache[namespace]
    except KeyError:
        pass

    result = None
    configured = set(self.get_db_service_names() or [])

    mapping = self.service_connection.namespaceToServiceMapping or {}
    mapped_service = find_service_by_namespace_mapping(namespace, mapping)
    if mapped_service and mapped_service in configured:
        result = [mapped_service]
    elif mapped_service:
        logger.warning(
            f"Namespace mapping resolved '{namespace}' to service "
            f"'{mapped_service}', but it is not in the configured "
            f"dbServiceNames. Falling back to scheme-based resolution."
        )

    if not result:
        # Auto-discover by extracting the DB scheme from the namespace URL
        db_scheme = extract_db_scheme_from_namespace(namespace)
        if db_scheme:
            matched = find_services_by_scheme(db_scheme, self._db_service_type_map)
            if matched:
                result = matched

    if result is not None:
        cache[namespace] = result
    return result
298+
299+
def _get_table_fqn(
300+
self, table_details: TableDetails, namespace: Optional[str] = None
301+
) -> Optional[str]:
218302
if not self.get_db_service_names():
219303
if not self._db_service_names_warned:
220304
logger.warning(
@@ -224,15 +308,59 @@ def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
224308
)
225309
self._db_service_names_warned = True
226310
return None
311+
227312
try:
228-
return self._get_table_fqn_from_om(table_details)
229-
except FQNNotFoundException:
230-
try:
231-
schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
313+
resolved_services = self._resolve_db_services_for_namespace(namespace)
232314

233-
return f"{schema_fqn}.{table_details.name}"
315+
try:
316+
return self._get_table_fqn_from_om(
317+
table_details, services=resolved_services
318+
)
234319
except FQNNotFoundException:
235-
return None
320+
try:
321+
schema_fqn = self._get_schema_fqn_from_om(
322+
table_details.schema, services=resolved_services
323+
)
324+
return f"{schema_fqn}.{table_details.name}"
325+
except FQNNotFoundException:
326+
return None
327+
except Exception:
328+
logger.warning(
329+
f"Failed to get FQN for table {table_details.name}: {traceback.format_exc()}"
330+
)
331+
return None
332+
333+
def _get_table_fqn_from_om(
    self, table_details: TableDetails, services: Optional[List[str]] = None
) -> str:
    """
    Looks for a matching Table entity in OM.

    When ``services`` is a non-empty list, only those services are searched
    and every match is collected, so that a table present in more than one
    of them is reported as ambiguous. When ``services`` is None or empty,
    all configured DB services are searched and the first match wins.

    :param table_details: database / schema / table name to look up
    :param services: optional list of service names to restrict the search
    :return: fully qualified name of the matching Table
    :raises AmbiguousServiceException: the table was found in more than one
        of the given ``services``
    :raises FQNNotFoundException: no service contains the table
    """
    # An empty list carries no scoping information; treat it like None so
    # the "searched everything" path keeps its first-match semantics.
    scoped = bool(services)
    candidates = services if scoped else (self.get_db_service_names() or [])

    found = []
    for db_service in candidates:
        result = fqn.build(
            metadata=self.metadata,
            entity_type=Table,
            service_name=db_service,
            database_name=table_details.database,
            schema_name=table_details.schema,
            table_name=table_details.name,
        )
        if not result:
            continue
        if not scoped:
            return result
        found.append(result)

    if len(found) > 1:
        raise AmbiguousServiceException(
            f"Table '{table_details.name}' found in multiple services: "
            f"{found}. Configure 'namespaceToServiceMapping' to disambiguate."
        )
    if found:
        return found[0]
    raise FQNNotFoundException(f"Table FQN not found for {table_details}")
236364

237365
def _build_broker_to_service_map(self) -> Dict[str, str]:
238366
"""
@@ -309,15 +437,18 @@ def _get_topic_entity(self, topic_details: TopicDetails) -> Optional[Topic]:
309437
logger.warning(f"Error finding topic for {topic_details.name}: {exc}")
310438
return None
311439

312-
def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]:
440+
def _get_schema_fqn_from_om(
441+
self, schema: str, services: Optional[List[str]] = None
442+
) -> Optional[str]:
313443
"""
314444
Based on partial schema name look for any matching DatabaseSchema object in open metadata.
315445
316446
:param schema: schema name
447+
:param services: optional list of service names to search
317448
:return: fully qualified name of a DatabaseSchema in Open Metadata
318449
"""
319450
result = None
320-
services = self.get_db_service_names()
451+
services = services or self.get_db_service_names()
321452

322453
for db_service in services:
323454
result = fqn.build(
@@ -469,7 +600,10 @@ def _build_ol_name_to_fqn_map(self, tables: List):
469600
entity_details = self._get_entity_details(table)
470601
if entity_details.entity_type != "table":
471602
continue
472-
table_fqn = self._get_table_fqn(entity_details.table_details)
603+
table_fqn = self._get_table_fqn(
604+
entity_details.table_details,
605+
namespace=table.get("namespace"),
606+
)
473607

474608
if table_fqn:
475609
result[OpenlineageSource._get_ol_table_name(table)] = table_fqn
@@ -505,7 +639,10 @@ def _get_column_lineage(
505639
if entity_details.entity_type != "table":
506640
continue
507641

508-
output_table_fqn = self._get_table_fqn(entity_details.table_details)
642+
output_table_fqn = self._get_table_fqn(
643+
entity_details.table_details,
644+
namespace=table.get("namespace"),
645+
)
509646
for field_name, field_spec in (
510647
table.get("facets", {})
511648
.get("columnLineage", {})
@@ -604,18 +741,21 @@ def yield_pipeline_lineage_details(
604741
if create_table_request:
605742
yield create_table_request
606743

607-
table_fqn = self._get_table_fqn(entity_details.table_details)
744+
table_fqn = self._get_table_fqn(
745+
entity_details.table_details,
746+
namespace=entity_data.get("namespace"),
747+
)
608748

609749
if table_fqn:
610-
entity_list.append(
611-
LineageNode(
612-
fqn=TableFQN(value=table_fqn),
613-
uuid=self.metadata.get_by_name(
614-
Table, table_fqn
615-
).id.root,
616-
node_type="table",
750+
table_entity = self._get_by_name_cached(Table, table_fqn)
751+
if table_entity:
752+
entity_list.append(
753+
LineageNode(
754+
fqn=TableFQN(value=table_fqn),
755+
uuid=table_entity.id.root,
756+
node_type="table",
757+
)
617758
)
618-
)
619759

620760
elif entity_details.entity_type == "topic":
621761
topic_entity = self._get_topic_entity(entity_details.topic_details)

0 commit comments

Comments
 (0)