Skip to content

Commit bdec8b1

Browse files
authored
[Improvement-17179][Master] Remove GlobalTaskDispatchWaitingQueue (#17180)
1 parent b9bd779 commit bdec8b1

14 files changed

Lines changed: 273 additions & 691 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@
3838
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
3939
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
4040
import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
41+
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
4142
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
4243
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
4344
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
44-
import org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
4545
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
4646
import org.apache.dolphinscheduler.service.ServiceConfiguration;
4747
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -101,7 +101,7 @@ public class MasterServer implements IStoppable {
101101
private MasterCoordinator masterCoordinator;
102102

103103
@Autowired
104-
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
104+
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
105105

106106
public static void main(String[] args) {
107107
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@@ -189,8 +189,8 @@ public void close(String cause) {
189189
// Close the Spring context; this invokes the methods annotated with @PreDestroy to destroy the beans,
190190
// such as ServerNodeManager, HostManager, TaskResponseService, CuratorZookeeperClient, etc.
191191
SpringApplicationContext closedSpringContext = springApplicationContext;
192-
WorkerGroupTaskDispatcherManager closeWorkerGroupTaskDispatcherManager =
193-
workerGroupTaskDispatcherManager) {
192+
WorkerGroupDispatcherCoordinator closeWorkerGroupDispatcherCoordinator =
193+
workerGroupDispatcherCoordinator) {
194194

195195
log.info("MasterServer is stopping, current cause : {}", cause);
196196
} catch (Exception e) {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.dolphinscheduler.server.master.engine.command.CommandEngine;
2121
import org.apache.dolphinscheduler.server.master.engine.executor.LogicTaskEngineDelegator;
22-
import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueueLooper;
22+
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
2323

2424
import lombok.extern.slf4j.Slf4j;
2525

@@ -37,7 +37,7 @@ public class WorkflowEngine implements AutoCloseable {
3737
private CommandEngine commandEngine;
3838

3939
@Autowired
40-
private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper;
40+
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
4141

4242
@Autowired
4343
private LogicTaskEngineDelegator logicTaskEngineDelegator;
@@ -48,7 +48,7 @@ public void start() {
4848

4949
commandEngine.start();
5050

51-
globalTaskDispatchWaitingQueueLooper.start();
51+
workerGroupDispatcherCoordinator.start();
5252

5353
logicTaskEngineDelegator.start();
5454

@@ -60,7 +60,7 @@ public void close() throws Exception {
6060
try (
6161
final CommandEngine ignore1 = commandEngine;
6262
final WorkflowEventBusCoordinator ignore2 = workflowEventBusCoordinator;
63-
final GlobalTaskDispatchWaitingQueueLooper ignore3 = globalTaskDispatchWaitingQueueLooper;
63+
final WorkerGroupDispatcherCoordinator ignore3 = workerGroupDispatcherCoordinator;
6464
final LogicTaskEngineDelegator ignore5 = logicTaskEngineDelegator) {
6565
// closed the resource
6666
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java renamed to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 55 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.dolphinscheduler.server.master.runner;
18+
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
1919

2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
21-
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
22-
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2321
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2422
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2523
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
2624
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
25+
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
2726

27+
import java.util.Set;
28+
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930

3031
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +38,7 @@
3738
* 3. Ensuring thread safety and correct state transitions during task processing.
3839
*/
3940
@Slf4j
40-
public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
41+
public class WorkerGroupDispatcher extends BaseDaemonThread {
4142

4243
private final ITaskExecutorClient taskExecutorClient;
4344

@@ -47,26 +48,16 @@ public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
4748
// If it needs to be placed at the front of the queue, the queue needs to be re-implemented.
4849
private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> workerGroupQueue;
4950

51+
private final Set<Integer> waitingDispatchTaskIds;
52+
5053
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
5154

52-
public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
55+
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
5356
super("WorkerGroupTaskDispatcher-" + workerGroupName);
5457
this.taskExecutorClient = taskExecutorClient;
5558
this.workerGroupQueue = new PriorityDelayQueue<>();
56-
}
57-
58-
/**
59-
* Adds a task to the worker group queue.
60-
* This method wraps the given task execution object into a priority and delay-based task entry and adds it to the worker group queue.
61-
* The task is only added if the current dispatcher status is either STARTED or INIT. If the dispatcher is in any other state,
62-
* the task addition will fail, and a warning message will be logged.
63-
*
64-
* @param taskExecutionRunnable The task execution object to add to the queue, which implements the {@link ITaskExecutionRunnable} interface.
65-
* @param delayTimeMills The delay time in milliseconds before the task should be executed.
66-
*/
67-
public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable taskExecutionRunnable,
68-
long delayTimeMills) {
69-
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
59+
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
60+
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
7061
}
7162

7263
@Override
@@ -80,27 +71,25 @@ public synchronized void start() {
8071
}
8172
}
8273

83-
public synchronized void close() {
84-
log.info("The {} closed called but not implemented", this.getName());
85-
// todo WorkerGroupTaskDispatcher thread needs to be shut down after the WorkerGroup is deleted.
86-
}
87-
8874
@Override
8975
public void run() {
9076
while (runningFlag.get()) {
91-
dispatch();
77+
PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = workerGroupQueue.take();
78+
ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
79+
try (
80+
TaskExecutorMDCUtils.MDCAutoClosable ignore =
81+
TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
82+
doDispatchTask(taskExecutionRunnable);
83+
}
9284
}
9385
}
9486

95-
private void dispatch() {
96-
PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = workerGroupQueue.take();
97-
ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
98-
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
87+
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
9988
try {
100-
final TaskExecutionStatus taskStatus = taskInstance.getState();
101-
if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
102-
&& taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
103-
log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), taskStatus);
89+
if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) {
90+
log.info(
91+
"The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
92+
taskExecutionRunnable.getId());
10493
return;
10594
}
10695
taskExecutorClient.dispatch(taskExecutionRunnable);
@@ -110,16 +99,44 @@ private void dispatch() {
11099
// the waiting time is the dispatch failure count multiplied by 1 second, but will not exceed 60 seconds
111100
long waitingTimeMills = Math.min(
112101
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
113-
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
114-
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
102+
dispatchTask(taskExecutionRunnable, waitingTimeMills);
103+
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getId(),
104+
waitingTimeMills, e);
115105
}
116106
}
117107

118108
/**
119-
* ony use unit test
120-
* @return size
109+
* Adds a task to the worker group queue.
110+
* This method wraps the given task execution object into a priority and delay-based task entry and adds it to the worker group queue.
111+
* The task id is also recorded in waitingDispatchTaskIds, so the task can still be withdrawn through removeTask before it is dispatched.
112+
* The task is added unconditionally; this method does not check whether the dispatcher has been started or closed.
113+
*
114+
* @param taskExecutionRunnable The task execution object to add to the queue, which implements the {@link ITaskExecutionRunnable} interface.
115+
* @param delayTimeMills The delay time in milliseconds before the task should be executed.
121116
*/
122-
protected int queueSize() {
117+
public void dispatchTask(final ITaskExecutionRunnable taskExecutionRunnable, final long delayTimeMills) {
118+
waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
119+
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
120+
}
121+
122+
public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
123+
return waitingDispatchTaskIds.remove(taskExecutionRunnable.getId());
124+
}
125+
126+
public boolean existTask(ITaskExecutionRunnable taskExecutionRunnable) {
127+
return waitingDispatchTaskIds.contains(taskExecutionRunnable.getId());
128+
}
129+
130+
public synchronized void close() {
131+
// todo WorkerGroupDispatcher thread needs to be shut down after the WorkerGroup is deleted.
132+
if (runningFlag.compareAndSet(true, false)) {
133+
log.info("WorkerGroupDispatcher {} closed", this.getName());
134+
} else {
135+
log.warn("The WorkerGroupDispatcher: {} doesn't started", this.getName());
136+
}
137+
}
138+
139+
int queueSize() {
123140
return this.workerGroupQueue.size();
124141
}
125142
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
21+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
22+
23+
import java.util.concurrent.ConcurrentHashMap;
24+
25+
import lombok.extern.slf4j.Slf4j;
26+
27+
import org.springframework.beans.factory.annotation.Autowired;
28+
import org.springframework.stereotype.Component;
29+
30+
/**
31+
* WorkerGroupDispatcherCoordinator is responsible for managing the task dispatching for worker groups.
32+
* It maintains a mapping from each worker group name to its WorkerGroupDispatcher, creating and starting a dispatcher the first time a worker group is used,
33+
* and supports dispatching tasks, removing tasks that have not been dispatched yet, and closing all dispatchers upon shutdown.
34+
*/
35+
@Component
36+
@Slf4j
37+
public class WorkerGroupDispatcherCoordinator implements AutoCloseable {
38+
39+
@Autowired
40+
private ITaskExecutorClient taskExecutorClient;
41+
42+
private final ConcurrentHashMap<String, WorkerGroupDispatcher> workerGroupDispatcherMap;
43+
44+
public WorkerGroupDispatcherCoordinator() {
45+
workerGroupDispatcherMap = new ConcurrentHashMap<>();
46+
}
47+
48+
public void start() {
49+
log.info("WorkerGroupTaskDispatcherManager started...");
50+
}
51+
52+
/**
53+
* Add the task to the dispatcher of its worker group with the specified delay time (in milliseconds).
54+
*/
55+
public void dispatchTask(final ITaskExecutionRunnable taskExecutionRunnable,
56+
final long delayTimeMills) {
57+
final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
58+
getOrCreateWorkerGroupDispatcher(workerGroup).dispatchTask(taskExecutionRunnable, delayTimeMills);
59+
log.info("Success add Task: {} to WorkerGroupDispatcher: {}", taskExecutionRunnable.getId(), workerGroup);
60+
}
61+
62+
/**
63+
* Remove task from the dispatcher.
64+
* <p> If the task doesn't exist in the dispatcher, this method returns false, which means the task might already have been dispatched.
65+
*/
66+
public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
67+
final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
68+
boolean removed = getOrCreateWorkerGroupDispatcher(workerGroup).removeTask(taskExecutionRunnable);
69+
if (removed) {
70+
log.info("Success removed Task: {} from WorkerGroupDispatcher: {}",
71+
taskExecutionRunnable.getId(), workerGroup);
72+
} else {
73+
log.info("Failed to remove Task: {} from WorkerGroupDispatcher: {}, this task has been dispatched",
74+
taskExecutionRunnable.getId(), workerGroup);
75+
}
76+
return removed;
77+
}
78+
79+
public boolean existWorkerGroup(String workerGroup) {
80+
return workerGroupDispatcherMap.containsKey(workerGroup);
81+
}
82+
83+
/**
84+
* Close all WorkerGroupDispatchers managed by this coordinator.
85+
*/
86+
@Override
87+
public void close() throws Exception {
88+
log.info("WorkerGroupDispatcherCoordinator closing");
89+
for (WorkerGroupDispatcher workerGroupDispatcher : workerGroupDispatcherMap.values()) {
90+
try {
91+
workerGroupDispatcher.close();
92+
} catch (Exception e) {
93+
log.error("close WorkerGroupDispatcher: {} error", workerGroupDispatcher.getName(), e);
94+
}
95+
}
96+
log.info("WorkerGroupDispatcherCoordinator closed...");
97+
}
98+
99+
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
100+
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
101+
WorkerGroupDispatcher workerGroupDispatcher = new WorkerGroupDispatcher(wg, taskExecutorClient);
102+
workerGroupDispatcher.start();
103+
return workerGroupDispatcher;
104+
});
105+
}
106+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2222
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
2323
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
24+
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
2425
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
2526
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2627
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -35,7 +36,6 @@
3536
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
3637
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
3738
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
38-
import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
3939

4040
import lombok.extern.slf4j.Slf4j;
4141

@@ -50,7 +50,7 @@
5050
public class TaskSubmittedStateAction extends AbstractTaskStateAction {
5151

5252
@Autowired
53-
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
53+
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
5454

5555
@Autowired
5656
private TaskInstanceDao taskInstanceDao;
@@ -107,7 +107,7 @@ public void dispatchEventAction(final IWorkflowExecutionRunnable workflowExecuti
107107
taskInstance.getDelayTime(),
108108
remainTimeMills);
109109
}
110-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable, remainTimeMills);
110+
workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);
111111
}
112112

113113
@Override
@@ -123,7 +123,7 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR
123123
final ITaskExecutionRunnable taskExecutionRunnable,
124124
final TaskPauseLifecycleEvent taskPauseEvent) {
125125
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
126-
if (globalTaskDispatchWaitingQueue.markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
126+
if (workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
127127
log.info("Success pause task: {} before dispatch", taskExecutionRunnable.getName());
128128
taskExecutionRunnable.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecutionRunnable));
129129
return;
@@ -144,7 +144,7 @@ public void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRu
144144
final ITaskExecutionRunnable taskExecutionRunnable,
145145
final TaskKillLifecycleEvent taskKillEvent) {
146146
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
147-
if (globalTaskDispatchWaitingQueue.markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
147+
if (workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
148148
log.info("Success kill task: {} before dispatch", taskExecutionRunnable.getName());
149149
taskExecutionRunnable.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecutionRunnable));
150150
return;

0 commit comments

Comments
 (0)