Skip to content

Commit c236066

Browse files
authored
[Improvement-16767] Takeover sub-workflow in sub-workflow-task (#17153)
1 parent 1caa65d commit c236066

36 files changed

Lines changed: 849 additions & 543 deletions

File tree

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.common.enums;
1919

20+
import java.util.Arrays;
2021
import java.util.HashMap;
2122
import java.util.Map;
2223

@@ -93,6 +94,10 @@ public boolean canPause() {
9394
|| this == SERIAL_WAIT;
9495
}
9596

97+
public boolean canFailover() {
98+
return Arrays.stream(NEED_FAILOVER_STATES).anyMatch(x -> x == this.getCode());
99+
}
100+
96101
public boolean canDirectPauseInDB() {
97102
return this == SERIAL_WAIT;
98103
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ public interface WorkflowInstanceMapper extends BaseMapper<WorkflowInstance> {
5656
List<WorkflowInstance> queryByHostAndStatus(@Param("host") String host,
5757
@Param("states") int[] stateArray);
5858

59+
/**
60+
* query workflow instance by host and stateArray which is not sub workflow
61+
*
62+
* @param host host
63+
* @param stateArray stateArray
64+
* @return workflow instance list
65+
*/
66+
List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host") String host,
67+
@Param("states") int[] stateArray);
5968
/**
6069
* query workflow instance host by stateArray
6170
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public List<String> queryNeedFailoverMasters() {
170170

171171
@Override
172172
public List<WorkflowInstance> queryNeedFailoverWorkflowInstances(String masterAddress) {
173-
return mybatisMapper.queryByHostAndStatus(masterAddress,
173+
return mybatisMapper.queryMainWorkflowByHostAndStatus(masterAddress,
174174
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
175175
}
176176
}

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@
5050
</if>
5151
order by id asc
5252
</select>
53+
<select id="queryMainWorkflowByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
54+
select
55+
<include refid="baseSql"/>
56+
from t_ds_workflow_instance
57+
where is_sub_workflow=0
58+
<if test="host != null and host != ''">
59+
and host=#{host}
60+
</if>
61+
<if test="states != null and states.length != 0">
62+
and state in
63+
<foreach collection="states" item="i" open="(" close=")" separator=",">
64+
#{i}
65+
</foreach>
66+
</if>
67+
order by id asc
68+
</select>
5369
<select id="queryNeedFailoverWorkflowInstanceHost" resultType="String">
5470
select distinct host
5571
from t_ds_workflow_instance

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
4141
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
4242
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
43+
import org.apache.dolphinscheduler.server.master.failover.WorkflowFailover;
4344
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
4445
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
4546

@@ -148,15 +149,33 @@ private SubWorkflowLogicTaskRuntimeContext initializeSubWorkflowInstance() {
148149
return triggerNewSubWorkflow();
149150
}
150151

151-
switch (workflowExecutionRunnable.getWorkflowInstance().getCommandType()) {
152+
// In some cases, workflow instance's command type has not been changed,
153+
// there should better to use command.type instead
154+
switch (workflowExecutionRunnable.getWorkflowExecuteContext().getCommand().getCommandType()) {
155+
case RECOVER_TOLERANCE_FAULT_PROCESS:
156+
return recoverFromFaultTolerantTasks();
152157
case RECOVER_SUSPENDED_PROCESS:
153158
return recoverFromSuspendTasks();
154159
case START_FAILURE_TASK_PROCESS:
155160
return recoverFromFailedTasks();
156161
default:
157162
return triggerNewSubWorkflow();
158163
}
164+
}
165+
166+
private SubWorkflowLogicTaskRuntimeContext recoverFromFaultTolerantTasks() {
167+
final WorkflowInstanceDao workflowInstanceDao = applicationContext.getBean(WorkflowInstanceDao.class);
168+
final WorkflowInstance subWorkflowInstance = workflowInstanceDao.queryById(
169+
subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId());
170+
171+
if (subWorkflowInstance != null && subWorkflowInstance.getState().canFailover()) {
172+
// Only handle sub-workflow's fail-over in SubWorkflowLogicTask's fail-over
173+
applicationContext.getBean(WorkflowFailover.class).failoverWorkflow(subWorkflowInstance);
174+
return subWorkflowLogicTaskRuntimeContext;
175+
}
159176

177+
// The sub-workflow's state is bad, trigger a new sub-workflow instance
178+
return triggerNewSubWorkflow();
160179
}
161180

162181
private SubWorkflowLogicTaskRuntimeContext recoverFromFailedTasks() {

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public WorkflowInstance queryWorkflowInstance(final Integer workflowInstanceId)
5757
return workflowInstanceDao.queryById(workflowInstanceId);
5858
}
5959

60+
public List<WorkflowInstance> queryAllWorkflowInstance() {
61+
return workflowInstanceDao.queryAll();
62+
}
63+
6064
/**
6165
* Return the list of task instances for a given workflow definition in ascending order of their IDs.
6266
*/
@@ -79,4 +83,8 @@ public List<TaskInstance> queryTaskInstance(final Integer workflowInstanceId) {
7983
.collect(Collectors.toList());
8084
}
8185

86+
public List<TaskInstance> queryAllTaskInstance() {
87+
return taskInstanceDao.queryAll();
88+
}
89+
8290
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class WorkflowTestCaseContext {
4343

4444
private List<WorkflowDefinition> workflows;
4545

46-
private WorkflowInstance workflowInstance;
46+
private List<WorkflowInstance> workflowInstances;
4747

4848
private List<TaskInstance> taskInstances;
4949

@@ -61,5 +61,4 @@ public WorkflowDefinition getOneWorkflow() {
6161
}
6262
return workflows.get(0);
6363
}
64-
6564
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContextFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ public WorkflowTestCaseContext initializeContextFromYaml(final String yamlPath)
9090
initializeWorkflowDefinitionToDB(workflowTestCaseContext.getWorkflows());
9191
initializeTaskDefinitionsToDB(workflowTestCaseContext.getTasks());
9292
initializeTaskRelationsToDB(workflowTestCaseContext.getTaskRelations());
93-
if (workflowTestCaseContext.getWorkflowInstance() != null) {
94-
initializeWorkflowInstanceToDB(workflowTestCaseContext.getWorkflowInstance());
93+
if (CollectionUtils.isNotEmpty(workflowTestCaseContext.getWorkflowInstances())) {
94+
initializeWorkflowInstancesToDB(workflowTestCaseContext.getWorkflowInstances());
9595
}
9696
if (CollectionUtils.isNotEmpty(workflowTestCaseContext.getTaskInstances())) {
9797
initializeTaskInstancesToDB(workflowTestCaseContext.getTaskInstances());
@@ -111,8 +111,10 @@ private void initializeTaskInstancesToDB(List<TaskInstance> taskInstances) {
111111
}
112112
}
113113

114-
private void initializeWorkflowInstanceToDB(WorkflowInstance workflowInstance) {
115-
workflowInstanceDao.insert(workflowInstance);
114+
private void initializeWorkflowInstancesToDB(List<WorkflowInstance> workflowInstances) {
115+
for (WorkflowInstance workflowInstance : workflowInstances) {
116+
workflowInstanceDao.insert(workflowInstance);
117+
}
116118
}
117119

118120
private void initializeWorkflowDefinitionToDB(final List<WorkflowDefinition> workflowDefinitions) {

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,4 +615,64 @@ public void testGlobalFailover_runningWorkflow_fromAnotherMaster() {
615615
masterContainer.assertAllResourceReleased();
616616

617617
}
618+
619+
@Test
620+
public void testGlobalFailover_runningWorkflow_takeOverSubWorkflow() {
621+
final String yaml = "/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml";
622+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
623+
final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
624+
.filter(workflow -> workflow.getName().equals("workflow_with_one_sub_workflow_running")).findFirst()
625+
.orElse(null);
626+
final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
627+
.filter(workflow -> workflow.getName().equals("sub_workflow_running")).findFirst().orElse(null);
628+
629+
assertThat(mainWorkflow).isNotNull();
630+
assertThat(subWorkflow).isNotNull();
631+
632+
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date()));
633+
634+
await()
635+
.atMost(Duration.ofMinutes(1))
636+
.untilAsserted(() -> {
637+
assertThat(repository.queryAllWorkflowInstance())
638+
.hasSize(2)
639+
.anySatisfy(workflowInstance -> {
640+
assertThat(workflowInstance.getState())
641+
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
642+
});
643+
});
644+
645+
await()
646+
.atMost(Duration.ofMinutes(1))
647+
.untilAsserted(() -> {
648+
assertThat(repository.queryTaskInstance(mainWorkflow))
649+
.hasSize(2)
650+
.anySatisfy(taskInstance -> {
651+
assertThat(taskInstance.getState())
652+
.isEqualTo(taskInstance.getId() == 1 ? TaskExecutionStatus.NEED_FAULT_TOLERANCE
653+
: TaskExecutionStatus.SUCCESS);
654+
assertThat(taskInstance.getName())
655+
.isEqualTo("sub_workflow_task");
656+
});
657+
});
658+
659+
await()
660+
.atMost(Duration.ofMinutes(1))
661+
.untilAsserted(() -> {
662+
assertThat(repository.queryTaskInstance(subWorkflow))
663+
.hasSize(2)
664+
.anySatisfy(taskInstance -> {
665+
assertThat(taskInstance.getState())
666+
.isEqualTo(taskInstance.getId() == 2 ? TaskExecutionStatus.NEED_FAULT_TOLERANCE
667+
: TaskExecutionStatus.SUCCESS);
668+
assertThat(taskInstance.getName())
669+
.isEqualTo("fake_task_A");
670+
});
671+
});
672+
673+
assertThat(repository.queryAllTaskInstance()).hasSize(4);
674+
675+
masterContainer.assertAllResourceReleased();
676+
677+
}
618678
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverFailureTaskTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void testRepeatRunningWorkflow_with_taskOnly() {
5252
final String yaml = "/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml";
5353
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
5454

55-
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
55+
final Integer workflowInstanceId = context.getWorkflowInstances().get(0).getId();
5656
workflowOperator.recoverFailureTasks(workflowInstanceId);
5757

5858
await()
@@ -101,7 +101,7 @@ public void testRecoverFailureWorkflow_from_another_master() {
101101
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
102102
final WorkflowDefinition workflow = context.getOneWorkflow();
103103

104-
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
104+
final Integer workflowInstanceId = context.getWorkflowInstances().get(0).getId();
105105
workflowOperator.recoverFailureTasks(workflowInstanceId);
106106

107107
await()

0 commit comments

Comments
 (0)