Skip to content

Commit d1fddfb

Browse files
authored
[Chore] Merge the DataSourcePluginManager and DataSourceProcessorManager (#17975)
1 parent e7bee8c commit d1fddfb

12 files changed

Lines changed: 64 additions & 161 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
2323
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
2424
import org.apache.dolphinscheduler.dao.DaoConfiguration;
25-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
25+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
2626
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
2727
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
2828
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -57,7 +57,7 @@ public static void main(String[] args) {
5757
public void run(ApplicationReadyEvent readyEvent) {
5858
ServerLifeCycleManager.toRunning();
5959
log.info("Received spring application context ready event will load taskPlugin and write to DB");
60-
DataSourceProcessorProvider.initialize();
60+
DataSourcePluginManager.loadDataSourcePlugin();
6161
TaskPluginManager.loadTaskPlugin();
6262
}
6363
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/BaseAdHocDataSourceClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.plugin.datasource.api.client;
1919

20-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
20+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
2121
import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
2222
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2323
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -38,7 +38,7 @@ protected BaseAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbT
3838
@Override
3939
public Connection getConnection() throws SQLException {
4040
try {
41-
return DataSourceProcessorProvider.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
41+
return DataSourcePluginManager.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
4242
} catch (Exception e) {
4343
throw new SQLException("Create adhoc connection error", e);
4444
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,13 @@ public class DataSourceClientProvider {
5757
})
5858
.maximumSize(100)
5959
.build();
60-
private static final DataSourcePluginManager dataSourcePluginManager = new DataSourcePluginManager();
61-
62-
static {
63-
dataSourcePluginManager.installPlugin();
64-
}
6560

6661
public static DataSourceClient getPooledDataSourceClient(DbType dbType,
6762
ConnectionParam connectionParam) throws ExecutionException {
6863
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
6964
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
7065
return POOLED_DATASOURCE_CLIENT_CACHE.get(datasourceUniqueId, () -> {
71-
DataSourceChannel dataSourceChannel = dataSourcePluginManager.getDataSourceChannel(dbType);
66+
DataSourceChannel dataSourceChannel = DataSourcePluginManager.getDataSourceChannel(dbType);
7267
if (null == dataSourceChannel) {
7368
throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getName()));
7469
}
@@ -83,7 +78,7 @@ public static Connection getPooledConnection(DbType dbType,
8378

8479
public static AdHocDataSourceClient getAdHocDataSourceClient(DbType dbType, ConnectionParam connectionParam) {
8580
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
86-
DataSourceChannel dataSourceChannel = dataSourcePluginManager.getDataSourceChannel(dbType);
81+
DataSourceChannel dataSourceChannel = DataSourcePluginManager.getDataSourceChannel(dbType);
8782
if (null == dataSourceChannel) {
8883
throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getName()));
8984
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,74 @@
1919

2020
import static java.lang.String.format;
2121

22+
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
2223
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
2324
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
2425
import org.apache.dolphinscheduler.spi.enums.DbType;
2526
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
2627

28+
import org.apache.commons.collections4.MapUtils;
29+
2730
import java.util.Map;
31+
import java.util.ServiceLoader;
2832
import java.util.concurrent.ConcurrentHashMap;
2933

34+
import lombok.NonNull;
3035
import lombok.extern.slf4j.Slf4j;
3136

3237
@Slf4j
3338
public class DataSourcePluginManager {
3439

35-
private final Map<String, DataSourceChannel> datasourceChannelMap = new ConcurrentHashMap<>();
36-
37-
public DataSourceChannel getDataSourceChannel(final DbType dbType) {
38-
return datasourceChannelMap.get(dbType.getName());
39-
}
40+
private static final Map<String, DataSourceChannel> datasourceChannelMap = new ConcurrentHashMap<>();
4041

41-
public void installPlugin() {
42+
private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();
4243

43-
PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory =
44-
new PrioritySPIFactory<>(DataSourceChannelFactory.class);
45-
for (Map.Entry<String, DataSourceChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
46-
final DataSourceChannelFactory factory = entry.getValue();
47-
final String name = entry.getKey();
44+
static {
45+
loadDataSourcePlugin();
46+
}
4847

49-
log.info("Registering datasource plugin: {}", name);
48+
public static DataSourceChannel getDataSourceChannel(@NonNull DbType dbType) {
49+
return datasourceChannelMap.get(dbType.getName());
50+
}
5051

51-
if (datasourceChannelMap.containsKey(name)) {
52-
throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
53-
}
52+
public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
53+
return dataSourceProcessorMap.get(dbType.getName());
54+
}
5455

55-
loadDatasourceClient(factory);
56+
public static void loadDataSourcePlugin() {
57+
initializeDataSourceChannel();
58+
initializeDataSourceProcessor();
59+
}
5660

57-
log.info("Registered datasource plugin: {}", name);
61+
private static synchronized void initializeDataSourceChannel() {
62+
if (MapUtils.isNotEmpty(datasourceChannelMap)) {
63+
return;
5864
}
65+
new PrioritySPIFactory<>(DataSourceChannelFactory.class).getSPIMap().forEach(
66+
(dataSourceChannelName, dataSourceChannelFactory) -> {
67+
if (datasourceChannelMap.containsKey(dataSourceChannelName)) {
68+
throw new IllegalStateException(
69+
format("Duplicate datasource channel named '%s'", dataSourceChannelName));
70+
}
71+
datasourceChannelMap.put(dataSourceChannelName, dataSourceChannelFactory.create());
72+
log.info("Registered datasource channel: {}", dataSourceChannelName);
73+
});
5974
}
6075

61-
private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) {
62-
DataSourceChannel datasourceChannel = datasourceChannelFactory.create();
63-
datasourceChannelMap.put(datasourceChannelFactory.getName(), datasourceChannel);
76+
private static synchronized void initializeDataSourceProcessor() {
77+
if (MapUtils.isNotEmpty(dataSourceProcessorMap)) {
78+
return;
79+
}
80+
81+
ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
82+
final String name = factory.getDbType().getName();
83+
if (dataSourceProcessorMap.containsKey(name)) {
84+
throw new IllegalStateException(format("Duplicate datasource processor named '%s'", name));
85+
}
86+
DataSourceProcessor dataSourceProcessor = factory.create();
87+
dataSourceProcessorMap.put(name, dataSourceProcessor);
88+
log.info("Success register datasource processor -> {}", name);
89+
});
6490
}
91+
6592
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2121
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
2222
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
23-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
23+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
2424
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
2525
import org.apache.dolphinscheduler.spi.enums.DbType;
2626

2727
import java.sql.Connection;
28-
import java.util.Map;
2928

3029
import lombok.extern.slf4j.Slf4j;
3130

@@ -75,12 +74,7 @@ public static BaseDataSourceParamDTO buildDatasourceParamDTO(DbType dbType, Stri
7574
}
7675

7776
public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
78-
Map<String, DataSourceProcessor> dataSourceProcessorMap =
79-
DataSourceProcessorProvider.getDataSourceProcessorMap();
80-
if (!dataSourceProcessorMap.containsKey(dbType.name())) {
81-
throw new IllegalArgumentException("illegal datasource type");
82-
}
83-
return dataSourceProcessorMap.get(dbType.name());
77+
return DataSourcePluginManager.getDataSourceProcessor(dbType);
8478
}
8579

8680
/**

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-databend/src/test/java/org/apache/dolphinscheduler/plugin/datasource/databend/param/DatabendDataSourceProcessorTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,12 @@
3030

3131
import org.junit.jupiter.api.Assertions;
3232
import org.junit.jupiter.api.Test;
33-
import org.junit.jupiter.api.extension.ExtendWith;
3433
import org.mockito.MockedStatic;
3534
import org.mockito.Mockito;
36-
import org.mockito.junit.jupiter.MockitoExtension;
3735

38-
@ExtendWith(MockitoExtension.class)
3936
public class DatabendDataSourceProcessorTest {
4037

41-
private DatabendDataSourceProcessor databendDataSourceProcessor = new DatabendDataSourceProcessor();
38+
private final DatabendDataSourceProcessor databendDataSourceProcessor = new DatabendDataSourceProcessor();
4239

4340
@Test
4441
public void testCheckDatasourceParam() {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.dolphinscheduler.dao.DaoConfiguration;
2727
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
2828
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
29-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
29+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
3030
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
3131
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
3232
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -123,7 +123,7 @@ public void initialized() {
123123

124124
// install task plugin
125125
TaskPluginManager.loadTaskPlugin();
126-
DataSourceProcessorProvider.initialize();
126+
DataSourcePluginManager.loadDataSourcePlugin();
127127

128128
// self tolerant
129129
this.masterRegistryClient.start();

dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2424
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
2525
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
26-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
26+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
2727
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
2828
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
2929
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -95,7 +95,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
9595
procedureParameters.getLocalParams());
9696

9797
DbType dbType = DbType.valueOf(procedureParameters.getType());
98-
DataSourceProcessor dataSourceProcessor = DataSourceProcessorProvider.getDataSourceProcessor(dbType);
98+
DataSourceProcessor dataSourceProcessor = DataSourcePluginManager.getDataSourceProcessor(dbType);
9999
ConnectionParam connectionParams =
100100
dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams());
101101
try (Connection connection = DataSourceClientProvider.getAdHocConnection(dbType, connectionParams)) {

0 commit comments

Comments
 (0)