From 92c10b50d3cd1b3d7de7b4c894f319d9873075a8 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 1 Apr 2026 13:34:20 +0530 Subject: [PATCH 1/9] Feat(Workflows): Auto-inject inclusive gateways for batch routing Check nodes (CheckEntityAttributes, CheckChangeDescription, DataCompleteness) now emit boolean flag variables (hasTrueEntities, hasFalseEntities, has__entities) instead of a single RESULT_VARIABLE. MainWorkflow auto-detects split/join points at BPMN build time via NodeInterface.getOutputPorts() and injects InclusiveGateways so both true and false branches fire simultaneously when a batch has mixed pass/fail entities. Co-Authored-By: Claude Sonnet 4.6 --- .../governance/workflows/Workflow.java | 2 + .../workflows/elements/NodeInterface.java | 11 + .../CheckChangeDescriptionTask.java | 6 + .../CheckEntityAttributesTask.java | 6 + .../automatedTask/DataCompletenessTask.java | 13 + .../impl/CheckChangeDescriptionTaskImpl.java | 7 +- .../impl/CheckEntityAttributesImpl.java | 7 +- .../impl/DataCompletenessImpl.java | 3 +- .../workflows/flowable/MainWorkflow.java | 194 +++++++++++++- .../builders/InclusiveGatewayBuilder.java | 38 +++ .../CheckChangeDescriptionTaskImplTest.java | 15 +- .../impl/CheckEntityAttributesImplTest.java | 15 +- .../MainWorkflowInclusiveGatewayTest.java | 251 ++++++++++++++++++ .../builders/InclusiveGatewayBuilderTest.java | 39 +++ 14 files changed, 579 insertions(+), 28 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java index 7508e6ef3373..09ad9274dd51 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java @@ -16,6 +16,8 @@ public class Workflow { public static final String ENTITY_LIST_VARIABLE = "entityList"; public static final String TRUE_ENTITY_LIST_VARIABLE = "true_entityList"; public static final String FALSE_ENTITY_LIST_VARIABLE = "false_entityList"; + public static final String HAS_TRUE_ENTITIES_VARIABLE = "hasTrueEntities"; + public static final String HAS_FALSE_ENTITIES_VARIABLE = "hasFalseEntities"; public static final String BATCH_SINK_PROCESSED_VARIABLE = "batchSinkProcessed"; public static final String TRIGGERING_OBJECT_ID_VARIABLE = "triggeringObjectId"; public static final String RECOGNIZER_FEEDBACK = "recognizerFeedback"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java index ec1fd36990f8..6be633495821 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.flowable.bpmn.model.Activity; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -22,6 +23,16 @@ public interface NodeInterface { void addToWorkflow(BpmnModel model, Process process); + /** + * Returns the output ports this node can produce. Nodes that split a batch into multiple + * entity lists (e.g. CheckEntityAttributes → "true"/"false") declare their ports here. + * The MainWorkflow uses this to automatically inject inclusive gateways at split/join points. + * The default (empty set) means no inclusive gateway is injected for this node. + */ + default Set getOutputPorts() { + return Set.of(); + } + default BoundaryEvent getRuntimeExceptionBoundaryEvent() { return null; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java index d6af66cfad0f..e42c2cde8a93 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -84,6 +85,11 @@ public CheckChangeDescriptionTask( this.subProcess = subProcess; } + @Override + public Set getOutputPorts() { + return Set.of("true", "false"); + } + @Override public BoundaryEvent getRuntimeExceptionBoundaryEvent() { return runtimeExceptionBoundaryEvent; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java index 6c8d29a551ac..0046884a0713 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -77,6 +78,11 @@ public CheckEntityAttributesTask( this.subProcess = subProcess; } + @Override + public Set getOutputPorts() { + return Set.of("true", "false"); + } + @Override public BoundaryEvent getRuntimeExceptionBoundaryEvent() { return runtimeExceptionBoundaryEvent; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java index bbb3ffb6c29d..6236e2f475a5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java @@ -7,6 +7,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -19,6 +21,7 @@ import org.flowable.bpmn.model.SubProcess; import org.openmetadata.schema.governance.workflows.WorkflowConfiguration; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.QualityBand; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.governance.workflows.elements.NodeInterface; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl; @@ -32,9 +35,14 @@ public class DataCompletenessTask implements NodeInterface { private final SubProcess subProcess; private final BoundaryEvent runtimeExceptionBoundaryEvent; + private final Set outputPorts; public DataCompletenessTask( DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) { + this.outputPorts = + nodeDefinition.getConfig().getQualityBands().stream() + .map(QualityBand::getName) + .collect(Collectors.toSet()); String subProcessId = nodeDefinition.getName(); SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build(); @@ -107,6 +115,11 @@ private ServiceTask getDataCompletenessServiceTask( return builder.build(); } + @Override + public Set getOutputPorts() { + return outputPorts; + } + @Override public void addToWorkflow(BpmnModel model, Process process) { process.addFlowElement(subProcess); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java index ed96ca5dda14..082d01256c1c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java @@ -2,7 +2,8 @@ import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -77,10 +78,10 @@ public void execute(DelegateExecution execution) { } } - boolean result = !trueEntityList.isEmpty(); varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList); varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList); - varHandler.setNodeVariable(RESULT_VARIABLE, result); + varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty()); + varHandler.setNodeVariable(HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty()); } catch (Exception exc) { LOG.error( "[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java index fcac337b762f..bcd4a089f047 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java @@ -2,7 +2,8 @@ import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -69,10 +70,10 @@ public void execute(DelegateExecution execution) { } } - boolean result = !trueEntityList.isEmpty(); varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList); varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList); - varHandler.setNodeVariable(RESULT_VARIABLE, result); + varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty()); + varHandler.setNodeVariable(HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty()); } catch (Exception exc) { LOG.error( "[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java index b5f22cc641a3..9980417191f7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -2,7 +2,6 @@ import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -125,6 +124,7 @@ public void execute(DelegateExecution execution) { for (QualityBand band : qualityBands) { List bandEntities = entitiesByBand.getOrDefault(band.getName(), List.of()); varHandler.setNodeVariable(band.getName() + "_" + ENTITY_LIST_VARIABLE, bandEntities); + varHandler.setNodeVariable("has_" + band.getName() + "_entities", !bandEntities.isEmpty()); } // Priority band = highest minimumScore band that has entities @@ -142,7 +142,6 @@ public void execute(DelegateExecution execution) { priorityBand, entitiesByBand.keySet()); - varHandler.setNodeVariable(RESULT_VARIABLE, priorityBand); varHandler.setNodeVariable("entityResults", entityResults); // Scalar outputs for backward compat when processing a single entity diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java index c1e757f1c448..789565a76334 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java @@ -1,6 +1,9 @@ package org.openmetadata.service.governance.workflows.flowable; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; import java.util.ArrayList; import java.util.HashMap; @@ -14,6 +17,7 @@ import lombok.Getter; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.InclusiveGateway; import org.flowable.bpmn.model.Process; import org.flowable.bpmn.model.SequenceFlow; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; @@ -24,6 +28,7 @@ import org.openmetadata.service.governance.workflows.elements.NodeFactory; import org.openmetadata.service.governance.workflows.elements.NodeInterface; import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; +import org.openmetadata.service.governance.workflows.flowable.builders.InclusiveGatewayBuilder; @Getter public class MainWorkflow { @@ -45,20 +50,70 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { .orElse(workflowDefinition.getFullyQualifiedName())); model.addProcess(process); - // Add Nodes + List edges = workflowDefinition.getEdges(); + + // Add Nodes and collect instances for gateway detection + Map nodeInstanceMap = new HashMap<>(); for (WorkflowNodeDefinitionInterface nodeDefinitionObj : workflowDefinition.getNodes()) { NodeInterface node = NodeFactory.createNode(nodeDefinitionObj, workflowDefinition.getConfig()); + nodeInstanceMap.put(nodeDefinitionObj.getName(), node); node.addToWorkflow(model, process); Optional.ofNullable(node.getRuntimeExceptionBoundaryEvent()) .ifPresent(runtimeExceptionBoundaryEvents::add); } - // Add Edges - for (EdgeDefinition edgeDefinition : workflowDefinition.getEdges()) { - Edge edge = new Edge(edgeDefinition); - edge.addToWorkflow(model, process); + // Detect where inclusive gateways need to be injected + Map> outgoingEdges = buildOutgoingEdgesMap(edges); + Map> incomingEdges = buildIncomingEdgesMap(edges); + Set splitNodes = detectSplitNodes(outgoingEdges, nodeInstanceMap); + Set joinNodes = detectJoinNodes(incomingEdges, splitNodes); + + // Add split gateways and their outgoing conditional flows + for (String splitNode : splitNodes) { + String gatewayId = splitGatewayId(splitNode); + InclusiveGateway gateway = new InclusiveGatewayBuilder().id(gatewayId).build(); + process.addFlowElement(gateway); + process.addFlowElement(new SequenceFlow(splitNode, gatewayId)); + + for (EdgeDefinition edge : outgoingEdges.get(splitNode)) { + String targetId = + joinNodes.contains(edge.getTo()) ? joinGatewayId(edge.getTo()) : edge.getTo(); + SequenceFlow flow = new SequenceFlow(gatewayId, targetId); + flow.setConditionExpression(buildGatewayCondition(splitNode, edge.getCondition())); + process.addFlowElement(flow); + } + } + + // Add join gateways and their single outgoing flow to the target node + for (String joinNode : joinNodes) { + String gatewayId = joinGatewayId(joinNode); + InclusiveGateway gateway = new InclusiveGatewayBuilder().id(gatewayId).build(); + process.addFlowElement(gateway); + process.addFlowElement(new SequenceFlow(gatewayId, joinNode)); + } + + // Add normal edges (not handled by split/join gateways) + for (EdgeDefinition edgeDefinition : edges) { + String from = edgeDefinition.getFrom(); + String to = edgeDefinition.getTo(); + boolean fromSplit = splitNodes.contains(from); + boolean toJoin = joinNodes.contains(to); + + if (fromSplit) { + // Already wired above via the split gateway + continue; + } + + if (toJoin) { + // Incoming to a join node — wire to the join gateway instead + SequenceFlow flow = new SequenceFlow(from, joinGatewayId(to)); + process.addFlowElement(flow); + } else { + Edge edge = new Edge(edgeDefinition); + edge.addToWorkflow(model, process); + } } // Configure Exception Flow @@ -68,6 +123,130 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { this.workflowName = workflowName; } + private Map> buildOutgoingEdgesMap(List edges) { + Map> map = new HashMap<>(); + for (EdgeDefinition edge : edges) { + map.computeIfAbsent(edge.getFrom(), k -> new ArrayList<>()).add(edge); + } + return map; + } + + private Map> buildIncomingEdgesMap(List edges) { + Map> map = new HashMap<>(); + for (EdgeDefinition edge : edges) { + map.computeIfAbsent(edge.getTo(), k -> new ArrayList<>()).add(edge); + } + return map; + } + + /** + * A node is a split point if it has ≥2 outgoing conditional edges AND explicitly declares + * multiple output ports via {@link NodeInterface#getOutputPorts()}. This ensures only batch + * check nodes (CheckEntityAttributes, CheckChangeDescription, DataCompleteness) get inclusive + * gateways — not user approval tasks or other nodes that use result-based conditional routing. + */ + private Set detectSplitNodes( + Map> outgoingEdges, Map nodeInstanceMap) { + Set splits = new HashSet<>(); + for (Map.Entry> entry : outgoingEdges.entrySet()) { + String nodeName = entry.getKey(); + NodeInterface node = nodeInstanceMap.get(nodeName); + if (node == null || node.getOutputPorts().size() < 2) { + continue; + } + long conditionalEdgeCount = + entry.getValue().stream() + .filter(e -> e.getCondition() != null && !e.getCondition().isEmpty()) + .count(); + if (conditionalEdgeCount >= 2) { + splits.add(nodeName); + } + } + return splits; + } + + /** + * A node is a join point if it has ≥2 incoming edges whose source nodes all trace back + * to the same split node (directly or transitively). An inclusive join gateway is needed + * to synchronize the parallel branches before continuing. + */ + private Set detectJoinNodes( + Map> incomingEdges, Set splitNodes) { + Set joins = new HashSet<>(); + for (Map.Entry> entry : incomingEdges.entrySet()) { + String targetNode = entry.getKey(); + List incoming = entry.getValue(); + if (incoming.size() < 2) { + continue; + } + // Find split ancestors reachable from each source node + List> splitAncestorSets = new ArrayList<>(); + for (EdgeDefinition edge : incoming) { + splitAncestorSets.add( + findSplitAncestors(edge.getFrom(), incomingEdges, splitNodes, new HashSet<>())); + } + // If any split node appears in ALL ancestor sets, this node needs a join gateway + Set common = new HashSet<>(splitAncestorSets.get(0)); + for (int i = 1; i < splitAncestorSets.size(); i++) { + common.retainAll(splitAncestorSets.get(i)); + } + if (!common.isEmpty()) { + joins.add(targetNode); + } + } + return joins; + } + + private Set findSplitAncestors( + String nodeName, + Map> incomingEdges, + Set splitNodes, + Set visited) { + Set ancestors = new HashSet<>(); + if (visited.contains(nodeName)) { + return ancestors; + } + visited.add(nodeName); + if (splitNodes.contains(nodeName)) { + ancestors.add(nodeName); + } + List incoming = incomingEdges.get(nodeName); + if (incoming != null) { + for (EdgeDefinition edge : incoming) { + ancestors.addAll(findSplitAncestors(edge.getFrom(), incomingEdges, splitNodes, visited)); + } + } + return ancestors; + } + + /** + * Generates the inclusive gateway condition expression for an outgoing edge from a split node. + * Maps the edge condition value to the appropriate boolean flag variable set by the node impl. + */ + private String buildGatewayCondition(String nodeName, String edgeCondition) { + if (edgeCondition == null || edgeCondition.isEmpty()) { + return null; + } + String flagVariable; + if ("true".equals(edgeCondition)) { + flagVariable = getNamespacedVariableName(nodeName, HAS_TRUE_ENTITIES_VARIABLE); + } else if ("false".equals(edgeCondition)) { + flagVariable = getNamespacedVariableName(nodeName, HAS_FALSE_ENTITIES_VARIABLE); + } else { + // DataCompleteness band name (e.g., "gold", "silver") + flagVariable = getNamespacedVariableName(nodeName, "has_" + edgeCondition + "_entities"); + } + return String.format("${%s}", flagVariable); + } + + private static String splitGatewayId(String nodeName) { + return nodeName + "_inclusiveSplit"; + } + + private static String joinGatewayId(String nodeName) { + return nodeName + "_inclusiveJoin"; + } + private void configureRuntimeExceptionFlow(Process process) { EndEvent errorEndEvent = new EndEvent("Error"); process.addFlowElement(errorEndEvent.getEndEvent()); @@ -130,7 +309,6 @@ private void validateNode(WorkflowNodeDefinitionInterface nodeDefinition) { "Invalid Workflow: [%s] is expecting '%s' to be an output from [%s], which it is not.", nodeDefinition.getName(), variable, namespace)); } - // Enhanced validation: Check for reachability instead of direct connection if (!validateNodeIsReachable(nodeDefinition.getName(), namespace)) { throw new RuntimeException( String.format( @@ -176,12 +354,9 @@ private boolean validateNodeHasInput(String nodeName, String inputNodeName) { * For example: A -> B -> C, where C can use outputs from A even without a direct edge. */ private boolean validateNodeIsReachable(String targetNode, String sourceNode) { - // First check for direct connection (fast path) if (validateNodeHasInput(targetNode, sourceNode)) { return true; } - - // If no direct connection, check for transitive path through the graph return canReachThroughGraph(sourceNode, targetNode); } @@ -206,7 +381,6 @@ private boolean canReachThroughGraph(String sourceNode, String targetNode) { } visited.add(current); - // Find all nodes that current node connects to for (Map.Entry> entry : incomingEdgesMap.entrySet()) { if (entry.getValue().contains(current)) { String nextNode = entry.getKey(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java new file mode 100644 index 000000000000..61366a6e3aa3 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java @@ -0,0 +1,38 @@ +package org.openmetadata.service.governance.workflows.flowable.builders; + +import org.flowable.bpmn.model.InclusiveGateway; + +public class InclusiveGatewayBuilder extends FlowableElementBuilder { + + private boolean async = true; + private boolean exclusive = true; + private String defaultFlow; + + public InclusiveGatewayBuilder setAsync(boolean async) { + this.async = async; + return this; + } + + public InclusiveGatewayBuilder exclusive(boolean exclusive) { + this.exclusive = exclusive; + return this; + } + + public InclusiveGatewayBuilder defaultFlow(String defaultFlowId) { + this.defaultFlow = defaultFlowId; + return this; + } + + @Override + public InclusiveGateway build() { + InclusiveGateway gateway = new InclusiveGateway(); + gateway.setId(id); + gateway.setName(id); + gateway.setAsynchronous(async); + gateway.setExclusive(exclusive); + if (defaultFlow != null) { + gateway.setDefaultFlow(defaultFlow); + } + return gateway; + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java index f5029b334034..034fa53f84bd 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java @@ -6,7 +6,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import java.lang.reflect.Field; @@ -76,7 +77,8 @@ void testExecute_NoChangeDescription_ReturnsTrueForCreate() { } verify(execution).setVariable(eq("process_" + TRUE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -112,7 +114,8 @@ void testExecute_WithMatchingChangeDescription() { } verify(execution).setVariable(eq("process_" + TRUE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -148,7 +151,8 @@ void testExecute_WithNonMatchingChangeDescription() { } verify(execution).setVariable(eq("process_" + FALSE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } @Test @@ -183,7 +187,8 @@ void testExecute_AndConditionAllMatch() { impl.execute(execution); } - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java index d58a68c4a39a..6b6dc2076c4d 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java @@ -7,7 +7,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import java.lang.reflect.Field; @@ -72,7 +73,8 @@ void testExecute_EntityMatchesRule() { } verify(execution).setVariable(eq("process_" + TRUE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -97,7 +99,8 @@ void testExecute_EntityDoesNotMatchRule() { } verify(execution).setVariable(eq("process_" + FALSE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } @Test @@ -110,7 +113,8 @@ void testExecute_EmptyEntityList() { impl.execute(execution); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -149,7 +153,8 @@ void testExecute_MultipleEntities_PartialMatch() { impl.execute(execution); } - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } private void injectExpression(Object target, String fieldName, Expression value) diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java new file mode 100644 index 000000000000..0d4386e669f1 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java @@ -0,0 +1,251 @@ +package org.openmetadata.service.governance.workflows.flowable; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.List; +import org.flowable.bpmn.model.FlowElement; +import org.flowable.bpmn.model.InclusiveGateway; +import org.flowable.bpmn.model.Process; +import org.flowable.bpmn.model.SequenceFlow; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.governance.workflows.WorkflowConfiguration; +import org.openmetadata.schema.governance.workflows.WorkflowDefinition; +import org.openmetadata.schema.governance.workflows.elements.EdgeDefinition; +import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.Config__2; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.Config__4; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.QualityBand; +import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.StartEventDefinition; +import org.openmetadata.schema.governance.workflows.elements.triggers.NoOpTriggerDefinition; + +class MainWorkflowInclusiveGatewayTest { + + @Test + void testSplitGatewayInserted_WhenCheckNodeHasTwoConditionalOutgoingEdges() { + // Workflow: start → check → cert(true) / notify(false) → independent ends + WorkflowDefinition workflow = + createWorkflow( + "SplitOnlyWorkflow", + List.of( + startEvent("start"), + checkAttributesNode("check"), + endEvent("certEnd"), + endEvent("notifyEnd")), + List.of( + edge("start", "check", null), + edge("check", "cert", "true"), + edge("check", "notify", "false"), + edge("cert", "certEnd", null), + edge("notify", "notifyEnd", null))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + // Split gateway should be injected after "check" + FlowElement splitGateway = process.getFlowElement("check_inclusiveSplit"); + assertNotNull(splitGateway, "Split gateway should exist after check node"); + assertInstanceOf(InclusiveGateway.class, splitGateway); + + // No join gateway needed — branches lead to separate end events + assertNull(process.getFlowElement("certEnd_inclusiveJoin"), "No join when branches diverge"); + assertNull(process.getFlowElement("notifyEnd_inclusiveJoin"), "No join when branches diverge"); + + // Verify two conditional flows from split gateway + List conditions = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "check_inclusiveSplit".equals(f.getSourceRef())) + .map(SequenceFlow::getConditionExpression) + .toList(); + + assertEquals(2, conditions.size()); + assertTrue(conditions.contains("${check_hasTrueEntities}"), "True branch uses hasTrueEntities"); + assertTrue( + conditions.contains("${check_hasFalseEntities}"), "False branch uses hasFalseEntities"); + } + + @Test + void testJoinGatewayInserted_WhenBranchesConverge() { + // Workflow: check → cert(true) / notify(false) → converge on end + WorkflowDefinition workflow = + createWorkflow( + "SplitJoinWorkflow", + List.of(startEvent("start"), checkAttributesNode("check"), endEvent("end")), + List.of( + edge("start", "check", null), + edge("check", "cert", "true"), + edge("check", "notify", "false"), + edge("cert", "end", null), + edge("notify", "end", null))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + assertNotNull( + process.getFlowElement("check_inclusiveSplit"), "Split gateway should be present"); + + FlowElement joinGateway = process.getFlowElement("end_inclusiveJoin"); + assertNotNull(joinGateway, "Join gateway should exist before end"); + assertInstanceOf(InclusiveGateway.class, joinGateway); + + long flowsToJoin = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "end_inclusiveJoin".equals(f.getTargetRef())) + .count(); + assertEquals(2, flowsToJoin, "Both cert and notify should flow into the join gateway"); + + long flowsFromJoin = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "end_inclusiveJoin".equals(f.getSourceRef())) + .count(); + assertEquals(1, flowsFromJoin, "Join gateway has exactly one outgoing flow"); + } + + @Test + void testNoGatewaysInserted_WhenNoConditionalEdges() { + WorkflowDefinition workflow = + createWorkflow( + "LinearWorkflow", + List.of(startEvent("start"), endEvent("end")), + List.of(edge("start", "task", null), edge("task", "end", null))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + long gatewayCount = + process.getFlowElements().stream().filter(e -> e instanceof InclusiveGateway).count(); + assertEquals(0, gatewayCount, "No gateways for linear workflows"); + } + + @Test + void testNoGatewayInserted_ForNonCheckNodeWithConditionalEdges() { + // UserApproval (not a check node) has "true"/"false" edges — should NOT get inclusive gateway + WorkflowDefinition workflow = + createWorkflow( + "UserApprovalWorkflow", + List.of(startEvent("start"), endEvent("endTrue"), endEvent("endFalse")), + List.of( + edge("start", "UserApproval", null), + edge("UserApproval", "nextTrue", "true"), + edge("UserApproval", "nextFalse", "false"))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + // UserApproval is not in the nodes list → nodeInstanceMap has no entry → no gateway + assertNull( + process.getFlowElement("UserApproval_inclusiveSplit"), + "Non-check node should not get an inclusive gateway"); + } + + @Test + void testDataCompleteness_BandConditionsGenerateCorrectFlags() { + WorkflowDefinition workflow = + createWorkflow( + "DataQualityWorkflow", + List.of( + startEvent("start"), + dataCompletenessNode("dataQuality", "gold", "silver"), + endEvent("goldEnd"), + endEvent("silverEnd")), + List.of( + edge("start", "dataQuality", null), + edge("dataQuality", "goldAction", "gold"), + edge("dataQuality", "silverAction", "silver"), + edge("goldAction", "goldEnd", null), + edge("silverAction", "silverEnd", null))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + assertNotNull( + process.getFlowElement("dataQuality_inclusiveSplit"), + "Split gateway for data quality node"); + + List conditions = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "dataQuality_inclusiveSplit".equals(f.getSourceRef())) + .map(SequenceFlow::getConditionExpression) + .toList(); + + assertTrue(conditions.contains("${dataQuality_has_gold_entities}"), "Gold band condition"); + assertTrue(conditions.contains("${dataQuality_has_silver_entities}"), "Silver band condition"); + } + + // --- Helpers --- + + private WorkflowDefinition createWorkflow( + String name, List nodes, List edges) { + WorkflowDefinition workflow = new WorkflowDefinition(); + workflow.setName(name); + workflow.setFullyQualifiedName(name); + workflow.setConfig(new WorkflowConfiguration()); + workflow.setTrigger(new NoOpTriggerDefinition()); + workflow.setNodes(nodes); + workflow.setEdges(edges); + return workflow; + } + + private StartEventDefinition startEvent(String name) { + StartEventDefinition def = new StartEventDefinition(); + def.setName(name); + return def; + } + + private EndEventDefinition endEvent(String name) { + EndEventDefinition def = new EndEventDefinition(); + def.setName(name); + return def; + } + + private CheckEntityAttributesTaskDefinition checkAttributesNode(String name) { + Config__2 config = new Config__2(); + config.setRules("[]"); + CheckEntityAttributesTaskDefinition def = new CheckEntityAttributesTaskDefinition(); + def.setName(name); + def.setConfig(config); + return def; + } + + private DataCompletenessTaskDefinition dataCompletenessNode(String name, String... bandNames) { + List bands = + Arrays.stream(bandNames) + .map( + n -> { + QualityBand b = new QualityBand(); + b.setName(n); + b.setMinimumScore(0.0); + return b; + }) + .toList(); + Config__4 config = new Config__4(); + config.setQualityBands(bands); + DataCompletenessTaskDefinition def = new DataCompletenessTaskDefinition(); + def.setName(name); + def.setConfig(config); + return def; + } + + private EdgeDefinition edge(String from, String to, String condition) { + EdgeDefinition def = new EdgeDefinition(); + def.setFrom(from); + def.setTo(to); + def.setCondition(condition); + return def; + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java new file mode 100644 index 000000000000..20dd6e8fa4f6 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java @@ -0,0 +1,39 @@ +package org.openmetadata.service.governance.workflows.flowable.builders; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.flowable.bpmn.model.InclusiveGateway; +import org.junit.jupiter.api.Test; + +class InclusiveGatewayBuilderTest { + + @Test + void testBuildDefaultsToAsyncExclusive() { + InclusiveGateway gateway = new InclusiveGatewayBuilder().id("splitGateway").build(); + + assertEquals("splitGateway", gateway.getId()); + assertEquals("splitGateway", gateway.getName()); + assertTrue(gateway.isAsynchronous()); + assertTrue(gateway.isExclusive()); + assertNull(gateway.getDefaultFlow()); + } + + @Test + void testBuildWithDefaultFlow() { + InclusiveGateway gateway = + new InclusiveGatewayBuilder().id("joinGateway").defaultFlow("flow1").build(); + + assertEquals("joinGateway", gateway.getId()); + assertEquals("flow1", gateway.getDefaultFlow()); + } + + @Test + void testBuildWithAsyncFalse() { + InclusiveGateway gateway = new InclusiveGatewayBuilder().id("g1").setAsync(false).build(); + + assertEquals("g1", gateway.getId()); + assertTrue(!gateway.isAsynchronous()); + } +} From 43d9935d58d676b2659e4684b7b323b1d212ee9f Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 1 Apr 2026 13:52:09 +0530 Subject: [PATCH 2/9] Fix inclusive gateway deadlock on empty entity list; fix test node references - Check nodes (CheckEntityAttributes, CheckChangeDescription) now set hasFalseEntities=true when entity list is empty, ensuring the split inclusive gateway always has at least one active branch and never stalls. - DataCompletenessImpl activates the lowest-score band flag when no entities are assigned to any band (empty input or all entities failed processing). - MainWorkflowInclusiveGatewayTest: replaced dangling edge targets (cert, notify, goldAction etc.) with actual declared end-event nodes so the constructed BPMN model is structurally valid. Co-Authored-By: Claude Sonnet 4.6 --- .../impl/CheckChangeDescriptionTaskImpl.java | 3 +- .../impl/CheckEntityAttributesImpl.java | 3 +- .../impl/DataCompletenessImpl.java | 10 +++++++ .../impl/CheckEntityAttributesImplTest.java | 2 +- .../MainWorkflowInclusiveGatewayTest.java | 28 ++++++++----------- 5 files changed, 26 insertions(+), 20 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java index 082d01256c1c..314b71ae3dc3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java @@ -81,7 +81,8 @@ public void execute(DelegateExecution execution) { varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList); varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList); varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty()); - varHandler.setNodeVariable(HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty()); + varHandler.setNodeVariable( + HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty() || entityList.isEmpty()); } catch (Exception exc) { LOG.error( "[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java index bcd4a089f047..68b856cbbae5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java @@ -73,7 +73,8 @@ public void execute(DelegateExecution execution) { varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList); varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList); varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty()); - varHandler.setNodeVariable(HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty()); + varHandler.setNodeVariable( + HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty() || entityList.isEmpty()); } catch (Exception exc) { LOG.error( "[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java index 9980417191f7..956e43f9f09e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -127,6 +127,16 @@ public void execute(DelegateExecution execution) { varHandler.setNodeVariable("has_" + band.getName() + "_entities", !bandEntities.isEmpty()); } + // If no entities were assigned to any band (empty input or all failed), activate the + // lowest-score band so the inclusive split gateway always has at least one active branch. + boolean anyBandActive = + qualityBands.stream().anyMatch(b -> entitiesByBand.containsKey(b.getName())); + if (!anyBandActive) { + qualityBands.stream() + .min(Comparator.comparingDouble(QualityBand::getMinimumScore)) + .ifPresent(b -> varHandler.setNodeVariable("has_" + b.getName() + "_entities", true)); + } + // Priority band = highest minimumScore band that has entities String priorityBand = qualityBands.stream() diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java index 6b6dc2076c4d..df534aae4711 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java @@ -114,7 +114,7 @@ void testExecute_EmptyEntityList() { impl.execute(execution); verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); - verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } @Test diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java index 0d4386e669f1..3465e5666954 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java @@ -30,7 +30,7 @@ class MainWorkflowInclusiveGatewayTest { @Test void testSplitGatewayInserted_WhenCheckNodeHasTwoConditionalOutgoingEdges() { - // Workflow: start → check → cert(true) / notify(false) → independent ends + // Workflow: start → check → certEnd(true) / notifyEnd(false) — independent ends WorkflowDefinition workflow = createWorkflow( "SplitOnlyWorkflow", @@ -41,10 +41,8 @@ void testSplitGatewayInserted_WhenCheckNodeHasTwoConditionalOutgoingEdges() { endEvent("notifyEnd")), List.of( edge("start", "check", null), - edge("check", "cert", "true"), - edge("check", "notify", "false"), - edge("cert", "certEnd", null), - edge("notify", "notifyEnd", null))); + edge("check", "certEnd", "true"), + edge("check", "notifyEnd", "false"))); MainWorkflow mainWorkflow = new MainWorkflow(workflow); Process process = mainWorkflow.getModel().getProcesses().get(0); @@ -75,17 +73,15 @@ void testSplitGatewayInserted_WhenCheckNodeHasTwoConditionalOutgoingEdges() { @Test void testJoinGatewayInserted_WhenBranchesConverge() { - // Workflow: check → cert(true) / notify(false) → converge on end + // Workflow: check → end(true) / end(false) — both branches converge on same end node WorkflowDefinition workflow = createWorkflow( "SplitJoinWorkflow", List.of(startEvent("start"), checkAttributesNode("check"), endEvent("end")), List.of( edge("start", "check", null), - edge("check", "cert", "true"), - edge("check", "notify", "false"), - edge("cert", "end", null), - edge("notify", "end", null))); + edge("check", "end", "true"), + edge("check", "end", "false"))); MainWorkflow mainWorkflow = new MainWorkflow(workflow); Process process = mainWorkflow.getModel().getProcesses().get(0); @@ -103,7 +99,7 @@ void testJoinGatewayInserted_WhenBranchesConverge() { .map(e -> (SequenceFlow) e) .filter(f -> "end_inclusiveJoin".equals(f.getTargetRef())) .count(); - assertEquals(2, flowsToJoin, "Both cert and notify should flow into the join gateway"); + assertEquals(2, flowsToJoin, "Both conditional branches should flow into the join gateway"); long flowsFromJoin = process.getFlowElements().stream() @@ -139,8 +135,8 @@ void testNoGatewayInserted_ForNonCheckNodeWithConditionalEdges() { List.of(startEvent("start"), endEvent("endTrue"), endEvent("endFalse")), List.of( edge("start", "UserApproval", null), - edge("UserApproval", "nextTrue", "true"), - edge("UserApproval", "nextFalse", "false"))); + edge("UserApproval", "endTrue", "true"), + edge("UserApproval", "endFalse", "false"))); MainWorkflow mainWorkflow = new MainWorkflow(workflow); Process process = mainWorkflow.getModel().getProcesses().get(0); @@ -163,10 +159,8 @@ void testDataCompleteness_BandConditionsGenerateCorrectFlags() { endEvent("silverEnd")), List.of( edge("start", "dataQuality", null), - edge("dataQuality", "goldAction", "gold"), - edge("dataQuality", "silverAction", "silver"), - edge("goldAction", "goldEnd", null), - edge("silverAction", "silverEnd", null))); + edge("dataQuality", "goldEnd", "gold"), + edge("dataQuality", "silverEnd", "silver"))); MainWorkflow mainWorkflow = new MainWorkflow(workflow); Process process = mainWorkflow.getModel().getProcesses().get(0); From fbb6bbb05c82902d4f903d8d1e07dd56160398f5 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 1 Apr 2026 18:01:57 +0530 Subject: [PATCH 3/9] Address self-review findings on inclusive gateway implementation - DataCompletenessTask: guard against null getQualityBands() with Optional - DataCompletenessImpl: extract bandFlagVariable() static helper used by both the impl and MainWorkflow to avoid naming convention drift - InclusiveGatewayBuilder: remove misleading exclusive() setter; async job exclusivity is hardcoded true (not a routing concern, matches ExclusiveGatewayBuilder) - MainWorkflow: add @Slf4j + warn log when a conditional edge targets a join node (condition is intentionally ignored); use DataCompletenessImpl.bandFlagVariable() - DataCompletenessImplTest: add tests for empty entity list and all-entities-fail scenarios, both verifying lowest-band flag is activated as deadlock fallback Co-Authored-By: Claude Sonnet 4.6 --- .../automatedTask/DataCompletenessTask.java | 3 +- .../impl/DataCompletenessImpl.java | 8 +- .../workflows/flowable/MainWorkflow.java | 14 ++- .../builders/InclusiveGatewayBuilder.java | 8 +- .../impl/DataCompletenessImplTest.java | 95 +++++++++++++++++++ 5 files changed, 117 insertions(+), 11 deletions(-) create mode 100644 openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java index 6236e2f475a5..3cc490ba2bc7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -40,7 +41,7 @@ public class DataCompletenessTask implements NodeInterface { public DataCompletenessTask( DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) { this.outputPorts = - nodeDefinition.getConfig().getQualityBands().stream() + Optional.ofNullable(nodeDefinition.getConfig().getQualityBands()).orElse(List.of()).stream() .map(QualityBand::getName) .collect(Collectors.toSet()); String subProcessId = nodeDefinition.getName(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java index 956e43f9f09e..a4d3fe959427 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -124,7 +124,7 @@ public void execute(DelegateExecution execution) { for (QualityBand band : qualityBands) { List bandEntities = entitiesByBand.getOrDefault(band.getName(), List.of()); varHandler.setNodeVariable(band.getName() + "_" + ENTITY_LIST_VARIABLE, bandEntities); - varHandler.setNodeVariable("has_" + band.getName() + "_entities", !bandEntities.isEmpty()); + varHandler.setNodeVariable(bandFlagVariable(band.getName()), !bandEntities.isEmpty()); } // If no entities were assigned to any band (empty input or all failed), activate the @@ -134,7 +134,7 @@ public void execute(DelegateExecution execution) { if (!anyBandActive) { qualityBands.stream() .min(Comparator.comparingDouble(QualityBand::getMinimumScore)) - .ifPresent(b -> varHandler.setNodeVariable("has_" + b.getName() + "_entities", true)); + .ifPresent(b -> varHandler.setNodeVariable(bandFlagVariable(b.getName()), true)); } // Priority band = highest minimumScore band that has entities @@ -174,6 +174,10 @@ public void execute(DelegateExecution execution) { } } + public static String bandFlagVariable(String bandName) { + return "has_" + bandName + "_entities"; + } + private void storeFieldList( WorkflowVariableHandler varHandler, String varName, List fields) { if (fields.size() <= 50) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java index 789565a76334..cd1133611264 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java @@ -15,6 +15,7 @@ import java.util.Queue; import java.util.Set; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.InclusiveGateway; @@ -27,10 +28,12 @@ import org.openmetadata.service.governance.workflows.elements.Edge; import org.openmetadata.service.governance.workflows.elements.NodeFactory; import org.openmetadata.service.governance.workflows.elements.NodeInterface; +import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl; import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; import org.openmetadata.service.governance.workflows.flowable.builders.InclusiveGatewayBuilder; @Getter +@Slf4j public class MainWorkflow { private final BpmnModel model; private final String workflowName; @@ -108,6 +111,14 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { if (toJoin) { // Incoming to a join node — wire to the join gateway instead + if (edgeDefinition.getCondition() != null && !edgeDefinition.getCondition().isEmpty()) { + LOG.warn( + "[WorkflowEdge] Edge from='{}' to='{}' has condition='{}' but '{}' is a join node; condition is ignored", + from, + to, + edgeDefinition.getCondition(), + to); + } SequenceFlow flow = new SequenceFlow(from, joinGatewayId(to)); process.addFlowElement(flow); } else { @@ -234,7 +245,8 @@ private String buildGatewayCondition(String nodeName, String edgeCondition) { flagVariable = getNamespacedVariableName(nodeName, HAS_FALSE_ENTITIES_VARIABLE); } else { // DataCompleteness band name (e.g., "gold", "silver") - flagVariable = getNamespacedVariableName(nodeName, "has_" + edgeCondition + "_entities"); + flagVariable = + getNamespacedVariableName(nodeName, DataCompletenessImpl.bandFlagVariable(edgeCondition)); } return String.format("${%s}", flagVariable); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java index 61366a6e3aa3..c4ce9265ffad 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java @@ -5,7 +5,6 @@ public class InclusiveGatewayBuilder extends FlowableElementBuilder { private boolean async = true; - private boolean exclusive = true; private String defaultFlow; public InclusiveGatewayBuilder setAsync(boolean async) { @@ -13,11 +12,6 @@ public InclusiveGatewayBuilder setAsync(boolean async) { return this; } - public InclusiveGatewayBuilder exclusive(boolean exclusive) { - this.exclusive = exclusive; - return this; - } - public InclusiveGatewayBuilder defaultFlow(String defaultFlowId) { this.defaultFlow = defaultFlowId; return this; @@ -29,7 +23,7 @@ public InclusiveGateway build() { gateway.setId(id); gateway.setName(id); gateway.setAsynchronous(async); - gateway.setExclusive(exclusive); + gateway.setExclusive(true); if (defaultFlow != null) { gateway.setDefaultFlow(defaultFlow); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java new file mode 100644 index 000000000000..72ab6e2304f3 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java @@ -0,0 +1,95 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; + +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.DelegateExecution; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class DataCompletenessImplTest { + + @Mock private DelegateExecution execution; + @Mock private Expression fieldsToCheckExpr; + @Mock private Expression qualityBandsExpr; + @Mock private Expression inputNamespaceMapExpr; + + private DataCompletenessImpl impl; + + private static final String QUALITY_BANDS_JSON = + "[{\"name\":\"gold\",\"minimumScore\":0.8},{\"name\":\"silver\",\"minimumScore\":0.5}]"; + + @BeforeEach + void setUp() throws Exception { + impl = new DataCompletenessImpl(); + injectExpression(impl, "fieldsToCheckExpr", fieldsToCheckExpr); + injectExpression(impl, "qualityBandsExpr", qualityBandsExpr); + injectExpression(impl, "inputNamespaceMapExpr", inputNamespaceMapExpr); + + when(execution.getProcessDefinitionId()).thenReturn("process:1:test"); + when(execution.getCurrentActivityId()).thenReturn("process.dataCompletenessTask"); + when(execution.getParent()).thenReturn(null); + + when(inputNamespaceMapExpr.getValue(execution)).thenReturn("{\"entityList\":\"global\"}"); + when(fieldsToCheckExpr.getValue(execution)).thenReturn("[\"description\"]"); + when(qualityBandsExpr.getValue(execution)).thenReturn(QUALITY_BANDS_JSON); + } + + @Test + void testExecute_EmptyEntityList_ActivatesLowestBand() { + when(execution.getVariable("global_entityList")).thenReturn(List.of()); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock + .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.ALL))) + .thenReturn(Map.of()); + + impl.execute(execution); + } + + // Silver has the lowest minimumScore (0.5) — it should be activated as the fallback + verify(execution).setVariable(eq("process_has_silver_entities"), eq(true)); + } + + @Test + void testExecute_AllEntitiesFailProcessing_ActivatesLowestBand() { + List entityList = List.of("<#E::table::test.db.table>"); + when(execution.getVariable("global_entityList")).thenReturn(entityList); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + // Entity not found in map → goes to failedEntities, no band assignment + entityMock + .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.ALL))) + .thenReturn(Map.of()); + + impl.execute(execution); + } + + // Silver has the lowest minimumScore (0.5) — it should be activated as the fallback + verify(execution).setVariable(eq("process_has_silver_entities"), eq(true)); + } + + private void injectExpression(Object target, String fieldName, Expression value) + throws Exception { + Field field = DataCompletenessImpl.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} From 17311bb46d4c96260cd4a1ea1bad333c3d8b8dc2 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Sun, 5 Apr 2026 16:53:54 +0530 Subject: [PATCH 4/9] Fix: set InclusiveGateway async=false to prevent CI timeout Gateways with async=true create async jobs that wait on the 60s asyncJobAcquisitionInterval poller. With 4 gateways in the GlossaryApprovalWorkflow critical path, worst case is 240s which exceeds the 180s test timeout. Async is already provided by the trigger layer: PeriodicBatchEntityTrigger sets asyncLeave=true on the start event, and EventBasedEntityTrigger workflows ran fully synchronously on the base branch without issue. Gateway routing (boolean condition evaluation) is trivial and safe to run synchronously within the existing execution context. Co-Authored-By: Claude Sonnet 4.6 --- .../flowable/builders/InclusiveGatewayBuilder.java | 2 +- .../builders/InclusiveGatewayBuilderTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java index c4ce9265ffad..a56f218beda3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java @@ -4,7 +4,7 @@ public class InclusiveGatewayBuilder extends FlowableElementBuilder { - private boolean async = true; + private boolean async = false; private String defaultFlow; public InclusiveGatewayBuilder setAsync(boolean async) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java index 20dd6e8fa4f6..23d9f5e7fa0b 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java @@ -1,6 +1,7 @@ package org.openmetadata.service.governance.workflows.flowable.builders; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -10,12 +11,12 @@ class InclusiveGatewayBuilderTest { @Test - void testBuildDefaultsToAsyncExclusive() { + void testBuildDefaultsToSyncExclusive() { InclusiveGateway gateway = new InclusiveGatewayBuilder().id("splitGateway").build(); assertEquals("splitGateway", gateway.getId()); assertEquals("splitGateway", gateway.getName()); - assertTrue(gateway.isAsynchronous()); + assertFalse(gateway.isAsynchronous()); assertTrue(gateway.isExclusive()); assertNull(gateway.getDefaultFlow()); } @@ -30,10 +31,10 @@ void testBuildWithDefaultFlow() { } @Test - void testBuildWithAsyncFalse() { - InclusiveGateway gateway = new InclusiveGatewayBuilder().id("g1").setAsync(false).build(); + void testBuildWithAsyncTrue() { + InclusiveGateway gateway = new InclusiveGatewayBuilder().id("g1").setAsync(true).build(); assertEquals("g1", gateway.getId()); - assertTrue(!gateway.isAsynchronous()); + assertTrue(gateway.isAsynchronous()); } } From b9d4f8f5748b6fe19a0d6bbae49151f1af5c5905 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Mon, 6 Apr 2026 10:28:50 +0530 Subject: [PATCH 5/9] Fix null guard on getConfig() in DataCompletenessTask Guard the entire getConfig().getQualityBands() chain with Optional.ofNullable to prevent NPE if getConfig() returns null, consistent with the defensive null-checking pattern used in CheckChangeDescriptionTask. Co-Authored-By: Claude Sonnet 4.6 --- .../elements/nodes/automatedTask/DataCompletenessTask.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java index 3cc490ba2bc7..1550ec62d73d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java @@ -41,7 +41,10 @@ public class DataCompletenessTask implements NodeInterface { public DataCompletenessTask( DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) { this.outputPorts = - Optional.ofNullable(nodeDefinition.getConfig().getQualityBands()).orElse(List.of()).stream() + Optional.ofNullable(nodeDefinition.getConfig()) + .map(c -> c.getQualityBands()) + .orElse(List.of()) + .stream() .map(QualityBand::getName) .collect(Collectors.toSet()); String subProcessId = nodeDefinition.getName(); From 3362332f9db10a160081065d97fc80688af8a62c Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 15 Apr 2026 02:48:10 +0530 Subject: [PATCH 6/9] feat(workflows): add branches property to dataCompletenessTask schema Enables inclusive gateway output routing by declaring supported branch names on the task schema. Co-Authored-By: Claude Sonnet 4.6 --- .../elements/nodes/automatedTask/dataCompletenessTask.json | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json index f175d8e0f1d6..cc48f1cdf80d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json @@ -111,6 +111,11 @@ "items": { "type": "string" }, "default": ["completenessScore", "qualityBand", "filledFieldsCount", "totalFieldsCount", "missingFields", "filledFields", "result"], "additionalItems": false + }, + "branches": { + "type": "array", + "items": { "type": "string" }, + "default": [] } }, "required": ["name", "config"], From 8c890fe2b5fe006ae6b17fb03e3abacdaddb0dc4 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 14 Apr 2026 21:22:43 +0000 Subject: [PATCH 7/9] Update generated TypeScript types --- .../elements/nodes/automatedTask/dataCompletenessTask.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts index 67fe304518c5..bff4bd83ca3a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts @@ -14,7 +14,8 @@ * Evaluates entity data completeness based on field presence and outputs quality bands. */ export interface DataCompletenessTask { - config: CompletenessConfiguration; + branches?: string[]; + config: CompletenessConfiguration; /** * Description of what this completeness check does */ From efd006b69a4f5e001317a7c645006eacd650fa74 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 5 May 2026 14:38:12 +0530 Subject: [PATCH 8/9] fix(governance): address blocking review comments on inclusive gateway PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - InclusiveGatewayBuilder: remove setExclusive(true) — verified against Flowable 7.2.0 source that this flag only affects async-job scheduler exclusivity; InclusiveGatewayActivityBehavior never reads it; routing semantics come from the InclusiveGateway class type, not this property - MainWorkflow: wire defaultFlow on each split gateway pointing to the corresponding join gateway (via new buildSplitToJoinMap helper). Verified against TakeOutgoingSequenceFlowsOperation source: without a defaultFlow, Flowable throws FlowableException when all conditions are false. buildGatewayCondition now emits ${var == true} instead of ${var} so UelExpressionCondition.evaluate never receives a bare null - DataCompletenessImpl: remove anyBandActive fallback that forced the lowest-score band flag to true with an empty entity list. All flags are already written as !bandEntities.isEmpty() in the per-band loop, so when all conditions are false the split gateway safely falls through to its defaultFlow, eliminating spurious audit log entries from stage listeners firing on zero-entity SetEntityAttribute executions - Tests updated throughout to reflect new condition format (== true), 3-incoming-flows on join gateway (2 conditional + 1 defaultFlow), and all-false flags for empty/all-failed data completeness runs Co-Authored-By: Claude Sonnet 4.6 --- .../impl/DataCompletenessImpl.java | 13 +------ .../workflows/flowable/MainWorkflow.java | 37 ++++++++++++++++++- .../builders/InclusiveGatewayBuilder.java | 1 - .../impl/DataCompletenessImplTest.java | 14 ++++--- .../MainWorkflowInclusiveGatewayTest.java | 25 ++++++++++--- .../builders/InclusiveGatewayBuilderTest.java | 3 +- 6 files changed, 65 insertions(+), 28 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java index a4d3fe959427..3084d6a8f087 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -120,23 +120,14 @@ public void execute(DelegateExecution execution) { entityList.size()); } - // Per-band entity lists — ALL bands stored, empty or not (inclusive gateway ready) + // Per-band entity lists — ALL bands stored, empty or not. + // When all flags are false the split gateway falls through to its defaultFlow. for (QualityBand band : qualityBands) { List bandEntities = entitiesByBand.getOrDefault(band.getName(), List.of()); varHandler.setNodeVariable(band.getName() + "_" + ENTITY_LIST_VARIABLE, bandEntities); varHandler.setNodeVariable(bandFlagVariable(band.getName()), !bandEntities.isEmpty()); } - // If no entities were assigned to any band (empty input or all failed), activate the - // lowest-score band so the inclusive split gateway always has at least one active branch. - boolean anyBandActive = - qualityBands.stream().anyMatch(b -> entitiesByBand.containsKey(b.getName())); - if (!anyBandActive) { - qualityBands.stream() - .min(Comparator.comparingDouble(QualityBand::getMinimumScore)) - .ifPresent(b -> varHandler.setNodeVariable(bandFlagVariable(b.getName()), true)); - } - // Priority band = highest minimumScore band that has entities String priorityBand = qualityBands.stream() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java index ce20d193577b..356d12f2a1a2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java @@ -75,14 +75,28 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { Map> incomingEdges = buildIncomingEdgesMap(edges); Set splitNodes = detectSplitNodes(outgoingEdges, nodeInstanceMap); Set joinNodes = detectJoinNodes(incomingEdges, splitNodes); + Map splitToJoin = buildSplitToJoinMap(incomingEdges, splitNodes, joinNodes); // Add split gateways and their outgoing conditional flows for (String splitNode : splitNodes) { String gatewayId = splitGatewayId(splitNode); - InclusiveGateway gateway = new InclusiveGatewayBuilder().id(gatewayId).build(); + String joinNode = splitToJoin.get(splitNode); + String defaultFlowId = joinNode != null ? gatewayId + "_default" : null; + + InclusiveGatewayBuilder builder = new InclusiveGatewayBuilder().id(gatewayId); + if (defaultFlowId != null) { + builder = builder.defaultFlow(defaultFlowId); + } + InclusiveGateway gateway = builder.build(); process.addFlowElement(gateway); process.addFlowElement(new SequenceFlow(splitNode, gatewayId)); + if (defaultFlowId != null) { + SequenceFlow defaultFlow = new SequenceFlow(gatewayId, joinGatewayId(joinNode)); + defaultFlow.setId(defaultFlowId); + process.addFlowElement(defaultFlow); + } + for (EdgeDefinition edge : outgoingEdges.get(splitNode)) { String targetId = joinNodes.contains(edge.getTo()) ? joinGatewayId(edge.getTo()) : edge.getTo(); @@ -233,6 +247,25 @@ private Set findSplitAncestors( return ancestors; } + private Map buildSplitToJoinMap( + Map> incomingEdges, + Set splitNodes, + Set joinNodes) { + Map result = new HashMap<>(); + for (String joinNode : joinNodes) { + List incoming = incomingEdges.getOrDefault(joinNode, List.of()); + if (incoming.isEmpty()) { + continue; + } + Set ancestors = + findSplitAncestors(incoming.get(0).getFrom(), incomingEdges, splitNodes, new HashSet<>()); + for (String split : ancestors) { + result.putIfAbsent(split, joinNode); + } + } + return result; + } + /** * Generates the inclusive gateway condition expression for an outgoing edge from a split node. * Maps the edge condition value to the appropriate boolean flag variable set by the node impl. @@ -251,7 +284,7 @@ private String buildGatewayCondition(String nodeName, String edgeCondition) { flagVariable = getNamespacedVariableName(nodeName, DataCompletenessImpl.bandFlagVariable(edgeCondition)); } - return String.format("${%s}", flagVariable); + return String.format("${%s == true}", flagVariable); } private static String splitGatewayId(String nodeName) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java index a56f218beda3..3adeb4e42303 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java @@ -23,7 +23,6 @@ public InclusiveGateway build() { gateway.setId(id); gateway.setName(id); gateway.setAsynchronous(async); - gateway.setExclusive(true); if (defaultFlow != null) { gateway.setDefaultFlow(defaultFlow); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java index 72ab6e2304f3..d0241f45bc32 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java @@ -53,7 +53,7 @@ void setUp() throws Exception { } @Test - void testExecute_EmptyEntityList_ActivatesLowestBand() { + void testExecute_EmptyEntityList_SetsAllFlagsToFalse() { when(execution.getVariable("global_entityList")).thenReturn(List.of()); try (MockedStatic entityMock = mockStatic(Entity.class)) { @@ -64,12 +64,13 @@ void testExecute_EmptyEntityList_ActivatesLowestBand() { impl.execute(execution); } - // Silver has the lowest minimumScore (0.5) — it should be activated as the fallback - verify(execution).setVariable(eq("process_has_silver_entities"), eq(true)); + // All band flags must be false — the split gateway's defaultFlow handles the empty case + verify(execution).setVariable(eq("process_has_gold_entities"), eq(false)); + verify(execution).setVariable(eq("process_has_silver_entities"), eq(false)); } @Test - void testExecute_AllEntitiesFailProcessing_ActivatesLowestBand() { + void testExecute_AllEntitiesFailProcessing_SetsAllFlagsToFalse() { List entityList = List.of("<#E::table::test.db.table>"); when(execution.getVariable("global_entityList")).thenReturn(entityList); @@ -82,8 +83,9 @@ void testExecute_AllEntitiesFailProcessing_ActivatesLowestBand() { impl.execute(execution); } - // Silver has the lowest minimumScore (0.5) — it should be activated as the fallback - verify(execution).setVariable(eq("process_has_silver_entities"), eq(true)); + // All band flags must be false — the split gateway's defaultFlow handles the empty case + verify(execution).setVariable(eq("process_has_gold_entities"), eq(false)); + verify(execution).setVariable(eq("process_has_silver_entities"), eq(false)); } private void injectExpression(Object target, String fieldName, Expression value) diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java index 3465e5666954..900d4fb11947 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java @@ -66,9 +66,12 @@ void testSplitGatewayInserted_WhenCheckNodeHasTwoConditionalOutgoingEdges() { .toList(); assertEquals(2, conditions.size()); - assertTrue(conditions.contains("${check_hasTrueEntities}"), "True branch uses hasTrueEntities"); assertTrue( - conditions.contains("${check_hasFalseEntities}"), "False branch uses hasFalseEntities"); + conditions.contains("${check_hasTrueEntities == true}"), + "True branch uses hasTrueEntities"); + assertTrue( + conditions.contains("${check_hasFalseEntities == true}"), + "False branch uses hasFalseEntities"); } @Test @@ -86,8 +89,12 @@ void testJoinGatewayInserted_WhenBranchesConverge() { MainWorkflow mainWorkflow = new MainWorkflow(workflow); Process process = mainWorkflow.getModel().getProcesses().get(0); + FlowElement splitElement = process.getFlowElement("check_inclusiveSplit"); + assertNotNull(splitElement, "Split gateway should be present"); + InclusiveGateway splitGateway = (InclusiveGateway) splitElement; assertNotNull( - process.getFlowElement("check_inclusiveSplit"), "Split gateway should be present"); + splitGateway.getDefaultFlow(), + "Split gateway must have a defaultFlow for the empty-condition case"); FlowElement joinGateway = process.getFlowElement("end_inclusiveJoin"); assertNotNull(joinGateway, "Join gateway should exist before end"); @@ -99,7 +106,11 @@ void testJoinGatewayInserted_WhenBranchesConverge() { .map(e -> (SequenceFlow) e) .filter(f -> "end_inclusiveJoin".equals(f.getTargetRef())) .count(); - assertEquals(2, flowsToJoin, "Both conditional branches should flow into the join gateway"); + // 2 conditional branches + 1 defaultFlow (skip-path when all conditions false) + assertEquals( + 3, + flowsToJoin, + "Both conditional branches and defaultFlow should flow into the join gateway"); long flowsFromJoin = process.getFlowElements().stream() @@ -177,8 +188,10 @@ void testDataCompleteness_BandConditionsGenerateCorrectFlags() { .map(SequenceFlow::getConditionExpression) .toList(); - assertTrue(conditions.contains("${dataQuality_has_gold_entities}"), "Gold band condition"); - assertTrue(conditions.contains("${dataQuality_has_silver_entities}"), "Silver band condition"); + assertTrue( + conditions.contains("${dataQuality_has_gold_entities == true}"), "Gold band condition"); + assertTrue( + conditions.contains("${dataQuality_has_silver_entities == true}"), "Silver band condition"); } // --- Helpers --- diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java index 23d9f5e7fa0b..b7a7b9a8a550 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java @@ -11,13 +11,12 @@ class InclusiveGatewayBuilderTest { @Test - void testBuildDefaultsToSyncExclusive() { + void testBuildDefaultsToSync() { InclusiveGateway gateway = new InclusiveGatewayBuilder().id("splitGateway").build(); assertEquals("splitGateway", gateway.getId()); assertEquals("splitGateway", gateway.getName()); assertFalse(gateway.isAsynchronous()); - assertTrue(gateway.isExclusive()); assertNull(gateway.getDefaultFlow()); } From cfa1bcddf01c494cd87c8842ee12a5cef27d5d40 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 5 May 2026 21:37:25 +0530 Subject: [PATCH 9/9] fix(governance): prevent workflow from restoring soft-deleted entities Switch Include.ALL to Include.NON_DELETED in WorkflowEventConsumer and SetEntityAttributeImpl so deleted entities are never picked up for workflow processing. Add skipDeleted guard in bulkUpdateEntitiesForGovernanceWorkflow so even if a deleted entity reaches the bulk-update path it is skipped rather than restored. Co-Authored-By: Claude Sonnet 4.6 --- .../governance/workflows/WorkflowEventConsumer.java | 2 +- .../nodes/automatedTask/impl/SetEntityAttributeImpl.java | 2 +- .../org/openmetadata/service/jdbi3/EntityRepository.java | 9 +++++++++ .../automatedTask/impl/SetEntityAttributeImplTest.java | 4 ++-- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java index 1f9f079f0471..b12050f6a638 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowEventConsumer.java @@ -215,7 +215,7 @@ public static Map defaultHandler(ChangeEvent event) { EntityReference entityReference; try { entityReference = - Entity.getEntityReferenceById(entityType, event.getEntityId(), Include.ALL); + Entity.getEntityReferenceById(entityType, event.getEntityId(), Include.NON_DELETED); } catch (EntityNotFoundException e) { // Entity was deleted between event creation and processing - skip workflow trigger LOG.debug( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java index d5b6d14f7a09..be1abda151a8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImpl.java @@ -103,7 +103,7 @@ private void processBatch(List entityLinks, BatchContext ctx) { EntityRepository repo = (EntityRepository) Entity.getEntityRepository(ctx.entityType()); Map loadedByLink = - Entity.getEntitiesByLinks(entityLinks, "*", Include.ALL); + Entity.getEntitiesByLinks(entityLinks, "*", Include.NON_DELETED); for (String link : entityLinks) { if (!loadedByLink.containsKey(link)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 0a6c9279d744..ef55322e7bc0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -10689,6 +10689,7 @@ public void bulkUpdateEntitiesForGovernanceWorkflow( userName, impersonatedBy, true, + true, success, failed, latencies); @@ -10704,6 +10705,7 @@ private void bulkUpdateEntities( String userName, String impersonatedBy, boolean skipBotGuard, + boolean skipDeleted, List successRequests, List failedRequests, List entityLatenciesNanos) { @@ -10769,6 +10771,12 @@ private void bulkUpdateEntities( entity.setImpersonatedBy(impersonatedBy); if (Boolean.TRUE.equals(original.getDeleted())) { + if (skipDeleted) { + LOG.debug( + "[BulkUpdate] Skipping soft-deleted entity '{}' — governance workflows must not restore entities", + fqn); + continue; + } restoreEntity(entity.getUpdatedBy(), original.getId()); } @@ -11035,6 +11043,7 @@ private BulkOperationResult bulkCreateOrUpdateEntitiesSequential( userName, null, false, + false, successRequests, failedRequests, entityLatenciesNanos); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImplTest.java index ef396acd0b1f..4e520861f7b4 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityAttributeImplTest.java @@ -79,7 +79,7 @@ void testExecute_SetsFieldWithGovernanceBot() { try (MockedStatic entityMock = mockStatic(Entity.class); MockedStatic fieldUtilsMock = mockStatic(EntityFieldUtils.class)) { entityMock - .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.ALL))) + .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.NON_DELETED))) .thenReturn(Map.of("<#E::table::test.db.table>", table)); entityMock.when(() -> Entity.getEntityRepository(anyString())).thenReturn(mockRepo); @@ -126,7 +126,7 @@ void testExecute_SetsFieldWithActualUser() { try (MockedStatic entityMock = mockStatic(Entity.class); MockedStatic fieldUtilsMock = mockStatic(EntityFieldUtils.class)) { entityMock - .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.ALL))) + .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.NON_DELETED))) .thenReturn(Map.of("<#E::table::test.db.table>", table)); entityMock.when(() -> Entity.getEntityRepository(anyString())).thenReturn(mockRepo);