Skip to content

Commit 5782558

Browse files
Fix nextBranch is empty in switch task (#17054)
Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
1 parent 5614699 commit 5782558

5 files changed

Lines changed: 293 additions & 2 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/switchtask/SwitchLogicTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ private void calculateSwitchBranch() {
117117
if (nextBranch == null) {
118118
log.info("All switch item is not satisfied");
119119
moveToDefaultBranch();
120+
} else {
121+
log.info("The condition is satisfied, move to the next branch: {}", getTaskName(nextBranch));
122+
taskParameters.setNextBranch(nextBranch);
120123
}
121124
}
122125

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/SuccessorFlowAdjuster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ private void adjustSwitchTaskSuccessorFlow(final ITaskExecutionRunnable taskExec
119119
needSkippedBranch.add(switchResultVo.getNextNode());
120120
}
121121
}
122-
needSkippedBranch.remove(switchResult.getNextNode());
122+
needSkippedBranch.remove(switchParameters.getNextBranch());
123123
markTaskSkipped(taskExecutionRunnable, needSkippedBranch);
124124
}
125125

@@ -136,7 +136,7 @@ private void markTaskSkipped(final ITaskExecutionRunnable taskExecutionRunnable,
136136
taskExecutionRunnable.getWorkflowInstance().getName());
137137
continue;
138138
}
139-
workflowExecutionGraph.markTaskSkipped(taskExecutionRunnable);
139+
workflowExecutionGraph.markTaskSkipped(branch);
140140
}
141141
}
142142

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,92 @@ public void testStartWorkflow_with_oneSuccessTaskUsingEnvironmentConfig() {
183183
masterContainer.assertAllResourceReleased();
184184
}
185185

186+
@Test
187+
@DisplayName("Test start a workflow with one success switch task and two fake task")
188+
public void testStartWorkflow_with_oneSuccessSwitch_twoFakeTask() {
189+
final String yaml = "/it/start/workflow_with_one_success_switch_two_fake_task.yaml";
190+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
191+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
192+
193+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
194+
.workflowDefinition(parentWorkflow)
195+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
196+
.build();
197+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
198+
199+
await()
200+
.atMost(Duration.ofMinutes(1))
201+
.untilAsserted(() -> {
202+
203+
Assertions
204+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
205+
.matches(
206+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
207+
.matches(
208+
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
209+
.matches(
210+
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
211+
212+
Assertions
213+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
214+
.hasSize(2)
215+
.anySatisfy(taskInstance -> {
216+
assertThat(taskInstance.getName()).isEqualTo("switch_task");
217+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
218+
})
219+
.anySatisfy(taskInstance -> {
220+
assertThat(taskInstance.getName()).isEqualTo("success_branch");
221+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
222+
});
223+
224+
});
225+
226+
masterContainer.assertAllResourceReleased();
227+
}
228+
229+
@Test
230+
@DisplayName("Test start a workflow with one failed switch task and two fake task")
231+
public void testStartWorkflow_with_oneFailedSwitch_twoFakeTask() {
232+
final String yaml = "/it/start/workflow_with_one_failed_switch_two_fake_task.yaml";
233+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
234+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
235+
236+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
237+
.workflowDefinition(parentWorkflow)
238+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
239+
.build();
240+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
241+
242+
await()
243+
.atMost(Duration.ofMinutes(1))
244+
.untilAsserted(() -> {
245+
246+
Assertions
247+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
248+
.matches(
249+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
250+
.matches(
251+
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
252+
.matches(
253+
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
254+
255+
Assertions
256+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
257+
.hasSize(2)
258+
.anySatisfy(taskInstance -> {
259+
assertThat(taskInstance.getName()).isEqualTo("switch_task");
260+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
261+
})
262+
.anySatisfy(taskInstance -> {
263+
assertThat(taskInstance.getName()).isEqualTo("default_branch");
264+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
265+
});
266+
267+
});
268+
269+
masterContainer.assertAllResourceReleased();
270+
}
271+
186272
@Test
187273
@DisplayName("Test start a workflow with one sub workflow task(A) success")
188274
public void testStartWorkflow_with_subWorkflowTask_success() {
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_failed_switch_two_fake_task
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a workflow with one switch task falied
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
tasks:
40+
- name: switch_task
41+
code: 1
42+
version: 1
43+
projectCode: 1
44+
userId: 1
45+
taskType: SWITCH
46+
taskParams: '{"localParams":[],"rawScript":"","resourceList":[],"switchResult":{"dependTaskList":[{"condition":"false == true","nextNode":2}],"nextNode":3}}'
47+
workerGroup: default
48+
createTime: 2024-08-12 00:00:00
49+
updateTime: 2021-08-12 00:00:00
50+
taskExecuteType: BATCH
51+
- name: success_branch
52+
code: 2
53+
version: 1
54+
projectCode: 1
55+
userId: 1
56+
taskType: LogicFakeTask
57+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
58+
workerGroup: default
59+
createTime: 2024-08-12 00:00:00
60+
updateTime: 2021-08-12 00:00:00
61+
taskExecuteType: BATCH
62+
- name: default_branch
63+
code: 3
64+
version: 1
65+
projectCode: 1
66+
userId: 1
67+
taskType: LogicFakeTask
68+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
69+
workerGroup: default
70+
createTime: 2024-08-12 00:00:00
71+
updateTime: 2021-08-12 00:00:00
72+
taskExecuteType: BATCH
73+
74+
taskRelations:
75+
- projectCode: 1
76+
workflowDefinitionCode: 1
77+
workflowDefinitionVersion: 1
78+
preTaskCode: 0
79+
preTaskVersion: 0
80+
postTaskCode: 1
81+
postTaskVersion: 1
82+
createTime: 2024-08-12 00:00:00
83+
updateTime: 2024-08-12 00:00:00
84+
- projectCode: 1
85+
workflowDefinitionCode: 1
86+
workflowDefinitionVersion: 1
87+
preTaskCode: 1
88+
preTaskVersion: 1
89+
postTaskCode: 2
90+
postTaskVersion: 1
91+
createTime: 2024-08-12 00:00:00
92+
updateTime: 2024-08-12 00:00:00
93+
- projectCode: 1
94+
workflowDefinitionCode: 1
95+
workflowDefinitionVersion: 1
96+
preTaskCode: 1
97+
preTaskVersion: 1
98+
postTaskCode: 3
99+
postTaskVersion: 1
100+
createTime: 2024-08-12 00:00:00
101+
updateTime: 2024-08-12 00:00:00
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_success_switch_two_fake_task
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a workflow with one switch task success
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
tasks:
40+
- name: switch_task
41+
code: 1
42+
version: 1
43+
projectCode: 1
44+
userId: 1
45+
taskType: SWITCH
46+
taskParams: '{"localParams":[],"rawScript":"","resourceList":[],"switchResult":{"dependTaskList":[{"condition":"true == true","nextNode":2}],"nextNode":3}}'
47+
workerGroup: default
48+
createTime: 2024-08-12 00:00:00
49+
updateTime: 2021-08-12 00:00:00
50+
taskExecuteType: BATCH
51+
- name: success_branch
52+
code: 2
53+
version: 1
54+
projectCode: 1
55+
userId: 1
56+
taskType: LogicFakeTask
57+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
58+
workerGroup: default
59+
createTime: 2024-08-12 00:00:00
60+
updateTime: 2021-08-12 00:00:00
61+
taskExecuteType: BATCH
62+
- name: default_branch
63+
code: 3
64+
version: 1
65+
projectCode: 1
66+
userId: 1
67+
taskType: LogicFakeTask
68+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
69+
workerGroup: default
70+
createTime: 2024-08-12 00:00:00
71+
updateTime: 2021-08-12 00:00:00
72+
taskExecuteType: BATCH
73+
74+
taskRelations:
75+
- projectCode: 1
76+
workflowDefinitionCode: 1
77+
workflowDefinitionVersion: 1
78+
preTaskCode: 0
79+
preTaskVersion: 0
80+
postTaskCode: 1
81+
postTaskVersion: 1
82+
createTime: 2024-08-12 00:00:00
83+
updateTime: 2024-08-12 00:00:00
84+
- projectCode: 1
85+
workflowDefinitionCode: 1
86+
workflowDefinitionVersion: 1
87+
preTaskCode: 1
88+
preTaskVersion: 1
89+
postTaskCode: 2
90+
postTaskVersion: 1
91+
createTime: 2024-08-12 00:00:00
92+
updateTime: 2024-08-12 00:00:00
93+
- projectCode: 1
94+
workflowDefinitionCode: 1
95+
workflowDefinitionVersion: 1
96+
preTaskCode: 1
97+
preTaskVersion: 1
98+
postTaskCode: 3
99+
postTaskVersion: 1
100+
createTime: 2024-08-12 00:00:00
101+
updateTime: 2024-08-12 00:00:00

0 commit comments

Comments
 (0)