diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java index 7508e6ef3373..09ad9274dd51 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java @@ -16,6 +16,8 @@ public class Workflow { public static final String ENTITY_LIST_VARIABLE = "entityList"; public static final String TRUE_ENTITY_LIST_VARIABLE = "true_entityList"; public static final String FALSE_ENTITY_LIST_VARIABLE = "false_entityList"; + public static final String HAS_TRUE_ENTITIES_VARIABLE = "hasTrueEntities"; + public static final String HAS_FALSE_ENTITIES_VARIABLE = "hasFalseEntities"; public static final String BATCH_SINK_PROCESSED_VARIABLE = "batchSinkProcessed"; public static final String TRIGGERING_OBJECT_ID_VARIABLE = "triggeringObjectId"; public static final String RECOGNIZER_FEEDBACK = "recognizerFeedback"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java index ec1fd36990f8..6be633495821 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.flowable.bpmn.model.Activity; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -22,6 +23,16 @@ public interface NodeInterface { void addToWorkflow(BpmnModel model, Process process); + /** + * Returns the output ports this node can produce. Nodes that split a batch into multiple + * entity lists (e.g. CheckEntityAttributes → "true"/"false") declare their ports here. + * The MainWorkflow uses this to automatically inject inclusive gateways at split/join points. + * The default (empty set) means no inclusive gateway is injected for this node. + */ + default Set getOutputPorts() { + return Set.of(); + } + default BoundaryEvent getRuntimeExceptionBoundaryEvent() { return null; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java index d6af66cfad0f..e42c2cde8a93 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckChangeDescriptionTask.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -84,6 +85,11 @@ public CheckChangeDescriptionTask( this.subProcess = subProcess; } + @Override + public Set getOutputPorts() { + return Set.of("true", "false"); + } + @Override public BoundaryEvent getRuntimeExceptionBoundaryEvent() { return runtimeExceptionBoundaryEvent; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java index 6c8d29a551ac..0046884a0713 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/CheckEntityAttributesTask.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -77,6 +78,11 @@ public CheckEntityAttributesTask( this.subProcess = subProcess; } + @Override + public Set getOutputPorts() { + return Set.of("true", "false"); + } + @Override public BoundaryEvent getRuntimeExceptionBoundaryEvent() { return runtimeExceptionBoundaryEvent; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java index bbb3ffb6c29d..1550ec62d73d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java @@ -7,6 +7,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; @@ -19,6 +22,7 @@ import org.flowable.bpmn.model.SubProcess; import org.openmetadata.schema.governance.workflows.WorkflowConfiguration; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.QualityBand; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.governance.workflows.elements.NodeInterface; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl; @@ -32,9 +36,17 @@ public class DataCompletenessTask implements NodeInterface { private final SubProcess subProcess; private final BoundaryEvent runtimeExceptionBoundaryEvent; + private final Set outputPorts; public DataCompletenessTask( DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) { + this.outputPorts = + Optional.ofNullable(nodeDefinition.getConfig()) + .map(c -> c.getQualityBands()) + .orElse(List.of()) + .stream() + .map(QualityBand::getName) + .collect(Collectors.toSet()); String subProcessId = nodeDefinition.getName(); SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build(); @@ -107,6 +119,11 @@ private ServiceTask getDataCompletenessServiceTask( return builder.build(); } + @Override + public Set getOutputPorts() { + return outputPorts; + } + @Override public void addToWorkflow(BpmnModel model, Process process) { process.addFlowElement(subProcess); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java index ed96ca5dda14..314b71ae3dc3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImpl.java @@ -2,7 +2,8 @@ import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -77,10 +78,11 @@ public void execute(DelegateExecution execution) { } } - boolean result = !trueEntityList.isEmpty(); varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList); varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList); - varHandler.setNodeVariable(RESULT_VARIABLE, result); + varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty()); + varHandler.setNodeVariable( + HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty() || entityList.isEmpty()); } catch (Exception exc) { LOG.error( "[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java index fcac337b762f..68b856cbbae5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java @@ -2,7 +2,8 @@ import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -69,10 +70,11 @@ public void execute(DelegateExecution execution) { } } - boolean result = !trueEntityList.isEmpty(); varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList); varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList); - varHandler.setNodeVariable(RESULT_VARIABLE, result); + varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty()); + varHandler.setNodeVariable( + HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty() || entityList.isEmpty()); } catch (Exception exc) { LOG.error( "[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java index b5f22cc641a3..3084d6a8f087 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -2,7 +2,6 @@ import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; @@ -121,10 +120,12 @@ public void execute(DelegateExecution execution) { entityList.size()); } - // Per-band entity lists — ALL bands stored, empty or not (inclusive gateway ready) + // Per-band entity lists — ALL bands stored, empty or not. + // When all flags are false the split gateway falls through to its defaultFlow. for (QualityBand band : qualityBands) { List bandEntities = entitiesByBand.getOrDefault(band.getName(), List.of()); varHandler.setNodeVariable(band.getName() + "_" + ENTITY_LIST_VARIABLE, bandEntities); + varHandler.setNodeVariable(bandFlagVariable(band.getName()), !bandEntities.isEmpty()); } // Priority band = highest minimumScore band that has entities @@ -142,7 +143,6 @@ public void execute(DelegateExecution execution) { priorityBand, entitiesByBand.keySet()); - varHandler.setNodeVariable(RESULT_VARIABLE, priorityBand); varHandler.setNodeVariable("entityResults", entityResults); // Scalar outputs for backward compat when processing a single entity @@ -165,6 +165,10 @@ public void execute(DelegateExecution execution) { } } + public static String bandFlagVariable(String bandName) { + return "has_" + bandName + "_entities"; + } + private void storeFieldList( WorkflowVariableHandler varHandler, String varName, List fields) { if (fields.size() <= 50) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java index 59f108a43dc9..356d12f2a1a2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java @@ -1,6 +1,9 @@ package org.openmetadata.service.governance.workflows.flowable; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; import java.util.ArrayList; import java.util.HashMap; @@ -12,8 +15,10 @@ import java.util.Queue; import java.util.Set; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.InclusiveGateway; import org.flowable.bpmn.model.Process; import org.flowable.bpmn.model.SequenceFlow; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; @@ -23,9 +28,12 @@ import org.openmetadata.service.governance.workflows.elements.Edge; import org.openmetadata.service.governance.workflows.elements.NodeFactory; import org.openmetadata.service.governance.workflows.elements.NodeInterface; +import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl; import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; +import org.openmetadata.service.governance.workflows.flowable.builders.InclusiveGatewayBuilder; @Getter +@Slf4j public class MainWorkflow { private final BpmnModel model; private final String workflowName; @@ -45,23 +53,95 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { .orElse(workflowDefinition.getFullyQualifiedName())); model.addProcess(process); - // Add Nodes + List edges = workflowDefinition.getEdges(); + + // Add Nodes and collect instances for gateway detection + Map nodeInstanceMap = new HashMap<>(); for (WorkflowNodeDefinitionInterface nodeDefinitionObj : workflowDefinition.getNodes()) { NodeInterface node = NodeFactory.createNode( nodeDefinitionObj, workflowDefinition.getConfig(), workflowDefinition.getFullyQualifiedName()); + nodeInstanceMap.put(nodeDefinitionObj.getName(), node); node.addToWorkflow(model, process); Optional.ofNullable(node.getRuntimeExceptionBoundaryEvent()) .ifPresent(runtimeExceptionBoundaryEvents::add); } - // Add Edges - for (EdgeDefinition edgeDefinition : workflowDefinition.getEdges()) { - Edge edge = new Edge(edgeDefinition); - edge.addToWorkflow(model, process); + // Detect where inclusive gateways need to be injected + Map> outgoingEdges = buildOutgoingEdgesMap(edges); + Map> incomingEdges = buildIncomingEdgesMap(edges); + Set splitNodes = detectSplitNodes(outgoingEdges, nodeInstanceMap); + Set joinNodes = detectJoinNodes(incomingEdges, splitNodes); + Map splitToJoin = buildSplitToJoinMap(incomingEdges, splitNodes, joinNodes); + + // Add split gateways and their outgoing conditional flows + for (String splitNode : splitNodes) { + String gatewayId = splitGatewayId(splitNode); + String joinNode = splitToJoin.get(splitNode); + String defaultFlowId = joinNode != null ? gatewayId + "_default" : null; + + InclusiveGatewayBuilder builder = new InclusiveGatewayBuilder().id(gatewayId); + if (defaultFlowId != null) { + builder = builder.defaultFlow(defaultFlowId); + } + InclusiveGateway gateway = builder.build(); + process.addFlowElement(gateway); + process.addFlowElement(new SequenceFlow(splitNode, gatewayId)); + + if (defaultFlowId != null) { + SequenceFlow defaultFlow = new SequenceFlow(gatewayId, joinGatewayId(joinNode)); + defaultFlow.setId(defaultFlowId); + process.addFlowElement(defaultFlow); + } + + for (EdgeDefinition edge : outgoingEdges.get(splitNode)) { + String targetId = + joinNodes.contains(edge.getTo()) ? joinGatewayId(edge.getTo()) : edge.getTo(); + SequenceFlow flow = new SequenceFlow(gatewayId, targetId); + flow.setConditionExpression(buildGatewayCondition(splitNode, edge.getCondition())); + process.addFlowElement(flow); + } + } + + // Add join gateways and their single outgoing flow to the target node + for (String joinNode : joinNodes) { + String gatewayId = joinGatewayId(joinNode); + InclusiveGateway gateway = new InclusiveGatewayBuilder().id(gatewayId).build(); + process.addFlowElement(gateway); + process.addFlowElement(new SequenceFlow(gatewayId, joinNode)); + } + + // Add normal edges (not handled by split/join gateways) + for (EdgeDefinition edgeDefinition : edges) { + String from = edgeDefinition.getFrom(); + String to = edgeDefinition.getTo(); + boolean fromSplit = splitNodes.contains(from); + boolean toJoin = joinNodes.contains(to); + + if (fromSplit) { + // Already wired above via the split gateway + continue; + } + + if (toJoin) { + // Incoming to a join node — wire to the join gateway instead + if (edgeDefinition.getCondition() != null && !edgeDefinition.getCondition().isEmpty()) { + LOG.warn( + "[WorkflowEdge] Edge from='{}' to='{}' has condition='{}' but '{}' is a join node; condition is ignored", + from, + to, + edgeDefinition.getCondition(), + to); + } + SequenceFlow flow = new SequenceFlow(from, joinGatewayId(to)); + process.addFlowElement(flow); + } else { + Edge edge = new Edge(edgeDefinition); + edge.addToWorkflow(model, process); + } } // Configure Exception Flow @@ -71,6 +151,150 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { this.workflowName = workflowName; } + private Map> buildOutgoingEdgesMap(List edges) { + Map> map = new HashMap<>(); + for (EdgeDefinition edge : edges) { + map.computeIfAbsent(edge.getFrom(), k -> new ArrayList<>()).add(edge); + } + return map; + } + + private Map> buildIncomingEdgesMap(List edges) { + Map> map = new HashMap<>(); + for (EdgeDefinition edge : edges) { + map.computeIfAbsent(edge.getTo(), k -> new ArrayList<>()).add(edge); + } + return map; + } + + /** + * A node is a split point if it has ≥2 outgoing conditional edges AND explicitly declares + * multiple output ports via {@link NodeInterface#getOutputPorts()}. This ensures only batch + * check nodes (CheckEntityAttributes, CheckChangeDescription, DataCompleteness) get inclusive + * gateways — not user approval tasks or other nodes that use result-based conditional routing. + */ + private Set detectSplitNodes( + Map> outgoingEdges, Map nodeInstanceMap) { + Set splits = new HashSet<>(); + for (Map.Entry> entry : outgoingEdges.entrySet()) { + String nodeName = entry.getKey(); + NodeInterface node = nodeInstanceMap.get(nodeName); + if (node == null || node.getOutputPorts().size() < 2) { + continue; + } + long conditionalEdgeCount = + entry.getValue().stream() + .filter(e -> e.getCondition() != null && !e.getCondition().isEmpty()) + .count(); + if (conditionalEdgeCount >= 2) { + splits.add(nodeName); + } + } + return splits; + } + + /** + * A node is a join point if it has ≥2 incoming edges whose source nodes all trace back + * to the same split node (directly or transitively). An inclusive join gateway is needed + * to synchronize the parallel branches before continuing. + */ + private Set detectJoinNodes( + Map> incomingEdges, Set splitNodes) { + Set joins = new HashSet<>(); + for (Map.Entry> entry : incomingEdges.entrySet()) { + String targetNode = entry.getKey(); + List incoming = entry.getValue(); + if (incoming.size() < 2) { + continue; + } + // Find split ancestors reachable from each source node + List> splitAncestorSets = new ArrayList<>(); + for (EdgeDefinition edge : incoming) { + splitAncestorSets.add( + findSplitAncestors(edge.getFrom(), incomingEdges, splitNodes, new HashSet<>())); + } + // If any split node appears in ALL ancestor sets, this node needs a join gateway + Set common = new HashSet<>(splitAncestorSets.get(0)); + for (int i = 1; i < splitAncestorSets.size(); i++) { + common.retainAll(splitAncestorSets.get(i)); + } + if (!common.isEmpty()) { + joins.add(targetNode); + } + } + return joins; + } + + private Set findSplitAncestors( + String nodeName, + Map> incomingEdges, + Set splitNodes, + Set visited) { + Set ancestors = new HashSet<>(); + if (visited.contains(nodeName)) { + return ancestors; + } + visited.add(nodeName); + if (splitNodes.contains(nodeName)) { + ancestors.add(nodeName); + } + List incoming = incomingEdges.get(nodeName); + if (incoming != null) { + for (EdgeDefinition edge : incoming) { + ancestors.addAll(findSplitAncestors(edge.getFrom(), incomingEdges, splitNodes, visited)); + } + } + return ancestors; + } + + private Map buildSplitToJoinMap( + Map> incomingEdges, + Set splitNodes, + Set joinNodes) { + Map result = new HashMap<>(); + for (String joinNode : joinNodes) { + List incoming = incomingEdges.getOrDefault(joinNode, List.of()); + if (incoming.isEmpty()) { + continue; + } + Set ancestors = + findSplitAncestors(incoming.get(0).getFrom(), incomingEdges, splitNodes, new HashSet<>()); + for (String split : ancestors) { + result.putIfAbsent(split, joinNode); + } + } + return result; + } + + /** + * Generates the inclusive gateway condition expression for an outgoing edge from a split node. + * Maps the edge condition value to the appropriate boolean flag variable set by the node impl. + */ + private String buildGatewayCondition(String nodeName, String edgeCondition) { + if (edgeCondition == null || edgeCondition.isEmpty()) { + return null; + } + String flagVariable; + if ("true".equals(edgeCondition)) { + flagVariable = getNamespacedVariableName(nodeName, HAS_TRUE_ENTITIES_VARIABLE); + } else if ("false".equals(edgeCondition)) { + flagVariable = getNamespacedVariableName(nodeName, HAS_FALSE_ENTITIES_VARIABLE); + } else { + // DataCompleteness band name (e.g., "gold", "silver") + flagVariable = + getNamespacedVariableName(nodeName, DataCompletenessImpl.bandFlagVariable(edgeCondition)); + } + return String.format("${%s == true}", flagVariable); + } + + private static String splitGatewayId(String nodeName) { + return nodeName + "_inclusiveSplit"; + } + + private static String joinGatewayId(String nodeName) { + return nodeName + "_inclusiveJoin"; + } + private void configureRuntimeExceptionFlow(Process process) { EndEvent errorEndEvent = new EndEvent("Error"); process.addFlowElement(errorEndEvent.getEndEvent()); @@ -133,7 +357,6 @@ private void validateNode(WorkflowNodeDefinitionInterface nodeDefinition) { "Invalid Workflow: [%s] is expecting '%s' to be an output from [%s], which it is not.", nodeDefinition.getName(), variable, namespace)); } - // Enhanced validation: Check for reachability instead of direct connection if (!validateNodeIsReachable(nodeDefinition.getName(), namespace)) { throw new RuntimeException( String.format( @@ -179,12 +402,9 @@ private boolean validateNodeHasInput(String nodeName, String inputNodeName) { * For example: A -> B -> C, where C can use outputs from A even without a direct edge. */ private boolean validateNodeIsReachable(String targetNode, String sourceNode) { - // First check for direct connection (fast path) if (validateNodeHasInput(targetNode, sourceNode)) { return true; } - - // If no direct connection, check for transitive path through the graph return canReachThroughGraph(sourceNode, targetNode); } @@ -209,7 +429,6 @@ private boolean canReachThroughGraph(String sourceNode, String targetNode) { } visited.add(current); - // Find all nodes that current node connects to for (Map.Entry> entry : incomingEdgesMap.entrySet()) { if (entry.getValue().contains(current)) { String nextNode = entry.getKey(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java new file mode 100644 index 000000000000..3adeb4e42303 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilder.java @@ -0,0 +1,31 @@ +package org.openmetadata.service.governance.workflows.flowable.builders; + +import org.flowable.bpmn.model.InclusiveGateway; + +public class InclusiveGatewayBuilder extends FlowableElementBuilder { + + private boolean async = false; + private String defaultFlow; + + public InclusiveGatewayBuilder setAsync(boolean async) { + this.async = async; + return this; + } + + public InclusiveGatewayBuilder defaultFlow(String defaultFlowId) { + this.defaultFlow = defaultFlowId; + return this; + } + + @Override + public InclusiveGateway build() { + InclusiveGateway gateway = new InclusiveGateway(); + gateway.setId(id); + gateway.setName(id); + gateway.setAsynchronous(async); + if (defaultFlow != null) { + gateway.setDefaultFlow(defaultFlow); + } + return gateway; + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java index f5029b334034..034fa53f84bd 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckChangeDescriptionTaskImplTest.java @@ -6,7 +6,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import java.lang.reflect.Field; @@ -76,7 +77,8 @@ void testExecute_NoChangeDescription_ReturnsTrueForCreate() { } verify(execution).setVariable(eq("process_" + TRUE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -112,7 +114,8 @@ void testExecute_WithMatchingChangeDescription() { } verify(execution).setVariable(eq("process_" + TRUE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -148,7 +151,8 @@ void testExecute_WithNonMatchingChangeDescription() { } verify(execution).setVariable(eq("process_" + FALSE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } @Test @@ -183,7 +187,8 @@ void testExecute_AndConditionAllMatch() { impl.execute(execution); } - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java index d58a68c4a39a..df534aae4711 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImplTest.java @@ -7,7 +7,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE; -import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE; import java.lang.reflect.Field; @@ -72,7 +73,8 @@ void testExecute_EntityMatchesRule() { } verify(execution).setVariable(eq("process_" + TRUE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(false)); } @Test @@ -97,7 +99,8 @@ void testExecute_EntityDoesNotMatchRule() { } verify(execution).setVariable(eq("process_" + FALSE_ENTITY_LIST_VARIABLE), eq(entityList)); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } @Test @@ -110,7 +113,8 @@ void testExecute_EmptyEntityList() { impl.execute(execution); - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(false)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } @Test @@ -149,7 +153,8 @@ void testExecute_MultipleEntities_PartialMatch() { impl.execute(execution); } - verify(execution).setVariable(eq("process_" + RESULT_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_TRUE_ENTITIES_VARIABLE), eq(true)); + verify(execution).setVariable(eq("process_" + HAS_FALSE_ENTITIES_VARIABLE), eq(true)); } private void injectExpression(Object target, String fieldName, Expression value) diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java new file mode 100644 index 000000000000..d0241f45bc32 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImplTest.java @@ -0,0 +1,97 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; + +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.DelegateExecution; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.Entity; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class DataCompletenessImplTest { + + @Mock private DelegateExecution execution; + @Mock private Expression fieldsToCheckExpr; + @Mock private Expression qualityBandsExpr; + @Mock private Expression inputNamespaceMapExpr; + + private DataCompletenessImpl impl; + + private static final String QUALITY_BANDS_JSON = + "[{\"name\":\"gold\",\"minimumScore\":0.8},{\"name\":\"silver\",\"minimumScore\":0.5}]"; + + @BeforeEach + void setUp() throws Exception { + impl = new DataCompletenessImpl(); + injectExpression(impl, "fieldsToCheckExpr", fieldsToCheckExpr); + injectExpression(impl, "qualityBandsExpr", qualityBandsExpr); + injectExpression(impl, "inputNamespaceMapExpr", inputNamespaceMapExpr); + + when(execution.getProcessDefinitionId()).thenReturn("process:1:test"); + when(execution.getCurrentActivityId()).thenReturn("process.dataCompletenessTask"); + when(execution.getParent()).thenReturn(null); + + when(inputNamespaceMapExpr.getValue(execution)).thenReturn("{\"entityList\":\"global\"}"); + when(fieldsToCheckExpr.getValue(execution)).thenReturn("[\"description\"]"); + when(qualityBandsExpr.getValue(execution)).thenReturn(QUALITY_BANDS_JSON); + } + + @Test + void testExecute_EmptyEntityList_SetsAllFlagsToFalse() { + when(execution.getVariable("global_entityList")).thenReturn(List.of()); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock + .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.ALL))) + .thenReturn(Map.of()); + + impl.execute(execution); + } + + // All band flags must be false — the split gateway's defaultFlow handles the empty case + verify(execution).setVariable(eq("process_has_gold_entities"), eq(false)); + verify(execution).setVariable(eq("process_has_silver_entities"), eq(false)); + } + + @Test + void testExecute_AllEntitiesFailProcessing_SetsAllFlagsToFalse() { + List entityList = List.of("<#E::table::test.db.table>"); + when(execution.getVariable("global_entityList")).thenReturn(entityList); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + // Entity not found in map → goes to failedEntities, no band assignment + entityMock + .when(() -> Entity.getEntitiesByLinks(anyList(), eq("*"), eq(Include.ALL))) + .thenReturn(Map.of()); + + impl.execute(execution); + } + + // All band flags must be false — the split gateway's defaultFlow handles the empty case + verify(execution).setVariable(eq("process_has_gold_entities"), eq(false)); + verify(execution).setVariable(eq("process_has_silver_entities"), eq(false)); + } + + private void injectExpression(Object target, String fieldName, Expression value) + throws Exception { + Field field = DataCompletenessImpl.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java new file mode 100644 index 000000000000..900d4fb11947 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflowInclusiveGatewayTest.java @@ -0,0 +1,258 @@ +package org.openmetadata.service.governance.workflows.flowable; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.List; +import org.flowable.bpmn.model.FlowElement; +import org.flowable.bpmn.model.InclusiveGateway; +import org.flowable.bpmn.model.Process; +import org.flowable.bpmn.model.SequenceFlow; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.governance.workflows.WorkflowConfiguration; +import org.openmetadata.schema.governance.workflows.WorkflowDefinition; +import org.openmetadata.schema.governance.workflows.elements.EdgeDefinition; +import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.Config__2; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.Config__4; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.QualityBand; +import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition; +import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.StartEventDefinition; +import org.openmetadata.schema.governance.workflows.elements.triggers.NoOpTriggerDefinition; + +class MainWorkflowInclusiveGatewayTest { + + @Test + void testSplitGatewayInserted_WhenCheckNodeHasTwoConditionalOutgoingEdges() { + // Workflow: start → check → certEnd(true) / notifyEnd(false) — independent ends + WorkflowDefinition workflow = + createWorkflow( + "SplitOnlyWorkflow", + List.of( + startEvent("start"), + checkAttributesNode("check"), + endEvent("certEnd"), + endEvent("notifyEnd")), + List.of( + edge("start", "check", null), + edge("check", "certEnd", "true"), + edge("check", "notifyEnd", "false"))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + // Split gateway should be injected after "check" + FlowElement splitGateway = process.getFlowElement("check_inclusiveSplit"); + assertNotNull(splitGateway, "Split gateway should exist after check node"); + assertInstanceOf(InclusiveGateway.class, splitGateway); + + // No join gateway needed — branches lead to separate end events + assertNull(process.getFlowElement("certEnd_inclusiveJoin"), "No join when branches diverge"); + assertNull(process.getFlowElement("notifyEnd_inclusiveJoin"), "No join when branches diverge"); + + // Verify two conditional flows from split gateway + List conditions = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "check_inclusiveSplit".equals(f.getSourceRef())) + .map(SequenceFlow::getConditionExpression) + .toList(); + + assertEquals(2, conditions.size()); + assertTrue( + conditions.contains("${check_hasTrueEntities == true}"), + "True branch uses hasTrueEntities"); + assertTrue( + conditions.contains("${check_hasFalseEntities == true}"), + "False branch uses hasFalseEntities"); + } + + @Test + void testJoinGatewayInserted_WhenBranchesConverge() { + // Workflow: check → end(true) / end(false) — both branches converge on same end node + WorkflowDefinition workflow = + createWorkflow( + "SplitJoinWorkflow", + List.of(startEvent("start"), checkAttributesNode("check"), endEvent("end")), + List.of( + edge("start", "check", null), + edge("check", "end", "true"), + edge("check", "end", "false"))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + FlowElement splitElement = process.getFlowElement("check_inclusiveSplit"); + assertNotNull(splitElement, "Split gateway should be present"); + InclusiveGateway splitGateway = (InclusiveGateway) splitElement; + assertNotNull( + splitGateway.getDefaultFlow(), + "Split gateway must have a defaultFlow for the empty-condition case"); + + FlowElement joinGateway = process.getFlowElement("end_inclusiveJoin"); + assertNotNull(joinGateway, "Join gateway should exist before end"); + assertInstanceOf(InclusiveGateway.class, joinGateway); + + long flowsToJoin = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "end_inclusiveJoin".equals(f.getTargetRef())) + .count(); + // 2 conditional branches + 1 defaultFlow (skip-path when all conditions false) + assertEquals( + 3, + flowsToJoin, + "Both conditional branches and defaultFlow should flow into the join gateway"); + + long flowsFromJoin = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "end_inclusiveJoin".equals(f.getSourceRef())) + .count(); + assertEquals(1, flowsFromJoin, "Join gateway has exactly one outgoing flow"); + } + + @Test + void testNoGatewaysInserted_WhenNoConditionalEdges() { + WorkflowDefinition workflow = + createWorkflow( + "LinearWorkflow", + List.of(startEvent("start"), endEvent("end")), + List.of(edge("start", "task", null), edge("task", "end", null))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + long gatewayCount = + process.getFlowElements().stream().filter(e -> e instanceof InclusiveGateway).count(); + assertEquals(0, gatewayCount, "No gateways for linear workflows"); + } + + @Test + void testNoGatewayInserted_ForNonCheckNodeWithConditionalEdges() { + // UserApproval (not a check node) has "true"/"false" edges — should NOT get inclusive gateway + WorkflowDefinition workflow = + createWorkflow( + "UserApprovalWorkflow", + List.of(startEvent("start"), endEvent("endTrue"), endEvent("endFalse")), + List.of( + edge("start", "UserApproval", null), + edge("UserApproval", "endTrue", "true"), + edge("UserApproval", "endFalse", "false"))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + // UserApproval is not in the nodes list → nodeInstanceMap has no entry → no gateway + assertNull( + process.getFlowElement("UserApproval_inclusiveSplit"), + "Non-check node should not get an inclusive gateway"); + } + + @Test + void testDataCompleteness_BandConditionsGenerateCorrectFlags() { + WorkflowDefinition workflow = + createWorkflow( + "DataQualityWorkflow", + List.of( + startEvent("start"), + dataCompletenessNode("dataQuality", "gold", "silver"), + endEvent("goldEnd"), + endEvent("silverEnd")), + List.of( + edge("start", "dataQuality", null), + edge("dataQuality", "goldEnd", "gold"), + edge("dataQuality", "silverEnd", "silver"))); + + MainWorkflow mainWorkflow = new MainWorkflow(workflow); + Process process = mainWorkflow.getModel().getProcesses().get(0); + + assertNotNull( + process.getFlowElement("dataQuality_inclusiveSplit"), + "Split gateway for data quality node"); + + List conditions = + process.getFlowElements().stream() + .filter(e -> e instanceof SequenceFlow) + .map(e -> (SequenceFlow) e) + .filter(f -> "dataQuality_inclusiveSplit".equals(f.getSourceRef())) + .map(SequenceFlow::getConditionExpression) + .toList(); + + assertTrue( + conditions.contains("${dataQuality_has_gold_entities == true}"), "Gold band condition"); + assertTrue( + conditions.contains("${dataQuality_has_silver_entities == true}"), "Silver band condition"); + } + + // --- Helpers --- + + private WorkflowDefinition createWorkflow( + String name, List nodes, List edges) { + WorkflowDefinition workflow = new WorkflowDefinition(); + workflow.setName(name); + workflow.setFullyQualifiedName(name); + workflow.setConfig(new WorkflowConfiguration()); + workflow.setTrigger(new NoOpTriggerDefinition()); + workflow.setNodes(nodes); + workflow.setEdges(edges); + return workflow; + } + + private StartEventDefinition startEvent(String name) { + StartEventDefinition def = new StartEventDefinition(); + def.setName(name); + return def; + } + + private EndEventDefinition endEvent(String name) { + EndEventDefinition def = new EndEventDefinition(); + def.setName(name); + return def; + } + + private CheckEntityAttributesTaskDefinition checkAttributesNode(String name) { + Config__2 config = new Config__2(); + config.setRules("[]"); + CheckEntityAttributesTaskDefinition def = new CheckEntityAttributesTaskDefinition(); + def.setName(name); + def.setConfig(config); + return def; + } + + private DataCompletenessTaskDefinition dataCompletenessNode(String name, String... bandNames) { + List bands = + Arrays.stream(bandNames) + .map( + n -> { + QualityBand b = new QualityBand(); + b.setName(n); + b.setMinimumScore(0.0); + return b; + }) + .toList(); + Config__4 config = new Config__4(); + config.setQualityBands(bands); + DataCompletenessTaskDefinition def = new DataCompletenessTaskDefinition(); + def.setName(name); + def.setConfig(config); + return def; + } + + private EdgeDefinition edge(String from, String to, String condition) { + EdgeDefinition def = new EdgeDefinition(); + def.setFrom(from); + def.setTo(to); + def.setCondition(condition); + return def; + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java new file mode 100644 index 000000000000..b7a7b9a8a550 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/flowable/builders/InclusiveGatewayBuilderTest.java @@ -0,0 +1,39 @@ +package org.openmetadata.service.governance.workflows.flowable.builders; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.flowable.bpmn.model.InclusiveGateway; +import org.junit.jupiter.api.Test; + +class InclusiveGatewayBuilderTest { + + @Test + void testBuildDefaultsToSync() { + InclusiveGateway gateway = new InclusiveGatewayBuilder().id("splitGateway").build(); + + assertEquals("splitGateway", gateway.getId()); + assertEquals("splitGateway", gateway.getName()); + assertFalse(gateway.isAsynchronous()); + assertNull(gateway.getDefaultFlow()); + } + + @Test + void testBuildWithDefaultFlow() { + InclusiveGateway gateway = + new InclusiveGatewayBuilder().id("joinGateway").defaultFlow("flow1").build(); + + assertEquals("joinGateway", gateway.getId()); + assertEquals("flow1", gateway.getDefaultFlow()); + } + + @Test + void testBuildWithAsyncTrue() { + InclusiveGateway gateway = new InclusiveGatewayBuilder().id("g1").setAsync(true).build(); + + assertEquals("g1", gateway.getId()); + assertTrue(gateway.isAsynchronous()); + } +} diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json index f175d8e0f1d6..cc48f1cdf80d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json @@ -111,6 +111,11 @@ "items": { "type": "string" }, "default": ["completenessScore", "qualityBand", "filledFieldsCount", "totalFieldsCount", "missingFields", "filledFields", "result"], "additionalItems": false + }, + "branches": { + "type": "array", + "items": { "type": "string" }, + "default": [] } }, "required": ["name", "config"], diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts index 67fe304518c5..bff4bd83ca3a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.ts @@ -14,7 +14,8 @@ * Evaluates entity data completeness based on field presence and outputs quality bands. */ export interface DataCompletenessTask { - config: CompletenessConfiguration; + branches?: string[]; + config: CompletenessConfiguration; /** * Description of what this completeness check does */