Skip to content

Commit 2f57e05

Browse files
authored
[Improvement-16979] Unify PriorityDelayQueue with AbstractDelayEventBus (#17155)
1 parent 4d7f927 commit 2f57e05

11 files changed

Lines changed: 110 additions & 335 deletions

File tree

dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public Optional<T> peek() {
4242
return Optional.ofNullable(delayEventQueue.peek());
4343
}
4444

45+
@Override
46+
public T take() throws InterruptedException {
47+
return delayEventQueue.take();
48+
}
49+
4550
@Override
4651
public Optional<T> remove() {
4752
return Optional.ofNullable(delayEventQueue.remove());

dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public interface IEventBus<T extends IEvent> {
5454
*/
5555
Optional<T> poll() throws InterruptedException;
5656

57+
/**
58+
* Remove the head event from the bus. This method will block if the event bus is empty.
59+
* <p> If the thread is interrupted, an {@link InterruptedException} will be thrown.
60+
*/
61+
T take() throws InterruptedException;
62+
5763
/**
5864
* peek the head event from the bus. This method will not block if the event bus is empty will return empty optional.
5965
*/

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java renamed to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,33 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.dolphinscheduler.server.master.runner.queue;
18+
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
1919

20-
import java.util.concurrent.DelayQueue;
20+
import org.apache.dolphinscheduler.eventbus.AbstractDelayEventBus;
21+
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
2122

2223
import lombok.SneakyThrows;
2324

24-
public class PriorityDelayQueue<V extends DelayEntry> {
25-
26-
private final DelayQueue<V> queue = new DelayQueue<>();
25+
public class TaskDispatchableEventBus<V extends TaskDispatchableEvent<T>, T extends Comparable<T>>
26+
extends
27+
AbstractDelayEventBus<V> {
2728

2829
public void add(V v) {
29-
queue.put(v);
30+
super.publish(v);
3031
}
3132

3233
@SneakyThrows
3334
public V take() {
34-
return queue.take();
35+
return super.take();
3536
}
3637

38+
// Only use in test
3739
public int size() {
38-
return queue.size();
40+
return delayEventQueue.size();
3941
}
4042

43+
// Only use in test
4144
public void clear() {
42-
queue.clear();
45+
delayEventQueue.clear();
4346
}
4447
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2121
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2222
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
23+
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
2324
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
24-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
25-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
2625
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
2726

2827
import java.util.Set;
@@ -34,7 +33,7 @@
3433
/**
3534
* WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue.
3635
* The main responsibilities include:
37-
* 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch.
36+
* 1. Continuously fetching tasks from the {@link TaskDispatchableEvent} for dispatch.
3837
* 2. Re-queuing tasks that fail to dispatch according to retry logic.
3938
* 3. Ensuring thread safety and correct state transitions during task processing.
4039
*/
@@ -43,11 +42,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
4342

4443
private final ITaskExecutorClient taskExecutorClient;
4544

46-
// TODO The current queue is flawed. When a high-priority task fails,
47-
// it will be delayed and will not return to the first or second position.
48-
// Tasks with the same priority will preempt its position.
49-
// If it needs to be placed at the front of the queue, the queue needs to be re-implemented.
50-
private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> workerGroupQueue;
45+
private final TaskDispatchableEventBus<TaskDispatchableEvent<ITaskExecutionRunnable>, ITaskExecutionRunnable> workerGroupEventBus;
5146

5247
private final Set<Integer> waitingDispatchTaskIds;
5348

@@ -56,7 +51,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
5651
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
5752
super("WorkerGroupTaskDispatcher-" + workerGroupName);
5853
this.taskExecutorClient = taskExecutorClient;
59-
this.workerGroupQueue = new PriorityDelayQueue<>();
54+
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
6055
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
6156
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
6257
}
@@ -75,7 +70,7 @@ public synchronized void start() {
7570
@Override
7671
public void run() {
7772
while (runningFlag.get()) {
78-
PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = workerGroupQueue.take();
73+
TaskDispatchableEvent<ITaskExecutionRunnable> taskEntry = workerGroupEventBus.take();
7974
ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
8075
try (
8176
TaskExecutorMDCUtils.MDCAutoClosable ignore =
@@ -120,7 +115,7 @@ private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
120115
*/
121116
public void dispatchTask(final ITaskExecutionRunnable taskExecutionRunnable, final long delayTimeMills) {
122117
waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
123-
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
118+
workerGroupEventBus.add(new TaskDispatchableEvent<>(delayTimeMills, taskExecutionRunnable));
124119
}
125120

126121
public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
@@ -141,6 +136,6 @@ public synchronized void close() {
141136
}
142137

143138
int queueSize() {
144-
return this.workerGroupQueue.size();
139+
return this.workerGroupEventBus.size();
145140
}
146141
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event;
19+
20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
22+
import org.apache.dolphinscheduler.eventbus.AbstractDelayEvent;
23+
24+
import java.util.concurrent.Delayed;
25+
26+
import lombok.Getter;
27+
28+
@Getter
29+
public class TaskDispatchableEvent<V extends Comparable<V>> extends AbstractDelayEvent {
30+
31+
protected final V data;
32+
33+
public TaskDispatchableEvent(long delayTimeMills, V data) {
34+
super(delayTimeMills);
35+
this.data = checkNotNull(data, "data is null");
36+
}
37+
38+
@Override
39+
public int compareTo(Delayed other) {
40+
if (!(other instanceof TaskDispatchableEvent)) {
41+
throw new RuntimeException("The object being compared is not a TaskReadyForDispatchEvent.");
42+
}
43+
44+
@SuppressWarnings("unchecked")
45+
final TaskDispatchableEvent<V> otherEvent = (TaskDispatchableEvent<V>) other;
46+
47+
// there should compare data first for priority
48+
if (data != null && otherEvent.data != null) {
49+
final int compareResult = data.compareTo(otherEvent.data);
50+
if (compareResult != 0) {
51+
return compareResult;
52+
}
53+
}
54+
55+
return super.compareTo(other);
56+
}
57+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java

Lines changed: 0 additions & 50 deletions
This file was deleted.

0 commit comments

Comments
 (0)