Skip to content

Commit 25f7429

Browse files
authored
[Fix-18154] Fix abnormal transmission of sub-workflow complement date (#18155)
1 parent 01856e0 commit 25f7429

4 files changed

Lines changed: 99 additions & 1 deletion

File tree

dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowManualTriggerRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
2626

2727
import java.util.ArrayList;
28+
import java.util.Date;
2829
import java.util.List;
2930

3031
import lombok.AllArgsConstructor;
@@ -66,6 +67,10 @@ public class WorkflowManualTriggerRequest {
6667

6768
private Long environmentCode;
6869

70+
private Date scheduleTime;
71+
72+
private String timeZone;
73+
6974
@Builder.Default
7075
private List<Property> startParamList = new ArrayList<>();
7176

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ private SubWorkflowLogicTaskRuntimeContext triggerNewSubWorkflow() {
250250
.workerGroup(workflowInstance.getWorkerGroup())
251251
.tenantCode(workflowInstance.getTenantCode())
252252
.environmentCode(workflowInstance.getEnvironmentCode())
253+
.scheduleTime(workflowInstance.getScheduleTime())
254+
.timeZone(commandParam.getTimeZone())
253255
.startParamList(paramList)
254256
.dryRun(Flag.of(workflowInstance.getDryRun()))
255257
.build();

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowManualTrigger.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerResponse;
3434

3535
import org.apache.commons.lang3.ObjectUtils;
36+
import org.apache.commons.lang3.StringUtils;
3637
import org.apache.commons.lang3.tuple.ImmutablePair;
3738

3839
import java.util.Date;
@@ -61,6 +62,7 @@ protected ImmutablePair<WorkflowDefinition, WorkflowInstance> constructWorkflowI
6162
workflowInstance.setCommandType(commandType);
6263
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, commandType.name());
6364
workflowInstance.setRecovery(Flag.NO);
65+
workflowInstance.setScheduleTime(workflowManualTriggerRequest.getScheduleTime());
6466
workflowInstance.setStartTime(new Date());
6567
workflowInstance.setRestartTime(workflowInstance.getStartTime());
6668
workflowInstance.setRunTimes(1);
@@ -91,7 +93,8 @@ protected Command constructTriggerCommand(final WorkflowManualTriggerRequest wor
9193
final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
9294
.commandParams(workflowManualTriggerRequest.getStartParamList())
9395
.startNodes(workflowManualTriggerRequest.getStartNodes())
94-
.timeZone(DateUtils.getTimezone())
96+
.timeZone(
97+
StringUtils.defaultIfBlank(workflowManualTriggerRequest.getTimeZone(), DateUtils.getTimezone()))
9598
.build();
9699
return Command.builder()
97100
.commandType(CommandType.START_PROCESS)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.integration.cases;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.awaitility.Awaitility.await;
22+
23+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
24+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
25+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
26+
import org.apache.dolphinscheduler.extract.master.command.BackfillWorkflowCommandParam;
27+
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
28+
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
29+
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
30+
31+
import org.apache.commons.lang3.time.DateUtils;
32+
33+
import java.time.Duration;
34+
import java.util.List;
35+
36+
import org.assertj.core.util.Lists;
37+
import org.junit.jupiter.api.DisplayName;
38+
import org.junit.jupiter.api.Test;
39+
40+
class SubWorkflowBackfillScheduleTimeInheritanceTestCase extends AbstractMasterIntegrationTestCase {
41+
42+
@Test
43+
@DisplayName("Test sub workflow inherits parent backfill schedule time")
44+
void testSubWorkflowInheritsParentBackfillScheduleTime() throws Exception {
45+
final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml";
46+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
47+
final WorkflowDefinition parentWorkflow = context.getWorkflow("parent_workflow");
48+
final WorkflowDefinition childWorkflow = context.getWorkflow("child_workflow");
49+
final String backfillTime = "2026-04-08 00:00:00";
50+
51+
final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder()
52+
.backfillTimeList(Lists.newArrayList(backfillTime))
53+
.build();
54+
final WorkflowOperator.WorkflowBackfillDTO workflowBackfillDTO = WorkflowOperator.WorkflowBackfillDTO.builder()
55+
.workflow(parentWorkflow)
56+
.backfillWorkflowCommandParam(backfillWorkflowCommandParam)
57+
.build();
58+
59+
workflowOperator.backfillWorkflow(workflowBackfillDTO);
60+
61+
await()
62+
.atMost(Duration.ofMinutes(1))
63+
.untilAsserted(() -> {
64+
final List<WorkflowInstance> parentWorkflowInstances =
65+
repository.queryWorkflowInstance(parentWorkflow);
66+
final List<WorkflowInstance> childWorkflowInstances =
67+
repository.queryWorkflowInstance(childWorkflow);
68+
69+
assertThat(parentWorkflowInstances)
70+
.singleElement()
71+
.satisfies(workflowInstance -> {
72+
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
73+
assertThat(workflowInstance.getScheduleTime())
74+
.isEqualTo(DateUtils.parseDate(backfillTime, "yyyy-MM-dd HH:mm:ss"));
75+
});
76+
77+
assertThat(childWorkflowInstances)
78+
.singleElement()
79+
.satisfies(workflowInstance -> {
80+
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
81+
assertThat(workflowInstance.getScheduleTime())
82+
.isEqualTo(DateUtils.parseDate(backfillTime, "yyyy-MM-dd HH:mm:ss"));
83+
});
84+
});
85+
86+
masterContainer.assertAllResourceReleased();
87+
}
88+
}

0 commit comments

Comments
 (0)