Skip to content

Commit 16dbda9

Browse files
authored
[Improvement-16754][DataX] Support DataX writer parameter batchSize (#18192)
1 parent c502ea1 commit 16dbda9

7 files changed

Lines changed: 110 additions & 0 deletions

File tree

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public class DataxParameters extends AbstractParameters {
101101
*/
102102
private int xmx;
103103

104+
/**
105+
* writer batch size for DataX
106+
*/
107+
private int batchSize;
108+
104109
private List<ResourceInfo> resourceList;
105110

106111
@Override
@@ -138,6 +143,7 @@ public String toString() {
138143
", jobChannel=" + jobChannel +
139144
", xms=" + xms +
140145
", xmx=" + xmx +
146+
", batchSize=" + batchSize +
141147
", resourceList=" + JSONUtils.toJsonString(resourceList) +
142148
'}';
143149
}

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,10 @@ private List<ObjectNode> buildDataxJobContentJson() {
282282
}
283283
}
284284

285+
if (dataXParameters.getBatchSize() > 0) {
286+
writerParam.put("batchSize", dataXParameters.getBatchSize());
287+
}
288+
285289
ObjectNode writer = JSONUtils.createObjectNode();
286290
writer.put("name", DataxUtils.getWriterPluginName(dataxTaskExecutionContext.getTargetType()));
287291
writer.set("parameter", writerParam);

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,26 @@ public void testToString() {
9191
+ "jobChannel=1, "
9292
+ "xms=0, "
9393
+ "xmx=-100, "
94+
+ "batchSize=0, "
9495
+ "resourceList=[{\"id\":null,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]"
9596
+ "}";
9697

9798
Assertions.assertEquals(expected, dataxParameters.toString());
9899
}
99100

101+
@Test
102+
public void testBatchSize() {
103+
DataxParameters dataxParameters = new DataxParameters();
104+
dataxParameters.setBatchSize(0);
105+
Assertions.assertEquals(0, dataxParameters.getBatchSize());
106+
107+
dataxParameters.setBatchSize(2048);
108+
Assertions.assertEquals(2048, dataxParameters.getBatchSize());
109+
110+
dataxParameters.setBatchSize(65536);
111+
Assertions.assertEquals(65536, dataxParameters.getBatchSize());
112+
}
113+
100114
public String loadJvmEnvTest(DataxParameters dataXParameters) {
101115
int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
102116
int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.dolphinscheduler.common.utils.FileUtils;
2828
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2929
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
30+
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
3031
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
3132
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
3233
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -45,13 +46,15 @@
4546
import java.io.IOException;
4647
import java.io.InputStream;
4748
import java.lang.reflect.Field;
49+
import java.lang.reflect.Method;
4850
import java.nio.file.Files;
4951
import java.sql.Connection;
5052
import java.sql.PreparedStatement;
5153
import java.sql.ResultSet;
5254
import java.sql.ResultSetMetaData;
5355
import java.sql.SQLException;
5456
import java.util.HashMap;
57+
import java.util.List;
5558
import java.util.Map;
5659

5760
import org.junit.jupiter.api.Assertions;
@@ -62,6 +65,8 @@
6265
import org.mockito.Mockito;
6366
import org.mockito.junit.jupiter.MockitoExtension;
6467

68+
import com.fasterxml.jackson.databind.node.ObjectNode;
69+
6570
@ExtendWith(MockitoExtension.class)
6671
public class DataxTaskTest {
6772

@@ -273,6 +278,71 @@ private TaskExecutionContext buildTestTaskExecutionContext() {
273278
return taskExecutionContext;
274279
}
275280

281+
@Test
282+
public void testBuildDataxJobContentJsonWithBatchSize() throws Exception {
283+
// set batchSize > 0 via reflection
284+
Field dataXParametersField = DataxTask.class.getDeclaredField("dataXParameters");
285+
dataXParametersField.setAccessible(true);
286+
DataxParameters params = (DataxParameters) dataXParametersField.get(dataxTask);
287+
params.setBatchSize(1024);
288+
params.setDsType("MYSQL");
289+
params.setDtType("MYSQL");
290+
291+
// set dataxTaskExecutionContext via reflection
292+
DataxTaskExecutionContext ctx = new DataxTaskExecutionContext();
293+
ctx.setSourcetype(DbType.MYSQL);
294+
ctx.setTargetType(DbType.MYSQL);
295+
ctx.setSourceConnectionParams(
296+
"{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://localhost:3306\"}");
297+
ctx.setTargetConnectionParams(
298+
"{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://localhost:3306\"}");
299+
300+
Field ctxField = DataxTask.class.getDeclaredField("dataxTaskExecutionContext");
301+
ctxField.setAccessible(true);
302+
ctxField.set(dataxTask, ctx);
303+
304+
BaseConnectionParam mockConnParam = mock(BaseConnectionParam.class);
305+
when(mockConnParam.getUser()).thenReturn("root");
306+
when(mockConnParam.getPassword()).thenReturn("123456");
307+
when(mockConnParam.getCompatibleMode()).thenReturn(null);
308+
309+
try (
310+
MockedStatic<DataSourceUtils> mockedDataSourceUtils = mockStatic(DataSourceUtils.class);
311+
MockedStatic<DataSourceClientProvider> mockedProvider = mockStatic(DataSourceClientProvider.class)) {
312+
313+
mockedDataSourceUtils
314+
.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(DbType.class), Mockito.anyString()))
315+
.thenReturn(mockConnParam);
316+
mockedDataSourceUtils.when(() -> DataSourceUtils.getJdbcUrl(Mockito.any(DbType.class), Mockito.any()))
317+
.thenReturn("jdbc:mysql://localhost:3306/test");
318+
319+
Connection connection = mock(Connection.class);
320+
mockedProvider.when(() -> DataSourceClientProvider.getAdHocConnection(Mockito.any(), Mockito.any()))
321+
.thenReturn(connection);
322+
323+
PreparedStatement stmt = mock(PreparedStatement.class);
324+
when(connection.prepareStatement(anyString())).thenReturn(stmt);
325+
ResultSetMetaData md = mock(ResultSetMetaData.class);
326+
when(md.getColumnCount()).thenReturn(1);
327+
when(md.getColumnLabel(eq(1))).thenReturn("col1");
328+
ResultSet resultSet = mock(ResultSet.class);
329+
when(resultSet.getMetaData()).thenReturn(md);
330+
when(stmt.executeQuery()).thenReturn(resultSet);
331+
332+
Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson");
333+
method.setAccessible(true);
334+
Object invokeResult = method.invoke(dataxTask);
335+
Assertions.assertNotNull(invokeResult);
336+
List<?> result = (List<?>) invokeResult;
337+
338+
Assertions.assertEquals(1, result.size());
339+
ObjectNode contentNode = (ObjectNode) result.get(0);
340+
ObjectNode writerParam = (ObjectNode) contentNode.get("writer").get("parameter");
341+
Assertions.assertTrue(writerParam.has("batchSize"));
342+
Assertions.assertEquals(1024, writerParam.get("batchSize").asInt());
343+
}
344+
}
345+
276346
private String getJsonString() {
277347
return "{\n" +
278348
" \"job\": {\n" +

dolphinscheduler-ui/src/locales/en_US/project.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,8 @@ export default {
648648
datax_target_database_pre_sql: 'Pre SQL Statement',
649649
datax_target_database_post_sql: 'Post SQL Statement',
650650
datax_non_query_sql_tips: 'Please enter the non-query sql statement',
651+
datax_writer_batch_size: 'Writer Batch Size',
652+
datax_writer_batch_size_tips: '0 or empty = default 2048',
651653
datax_job_speed_byte: 'Speed(Byte count)',
652654
datax_job_speed_byte_info: '(0 means unlimited)',
653655
datax_job_speed_record: 'Speed(Record count)',

dolphinscheduler-ui/src/locales/zh_CN/project.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,8 @@ export default {
629629
datax_target_database_pre_sql: '目标库前置SQL',
630630
datax_target_database_post_sql: '目标库后置SQL',
631631
datax_non_query_sql_tips: '请输入非查询SQL语句',
632+
datax_writer_batch_size: 'Writer 批量大小',
633+
datax_writer_batch_size_tips: '0 或留空为默认值 2048',
632634
datax_job_speed_byte: '限流(字节数)',
633635
datax_job_speed_byte_info: '(KB,0代表不限制)',
634636
datax_job_speed_record: '限流(记录数)',

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,18 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
254254
autosize: { minRows: 1 }
255255
}
256256
},
257+
{
258+
type: 'input-number',
259+
field: 'batchSize',
260+
name: t('project.node.datax_writer_batch_size'),
261+
span: otherStatementSpan,
262+
props: {
263+
min: 0,
264+
step: 1024,
265+
placeholder: t('project.node.datax_writer_batch_size_tips')
266+
},
267+
value: null
268+
},
257269
{
258270
type: 'select',
259271
field: 'jobSpeedByte',

0 commit comments

Comments (0)