Skip to content

Commit 5d63e41

Browse files
authored
[Fix-17350] Fix issues of sub-workflow failover from different master server (#17352)
1 parent 1317e83 commit 5d63e41

11 files changed

Lines changed: 1456 additions & 44 deletions

File tree

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.common.enums;
1919

20-
import java.util.Arrays;
2120
import java.util.HashMap;
2221
import java.util.Map;
2322

@@ -94,8 +93,19 @@ public boolean canPause() {
9493
|| this == SERIAL_WAIT;
9594
}
9695

97-
public boolean canFailover() {
98-
return Arrays.stream(NEED_FAILOVER_STATES).anyMatch(x -> x == this.getCode());
96+
/**
97+
* status can be take over on sub-workflow
98+
* @return bool
99+
*/
100+
public boolean canTakeover() {
101+
return this == RUNNING_EXECUTION
102+
|| this == READY_PAUSE
103+
|| this == PAUSE
104+
|| this == READY_STOP
105+
|| this == STOP
106+
|| this == FAILURE
107+
|| this == SUCCESS
108+
|| this == FAILOVER;
99109
}
100110

101111
public boolean canDirectPauseInDB() {

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,6 @@ public interface WorkflowInstanceMapper extends BaseMapper<WorkflowInstance> {
5656
List<WorkflowInstance> queryByHostAndStatus(@Param("host") String host,
5757
@Param("states") int[] stateArray);
5858

59-
/**
60-
* query workflow instance by host and stateArray which is not sub workflow
61-
*
62-
* @param host host
63-
* @param stateArray stateArray
64-
* @return workflow instance list
65-
*/
66-
List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host") String host,
67-
@Param("states") int[] stateArray);
68-
6959
/**
7060
* query workflow instance host by stateArray
7161
*

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public List<String> queryNeedFailoverMasters() {
176176

177177
@Override
178178
public List<WorkflowInstance> queryNeedFailoverWorkflowInstances(String masterAddress) {
179-
return mybatisMapper.queryMainWorkflowByHostAndStatus(masterAddress,
179+
return mybatisMapper.queryByHostAndStatus(masterAddress,
180180
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
181181
}
182182
}

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,6 @@
5050
</if>
5151
order by id asc
5252
</select>
53-
<select id="queryMainWorkflowByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
54-
select
55-
<include refid="baseSql"/>
56-
from t_ds_workflow_instance
57-
where is_sub_workflow=0
58-
<if test="host != null and host != ''">
59-
and host=#{host}
60-
</if>
61-
<if test="states != null and states.length != 0">
62-
and state in
63-
<foreach collection="states" item="i" open="(" close=")" separator=",">
64-
#{i}
65-
</foreach>
66-
</if>
67-
order by id asc
68-
</select>
6953
<select id="queryNeedFailoverWorkflowInstanceHost" resultType="String">
7054
select distinct host
7155
from t_ds_workflow_instance

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
4141
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
4242
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
43-
import org.apache.dolphinscheduler.server.master.failover.WorkflowFailover;
4443
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
4544
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
4645

@@ -168,9 +167,9 @@ private SubWorkflowLogicTaskRuntimeContext recoverFromFaultTolerantTasks() {
168167
final WorkflowInstance subWorkflowInstance = workflowInstanceDao.queryById(
169168
subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId());
170169

171-
if (subWorkflowInstance != null && subWorkflowInstance.getState().canFailover()) {
172-
// Only handle sub-workflow's fail-over in SubWorkflowLogicTask's fail-over
173-
applicationContext.getBean(WorkflowFailover.class).failoverWorkflow(subWorkflowInstance);
170+
if (subWorkflowInstance != null && subWorkflowInstance.getState().canTakeover()) {
171+
// Here we only need to take over the runtime context of sub-workflow,
172+
// the sub-workflow will be failover by master-server when needed.
174173
return subWorkflowLogicTaskRuntimeContext;
175174
}
176175

0 commit comments

Comments
 (0)