Skip to content

Commit 6f12d1d

Browse files
authored
[Fix-17184] Fix varpool cannot use (#17199)
1 parent 59d5c3e commit 6f12d1d

36 files changed

Lines changed: 393 additions & 849 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dependent/DependentTaskTracker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.dolphinscheduler.common.constants.Constants;
2121
import org.apache.dolphinscheduler.common.enums.ContextType;
22-
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2322
import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext;
2423
import org.apache.dolphinscheduler.dao.entity.Project;
2524
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -111,7 +110,7 @@ public DependentTaskTracker(TaskExecutionContext taskExecutionContext,
111110
DependResult dependResult = calculateDependResult();
112111
log.info("The final Dependent result is: {}", dependResult);
113112
if (dependResult == DependResult.SUCCESS) {
114-
dependentParameters.setVarPool(JSONUtils.toJsonString(dependVarPoolPropertyMap.values()));
113+
dependentParameters.setVarPool(new ArrayList<>(dependVarPoolPropertyMap.values()));
115114
log.info("Set dependentParameters varPool: {}", dependentParameters.getVarPool());
116115
return TaskExecutionStatus.SUCCESS;
117116
} else {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/fake/LogicFakeTask.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,27 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.fake;
1919

20+
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2021
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2122
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2223
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2324
import org.apache.dolphinscheduler.plugin.task.api.parameters.LogicFakeTaskParameters;
25+
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
2426
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
2527
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask;
2628
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
2729
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
2830

2931
import org.apache.commons.lang3.StringUtils;
3032

33+
import java.io.BufferedReader;
34+
import java.io.InputStreamReader;
35+
import java.util.Map;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Future;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.TimeoutException;
40+
3141
import lombok.extern.slf4j.Slf4j;
3242

3343
import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,7 +51,7 @@
4151
@VisibleForTesting
4252
public class LogicFakeTask extends AbstractLogicTask<LogicFakeTaskParameters> {
4353

44-
private Process process;
54+
private volatile Process process;
4555

4656
public LogicFakeTask(final TaskExecutionContext taskExecutionContext) {
4757
super(taskExecutionContext);
@@ -60,22 +70,33 @@ public void start() throws MasterTaskExecuteException {
6070
if (StringUtils.isNotEmpty(taskExecutionContext.getEnvironmentConfig())) {
6171
shellScript = taskExecutionContext.getEnvironmentConfig() + "\n" + shellScript;
6272
}
63-
final String[] cmd = {"/bin/sh", "-c", shellScript};
64-
process = Runtime.getRuntime().exec(cmd);
73+
ProcessBuilder processBuilder = new ProcessBuilder("/bin/sh", "-c", shellScript);
74+
processBuilder.redirectErrorStream(true);
75+
process = processBuilder.start();
76+
final Future<Map<String, String>> parseVarPoolFuture = parseVarPool();
6577
int exitCode = process.waitFor();
66-
if (taskExecutionStatus != TaskExecutionStatus.RUNNING_EXECUTION) {
78+
log.info("LogicFakeTask: {} execute finished with exit code: {}",
79+
taskExecutionContext.getTaskName(),
80+
exitCode);
81+
if (taskExecutionStatus == TaskExecutionStatus.KILL) {
82+
try {
83+
parseVarPoolFuture.get(1, TimeUnit.SECONDS);
84+
} catch (TimeoutException interruptedException) {
85+
// ignore
86+
}
6787
// The task has been killed
88+
log.info("LogicFakeTask: {} has been killed", taskExecutionContext.getTaskName());
6889
return;
6990
}
91+
92+
final Map<String, String> taskOutputParams = parseVarPoolFuture.get();
7093
if (exitCode == 0) {
71-
log.info("LogicFakeTask: {} execute success with exit code: {}",
72-
taskExecutionContext.getTaskName(),
73-
exitCode);
94+
log.info("LogicFakeTask: {} execute success", taskExecutionContext.getTaskName());
95+
taskParameters.dealOutParam(taskOutputParams);
96+
taskExecutionContext.setVarPool(taskParameters.getVarPool());
7497
onTaskSuccess();
7598
} else {
76-
log.info("LogicFakeTask: {} execute failed with exit code: {}",
77-
taskExecutionContext.getTaskName(),
78-
exitCode);
99+
log.info("LogicFakeTask: {} execute failed", taskExecutionContext.getTaskName());
79100
onTaskFailed();
80101
}
81102
} catch (Exception ex) {
@@ -104,4 +125,25 @@ public ITaskParameterDeserializer<LogicFakeTaskParameters> getTaskParameterDeser
104125
});
105126
}
106127

128+
private Future<Map<String, String>> parseVarPool() {
129+
ExecutorService varPoolParseThreadPool = ThreadUtils.newSingleDaemonScheduledExecutorService(
130+
"ResolveOutputLog-thread-" + taskExecutionContext.getTaskName());
131+
Future<Map<String, String>> future = varPoolParseThreadPool.submit(() -> {
132+
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
133+
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
134+
String line;
135+
while ((line = inReader.readLine()) != null) {
136+
log.info(line);
137+
taskOutputParameterParser.appendParseLog(line);
138+
}
139+
} catch (Exception e) {
140+
log.error("Parse var pool error", e);
141+
}
142+
return taskOutputParameterParser.getTaskOutputParams();
143+
});
144+
145+
varPoolParseThreadPool.shutdown();
146+
return future;
147+
}
148+
107149
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public void dispatchTask(final ITaskExecutionRunnable taskExecutionRunnable,
5656
final long delayTimeMills) {
5757
final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
5858
getOrCreateWorkerGroupDispatcher(workerGroup).dispatchTask(taskExecutionRunnable, delayTimeMills);
59-
log.info("Success add Task: {} to WorkerGroupDispatcher: {}", taskExecutionRunnable.getId(), workerGroup);
59+
log.info("Success add Task[id={}] to WorkerGroupDispatcher[name={}]", taskExecutionRunnable.getId(),
60+
workerGroup);
6061
}
6162

6263
/**
@@ -67,10 +68,10 @@ public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
6768
final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
6869
boolean removed = getOrCreateWorkerGroupDispatcher(workerGroup).removeTask(taskExecutionRunnable);
6970
if (removed) {
70-
log.info("Success removed Task: {} from WorkerGroupDispatcher: {}",
71+
log.info("Success removed Task[id={}] from WorkerGroupDispatcher[name={}]",
7172
taskExecutionRunnable.getId(), workerGroup);
7273
} else {
73-
log.info("Failed to remove Task: {} from WorkerGroupDispatcher: {}, this task has been dispatched",
74+
log.info("Failed to remove Task[id={}] from WorkerGroupDispatcher[name={}], this task has been dispatched",
7475
taskExecutionRunnable.getId(), workerGroup);
7576
}
7677
return removed;
@@ -90,7 +91,7 @@ public void close() throws Exception {
9091
try {
9192
workerGroupDispatcher.close();
9293
} catch (Exception e) {
93-
log.error("close WorkerGroupDispatcher: {} error", workerGroupDispatcher.getName(), e);
94+
log.error("close WorkerGroupDispatcher[name={}] error", workerGroupDispatcher.getName(), e);
9495
}
9596
}
9697
log.info("WorkerGroupDispatcherCoordinator closed...");

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public ILifecycleEventType getEventType() {
5050
@Override
5151
public String toString() {
5252
return "TaskKillLifecycleEvent{" +
53-
"task=" + taskExecutionRunnable.getName() +
53+
"task=" + taskExecutionRunnable.getName() + ", " +
54+
"delayTime=" + delayTime +
5455
'}';
5556
}
5657
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public ILifecycleEventType getEventType() {
5050
@Override
5151
public String toString() {
5252
return "TaskPauseLifecycleEvent{" +
53-
"task=" + taskExecutionRunnable.getName() +
53+
"task=" + taskExecutionRunnable.getName() + ", " +
54+
"delayTime=" + delayTime +
5455
'}';
5556
}
5657
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskSuccessLifecycleEvent.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;
1919

20+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
2021
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
2122
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
2223
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
2324
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2425

2526
import java.util.Date;
27+
import java.util.List;
2628

2729
import lombok.AllArgsConstructor;
2830
import lombok.Builder;
@@ -37,7 +39,7 @@ public class TaskSuccessLifecycleEvent extends AbstractTaskLifecycleEvent {
3739

3840
private final Date endTime;
3941

40-
private final String varPool;
42+
private final List<Property> varPool;
4143

4244
@Override
4345
public ILifecycleEventType getEventType() {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ default String getName() {
5656
*/
5757
void initializeFirstRunTaskInstance();
5858

59+
/**
60+
* Initialize {@link TaskExecutionContext}.
61+
* <p> The TaskExecutionContext should be initialized before dispatch stage.
62+
*/
63+
void initializeTaskExecutionContext();
64+
5965
/**
6066
* Whether the task instance is running.
6167
*/

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionContextBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(final TaskInstan
6767
taskExecutionContext.setLogPath(taskInstance.getLogPath());
6868
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
6969
taskExecutionContext.setHost(taskInstance.getHost());
70-
taskExecutionContext.setVarPool(taskInstance.getVarPool());
7170
taskExecutionContext.setDryRun(taskInstance.getDryRun());
7271
taskExecutionContext.setTestFlag(taskInstance.getTestFlag());
7372
taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionContextCreateRequest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
25+
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
2526

2627
import lombok.AllArgsConstructor;
2728
import lombok.Builder;
@@ -32,6 +33,7 @@
3233
@AllArgsConstructor
3334
public class TaskExecutionContextCreateRequest {
3435

36+
private IWorkflowExecutionGraph workflowExecutionGraph;
3537
private WorkflowDefinition workflowDefinition;
3638
private WorkflowInstance workflowInstance;
3739
private TaskDefinition taskDefinition;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ public TaskExecutionRunnable(TaskExecutionRunnableBuilder taskExecutionRunnableB
7272
this.workflowInstance = checkNotNull(taskExecutionRunnableBuilder.getWorkflowInstance());
7373
this.taskDefinition = checkNotNull(taskExecutionRunnableBuilder.getTaskDefinition());
7474
this.taskInstance = taskExecutionRunnableBuilder.getTaskInstance();
75-
if (isTaskInstanceInitialized()) {
76-
initializeTaskExecutionContext();
77-
}
7875
}
7976

8077
@Override
@@ -92,7 +89,6 @@ public void initializeFirstRunTaskInstance() {
9289
.withTaskDefinition(taskDefinition)
9390
.withWorkflowInstance(workflowInstance)
9491
.build();
95-
initializeTaskExecutionContext();
9692
}
9793

9894
@Override
@@ -108,7 +104,6 @@ public void retry() {
108104
.builder()
109105
.withTaskInstance(taskInstance)
110106
.build();
111-
initializeTaskExecutionContext();
112107
getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this));
113108
}
114109

@@ -124,8 +119,6 @@ public void failover() {
124119
.builder()
125120
.withTaskInstance(taskInstance)
126121
.build();
127-
initializeTaskExecutionContext();
128-
129122
getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this));
130123
}
131124

@@ -139,17 +132,20 @@ public void kill() {
139132
getWorkflowEventBus().publish(TaskKillLifecycleEvent.of(this));
140133
}
141134

142-
private void initializeTaskExecutionContext() {
143-
checkState(isTaskInstanceInitialized(), "The task instance is null, can't initialize TaskExecutionContext.");
135+
@Override
136+
public void initializeTaskExecutionContext() {
137+
checkState(isTaskInstanceInitialized(),
138+
"The task instance is not initialized, can't initialize TaskExecutionContext.");
144139
final TaskExecutionContextCreateRequest request = TaskExecutionContextCreateRequest.builder()
140+
.workflowExecutionGraph(workflowExecutionGraph)
145141
.workflowDefinition(workflowDefinition)
146142
.project(project)
147143
.workflowInstance(workflowInstance)
148144
.taskDefinition(taskDefinition)
149145
.taskInstance(taskInstance)
150146
.build();
151-
this.taskExecutionContext = applicationContext.getBean(TaskExecutionContextFactory.class)
152-
.createTaskExecutionContext(request);
147+
this.taskExecutionContext =
148+
applicationContext.getBean(TaskExecutionContextFactory.class).createTaskExecutionContext(request);
153149
}
154150

155151
private boolean takeOverTaskFromExecutor() {

0 commit comments

Comments
 (0)