Skip to content

Commit e46eb00

Browse files
Refactor TaskMetrics tracking to event-driven approach
1 parent edeef4e commit e46eb00

3 files changed

Lines changed: 34 additions & 9 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,40 @@ private void doFireSingleEvent(final IWorkflowExecutionRunnable workflowExecutio
156156
throw new RuntimeException("No EventHandler found for event: " + event.getEventType());
157157
}
158158
lifecycleEventHandler.handle(workflowExecutionRunnable, event);
159+
160+
recordTaskInstanceMetrics(event);
161+
}
162+
163+
private void recordTaskInstanceMetrics(AbstractLifecycleEvent event) {
164+
if (!(event
165+
.getEventType() instanceof org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType)) {
166+
return;
167+
}
168+
169+
switch ((org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType) event
170+
.getEventType()) {
171+
case DISPATCHED:
172+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("dispatch");
173+
break;
174+
case SUCCEEDED:
175+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("success");
176+
break;
177+
case FAILED:
178+
case FATAL:
179+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("fail");
180+
break;
181+
case KILLED:
182+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("kill");
183+
break;
184+
case RETRY:
185+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("retry");
186+
break;
187+
case TIMEOUT:
188+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("timeout");
189+
break;
190+
default:
191+
break;
192+
}
159193
}
160194

161195
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public void handle(final ITaskStateAction taskStateAction,
5454
// The task instance is not active, means it is already finished.
5555
return;
5656
}
57-
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("timeout");
5857
final String taskName = taskExecutionRunnable.getName();
5958
final TaskTimeoutStrategy timeoutNotifyStrategy = taskTimeoutLifecycleEvent.getTimeoutStrategy();
6059
if (timeoutNotifyStrategy == null) {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
4747
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
4848
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
49-
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
5049

5150
import org.apache.commons.lang3.StringUtils;
5251

@@ -110,7 +109,6 @@ public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunna
110109

111110
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
112111
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
113-
TaskMetrics.incTaskInstanceByState("retry");
114112
return;
115113
}
116114

@@ -133,7 +131,6 @@ private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable t
133131
taskInstance.setState(TaskExecutionStatus.FAILURE);
134132
taskInstance.setEndTime(taskFatalEvent.getEndTime());
135133
taskInstanceDao.updateById(taskInstance);
136-
TaskMetrics.incTaskInstanceByState("fail");
137134
}
138135

139136
@Override
@@ -144,7 +141,6 @@ public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecution
144141
taskInstance.setState(DISPATCH);
145142
taskInstance.setHost(taskDispatchedEvent.getExecutorHost());
146143
taskInstanceDao.updateById(taskInstance);
147-
TaskMetrics.incTaskInstanceByState("dispatch");
148144
}
149145

150146
@Override
@@ -200,7 +196,6 @@ private void persistentTaskInstanceKilledEventToDB(final ITaskExecutionRunnable
200196
taskInstance.setState(TaskExecutionStatus.KILL);
201197
taskInstance.setEndTime(taskKilledEvent.getEndTime());
202198
taskInstanceDao.updateById(taskInstance);
203-
TaskMetrics.incTaskInstanceByState("kill");
204199
}
205200

206201
@Override
@@ -212,7 +207,6 @@ public void onFailedEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
212207

213208
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
214209
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
215-
TaskMetrics.incTaskInstanceByState("retry");
216210
return;
217211
}
218212
// If all successors are condition tasks, then the task will not be marked as failure.
@@ -234,7 +228,6 @@ private void persistentTaskInstanceFailedEventToDB(final ITaskExecutionRunnable
234228
taskInstance.setState(TaskExecutionStatus.FAILURE);
235229
taskInstance.setEndTime(taskFailedEvent.getEndTime());
236230
taskInstanceDao.updateById(taskInstance);
237-
TaskMetrics.incTaskInstanceByState("fail");
238231
}
239232

240233
@Override
@@ -265,7 +258,6 @@ protected void persistentTaskInstanceSuccessEventToDB(final ITaskExecutionRunnab
265258
JSONUtils.toJsonString(taskSuccessEvent.getVarPool()));
266259
taskInstance.setVarPool(VarPoolUtils.serializeVarPool(finalVarPool));
267260
taskInstanceDao.updateById(taskInstance);
268-
TaskMetrics.incTaskInstanceByState("success");
269261
}
270262

271263
/**

0 commit comments

Comments
 (0)