Skip to content

Commit 0c14fe8

Browse files
Zzih96ruanwenjun
authored andcommitted
fix sql varPool not work
1 parent 714ec4a commit 0c14fe8

5 files changed

Lines changed: 33 additions & 16 deletions

File tree

  • dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor
  • dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor
  • dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src
  • dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,6 @@ protected void doTriggerTaskPlugin() {
5959
logicTask.start();
6060
}
6161

62-
@Override
63-
protected void closureTaskContext() {
64-
}
65-
6662
@SneakyThrows
6763
@Override
6864
public void pause() {

dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ public void start() {
7979
}
8080

8181
doTriggerTaskPlugin();
82-
83-
closureTaskContext();
8482
}
8583

8684
@Override
@@ -127,8 +125,6 @@ protected void transitTaskExecutorState(final TaskExecutorState taskExecutorStat
127125

128126
protected abstract void doTriggerTaskPlugin();
129127

130-
protected abstract void closureTaskContext();
131-
132128
protected void initializeTaskContext() {
133129
taskExecutionContext.setStartTime(System.currentTimeMillis());
134130
log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized taskContext {}",

dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
198198

199199
// post execute
200200
executeUpdate(connection, postStatementsBinds, "post");
201+
202+
// set varPool
203+
taskExecutionContext.setVarPool(sqlParameters.getVarPool());
201204
} catch (Exception e) {
202205
log.error("execute sql error: {}", e.getMessage());
203206
throw e;

dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
2525
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
2626
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
27+
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
2728
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
2829
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
2930
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@@ -173,4 +174,33 @@ void testReplacingSqlHasQuestionMarkAndParams() {
173174
Assertions.assertEquals(4, sqlParamsMap.size());
174175
Assertions.assertEquals(expected, formatSql);
175176
}
177+
178+
@Test
179+
void testVarPoolSetting() {
180+
// 创建包含OUT参数的SqlParameters
181+
SqlParameters sqlParameters = new SqlParameters();
182+
sqlParameters.setType("HIVE");
183+
sqlParameters.setDatasource(1);
184+
sqlParameters.setSql("select id, name from user where id = 1");
185+
186+
// 设置OUT参数
187+
Property outParam = new Property("id", Direct.OUT, DataType.VARCHAR, "");
188+
sqlParameters.setLocalParams(Lists.newArrayList(outParam));
189+
190+
// 模拟SQL执行结果
191+
String sqlResult = "[{\"id\":\"1\",\"name\":\"test_user\"}]";
192+
193+
// 调用dealOutParam方法处理输出参数
194+
sqlParameters.dealOutParam(sqlResult);
195+
196+
// 验证varPool是否正确设置
197+
Assertions.assertNotNull(sqlParameters.getVarPool());
198+
Assertions.assertEquals(1, sqlParameters.getVarPool().size());
199+
200+
// 验证参数值是否正确
201+
Property varPoolParam = sqlParameters.getVarPool().get(0);
202+
Assertions.assertEquals("id", varPoolParam.getProp());
203+
Assertions.assertEquals("1", varPoolParam.getValue());
204+
Assertions.assertEquals(Direct.OUT, varPoolParam.getDirect());
205+
}
176206
}

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,6 @@ public void updateTaskInstanceInfo(final int taskInstanceId) {
8787
});
8888
}
8989

90-
@Override
91-
protected void closureTaskContext() {
92-
log.info("Begin to closure taskContext.");
93-
taskExecutionContext.setVarPool(physicalTask.getParameters().getVarPool());
94-
log.info("Set taskContext varPool {}", JSONUtils.toPrettyJsonString(taskExecutionContext.getVarPool()));
95-
log.info("End closure taskContext.");
96-
}
97-
9890
@Override
9991
protected TaskExecutorState doTrackTaskPluginStatus() {
10092
return TaskExecutorStateMappings.mapState(physicalTask.getExitStatus());

0 commit comments

Comments
 (0)