Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from collections import defaultdict
from itertools import groupby, product
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urlparse
from urllib.parse import quote, urlparse

from cachetools import LRUCache

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
Expand All @@ -32,6 +34,9 @@
KinesisBrokerConfig,
OpenLineageConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
Expand Down Expand Up @@ -69,7 +74,13 @@
get_or_create_pipeline_service,
resolve_pipeline_service_type,
)
from metadata.ingestion.source.pipeline.openlineage.table_resolver import (
extract_db_scheme_from_namespace,
find_service_by_namespace_mapping,
find_services_by_scheme,
)
from metadata.ingestion.source.pipeline.openlineage.utils import (
AmbiguousServiceException,
FQNNotFoundException,
message_to_open_lineage_event,
)
Expand Down Expand Up @@ -113,6 +124,9 @@ def create(
def prepare(self):
self._service_cache = {}
self._current_pipeline_service = None
self._entity_cache: LRUCache = LRUCache(maxsize=10000)
self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000)
self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map()

def close(self) -> None:
self.metadata.compute_percentile(Pipeline, self.today)
Expand Down Expand Up @@ -214,7 +228,77 @@ def _get_topic_details(data: Dict) -> TopicDetails:

return TopicDetails(name=name, broker_hostname=broker_hostname)

def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
def _get_by_name_cached(self, entity_class, fqn_str: str, **kwargs):
"""Wrapper around metadata.get_by_name with in-memory caching."""
if not hasattr(self, "_entity_cache"):
return self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
key = f"{entity_class.__name__}:{fqn_str}"
if key not in self._entity_cache:
result = self.metadata.get_by_name(entity_class, fqn_str, **kwargs)
if result is not None:
self._entity_cache[key] = result
return result
Comment thread
gitar-bot[bot] marked this conversation as resolved.
return self._entity_cache[key]

def _build_db_service_type_map(self):
"""Build a map of {service_name: DatabaseServiceType} filtered to configured dbServiceNames."""
type_map = {}
for service_name in self.get_db_service_names():
try:
resp = self.metadata.client.get(
f"/services/databaseServices/name/{quote(service_name, safe='')}"
)
svc_type_str = resp.get("serviceType")
if svc_type_str:
type_map[service_name] = DatabaseServiceType(svc_type_str)
except Exception:
logger.debug(f"Could not fetch DB service: {service_name}")
Comment thread
jsingh-yelp marked this conversation as resolved.
return type_map

def _resolve_db_services_for_namespace(self, namespace: str) -> Optional[List[str]]:
"""
Resolve which DB services to search for a given OL dataset namespace.

Resolution order:
1. Check namespaceToServiceMapping config (exact then prefix match).
2. Extract scheme from namespace, filter services by matching DB type.
If exactly one match -> use it. If multiple -> log warning and return all.
3. Return None -> caller falls back to all dbServiceNames.
"""
if not hasattr(self, "_namespace_to_service_cache"):
return None

if namespace in self._namespace_to_service_cache:
return self._namespace_to_service_cache[namespace]
Comment thread
gitar-bot[bot] marked this conversation as resolved.

result = None
configured = set(self.get_db_service_names() or [])

mapping = self.service_connection.namespaceToServiceMapping or {}
mapped_service = find_service_by_namespace_mapping(namespace, mapping)
if mapped_service and mapped_service in configured:
result = [mapped_service]
elif mapped_service:
logger.warning(
f"Namespace mapping resolved '{namespace}' to service "
f"'{mapped_service}', but it is not in the configured "
f"dbServiceNames. Falling back to scheme-based resolution."
)
if not result:
# Auto-discover by extracting the DB scheme from the namespace URL
db_scheme = extract_db_scheme_from_namespace(namespace)
if db_scheme:
matched = find_services_by_scheme(db_scheme, self._db_service_type_map)
if matched:
result = matched

if result is not None:
self._namespace_to_service_cache[namespace] = result
return result

def _get_table_fqn(
self, table_details: TableDetails, namespace: Optional[str] = None
) -> Optional[str]:
if not self.get_db_service_names():
if not self._db_service_names_warned:
logger.warning(
Expand All @@ -224,15 +308,59 @@ def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
)
self._db_service_names_warned = True
return None

try:
Comment thread
gitar-bot[bot] marked this conversation as resolved.
return self._get_table_fqn_from_om(table_details)
except FQNNotFoundException:
try:
schema_fqn = self._get_schema_fqn_from_om(table_details.schema)
resolved_services = self._resolve_db_services_for_namespace(namespace)

return f"{schema_fqn}.{table_details.name}"
try:
return self._get_table_fqn_from_om(
table_details, services=resolved_services
)
except FQNNotFoundException:
return None
try:
schema_fqn = self._get_schema_fqn_from_om(
table_details.schema, services=resolved_services
)
return f"{schema_fqn}.{table_details.name}"
except FQNNotFoundException:
return None
except Exception:
logger.warning(
f"Failed to get FQN for table {table_details.name}: {traceback.format_exc()}"
)
return None

def _get_table_fqn_from_om(
self, table_details: TableDetails, services: Optional[List[str]] = None
) -> str:
"""
Looks for matching Table entity in OM across all configured DB services.
Raises AmbiguousServiceException if the table exists in multiple services
of the same scheme-resolved type.
"""
resolved = services is not None
found = []
for db_service in services or self.get_db_service_names():
result = fqn.build(
metadata=self.metadata,
entity_type=Table,
service_name=db_service,
database_name=table_details.database,
schema_name=table_details.schema,
table_name=table_details.name,
)
if result:
if not resolved:
return result
Comment thread
gitar-bot[bot] marked this conversation as resolved.
found.append(result)
if len(found) > 1:
raise AmbiguousServiceException(
f"Table '{table_details.name}' found in multiple services: "
f"{found}. Configure 'namespaceToServiceMapping' to disambiguate."
)
if found:
return found[0]
raise FQNNotFoundException(f"Table FQN not found for {table_details}")

def _build_broker_to_service_map(self) -> Dict[str, str]:
"""
Expand Down Expand Up @@ -309,15 +437,18 @@ def _get_topic_entity(self, topic_details: TopicDetails) -> Optional[Topic]:
logger.warning(f"Error finding topic for {topic_details.name}: {exc}")
return None

def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]:
def _get_schema_fqn_from_om(
self, schema: str, services: Optional[List[str]] = None
) -> Optional[str]:
"""
Based on partial schema name look for any matching DatabaseSchema object in open metadata.

:param schema: schema name
:param services: optional list of service names to search
:return: fully qualified name of a DatabaseSchema in Open Metadata
"""
result = None
services = self.get_db_service_names()
services = services or self.get_db_service_names()

for db_service in services:
result = fqn.build(
Comment thread
jsingh-yelp marked this conversation as resolved.
Expand Down Expand Up @@ -469,7 +600,10 @@ def _build_ol_name_to_fqn_map(self, tables: List):
entity_details = self._get_entity_details(table)
if entity_details.entity_type != "table":
continue
table_fqn = self._get_table_fqn(entity_details.table_details)
table_fqn = self._get_table_fqn(
entity_details.table_details,
namespace=table.get("namespace"),
)

if table_fqn:
result[OpenlineageSource._get_ol_table_name(table)] = table_fqn
Expand Down Expand Up @@ -505,7 +639,10 @@ def _get_column_lineage(
if entity_details.entity_type != "table":
continue

output_table_fqn = self._get_table_fqn(entity_details.table_details)
output_table_fqn = self._get_table_fqn(
entity_details.table_details,
namespace=table.get("namespace"),
)
for field_name, field_spec in (
table.get("facets", {})
.get("columnLineage", {})
Expand Down Expand Up @@ -604,18 +741,21 @@ def yield_pipeline_lineage_details(
if create_table_request:
yield create_table_request

table_fqn = self._get_table_fqn(entity_details.table_details)
table_fqn = self._get_table_fqn(
entity_details.table_details,
namespace=entity_data.get("namespace"),
)

if table_fqn:
entity_list.append(
LineageNode(
fqn=TableFQN(value=table_fqn),
uuid=self.metadata.get_by_name(
Table, table_fqn
).id.root,
node_type="table",
table_entity = self._get_by_name_cached(Table, table_fqn)
if table_entity:
entity_list.append(
LineageNode(
fqn=TableFQN(value=table_fqn),
uuid=table_entity.id.root,
node_type="table",
)
)
)

elif entity_details.entity_type == "topic":
topic_entity = self._get_topic_entity(entity_details.topic_details)
Expand Down
Loading
Loading