Skip to content

Commit 01fd91f

Browse files
committed
[Fix-17453] Fix TASK_ONLY strategy cannot work
1 parent cfa96f6 commit 01fd91f

9 files changed

Lines changed: 157 additions & 1 deletion

File tree

dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.dolphinscheduler.common.enums.CommandType;
2121

22+
import java.util.List;
23+
2224
import lombok.Data;
2325
import lombok.EqualsAndHashCode;
2426
import lombok.NoArgsConstructor;
@@ -35,4 +37,9 @@ public CommandType getCommandType() {
3537
return CommandType.START_PROCESS;
3638
}
3739

40+
public RunWorkflowCommandParam withStartNodes(List<Long> startNodes) {
41+
this.startNodes = startNodes;
42+
return this;
43+
}
44+
3845
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilde
139139
.doVisitFunction(taskExecutionRunnableCreator)
140140
.build();
141141
workflowGraphTopologyLogicalVisitor.visit();
142+
workflowExecutionGraph.removeUnReachableEdge();
142143

143144
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
144145
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilde
116116
.doVisitFunction(taskExecutionRunnableCreator)
117117
.build();
118118
workflowGraphTopologyLogicalVisitor.visit();
119+
workflowExecutionGraph.removeUnReachableEdge();
119120

120121
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
121122
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilde
137137
.doVisitFunction(taskExecutionRunnableCreator)
138138
.build();
139139
workflowGraphTopologyLogicalVisitor.visit();
140+
workflowExecutionGraph.removeUnReachableEdge();
140141

141142
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
142143
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public interface IWorkflowExecutionGraph {
4040
*/
4141
void addEdge(final String fromTaskName, final Set<String> toTaskName);
4242

43+
/**
44+
* Remove the unreachable edge in the graph.
45+
*/
46+
void removeUnReachableEdge();
47+
4348
/**
4449
* Return the start tasks, the start tasks in the workflow execution graph is the tasks which predecessors is empty.
4550
*/

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
import java.util.ArrayList;
2929
import java.util.HashMap;
3030
import java.util.HashSet;
31+
import java.util.Iterator;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Set;
35+
import java.util.function.Consumer;
3436
import java.util.stream.Collectors;
3537

3638
public class WorkflowExecutionGraph implements IWorkflowExecutionGraph {
3739

40+
// Store all the task execution runnable in the execution graph.
3841
private final Map<String, ITaskExecutionRunnable> totalTaskExecuteRunnableMap;
3942

4043
private final Set<String> failureTaskChains;
@@ -78,6 +81,26 @@ public void addEdge(String fromTaskName, Set<String> toTaskNames) {
7881
toTaskNames.forEach(toTask -> predecessors.computeIfAbsent(toTask, k -> new HashSet<>()).add(fromTaskName));
7982
}
8083

84+
@Override
85+
public void removeUnReachableEdge() {
86+
// If the node in successors or predecessors is not in taskExecuteRunnableMap
87+
// It means that the node is not executable, so we need to filter it out
88+
Consumer<Map<String, Set<String>>> removeUnReachableEdge = edgeMap -> {
89+
final Iterator<Map.Entry<String, Set<String>>> iterator = edgeMap.entrySet().iterator();
90+
while (iterator.hasNext()) {
91+
Map.Entry<String, Set<String>> entry = iterator.next();
92+
if (!totalTaskExecuteRunnableMap.containsKey(entry.getKey())) {
93+
iterator.remove();
94+
continue;
95+
}
96+
Set<String> toTasks = entry.getValue();
97+
toTasks.removeIf(toTask -> !totalTaskExecuteRunnableMap.containsKey(toTask));
98+
}
99+
};
100+
removeUnReachableEdge.accept(successors);
101+
removeUnReachableEdge.accept(predecessors);
102+
}
103+
81104
@Override
82105
public List<ITaskExecutionRunnable> getStartNodes() {
83106
return totalTaskExecuteRunnableMap.values()

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.dolphinscheduler.server.master.integration;
1919

2020
import org.apache.dolphinscheduler.common.enums.Flag;
21+
import org.apache.dolphinscheduler.common.enums.TaskDependType;
2122
import org.apache.dolphinscheduler.dao.entity.Project;
2223
import org.apache.dolphinscheduler.dao.entity.Schedule;
2324
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -62,7 +63,8 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO
6263
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
6364
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
6465
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
65-
.dryRun(workflowTriggerDTO.dryRun)
66+
.dryRun(workflowTriggerDTO.getDryRun())
67+
.taskDependType(workflowTriggerDTO.getTaskDependType())
6668
.build();
6769

6870
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -150,6 +152,9 @@ public static class WorkflowTriggerDTO {
150152

151153
@Builder.Default
152154
private Flag dryRun = Flag.NO;
155+
156+
@Builder.Default
157+
private TaskDependType taskDependType = TaskDependType.TASK_POST;
153158
}
154159

155160
@Data

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.awaitility.Awaitility.await;
2222

2323
import org.apache.dolphinscheduler.common.enums.Flag;
24+
import org.apache.dolphinscheduler.common.enums.TaskDependType;
2425
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2526
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2627
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -1035,4 +1036,36 @@ public void testStartWorkflow_withTimeoutKillTask() {
10351036
});
10361037
masterContainer.assertAllResourceReleased();
10371038
}
1039+
1040+
@Test
1041+
@DisplayName("Test start a workflow with task depend type TASK_ONLY")
1042+
public void testStartWorkflow_withTaskOnlyStrategy() {
1043+
final String yaml = "/it/start/workflow_with_task_only_strategy.yaml";
1044+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1045+
final WorkflowDefinition workflow = context.getOneWorkflow();
1046+
1047+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1048+
.workflowDefinition(workflow)
1049+
.runWorkflowCommandParam(new RunWorkflowCommandParam().withStartNodes(Lists.newArrayList(1L)))
1050+
.taskDependType(TaskDependType.TASK_ONLY)
1051+
.build();
1052+
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1053+
1054+
await()
1055+
.atMost(Duration.ofMinutes(1))
1056+
.untilAsserted(() -> {
1057+
Assertions
1058+
.assertThat(repository.queryWorkflowInstance(workflow))
1059+
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
1060+
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
1061+
Assertions
1062+
.assertThat(repository.queryTaskInstance(workflow))
1063+
.hasSize(1)
1064+
.satisfiesExactly(taskInstance -> {
1065+
assertThat(taskInstance.getName()).isEqualTo("A");
1066+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1067+
});
1068+
});
1069+
masterContainer.assertAllResourceReleased();
1070+
}
10381071
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
project:
18+
name: MasterIntegrationTest
19+
code: 1
20+
description: This is a fake project
21+
userId: 1
22+
userName: admin
23+
createTime: 2024-08-12 00:00:00
24+
updateTime: 2021-08-12 00:00:00
25+
26+
workflows:
27+
- name: workflow_with_two_serial_fake_task_success
28+
code: 1
29+
version: 1
30+
projectCode: 1
31+
description: This is a fake workflow with two serial tasks
32+
releaseState: ONLINE
33+
createTime: 2024-08-12 00:00:00
34+
updateTime: 2021-08-12 00:00:00
35+
userId: 1
36+
executionType: PARALLEL
37+
38+
tasks:
39+
- name: A
40+
code: 1
41+
version: 1
42+
projectCode: 1
43+
userId: 1
44+
taskType: LogicFakeTask
45+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
46+
workerGroup: default
47+
createTime: 2024-08-12 00:00:00
48+
updateTime: 2021-08-12 00:00:00
49+
taskExecuteType: BATCH
50+
- name: B
51+
code: 2
52+
version: 1
53+
projectCode: 1
54+
userId: 1
55+
taskType: LogicFakeTask
56+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
57+
workerGroup: default
58+
createTime: 2024-08-12 00:00:00
59+
updateTime: 2021-08-12 00:00:00
60+
taskExecuteType: BATCH
61+
62+
taskRelations:
63+
- projectCode: 1
64+
workflowDefinitionCode: 1
65+
workflowDefinitionVersion: 1
66+
preTaskCode: 0
67+
preTaskVersion: 0
68+
postTaskCode: 1
69+
postTaskVersion: 1
70+
createTime: 2024-08-12 00:00:00
71+
updateTime: 2024-08-12 00:00:00
72+
- projectCode: 1
73+
workflowDefinitionCode: 1
74+
workflowDefinitionVersion: 1
75+
preTaskCode: 1
76+
preTaskVersion: 1
77+
postTaskCode: 2
78+
postTaskVersion: 1
79+
createTime: 2024-08-12 00:00:00
80+
updateTime: 2024-08-12 00:00:00

0 commit comments

Comments
 (0)