From 2e01dc6cf25decf186e2fdfbcbe47c5a72c97f76 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Wed, 27 Aug 2025 15:08:20 +0800 Subject: [PATCH 1/4] ddd the closureTaskContext method for the closing task and add varPool writeBack --- .../server/master/engine/executor/LogicTaskExecutor.java | 3 +++ .../task/executor/AbstractTaskExecutor.java | 4 ++++ .../server/worker/executor/PhysicalTaskExecutor.java | 8 ++++++++ 3 files changed, 15 insertions(+) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java index 4dafabfe4d24..f3686593ec0c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java @@ -59,6 +59,9 @@ protected void doTriggerTaskPlugin() { logicTask.start(); } + @Override + protected void closureTaskContext() {} + @SneakyThrows @Override public void pause() { diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java index 79a0b4af393c..86967b50ddd0 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java @@ -79,6 +79,8 @@ public void start() { } doTriggerTaskPlugin(); + + closureTaskContext(); } @Override @@ -125,6 +127,8 @@ protected void transitTaskExecutorState(final TaskExecutorState taskExecutorStat protected abstract void doTriggerTaskPlugin(); + protected abstract void closureTaskContext(); + protected void initializeTaskContext() { taskExecutionContext.setStartTime(System.currentTimeMillis()); log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized taskContext {}", diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java index d6139b2019b1..03d3de2f2151 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java @@ -87,6 +87,14 @@ public void updateTaskInstanceInfo(final int taskInstanceId) { }); } + @Override + protected void closureTaskContext() { + log.info("Begin to closure taskContext."); + taskExecutionContext.setVarPool(physicalTask.getParameters().getVarPool()); + log.info("Set taskContext varPool {}", JSONUtils.toPrettyJsonString(taskExecutionContext.getVarPool())); + log.info("End closure taskContext."); + } + @Override protected TaskExecutorState doTrackTaskPluginStatus() { return TaskExecutorStateMappings.mapState(physicalTask.getExitStatus()); From fe4a770d550781204528af4485822e7dbb62dfd2 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Thu, 28 Aug 2025 09:54:29 +0800 Subject: [PATCH 2/4] ./mvnw spotless:apply --- .../server/master/engine/executor/LogicTaskExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java index f3686593ec0c..52420caf579f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java @@ -60,7 +60,8 @@ protected void doTriggerTaskPlugin() { } @Override - protected void closureTaskContext() {} + protected void closureTaskContext() { + } @SneakyThrows @Override From 560e97ab973fcfff03eee3c87a4c9d8180db09c9 Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Thu, 4 Sep 2025 17:28:58 +0800 Subject: [PATCH 3/4] fix sql varPool not work --- .../engine/executor/LogicTaskExecutor.java | 4 --- .../task/executor/AbstractTaskExecutor.java | 4 --- .../plugin/task/sql/SqlTask.java | 3 ++ .../plugin/task/sql/SqlTaskTest.java | 30 +++++++++++++++++++ .../worker/executor/PhysicalTaskExecutor.java | 8 ----- 5 files changed, 33 insertions(+), 16 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java index 52420caf579f..4dafabfe4d24 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutor.java @@ -59,10 +59,6 @@ protected void doTriggerTaskPlugin() { logicTask.start(); } - @Override - protected void closureTaskContext() { - } - @SneakyThrows @Override public void pause() { diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java index 86967b50ddd0..79a0b4af393c 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/AbstractTaskExecutor.java @@ -79,8 +79,6 @@ public void start() { } doTriggerTaskPlugin(); - - closureTaskContext(); } @Override @@ -127,8 +125,6 @@ protected void transitTaskExecutorState(final TaskExecutorState taskExecutorStat protected abstract void doTriggerTaskPlugin(); - protected abstract void closureTaskContext(); - protected void initializeTaskContext() { taskExecutionContext.setStartTime(System.currentTimeMillis()); log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized taskContext {}", diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 76bdbbc5fbdd..0a866fc499ca 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -198,6 +198,9 @@ public void executeFuncAndSql(List mainStatementsBinds, // post execute executeUpdate(connection, postStatementsBinds, "post"); + + // set varPool + taskExecutionContext.setVarPool(sqlParameters.getVarPool()); } catch (Exception e) { log.error("execute sql error: {}", e.getMessage()); throw e; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java index 3e1b25a4be97..aca7b98bd596 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -173,4 +174,33 @@ void testReplacingSqlHasQuestionMarkAndParams() { Assertions.assertEquals(4, sqlParamsMap.size()); Assertions.assertEquals(expected, formatSql); } + + @Test + void testVarPoolSetting() { + // 创建包含OUT参数的SqlParameters + SqlParameters sqlParameters = new SqlParameters(); + sqlParameters.setType("HIVE"); + sqlParameters.setDatasource(1); + sqlParameters.setSql("select id, name from user where id = 1"); + + // 设置OUT参数 + Property outParam = new Property("id", Direct.OUT, DataType.VARCHAR, ""); + sqlParameters.setLocalParams(Lists.newArrayList(outParam)); + + // 模拟SQL执行结果 + String sqlResult = "[{\"id\":\"1\",\"name\":\"test_user\"}]"; + + // 调用dealOutParam方法处理输出参数 + sqlParameters.dealOutParam(sqlResult); + + // 验证varPool是否正确设置 + Assertions.assertNotNull(sqlParameters.getVarPool()); + Assertions.assertEquals(1, sqlParameters.getVarPool().size()); + + // 验证参数值是否正确 + Property varPoolParam = sqlParameters.getVarPool().get(0); + Assertions.assertEquals("id", varPoolParam.getProp()); + Assertions.assertEquals("1", varPoolParam.getValue()); + Assertions.assertEquals(Direct.OUT, varPoolParam.getDirect()); + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java index 03d3de2f2151..d6139b2019b1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java @@ -87,14 +87,6 @@ public void updateTaskInstanceInfo(final int taskInstanceId) { }); } - @Override - protected void closureTaskContext() { - log.info("Begin to closure taskContext."); - taskExecutionContext.setVarPool(physicalTask.getParameters().getVarPool()); - log.info("Set taskContext varPool {}", JSONUtils.toPrettyJsonString(taskExecutionContext.getVarPool())); - log.info("End closure taskContext."); - } - @Override protected TaskExecutorState doTrackTaskPluginStatus() { return TaskExecutorStateMappings.mapState(physicalTask.getExitStatus()); From 4694bb86a1cc6b406e2196b8bef67011cd06bc0f Mon Sep 17 00:00:00 2001 From: Zzih96 Date: Fri, 5 Sep 2025 11:46:16 +0800 Subject: [PATCH 4/4] remove chinese comments --- .../dolphinscheduler/plugin/task/sql/SqlTaskTest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java index aca7b98bd596..233d19757ab0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java @@ -177,27 +177,21 @@ void testReplacingSqlHasQuestionMarkAndParams() { @Test void testVarPoolSetting() { - // 创建包含OUT参数的SqlParameters SqlParameters sqlParameters = new SqlParameters(); sqlParameters.setType("HIVE"); sqlParameters.setDatasource(1); sqlParameters.setSql("select id, name from user where id = 1"); - // 设置OUT参数 Property outParam = new Property("id", Direct.OUT, DataType.VARCHAR, ""); sqlParameters.setLocalParams(Lists.newArrayList(outParam)); - // 模拟SQL执行结果 String sqlResult = "[{\"id\":\"1\",\"name\":\"test_user\"}]"; - // 调用dealOutParam方法处理输出参数 sqlParameters.dealOutParam(sqlResult); - // 验证varPool是否正确设置 Assertions.assertNotNull(sqlParameters.getVarPool()); Assertions.assertEquals(1, sqlParameters.getVarPool().size()); - // 验证参数值是否正确 Property varPoolParam = sqlParameters.getVarPool().get(0); Assertions.assertEquals("id", varPoolParam.getProp()); Assertions.assertEquals("1", varPoolParam.getValue());