Commit 3b1d631

[Improvement-17157][Master] Support setting max.concurrent.workflow.instances (#17159)
1 parent ea42073 commit 3b1d631

8 files changed

Lines changed: 288 additions & 18 deletions

docs/docs/en/architecture/configuration.md

Lines changed: 16 additions & 15 deletions
@@ -275,21 +275,22 @@ Location: `api-server/conf/application.yaml`
275275

276276
Location: `master-server/conf/application.yaml`
277277

278-
| Parameters | Default value | Description |
279-
|-----------------------------------------------------------------------------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
280-
| master.listen-port | 5678 | master listen port |
281-
| master.logic-task-config.task-executor-thread-count | 2 * CPU +1 | The thread size used to execute logic task |
282-
| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master will use the worker's cpu/memory/threadPool usage to calculate the worker load; workers with a lower load have a higher chance of being dispatched tasks |
283-
| master.max-heartbeat-interval | 10s | master max heartbeat interval |
284-
| master.server-load-protection.enabled | true | If set true, will open master overload protection |
285-
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.8 | Master max system cpu usage, when the master's system cpu usage is smaller than this value, master server can execute workflow. |
286-
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.8 | Master max JVM cpu usage, when the master's jvm cpu usage is smaller than this value, master server can execute workflow. |
287-
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8 | Master max system memory usage, when the master's system memory usage is smaller than this value, master server can execute workflow. |
288-
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.8 | Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow. |
289-
| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory |
290-
| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` |
291-
| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db |
292-
| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master |
278+
| Parameters | Default value | Description |
279+
|-----------------------------------------------------------------------------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
280+
| master.listen-port | 5678 | master listen port |
281+
| master.logic-task-config.task-executor-thread-count | 2 * CPU +1 | The thread size used to execute logic task |
282+
| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master will use the worker's cpu/memory/threadPool usage to calculate the worker load; workers with a lower load have a higher chance of being dispatched tasks |
283+
| master.max-heartbeat-interval | 10s | master max heartbeat interval |
284+
| master.server-load-protection.enabled | true | If set true, will open master overload protection |
285+
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.8 | Master max system cpu usage, when the master's system cpu usage is smaller than this value, master server can execute workflow. |
286+
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.8 | Master max JVM cpu usage, when the master's jvm cpu usage is smaller than this value, master server can execute workflow. |
287+
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8 | Master max system memory usage, when the master's system memory usage is smaller than this value, master server can execute workflow. |
288+
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.8 | Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow. |
289+
| master.server-load-protection.max-concurrent-workflow-instances | 2147483647 | Master max concurrent workflow instances, when the master's workflow instance count reaches or exceeds this value, master server will be marked as busy. |
290+
| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory |
291+
| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` |
292+
| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db |
293+
| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master |
293294

294295
### Worker Server related configuration
295296

docs/docs/zh/architecture/configuration.md

Lines changed: 1 addition & 0 deletions
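@@ -291,6 +291,7 @@ The common.properties configuration file currently mainly configures hadoop/s3/yarn/applicationId
291291
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | Master max JVM cpu usage; the master server can schedule tasks only while the current JVM cpu usage is below this maximum. The default value 0.7 means up to 70% of the JVM CPU is used |
292292
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Master max system memory usage; the master server can schedule tasks only while the current system memory usage is below this maximum. The default value 0.7 means up to 70% of the operating system memory is used |
293293
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max system disk usage; the master server can schedule tasks only while the current system disk usage is below this maximum. The default value 0.7 means up to 70% of the operating system disk space is used |
294+
| master.server-load-protection.max-concurrent-workflow-instances | 2147483647 | Master max concurrent workflow instances. When the master's workflow instance count reaches or exceeds this value, the master server is marked as busy. |
294295
| master.failover-interval | 10 | Failover interval, in minutes |
295296
| master.kill-application-when-task-failover | true | Whether to kill the yarn or k8s application when a task instance fails over |
296297
| master.master.worker-group-refresh-interval | 10s | The interval for periodically syncing workerGroup from the database to memory |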

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

Lines changed: 3 additions & 1 deletion
@@ -28,6 +28,7 @@
2828
import lombok.Data;
2929
import lombok.extern.slf4j.Slf4j;
3030

31+
import org.springframework.beans.factory.annotation.Autowired;
3132
import org.springframework.boot.context.properties.ConfigurationProperties;
3233
import org.springframework.context.annotation.Configuration;
3334
import org.springframework.validation.Errors;
@@ -55,7 +56,8 @@ public class MasterConfig implements Validator {
5556
*/
5657
private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
5758

58-
private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection();
59+
@Autowired
60+
private MasterServerLoadProtection serverLoadProtection;
5961

6062
private Duration workerGroupRefreshInterval = Duration.ofMinutes(5);
6163

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java

Lines changed: 44 additions & 0 deletions
@@ -18,10 +18,54 @@
1818
package org.apache.dolphinscheduler.server.master.config;
1919

2020
import org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtection;
21+
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
22+
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
2123

24+
import lombok.Getter;
2225
import lombok.extern.slf4j.Slf4j;
2326

2427
@Slf4j
28+
@Getter
2529
public class MasterServerLoadProtection extends BaseServerLoadProtection {
2630

31+
private final int maxConcurrentWorkflowInstances;
32+
private final IWorkflowRepository workflowRepository;
33+
34+
public MasterServerLoadProtection(IWorkflowRepository workflowRepository,
35+
int maxConcurrentWorkflowInstances,
36+
double maxSystemCpuUsagePercentageThresholds,
37+
double maxJvmCpuUsagePercentageThresholds,
38+
double maxSystemMemoryUsagePercentageThresholds,
39+
double maxDiskUsagePercentageThresholds,
40+
boolean enabled) {
41+
this.workflowRepository = workflowRepository;
42+
this.maxConcurrentWorkflowInstances = maxConcurrentWorkflowInstances;
43+
this.maxSystemCpuUsagePercentageThresholds = maxSystemCpuUsagePercentageThresholds;
44+
this.maxJvmCpuUsagePercentageThresholds = maxJvmCpuUsagePercentageThresholds;
45+
this.maxSystemMemoryUsagePercentageThresholds = maxSystemMemoryUsagePercentageThresholds;
46+
this.maxDiskUsagePercentageThresholds = maxDiskUsagePercentageThresholds;
47+
this.enabled = enabled;
48+
}
49+
50+
@Override
51+
public boolean isOverload(SystemMetrics systemMetrics) {
52+
if (!enabled) {
53+
return false;
54+
}
55+
56+
// Check system metrics first
57+
if (super.isOverload(systemMetrics)) {
58+
return true;
59+
}
60+
61+
// Check workflow instance count
62+
int currentWorkflowInstanceCount = workflowRepository.getAll().size();
63+
if (currentWorkflowInstanceCount >= maxConcurrentWorkflowInstances) {
64+
log.info(
65+
"OverLoad: the workflow instance count: {} reaches or exceeds the maxConcurrentWorkflowInstances {}",
66+
currentWorkflowInstanceCount, maxConcurrentWorkflowInstances);
67+
return true;
68+
}
69+
return false;
70+
}
2771
}
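The order of checks in `isOverload` above (disabled short-circuit, then system metrics, then the workflow instance count) can be sketched in isolation. This is a minimal stand-alone illustration, not the DolphinScheduler class: `OverloadCheckSketch`, the single CPU threshold, and the `IntSupplier` that stands in for `workflowRepository.getAll().size()` are all hypothetical, and the `>` comparison used for the CPU check is an assumption about what the base class does.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for MasterServerLoadProtection; names and the single
// CPU metric are assumptions made for illustration only.
class OverloadCheckSketch {

    private final boolean enabled;
    private final double maxSystemCpuUsage;
    private final int maxConcurrentWorkflowInstances;
    // Stands in for workflowRepository.getAll().size() in the diff above.
    private final IntSupplier runningWorkflowCount;

    OverloadCheckSketch(boolean enabled,
                        double maxSystemCpuUsage,
                        int maxConcurrentWorkflowInstances,
                        IntSupplier runningWorkflowCount) {
        this.enabled = enabled;
        this.maxSystemCpuUsage = maxSystemCpuUsage;
        this.maxConcurrentWorkflowInstances = maxConcurrentWorkflowInstances;
        this.runningWorkflowCount = runningWorkflowCount;
    }

    boolean isOverload(double systemCpuUsage) {
        // Protection switched off: never report overload.
        if (!enabled) {
            return false;
        }
        // System metrics are checked first (assumed comparison: usage above threshold).
        if (systemCpuUsage > maxSystemCpuUsage) {
            return true;
        }
        // Instance-count check: reaching the limit already counts as busy (>=).
        return runningWorkflowCount.getAsInt() >= maxConcurrentWorkflowInstances;
    }

    public static void main(String[] args) {
        OverloadCheckSketch atLimit = new OverloadCheckSketch(true, 0.8, 2, () -> 2);
        System.out.println(atLimit.isOverload(0.1)); // true: count 2 >= limit 2
    }
}
```

Because the comparison is `>=`, a limit of `N` means the master is reported busy once it already holds `N` instances, which matches the "reaches or exceeds" wording in the documentation rows.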
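dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java

Lines changed: 59 additions & 0 deletions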
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.config;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
21+
22+
import lombok.extern.slf4j.Slf4j;
23+
24+
import org.springframework.beans.factory.annotation.Value;
25+
import org.springframework.context.annotation.Bean;
26+
import org.springframework.context.annotation.Configuration;
27+
28+
@Slf4j
29+
@Configuration
30+
public class MasterServerLoadProtectionConfig {
31+
32+
@Bean
33+
public MasterServerLoadProtection masterServerLoadProtection(
34+
IWorkflowRepository workflowRepository,
35+
@Value("${master.server-load-protection.max-concurrent-workflow-instances:2147483647}") int maxConcurrentWorkflowInstances,
36+
@Value("${master.server-load-protection.max-system-cpu-usage-percentage-thresholds:0.7}") double maxSystemCpuUsagePercentageThresholds,
37+
@Value("${master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds:0.7}") double maxJvmCpuUsagePercentageThresholds,
38+
@Value("${master.server-load-protection.max-system-memory-usage-percentage-thresholds:0.7}") double maxSystemMemoryUsagePercentageThresholds,
39+
@Value("${master.server-load-protection.max-disk-usage-percentage-thresholds:0.7}") double maxDiskUsagePercentageThresholds,
40+
@Value("${master.server-load-protection.enabled:true}") boolean enabled) {
41+
MasterServerLoadProtection protection =
42+
new MasterServerLoadProtection(workflowRepository,
43+
maxConcurrentWorkflowInstances,
44+
maxSystemCpuUsagePercentageThresholds,
45+
maxJvmCpuUsagePercentageThresholds,
46+
maxSystemMemoryUsagePercentageThresholds,
47+
maxDiskUsagePercentageThresholds,
48+
enabled);
49+
log.info(
50+
"Initialized MasterServerLoadProtection with IWorkflowRepository and maxConcurrentWorkflowInstances={}, "
51+
+
52+
"maxSystemCpuUsagePercentageThresholds={}, maxJvmCpuUsagePercentageThresholds={}, " +
53+
"maxSystemMemoryUsagePercentageThresholds={}, maxDiskUsagePercentageThresholds={}, enabled={}",
54+
maxConcurrentWorkflowInstances, maxSystemCpuUsagePercentageThresholds,
55+
maxJvmCpuUsagePercentageThresholds,
56+
maxSystemMemoryUsagePercentageThresholds, maxDiskUsagePercentageThresholds, enabled);
57+
return protection;
58+
}
59+
}
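The `${key:default}` placeholders in the bean method above fall back to the value after the colon when the property is not set. A rough sketch of that fallback, using plain `java.util.Properties` rather than Spring, shows why the default of `2147483647` leaves the instance limit effectively off. `MaxInstancesDefaultSketch` and `resolveMaxInstances` are hypothetical names; only the property key and default value come from the diff.

```java
import java.util.Properties;

// Hypothetical helper that imitates the fallback of a ${key:default} placeholder
// with java.util.Properties; it does not use or reproduce Spring's resolver.
class MaxInstancesDefaultSketch {

    static final String KEY = "master.server-load-protection.max-concurrent-workflow-instances";

    static int resolveMaxInstances(Properties props) {
        // Same default as in the @Value placeholder: 2147483647 == Integer.MAX_VALUE.
        return Integer.parseInt(props.getProperty(KEY, "2147483647").trim());
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        // With nothing configured the limit is Integer.MAX_VALUE, so the
        // instance-count check is not expected to trigger in practice.
        System.out.println(resolveMaxInstances(unset) == Integer.MAX_VALUE); // true

        Properties capped = new Properties();
        capped.setProperty(KEY, "100");
        System.out.println(resolveMaxInstances(capped)); // 100
    }
}
```

Operators who want the new protection to take effect therefore need to set the property explicitly to a value their master can realistically reach.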

dolphinscheduler-master/src/main/resources/application.yaml

Lines changed: 2 additions & 0 deletions
@@ -101,6 +101,8 @@ master:
101101
max-system-memory-usage-percentage-thresholds: 0.8
102102
# Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow.
103103
max-disk-usage-percentage-thresholds: 0.8
104+
# Master max concurrent workflow instances, when the master's workflow instance count reaches or exceeds this value, master server will be marked as busy.
105+
max-concurrent-workflow-instances: 2147483647
104106
worker-group-refresh-interval: 5m
105107
command-fetch-strategy:
106108
type: ID_SLOT_BASED

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java

Lines changed: 25 additions & 1 deletion
@@ -20,22 +20,46 @@
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.jupiter.api.Assertions.assertEquals;
2222
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import static org.mockito.Mockito.mock;
2324

2425
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
2526
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerType;
27+
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
2628

2729
import org.junit.jupiter.api.Test;
2830
import org.springframework.beans.factory.annotation.Autowired;
2931
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
3032
import org.springframework.boot.test.context.SpringBootTest;
33+
import org.springframework.boot.test.context.TestConfiguration;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.test.context.TestPropertySource;
3136

3237
@AutoConfigureMockMvc
33-
@SpringBootTest(classes = MasterConfig.class)
38+
@SpringBootTest(classes = {
39+
MasterConfig.class,
40+
MasterServerLoadProtectionConfig.class,
41+
MasterConfigTest.TestBeans.class
42+
})
43+
@TestPropertySource(properties = {
44+
"master.server-load-protection.max-system-cpu-usage-percentage-thresholds=0.9",
45+
"master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds=0.9",
46+
"master.server-load-protection.max-system-memory-usage-percentage-thresholds=0.9",
47+
"master.server-load-protection.max-disk-usage-percentage-thresholds=0.9"
48+
})
3449
public class MasterConfigTest {
3550

3651
@Autowired
3752
private MasterConfig masterConfig;
3853

54+
@TestConfiguration
55+
static class TestBeans {
56+
57+
@Bean
58+
public IWorkflowRepository workflowRepository() {
59+
return mock(IWorkflowRepository.class);
60+
}
61+
}
62+
3963
@Test
4064
public void getServerLoadProtection() {
4165
MasterServerLoadProtection serverLoadProtection = masterConfig.getServerLoadProtection();
