Skip to content

Commit 0d3f593

Browse files
authored
[Fix-17308][Alert] Missing alert logic when workflow instance finished (#17309)
1 parent 734a502 commit 0d3f593

4 files changed

Lines changed: 52 additions & 186 deletions

File tree

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WarningType.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,22 @@
2222
import java.util.Arrays;
2323
import java.util.Map;
2424

25+
import lombok.Getter;
26+
2527
import com.baomidou.mybatisplus.annotation.EnumValue;
2628
import com.google.common.base.Functions;
2729

2830
/**
29-
* types for whether to send warning when process ends;
31+
* types for whether to send warning when workflow instance ends
3032
*/
33+
@Getter
3134
public enum WarningType {
3235

3336
/**
3437
* 0 do not send warning;
35-
* 1 send if process success;
36-
* 2 send if process failed;
37-
* 3 send if process ends, whatever the result;
38-
* 4 send global events;
38+
* 1 send if workflow success;
39+
* 2 send if workflow failed;
40+
* 3 send if workflow ends, whatever the result;
3941
*/
4042
NONE(0, "none"),
4143
SUCCESS(1, "success"),
@@ -51,14 +53,6 @@ public enum WarningType {
5153
private final int code;
5254
private final String descp;
5355

54-
public int getCode() {
55-
return code;
56-
}
57-
58-
public String getDescp() {
59-
return descp;
60-
}
61-
6256
private static final Map<String, WarningType> WARNING_TYPE_MAP =
6357
Arrays.stream(WarningType.values()).collect(toMap(WarningType::getDescp, Functions.identity()));
6458

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
3636
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
3737
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
38+
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
3839

3940
import org.apache.commons.collections4.CollectionUtils;
4041

@@ -61,6 +62,9 @@ public abstract class AbstractWorkflowStateAction implements IWorkflowStateActio
6162
@Autowired
6263
protected WorkflowEventBusCoordinator workflowEventBusCoordinator;
6364

65+
@Autowired
66+
protected WorkflowAlertManager workflowAlertManager;
67+
6468
/**
6569
* Try to trigger the tasks if the trigger condition is met.
6670
* <p> If all the given tasks trigger condition is not met then will try to emit workflow finish event.
@@ -190,6 +194,7 @@ protected void finalizeEventAction(final IWorkflowExecutionRunnable workflowExec
190194

191195
workflowCacheRepository.remove(workflowExecutionRunnable.getId());
192196
workflowEventBusCoordinator.unRegisterWorkflowEventBus(workflowExecutionRunnable);
197+
workflowAlertManager.sendAlertWorkflowInstance(workflowExecutionRunnable.getWorkflowInstance());
193198

194199
log.info("Successfully finalize WorkflowExecuteRunnable: {}", workflowExecutionRunnable.getName());
195200
}

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java

Lines changed: 40 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@
2424
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2525
import org.apache.dolphinscheduler.dao.AlertDao;
2626
import org.apache.dolphinscheduler.dao.entity.Alert;
27+
import org.apache.dolphinscheduler.dao.entity.Project;
2728
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
2829
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2930
import org.apache.dolphinscheduler.dao.entity.User;
3031
import org.apache.dolphinscheduler.dao.entity.WorkflowAlertContent;
3132
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
3233
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
33-
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
34-
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
34+
import org.apache.dolphinscheduler.dao.repository.ProjectDao;
35+
import org.apache.dolphinscheduler.dao.repository.UserDao;
36+
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
3537

3638
import java.util.ArrayList;
3739
import java.util.Date;
@@ -46,20 +48,20 @@
4648
@Slf4j
4749
public class WorkflowAlertManager {
4850

49-
/**
50-
* alert dao
51-
*/
5251
@Autowired
5352
private AlertDao alertDao;
5453

5554
@Autowired
56-
private WorkflowDefinitionLogMapper workflowDefinitionLogMapper;
55+
private WorkflowDefinitionLogDao workflowDefinitionLogDao;
56+
57+
@Autowired
58+
private UserDao userDao;
5759

5860
@Autowired
59-
private UserMapper userMapper;
61+
private ProjectDao projectDao;
6062

6163
/**
62-
* command type convert chinese
64+
* convert command type to human-readable name
6365
*
6466
* @param commandType command type
6567
* @return command name
@@ -95,73 +97,41 @@ private String getCommandCnName(CommandType commandType) {
9597
* get workflow instance content
9698
*
9799
* @param workflowInstance workflow instance
98-
* @param taskInstances task instance list
99100
* @return workflow instance format content
100101
*/
101102
public String getContentWorkflowInstance(WorkflowInstance workflowInstance,
102-
List<TaskInstance> taskInstances,
103-
ProjectUser projectUser) {
103+
Project project) {
104104

105-
String res = "";
106-
WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogMapper
105+
String res;
106+
WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao
107107
.queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(),
108108
workflowInstance.getWorkflowDefinitionVersion());
109109

110110
String modifyBy = "";
111111
if (workflowDefinitionLog != null) {
112-
User operator = userMapper.selectById(workflowDefinitionLog.getOperator());
112+
User operator = userDao.queryById(workflowDefinitionLog.getOperator());
113113
modifyBy = operator == null ? "" : operator.getUserName();
114114
}
115115

116-
if (workflowInstance.getState().isSuccess()) {
117-
List<WorkflowAlertContent> successTaskList = new ArrayList<>(1);
118-
WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder()
119-
.projectCode(projectUser.getProjectCode())
120-
.projectName(projectUser.getProjectName())
121-
.owner(projectUser.getUserName())
122-
.workflowInstanceId(workflowInstance.getId())
123-
.workflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode())
124-
.workflowInstanceName(workflowInstance.getName())
125-
.commandType(workflowInstance.getCommandType())
126-
.workflowExecutionStatus(workflowInstance.getState())
127-
.modifyBy(modifyBy)
128-
.recovery(workflowInstance.getRecovery())
129-
.runTimes(workflowInstance.getRunTimes())
130-
.workflowStartTime(workflowInstance.getStartTime())
131-
.workflowEndTime(workflowInstance.getEndTime())
132-
.workflowHost(workflowInstance.getHost())
133-
.build();
134-
successTaskList.add(workflowAlertContent);
135-
res = JSONUtils.toJsonString(successTaskList);
136-
} else if (workflowInstance.getState().isFailure()) {
137-
138-
List<WorkflowAlertContent> failedTaskList = new ArrayList<>();
139-
for (TaskInstance task : taskInstances) {
140-
if (task.getState().isSuccess()) {
141-
continue;
142-
}
143-
WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder()
144-
.projectCode(projectUser.getProjectCode())
145-
.projectName(projectUser.getProjectName())
146-
.owner(projectUser.getUserName())
147-
.workflowInstanceId(workflowInstance.getId())
148-
.workflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode())
149-
.workflowInstanceName(workflowInstance.getName())
150-
.modifyBy(modifyBy)
151-
.taskCode(task.getTaskCode())
152-
.taskName(task.getName())
153-
.taskType(task.getTaskType())
154-
.taskState(task.getState())
155-
.taskStartTime(task.getStartTime())
156-
.taskEndTime(task.getEndTime())
157-
.taskHost(task.getHost())
158-
.taskPriority(task.getTaskInstancePriority().getDescp())
159-
.logPath(task.getLogPath())
160-
.build();
161-
failedTaskList.add(workflowAlertContent);
162-
}
163-
res = JSONUtils.toJsonString(failedTaskList);
164-
}
116+
List<WorkflowAlertContent> successTaskList = new ArrayList<>(1);
117+
WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder()
118+
.projectCode(project.getCode())
119+
.projectName(project.getName())
120+
.owner(project.getUserName())
121+
.workflowInstanceId(workflowInstance.getId())
122+
.workflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode())
123+
.workflowInstanceName(workflowInstance.getName())
124+
.commandType(workflowInstance.getCommandType())
125+
.workflowExecutionStatus(workflowInstance.getState())
126+
.modifyBy(modifyBy)
127+
.recovery(workflowInstance.getRecovery())
128+
.runTimes(workflowInstance.getRunTimes())
129+
.workflowStartTime(workflowInstance.getStartTime())
130+
.workflowEndTime(workflowInstance.getEndTime())
131+
.workflowHost(workflowInstance.getHost())
132+
.build();
133+
successTaskList.add(workflowAlertContent);
134+
res = JSONUtils.toJsonString(successTaskList);
165135

166136
return res;
167137
}
@@ -177,12 +147,12 @@ private String getWorkerToleranceContent(WorkflowInstance workflowInstance, List
177147

178148
List<WorkflowAlertContent> toleranceTaskInstanceList = new ArrayList<>();
179149

180-
WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogMapper
150+
WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogDao
181151
.queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(),
182152
workflowInstance.getWorkflowDefinitionVersion());
183153
String modifyBy = "";
184154
if (workflowDefinitionLog != null) {
185-
User operator = userMapper.selectById(workflowDefinitionLog.getOperator());
155+
User operator = userDao.queryById(workflowDefinitionLog.getOperator());
186156
modifyBy = operator == null ? "" : operator.getUserName();
187157
}
188158

@@ -232,24 +202,23 @@ public void sendAlertWorkerToleranceFault(WorkflowInstance workflowInstance, Lis
232202
* send workflow instance alert
233203
*
234204
* @param workflowInstance workflow instance
235-
* @param taskInstances task instance list
236205
*/
237-
public void sendAlertWorkflowInstance(WorkflowInstance workflowInstance,
238-
List<TaskInstance> taskInstances,
239-
ProjectUser projectUser) {
206+
public void sendAlertWorkflowInstance(WorkflowInstance workflowInstance) {
240207
if (!isNeedToSendWarning(workflowInstance)) {
241208
return;
242209
}
210+
Project project = projectDao.queryByCode(workflowInstance.getProjectCode());
211+
243212
Alert alert = new Alert();
244213
String cmdName = getCommandCnName(workflowInstance.getCommandType());
245214
String success = workflowInstance.getState().isSuccess() ? "success" : "failed";
246215
alert.setTitle(cmdName + " " + success);
247216
alert.setWarningType(workflowInstance.getState().isSuccess() ? WarningType.SUCCESS : WarningType.FAILURE);
248-
String content = getContentWorkflowInstance(workflowInstance, taskInstances, projectUser);
217+
String content = getContentWorkflowInstance(workflowInstance, project);
249218
alert.setContent(content);
250219
alert.setAlertGroupId(workflowInstance.getWarningGroupId());
251220
alert.setCreateTime(new Date());
252-
alert.setProjectCode(projectUser.getProjectCode());
221+
alert.setProjectCode(workflowInstance.getProjectCode());
253222
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
254223
alert.setWorkflowInstanceId(workflowInstance.getId());
255224
alert.setAlertType(workflowInstance.getState().isSuccess() ? AlertType.WORKFLOW_INSTANCE_SUCCESS
@@ -295,54 +264,4 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance,
295264
ProjectUser projectUser) {
296265
alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser);
297266
}
298-
299-
/**
300-
*
301-
* check node type and workflow blocking flag, then insert a block record into db
302-
*
303-
* @param workflowInstance workflow instance
304-
* @param projectUser the project owner
305-
*/
306-
public void sendWorkflowBlockingAlert(WorkflowInstance workflowInstance,
307-
ProjectUser projectUser) {
308-
Alert alert = new Alert();
309-
String cmdName = getCommandCnName(workflowInstance.getCommandType());
310-
List<WorkflowAlertContent> blockingNodeList = new ArrayList<>(1);
311-
312-
WorkflowDefinitionLog workflowDefinitionLog = workflowDefinitionLogMapper
313-
.queryByDefinitionCodeAndVersion(workflowInstance.getWorkflowDefinitionCode(),
314-
workflowInstance.getWorkflowDefinitionVersion());
315-
316-
String modifyBy = "";
317-
if (workflowDefinitionLog != null) {
318-
User operator = userMapper.selectById(workflowDefinitionLog.getOperator());
319-
modifyBy = operator == null ? "" : operator.getUserName();
320-
}
321-
322-
WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder()
323-
.projectCode(projectUser.getProjectCode())
324-
.projectName(projectUser.getProjectName())
325-
.owner(projectUser.getUserName())
326-
.workflowInstanceId(workflowInstance.getId())
327-
.workflowInstanceName(workflowInstance.getName())
328-
.commandType(workflowInstance.getCommandType())
329-
.workflowExecutionStatus(workflowInstance.getState())
330-
.modifyBy(modifyBy)
331-
.runTimes(workflowInstance.getRunTimes())
332-
.workflowStartTime(workflowInstance.getStartTime())
333-
.workflowEndTime(workflowInstance.getEndTime())
334-
.workflowHost(workflowInstance.getHost())
335-
.build();
336-
blockingNodeList.add(workflowAlertContent);
337-
String content = JSONUtils.toJsonString(blockingNodeList);
338-
alert.setTitle(cmdName + " Blocked");
339-
alert.setContent(content);
340-
alert.setAlertGroupId(workflowInstance.getWarningGroupId());
341-
alert.setCreateTime(new Date());
342-
alert.setProjectCode(projectUser.getProjectCode());
343-
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
344-
alert.setWorkflowInstanceId(workflowInstance.getId());
345-
alert.setAlertType(AlertType.WORKFLOW_INSTANCE_BLOCKED);
346-
alertDao.addAlert(alert);
347-
}
348267
}

dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManagerTest.java

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,13 @@
1717

1818
package org.apache.dolphinscheduler.service.alert;
1919

20-
import org.apache.dolphinscheduler.common.enums.CommandType;
21-
import org.apache.dolphinscheduler.common.enums.WarningType;
22-
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2320
import org.apache.dolphinscheduler.dao.AlertDao;
24-
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
2521
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2622
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2723
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
2824
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
2925

3026
import java.util.ArrayList;
31-
import java.util.Date;
3227
import java.util.List;
3328

3429
import org.junit.jupiter.api.Test;
@@ -64,7 +59,6 @@ public class WorkflowAlertManagerTest {
6459
*/
6560
@Test
6661
public void sendWarningWorkerToleranceFaultTest() {
67-
// process instance
6862
WorkflowInstance workflowInstance = new WorkflowInstance();
6963
workflowInstance.setName("test");
7064

@@ -77,50 +71,4 @@ public void sendWarningWorkerToleranceFaultTest() {
7771

7872
workflowAlertManager.sendAlertWorkerToleranceFault(workflowInstance, taskInstanceList);
7973
}
80-
81-
/**
82-
* send worker alert fault tolerance
83-
*/
84-
@Test
85-
public void sendWarnningOfProcessInstanceTest() {
86-
// process instance
87-
WorkflowInstance workflowInstance = new WorkflowInstance();
88-
workflowInstance.setWarningType(WarningType.SUCCESS);
89-
workflowInstance.setState(WorkflowExecutionStatus.SUCCESS);
90-
workflowInstance.setCommandType(CommandType.COMPLEMENT_DATA);
91-
workflowInstance.setWarningGroupId(1);
92-
workflowInstance.setWorkflowDefinitionCode(1L);
93-
workflowInstance.setWorkflowDefinitionVersion(1);
94-
95-
ProjectUser projectUser = new ProjectUser();
96-
TaskInstance taskInstance = new TaskInstance();
97-
List<TaskInstance> taskInstanceList = new ArrayList<>();
98-
taskInstanceList.add(taskInstance);
99-
100-
workflowAlertManager.sendAlertWorkflowInstance(workflowInstance, taskInstanceList, projectUser);
101-
}
102-
103-
/**
104-
* send blocking alert
105-
*/
106-
@Test
107-
public void sendBlockingAlertTest() {
108-
// process instance
109-
WorkflowInstance workflowInstance = new WorkflowInstance();
110-
workflowInstance.setId(1);
111-
workflowInstance.setName("test-process-01");
112-
workflowInstance.setCommandType(CommandType.START_PROCESS);
113-
workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
114-
workflowInstance.setRunTimes(0);
115-
workflowInstance.setStartTime(new Date());
116-
workflowInstance.setEndTime(new Date());
117-
workflowInstance.setHost("127.0.0.1");
118-
workflowInstance.setWarningGroupId(1);
119-
workflowInstance.setWorkflowDefinitionCode(1L);
120-
workflowInstance.setWorkflowDefinitionVersion(1);
121-
122-
ProjectUser projectUser = new ProjectUser();
123-
124-
workflowAlertManager.sendWorkflowBlockingAlert(workflowInstance, projectUser);
125-
}
12674
}

0 commit comments

Comments
 (0)