Skip to content

Commit e2a9e76

Browse files
authored
[Fix-17355] Fix reassignWorkflowInstanceHost may fail when no events in channel (#17372)
1 parent 37f7067 commit e2a9e76

18 files changed

Lines changed: 106 additions & 78 deletions

File tree

dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,32 +33,39 @@
3333
@Slf4j
3434
public class TaskExecutorEventRemoteReporterClient implements ITaskExecutorEventRemoteReporterClient {
3535

36-
public void reportTaskExecutionEventToMaster(final IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) {
36+
@Override
37+
public void reportTaskExecutionEventToMaster(final String masterAddress,
38+
final IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) {
3739
try {
3840
taskExecutorLifecycleEvent.setLatestReportTime(System.currentTimeMillis());
3941
switch (taskExecutorLifecycleEvent.getType()) {
4042
case DISPATCHED:
41-
reportTaskDispatchedEventToMaster(
43+
reportTaskDispatchedEventToMaster(masterAddress,
4244
(TaskExecutorDispatchedLifecycleEvent) taskExecutorLifecycleEvent);
4345
break;
4446
case RUNNING:
45-
reportTaskRunningEventToMaster((TaskExecutorStartedLifecycleEvent) taskExecutorLifecycleEvent);
47+
reportTaskRunningEventToMaster(masterAddress,
48+
(TaskExecutorStartedLifecycleEvent) taskExecutorLifecycleEvent);
4649
break;
4750
case RUNTIME_CONTEXT_CHANGE:
48-
reportTaskRuntimeContextChangeEventToMaster(
51+
reportTaskRuntimeContextChangeEventToMaster(masterAddress,
4952
(TaskExecutorRuntimeContextChangedLifecycleEvent) taskExecutorLifecycleEvent);
5053
break;
5154
case PAUSED:
52-
reportTaskPausedEventToMaster((TaskExecutorPausedLifecycleEvent) taskExecutorLifecycleEvent);
55+
reportTaskPausedEventToMaster(masterAddress,
56+
(TaskExecutorPausedLifecycleEvent) taskExecutorLifecycleEvent);
5357
break;
5458
case KILLED:
55-
reportTaskKilledEventToMaster((TaskExecutorKilledLifecycleEvent) taskExecutorLifecycleEvent);
59+
reportTaskKilledEventToMaster(masterAddress,
60+
(TaskExecutorKilledLifecycleEvent) taskExecutorLifecycleEvent);
5661
break;
5762
case FAILED:
58-
reportTaskFailedEventToMaster((TaskExecutorFailedLifecycleEvent) taskExecutorLifecycleEvent);
63+
reportTaskFailedEventToMaster(masterAddress,
64+
(TaskExecutorFailedLifecycleEvent) taskExecutorLifecycleEvent);
5965
break;
6066
case SUCCESS:
61-
reportTaskSuccessEventToMaster((TaskExecutorSuccessLifecycleEvent) taskExecutorLifecycleEvent);
67+
reportTaskSuccessEventToMaster(masterAddress,
68+
(TaskExecutorSuccessLifecycleEvent) taskExecutorLifecycleEvent);
6269
break;
6370
default:
6471
log.warn("Unsupported TaskExecutionEvent: {}", taskExecutorLifecycleEvent);
@@ -69,52 +76,59 @@ public void reportTaskExecutionEventToMaster(final IReportableTaskExecutorLifecy
6976
}
7077
}
7178

72-
private static void reportTaskDispatchedEventToMaster(final TaskExecutorDispatchedLifecycleEvent taskExecutionDispatchedEvent) {
79+
private static void reportTaskDispatchedEventToMaster(final String masterAddress,
80+
final TaskExecutorDispatchedLifecycleEvent taskExecutionDispatchedEvent) {
7381
Clients
7482
.withService(ITaskExecutorEventListener.class)
75-
.withHost(taskExecutionDispatchedEvent.getWorkflowInstanceHost())
83+
.withHost(masterAddress)
7684
.onTaskExecutorDispatched(taskExecutionDispatchedEvent);
7785
}
7886

79-
private static void reportTaskRunningEventToMaster(final TaskExecutorStartedLifecycleEvent taskExecutionRunningEvent) {
87+
private static void reportTaskRunningEventToMaster(final String masterAddress,
88+
final TaskExecutorStartedLifecycleEvent taskExecutionRunningEvent) {
8089
Clients
8190
.withService(ITaskExecutorEventListener.class)
82-
.withHost(taskExecutionRunningEvent.getWorkflowInstanceHost())
91+
.withHost(masterAddress)
8392
.onTaskExecutorRunning(taskExecutionRunningEvent);
8493
}
8594

86-
private static void reportTaskRuntimeContextChangeEventToMaster(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorLifecycleEvent) {
95+
private static void reportTaskRuntimeContextChangeEventToMaster(final String masterAddress,
96+
final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorLifecycleEvent) {
8797
Clients
8898
.withService(ITaskExecutorEventListener.class)
89-
.withHost(taskExecutorLifecycleEvent.getWorkflowInstanceHost())
99+
.withHost(masterAddress)
90100
.onTaskExecutorRuntimeContextChanged(taskExecutorLifecycleEvent);
91101
}
92102

93-
private static void reportTaskPausedEventToMaster(final TaskExecutorPausedLifecycleEvent taskExecutionPausedEvent) {
103+
private static void reportTaskPausedEventToMaster(final String masterAddress,
104+
final TaskExecutorPausedLifecycleEvent taskExecutionPausedEvent) {
94105
Clients
95106
.withService(ITaskExecutorEventListener.class)
96-
.withHost(taskExecutionPausedEvent.getWorkflowInstanceHost())
107+
.withHost(masterAddress)
97108
.onTaskExecutorPaused(taskExecutionPausedEvent);
98109
}
99110

100-
private static void reportTaskKilledEventToMaster(final TaskExecutorKilledLifecycleEvent taskExecutionKilledEvent) {
111+
private static void reportTaskKilledEventToMaster(final String masterAddress,
112+
final TaskExecutorKilledLifecycleEvent taskExecutionKilledEvent) {
101113
Clients
102114
.withService(ITaskExecutorEventListener.class)
103-
.withHost(taskExecutionKilledEvent.getWorkflowInstanceHost())
115+
.withHost(masterAddress)
104116
.onTaskExecutorKilled(taskExecutionKilledEvent);
105117
}
106118

107-
private static void reportTaskFailedEventToMaster(final TaskExecutorFailedLifecycleEvent taskExecutionFailedEvent) {
119+
private static void reportTaskFailedEventToMaster(final String masterAddress,
120+
final TaskExecutorFailedLifecycleEvent taskExecutionFailedEvent) {
108121
Clients
109122
.withService(ITaskExecutorEventListener.class)
110-
.withHost(taskExecutionFailedEvent.getWorkflowInstanceHost())
123+
.withHost(masterAddress)
111124
.onTaskExecutorFailed(taskExecutionFailedEvent);
112125
}
113126

114-
private static void reportTaskSuccessEventToMaster(final TaskExecutorSuccessLifecycleEvent taskExecutionSuccessEvent) {
127+
private static void reportTaskSuccessEventToMaster(final String masterAddress,
128+
final TaskExecutorSuccessLifecycleEvent taskExecutionSuccessEvent) {
115129
Clients
116130
.withService(ITaskExecutorEventListener.class)
117-
.withHost(taskExecutionSuccessEvent.getWorkflowInstanceHost())
131+
.withHost(masterAddress)
118132
.onTaskExecutorSuccess(taskExecutionSuccessEvent);
119133
}
120134
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorLifecycleEventReporter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
public class LogicTaskExecutorLifecycleEventReporter extends TaskExecutorLifecycleEventRemoteReporter {
2626

2727
public LogicTaskExecutorLifecycleEventReporter(
28-
final LogicTaskExecutorEventRemoteReporterClient logicTaskExecutorEventRemoteReporterClient) {
29-
super("LogicTaskExecutorLifecycleEventReporter", logicTaskExecutorEventRemoteReporterClient);
28+
final LogicTaskExecutorEventRemoteReporterClient logicTaskExecutorEventRemoteReporterClient,
29+
final LogicTaskExecutorRepository taskExecutorRepository) {
30+
super("LogicTaskExecutorLifecycleEventReporter", logicTaskExecutorEventRemoteReporterClient,
31+
taskExecutorRepository);
3032
}
3133
}

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorEventRemoteReporterClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@
2121

2222
public interface ITaskExecutorEventRemoteReporterClient {
2323

24-
void reportTaskExecutionEventToMaster(final IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent);
24+
void reportTaskExecutionEventToMaster(final String masterAddress,
25+
final IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent);
2526
}

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/ITaskExecutorLifecycleEventReporter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ public interface ITaskExecutorLifecycleEventReporter extends AutoCloseable {
4848
void receiveTaskExecutorLifecycleEventACK(final TaskExecutorLifecycleEventAck taskExecutorLifecycleEventAck);
4949

5050
/**
51-
* Reassign the workflow instance host of the IReportableTaskExecutorLifecycleEvent.
52-
* <p> This method is used to reassign the workflow instance host of the IReportableTaskExecutorLifecycleEvent, once the workflow's host changed.
51+
* Reset the events in the channel to allow them to be reported immediately.
5352
*/
54-
boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost);
53+
void onWorkflowInstanceHostChanged(int taskInstanceId);
5554

5655
/**
5756
* Shutdown the reporter.

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorLifecycleEventRemoteReporter.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717

1818
package org.apache.dolphinscheduler.task.executor.eventbus;
1919

20+
import org.apache.dolphinscheduler.common.exception.BaseException;
2021
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2122
import org.apache.dolphinscheduler.common.utils.JSONUtils;
23+
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
24+
import org.apache.dolphinscheduler.task.executor.ITaskExecutorRepository;
2225
import org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent;
26+
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFinalizeLifecycleEvent;
2327
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorLifecycleEventType;
2428
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
2529

2630
import java.util.Map;
31+
import java.util.Optional;
2732
import java.util.concurrent.ConcurrentHashMap;
2833
import java.util.concurrent.LinkedBlockingQueue;
2934
import java.util.concurrent.TimeUnit;
@@ -56,11 +61,15 @@ public class TaskExecutorLifecycleEventRemoteReporter extends BaseDaemonThread
5661

5762
private final Condition taskExecutionEventEmptyCondition = eventChannelsLock.newCondition();
5863

64+
private final ITaskExecutorRepository taskExecutorRepository;
65+
5966
public TaskExecutorLifecycleEventRemoteReporter(final String reporterName,
60-
final ITaskExecutorEventRemoteReporterClient taskExecutorEventRemoteReporterClient) {
67+
final ITaskExecutorEventRemoteReporterClient taskExecutorEventRemoteReporterClient,
68+
final ITaskExecutorRepository taskExecutorRepository) {
6169
super(reporterName);
6270
this.reporterName = reporterName;
6371
this.taskExecutorEventRemoteReporterClient = taskExecutorEventRemoteReporterClient;
72+
this.taskExecutorRepository = taskExecutorRepository;
6473
}
6574

6675
@Override
@@ -128,6 +137,11 @@ public void receiveTaskExecutorLifecycleEventACK(final TaskExecutorLifecycleEven
128137
log.info("Failed removed ReportableTaskExecutorLifecycleEvent by ack: {}", eventAck);
129138
}
130139
if (eventChannel.isEmpty()) {
140+
// Extend the lifecycle of the TaskExecutor to span the entire processing cycle of the task,
141+
// so we can finalize the TaskExecutor once its finished event has been acked and the associated channel is about to be removed.
142+
if (removed != null && removed.getType().isFinished()) {
143+
finalizeTaskExecutor(removed.getTaskInstanceId());
144+
}
131145
eventChannels.remove(taskExecutorId);
132146
log.debug("Removed ReportableTaskExecutorLifecycleEventChannel: {}", taskExecutorId);
133147
}
@@ -138,15 +152,14 @@ public void receiveTaskExecutorLifecycleEventACK(final TaskExecutorLifecycleEven
138152
}
139153

140154
@Override
141-
public boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost) {
155+
public void onWorkflowInstanceHostChanged(int taskInstanceId) {
142156
eventChannelsLock.lock();
143157
try {
144158
final ReportableTaskExecutorLifecycleEventChannel eventChannel = eventChannels.get(taskInstanceId);
145-
if (eventChannel == null) {
146-
return false;
159+
if (eventChannel != null) {
160+
eventChannel.taskExecutionEventsQueue.forEach(event -> event.setLatestReportTime(null));
161+
taskExecutionEventEmptyCondition.signalAll();
147162
}
148-
eventChannel.taskExecutionEventsQueue.forEach(event -> event.setWorkflowInstanceHost(workflowHost));
149-
return true;
150163
} finally {
151164
eventChannelsLock.unlock();
152165
}
@@ -164,6 +177,16 @@ public Map<Integer, ReportableTaskExecutorLifecycleEventChannel> getEventChannel
164177
return eventChannels;
165178
}
166179

180+
private void finalizeTaskExecutor(final Integer taskExecutorId) {
181+
final Optional<ITaskExecutor> taskExecutorOptional = taskExecutorRepository.get(taskExecutorId);
182+
if (taskExecutorOptional.isPresent()) {
183+
taskExecutorOptional.get().getTaskExecutorEventBus()
184+
.publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutorOptional.get()));
185+
} else {
186+
log.warn("TaskExecutor is not exists: {}", taskExecutorId);
187+
}
188+
}
189+
167190
private void handleTaskExecutionEventChannel(final ReportableTaskExecutorLifecycleEventChannel reportableTaskExecutorLifecycleEventChannel) {
168191
if (reportableTaskExecutorLifecycleEventChannel.isEmpty()) {
169192
return;
@@ -175,7 +198,16 @@ private void handleTaskExecutionEventChannel(final ReportableTaskExecutorLifecyc
175198
TaskExecutorMDCUtils.logWithMDC(headEvent.getTaskInstanceId())) {
176199
try {
177200
if (isTaskExecutorEventNeverSent(headEvent) || isRetryIntervalExceeded(headEvent)) {
178-
taskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(headEvent);
201+
final Optional<ITaskExecutor> taskExecutorOptional =
202+
taskExecutorRepository.get(headEvent.getTaskInstanceId());
203+
if (!taskExecutorOptional.isPresent()) {
204+
throw new BaseException(String.format("The TaskExecutor id %d is not exist.",
205+
headEvent.getTaskInstanceId()));
206+
}
207+
final String masterAddress =
208+
taskExecutorOptional.get().getTaskExecutionContext().getWorkflowInstanceHost();
209+
taskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(masterAddress,
210+
headEvent);
179211
continue;
180212
}
181213
if (log.isDebugEnabled()) {

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/IReportableTaskExecutorLifecycleEvent.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,6 @@ public interface IReportableTaskExecutorLifecycleEvent extends ITaskExecutorLife
2929
*/
3030
int getWorkflowInstanceId();
3131

32-
/**
33-
* The host of the workflow instance which the event should report to.
34-
*/
35-
String getWorkflowInstanceHost();
36-
37-
/**
38-
* Set the host of the workflow instance which the event should report to.
39-
*/
40-
void setWorkflowInstanceHost(String workflowInstanceHost);
41-
4232
/**
4333
* Get the latest report time of the event, if the event is never reported, return null.
4434
*/

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorDispatchedLifecycleEvent.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class TaskExecutorDispatchedLifecycleEvent extends AbstractTaskExecutorLi
3737

3838
private int workflowInstanceId;
3939

40-
private String workflowInstanceHost;
41-
4240
private String taskInstanceHost;
4341

4442
private Long latestReportTime;
@@ -49,7 +47,6 @@ public static TaskExecutorDispatchedLifecycleEvent of(final ITaskExecutor taskEx
4947
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
5048
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
5149
.taskInstanceHost(taskExecutionContext.getHost())
52-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
5350
.type(TaskExecutorLifecycleEventType.DISPATCHED)
5451
.build();
5552
}

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFailedLifecycleEvent.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class TaskExecutorFailedLifecycleEvent extends AbstractTaskExecutorLifecy
3737

3838
private int workflowInstanceId;
3939

40-
private String workflowInstanceHost;
41-
4240
private String taskInstanceHost;
4341

4442
private String appIds;
@@ -53,7 +51,6 @@ public static TaskExecutorFailedLifecycleEvent of(final ITaskExecutor taskExecut
5351
.taskInstanceId(taskExecutor.getId())
5452
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
5553
.taskInstanceHost(taskExecutionContext.getHost())
56-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
5754
.appIds(taskExecutionContext.getAppIds())
5855
.endTime(taskExecutionContext.getEndTime())
5956
.type(TaskExecutorLifecycleEventType.FAILED)

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorKilledLifecycleEvent.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class TaskExecutorKilledLifecycleEvent extends AbstractTaskExecutorLifecy
3737

3838
private int workflowInstanceId;
3939

40-
private String workflowInstanceHost;
41-
4240
private String taskInstanceHost;
4341

4442
private long endTime;
@@ -51,7 +49,6 @@ public static TaskExecutorKilledLifecycleEvent of(final ITaskExecutor taskExecut
5149
.taskInstanceId(taskExecutionContext.getTaskInstanceId())
5250
.workflowInstanceId(taskExecutionContext.getWorkflowInstanceId())
5351
.taskInstanceHost(taskExecutionContext.getHost())
54-
.workflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost())
5552
.endTime(taskExecutionContext.getEndTime())
5653
.type(TaskExecutorLifecycleEventType.KILLED)
5754
.build();

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,10 @@ public enum TaskExecutorLifecycleEventType {
4040
FINALIZE,
4141
;
4242

43+
public boolean isFinished() {
44+
return (this == KILLED
45+
|| this == PAUSED
46+
|| this == FAILED
47+
|| this == SUCCESS);
48+
}
4349
}

0 commit comments

Comments
 (0)