Skip to content

Commit 0a00aa4

Browse files
authored
[Improvement-17986][task-plugin] Support parameter replacement in Flink and FlinkStream task (#17987)
1 parent f9e6ecf commit 0a00aa4

4 files changed

Lines changed: 248 additions & 15 deletions

File tree

  • dolphinscheduler-task-plugin

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.io.IOException;
3030
import java.util.List;
31-
import java.util.stream.Collectors;
3231

3332
import lombok.extern.slf4j.Slf4j;
3433

@@ -53,20 +52,11 @@ public void init() {
5352
if (flinkParameters == null || !flinkParameters.checkParameters()) {
5453
throw new RuntimeException("flink task params is not valid");
5554
}
56-
57-
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
5855
}
5956

60-
/**
61-
* create command
62-
*
63-
* @return command
64-
*/
6557
@Override
6658
protected String getScript() {
67-
// flink run/run-application [OPTIONS] <jar-file> <arguments>
68-
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
69-
return args.stream().collect(Collectors.joining(" "));
59+
return buildScriptWithParameterReplacement(flinkParameters);
7060
}
7161

7262
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.task.flink;
19+
20+
import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
21+
22+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
23+
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
24+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
25+
26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Files;
28+
import java.nio.file.Path;
29+
import java.nio.file.Paths;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
33+
import org.junit.jupiter.api.Assertions;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.io.TempDir;
36+
37+
public class FlinkStreamTaskTest {
38+
39+
@TempDir
40+
Path tempDir;
41+
42+
@Test
43+
public void testParameterReplacementInScript() throws Exception {
44+
String executePath = tempDir.toString();
45+
String taskAppId = "test-app";
46+
47+
FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
48+
flinkParameters.setProgramType(ProgramType.SQL);
49+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
50+
flinkParameters.setParallelism(2);
51+
flinkParameters.setInitScript("SET 'date' = '${bizdate}';");
52+
flinkParameters.setRawScript("SELECT * FROM logs WHERE dt = '${bizdate}' AND env = '${env}'");
53+
54+
Map<String, Property> prepareParamsMap = new HashMap<>();
55+
prepareParamsMap.put("bizdate", new Property("bizdate", null, null, "20250601"));
56+
prepareParamsMap.put("env", new Property("env", null, null, "prod"));
57+
58+
TaskExecutionContext context = new TaskExecutionContext();
59+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
60+
context.setExecutePath(executePath);
61+
context.setTaskAppId(taskAppId);
62+
context.setPrepareParamsMap(prepareParamsMap);
63+
64+
FlinkStreamTask task = new FlinkStreamTask(context);
65+
task.init();
66+
task.getScript();
67+
68+
String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
69+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
70+
71+
String initContent = new String(Files.readAllBytes(Paths.get(initScriptPath)), StandardCharsets.UTF_8);
72+
String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
73+
74+
String expectedInitOptions = String.join(FlinkConstants.FLINK_SQL_NEWLINE,
75+
FlinkArgsUtils.buildInitOptionsForSql(flinkParameters)).concat(FlinkConstants.FLINK_SQL_NEWLINE);
76+
Assertions.assertEquals(expectedInitOptions + "SET 'date' = '20250601';", initContent);
77+
Assertions.assertEquals("SELECT * FROM logs WHERE dt = '20250601' AND env = 'prod'", nodeContent.trim());
78+
}
79+
80+
@Test
81+
public void testParameterReplacementTimePlaceholder() throws Exception {
82+
String executePath = tempDir.toString();
83+
String taskAppId = "test-time";
84+
85+
FlinkStreamParameters flinkParameters = new FlinkStreamParameters();
86+
flinkParameters.setProgramType(ProgramType.SQL);
87+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
88+
flinkParameters.setParallelism(2);
89+
flinkParameters.setInitScript("");
90+
flinkParameters.setRawScript("INSERT INTO t SELECT * FROM s WHERE dt = '$[yyyyMMdd]'");
91+
92+
Map<String, Property> prepareParamsMap = new HashMap<>();
93+
prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20210815080000"));
94+
95+
TaskExecutionContext context = new TaskExecutionContext();
96+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
97+
context.setExecutePath(executePath);
98+
context.setTaskAppId(taskAppId);
99+
context.setPrepareParamsMap(prepareParamsMap);
100+
101+
FlinkStreamTask task = new FlinkStreamTask(context);
102+
task.init();
103+
task.getScript();
104+
105+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
106+
String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
107+
108+
Assertions.assertEquals("INSERT INTO t SELECT * FROM s WHERE dt = '20210815'", nodeContent.trim());
109+
}
110+
}

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,14 @@
2121
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
2222
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
2323
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
24+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
2425
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
26+
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
27+
28+
import org.apache.commons.lang3.StringUtils;
2529

2630
import java.util.List;
31+
import java.util.Map;
2732
import java.util.regex.Matcher;
2833
import java.util.regex.Pattern;
2934
import java.util.stream.Collectors;
@@ -62,8 +67,6 @@ public void init() {
6267
if (flinkParameters == null || !flinkParameters.checkParameters()) {
6368
throw new RuntimeException("flink task params is not valid");
6469
}
65-
66-
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
6770
}
6871

6972
/**
@@ -73,8 +76,31 @@ public void init() {
7376
*/
7477
@Override
7578
protected String getScript() {
76-
// flink run/run-application [OPTIONS] <jar-file> <arguments>
77-
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
79+
return buildScriptWithParameterReplacement(flinkParameters);
80+
}
81+
82+
/**
83+
* Apply parameter replacement to initScript/rawScript, generate script files and build run command.
84+
*
85+
* @param params flink parameters
86+
* @return run command string
87+
*/
88+
protected String buildScriptWithParameterReplacement(FlinkParameters params) {
89+
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
90+
Map<String, String> stringParams = ParameterUtils.convert(paramsMap);
91+
92+
if (StringUtils.isNotBlank(params.getInitScript())) {
93+
params.setInitScript(
94+
ParameterUtils.convertParameterPlaceholders(params.getInitScript(), stringParams));
95+
}
96+
if (StringUtils.isNotBlank(params.getRawScript())) {
97+
params.setRawScript(
98+
ParameterUtils.convertParameterPlaceholders(params.getRawScript(), stringParams));
99+
}
100+
101+
FileUtils.generateScriptFile(taskExecutionContext, params);
102+
103+
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, params);
78104
return args.stream().collect(Collectors.joining(" "));
79105
}
80106

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.task.flink;
19+
20+
import static org.apache.dolphinscheduler.common.constants.DateConstants.PARAMETER_DATETIME;
21+
22+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
23+
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
24+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
25+
26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Files;
28+
import java.nio.file.Path;
29+
import java.nio.file.Paths;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
33+
import org.junit.jupiter.api.Assertions;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.io.TempDir;
36+
37+
public class FlinkTaskTest {
38+
39+
@TempDir
40+
Path tempDir;
41+
42+
@Test
43+
public void testParameterReplacementInScript() throws Exception {
44+
String executePath = tempDir.toString();
45+
String taskAppId = "test-app";
46+
47+
FlinkParameters flinkParameters = new FlinkParameters();
48+
flinkParameters.setProgramType(ProgramType.SQL);
49+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
50+
flinkParameters.setParallelism(2);
51+
flinkParameters.setInitScript("set batch_size=${batch_size};");
52+
flinkParameters.setRawScript("SELECT * FROM logs WHERE dt='$[yyyyMMdd]';");
53+
54+
Map<String, Property> prepareParamsMap = new HashMap<>();
55+
prepareParamsMap.put("batch_size", new Property("batch_size", null, null, "1000"));
56+
prepareParamsMap.put(PARAMETER_DATETIME, new Property(PARAMETER_DATETIME, null, null, "20201201120000"));
57+
58+
TaskExecutionContext context = new TaskExecutionContext();
59+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
60+
context.setExecutePath(executePath);
61+
context.setTaskAppId(taskAppId);
62+
context.setPrepareParamsMap(prepareParamsMap);
63+
64+
FlinkTask task = new FlinkTask(context);
65+
task.init();
66+
task.getScript();
67+
68+
String initScriptPath = String.format("%s/%s_init.sql", executePath, taskAppId);
69+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
70+
71+
String initContent = new String(Files.readAllBytes(Paths.get(initScriptPath)), StandardCharsets.UTF_8);
72+
String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
73+
74+
String expectedInitOptions = String.join(FlinkConstants.FLINK_SQL_NEWLINE,
75+
FlinkArgsUtils.buildInitOptionsForSql(flinkParameters)).concat(FlinkConstants.FLINK_SQL_NEWLINE);
76+
Assertions.assertEquals(expectedInitOptions + "set batch_size=1000;", initContent);
77+
Assertions.assertEquals("SELECT * FROM logs WHERE dt='20201201';", nodeContent.trim());
78+
}
79+
80+
@Test
81+
public void testParameterReplacementWithNullParamsMap() throws Exception {
82+
String executePath = tempDir.toString();
83+
String taskAppId = "test-null-params";
84+
85+
FlinkParameters flinkParameters = new FlinkParameters();
86+
flinkParameters.setProgramType(ProgramType.SQL);
87+
flinkParameters.setDeployMode(FlinkDeployMode.LOCAL);
88+
flinkParameters.setParallelism(2);
89+
flinkParameters.setInitScript("");
90+
flinkParameters.setRawScript("SELECT 1;");
91+
92+
TaskExecutionContext context = new TaskExecutionContext();
93+
context.setTaskParams(JSONUtils.toJsonString(flinkParameters));
94+
context.setExecutePath(executePath);
95+
context.setTaskAppId(taskAppId);
96+
context.setPrepareParamsMap(null);
97+
98+
FlinkTask task = new FlinkTask(context);
99+
task.init();
100+
String script = task.getScript();
101+
102+
String nodeScriptPath = String.format("%s/%s_node.sql", executePath, taskAppId);
103+
String nodeContent = new String(Files.readAllBytes(Paths.get(nodeScriptPath)), StandardCharsets.UTF_8);
104+
Assertions.assertEquals("SELECT 1;", nodeContent.trim());
105+
Assertions.assertNotNull(script);
106+
}
107+
}

0 commit comments

Comments
 (0)