Skip to content

Commit 2a2647b

Browse files
[Improvement-18039][Metrics] Refactor workflow state metrics and fix missing imports
This commit addresses feedback regarding missing variables and symbols in the workflow metrics implementation. - Added the missing WorkflowInstanceMetrics import in AbstractWorkflowStateAction. - Refactored the workflow state-to-tag mapping into WorkflowInstanceMetrics for better encapsulation. This ensures the correct lowercase tags (e.g., 'submit', 'fail') are consistently used. - Removed unused task dispatch metrics in TaskMetrics to improve code maintainability. - Applied project formatting standards using spotless.
1 parent be08d86 commit 2a2647b

3 files changed

Lines changed: 27 additions & 35 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
3838
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
39+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
3940
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
4041
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
4142

@@ -173,7 +174,7 @@ protected void transformWorkflowInstanceState(final IWorkflowExecutionRunnable w
173174
log.info("Success set WorkflowExecuteRunnable: {} state from: {} to {}",
174175
workflowInstance.getName(), originState.name(), targetState.name());
175176
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
176-
targetState.name(),
177+
targetState,
177178
String.valueOf(workflowInstance.getWorkflowDefinitionCode()));
178179
} catch (Exception ex) {
179180
workflowInstance.setState(originState);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,39 +50,12 @@ public class TaskMetrics {
5050

5151
}
5252

53-
private final Counter taskDispatchCounter =
54-
Counter.builder("ds.task.dispatch.count")
55-
.description("Task dispatch count")
56-
.register(Metrics.globalRegistry);
57-
58-
private final Counter taskDispatchFailCounter =
59-
Counter.builder("ds.task.dispatch.failure.count")
60-
.description("Task dispatch failures count, retried ones included")
61-
.register(Metrics.globalRegistry);
62-
63-
private final Counter taskDispatchErrorCounter =
64-
Counter.builder("ds.task.dispatch.error.count")
65-
.description("Number of errors during task dispatch")
66-
.register(Metrics.globalRegistry);
67-
6853
public synchronized void registerTaskPrepared(Supplier<Number> consumer) {
6954
Gauge.builder("ds.task.prepared", consumer)
7055
.description("Task prepared count")
7156
.register(Metrics.globalRegistry);
7257
}
7358

74-
public void incTaskDispatchFailed(int failedCount) {
75-
taskDispatchFailCounter.increment(failedCount);
76-
}
77-
78-
public void incTaskDispatchError() {
79-
taskDispatchErrorCounter.increment();
80-
}
81-
82-
public void incTaskDispatch() {
83-
taskDispatchCounter.increment();
84-
}
85-
8659
public void incTaskInstanceByState(final String state) {
8760
if (taskInstanceCounters.get(state) == null) {
8861
return;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetrics.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.dolphinscheduler.server.master.metrics;
1919

20+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
21+
2022
import java.util.Set;
2123
import java.util.concurrent.TimeUnit;
2224
import java.util.function.Supplier;
@@ -36,7 +38,7 @@
3638
public class WorkflowInstanceMetrics {
3739

3840
private final Set<String> workflowInstanceStates = ImmutableSet.of(
39-
"submit", "timeout", "finish", "failover", "success", "fail", "stop");
41+
"submit", "timeout", "finish", "failover", "success", "fail", "stop", "pause");
4042

4143
static {
4244
for (final String state : workflowInstanceStates) {
@@ -78,18 +80,34 @@ public synchronized void registerWorkflowInstanceResubmitGauge(Supplier<Number>
7880
.register(Metrics.globalRegistry);
7981
}
8082

81-
public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final String state,
82-
final String workflowDefinitionCode) {
83-
// When tags need to be determined from local context,
84-
// you have no choice but to construct or lookup the Meter inside your method body.
85-
// The lookup cost is just a single hash lookup, so it is acceptable for most use cases.
83+
public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(WorkflowExecutionStatus state,
84+
String workflowDefinitionCode) {
8685
Metrics.globalRegistry.counter(
8786
"ds.workflow.instance.count",
88-
"state", state,
87+
"state", getMetricState(state),
8988
"workflow.definition.code", workflowDefinitionCode)
9089
.increment();
9190
}
9291

92+
private String getMetricState(WorkflowExecutionStatus state) {
93+
switch (state) {
94+
case SUBMITTED_SUCCESS:
95+
return "submit";
96+
case FAILURE:
97+
return "fail";
98+
case SUCCESS:
99+
return "success";
100+
case STOP:
101+
return "stop";
102+
case PAUSE:
103+
return "pause";
104+
case FAILOVER:
105+
return "failover";
106+
default:
107+
return state.name().toLowerCase();
108+
}
109+
}
110+
93111
public void cleanUpWorkflowInstanceCountMetricsByDefinitionCode(final Long workflowDefinitionCode) {
94112
for (final String state : workflowInstanceStates) {
95113
final Counter counter = Metrics.globalRegistry.counter(

0 commit comments

Comments
 (0)