Skip to content

Commit 3fc48ac

Browse files
committed
Add Invalidation for Lineage Cache during updates (#27606)
* Add Invalidation for Lineage Cache during updates * Add test (cherry picked from commit 557626f)
1 parent 706a3ab commit 3fc48ac

9 files changed

Lines changed: 290 additions & 8 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,16 @@ private void addLineageToSearch(
393393
buildEntityLineageData(fromEntity, toEntity, lineageDetails).withToEntity(null);
394394
Pair<String, String> to = new ImmutablePair<>("_id", toEntity.getId().toString());
395395
searchClient.updateLineage(destinationIndexName, to, lineageData);
396+
invalidateLineageCacheForEdge(fromEntity, toEntity);
397+
}
398+
399+
private void invalidateLineageCacheForEdge(EntityReference from, EntityReference to) {
400+
if (from != null) {
401+
searchClient.invalidateLineageCache(from.getFullyQualifiedName());
402+
}
403+
if (to != null) {
404+
searchClient.invalidateLineageCache(to.getFullyQualifiedName());
405+
}
396406
}
397407

398408
public static RelationshipRef buildEntityRefLineage(EntityReference entityRef) {
@@ -1187,12 +1197,28 @@ private void deleteLineageFromSearch(List<CollectionDAO.EntityRelationshipObject
11871197
for (CollectionDAO.EntityRelationshipObject obj : relations) {
11881198
LineageDetails lineageDetails = JsonUtils.readValue(obj.getJson(), LineageDetails.class);
11891199
deleteLineageFromSearch(
1190-
new EntityReference().withId(UUID.fromString(obj.getFromId())),
1191-
new EntityReference().withId(UUID.fromString(obj.getToId())),
1200+
resolveRefForCacheInvalidation(obj.getFromEntity(), obj.getFromId()),
1201+
resolveRefForCacheInvalidation(obj.getToEntity(), obj.getToId()),
11921202
lineageDetails);
11931203
}
11941204
}
11951205

1206+
private EntityReference resolveRefForCacheInvalidation(String entityType, String id) {
1207+
EntityReference ref = new EntityReference().withId(UUID.fromString(id));
1208+
if (nullOrEmpty(entityType)) {
1209+
return ref;
1210+
}
1211+
try {
1212+
EntityReference resolved =
1213+
Entity.getEntityReferenceById(entityType, UUID.fromString(id), Include.ALL);
1214+
return ref.withType(entityType).withFullyQualifiedName(resolved.getFullyQualifiedName());
1215+
} catch (Exception e) {
1216+
LOG.debug(
1217+
"Could not resolve FQN for {}:{} during lineage cache invalidation", entityType, id);
1218+
return ref.withType(entityType);
1219+
}
1220+
}
1221+
11961222
private void deleteLineageFromSearch(
11971223
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
11981224
String uniqueValue = getDocumentUniqueId(fromEntity, toEntity);
@@ -1202,6 +1228,7 @@ private void deleteLineageFromSearch(
12021228
new ImmutablePair<>("upstreamLineage.docUniqueId.keyword", uniqueValue),
12031229
new ImmutablePair<>(
12041230
REMOVE_LINEAGE_SCRIPT, Collections.singletonMap("docUniqueId", uniqueValue)));
1231+
invalidateLineageCacheForEdge(fromEntity, toEntity);
12051232
} catch (Exception e) {
12061233
SearchIndexRetryQueue.enqueue(
12071234
fromEntity.getId() != null ? fromEntity.getId().toString() : null,

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,4 +718,15 @@ default void initializeLineageBuilders() {
718718
// Default implementation does nothing - concrete implementations can override
719719
// This allows backward compatibility for clients that don't need lineage features
720720
}
721+
722+
/**
723+
* Evicts every cached lineage graph whose root, nodes, or edge endpoints reference
724+
* the given FQN. Callers invoke this after a lineage edge involving the FQN is added
725+
* or deleted so stale graphs are not served back to the UI.
726+
*
727+
* @param fqn Fully qualified name of the entity touched by the mutation
728+
*/
729+
default void invalidateLineageCache(String fqn) {
730+
// Default no-op; concrete clients delegate to their LineageGraphBuilder cache
731+
}
721732
}

openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,14 @@ public SearchLineageResult searchPlatformLineage(
425425
return lineageGraphBuilder.getPlatformLineage(index, queryFilter, deleted);
426426
}
427427

428+
@Override
429+
public void invalidateLineageCache(String fqn) {
430+
if (lineageGraphBuilder == null) {
431+
return;
432+
}
433+
lineageGraphBuilder.invalidateLineageCacheForFqn(fqn);
434+
}
435+
428436
@Override
429437
public Response searchEntityRelationship(
430438
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)

openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/AbstractLineageGraphBuilder.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,15 +183,26 @@ protected void cacheResult(SearchLineageRequest request, SearchLineageResult res
183183
* Should be called when entity is updated or lineage edges change.
184184
*/
185185
protected void invalidateCache(String fqn) {
186-
if (!config.isEnableCaching()) {
186+
invalidateLineageCacheForFqn(fqn);
187+
}
188+
189+
/**
190+
* Drops every cached lineage graph whose root, nodes, or edges reference the given FQN.
191+
* Called when a lineage edge touching this FQN is added or deleted.
192+
*/
193+
public void invalidateLineageCacheForFqn(String fqn) {
194+
if (!config.isEnableCaching() || nullOrEmpty(fqn)) {
187195
return;
188196
}
197+
cache.invalidateIfGraphContains(fqn);
198+
}
189199

190-
// Note: This is a simplified invalidation.
191-
// Full implementation would need to invalidate all cache entries
192-
// that involve this FQN (as source or in the graph).
193-
// For now, we rely on TTL-based expiration.
194-
LOG.debug("Cache invalidation requested for fqn={} (TTL-based expiration active)", fqn);
200+
/** Drops the entire lineage graph cache. */
201+
public void invalidateAllLineageCache() {
202+
if (!config.isEnableCaching()) {
203+
return;
204+
}
205+
cache.invalidateAll();
195206
}
196207

197208
/**

openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/GuavaLineageGraphCache.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,55 @@ public void invalidateAll() {
129129
LOG.info("Cache INVALIDATE_ALL: Cleared all {} entries", cache.size());
130130
}
131131

132+
@Override
133+
public void invalidateIfGraphContains(String fqn) {
134+
if (fqn == null || fqn.isEmpty() || cache.size() == 0) {
135+
return;
136+
}
137+
java.util.List<LineageCacheKey> toEvict = new java.util.ArrayList<>();
138+
for (java.util.Map.Entry<LineageCacheKey, SearchLineageResult> entry :
139+
cache.asMap().entrySet()) {
140+
if (graphReferencesFqn(entry.getKey(), entry.getValue(), fqn)) {
141+
toEvict.add(entry.getKey());
142+
}
143+
}
144+
if (!toEvict.isEmpty()) {
145+
cache.invalidateAll(toEvict);
146+
LOG.debug("Cache INVALIDATE_FQN fqn={} evicted {} entries", fqn, toEvict.size());
147+
}
148+
}
149+
150+
private boolean graphReferencesFqn(LineageCacheKey key, SearchLineageResult result, String fqn) {
151+
if (fqn.equals(key.getFqn())) {
152+
return true;
153+
}
154+
if (result == null) {
155+
return false;
156+
}
157+
if (result.getNodes() != null && result.getNodes().containsKey(fqn)) {
158+
return true;
159+
}
160+
return edgeMapReferencesFqn(result.getUpstreamEdges(), fqn)
161+
|| edgeMapReferencesFqn(result.getDownstreamEdges(), fqn);
162+
}
163+
164+
private boolean edgeMapReferencesFqn(
165+
java.util.Map<String, org.openmetadata.schema.api.lineage.EsLineageData> edges, String fqn) {
166+
if (edges == null || edges.isEmpty()) {
167+
return false;
168+
}
169+
for (org.openmetadata.schema.api.lineage.EsLineageData edge : edges.values()) {
170+
if (edge.getFromEntity() != null
171+
&& fqn.equals(edge.getFromEntity().getFullyQualifiedName())) {
172+
return true;
173+
}
174+
if (edge.getToEntity() != null && fqn.equals(edge.getToEntity().getFullyQualifiedName())) {
175+
return true;
176+
}
177+
}
178+
return false;
179+
}
180+
132181
@Override
133182
public CacheStats getStats() {
134183
return cache.stats();

openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/LineageGraphCache.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ public interface LineageGraphCache {
5252
*/
5353
void invalidateAll();
5454

55+
/**
56+
* Invalidates every cached graph whose root FQN, nodes map, or edge endpoints reference
57+
* the given FQN. Used after lineage edges touching the FQN are added or deleted so stale
58+
* graphs are not served.
59+
*
60+
* @param fqn Fully qualified name of the entity whose graphs should be evicted
61+
*/
62+
default void invalidateIfGraphContains(String fqn) {
63+
invalidateAll();
64+
}
65+
5566
/**
5667
* Gets cache statistics for monitoring and metrics.
5768
*

openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,14 @@ public SearchLineageResult searchPlatformLineage(
415415
return lineageGraphBuilder.getPlatformLineage(index, queryFilter, deleted);
416416
}
417417

418+
@Override
419+
public void invalidateLineageCache(String fqn) {
420+
if (lineageGraphBuilder == null) {
421+
return;
422+
}
423+
lineageGraphBuilder.invalidateLineageCacheForFqn(fqn);
424+
}
425+
418426
@Override
419427
public Response searchEntityRelationship(
420428
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)

openmetadata-service/src/test/java/org/openmetadata/service/search/lineage/AbstractLineageGraphBuilderTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,5 +815,73 @@ public <T> List<T> sortEntitiesByDepthThenName(
815815
java.util.function.Function<T, String> fqnExtractor) {
816816
return super.sortEntitiesByDepthThenName(entities, depthExtractor, fqnExtractor);
817817
}
818+
819+
public void cacheResultForTest(SearchLineageRequest request, SearchLineageResult result) {
820+
super.cacheResult(request, result);
821+
}
822+
823+
public java.util.Optional<SearchLineageResult> checkCacheForTest(SearchLineageRequest request) {
824+
return super.checkCache(request);
825+
}
826+
}
827+
828+
@Test
829+
void invalidateLineageCacheForFqnDropsEntryKeyedByThatFqn() {
830+
SearchLineageRequest request =
831+
new SearchLineageRequest().withFqn("svc.db.target").withUpstreamDepth(3);
832+
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
833+
834+
builder.cacheResultForTest(request, result);
835+
assertTrue(builder.checkCacheForTest(request).isPresent(), "Precondition: entry cached");
836+
837+
builder.invalidateLineageCacheForFqn("svc.db.target");
838+
839+
assertFalse(
840+
builder.checkCacheForTest(request).isPresent(),
841+
"Entry whose root FQN matches must be evicted");
842+
}
843+
844+
@Test
845+
void invalidateLineageCacheForFqnDropsEntryWhereFqnAppearsInNodes() {
846+
SearchLineageRequest request =
847+
new SearchLineageRequest().withFqn("svc.db.root").withUpstreamDepth(3);
848+
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
849+
result.getNodes().put("svc.db.target", new NodeInformation().withNodeDepth(1));
850+
851+
builder.cacheResultForTest(request, result);
852+
853+
builder.invalidateLineageCacheForFqn("svc.db.target");
854+
855+
assertFalse(
856+
builder.checkCacheForTest(request).isPresent(),
857+
"Entry containing the FQN in its nodes must be evicted");
858+
}
859+
860+
@Test
861+
void invalidateLineageCacheForFqnLeavesUnrelatedEntriesAlone() {
862+
SearchLineageRequest request =
863+
new SearchLineageRequest().withFqn("svc.db.kept").withUpstreamDepth(3);
864+
SearchLineageResult result = new SearchLineageResult().withNodes(new HashMap<>());
865+
result.getNodes().put("svc.db.kept", new NodeInformation().withNodeDepth(0));
866+
867+
builder.cacheResultForTest(request, result);
868+
869+
builder.invalidateLineageCacheForFqn("svc.db.unrelated");
870+
871+
assertTrue(
872+
builder.checkCacheForTest(request).isPresent(),
873+
"Unrelated cache entry must survive FQN-targeted invalidation");
874+
}
875+
876+
@Test
877+
void invalidateLineageCacheForFqnIgnoresNullOrBlank() {
878+
SearchLineageRequest request =
879+
new SearchLineageRequest().withFqn("svc.db.kept").withUpstreamDepth(3);
880+
builder.cacheResultForTest(request, new SearchLineageResult().withNodes(new HashMap<>()));
881+
882+
builder.invalidateLineageCacheForFqn(null);
883+
builder.invalidateLineageCacheForFqn("");
884+
885+
assertTrue(builder.checkCacheForTest(request).isPresent());
818886
}
819887
}

openmetadata-service/src/test/java/org/openmetadata/service/search/lineage/GuavaLineageGraphCacheTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.Optional;
2121
import org.junit.jupiter.api.BeforeEach;
2222
import org.junit.jupiter.api.Test;
23+
import org.openmetadata.schema.api.lineage.EsLineageData;
24+
import org.openmetadata.schema.api.lineage.RelationshipRef;
2325
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
2426
import org.openmetadata.schema.api.lineage.SearchLineageResult;
2527

@@ -216,4 +218,91 @@ private SearchLineageResult createMockResult(int nodeCount) {
216218

217219
return result;
218220
}
221+
222+
@Test
223+
public void testInvalidateIfGraphContainsEvictsEntryKeyedByFqn() {
224+
LineageCacheKey targetKey =
225+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.target"));
226+
LineageCacheKey otherKey =
227+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.other"));
228+
229+
cache.put(targetKey, createMockResult(5));
230+
cache.put(otherKey, createMockResult(5));
231+
232+
cache.invalidateIfGraphContains("svc.db.target");
233+
234+
assertFalse(cache.get(targetKey).isPresent(), "Root-FQN-matching entry must be evicted");
235+
assertTrue(cache.get(otherKey).isPresent(), "Unrelated entry must survive");
236+
}
237+
238+
@Test
239+
public void testInvalidateIfGraphContainsEvictsWhenFqnAppearsAsNode() {
240+
LineageCacheKey rootKey =
241+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.root"));
242+
LineageCacheKey otherKey =
243+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.elsewhere"));
244+
245+
SearchLineageResult resultWithTarget = createMockResult(0);
246+
resultWithTarget.getNodes().put("svc.db.target", null);
247+
248+
cache.put(rootKey, resultWithTarget);
249+
cache.put(otherKey, createMockResult(5));
250+
251+
cache.invalidateIfGraphContains("svc.db.target");
252+
253+
assertFalse(
254+
cache.get(rootKey).isPresent(), "Entry whose nodes contain the FQN must be evicted");
255+
assertTrue(cache.get(otherKey).isPresent(), "Unrelated entry must survive");
256+
}
257+
258+
@Test
259+
public void testInvalidateIfGraphContainsEvictsWhenFqnAppearsAsEdgeEndpoint() {
260+
LineageCacheKey rootKey =
261+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.root"));
262+
263+
SearchLineageResult resultWithEdge = createMockResult(0);
264+
EsLineageData edge =
265+
new EsLineageData()
266+
.withDocId("svc.db.root->svc.db.downstream")
267+
.withFromEntity(new RelationshipRef().withFullyQualifiedName("svc.db.root"))
268+
.withToEntity(new RelationshipRef().withFullyQualifiedName("svc.db.downstream"));
269+
resultWithEdge.getDownstreamEdges().put("edge1", edge);
270+
271+
cache.put(rootKey, resultWithEdge);
272+
273+
cache.invalidateIfGraphContains("svc.db.downstream");
274+
275+
assertFalse(
276+
cache.get(rootKey).isPresent(),
277+
"Entry whose edge endpoint matches the FQN must be evicted");
278+
}
279+
280+
@Test
281+
public void testInvalidateIfGraphContainsIsNoOpForNullOrBlankFqn() {
282+
LineageCacheKey key =
283+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.kept"));
284+
cache.put(key, createMockResult(5));
285+
286+
cache.invalidateIfGraphContains(null);
287+
cache.invalidateIfGraphContains("");
288+
289+
assertTrue(cache.get(key).isPresent(), "Null/blank FQN must not evict anything");
290+
}
291+
292+
@Test
293+
public void testInvalidateIfGraphContainsLeavesUnrelatedEntries() {
294+
LineageCacheKey keyA =
295+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.a"));
296+
LineageCacheKey keyB =
297+
LineageCacheKey.fromRequest(new SearchLineageRequest().withFqn("svc.db.b"));
298+
299+
cache.put(keyA, createMockResult(5));
300+
cache.put(keyB, createMockResult(5));
301+
302+
cache.invalidateIfGraphContains("svc.db.unrelated");
303+
304+
assertTrue(cache.get(keyA).isPresent());
305+
assertTrue(cache.get(keyB).isPresent());
306+
assertEquals(2, cache.size());
307+
}
219308
}

0 commit comments

Comments
 (0)