Skip to content

Commit 043ed2f

Browse files
Merge branch 'dev' into Improvement-Metrics
2 parents eaba128 + 7dc6a4a commit 043ed2f

12 files changed

Lines changed: 313 additions & 5 deletions

File tree

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public enum AlertType {
2929

3030
/**
3131
* 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning,
32-
* 5 task failure, 6 task success, 7 task timeout, 8 close alert
32+
* 5 task failure, 6 task success, 7 task timeout
3333
*/
3434
WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"),
3535
WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"),

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@ public void sendServerStoppedAlert(String host, String serverType) {
195195
* @param projectUser projectUser
196196
*/
197197
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) {
198+
if (projectUser == null) {
199+
throw new IllegalArgumentException("projectUser must not be null");
200+
}
201+
if (workflowInstance.getWarningGroupId() == null) {
202+
throw new IllegalArgumentException("warningGroupId of the workflow instance must not be null");
203+
}
204+
198205
int alertGroupId = workflowInstance.getWarningGroupId();
199206
Alert alert = new Alert();
200207
List<WorkflowAlertContent> workflowAlertContentList = new ArrayList<>(1);
@@ -220,10 +227,11 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU
220227
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
221228
alert.setWorkflowInstanceId(workflowInstance.getId());
222229
alert.setAlertType(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
223-
saveTaskTimeoutAlert(alert, content, alertGroupId);
230+
231+
saveTimeoutAlert(alert, content, alertGroupId);
224232
}
225233

226-
private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) {
234+
private void saveTimeoutAlert(Alert alert, String content, int alertGroupId) {
227235
alert.setAlertGroupId(alertGroupId);
228236
alert.setWarningType(WarningType.FAILURE);
229237
alert.setContent(content);
@@ -275,7 +283,8 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance,
275283
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
276284
alert.setWorkflowInstanceId(workflowInstance.getId());
277285
alert.setAlertType(AlertType.TASK_TIMEOUT);
278-
saveTaskTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());
286+
287+
saveTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());
279288
}
280289

281290
/**

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExec
5151
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
5252

5353
checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null");
54-
checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes);
54+
checkState(timeoutInMinutes > 0, "The task timeout: %s must > 0 minutes", timeoutInMinutes);
5555

5656
long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime()
5757
+ TimeUnit.MINUTES.toMillis(timeoutInMinutes);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType {
2929
* Notify the workflow instance there exist a task has been finished, and should do DAG topology logic transaction.
3030
*/
3131
TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH,
32+
/**
33+
* Do Timeout strategy of the workflow instance.
34+
*/
35+
TIMEOUT,
3236
/**
3337
* Pause the workflow instance
3438
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event;
19+
20+
import static com.google.common.base.Preconditions.checkState;
21+
22+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
23+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
24+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent;
25+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
26+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
27+
28+
import java.util.concurrent.TimeUnit;
29+
30+
import lombok.AccessLevel;
31+
import lombok.AllArgsConstructor;
32+
import lombok.Getter;
33+
34+
@Getter
35+
@AllArgsConstructor(access = AccessLevel.PRIVATE)
36+
public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent {
37+
38+
private IWorkflowExecutionRunnable workflowExecutionRunnable;
39+
40+
protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
41+
final long timeout) {
42+
super(timeout);
43+
this.workflowExecutionRunnable = workflowExecutionRunnable;
44+
}
45+
46+
public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) {
47+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
48+
checkState(workflowInstance != null,
49+
"The workflow instance must be initialized before creating workflow timeout event.");
50+
51+
final int timeout = workflowInstance.getTimeout();
52+
checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout);
53+
54+
long delayTime = System.currentTimeMillis() - workflowInstance.getRestartTime().getTime()
55+
+ TimeUnit.MINUTES.toMillis(timeout);
56+
return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime);
57+
}
58+
59+
@Override
60+
public ILifecycleEventType getEventType() {
61+
return WorkflowLifecycleEventType.TIMEOUT;
62+
}
63+
64+
@Override
65+
public String toString() {
66+
return "WorkflowTimeoutLifecycleEvent{" +
67+
"workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() +
68+
'}';
69+
}
70+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;
1919

20+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2021
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
2122
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
2223
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
24+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
2325
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
2426
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
2527

@@ -38,11 +40,24 @@ public void handle(final IWorkflowStateAction workflowStateAction,
3840
final IWorkflowExecutionRunnable workflowExecutionRunnable,
3941
final WorkflowStartLifecycleEvent workflowStartEvent) {
4042

43+
workflowTimeoutMonitor(workflowExecutionRunnable);
4144
workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent);
4245
}
4346

4447
@Override
4548
public ILifecycleEventType matchEventType() {
4649
return WorkflowLifecycleEventType.START;
4750
}
51+
52+
private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
53+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
54+
if (workflowInstance.getTimeout() <= 0) {
55+
log.debug("The workflow {} timeout {} is not configured or invalid, skip timeout monitor.",
56+
workflowInstance.getName(),
57+
workflowInstance.getTimeout());
58+
return;
59+
}
60+
workflowExecutionRunnable.getWorkflowEventBus()
61+
.publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable));
62+
}
4863
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;
19+
20+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
21+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
22+
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
23+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
24+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
25+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
26+
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
27+
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
28+
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
import org.springframework.stereotype.Component;
32+
33+
@Slf4j
34+
@Component
35+
public class WorkflowTimeoutLifecycleEventHandler
36+
extends
37+
AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> {
38+
39+
private final WorkflowAlertManager workflowAlertManager;
40+
41+
public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) {
42+
this.workflowAlertManager = workflowAlertManager;
43+
}
44+
45+
@Override
46+
public void handle(final IWorkflowStateAction workflowStateAction,
47+
final IWorkflowExecutionRunnable workflowExecutionRunnable,
48+
final WorkflowTimeoutLifecycleEvent workflowTimeoutLifecycleEvent) {
49+
final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph();
50+
if (workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
51+
// all the TaskExecutionRunnable chain in the graph is finish, means the workflow is already finished.
52+
return;
53+
}
54+
55+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
56+
final boolean shouldSendAlert = workflowInstance.getWarningGroupId() != null;
57+
58+
if (shouldSendAlert) {
59+
doWorkflowTimeoutAlert(workflowExecutionRunnable);
60+
} else {
61+
log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.",
62+
workflowInstance.getName());
63+
}
64+
65+
}
66+
67+
@Override
68+
public ILifecycleEventType matchEventType() {
69+
return WorkflowLifecycleEventType.TIMEOUT;
70+
}
71+
72+
private void doWorkflowTimeoutAlert(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
73+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
74+
workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance);
75+
}
76+
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.dolphinscheduler.server.master.integration;
1919

20+
import org.apache.dolphinscheduler.dao.entity.Alert;
2021
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2122
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
2223
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
2325
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
2426
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2527

@@ -39,6 +41,9 @@ public class Repository {
3941
@Autowired
4042
private TaskInstanceDao taskInstanceDao;
4143

44+
@Autowired
45+
private AlertMapper alertMapper;
46+
4247
/**
4348
* Return the list of process instances for a given workflow definition in ascending order of their IDs.
4449
*/
@@ -87,4 +92,13 @@ public List<TaskInstance> queryAllTaskInstance() {
8792
return taskInstanceDao.queryAll();
8893
}
8994

95+
/**
96+
* Return the list of alert for a given workflow instance in ascending order of their IDs.
97+
*/
98+
public List<Alert> queryAlert(final Integer workflowInstanceId) {
99+
return alertMapper.selectByWorkflowInstanceId(workflowInstanceId)
100+
.stream()
101+
.sorted(Comparator.comparingInt(Alert::getId))
102+
.collect(Collectors.toList());
103+
}
90104
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
2121
import org.apache.dolphinscheduler.common.enums.Flag;
2222
import org.apache.dolphinscheduler.common.enums.TaskDependType;
23+
import org.apache.dolphinscheduler.common.enums.WarningType;
2324
import org.apache.dolphinscheduler.dao.entity.Project;
2425
import org.apache.dolphinscheduler.dao.entity.Schedule;
2526
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -67,6 +68,8 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO
6768
.dryRun(workflowTriggerDTO.getDryRun())
6869
.taskDependType(workflowTriggerDTO.getTaskDependType())
6970
.failureStrategy(workflowTriggerDTO.getFailureStrategy())
71+
.warningGroupId(workflowTriggerDTO.getWarningGroupId())
72+
.warningType(workflowTriggerDTO.getWarningType())
7073
.build();
7174

7275
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -160,6 +163,12 @@ public static class WorkflowTriggerDTO {
160163

161164
@Builder.Default
162165
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
166+
167+
@Builder.Default
168+
private WarningType warningType = WarningType.NONE;
169+
170+
@Builder.Default
171+
private Integer warningGroupId = null;
163172
}
164173

165174
@Data

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.awaitility.Awaitility.await;
2222

23+
import org.apache.dolphinscheduler.common.enums.AlertType;
2324
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
2425
import org.apache.dolphinscheduler.common.enums.Flag;
2526
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -1778,4 +1779,45 @@ public void testTaskRemainsSubmittedSuccess_with_noAvailableWorkerAndTimeoutDisa
17781779
// masterContainer.assertAllResourceReleased();
17791780
}
17801781

1782+
@Test
1783+
@DisplayName("Test start a workflow when timeout should trigger alert when warningGroupId is set")
1784+
public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() {
1785+
final String yaml = "/it/start/workflow_with_workflow_timeout_alert.yaml";
1786+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1787+
final WorkflowDefinition workflow = context.getOneWorkflow();
1788+
1789+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1790+
.workflowDefinition(workflow)
1791+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1792+
.warningGroupId(workflow.getWarningGroupId())
1793+
.build();
1794+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1795+
1796+
await().atMost(Duration.ofMinutes(2))
1797+
.untilAsserted(() -> {
1798+
Assertions
1799+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1800+
.matches(
1801+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
1802+
Assertions
1803+
.assertThat(repository.queryTaskInstance(workflow))
1804+
.hasSize(1)
1805+
.anySatisfy(taskInstance -> {
1806+
assertThat(taskInstance.getName()).isEqualTo("long_running_task");
1807+
assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
1808+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1809+
});
1810+
Assertions
1811+
.assertThat(repository.queryAlert(workflowInstanceId))
1812+
.hasSize(1)
1813+
.anySatisfy(alert -> {
1814+
assertThat(alert.getTitle()).isEqualTo("Workflow Timeout Warn");
1815+
assertThat(alert.getProjectCode()).isEqualTo(1);
1816+
assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(1);
1817+
assertThat(alert.getAlertType()).isEqualTo(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
1818+
});
1819+
});
1820+
1821+
masterContainer.assertAllResourceReleased();
1822+
}
17811823
}

0 commit comments

Comments
 (0)