Skip to content

Commit f5469d7

Browse files
authored
[DSIP-55][Master] Separate the waiting dispatched task into different queue by worker group (#17037)
1 parent b66d099 commit f5469d7

14 files changed

Lines changed: 808 additions & 175 deletions

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
4242
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
4343
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
44+
import org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
4445
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
4546
import org.apache.dolphinscheduler.service.ServiceConfiguration;
4647
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -99,6 +100,9 @@ public class MasterServer implements IStoppable {
99100
@Autowired
100101
private MasterCoordinator masterCoordinator;
101102

103+
@Autowired
104+
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
105+
102106
public static void main(String[] args) {
103107
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
104108

@@ -129,6 +133,7 @@ public void initialized() {
129133
this.masterCoordinator.start();
130134

131135
this.clusterManager.start();
136+
132137
this.clusterStateMonitors.start();
133138

134139
this.workflowEngine.start();
@@ -183,7 +188,9 @@ public void close(String cause) {
183188
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
184189
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
185190
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
186-
SpringApplicationContext closedSpringContext = springApplicationContext) {
191+
SpringApplicationContext closedSpringContext = springApplicationContext;
192+
WorkerGroupTaskDispatcherManager closeWorkerGroupTaskDispatcherManager =
193+
workerGroupTaskDispatcherManager) {
187194

188195
log.info("MasterServer is stopping, current cause : {}", cause);
189196
} catch (Exception e) {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package org.apache.dolphinscheduler.server.master.runner;
1919

2020
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
21-
import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
22-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
21+
import org.apache.dolphinscheduler.server.master.runner.queue.TimeBasedTaskExecutionRunnableComparableEntry;
2322

2423
import java.util.Set;
2524
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.DelayQueue;
26+
import java.util.concurrent.PriorityBlockingQueue;
2627

2728
import lombok.SneakyThrows;
2829
import lombok.extern.slf4j.Slf4j;
@@ -31,43 +32,37 @@
3132

3233
/**
3334
* The class is used to store {@link ITaskExecutionRunnable} which needs to be dispatched. The {@link ITaskExecutionRunnable}
34-
* will be stored in {@link PriorityDelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be
35+
* will be stored in {@link DelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be
3536
* consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
3637
* <p>
37-
* The order of {@link ITaskExecutionRunnable} in the {@link PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
38+
* The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
3839
*/
3940
@Slf4j
4041
@Component
4142
public class GlobalTaskDispatchWaitingQueue {
4243

4344
private final Set<Integer> waitingTaskInstanceIds = ConcurrentHashMap.newKeySet();
44-
private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>> priorityDelayQueue =
45-
new PriorityDelayQueue<>();
4645

47-
/**
48-
* Submit a {@link ITaskExecutionRunnable} with delay time 0, it will be consumed immediately.
49-
*/
50-
public synchronized void dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITaskExecutionRunnable) {
51-
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable, 0);
52-
}
46+
private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry> delayQueue =
47+
new DelayQueue<>();
5348

5449
/**
5550
* Submit a {@link ITaskExecutionRunnable} with delay time, if the delay time <= 0 then it can be consumed.
5651
*/
5752
public synchronized void dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable taskExecutionRunnable,
5853
long delayTimeMills) {
5954
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
60-
priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, taskExecutionRunnable));
55+
delayQueue.add(new TimeBasedTaskExecutionRunnableComparableEntry(delayTimeMills, taskExecutionRunnable));
6156
}
6257

6358
/**
64-
* Consume {@link ITaskExecutionRunnable} from the {@link PriorityDelayQueue}, only the delay time <= 0 can be consumed.
59+
* Consume {@link ITaskExecutionRunnable} from the {@link PriorityBlockingQueue}, only the delay time <= 0 can be consumed.
6560
*/
6661
@SneakyThrows
6762
public ITaskExecutionRunnable takeTaskExecuteRunnable() {
68-
ITaskExecutionRunnable taskExecutionRunnable = priorityDelayQueue.take().getData();
63+
ITaskExecutionRunnable taskExecutionRunnable = (ITaskExecutionRunnable) delayQueue.take().getData();
6964
while (!markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
70-
taskExecutionRunnable = priorityDelayQueue.take().getData();
65+
taskExecutionRunnable = (ITaskExecutionRunnable) delayQueue.take().getData();
7166
}
7267
return taskExecutionRunnable;
7368
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2121
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2222
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
23-
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2423
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2524

2625
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +37,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
3837
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
3938

4039
@Autowired
41-
private ITaskExecutorClient taskExecutorClient;
40+
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
4241

4342
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
4443

@@ -73,23 +72,35 @@ void doDispatch() {
7372
log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status);
7473
return;
7574
}
76-
taskExecutorClient.dispatch(taskExecutionRunnable);
75+
this.dispatchTaskToWorkerGroup(taskExecutionRunnable);
7776
} catch (Exception e) {
78-
// If dispatch failed, will put the task back to the queue
79-
// The task will be dispatched after waiting time.
80-
// the waiting time will increase multiple of times, but will not exceed 60 seconds
81-
long waitingTimeMills = Math.min(
82-
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
83-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
84-
waitingTimeMills);
85-
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
77+
this.delayRetryDispatch(taskExecutionRunnable, e);
8678
}
8779
}
8880

81+
private void delayRetryDispatch(ITaskExecutionRunnable taskExecutionRunnable, Exception e) {
82+
// If dispatch failed, will put the task back to the queue
83+
// The task will be dispatched after waiting time.
84+
// the waiting time will increase multiple of times, but will not exceed 60 seconds
85+
long waitingTimeMills = Math.min(
86+
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
87+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
88+
waitingTimeMills);
89+
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getTaskInstance().getName(),
90+
waitingTimeMills, e);
91+
}
92+
93+
private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable taskExecutionRunnable) {
94+
workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
95+
taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
96+
taskExecutionRunnable, 0);
97+
}
98+
8999
@Override
90100
public void close() throws Exception {
91101
if (RUNNING_FLAG.compareAndSet(true, false)) {
92102
log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
103+
workerGroupTaskDispatcherManager.close();
93104
log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
94105
} else {
95106
log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.runner;
19+
20+
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
21+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
22+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
23+
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
24+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
25+
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
26+
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
27+
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
import lombok.extern.slf4j.Slf4j;
31+
32+
/**
33+
* WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue.
34+
* The main responsibilities include:
35+
* 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch.
36+
* 2. Re-queuing tasks that fail to dispatch according to retry logic.
37+
* 3. Ensuring thread safety and correct state transitions during task processing.
38+
*/
39+
@Slf4j
40+
public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
41+
42+
private final ITaskExecutorClient taskExecutorClient;
43+
44+
// TODO The current queue is flawed. When a high-priority task fails,
45+
// it will be delayed and will not return to the first or second position.
46+
// Tasks with the same priority will preempt its position.
47+
// If it needs to be placed at the front of the queue, the queue needs to be re-implemented.
48+
private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> workerGroupQueue;
49+
50+
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
51+
52+
public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
53+
super("WorkerGroupTaskDispatcher-" + workerGroupName);
54+
this.taskExecutorClient = taskExecutorClient;
55+
this.workerGroupQueue = new PriorityDelayQueue<>();
56+
}
57+
58+
/**
59+
* Adds a task to the worker group queue.
60+
* This method wraps the given task execution object into a priority and delay-based task entry and adds it to the worker group queue.
61+
* The task is only added if the current dispatcher status is either STARTED or INIT. If the dispatcher is in any other state,
62+
* the task addition will fail, and a warning message will be logged.
63+
*
64+
* @param taskExecutionRunnable The task execution object to add to the queue, which implements the {@link ITaskExecutionRunnable} interface.
65+
* @param delayTimeMills The delay time in milliseconds before the task should be executed.
66+
*/
67+
public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable taskExecutionRunnable,
68+
long delayTimeMills) {
69+
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
70+
}
71+
72+
@Override
73+
public synchronized void start() {
74+
if (runningFlag.compareAndSet(false, true)) {
75+
log.info("The {} starting...", this.getName());
76+
super.start();
77+
log.info("The {} started", this.getName());
78+
} else {
79+
log.error("The {} status is {}, will not start again", this.getName(), runningFlag.get());
80+
}
81+
}
82+
83+
public synchronized void close() {
84+
log.info("The {} closed called but not implemented", this.getName());
85+
// todo WorkerGroupTaskDispatcher thread needs to be shut down after the WorkerGroup is deleted.
86+
}
87+
88+
@Override
89+
public void run() {
90+
while (runningFlag.get()) {
91+
dispatch();
92+
}
93+
}
94+
95+
private void dispatch() {
96+
PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = workerGroupQueue.take();
97+
ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
98+
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
99+
try {
100+
final TaskExecutionStatus taskStatus = taskInstance.getState();
101+
if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
102+
&& taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
103+
log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), taskStatus);
104+
return;
105+
}
106+
taskExecutorClient.dispatch(taskExecutionRunnable);
107+
} catch (Exception e) {
108+
// If dispatch failed, will put the task back to the queue
109+
// The task will be dispatched after waiting time.
110+
// the waiting time will increase multiple of times, but will not exceed 60 seconds
111+
long waitingTimeMills = Math.min(
112+
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
113+
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
114+
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
115+
}
116+
}
117+
118+
/**
119+
* ony use unit test
120+
* @return size
121+
*/
122+
protected int queueSize() {
123+
return this.workerGroupQueue.size();
124+
}
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.runner;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
21+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
22+
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
26+
import lombok.Getter;
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.stereotype.Component;
31+
32+
/**
33+
* WorkerGroupTaskDispatcherManager is responsible for managing the task dispatching for worker groups.
34+
* It maintains a mapping of worker groups to their task dispatchers and priority delay queues,
35+
* and supports adding tasks, starting and stopping worker groups, as well as cleaning up resources upon shutdown.
36+
*/
37+
@Component
38+
@Slf4j
39+
public class WorkerGroupTaskDispatcherManager implements AutoCloseable {
40+
41+
@Autowired
42+
private ITaskExecutorClient taskExecutorClient;
43+
44+
@Getter
45+
private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap;
46+
47+
public WorkerGroupTaskDispatcherManager() {
48+
dispatchWorkerMap = new ConcurrentHashMap<>();
49+
}
50+
51+
/**
52+
* Adds a task to the specified worker group queue and starts or wakes up the corresponding processing loop.
53+
*
54+
* @param workerGroup the identifier for the worker group, used to distinguish different task queues
55+
* @param taskExecutionRunnable an instance of ITaskExecutionRunnable representing the task to be executed
56+
* @param delayTimeMills the delay time before the task is executed, in milliseconds
57+
*/
58+
public synchronized void addTaskToWorkerGroup(String workerGroup, ITaskExecutionRunnable taskExecutionRunnable,
59+
long delayTimeMills) {
60+
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = dispatchWorkerMap.computeIfAbsent(
61+
workerGroup, key -> new WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient));
62+
if (!workerGroupTaskDispatcher.isAlive()) {
63+
workerGroupTaskDispatcher.start();
64+
}
65+
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
66+
}
67+
68+
/**
69+
* Stop all workerGroupTaskDispatchWaitingQueueLooper
70+
*/
71+
@Override
72+
public void close() throws Exception {
73+
log.info("WorkerGroupTaskDispatcherManager start close");
74+
for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : dispatchWorkerMap.entrySet()) {
75+
try {
76+
entry.getValue().close();
77+
} catch (Exception e) {
78+
log.error("close worker group error", e);
79+
}
80+
}
81+
log.info("WorkerGroupTaskDispatcherManager closed");
82+
}
83+
}

0 commit comments

Comments
 (0)