Skip to content

Commit 055acb3

Browse files
RDF Knowledge Graph epic with requirements including structured RDF modeling, PROV-O lineage representation, idempotent indexing, inference engine support, and scalable indexing architecture. (#24911)
* RDF Knowledge Graph epic with requirements including structured RDF modeling, PROV-O lineage representation, idempotent indexing, inference engine support, and scalable indexing architecture.
* Update generated TypeScript types
* RDF Knowledge Graph epic with requirements including structured RDF modeling, PROV-O lineage representation, idempotent indexing, inference engine support, and scalable indexing architecture.
---------
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 9731654 commit 055acb3

22 files changed

Lines changed: 4882 additions & 329 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java

Lines changed: 351 additions & 159 deletions
Large diffs are not rendered by default.

openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java

Lines changed: 215 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ private RdfRepository(RdfConfiguration config) {
4444
new JsonLdTranslator(JsonUtils.getObjectMapper(), config.getBaseUri().toString());
4545
LOG.info("RDF Repository initialized with {} storage", config.getStorageType());
4646

47-
// Load ontologies on initialization
4847
loadOntologies();
4948
} else {
5049
this.storageService = null;
@@ -124,7 +123,6 @@ public void delete(EntityReference entityReference) {
124123
+ "/"
125124
+ entityReference.getId();
126125

127-
// Remove entity and all its relationships from the knowledge graph
128126
String sparqlUpdate =
129127
String.format(
130128
"DELETE WHERE { GRAPH <%s> { <%s> ?p ?o } }; "
@@ -144,10 +142,8 @@ public void addRelationship(EntityRelationship relationship) {
144142
}
145143

146144
try {
147-
// Create a model for the relationship with proper RDF predicates
148145
Model relationshipModel = createRelationshipModel(relationship);
149146

150-
// Store the relationship model
151147
String fromUri =
152148
config.getBaseUri().toString()
153149
+ "entity/"
@@ -194,7 +190,6 @@ public void addRelationship(EntityRelationship relationship) {
194190
private Model createRelationshipModel(EntityRelationship relationship) {
195191
Model model = ModelFactory.createDefaultModel();
196192

197-
// Set namespace prefixes
198193
model.setNsPrefix("om", "https://open-metadata.org/ontology/");
199194
model.setNsPrefix("prov", "http://www.w3.org/ns/prov#");
200195
model.setNsPrefix("dct", "http://purl.org/dc/terms/");
@@ -215,18 +210,15 @@ private Model createRelationshipModel(EntityRelationship relationship) {
215210
Resource fromResource = model.createResource(fromUri);
216211
Resource toResource = model.createResource(toUri);
217212

218-
// Map relationship type to RDF predicate based on entityRelationship context
219213
String relationshipType = relationship.getRelationshipType().value();
220214
Property predicate = getRelationshipPredicate(relationshipType, model);
221215

222-
// Add the relationship triple
223216
fromResource.addProperty(predicate, toResource);
224217

225218
return model;
226219
}
227220

228221
private Property getRelationshipPredicate(String relationshipType, Model model) {
229-
// Map OpenMetadata relationship types to RDF predicates from context
230222
return switch (relationshipType.toLowerCase()) {
231223
case "contains" -> model.createProperty("https://open-metadata.org/ontology/", "contains");
232224
case "uses" -> model.createProperty("http://www.w3.org/ns/prov#", "used");
@@ -268,6 +260,189 @@ public void bulkAddRelationships(List<EntityRelationship> relationships) {
268260
}
269261
}
270262

263+
/**
264+
* Add a lineage relationship with full details (SQL query, pipeline, column lineage). This stores
265+
* the lineage as structured RDF triples instead of a single JSON literal, enabling rich SPARQL
266+
* queries like: "Find all tables derived from table X via pipeline Y" or "What columns from
267+
* source table feed into column Z"
268+
*/
269+
public void addLineageWithDetails(
270+
String fromType,
271+
UUID fromId,
272+
String toType,
273+
UUID toId,
274+
org.openmetadata.schema.type.LineageDetails lineageDetails) {
275+
if (!isEnabled()) {
276+
return;
277+
}
278+
279+
try {
280+
Model model = ModelFactory.createDefaultModel();
281+
282+
model.setNsPrefix("om", "https://open-metadata.org/ontology/");
283+
model.setNsPrefix("prov", "http://www.w3.org/ns/prov#");
284+
model.setNsPrefix("dct", "http://purl.org/dc/terms/");
285+
286+
String fromUri = config.getBaseUri().toString() + "entity/" + fromType + "/" + fromId;
287+
String toUri = config.getBaseUri().toString() + "entity/" + toType + "/" + toId;
288+
289+
Resource fromResource = model.createResource(fromUri);
290+
Resource toResource = model.createResource(toUri);
291+
292+
// PROV-O: to wasDerivedFrom from (reverse direction for semantic correctness)
293+
Property derivedFrom = model.createProperty("http://www.w3.org/ns/prov#", "wasDerivedFrom");
294+
toResource.addProperty(derivedFrom, fromResource);
295+
296+
// OpenMetadata-specific upstream for compatibility
297+
Property upstream = model.createProperty("https://open-metadata.org/ontology/", "UPSTREAM");
298+
fromResource.addProperty(upstream, toResource);
299+
300+
if (lineageDetails != null) {
301+
String detailsUri =
302+
config.getBaseUri().toString()
303+
+ "lineageDetails/"
304+
+ fromId
305+
+ "/"
306+
+ toId
307+
+ "/"
308+
+ System.currentTimeMillis();
309+
Resource detailsResource = model.createResource(detailsUri);
310+
311+
Property hasLineageDetails =
312+
model.createProperty("https://open-metadata.org/ontology/", "hasLineageDetails");
313+
fromResource.addProperty(hasLineageDetails, detailsResource);
314+
315+
detailsResource.addProperty(
316+
model.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"),
317+
model.createResource("https://open-metadata.org/ontology/LineageDetails"));
318+
319+
if (lineageDetails.getSqlQuery() != null && !lineageDetails.getSqlQuery().isEmpty()) {
320+
detailsResource.addProperty(
321+
model.createProperty("https://open-metadata.org/ontology/", "sqlQuery"),
322+
lineageDetails.getSqlQuery());
323+
}
324+
325+
if (lineageDetails.getSource() != null) {
326+
detailsResource.addProperty(
327+
model.createProperty("https://open-metadata.org/ontology/", "lineageSource"),
328+
lineageDetails.getSource().value());
329+
}
330+
331+
if (lineageDetails.getDescription() != null && !lineageDetails.getDescription().isEmpty()) {
332+
detailsResource.addProperty(
333+
model.createProperty("http://purl.org/dc/terms/", "description"),
334+
lineageDetails.getDescription());
335+
}
336+
337+
if (lineageDetails.getPipeline() != null && lineageDetails.getPipeline().getId() != null) {
338+
EntityReference pipeline = lineageDetails.getPipeline();
339+
String pipelineType = pipeline.getType() != null ? pipeline.getType() : "pipeline";
340+
String pipelineUri =
341+
config.getBaseUri().toString() + "entity/" + pipelineType + "/" + pipeline.getId();
342+
Resource pipelineResource = model.createResource(pipelineUri);
343+
344+
detailsResource.addProperty(
345+
model.createProperty("http://www.w3.org/ns/prov#", "wasGeneratedBy"),
346+
pipelineResource);
347+
}
348+
349+
if (lineageDetails.getColumnsLineage() != null
350+
&& !lineageDetails.getColumnsLineage().isEmpty()) {
351+
Property hasColumnLineage =
352+
model.createProperty("https://open-metadata.org/ontology/", "hasColumnLineage");
353+
354+
for (org.openmetadata.schema.type.ColumnLineage colLineage :
355+
lineageDetails.getColumnsLineage()) {
356+
String colLineageUri = detailsUri + "/columnLineage/" + System.nanoTime();
357+
Resource colLineageResource = model.createResource(colLineageUri);
358+
359+
detailsResource.addProperty(hasColumnLineage, colLineageResource);
360+
colLineageResource.addProperty(
361+
model.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"),
362+
model.createResource("https://open-metadata.org/ontology/ColumnLineage"));
363+
364+
if (colLineage.getFromColumns() != null) {
365+
Property fromColumnProp =
366+
model.createProperty("https://open-metadata.org/ontology/", "fromColumn");
367+
for (String fromCol : colLineage.getFromColumns()) {
368+
colLineageResource.addProperty(fromColumnProp, fromCol);
369+
}
370+
}
371+
372+
if (colLineage.getToColumn() != null) {
373+
colLineageResource.addProperty(
374+
model.createProperty("https://open-metadata.org/ontology/", "toColumn"),
375+
colLineage.getToColumn());
376+
}
377+
378+
if (colLineage.getFunction() != null) {
379+
colLineageResource.addProperty(
380+
model.createProperty("https://open-metadata.org/ontology/", "transformFunction"),
381+
colLineage.getFunction());
382+
}
383+
}
384+
}
385+
386+
if (lineageDetails.getCreatedAt() != null) {
387+
detailsResource.addProperty(
388+
model.createProperty("http://purl.org/dc/terms/", "created"),
389+
model.createTypedLiteral(
390+
lineageDetails.getCreatedAt().toString(),
391+
org.apache.jena.datatypes.xsd.XSDDatatype.XSDlong));
392+
}
393+
if (lineageDetails.getUpdatedAt() != null) {
394+
detailsResource.addProperty(
395+
model.createProperty("http://purl.org/dc/terms/", "modified"),
396+
model.createTypedLiteral(
397+
lineageDetails.getUpdatedAt().toString(),
398+
org.apache.jena.datatypes.xsd.XSDDatatype.XSDlong));
399+
}
400+
401+
if (lineageDetails.getCreatedBy() != null) {
402+
detailsResource.addProperty(
403+
model.createProperty("https://open-metadata.org/ontology/", "lineageCreatedBy"),
404+
lineageDetails.getCreatedBy());
405+
}
406+
if (lineageDetails.getUpdatedBy() != null) {
407+
detailsResource.addProperty(
408+
model.createProperty("https://open-metadata.org/ontology/", "lineageUpdatedBy"),
409+
lineageDetails.getUpdatedBy());
410+
}
411+
}
412+
413+
// Idempotent delete/insert pattern ensures no duplicate triples
414+
java.io.StringWriter writer = new java.io.StringWriter();
415+
model.write(writer, "N-TRIPLES");
416+
String triples = writer.toString();
417+
418+
if (!triples.isEmpty()) {
419+
String deleteQuery =
420+
String.format(
421+
"DELETE WHERE { GRAPH <%s> { <%s> <https://open-metadata.org/ontology/UPSTREAM> <%s> . } }; "
422+
+ "DELETE WHERE { GRAPH <%s> { <%s> <http://www.w3.org/ns/prov#wasDerivedFrom> <%s> . } }",
423+
KNOWLEDGE_GRAPH, fromUri, toUri, KNOWLEDGE_GRAPH, toUri, fromUri);
424+
425+
storageService.executeSparqlUpdate(deleteQuery);
426+
427+
StringBuilder insertQuery = new StringBuilder();
428+
insertQuery.append("INSERT DATA { GRAPH <").append(KNOWLEDGE_GRAPH).append("> { ");
429+
insertQuery.append(triples);
430+
insertQuery.append(" } }");
431+
432+
storageService.executeSparqlUpdate(insertQuery.toString());
433+
LOG.debug("Added lineage with details from {}/{} to {}/{}", fromType, fromId, toType, toId);
434+
}
435+
} catch (Exception e) {
436+
LOG.error(
437+
"Failed to add lineage with details from {}/{} to {}/{}",
438+
fromType,
439+
fromId,
440+
toType,
441+
toId,
442+
e);
443+
}
444+
}
445+
271446
public void removeRelationship(EntityRelationship relationship) {
272447
if (!isEnabled()) {
273448
return;
@@ -355,9 +530,41 @@ public String executeSparqlQuery(String query, String format) {
355530
throw new IllegalStateException("RDF not enabled");
356531
}
357532

533+
// Check if inference is enabled by default in configuration
534+
if (isInferenceEnabledByDefault()) {
535+
String defaultLevel = getDefaultInferenceLevel();
536+
return executeSparqlQueryWithInference(query, format, defaultLevel);
537+
}
538+
358539
return storageService.executeSparqlQuery(query, format);
359540
}
360541

542+
/**
543+
* Execute SPARQL query without inference, regardless of configuration. Use this for internal
544+
* queries where inference overhead is not needed.
545+
*/
546+
public String executeSparqlQueryDirect(String query, String format) {
547+
if (!isEnabled()) {
548+
throw new IllegalStateException("RDF not enabled");
549+
}
550+
return storageService.executeSparqlQuery(query, format);
551+
}
552+
553+
public boolean isInferenceEnabledByDefault() {
554+
return config.getInferenceEnabled() != null && config.getInferenceEnabled();
555+
}
556+
557+
public String getDefaultInferenceLevel() {
558+
if (config.getDefaultInferenceLevel() != null) {
559+
return config.getDefaultInferenceLevel().value();
560+
}
561+
return "NONE";
562+
}
563+
564+
public RdfConfiguration getConfig() {
565+
return config;
566+
}
567+
361568
public List<Map<String, String>> executeSparqlQueryAsJson(String query) {
362569
String result = executeSparqlQuery(query, "json");
363570
return parseSparqlJsonResults(result);

0 commit comments

Comments
 (0)