Skip to content

Commit 6176e33

Browse files
[Improvement-Metrics][master] Add missing metrics to Master module
1 parent 40fe806 commit 6176e33

4 files changed

Lines changed: 17 additions & 2 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public void handle(final ITaskStateAction taskStateAction,
5454
// The task instance is not active, means it is already finished.
5555
return;
5656
}
57+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("timeout");
5758
final String taskName = taskExecutionRunnable.getName();
5859
final TaskTimeoutStrategy timeoutNotifyStrategy = taskTimeoutLifecycleEvent.getTimeoutStrategy();
5960
if (timeoutNotifyStrategy == null) {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
4747
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
4848
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
49+
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
4950

5051
import org.apache.commons.lang3.StringUtils;
5152

@@ -109,6 +110,7 @@ public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunna
109110

110111
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
111112
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
113+
TaskMetrics.incTaskInstanceByState("retry");
112114
return;
113115
}
114116

@@ -131,6 +133,7 @@ private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable t
131133
taskInstance.setState(TaskExecutionStatus.FAILURE);
132134
taskInstance.setEndTime(taskFatalEvent.getEndTime());
133135
taskInstanceDao.updateById(taskInstance);
136+
TaskMetrics.incTaskInstanceByState("fail");
134137
}
135138

136139
@Override
@@ -141,6 +144,7 @@ public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecution
141144
taskInstance.setState(DISPATCH);
142145
taskInstance.setHost(taskDispatchedEvent.getExecutorHost());
143146
taskInstanceDao.updateById(taskInstance);
147+
TaskMetrics.incTaskInstanceByState("dispatch");
144148
}
145149

146150
@Override
@@ -196,7 +200,7 @@ private void persistentTaskInstanceKilledEventToDB(final ITaskExecutionRunnable
196200
taskInstance.setState(TaskExecutionStatus.KILL);
197201
taskInstance.setEndTime(taskKilledEvent.getEndTime());
198202
taskInstanceDao.updateById(taskInstance);
199-
203+
TaskMetrics.incTaskInstanceByState("kill");
200204
}
201205

202206
@Override
@@ -208,6 +212,7 @@ public void onFailedEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
208212

209213
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
210214
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
215+
TaskMetrics.incTaskInstanceByState("retry");
211216
return;
212217
}
213218
// If all successors are condition tasks, then the task will not be marked as failure.
@@ -229,6 +234,7 @@ private void persistentTaskInstanceFailedEventToDB(final ITaskExecutionRunnable
229234
taskInstance.setState(TaskExecutionStatus.FAILURE);
230235
taskInstance.setEndTime(taskFailedEvent.getEndTime());
231236
taskInstanceDao.updateById(taskInstance);
237+
TaskMetrics.incTaskInstanceByState("fail");
232238
}
233239

234240
@Override
@@ -259,6 +265,7 @@ protected void persistentTaskInstanceSuccessEventToDB(final ITaskExecutionRunnab
259265
JSONUtils.toJsonString(taskSuccessEvent.getVarPool()));
260266
taskInstance.setVarPool(VarPoolUtils.serializeVarPool(finalVarPool));
261267
taskInstanceDao.updateById(taskInstance);
268+
TaskMetrics.incTaskInstanceByState("success");
262269
}
263270

264271
/**

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,12 @@ public class WorkflowExecutionRunnableFactory {
5252
*/
5353
@Transactional
5454
public IWorkflowExecutionRunnable createWorkflowExecuteRunnable(Command command) {
55+
long startTime = System.currentTimeMillis();
5556
deleteCommandOrThrow(command);
56-
return doCreateWorkflowExecutionRunnable(command);
57+
IWorkflowExecutionRunnable workflowExecutionRunnable = doCreateWorkflowExecutionRunnable(command);
58+
org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics
59+
.recordWorkflowInstanceGenerateTime(System.currentTimeMillis() - startTime);
60+
return workflowExecutionRunnable;
5761
}
5862

5963
/**

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ protected void transformWorkflowInstanceState(final IWorkflowExecutionRunnable w
172172
workflowInstanceDao.updateById(workflowInstance);
173173
log.info("Success set WorkflowExecuteRunnable: {} state from: {} to {}",
174174
workflowInstance.getName(), originState.name(), targetState.name());
175+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
176+
targetState.name(),
177+
String.valueOf(workflowInstance.getWorkflowDefinitionCode()));
175178
} catch (Exception ex) {
176179
workflowInstance.setState(originState);
177180
throw ex;

0 commit comments

Comments
 (0)