Skip to content

Commit d6c2c46

Browse files
authored
[Fix-17847] Fix incWorkflowInstanceByStateAndWorkflowDefinitionCode not called in version 3.4.0 (#18082)
1 parent 8b5a846 commit d6c2c46

9 files changed

Lines changed: 307 additions & 3 deletions

File tree

dolphinscheduler-master/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,10 @@
307307
<groupId>org.apache.logging.log4j</groupId>
308308
<artifactId>log4j-to-slf4j</artifactId>
309309
</exclusion>
310+
<exclusion>
311+
<groupId>com.vaadin.external.google</groupId>
312+
<artifactId>android-json</artifactId>
313+
</exclusion>
310314
</exclusions>
311315
</dependency>
312316

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
4040
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.WorkflowExecutionRunnableFactory;
4141
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
42+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
4243
import org.apache.dolphinscheduler.service.command.CommandService;
4344

4445
import org.apache.commons.collections4.CollectionUtils;
@@ -177,6 +178,7 @@ private CompletableFuture<Void> bootstrapWorkflowExecutionRunnable(IWorkflowExec
177178
return CompletableFuture.completedFuture(null);
178179
}
179180

181+
WorkflowInstanceMetrics.recordWorkflowInstanceSubmit(workflowInstance.getWorkflowDefinitionCode());
180182
workflowRepository.put(workflowExecutionRunnable);
181183
workflowEventBusCoordinator.registerWorkflowEventBus(workflowExecutionRunnable);
182184
workflowExecutionRunnable.getWorkflowEventBus()
@@ -203,6 +205,9 @@ private Void bootstrapError(Command command, Throwable throwable) {
203205
final int workflowInstanceId = command.getWorkflowInstanceId();
204206

205207
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE);
208+
WorkflowInstanceMetrics.recordWorkflowInstanceFinish(
209+
WorkflowExecutionStatus.FAILURE,
210+
command.getWorkflowDefinitionCode());
206211
log.info("Set workflow instance {} state to FAILURE", workflowInstanceId);
207212

208213
commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/serial/AbstractSerialCommandHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
3030
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
3131
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
32+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
3233

3334
import lombok.extern.slf4j.Slf4j;
3435

@@ -86,6 +87,9 @@ public Void doInTransaction(TransactionStatus status) {
8687
workflowInstance.setEndTime(DateUtils.getCurrentDate());
8788
workflowInstance.setState(WorkflowExecutionStatus.STOP);
8889
workflowInstanceDao.upsertWorkflowInstance(workflowInstance);
90+
WorkflowInstanceMetrics.recordWorkflowInstanceFinish(
91+
WorkflowExecutionStatus.STOP,
92+
workflowInstance.getWorkflowDefinitionCode());
8993
return null;
9094
}
9195
});

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
3737
import org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
3838
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
39+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
3940
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
4041
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
4142

@@ -146,6 +147,9 @@ protected void workflowFinish(final IWorkflowExecutionRunnable workflowExecution
146147
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
147148
workflowInstance.setEndTime(new Date());
148149
transformWorkflowInstanceState(workflowExecutionRunnable, workflowExecutionStatus);
150+
WorkflowInstanceMetrics.recordWorkflowInstanceFinish(
151+
workflowExecutionStatus,
152+
workflowInstance.getWorkflowDefinitionCode());
149153
if (workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowDefinition().getExecutionType()
150154
.isSerial()) {
151155
if (serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId()) > 0) {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/WorkflowFailover.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.dolphinscheduler.dao.repository.CommandDao;
2626
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2727
import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
28+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
2829

2930
import lombok.extern.slf4j.Slf4j;
3031

@@ -48,6 +49,7 @@ public void failoverWorkflow(final WorkflowInstance workflowInstance) {
4849
workflowInstance.getId(),
4950
workflowInstance.getState(),
5051
WorkflowExecutionStatus.FAILOVER);
52+
WorkflowInstanceMetrics.recordWorkflowInstanceFailover(workflowInstance.getWorkflowDefinitionCode());
5153

5254
final WorkflowFailoverCommandParam failoverWorkflowCommandParam = WorkflowFailoverCommandParam.builder()
5355
.workflowExecutionStatus(workflowInstance.getState())

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetrics.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.dolphinscheduler.server.master.metrics;
1919

20+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
21+
2022
import java.util.Set;
2123
import java.util.concurrent.TimeUnit;
2224
import java.util.function.Supplier;
@@ -36,7 +38,16 @@
3638
public class WorkflowInstanceMetrics {
3739

3840
private final Set<String> workflowInstanceStates = ImmutableSet.of(
39-
"submit", "timeout", "finish", "failover", "success", "fail", "stop");
41+
WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
42+
WorkflowExecutionStatus.RUNNING_EXECUTION.name(),
43+
WorkflowExecutionStatus.READY_PAUSE.name(),
44+
WorkflowExecutionStatus.PAUSE.name(),
45+
WorkflowExecutionStatus.READY_STOP.name(),
46+
WorkflowExecutionStatus.FAILOVER.name(),
47+
WorkflowExecutionStatus.SUCCESS.name(),
48+
WorkflowExecutionStatus.FAILURE.name(),
49+
WorkflowExecutionStatus.STOP.name(),
50+
WorkflowExecutionStatus.SERIAL_WAIT.name());
4051

4152
static {
4253
for (final String state : workflowInstanceStates) {
@@ -78,18 +89,39 @@ public synchronized void registerWorkflowInstanceResubmitGauge(Supplier<Number>
7889
.register(Metrics.globalRegistry);
7990
}
8091

81-
public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final String state,
92+
public void incWorkflowInstanceByStateAndWorkflowDefinitionCode(final WorkflowExecutionStatus state,
8293
final String workflowDefinitionCode) {
8394
// When tags need to be determined from local context,
8495
// you have no choice but to construct or lookup the Meter inside your method body.
8596
// The lookup cost is just a single hash lookup, so it is acceptable for most use cases.
8697
Metrics.globalRegistry.counter(
8798
"ds.workflow.instance.count",
88-
"state", state,
99+
"state", state.name(),
89100
"workflow.definition.code", workflowDefinitionCode)
90101
.increment();
91102
}
92103

104+
public void recordWorkflowInstanceSubmit(final Long workflowDefinitionCode) {
105+
incWorkflowInstanceByStateAndWorkflowDefinitionCode(
106+
WorkflowExecutionStatus.SUBMITTED_SUCCESS,
107+
String.valueOf(workflowDefinitionCode));
108+
}
109+
110+
public void recordWorkflowInstanceFailover(final Long workflowDefinitionCode) {
111+
incWorkflowInstanceByStateAndWorkflowDefinitionCode(
112+
WorkflowExecutionStatus.FAILOVER,
113+
String.valueOf(workflowDefinitionCode));
114+
}
115+
116+
public void recordWorkflowInstanceFinish(final WorkflowExecutionStatus workflowExecutionStatus,
117+
final Long workflowDefinitionCode) {
118+
if (workflowExecutionStatus == null || !workflowExecutionStatus.isFinalState()) {
119+
return;
120+
}
121+
incWorkflowInstanceByStateAndWorkflowDefinitionCode(workflowExecutionStatus,
122+
String.valueOf(workflowDefinitionCode));
123+
}
124+
93125
public void cleanUpWorkflowInstanceCountMetricsByDefinitionCode(final Long workflowDefinitionCode) {
94126
for (final String state : workflowInstanceStates) {
95127
final Counter counter = Metrics.globalRegistry.counter(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.integration.cases;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.awaitility.Awaitility.await;
22+
23+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
24+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
25+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
26+
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
27+
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
28+
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
29+
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
30+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
31+
32+
import java.time.Duration;
33+
34+
import org.junit.jupiter.api.DisplayName;
35+
import org.junit.jupiter.api.Test;
36+
37+
import io.micrometer.core.instrument.Counter;
38+
import io.micrometer.core.instrument.Metrics;
39+
40+
public class WorkflowInstanceMetricsTestCase extends AbstractMasterIntegrationTestCase {
41+
42+
@Test
43+
@DisplayName("Test workflow instance metrics for a successful workflow")
44+
public void testWorkflowInstanceMetrics_with_oneSuccessTask() {
45+
final String yaml = "/it/metrics/workflow_with_one_fake_task_success_for_metrics.yaml";
46+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
47+
final WorkflowDefinition workflow = context.getOneWorkflow();
48+
final long workflowDefinitionCode = workflow.getCode();
49+
WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(workflowDefinitionCode);
50+
51+
final double submitBefore = workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
52+
workflowDefinitionCode);
53+
final double successBefore = workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(),
54+
workflowDefinitionCode);
55+
56+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
57+
.workflowDefinition(workflow)
58+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
59+
.build();
60+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
61+
62+
await()
63+
.atMost(Duration.ofMinutes(2))
64+
.untilAsserted(() -> {
65+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
66+
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
67+
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
68+
workflowDefinitionCode)).isEqualTo(submitBefore + 1.0d);
69+
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), workflowDefinitionCode))
70+
.isEqualTo(successBefore + 1.0d);
71+
});
72+
73+
masterContainer.assertAllResourceReleased();
74+
}
75+
76+
@Test
77+
@DisplayName("Test workflow instance metrics for serial discard strategy")
78+
public void testWorkflowInstanceMetrics_with_serialDiscardStrategy() {
79+
final String yaml = "/it/metrics/workflow_with_serial_discard_strategy_for_metrics.yaml";
80+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
81+
final WorkflowDefinition workflow = context.getOneWorkflow();
82+
final long workflowDefinitionCode = workflow.getCode();
83+
WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(workflowDefinitionCode);
84+
85+
final double submitBefore = workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
86+
workflowDefinitionCode);
87+
final double successBefore = workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(),
88+
workflowDefinitionCode);
89+
final double stopBefore = workflowInstanceCount(WorkflowExecutionStatus.STOP.name(),
90+
workflowDefinitionCode);
91+
92+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
93+
.workflowDefinition(workflow)
94+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
95+
.build();
96+
final Integer workflowInstanceId1 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
97+
final Integer workflowInstanceId2 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
98+
final Integer workflowInstanceId3 = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
99+
100+
await()
101+
.atMost(Duration.ofMinutes(2))
102+
.untilAsserted(() -> {
103+
final WorkflowInstance workflowInstance1 = repository.queryWorkflowInstance(workflowInstanceId1);
104+
final WorkflowInstance workflowInstance2 = repository.queryWorkflowInstance(workflowInstanceId2);
105+
final WorkflowInstance workflowInstance3 = repository.queryWorkflowInstance(workflowInstanceId3);
106+
assertThat(workflowInstance1.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
107+
assertThat(workflowInstance2.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
108+
assertThat(workflowInstance3.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
109+
110+
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUBMITTED_SUCCESS.name(),
111+
workflowDefinitionCode)).isEqualTo(submitBefore + 1.0d);
112+
assertThat(workflowInstanceCount(WorkflowExecutionStatus.SUCCESS.name(), workflowDefinitionCode))
113+
.isEqualTo(successBefore + 1.0d);
114+
assertThat(workflowInstanceCount(WorkflowExecutionStatus.STOP.name(), workflowDefinitionCode))
115+
.isEqualTo(stopBefore + 2.0d);
116+
});
117+
118+
masterContainer.assertAllResourceReleased();
119+
}
120+
121+
private double workflowInstanceCount(final String state, final long workflowDefinitionCode) {
122+
final Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
123+
.tags(
124+
"state",
125+
state,
126+
"workflow.definition.code",
127+
String.valueOf(workflowDefinitionCode))
128+
.counter();
129+
return counter == null ? 0.0d : counter.count();
130+
}
131+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 2101
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_fake_task_success_for_metrics
29+
code: 210101
30+
version: 1
31+
projectCode: 2101
32+
description: This is a fake workflow with single task for metrics
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
tasks:
40+
- name: A
41+
code: 2101001
42+
version: 1
43+
projectCode: 2101
44+
userId: 1
45+
taskType: LogicFakeTask
46+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 30"}'
47+
workerGroup: default
48+
createTime: 2024-08-12 00:00:00
49+
updateTime: 2021-08-12 00:00:00
50+
taskExecuteType: BATCH
51+
52+
taskRelations:
53+
- projectCode: 2101
54+
workflowDefinitionCode: 210101
55+
workflowDefinitionVersion: 1
56+
preTaskCode: 0
57+
preTaskVersion: 0
58+
postTaskCode: 2101001
59+
postTaskVersion: 1
60+
createTime: 2024-08-12 00:00:00
61+
updateTime: 2024-08-12 00:00:00

0 commit comments

Comments
 (0)