Skip to content

Commit b95dbf9

Browse files
mohityadav766claude
andcommitted
fix(lineage): service nodes appearing in entity lineage view and empty By Service view (#27258)
* fix(lineage): prevent pipeline annotation inheritance in service/domain/dataProduct lineage and add pipeline service edges Bug #1: Service nodes (e.g., DatabaseService, MessagingService) were incorrectly appearing in entity-level lineage views. Root cause: getOrCreateLineageDetails() in addServiceLineage(), addDomainLineage(), and addDataProductsLineage() was copying the pipeline annotation from entity-level LineageDetails to service/domain/dataProduct-level LineageDetails. This caused service entities to have upstreamLineage.pipeline.fqnHash set in their Elasticsearch documents, making them match the PIPELINE_AS_EDGE_KEY query during BFS traversal and incorrectly appear alongside actual data assets. Fix: add .withPipeline(null) on each service/domain/dataProduct LineageDetails object to strip the pipeline annotation before persisting. Bug #2: "By Service" view was empty when viewing lineage for pipeline entities that were stored as edge annotators (Case B: table → topic with pipeline=flink_pipeline in LineageDetails) rather than as actual nodes (Case A). Root cause: addServiceLineage() only created database_service → kafka_service edges but no edges involving flink_pipeline_service. Fix: add addPipelineServiceEdges() called from addServiceLineage() that creates fromService → pipelineService and pipelineService → toService edges when a pipeline annotation exists in the entity-level lineage details. Also add unit tests covering both fixes to prevent regression. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): add migration to remove pipeline annotation from service/domain/dataProduct lineage edges The previous fix (e6df7a6) prevented new lineage from inheriting pipeline annotations on service/domain/dataProduct-level edges. However, existing data in the entity_relationship table already has pipeline set on those edges from before the fix, and Elasticsearch reindex reads from the DB — so reindex alone does not fix stale data. This migration removes the pipeline field from all service-to-service, domain-to-domain, and dataProduct-to-dataProduct lineage edges (relation=13/UPSTREAM) in entity_relationship. After upgrading and running this migration, operators should trigger an Elasticsearch/OpenSearch reindex so that the corrected DB records are reflected in the search index, which is what the lineage graph BFS traversal reads from. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): move pipeline annotation migration from 1.12.0 to 1.13.0 Moves the data migration that removes the pipeline field from service/domain/dataProduct lineage edges in entity_relationship to the 1.13.0 migration scripts, which is the correct target version. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): move pipeline annotation migration from 1.13.0 to new 1.12.6 Creates a new 1.12.6 migration with the data fix that removes the pipeline field from service/domain/dataProduct lineage edges in entity_relationship, and removes it from 1.13.0 where it was previously placed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): add v1126 Java migration to create pipeline service edges for existing data For installations upgrading to 1.12.6 with existing lineage data, service edges fromService→pipelineService and pipelineService→toService were never created (only added by the code fix for new lineage going forward). This migration reads service-level lineage edges that have a pipeline annotation, resolves the pipeline entity's service, and inserts the two missing service edges into entity_relationship (DB only). After the SQL migration strips pipeline from service edges and a reindex runs, the "By Service" lineage view for pipeline services correctly shows their upstream/downstream service connections. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): fix v1126 migration to read entity-level edges for pipeline service creation The original migration read service-level edges (databaseService→messagingService) looking for pipeline annotations, but those had already been cleaned by the SQL migration before the Java migration could run in subsequent server restarts. Fix: read data-asset-level edges (table→topic etc.) which retain their pipeline annotation permanently. For each such edge, resolve fromEntity.service, toEntity.service, and pipeline.service, then create the two missing pipelineService edges in entity_relationship. Verified: after running the migration manually via direct SQL + OpenSearch update, the By Service view for lineage_test_flink_svc correctly shows 3 nodes with upstream (db_svc→flink_svc) and downstream (flink_svc→kafka_svc) edges. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): clean up pipeline service edges when entity lineage is deleted When entity-level lineage (table→topic) is deleted, cleanUpExtendedLineage only cleaned up fromService→toService (db_svc→kafka_svc) but left the new pipeline service edges (db_svc→flink_svc, flink_svc→kafka_svc) as orphans in both entity_relationship and OpenSearch. Fix: - Pass lineageDetails (which contains the pipeline reference) into cleanUpExtendedLineage from both deleteLineage and deleteLineageByFQN - Add cleanUpPipelineServiceEdges that mirrors addPipelineServiceEdges: uses getPipelineService(lineageDetails) to resolve the pipelineService, then calls processExtendedLineageCleanup for fromService→pipelineService and pipelineService→toService edges (decrement assetEdges or delete+remove from search if count reaches zero) - Also fix deleteLineageByFQN which was missing cleanUpExtendedLineage call entirely (pre-existing gap for service edge cleanup) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(lineage): add unit tests for pipeline annotation stripping and pipeline service edge creation - Add 4 new unit tests to LineageRepositoryTest covering: - Bug #1 (2 tests): service-level edges do not inherit pipeline annotation from entity lineage, both for new and existing edges - Bug #2 (2 tests): addPipelineServiceEdges creates fromService→pipelineService and pipelineService→toService edges when pipeline annotator is present, and skips them when no pipeline is set - Fix MySQL migration: add metadataService to entity type list (was in Java migration's SERVICE_ENTITY_TYPES but missing from SQL) and replace JSON_EXTRACT IS NOT NULL with JSON_CONTAINS_PATH to correctly handle both present and explicit-null pipeline fields - Fix PostgreSQL migration: add metadataService to entity type list Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(lineage): add integration tests for pipeline-as-annotator lineage scenario Tests Bug #1 (service nodes absent from entity-level lineage) and Bug #2 (pipeline service connected in service-level lineage) using a table → topic edge annotated with a pipeline entity reference. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(e2e): add Playwright tests for pipeline-as-annotator lineage scenario Tests Bug #1 (service nodes absent from entity-level lineage) and Bug #2 (pipeline service appears in service-level lineage) using API interception and direct request assertions via page.request.get(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * style: apply spotless formatting to LineageRepositoryTest Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * style: apply prettier formatting to LineagePipelineAnnotator spec Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lineage): guard against null LineageDetails in getPipelineService When the json column in entity_relationship is NULL, JsonUtils.readValue returns null. getPipelineService now short-circuits on a null argument instead of throwing NullPointerException via entityLineageDetails.getPipeline(). Fixes NPE in deleteLineageByFQN and deleteLineage cleanup paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(e2e): use authenticated apiContext for service lineage assertions page.request.get() sends browser cookies but OpenMetadata authenticates via JWT in localStorage, so those calls were unauthenticated (non-2xx). Replace with getToken + getAuthContext pattern used elsewhere. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(migration): add driveService to 1.12.6 pipeline annotation cleanup Directory, File, Spreadsheet, and Worksheet entities map to driveService, so service-level lineage edges between driveService instances could also have incorrectly inherited the pipeline annotation. Include driveService in the 1.12.6 cleanup migration for both MySQL and PostgreSQL. Also drops the stray trailing-newline changes from the 1.12.0 migration files — those edits were unnecessary. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * new line remove * fix(migration): add DRIVE_SERVICE to v1126 SERVICE_ENTITY_TYPES set driveService-to-driveService edges must be skipped during the pipeline service edge migration scan, same as all other service-level edges. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(migration): resolve merge conflict in v1126 MigrationUtil The rebase left MigrationUtil with duplicate imports and a missing closing brace on insertEdgeIfMissing. Merged both method sets cleanly and ran spotless. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> (cherry picked from commit c2e6d90)
1 parent 5147377 commit b95dbf9

12 files changed

Lines changed: 1234 additions & 16 deletions

File tree

bootstrap/sql/migrations/native/1.12.0/mysql/postDataMigrationSQLScript.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,3 @@ SET json = JSON_REMOVE(JSON_REMOVE(json, '$.inputPorts'), '$.outputPorts')
7676
WHERE jsonSchema = 'dataProduct'
7777
AND (JSON_CONTAINS_PATH(json, 'one', '$.inputPorts')
7878
OR JSON_CONTAINS_PATH(json, 'one', '$.outputPorts'));
79-

bootstrap/sql/migrations/native/1.12.0/postgres/postDataMigrationSQLScript.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,4 @@ SET json = json::jsonb - 'inputPorts' - 'outputPorts'
110110
WHERE jsonSchema = 'dataProduct'
111111
AND (json::jsonb ?? 'inputPorts' OR json::jsonb ?? 'outputPorts');
112112

113+
Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,27 @@
1-
-- Placeholder for 1.12.6 MySQL post data migration SQL script
1+
-- Remove pipeline annotation from service-level, domain-level, and dataProduct-level lineage edges.
2+
-- These edges incorrectly inherited the pipeline annotation from entity-level lineage, causing service
3+
-- nodes to appear in entity-level lineage views and the "By Service" view to be empty for pipeline
4+
-- entities. After this migration, run an Elasticsearch/OpenSearch reindex to update search documents.
5+
UPDATE entity_relationship
6+
SET json = JSON_REMOVE(json, '$.pipeline')
7+
WHERE fromEntity IN ('databaseService', 'messagingService', 'pipelineService', 'dashboardService',
8+
'mlmodelService', 'metadataService', 'storageService', 'searchService', 'apiService',
9+
'driveService')
10+
AND toEntity IN ('databaseService', 'messagingService', 'pipelineService', 'dashboardService',
11+
'mlmodelService', 'metadataService', 'storageService', 'searchService', 'apiService',
12+
'driveService')
13+
AND relation = 13
14+
AND JSON_CONTAINS_PATH(json, 'one', '$.pipeline');
15+
16+
UPDATE entity_relationship
17+
SET json = JSON_REMOVE(json, '$.pipeline')
18+
WHERE fromEntity = 'domain' AND toEntity = 'domain'
19+
AND relation = 13
20+
AND JSON_EXTRACT(json, '$.pipeline') IS NOT NULL;
21+
22+
UPDATE entity_relationship
23+
SET json = JSON_REMOVE(json, '$.pipeline')
24+
WHERE fromEntity = 'dataProduct' AND toEntity = 'dataProduct'
25+
AND relation = 13
26+
AND JSON_EXTRACT(json, '$.pipeline') IS NOT NULL;
27+
Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,27 @@
1-
-- Placeholder for 1.12.6 Postgres post data migration SQL script
1+
-- Remove pipeline annotation from service-level, domain-level, and dataProduct-level lineage edges.
2+
-- These edges incorrectly inherited the pipeline annotation from entity-level lineage, causing service
3+
-- nodes to appear in entity-level lineage views and the "By Service" view to be empty for pipeline
4+
-- entities. After this migration, run an Elasticsearch/OpenSearch reindex to update search documents.
5+
UPDATE entity_relationship
6+
SET json = json - 'pipeline'
7+
WHERE fromentity IN ('databaseService', 'messagingService', 'pipelineService', 'dashboardService',
8+
'mlmodelService', 'metadataService', 'storageService', 'searchService', 'apiService',
9+
'driveService')
10+
AND toentity IN ('databaseService', 'messagingService', 'pipelineService', 'dashboardService',
11+
'mlmodelService', 'metadataService', 'storageService', 'searchService', 'apiService',
12+
'driveService')
13+
AND relation = 13
14+
AND json ?? 'pipeline';
15+
16+
UPDATE entity_relationship
17+
SET json = json - 'pipeline'
18+
WHERE fromentity = 'domain' AND toentity = 'domain'
19+
AND relation = 13
20+
AND json ?? 'pipeline';
21+
22+
UPDATE entity_relationship
23+
SET json = json - 'pipeline'
24+
WHERE fromentity = 'dataProduct' AND toentity = 'dataProduct'
25+
AND relation = 13
26+
AND json ?? 'pipeline';
27+
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
-- Placeholder for 1.12.6 Postgres schema changes
1+
-- Placeholder for 1.12.6 PostgreSQL schema changes
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
package org.openmetadata.it.tests;
2+
3+
import static org.junit.jupiter.api.Assertions.assertFalse;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import java.time.Duration;
10+
import java.util.List;
11+
import org.awaitility.Awaitility;
12+
import org.junit.jupiter.api.AfterAll;
13+
import org.junit.jupiter.api.BeforeAll;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.TestInstance;
16+
import org.junit.jupiter.api.parallel.Execution;
17+
import org.junit.jupiter.api.parallel.ExecutionMode;
18+
import org.openmetadata.it.factories.DatabaseSchemaTestFactory;
19+
import org.openmetadata.it.factories.DatabaseServiceTestFactory;
20+
import org.openmetadata.it.factories.MessagingServiceTestFactory;
21+
import org.openmetadata.it.factories.PipelineServiceTestFactory;
22+
import org.openmetadata.it.util.SdkClients;
23+
import org.openmetadata.it.util.TestNamespace;
24+
import org.openmetadata.schema.api.data.CreatePipeline;
25+
import org.openmetadata.schema.api.data.CreateTable;
26+
import org.openmetadata.schema.api.data.CreateTopic;
27+
import org.openmetadata.schema.api.lineage.AddLineage;
28+
import org.openmetadata.schema.entity.data.DatabaseSchema;
29+
import org.openmetadata.schema.entity.data.Pipeline;
30+
import org.openmetadata.schema.entity.data.Table;
31+
import org.openmetadata.schema.entity.data.Topic;
32+
import org.openmetadata.schema.entity.services.DatabaseService;
33+
import org.openmetadata.schema.entity.services.MessagingService;
34+
import org.openmetadata.schema.entity.services.PipelineService;
35+
import org.openmetadata.schema.type.EntitiesEdge;
36+
import org.openmetadata.schema.type.LineageDetails;
37+
import org.openmetadata.sdk.client.OpenMetadataClient;
38+
import org.openmetadata.sdk.fluent.builders.ColumnBuilder;
39+
40+
/**
41+
* Integration tests for the pipeline-as-annotator lineage scenario.
42+
*
43+
* <p>When a pipeline is used as an edge annotation (not a lineage node) between a table and topic,
44+
* two bugs were observed:
45+
*
46+
* <ul>
47+
* <li>Bug #1: Service nodes (databaseService, messagingService, pipelineService) appeared in
48+
* entity-level lineage views, making graphs noisy.
49+
* <li>Bug #2: The pipeline service had no service-level edges, so the "By Service" view was
50+
* empty.
51+
* </ul>
52+
*
53+
* <p>Topology: {@code table → topic} (annotated with {@code pipeline})
54+
*/
55+
@Execution(ExecutionMode.SAME_THREAD)
56+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
57+
public class LineagePipelineAnnotatorIT {
58+
59+
private static final ObjectMapper MAPPER = new ObjectMapper();
60+
private OpenMetadataClient client;
61+
private TestNamespace namespace;
62+
63+
private DatabaseService dbService;
64+
private MessagingService messagingService;
65+
private PipelineService pipelineService;
66+
private Table table;
67+
private Topic topic;
68+
private Pipeline pipeline;
69+
70+
@BeforeAll
71+
void setUp() throws Exception {
72+
client = SdkClients.adminClient();
73+
namespace = new TestNamespace("LineagePipelineAnnotatorIT");
74+
75+
dbService = DatabaseServiceTestFactory.createPostgres(namespace);
76+
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(namespace, dbService);
77+
table = createTable(schema.getFullyQualifiedName());
78+
79+
messagingService = MessagingServiceTestFactory.createKafka(namespace);
80+
topic = createTopic(messagingService.getFullyQualifiedName());
81+
82+
pipelineService = PipelineServiceTestFactory.createAirflow(namespace);
83+
pipeline = createPipeline(pipelineService.getFullyQualifiedName());
84+
85+
addLineageWithPipelineAnnotator(table, topic, pipeline);
86+
waitForEntityLineageIndexed();
87+
waitForServiceLineageIndexed();
88+
}
89+
90+
@AfterAll
91+
void tearDown() {
92+
safeDeletePipeline(pipeline);
93+
safeDeleteTopic(topic);
94+
safeDeleteTable(table);
95+
}
96+
97+
@Test
98+
void entityLineage_ServiceNodesAreAbsent() throws Exception {
99+
JsonNode nodes = searchLineageNodes(table.getFullyQualifiedName(), "table", 1, 1);
100+
101+
assertNotNull(nodes);
102+
assertFalse(
103+
nodes.has(dbService.getFullyQualifiedName()),
104+
"DatabaseService should not appear in entity-level lineage");
105+
assertFalse(
106+
nodes.has(messagingService.getFullyQualifiedName()),
107+
"MessagingService should not appear in entity-level lineage");
108+
assertFalse(
109+
nodes.has(pipelineService.getFullyQualifiedName()),
110+
"PipelineService should not appear in entity-level lineage");
111+
}
112+
113+
@Test
114+
void entityLineage_DownstreamTopicIsPresent() throws Exception {
115+
JsonNode nodes = searchLineageNodes(table.getFullyQualifiedName(), "table", 0, 1);
116+
117+
assertNotNull(nodes);
118+
assertTrue(
119+
nodes.has(topic.getFullyQualifiedName()),
120+
"Topic should appear as downstream node in entity-level lineage");
121+
}
122+
123+
@Test
124+
void entityLineage_PipelineAnnotationPreservedOnEdge() throws Exception {
125+
JsonNode downstreamEdges =
126+
searchLineage(table.getFullyQualifiedName(), "table", 0, 1).get("downstreamEdges");
127+
128+
assertNotNull(downstreamEdges, "downstreamEdges should not be null");
129+
assertTrue(
130+
edgeHasPipelineAnnotation(downstreamEdges, pipeline.getFullyQualifiedName()),
131+
"Entity edge should carry the pipeline annotation");
132+
}
133+
134+
@Test
135+
void serviceLineage_PipelineServiceConnectedToBothServices() throws Exception {
136+
JsonNode nodes =
137+
searchLineageNodes(pipelineService.getFullyQualifiedName(), "pipelineService", 1, 1);
138+
139+
assertNotNull(nodes, "nodes should not be null for pipeline service lineage");
140+
assertTrue(
141+
nodes.has(dbService.getFullyQualifiedName()),
142+
"DatabaseService should appear in pipeline service lineage");
143+
assertTrue(
144+
nodes.has(messagingService.getFullyQualifiedName()),
145+
"MessagingService should appear in pipeline service lineage");
146+
}
147+
148+
@Test
149+
void serviceLineage_DatabaseServiceHasPipelineServiceDownstream() throws Exception {
150+
JsonNode nodes = searchLineageNodes(dbService.getFullyQualifiedName(), "databaseService", 0, 1);
151+
152+
assertNotNull(nodes);
153+
assertTrue(
154+
nodes.has(pipelineService.getFullyQualifiedName()),
155+
"PipelineService should appear as downstream of database service");
156+
}
157+
158+
@Test
159+
void serviceLineage_MessagingServiceHasPipelineServiceUpstream() throws Exception {
160+
JsonNode nodes =
161+
searchLineageNodes(messagingService.getFullyQualifiedName(), "messagingService", 1, 0);
162+
163+
assertNotNull(nodes);
164+
assertTrue(
165+
nodes.has(pipelineService.getFullyQualifiedName()),
166+
"PipelineService should appear as upstream of messaging service");
167+
}
168+
169+
// --- Helpers ---
170+
171+
private JsonNode searchLineageNodes(String fqn, String type, int upDepth, int downDepth)
172+
throws Exception {
173+
return searchLineage(fqn, type, upDepth, downDepth).get("nodes");
174+
}
175+
176+
private JsonNode searchLineage(String fqn, String type, int upDepth, int downDepth)
177+
throws Exception {
178+
String[] result = {null};
179+
Awaitility.await("searchLineage for " + fqn)
180+
.atMost(Duration.ofSeconds(30))
181+
.pollInterval(Duration.ofSeconds(2))
182+
.ignoreExceptions()
183+
.until(
184+
() -> {
185+
result[0] = client.lineage().searchLineage(fqn, type, upDepth, downDepth, false);
186+
return result[0] != null;
187+
});
188+
return MAPPER.readTree(result[0]);
189+
}
190+
191+
private boolean edgeHasPipelineAnnotation(JsonNode edgeMap, String pipelineFqn) {
192+
var edgeIter = edgeMap.elements();
193+
while (edgeIter.hasNext()) {
194+
JsonNode edge = edgeIter.next();
195+
JsonNode pipelineNode = edge.path("pipeline");
196+
if (!pipelineNode.isMissingNode() && !pipelineNode.isNull()) {
197+
String annotatedFqn = pipelineNode.path("fullyQualifiedName").asText("");
198+
if (annotatedFqn.equals(pipelineFqn)) {
199+
return true;
200+
}
201+
}
202+
}
203+
return false;
204+
}
205+
206+
private void addLineageWithPipelineAnnotator(Table from, Topic to, Pipeline pipe) {
207+
LineageDetails details =
208+
new LineageDetails()
209+
.withSource(LineageDetails.Source.PIPELINE_LINEAGE)
210+
.withPipeline(pipe.getEntityReference());
211+
212+
AddLineage addLineage =
213+
new AddLineage()
214+
.withEdge(
215+
new EntitiesEdge()
216+
.withFromEntity(from.getEntityReference())
217+
.withToEntity(to.getEntityReference())
218+
.withLineageDetails(details));
219+
220+
Awaitility.await("Add lineage " + from.getName() + " → " + to.getName())
221+
.atMost(Duration.ofSeconds(30))
222+
.pollInterval(Duration.ofSeconds(1))
223+
.ignoreExceptions()
224+
.until(
225+
() -> {
226+
client.lineage().addLineage(addLineage);
227+
return true;
228+
});
229+
}
230+
231+
private void waitForEntityLineageIndexed() {
232+
Awaitility.await("Wait for entity lineage in ES")
233+
.atMost(Duration.ofSeconds(90))
234+
.pollInterval(Duration.ofSeconds(3))
235+
.ignoreExceptions()
236+
.until(
237+
() -> {
238+
String result =
239+
client
240+
.lineage()
241+
.searchLineage(table.getFullyQualifiedName(), "table", 0, 1, false);
242+
JsonNode nodes = MAPPER.readTree(result).get("nodes");
243+
return nodes != null && nodes.has(topic.getFullyQualifiedName());
244+
});
245+
}
246+
247+
private void waitForServiceLineageIndexed() {
248+
Awaitility.await("Wait for service lineage in ES")
249+
.atMost(Duration.ofSeconds(90))
250+
.pollInterval(Duration.ofSeconds(3))
251+
.ignoreExceptions()
252+
.until(
253+
() -> {
254+
String result =
255+
client
256+
.lineage()
257+
.searchLineage(
258+
pipelineService.getFullyQualifiedName(), "pipelineService", 1, 1, false);
259+
JsonNode nodes = MAPPER.readTree(result).get("nodes");
260+
return nodes != null
261+
&& nodes.has(dbService.getFullyQualifiedName())
262+
&& nodes.has(messagingService.getFullyQualifiedName());
263+
});
264+
}
265+
266+
private Table createTable(String schemaFqn) {
267+
return client
268+
.tables()
269+
.create(
270+
new CreateTable()
271+
.withName(namespace.prefix("source_table"))
272+
.withDatabaseSchema(schemaFqn)
273+
.withColumns(List.of(new ColumnBuilder("id", "VARCHAR").dataLength(256).build())));
274+
}
275+
276+
private Topic createTopic(String serviceFqn) {
277+
CreateTopic request = new CreateTopic();
278+
request.setName(namespace.prefix("target_topic"));
279+
request.setService(serviceFqn);
280+
request.setPartitions(1);
281+
return client.topics().create(request);
282+
}
283+
284+
private Pipeline createPipeline(String serviceFqn) {
285+
CreatePipeline request = new CreatePipeline();
286+
request.setName(namespace.prefix("etl_pipeline"));
287+
request.setService(serviceFqn);
288+
return client.pipelines().create(request);
289+
}
290+
291+
private void safeDeleteTable(Table t) {
292+
if (t != null) {
293+
try {
294+
client.tables().delete(t.getId());
295+
} catch (Exception e) {
296+
// Ignore cleanup failures
297+
}
298+
}
299+
}
300+
301+
private void safeDeleteTopic(Topic t) {
302+
if (t != null) {
303+
try {
304+
client.topics().delete(t.getId());
305+
} catch (Exception e) {
306+
// Ignore cleanup failures
307+
}
308+
}
309+
}
310+
311+
private void safeDeletePipeline(Pipeline p) {
312+
if (p != null) {
313+
try {
314+
client.pipelines().delete(p.getId());
315+
} catch (Exception e) {
316+
// Ignore cleanup failures
317+
}
318+
}
319+
}
320+
}

0 commit comments

Comments
 (0)