Skip to content

Commit 00ded0e

Browse files
jsingh-yelp and ayush-shah
authored and committed
Fixes 26434: [openlineage] Resolve openlineage pipeline/job to its integration type (#26821)
* resolve openlineage pipeline/job to it's integration type * include kafka broker port for _build_broker_to_service_map function --------- Co-authored-by: Ayush Shah <ayush02shah12@gmail.com>
1 parent fbc73c0 commit 00ded0e

4 files changed

Lines changed: 483 additions & 9 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@
6262
TopicDetails,
6363
TopicFQN,
6464
)
65+
from metadata.ingestion.source.pipeline.openlineage.service_resolver import (
66+
build_service_name,
67+
extract_integration_type,
68+
find_pipeline_by_namespace,
69+
get_or_create_pipeline_service,
70+
resolve_pipeline_service_type,
71+
)
6572
from metadata.ingestion.source.pipeline.openlineage.utils import (
6673
FQNNotFoundException,
6774
message_to_open_lineage_event,
@@ -87,6 +94,8 @@ class OpenlineageSource(PipelineServiceSource):
8794
"""
8895

8996
_db_service_names_warned: bool = False
97+
_service_cache: Dict[str, str]
98+
_current_pipeline_service: Optional[str] = None
9099

91100
@classmethod
92101
def create(
@@ -102,7 +111,8 @@ def create(
102111
return cls(config, metadata)
103112

104113
def prepare(self):
105-
"""Nothing to prepare"""
114+
self._service_cache = {}
115+
self._current_pipeline_service = None
106116

107117
def close(self) -> None:
108118
self.metadata.compute_percentile(Pipeline, self.today)
@@ -192,12 +202,16 @@ def _get_topic_details(data: Dict) -> TopicDetails:
192202
except KeyError:
193203
raise ValueError("Topic name is not present")
194204

195-
broker_hostname = urlparse(namespace).hostname
205+
parsed = urlparse(namespace)
206+
broker_hostname = parsed.hostname
196207
if not broker_hostname:
197208
raise ValueError(
198209
f"Could not extract broker hostname from namespace: {namespace}"
199210
)
200211

212+
if parsed.port:
213+
broker_hostname = f"{broker_hostname}:{parsed.port}"
214+
201215
return TopicDetails(name=name, broker_hostname=broker_hostname)
202216

203217
def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
@@ -240,9 +254,9 @@ def _build_broker_to_service_map(self) -> Dict[str, str]:
240254
bootstrap_servers = svc.connection.config.bootstrapServers or ""
241255
svc_fqn = svc.fullyQualifiedName.root
242256
for broker in bootstrap_servers.split(","):
243-
hostname = broker.strip().split(":")[0]
244-
if hostname:
245-
self._broker_to_service[hostname] = svc_fqn
257+
broker = broker.strip()
258+
if broker:
259+
self._broker_to_service[broker] = svc_fqn
246260
except Exception:
247261
logger.debug(
248262
f"Could not extract bootstrapServers from service {svc.name}"
@@ -514,16 +528,50 @@ def _get_column_lineage(
514528

515529
return OpenlineageSource._create_output_lineage_dict(_result)
516530

531+
def _resolve_pipeline_service(self, pipeline_details: OpenLineageEvent) -> str:
532+
"""
533+
Resolve the pipeline service for the current event.
534+
535+
Resolution order:
536+
1. **Namespace fallback** — try ``namespace.jobName`` as a pipeline
537+
FQN. If a pipeline already exists (e.g. ingested by a native
538+
Airflow connector), reuse its service.
539+
2. **Integration type** — extract from
540+
``job.facets.jobType.integration`` and create a typed service
541+
(e.g. ``spark_openlineage``).
542+
3. **Default** — fall back to the configured OpenLineage service.
543+
"""
544+
fallback = self.context.get().pipeline_service
545+
546+
ns_result = find_pipeline_by_namespace(self.metadata, pipeline_details)
547+
if ns_result:
548+
service_name, _ = ns_result
549+
return service_name
550+
551+
integration = extract_integration_type(pipeline_details)
552+
service_name = build_service_name(integration, fallback)
553+
554+
if service_name != fallback:
555+
service_type = resolve_pipeline_service_type(integration)
556+
get_or_create_pipeline_service(
557+
self.metadata, service_name, service_type, self._service_cache
558+
)
559+
560+
return service_name
561+
517562
def yield_pipeline(
518563
self, pipeline_details: OpenLineageEvent
519564
) -> Iterable[Either[CreatePipelineRequest]]:
520565
pipeline_name = self.get_pipeline_name(pipeline_details)
566+
self._current_pipeline_service = self._resolve_pipeline_service(
567+
pipeline_details
568+
)
521569
try:
522570
description = f"""```json
523571
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
524572
request = CreatePipelineRequest(
525573
name=pipeline_name,
526-
service=self.context.get().pipeline_service,
574+
service=self._current_pipeline_service,
527575
description=description,
528576
)
529577

@@ -597,10 +645,13 @@ def yield_pipeline_lineage_details(
597645
for n in product(input_edges, output_edges)
598646
]
599647

648+
service_name = (
649+
self._current_pipeline_service or self.context.get().pipeline_service
650+
)
600651
pipeline_fqn = fqn.build(
601652
metadata=self.metadata,
602653
entity_type=Pipeline,
603-
service_name=self.context.get().pipeline_service,
654+
service_name=service_name,
604655
pipeline_name=self.context.get().pipeline,
605656
)
606657

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
"""
13+
Resolves the pipeline service type and name from OpenLineage event metadata.
14+
15+
OpenLineage events carry integration identity (Spark, Flink, Airflow, etc.)
16+
in job facets. This module extracts that information and maps it to the
17+
appropriate OMD PipelineServiceType, creating services as needed.
18+
"""
19+
20+
from typing import Dict, Optional, Tuple
21+
22+
from metadata.generated.schema.api.services.createPipelineService import (
23+
CreatePipelineServiceRequest,
24+
)
25+
from metadata.generated.schema.entity.data.pipeline import Pipeline
26+
from metadata.generated.schema.entity.services.pipelineService import (
27+
PipelineService,
28+
PipelineServiceType,
29+
)
30+
from metadata.generated.schema.type.basic import EntityName
31+
from metadata.ingestion.ometa.ometa_api import OpenMetadata
32+
from metadata.ingestion.source.pipeline.openlineage.models import OpenLineageEvent
33+
from metadata.utils.logger import ingestion_logger
34+
35+
logger = ingestion_logger()
36+
37+
INTEGRATION_TO_SERVICE_TYPE: Dict[str, PipelineServiceType] = {
38+
"spark": PipelineServiceType.Spark,
39+
"flink": PipelineServiceType.Flink,
40+
"airflow": PipelineServiceType.Airflow,
41+
"dbt": PipelineServiceType.DBTCloud,
42+
"dagster": PipelineServiceType.Dagster,
43+
}
44+
45+
SERVICE_NAME_SUFFIX = "_openlineage"
46+
47+
48+
def extract_integration_type(event: OpenLineageEvent) -> Optional[str]:
49+
"""
50+
Extract the integration type from an OpenLineage event via the
51+
standard ``job.facets.jobType.integration`` field.
52+
53+
Returns a lowercase string like "spark", "flink", "airflow", or None.
54+
"""
55+
try:
56+
integration = event.job["facets"]["jobType"]["integration"]
57+
if integration:
58+
return integration.strip().lower()
59+
except (KeyError, TypeError, AttributeError):
60+
pass
61+
62+
return None
63+
64+
65+
def find_pipeline_by_namespace(
66+
metadata: OpenMetadata,
67+
event: OpenLineageEvent,
68+
) -> Optional[Tuple[str, Pipeline]]:
69+
"""
70+
Try to find an existing pipeline using ``namespace.jobName`` as FQN.
71+
72+
When pipelines are ingested by native connectors (e.g. Airflow), the OL
73+
job namespace often matches the pipeline service name already present in
74+
OMD. Checking this first avoids creating duplicate pipelines under a new
75+
service.
76+
77+
Returns ``(service_name, pipeline_entity)`` on hit, or ``None``.
78+
"""
79+
try:
80+
namespace = event.job.get("namespace")
81+
name = event.job.get("name")
82+
except (AttributeError, TypeError):
83+
return None
84+
85+
if not namespace or not name:
86+
return None
87+
88+
fallback_fqn = f"{namespace}.{name}"
89+
existing = metadata.get_by_name(Pipeline, fallback_fqn)
90+
if existing:
91+
logger.info(f"Resolved pipeline via namespace fallback: {fallback_fqn}")
92+
return namespace, existing
93+
94+
return None
95+
96+
97+
def resolve_pipeline_service_type(
98+
integration: Optional[str],
99+
) -> PipelineServiceType:
100+
"""Map an integration string to a PipelineServiceType enum."""
101+
if integration and integration in INTEGRATION_TO_SERVICE_TYPE:
102+
return INTEGRATION_TO_SERVICE_TYPE[integration]
103+
return PipelineServiceType.OpenLineage
104+
105+
106+
def build_service_name(integration: Optional[str], fallback_service: str) -> str:
107+
"""
108+
Build the pipeline service name.
109+
110+
If integration is recognized, returns "{integration}_openlineage".
111+
Otherwise returns the fallback (the configured OpenLineage service name).
112+
"""
113+
if integration and integration in INTEGRATION_TO_SERVICE_TYPE:
114+
return f"{integration}{SERVICE_NAME_SUFFIX}"
115+
return fallback_service
116+
117+
118+
def get_or_create_pipeline_service(
119+
metadata: OpenMetadata,
120+
service_name: str,
121+
service_type: PipelineServiceType,
122+
_cache: Optional[Dict[str, str]] = None,
123+
) -> str:
124+
"""
125+
Ensure a PipelineService with the given name and type exists in OMD.
126+
127+
Uses an external cache dict to avoid repeated API calls within a session.
128+
Returns the service name.
129+
"""
130+
if _cache is not None and service_name in _cache:
131+
return _cache[service_name]
132+
133+
existing = metadata.get_by_name(PipelineService, service_name)
134+
if existing:
135+
if _cache is not None:
136+
_cache[service_name] = service_name
137+
return service_name
138+
139+
logger.info(
140+
f"Creating pipeline service '{service_name}' with type '{service_type.value}'"
141+
)
142+
request = CreatePipelineServiceRequest(
143+
name=EntityName(service_name),
144+
serviceType=service_type,
145+
)
146+
metadata.create_or_update(request)
147+
148+
if _cache is not None:
149+
_cache[service_name] = service_name
150+
151+
return service_name

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,7 +1169,7 @@ def test_entity_detection_kafka_namespace_returns_topic(self):
11691169
self.assertIsNotNone(result.topic_details)
11701170
self.assertIsNone(result.table_details)
11711171
self.assertEqual(result.topic_details.name, "my-events-topic")
1172-
self.assertEqual(result.topic_details.broker_hostname, "broker-host")
1172+
self.assertEqual(result.topic_details.broker_hostname, "broker-host:9092")
11731173

11741174
def test_entity_detection_non_kafka_namespace_returns_table(self):
11751175
"""Test that non-kafka namespaces (e.g. bigquery, hive) are detected as tables."""
@@ -1193,7 +1193,7 @@ def test_topic_details_extraction_various_broker_formats(self):
11931193
{"name": "topic1", "namespace": "kafka://my-broker:9092"}
11941194
)
11951195
self.assertEqual(result.name, "topic1")
1196-
self.assertEqual(result.broker_hostname, "my-broker")
1196+
self.assertEqual(result.broker_hostname, "my-broker:9092")
11971197

11981198
# Broker without port
11991199
result = OpenlineageSource._get_topic_details(

0 commit comments

Comments
 (0)