Skip to content

Commit f9b0e04

Browse files
authored
Retry TaskKillLifecycleEvent/TaskPauseLifecycleEvent when task instance is submitted but already dispatched (#17203)
1 parent a5296d6 commit f9b0e04

8 files changed

Lines changed: 39 additions & 16 deletions

File tree

dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public abstract class AbstractDelayEvent implements IEvent, Delayed {
3434

3535
private static final long DEFAULT_DELAY_TIME = 0;
3636

37+
// In milliseconds
3738
protected long delayTime;
3839

3940
@Builder.Default

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
1919

2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
21+
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2122
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2223
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2324
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
@@ -79,7 +80,10 @@ public void run() {
7980
try (
8081
TaskExecutorMDCUtils.MDCAutoClosable ignore =
8182
TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
83+
LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getTaskInstance().getWorkflowInstanceId());
8284
doDispatchTask(taskExecutionRunnable);
85+
} finally {
86+
LogUtils.removeWorkflowInstanceIdMDC();
8387
}
8488
}
8589
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,24 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
2323
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2424

25-
import lombok.AllArgsConstructor;
2625
import lombok.Getter;
2726

2827
@Getter
29-
@AllArgsConstructor
3028
public class TaskKillLifecycleEvent extends AbstractTaskLifecycleEvent {
3129

3230
private final ITaskExecutionRunnable taskExecutionRunnable;
3331

32+
private TaskKillLifecycleEvent(ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
33+
super(delayTime);
34+
this.taskExecutionRunnable = taskExecutionRunnable;
35+
}
36+
3437
public static TaskKillLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable) {
35-
return new TaskKillLifecycleEvent(taskExecutionRunnable);
38+
return of(taskExecutionRunnable, 0);
39+
}
40+
41+
public static TaskKillLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
42+
return new TaskKillLifecycleEvent(taskExecutionRunnable, delayTime);
3643
}
3744

3845
@Override

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,24 @@
2222
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
2323
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2424

25-
import lombok.AllArgsConstructor;
2625
import lombok.Getter;
2726

2827
@Getter
29-
@AllArgsConstructor
3028
public class TaskPauseLifecycleEvent extends AbstractTaskLifecycleEvent {
3129

3230
private final ITaskExecutionRunnable taskExecutionRunnable;
3331

32+
private TaskPauseLifecycleEvent(final ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
33+
super(delayTime);
34+
this.taskExecutionRunnable = taskExecutionRunnable;
35+
}
36+
3437
public static TaskPauseLifecycleEvent of(ITaskExecutionRunnable taskExecutionRunnable) {
35-
return new TaskPauseLifecycleEvent(taskExecutionRunnable);
38+
return of(taskExecutionRunnable, 0);
39+
}
40+
41+
public static TaskPauseLifecycleEvent of(ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
42+
return new TaskPauseLifecycleEvent(taskExecutionRunnable, delayTime);
3643
}
3744

3845
@Override

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ protected void failoverTask(final ITaskExecutionRunnable taskExecutionRunnable)
229229
protected void tryToDispatchTask(final ITaskExecutionRunnable taskExecutionRunnable) {
230230
if (isTaskNeedAcquireTaskGroupSlot(taskExecutionRunnable)) {
231231
acquireTaskGroupSlot(taskExecutionRunnable);
232-
log.info("Task{} using taskGroup, success acquire taskGroup slot", taskExecutionRunnable.getName());
232+
log.info("Task[name={}] using taskGroup, success acquire taskGroup slot", taskExecutionRunnable.getName());
233233
return;
234234
}
235235
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
@@ -262,7 +262,7 @@ protected void throwExceptionIfStateIsNotMatch(final ITaskExecutionRunnable task
262262
protected void logWarningIfCannotDoAction(final ITaskExecutionRunnable taskExecutionRunnable,
263263
final AbstractLifecycleEvent event) {
264264
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
265-
log.warn("Task {} state is {} cannot do action on event: {}",
265+
log.warn("Task[name={}] state is {} cannot do action on event: {}",
266266
taskInstance.getName(),
267267
taskInstance.getState(),
268268
event);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.task.statemachine;
1919

20-
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
2120
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2221
import org.apache.dolphinscheduler.server.master.engine.task.client.TaskExecutorClient;
2322
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
@@ -44,8 +43,6 @@
4443
@Component
4544
public class TaskDispatchStateAction extends AbstractTaskStateAction {
4645

47-
@Autowired
48-
private TaskInstanceDao taskInstanceDao;
4946
@Autowired
5047
private TaskExecutorClient taskExecutorClient;
5148

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
3838
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
3939

40+
import java.util.concurrent.TimeUnit;
41+
4042
import lombok.extern.slf4j.Slf4j;
4143

4244
import org.springframework.beans.factory.annotation.Autowired;
@@ -128,7 +130,10 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR
128130
taskExecutionRunnable.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecutionRunnable));
129131
return;
130132
}
131-
logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
133+
log.info("The task[id={}] is submitted and already dispatched, cannot pause, will try to pause it after 5s",
134+
taskExecutionRunnable.getId());
135+
taskExecutionRunnable.getWorkflowEventBus()
136+
.publish(TaskPauseLifecycleEvent.of(taskExecutionRunnable, TimeUnit.SECONDS.toMillis(5))); // delayTime is in milliseconds: retry after 5s
132137
}
133138

134139
@Override
@@ -145,11 +150,14 @@ public void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRu
145150
final TaskKillLifecycleEvent taskKillEvent) {
146151
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
147152
if (workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
148-
log.info("Success kill task: {} before dispatch", taskExecutionRunnable.getName());
153+
log.info("Success kill task[id={}] before dispatch", taskExecutionRunnable.getId());
149154
taskExecutionRunnable.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecutionRunnable));
150155
return;
151156
}
152-
logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
157+
log.info("The task[id={}] is submitted and already dispatched, cannot kill, will kill it after 5s",
158+
taskExecutionRunnable.getId());
159+
taskExecutionRunnable.getWorkflowEventBus()
160+
.publish(TaskKillLifecycleEvent.of(taskExecutionRunnable, TimeUnit.SECONDS.toMillis(5))); // delayTime is in milliseconds: retry after 5s
153161
}
154162

155163
@Override

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.task.executor.log;
1919

20-
import org.apache.dolphinscheduler.common.constants.Constants;
2120
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
2221

2322
import org.slf4j.MDC;
@@ -41,7 +40,7 @@ public static MDCAutoClosable logWithMDC(final int taskInstanceId, final String
4140
MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
4241
}
4342

44-
MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
43+
MDC.put(TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
4544

4645
return () -> {
4746
MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY);

0 commit comments

Comments
 (0)