Skip to content

Commit 86c066a

Browse files
authored
Removed unused params in TaskExecutionContext (#17195)
1 parent a41e34f commit 86c066a

12 files changed

Lines changed: 6 additions & 77 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionContextBuilder.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,9 @@ public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(final Workflo
102102
taskExecutionContext.setScheduleTime(DateUtils.dateToTimeStamp(workflowInstance.getScheduleTime()));
103103
taskExecutionContext.setGlobalParams(workflowInstance.getGlobalParams());
104104
taskExecutionContext.setExecutorId(workflowInstance.getExecutorId());
105-
taskExecutionContext.setCmdTypeIfComplement(workflowInstance.getCmdTypeIfComplement().getCode());
106105
taskExecutionContext.setTenantCode(workflowInstance.getTenantCode());
107106
taskExecutionContext.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
108107
taskExecutionContext.setWorkflowDefinitionVersion(workflowInstance.getWorkflowDefinitionVersion());
109-
taskExecutionContext.setProjectCode(workflowInstance.getProjectCode());
110108
return this;
111109
}
112110

@@ -128,27 +126,14 @@ public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(final K8sTaskExecutio
128126
}
129127

130128
/**
131-
* build global and local params
129+
* The runtime params, include local params from task, global params from workflow, startup params from command, varpool params from pre-task, built-in params from system
132130
*
133-
* @param propertyMap
134-
* @return
135131
*/
136132
public TaskExecutionContextBuilder buildPrepareParams(final Map<String, Property> propertyMap) {
137133
taskExecutionContext.setPrepareParamsMap(propertyMap);
138134
return this;
139135
}
140136

141-
/**
142-
* build business params
143-
*
144-
* @param businessParamsMap
145-
* @return
146-
*/
147-
public TaskExecutionContextBuilder buildBusinessParams(final Map<String, Property> businessParamsMap) {
148-
taskExecutionContext.setParamsMap(businessParamsMap);
149-
return this;
150-
}
151-
152137
public TaskExecutionContextBuilder buildWorkflowInstanceHost(final String masterHost) {
153138
taskExecutionContext.setWorkflowInstanceHost(masterHost);
154139
return this;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public TaskExecutionContext createTaskExecutionContext(TaskExecutionContextCreat
8585
.buildTaskDefinitionRelatedInfo(request.getTaskDefinition())
8686
.buildProcessInstanceRelatedInfo(request.getWorkflowInstance())
8787
.buildResourceParameters(getResourceParameters(taskInstance))
88-
.buildBusinessParams(getBusinessParams(workflowInstance))
88+
// todo: use TaskRuntimeParameters to replace Map<String, Property> in TaskExecutionContext
8989
.buildPrepareParams(getPrepareParams(taskInstance, workflowInstance, workflowDefinition, project))
9090
.buildK8sTaskRelatedInfo(getK8sTaskExecutionContext(taskInstance))
9191
.create();
@@ -152,10 +152,6 @@ private K8sTaskExecutionContext getK8sTaskExecutionContext(final TaskInstance ta
152152
return k8sTaskExecutionContext;
153153
}
154154

155-
private Map<String, Property> getBusinessParams(final WorkflowInstance workflowInstance) {
156-
return curingParamsService.preBuildBusinessParams(workflowInstance);
157-
}
158-
159155
private Map<String, Property> getPrepareParams(final TaskInstance taskInstance,
160156
final WorkflowInstance workflowInstance,
161157
final WorkflowDefinition workflowDefinition,

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@ public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
9393
this.logBuffer = new LinkedBlockingQueue<>();
9494
this.logBuffer.add(EMPTY_STRING);
9595

96-
if (this.taskRequest != null) {
97-
// set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
98-
this.taskRequest.setLogBufferEnable(true);
99-
}
10096
}
10197

10298
// todo: We need to build the IShellActuator in outer class, since different task may have specific logic to build

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
2626
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
2727
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
28+
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
2829

2930
import java.util.List;
30-
import java.util.Map;
3131

3232
import lombok.extern.slf4j.Slf4j;
3333

@@ -46,7 +46,7 @@ public AbstractYarnTask(TaskExecutionContext taskRequest) {
4646
public void handle(TaskCallBack taskCallBack) throws TaskException {
4747
try {
4848
IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
49-
.properties(getProperties())
49+
.properties(ParameterUtils.convert(taskRequest.getPrepareParamsMap()))
5050
// todo: do we need to move the replace to subclass?
5151
.appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
5252
// SHELL task exit code
@@ -109,9 +109,4 @@ public List<String> getApplicationIds() throws TaskException {
109109
* Get the script used to bootstrap the task
110110
*/
111111
protected abstract String getScript();
112-
113-
/**
114-
* Get the properties of the task used to replace the placeholders in the script.
115-
*/
116-
protected abstract Map<String, String> getProperties();
117112
}

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -80,26 +80,17 @@ public class TaskExecutionContext implements Serializable {
8080

8181
private int executorId;
8282

83-
private int cmdTypeIfComplement;
84-
8583
private String tenantCode;
8684

8785
private int workflowDefinitionId;
8886

89-
private int projectId;
90-
91-
private long projectCode;
92-
9387
private String taskParams;
9488

9589
private String environmentConfig;
9690

9791
/**
98-
* definedParams
99-
* // todo: we need to rename definedParams, prepareParamsMap, paramsMap, this is confusing
92+
* Include local params, global params and system built-in params
10093
*/
101-
private Map<String, String> definedParams;
102-
10394
private Map<String, Property> prepareParamsMap;
10495

10596
// Please use task instanceId
@@ -126,16 +117,12 @@ public class TaskExecutionContext implements Serializable {
126117

127118
private int dryRun;
128119

129-
private Map<String, Property> paramsMap;
130-
131120
private Integer cpuQuota;
132121

133122
private Integer memoryMax;
134123

135124
private int testFlag;
136125

137-
private boolean logBufferEnable;
138-
139126
private int dispatchFailTimes;
140127

141128
private boolean failover;

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.io.IOException;
3030
import java.util.List;
31-
import java.util.Map;
3231
import java.util.stream.Collectors;
3332

3433
import lombok.extern.slf4j.Slf4j;
@@ -70,11 +69,6 @@ protected String getScript() {
7069
return args.stream().collect(Collectors.joining(" "));
7170
}
7271

73-
@Override
74-
protected Map<String, String> getProperties() {
75-
return taskExecutionContext.getDefinedParams();
76-
}
77-
7872
@Override
7973
public AbstractParameters getParameters() {
8074
return flinkParameters;

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
2525

2626
import java.util.List;
27-
import java.util.Map;
2827
import java.util.regex.Matcher;
2928
import java.util.regex.Pattern;
3029
import java.util.stream.Collectors;
@@ -79,11 +78,6 @@ protected String getScript() {
7978
return args.stream().collect(Collectors.joining(" "));
8079
}
8180

82-
@Override
83-
protected Map<String, String> getProperties() {
84-
return taskExecutionContext.getDefinedParams();
85-
}
86-
8781
@Override
8882
public AbstractParameters getParameters() {
8983
return flinkParameters;

dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,6 @@ protected String getScript() {
9191
return args.stream().collect(Collectors.joining(" "));
9292
}
9393

94-
@Override
95-
protected Map<String, String> getProperties() {
96-
return taskExecutionContext.getDefinedParams();
97-
}
98-
9994
@Override
10095
public AbstractParameters getParameters() {
10196
return mapreduceParameters;

dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
9898
setProcessId(taskResponse.getProcessId());
9999
setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
100100
pythonParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
101+
taskRequest.setVarPool(JSONUtils.toJsonString(pythonParameters.getVarPool()));
101102
} catch (Exception e) {
102103
log.error("python task failure", e);
103104
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);

dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,6 @@ protected String getScript() {
112112
return args.stream().collect(Collectors.joining(" "));
113113
}
114114

115-
@Override
116-
protected Map<String, String> getProperties() {
117-
return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
118-
}
119-
120115
/**
121116
* build spark options
122117
*

0 commit comments

Comments
 (0)