Skip to content

Commit ea6f23e

Browse files
authored
Fixes #21886: allow all success openlineage event types (#24889)
1 parent 0c2b5b1 commit ea6f23e

3 files changed

Lines changed: 198 additions & 8 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class OpenlineageSource(PipelineServiceSource):
7272
Works under the assumption that OpenLineage integrations produce events to Kafka topic, which is a source of events
7373
for this connector.
7474
75-
Only 'SUCCESS' OpenLineage events are taken into account in this connector.
75+
Only OpenLineage events that indicate successful data movement (COMPLETE, RUNNING, START) are taken into account in this connector.
7676
7777
Configuring OpenLineage integrations: https://openlineage.io/docs/integrations/about
7878
"""
@@ -195,18 +195,18 @@ def _render_pipeline_name(cls, pipeline_details: OpenLineageEvent) -> str:
195195
return f"{namespace}-{name}"
196196

197197
@classmethod
198-
def _filter_event_by_type(
199-
cls, event: OpenLineageEvent, event_type: EventType
198+
def _filter_event_by_types(
199+
cls, event: OpenLineageEvent, event_types: List[EventType]
200200
) -> Optional[Dict]:
201201
"""
202-
returns event if it's of particular event_type.
202+
returns event if it's of one of the particular event_types.
203203
for example - for lineage events we will only be looking for the EventType.COMPLETE, EventType.RUNNING and EventType.START event types.
204204
205205
:param event: Open Lineage raw event.
206-
:param event_type: type of event we are looking for.
207-
:return: Open Lineage event if matches event_type, otherwise None
206+
:param event_types: list of event types we are looking for.
207+
:return: Open Lineage event if it matches one of the event_types, otherwise an empty dict
208208
"""
209-
return event if event.event_type == event_type else {}
209+
return event if event.event_type in event_types else {}
210210

211211
@classmethod
212212
def _get_om_table_columns(cls, table_input: Dict) -> Optional[List]:
@@ -471,7 +471,10 @@ def get_pipelines_list(self) -> Optional[List[Any]]:
471471
_result = message_to_open_lineage_event(
472472
json.loads(message.value())
473473
)
474-
result = self._filter_event_by_type(_result, EventType.COMPLETE)
474+
result = self._filter_event_by_types(
475+
_result,
476+
[EventType.COMPLETE, EventType.RUNNING, EventType.START],
477+
)
475478
if result:
476479
yield result
477480
except Exception as e:

ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,9 @@ class EventType(str, Enum):
8686
List of used OpenLineage event types.
8787
"""
8888

89+
START = "START"
90+
RUNNING = "RUNNING"
8991
COMPLETE = "COMPLETE"
92+
ABORT = "ABORT"
93+
FAIL = "FAIL"
94+
OTHER = "OTHER"

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from uuid import UUID
77

88
from metadata.generated.schema.api.data.createTable import CreateTableRequest
9+
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
910
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
1011
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
1112
OpenMetadataConnection,
@@ -597,6 +598,187 @@ def test_get_create_table_request(self, mock_get_schema_fqn, mock_get_table_fqn)
597598
create_request.columns[i].dataTypeDisplay, expected_type_display
598599
)
599600

601+
@patch("confluent_kafka.Consumer")
602+
def test_get_pipelines_list_filters_complete_events(self, mock_consumer_class):
603+
"""Test that get_pipelines_list returns COMPLETE events"""
604+
event = copy.deepcopy(VALID_EVENT)
605+
event["eventType"] = "COMPLETE"
606+
self.setup_mock_consumer_with_kafka_event(event)
607+
608+
result_generator = self.open_lineage_source.get_pipelines_list()
609+
results = list(result_generator)
610+
611+
self.assertEqual(len(results), 1)
612+
self.assertIsInstance(results[0], OpenLineageEvent)
613+
self.assertEqual(results[0].event_type, "COMPLETE")
614+
615+
@patch("confluent_kafka.Consumer")
616+
def test_get_pipelines_list_filters_running_events(self, mock_consumer_class):
617+
"""Test that get_pipelines_list returns RUNNING events"""
618+
event = copy.deepcopy(VALID_EVENT)
619+
event["eventType"] = "RUNNING"
620+
self.setup_mock_consumer_with_kafka_event(event)
621+
622+
result_generator = self.open_lineage_source.get_pipelines_list()
623+
results = list(result_generator)
624+
625+
self.assertEqual(len(results), 1)
626+
self.assertIsInstance(results[0], OpenLineageEvent)
627+
self.assertEqual(results[0].event_type, "RUNNING")
628+
629+
@patch("confluent_kafka.Consumer")
630+
def test_get_pipelines_list_filters_start_events(self, mock_consumer_class):
631+
"""Test that get_pipelines_list returns START events"""
632+
event = copy.deepcopy(VALID_EVENT)
633+
event["eventType"] = "START"
634+
self.setup_mock_consumer_with_kafka_event(event)
635+
636+
result_generator = self.open_lineage_source.get_pipelines_list()
637+
results = list(result_generator)
638+
639+
self.assertEqual(len(results), 1)
640+
self.assertIsInstance(results[0], OpenLineageEvent)
641+
self.assertEqual(results[0].event_type, "START")
642+
643+
@patch("confluent_kafka.Consumer")
644+
def test_get_pipelines_list_filters_out_fail_events(self, mock_consumer_class):
645+
"""Test that get_pipelines_list filters out FAIL events"""
646+
event = copy.deepcopy(VALID_EVENT)
647+
event["eventType"] = "FAIL"
648+
self.setup_mock_consumer_with_kafka_event(event)
649+
650+
result_generator = self.open_lineage_source.get_pipelines_list()
651+
results = list(result_generator)
652+
653+
self.assertEqual(len(results), 0)
654+
655+
@patch("confluent_kafka.Consumer")
656+
def test_get_pipelines_list_filters_out_abort_events(self, mock_consumer_class):
657+
"""Test that get_pipelines_list filters out ABORT events"""
658+
event = copy.deepcopy(VALID_EVENT)
659+
event["eventType"] = "ABORT"
660+
self.setup_mock_consumer_with_kafka_event(event)
661+
662+
result_generator = self.open_lineage_source.get_pipelines_list()
663+
results = list(result_generator)
664+
665+
self.assertEqual(len(results), 0)
666+
667+
@patch(
668+
"metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om"
669+
)
670+
def test_lineage_merge_start_with_data_running_without(self, mock_get_table_fqn):
671+
"""
672+
Test that START event with lineage data followed by RUNNING event without
673+
lineage data does not overwrite existing lineage in the database.
674+
675+
This simulates Flink streaming jobs where:
676+
- START event contains initial lineage
677+
- RUNNING events are heartbeats with no/empty lineage
678+
679+
The test verifies the complete flow:
680+
1. START event creates lineage with column details
681+
2. RUNNING event with empty data is processed
682+
3. Check the RUNNING event yields no lineage requests - so the original lineage data is not overwritten
683+
"""
684+
# Create START event with lineage data
685+
start_event = copy.deepcopy(FULL_OL_KAFKA_EVENT)
686+
start_event["eventType"] = "START"
687+
688+
# Create RUNNING event with same job but no lineage (empty inputs/outputs)
689+
running_event = copy.deepcopy(FULL_OL_KAFKA_EVENT)
690+
running_event["eventType"] = "RUNNING"
691+
running_event["inputs"] = []
692+
running_event["outputs"] = []
693+
694+
# Mock table FQN lookup
695+
def mock_fqn_side_effect(table_details):
696+
return f"testService.shopify.{table_details.name}"
697+
698+
mock_get_table_fqn.side_effect = mock_fqn_side_effect
699+
700+
# Mock metadata.get_by_name for table lookups
701+
from_table_id = "69fc8906-4a4a-45ab-9a54-9cc2d399e10e"
702+
to_table_id = "59fc8906-4a4a-45ab-9a54-9cc2d399e10e"
703+
704+
def mock_get_uuid_by_name(entity, fqn):
705+
if fqn == "testService.shopify.raw_product_catalog":
706+
return Mock(id=from_table_id)
707+
elif fqn == "testService.shopify.fact_order_new5":
708+
return Mock(id=to_table_id)
709+
elif "openlineage_source" in fqn: # Pipeline entity
710+
return Mock(id=Mock(root="79fc8906-4a4a-45ab-9a54-9cc2d399e10e"))
711+
return None
712+
713+
# Process START event with lineage
714+
start_ol_event = message_to_open_lineage_event(start_event)
715+
with patch.object(
716+
OpenMetadataConnection,
717+
"get_by_name",
718+
create=True,
719+
side_effect=mock_get_uuid_by_name,
720+
):
721+
start_lineage_results = list(
722+
self.open_lineage_source.yield_pipeline_lineage_details(start_ol_event)
723+
)
724+
725+
# Process RUNNING event without lineage
726+
running_ol_event = message_to_open_lineage_event(running_event)
727+
with patch.object(
728+
OpenMetadataConnection,
729+
"get_by_name",
730+
create=True,
731+
side_effect=mock_get_uuid_by_name,
732+
):
733+
running_lineage_results = list(
734+
self.open_lineage_source.yield_pipeline_lineage_details(
735+
running_ol_event
736+
)
737+
)
738+
739+
# Extract lineage requests from START event
740+
start_lineage_requests = [
741+
r.right
742+
for r in start_lineage_results
743+
if r.right and isinstance(r.right, AddLineageRequest)
744+
]
745+
746+
# Extract lineage requests from RUNNING event
747+
running_lineage_requests = [
748+
r.right
749+
for r in running_lineage_results
750+
if r.right and isinstance(r.right, AddLineageRequest)
751+
]
752+
753+
# Verify START event produced lineage with column details
754+
start_requests_with_columns = [
755+
req
756+
for req in start_lineage_requests
757+
if req.edge.lineageDetails and req.edge.lineageDetails.columnsLineage
758+
]
759+
self.assertGreater(
760+
len(start_requests_with_columns),
761+
0,
762+
"START event should produce lineage requests with column details",
763+
)
764+
765+
# Count column lineage entries from START
766+
start_column_count = sum(
767+
len(req.edge.lineageDetails.columnsLineage)
768+
for req in start_requests_with_columns
769+
)
770+
self.assertGreater(
771+
start_column_count, 0, "START event should have column lineage"
772+
)
773+
774+
# Key assertion: RUNNING event with empty inputs/outputs produces no lineage requests
775+
# This prevents empty data from being sent to the database
776+
self.assertEqual(
777+
len(running_lineage_requests),
778+
0,
779+
"RUNNING event with empty inputs/outputs should not produce any lineage requests",
780+
)
781+
600782

601783
if __name__ == "__main__":
602784
unittest.main()

0 commit comments

Comments
 (0)