Commit b186671

Support configurable maximum runtime for workflow/task instances
1 parent 6a5790b commit b186671

8 files changed

Lines changed: 108 additions & 51 deletions

deploy/kubernetes/dolphinscheduler/values.yaml

Lines changed: 4 additions & 0 deletions
@@ -565,6 +565,10 @@ master:
 MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
 # -- Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow.
 MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7
+# Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be killed. The default value of 0d indicates no limit.
+MASTER_SERVER_LOAD_PROTECTION_MAX_WORKFLOW_INSTANCE_RUNTIME: 0d
+# Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be killed. The default value of 0d indicates no limit.
+MASTER_SERVER_LOAD_PROTECTION_MAX_TASK_INSTANCE_RUNTIME: 0d
 # -- Master failover interval, the unit is minute
 MASTER_FAILOVER_INTERVAL: "10m"
 # -- Master kill application when handle failover

docs/docs/en/architecture/configuration.md

Lines changed: 25 additions & 23 deletions
Large diffs are not rendered by default.

docs/docs/zh/architecture/configuration.md

Lines changed: 18 additions & 16 deletions
@@ -303,22 +303,24 @@ The common.properties configuration file currently mainly configures hadoop/s3/yarn/applicationId

 Location: `worker-server/conf/application.yaml`

-| Parameter | Default | Description |
-|-----------|---------|-------------|
-| worker.listen-port | 1234 | Worker listen port |
-| worker.max-heartbeat-interval | 10s | Worker maximum heartbeat interval |
-| worker.host-weight | 100 | Weight of the worker host when tasks are dispatched |
-| worker.tenant-auto-create | true | A tenant corresponds to a system user, which the worker uses to submit jobs. If the user does not exist on the system, it is created automatically when worker.tenant.auto.create is true. |
-| worker.server-load-protection.enabled | true | Whether to enable the system protection policy |
-| worker.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.8 | Maximum system CPU usage for the worker. The worker accepts tasks only while the current system CPU usage is below this value. The default of 0.8 allows up to 80% of the operating system CPU. |
-| worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.8 | Maximum JVM CPU usage for the worker. The worker accepts tasks only while the current JVM CPU usage is below this value. The default of 0.8 allows up to 80% of the JVM CPU. |
-| worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8 | Maximum system memory usage for the worker. The worker accepts tasks only while the current system memory usage is below this value. The default of 0.8 allows up to 80% of the operating system memory. |
-| worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.8 | Maximum system disk usage for the worker. The worker accepts tasks only while the current system disk usage is below this value. The default of 0.8 allows up to 80% of the operating system disk space. |
-| worker.alert-listen-host | localhost | Alert listen host |
-| worker.alert-listen-port | 50052 | Alert listen port |
-| worker.physical-task-config.task-executor-thread-size | 100 | Maximum task concurrency in the worker |
-| worker.tenant-config.auto-create-tenant-enabled | true | A tenant corresponds to a system user, which the worker uses to submit jobs. If the user does not exist on the system, it is created automatically when worker.tenant.auto.create is true. |
-| worker.tenant-config.default-tenant-enabled | false | If set to true, the user that started the worker service is used as the `default` tenant. |
+| Default | Parameter | Description |
+|---------|-----------|-------------|
+| 1234 | worker.listen-port | Worker listen port |
+| 10s | worker.max-heartbeat-interval | Worker maximum heartbeat interval |
+| 100 | worker.host-weight | Weight of the worker host when tasks are dispatched |
+| true | worker.tenant-auto-create | A tenant corresponds to a system user, which the worker uses to submit jobs. If the user does not exist on the system, it is created automatically when worker.tenant.auto.create is true. |
+| true | worker.server-load-protection.enabled | Whether to enable the system protection policy |
+| 0.8 | worker.server-load-protection.max-system-cpu-usage-percentage-thresholds | Maximum system CPU usage for the worker. The worker accepts tasks only while the current system CPU usage is below this value. The default of 0.8 allows up to 80% of the operating system CPU. |
+| 0.8 | worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | Maximum JVM CPU usage for the worker. The worker accepts tasks only while the current JVM CPU usage is below this value. The default of 0.8 allows up to 80% of the JVM CPU. |
+| 0.8 | worker.server-load-protection.max-system-memory-usage-percentage-thresholds | Maximum system memory usage for the worker. The worker accepts tasks only while the current system memory usage is below this value. The default of 0.8 allows up to 80% of the operating system memory. |
+| 0.8 | worker.server-load-protection.max-disk-usage-percentage-thresholds | Maximum system disk usage for the worker. The worker accepts tasks only while the current system disk usage is below this value. The default of 0.8 allows up to 80% of the operating system disk space. |
+| 0d | master.server-load-protection.max-workflow-instance-runtime | Maximum running time of a workflow instance. If this time is exceeded, the instance is killed. The default of 0d means no limit; the minimum value is 1 minute. |
+| 0d | master.server-load-protection.max-task-instance-runtime | Maximum running time of a task instance. If this time is exceeded, the instance is killed. The default of 0d means no limit; the minimum value is 1 minute. |
+| localhost | worker.alert-listen-host | Alert listen host |
+| 50052 | worker.alert-listen-port | Alert listen port |
+| 100 | worker.physical-task-config.task-executor-thread-size | Maximum task concurrency in the worker |
+| true | worker.tenant-config.auto-create-tenant-enabled | A tenant corresponds to a system user, which the worker uses to submit jobs. If the user does not exist on the system, it is created automatically when worker.tenant.auto.create is true. |
+| false | worker.tenant-config.default-tenant-enabled | If set to true, the user that started the worker service is used as the `default` tenant. |

 ## Alert Server Configuration

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public void validate(Object target, Errors errors) {
100100
if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
101101
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
102102
}
103+
serverLoadProtection.validate(errors);
103104
commandFetchStrategy.validate(errors);
104105
workerLoadBalancerConfigurationProperties.validate(errors);
105106

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,36 @@
1919

2020
import org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtectionConfig;
2121

22+
import java.time.Duration;
23+
2224
import lombok.Data;
2325
import lombok.EqualsAndHashCode;
2426

27+
import org.springframework.validation.Errors;
28+
2529
@Data
2630
@EqualsAndHashCode(callSuper = true)
2731
public class MasterServerLoadProtectionConfig extends BaseServerLoadProtectionConfig {
2832

2933
private int maxConcurrentWorkflowInstances = Integer.MAX_VALUE;
3034

35+
private Duration maxWorkflowInstanceRuntime = Duration.ofDays(0);
36+
37+
private Duration maxTaskInstanceRuntime = Duration.ofDays(0);
38+
39+
public void validate(Errors errors) {
40+
if (maxConcurrentWorkflowInstances <= 0) {
41+
errors.rejectValue("maxConcurrentWorkflowInstances", null,
42+
"maxConcurrentWorkflowInstances must be greater than 0");
43+
}
44+
if (!maxWorkflowInstanceRuntime.isZero() &&
45+
maxWorkflowInstanceRuntime.compareTo(Duration.ofMinutes(1)) < 0) {
46+
errors.rejectValue("maxWorkflowInstanceRuntime", null,
47+
"maxWorkflowInstanceRuntime must be 0 (disabled) or >= 1m");
48+
}
49+
if (!maxTaskInstanceRuntime.isZero() &&
50+
maxTaskInstanceRuntime.compareTo(Duration.ofMinutes(1)) < 0) {
51+
errors.rejectValue("maxTaskInstanceRuntime", null, "maxTaskInstanceRuntime must be 0 (disabled) or >= 1m");
52+
}
53+
}
3154
}
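
The runtime checks in `validate` accept a limit only when it is exactly zero (disabled) or at least one minute. A minimal standalone sketch of that rule, using only `java.time.Duration`, is shown here. The class `RuntimeLimitRule` and the method `isAccepted` are hypothetical names for illustration and are not part of the commit.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of DolphinScheduler. */
final class RuntimeLimitRule {

    private static final Duration MINIMUM = Duration.ofMinutes(1);

    /** Accepts a limit that is exactly zero (disabled) or at least one minute. */
    static boolean isAccepted(Duration limit) {
        return limit.isZero() || limit.compareTo(MINIMUM) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(Duration.ofDays(0)));     // zero: limit disabled
        System.out.println(isAccepted(Duration.ofSeconds(30))); // shorter than one minute
        System.out.println(isAccepted(Duration.ofMinutes(1)));  // boundary value
        System.out.println(isAccepted(Duration.ofMinutes(-5))); // negative duration
    }
}
```

Under this rule a negative duration is rejected as well, because it is neither zero nor at least one minute.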

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import static com.google.common.base.Preconditions.checkState;
2121

22-
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2322
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
23+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
2424
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
2525
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
2626
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
@@ -35,23 +35,27 @@ public class TaskTimeoutLifecycleEvent extends AbstractTaskLifecycleEvent {
3535

3636
private final ITaskExecutionRunnable taskExecutionRunnable;
3737

38+
private final TaskTimeoutStrategy timeoutStrategy;
39+
3840
protected TaskTimeoutLifecycleEvent(final ITaskExecutionRunnable taskExecutionRunnable,
41+
final TaskTimeoutStrategy timeoutStrategy,
3942
final long timeout) {
4043
super(timeout);
44+
this.timeoutStrategy = timeoutStrategy;
4145
this.taskExecutionRunnable = taskExecutionRunnable;
4246
}
4347

44-
public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable) {
45-
final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
48+
public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable,
49+
final TaskTimeoutStrategy timeoutStrategy,
50+
final long timeoutInMinutes) {
4651
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
47-
checkState(taskDefinition != null, "The task instance must be initialized before retrying.");
4852

49-
final int timeout = taskDefinition.getTimeout();
50-
checkState(timeout >= 0, "The task timeout: %s must >=0 minutes", timeout);
53+
checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null");
54+
checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes);
5155

5256
long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime()
53-
+ TimeUnit.MINUTES.toMillis(timeout);
54-
return new TaskTimeoutLifecycleEvent(taskExecutionRunnable, delayTime);
57+
+ TimeUnit.SECONDS.toMillis(timeoutInMinutes);
58+
return new TaskTimeoutLifecycleEvent(taskExecutionRunnable, timeoutStrategy, delayTime);
5559
}
5660

5761
@Override
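
Because the factory method takes its timeout in minutes, the conversion to milliseconds has to use the matching `TimeUnit` constant. The following sketch shows the conversion in isolation and how far off the result is when the wrong unit is used. `TimeoutUnits` and `minutesToMillis` are hypothetical names used only for this example.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of DolphinScheduler. */
final class TimeoutUnits {

    /** Converts a non-negative timeout expressed in minutes to milliseconds. */
    static long minutesToMillis(long timeoutInMinutes) {
        if (timeoutInMinutes < 0) {
            throw new IllegalArgumentException("timeout must be >= 0 minutes: " + timeoutInMinutes);
        }
        return TimeUnit.MINUTES.toMillis(timeoutInMinutes);
    }

    public static void main(String[] args) {
        long correct = minutesToMillis(2);              // 2 minutes in milliseconds
        long wrongUnit = TimeUnit.SECONDS.toMillis(2);  // treats the same number as seconds
        System.out.println(correct);
        System.out.println(wrongUnit);
        System.out.println(correct / wrongUnit);        // factor between the two results
    }
}
```

Treating a minute value as seconds shortens the delay by a factor of 60, so a two-hour limit would fire after two minutes.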

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;
1919

2020
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
21+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
22+
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtectionConfig;
2123
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
2224
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
2325
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskStartLifecycleEvent;
@@ -28,12 +30,16 @@
2830

2931
import lombok.extern.slf4j.Slf4j;
3032

33+
import org.springframework.beans.factory.annotation.Autowired;
3134
import org.springframework.stereotype.Component;
3235

3336
@Slf4j
3437
@Component
3538
public class TaskStartLifecycleEventHandler extends AbstractTaskLifecycleEventHandler<TaskStartLifecycleEvent> {
3639

40+
@Autowired
41+
private MasterServerLoadProtectionConfig masterServerLoadProtectionConfig;
42+
3743
@Override
3844
public void handle(final IWorkflowExecutionRunnable workflowExecutionRunnable,
3945
final TaskStartLifecycleEvent taskStartLifecycleEvent) {
@@ -63,13 +69,20 @@ public ILifecycleEventType matchEventType() {
6369

6470
private void taskTimeoutMonitor(final ITaskExecutionRunnable taskExecutionRunnable) {
6571
final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
66-
if (taskDefinition.getTimeout() <= 0) {
72+
int taskTimeout = taskDefinition.getTimeout();
73+
if (taskTimeout > 0 && taskDefinition.getTimeoutNotifyStrategy() != null) {
6774
log.debug("The task {} timeout {} is invalided, so the timeout monitor will not be started.",
6875
taskDefinition.getName(),
6976
taskDefinition.getTimeout());
70-
return;
77+
taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of(
78+
taskExecutionRunnable, taskDefinition.getTimeoutNotifyStrategy(), taskTimeout));
79+
}
80+
81+
int systemTimeout = (int) masterServerLoadProtectionConfig.getMaxTaskInstanceRuntime().toMinutes();
82+
if (systemTimeout > 0) {
83+
taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of(
84+
taskExecutionRunnable, TaskTimeoutStrategy.FAILED, systemTimeout));
7185
}
72-
taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of(taskExecutionRunnable));
7386
}
7487

7588
}
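
After this change `taskTimeoutMonitor` can publish up to two timeout events for one task: one from the task definition's own timeout and strategy, and one from the master-wide `max-task-instance-runtime`, which always uses the `FAILED` strategy. The sketch below models only that decision, without the event bus. `TimeoutMonitorPlan`, `Monitor`, `plan`, and the local `Strategy` enum are hypothetical stand-ins; the `WARN` constant is assumed for the example, since only `FAILED` appears in the diff.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the decision logic; not part of DolphinScheduler. */
final class TimeoutMonitorPlan {

    /** Stand-in for the project's TaskTimeoutStrategy enum (WARN is assumed). */
    enum Strategy { WARN, FAILED }

    /** One monitor to register: the action to take and the delay in minutes. */
    static final class Monitor {
        final Strategy strategy;
        final long timeoutInMinutes;

        Monitor(Strategy strategy, long timeoutInMinutes) {
            this.strategy = strategy;
            this.timeoutInMinutes = timeoutInMinutes;
        }
    }

    /** Returns the monitors that would be registered for one task. */
    static List<Monitor> plan(int taskTimeoutMinutes, Strategy taskStrategy, Duration systemLimit) {
        List<Monitor> monitors = new ArrayList<>();
        // Task-level timeout: only when a positive timeout and a strategy are configured.
        if (taskTimeoutMinutes > 0 && taskStrategy != null) {
            monitors.add(new Monitor(taskStrategy, taskTimeoutMinutes));
        }
        // System-level limit: zero means disabled, otherwise the task is failed.
        long systemMinutes = systemLimit.toMinutes();
        if (systemMinutes > 0) {
            monitors.add(new Monitor(Strategy.FAILED, systemMinutes));
        }
        return monitors;
    }

    public static void main(String[] args) {
        for (Monitor monitor : plan(30, Strategy.WARN, Duration.ofHours(2))) {
            System.out.println(monitor.strategy + " after " + monitor.timeoutInMinutes + " minutes");
        }
    }
}
```

The two monitors are independent, so a task with a 30-minute warning timeout can still be failed later by the system-wide limit.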

dolphinscheduler-master/src/main/resources/application.yaml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,15 @@ master:
110110
max-disk-usage-percentage-thresholds: 0.8
111111
# Master max concurrent workflow instances, when the master's workflow instance count exceeds this value, master server will be marked as busy.
112112
max-concurrent-workflow-instances: 2147483647
113-
worker-group-refresh-interval: 5m
113+
# Maximum allowed running time for a workflow instance.
114+
# If the running duration exceeds this value, the instance will be killed.
115+
# The default value of 0d indicates no limit.
116+
max-workflow-instance-runtime: 0d
117+
# Maximum allowed running time for a task instance.
118+
# If the running duration exceeds this value, the instance will be killed.
119+
# The default value of 0d indicates no limit.
120+
max-task-instance-runtime: 0d
121+
worker-group-refresh-interval: 0m
114122
command-fetch-strategy:
115123
type: ID_SLOT_BASED
116124
config:
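
The new settings use short duration strings such as `0d` and `5m`, which end up in `java.time.Duration` fields on the config class. The real binding is done by the framework. The sketch below is only a simplified, hypothetical parser for the `<number><unit>` form, meant to show how such values map to `Duration`. `SimpleDurationParser` and its supported unit set (`s`, `m`, `h`, `d`) are assumptions made for this example.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified parser for illustration; not the framework's binder. */
final class SimpleDurationParser {

    // A non-negative integer followed by one unit letter: s, m, h or d.
    private static final Pattern SIMPLE = Pattern.compile("^(\\d+)(s|m|h|d)$");

    static Duration parse(String value) {
        Matcher matcher = SIMPLE.matcher(value.trim());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long amount = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "s":
                return Duration.ofSeconds(amount);
            case "m":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default:
                return Duration.ofDays(amount); // only "d" can reach this branch
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("0d").isZero());      // zero days: no runtime limit
        System.out.println(parse("5m").toMinutes());   // five minutes
        System.out.println(parse("2h").toMinutes());   // two hours in minutes
    }
}
```

Any zero value is the same `Duration` regardless of unit, so `0d` and `0m` both disable the limit.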
