Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
92c10b5
Feat(Workflows): Auto-inject inclusive gateways for batch routing
yan-3005 Apr 1, 2026
43d9935
Fix inclusive gateway deadlock on empty entity list; fix test node re…
yan-3005 Apr 1, 2026
fbb6bbb
Address self-review findings on inclusive gateway implementation
yan-3005 Apr 1, 2026
17311bb
Fix: set InclusiveGateway async=false to prevent CI timeout
yan-3005 Apr 5, 2026
b9d4f8f
Fix null guard on getConfig() in DataCompletenessTask
yan-3005 Apr 6, 2026
625bdbb
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 6, 2026
12cd155
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 7, 2026
8e25a8d
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 12, 2026
3362332
feat(workflows): add branches property to dataCompletenessTask schema
yan-3005 Apr 14, 2026
8c890fe
Update generated TypeScript types
github-actions[bot] Apr 14, 2026
2e4b97f
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 14, 2026
32935fb
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 15, 2026
27430bf
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 15, 2026
d7b9124
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 15, 2026
4534a20
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 15, 2026
579e04e
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 15, 2026
c3c7928
Merge remote-tracking branch 'origin/ram/workflow-improvements' into …
yan-3005 Apr 15, 2026
6b5b549
Merge branch 'ram/inclusive-gateway' of https://github.com/open-metad…
yan-3005 Apr 15, 2026
2675c26
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 15, 2026
051c22c
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 Apr 23, 2026
575f490
Merge ram/workflow-improvements into ram/inclusive-gateway
yan-3005 Apr 28, 2026
e73f5cc
Merge remote-tracking branch 'origin/ram/workflow-improvements' into …
yan-3005 May 5, 2026
b02de0a
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
bfe6299
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
c6df6f5
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
e27a40c
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
efd006b
fix(governance): address blocking review comments on inclusive gatewa…
yan-3005 May 5, 2026
a975793
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
c14a61a
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
711e237
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
cfa1bcd
fix(governance): prevent workflow from restoring soft-deleted entities
yan-3005 May 5, 2026
3c190d1
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 5, 2026
d9607e3
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 6, 2026
66e5c87
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 6, 2026
4e43ae2
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 6, 2026
5c2c2d2
Merge branch 'ram/workflow-improvements' into ram/inclusive-gateway
yan-3005 May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class Workflow {
public static final String ENTITY_LIST_VARIABLE = "entityList";
public static final String TRUE_ENTITY_LIST_VARIABLE = "true_entityList";
public static final String FALSE_ENTITY_LIST_VARIABLE = "false_entityList";
public static final String HAS_TRUE_ENTITIES_VARIABLE = "hasTrueEntities";
public static final String HAS_FALSE_ENTITIES_VARIABLE = "hasFalseEntities";
public static final String BATCH_SINK_PROCESSED_VARIABLE = "batchSinkProcessed";
public static final String TRIGGERING_OBJECT_ID_VARIABLE = "triggeringObjectId";
public static final String RECOGNIZER_FEEDBACK = "recognizerFeedback";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.flowable.bpmn.model.Activity;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
Expand All @@ -22,6 +23,16 @@
public interface NodeInterface {
void addToWorkflow(BpmnModel model, Process process);

/**
* Returns the output ports this node can produce. Nodes that split a batch into multiple
* entity lists (e.g. CheckEntityAttributes → "true"/"false") declare their ports here.
* The MainWorkflow uses this to automatically inject inclusive gateways at split/join points.
* The default (empty set) means no inclusive gateway is injected for this node.
*/
default Set<String> getOutputPorts() {
return Set.of();
}

default BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
Expand Down Expand Up @@ -84,6 +85,11 @@ public CheckChangeDescriptionTask(
this.subProcess = subProcess;
}

@Override
public Set<String> getOutputPorts() {
return Set.of("true", "false");
}

@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
Expand Down Expand Up @@ -77,6 +78,11 @@ public CheckEntityAttributesTask(
this.subProcess = subProcess;
}

@Override
public Set<String> getOutputPorts() {
return Set.of("true", "false");
}

@Override
public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
Expand All @@ -19,6 +22,7 @@
import org.flowable.bpmn.model.SubProcess;
import org.openmetadata.schema.governance.workflows.WorkflowConfiguration;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.QualityBand;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.governance.workflows.elements.NodeInterface;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl;
Expand All @@ -32,9 +36,17 @@
public class DataCompletenessTask implements NodeInterface {
private final SubProcess subProcess;
private final BoundaryEvent runtimeExceptionBoundaryEvent;
private final Set<String> outputPorts;

public DataCompletenessTask(
DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) {
this.outputPorts =
Optional.ofNullable(nodeDefinition.getConfig())
.map(c -> c.getQualityBands())
.orElse(List.of())
.stream()
.map(QualityBand::getName)
.collect(Collectors.toSet());
String subProcessId = nodeDefinition.getName();

SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build();
Expand Down Expand Up @@ -107,6 +119,11 @@ private ServiceTask getDataCompletenessServiceTask(
return builder.build();
}

@Override
public Set<String> getOutputPorts() {
return outputPorts;
}

@Override
public void addToWorkflow(BpmnModel model, Process process) {
process.addFlowElement(subProcess);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
Expand Down Expand Up @@ -77,10 +78,11 @@ public void execute(DelegateExecution execution) {
}
}

boolean result = !trueEntityList.isEmpty();
varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList);
varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList);
varHandler.setNodeVariable(RESULT_VARIABLE, result);
varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty());
varHandler.setNodeVariable(
HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty() || entityList.isEmpty());
} catch (Exception exc) {
LOG.error(
"[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.FALSE_ENTITY_LIST_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.HAS_FALSE_ENTITIES_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.HAS_TRUE_ENTITIES_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.TRUE_ENTITY_LIST_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;
Expand Down Expand Up @@ -69,10 +70,11 @@ public void execute(DelegateExecution execution) {
}
}

boolean result = !trueEntityList.isEmpty();
varHandler.setNodeVariable(TRUE_ENTITY_LIST_VARIABLE, trueEntityList);
varHandler.setNodeVariable(FALSE_ENTITY_LIST_VARIABLE, falseEntityList);
varHandler.setNodeVariable(RESULT_VARIABLE, result);
varHandler.setNodeVariable(HAS_TRUE_ENTITIES_VARIABLE, !trueEntityList.isEmpty());
varHandler.setNodeVariable(
HAS_FALSE_ENTITIES_VARIABLE, !falseEntityList.isEmpty() || entityList.isEmpty());
} catch (Exception exc) {
LOG.error(
"[{}] Failure: ", getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

Expand Down Expand Up @@ -121,10 +120,12 @@ public void execute(DelegateExecution execution) {
entityList.size());
}

// Per-band entity lists — ALL bands stored, empty or not (inclusive gateway ready)
// Per-band entity lists — ALL bands stored, empty or not.
// When all flags are false the split gateway falls through to its defaultFlow.
for (QualityBand band : qualityBands) {
List<String> bandEntities = entitiesByBand.getOrDefault(band.getName(), List.of());
varHandler.setNodeVariable(band.getName() + "_" + ENTITY_LIST_VARIABLE, bandEntities);
varHandler.setNodeVariable(bandFlagVariable(band.getName()), !bandEntities.isEmpty());
}

// Priority band = highest minimumScore band that has entities
Expand All @@ -142,7 +143,6 @@ public void execute(DelegateExecution execution) {
priorityBand,
entitiesByBand.keySet());

varHandler.setNodeVariable(RESULT_VARIABLE, priorityBand);
varHandler.setNodeVariable("entityResults", entityResults);

// Scalar outputs for backward compat when processing a single entity
Expand All @@ -165,6 +165,10 @@ public void execute(DelegateExecution execution) {
}
}

public static String bandFlagVariable(String bandName) {
return "has_" + bandName + "_entities";
}

private void storeFieldList(
WorkflowVariableHandler varHandler, String varName, List<String> fields) {
if (fields.size() <= 50) {
Expand Down
Loading
Loading