diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index 9663fa371e95a..136399a3778b0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -117,7 +117,7 @@ private static String getWindowsSearchPortCmd(final List ports) { } private static String getUnixSearchPortCmd(final List ports) { - return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E " + return "lsof -iTCP -sTCP:LISTEN,TIME_WAIT -P -n | awk '{print $9}' | grep -E " + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\""; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java index 473d2f1b64c7f..975ae1a5682df 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java @@ -40,6 +40,8 @@ public abstract class MppBaseConfig { /** Create an empty MppPersistentConfig. */ protected MppBaseConfig() { this.properties = new Properties(); + this.properties.setProperty("cn_selector_thread_nums_of_client_manager", "1"); + this.properties.setProperty("dn_selector_thread_nums_of_client_manager", "1"); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java index e09ccc79becbf..8ec455142b8ba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; public class AsyncAINodeHeartbeatClientPool { @@ -34,7 +35,8 @@ private AsyncAINodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager())); } public void getAINodeHeartBeat( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java index 7b6bca5d0d9e7..a6dffbe0eef63 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq; public class AsyncConfigNodeHeartbeatClientPool { @@ -34,7 +35,8 @@ private AsyncConfigNodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager())); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java index 8d67d150efbbb..18a8120a9e459 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; /** Asynchronously send RPC requests to DataNodes. See queryengine.thrift for more details. */ @@ -35,7 +36,8 @@ private AsyncDataNodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager())); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java index 19eaf9d9a40d2..00267e5a8ccef 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskToConfigNodeRPCHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,10 @@ public class CnToCnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(CnToCnInternalServiceAsyncRequestManager.class); + public CnToCnInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( @@ -71,7 +76,8 @@ protected void adjustClientTimeoutIfNecessary( private static class ClientPoolHolder { private static final CnToCnInternalServiceAsyncRequestManager INSTANCE = - new CnToCnInternalServiceAsyncRequestManager(); + new CnToCnInternalServiceAsyncRequestManager( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()); private ClientPoolHolder() { // Empty constructor diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index b9aac92a16504..e4dcaad2bfb96 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -46,6 +46,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq; import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq; @@ -105,6 +106,10 @@ public class CnToDnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class); + private CnToDnInternalServiceAsyncRequestManager() { + super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()); + } + @SuppressWarnings("unchecked") @Override protected void initActionMapBuilder() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 03d12ee46a696..77b353ea59d4d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -136,6 +136,17 @@ public class ConfigNodeConfig { */ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE; + + /** + * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its + * clients. + */ + private int selectorNumOfClientManager = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; + /** System directory, including version file for each database and metadata. */ private String systemDir = IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + IoTDBConstant.SYSTEM_FOLDER_NAME; @@ -448,6 +459,23 @@ public ConfigNodeConfig setMaxClientNumForEachNode(int maxClientNumForEachNode) return this; } + public int getMaxIdleClientNumForEachNode() { + return maxIdleClientNumForEachNode; + } + + public ConfigNodeConfig setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + return this; + } + + public int getSelectorNumOfClientManager() { + return selectorNumOfClientManager; + } + + public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { + this.selectorNumOfClientManager = selectorNumOfClientManager; + } + public String getConsensusDir() { return consensusDir; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 614bae210dfa1..83cf1b612d01d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -299,6 +299,24 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio String.valueOf(conf.getMaxClientNumForEachNode())) .trim())); + int cnMaxIdleClientNumForEachNode = + Integer.parseInt( + properties.getProperty( + "cn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(conf.getMaxIdleClientNumForEachNode()))); + if (cnMaxIdleClientNumForEachNode >= 0) { + conf.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode); + } + + int cnSelectorNumOfClientManager = + Integer.parseInt( + properties.getProperty( + "cn_selector_thread_nums_of_client_manager", + String.valueOf(conf.getSelectorNumOfClientManager()))); + if (cnSelectorNumOfClientManager > 0) { + conf.setSelectorNumOfClientManager(cnSelectorNumOfClientManager); + } + conf.setSystemDir(properties.getProperty("cn_system_dir", conf.getSystemDir()).trim()); conf.setConsensusDir(properties.getProperty("cn_consensus_dir", conf.getConsensusDir()).trim()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java index 6c89e19291b35..25b084c5329d5 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java @@ -70,8 +70,6 @@ public Builder setReplication(Replication replication) { public static class RPC { - private final int rpcSelectorThreadNum; - private final int rpcMinConcurrentClientNum; private final int rpcMaxConcurrentClientNum; private final int thriftServerAwaitTimeForStopService; private final boolean isRpcThriftCompressionEnabled; @@ -83,8 +81,6 @@ public static class RPC { private final int maxClientNumForEachNode; private RPC( - int rpcSelectorThreadNum, - int rpcMinConcurrentClientNum, int rpcMaxConcurrentClientNum, int thriftServerAwaitTimeForStopService, boolean isRpcThriftCompressionEnabled, @@ -93,8 +89,6 @@ private RPC( boolean printLogWhenThriftClientEncounterException, int thriftMaxFrameSize, int maxClientNumForEachNode) { - this.rpcSelectorThreadNum = rpcSelectorThreadNum; - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum; this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService; this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled; @@ -105,14 +99,6 @@ private RPC( this.maxClientNumForEachNode = maxClientNumForEachNode; } - public int getRpcSelectorThreadNum() { - return rpcSelectorThreadNum; - } - - public int getRpcMinConcurrentClientNum() { - return rpcMinConcurrentClientNum; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } @@ -151,7 +137,6 @@ public static RPC.Builder newBuilder() { public static class Builder { - private int rpcSelectorThreadNum = 1; private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors(); private int rpcMaxConcurrentClientNum = 65535; private int thriftServerAwaitTimeForStopService = 60; @@ -163,11 +148,6 @@ public static class Builder { private int thriftMaxFrameSize = 536870912; private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; - public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) { - this.rpcSelectorThreadNum = rpcSelectorThreadNum; - return this; - } - public RPC.Builder setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) { this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; return this; @@ -218,8 +198,6 @@ public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) { public RPC build() { return new RPC( - rpcSelectorThreadNum, - rpcMinConcurrentClientNum, rpcMaxConcurrentClientNum, thriftServerAwaitTimeForStopService, isRpcThriftCompressionEnabled, diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index 2cb149b601b01..81c6e53e73a12 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -79,8 +79,6 @@ public PipeConsensusConfig build() { } public static class RPC { - private final int rpcSelectorThreadNum; - private final int rpcMinConcurrentClientNum; private final int rpcMaxConcurrentClientNum; private final int thriftServerAwaitTimeForStopService; private final boolean isRpcThriftCompressionEnabled; @@ -88,15 +86,11 @@ public static class RPC { private final int thriftMaxFrameSize; public RPC( - int rpcSelectorThreadNum, - int rpcMinConcurrentClientNum, int rpcMaxConcurrentClientNum, int thriftServerAwaitTimeForStopService, boolean isRpcThriftCompressionEnabled, int connectionTimeoutInMs, int thriftMaxFrameSize) { - this.rpcSelectorThreadNum = rpcSelectorThreadNum; - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum; this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService; this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled; @@ -104,14 +98,6 @@ public RPC( this.thriftMaxFrameSize = thriftMaxFrameSize; } - public int getRpcSelectorThreadNum() { - return rpcSelectorThreadNum; - } - - public int getRpcMinConcurrentClientNum() { - return rpcMinConcurrentClientNum; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } @@ -137,24 +123,12 @@ public static RPC.Builder newBuilder() { } public static class Builder { - private int rpcSelectorThreadNum = 1; - private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors(); private int rpcMaxConcurrentClientNum = 65535; private int thriftServerAwaitTimeForStopService = 60; private boolean isRpcThriftCompressionEnabled = false; private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(60); private int thriftMaxFrameSize = 536870912; - public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) { - this.rpcSelectorThreadNum = rpcSelectorThreadNum; - return this; - } - - public RPC.Builder setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) { - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; - return this; - } - public RPC.Builder setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) { this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum; return this; @@ -183,8 +157,6 @@ public RPC.Builder setThriftMaxFrameSize(int thriftMaxFrameSize) { public RPC build() { return new RPC( - rpcSelectorThreadNum, - rpcMinConcurrentClientNum, rpcMaxConcurrentClientNum, thriftServerAwaitTimeForStopService, isRpcThriftCompressionEnabled, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 5e89c4ec083a8..d3f13d1439232 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -139,12 +139,6 @@ public class IoTDBConfig { /** ssl key Store password. */ private String keyStorePwd = ""; - /** Rpc Selector thread num */ - private int rpcSelectorThreadCount = 1; - - /** Min concurrent client number */ - private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors(); - /** Max concurrent client number */ private int rpcMaxConcurrentClientNum = 1000; @@ -999,6 +993,8 @@ public class IoTDBConfig { */ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE; + /** * Cache size of partition cache in {@link * org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher} @@ -1035,9 +1031,6 @@ public class IoTDBConfig { /** ThreadPool size for read operation in coordinator */ private int coordinatorReadExecutorSize = 20; - /** ThreadPool size for write operation in coordinator */ - private int coordinatorWriteExecutorSize = 50; - private int[] schemaMemoryProportion = new int[] {5, 4, 1}; /** Memory allocated for schemaRegion */ @@ -1872,22 +1865,6 @@ public void setUnSeqTsFileSize(long unSeqTsFileSize) { this.unSeqTsFileSize = unSeqTsFileSize; } - public int getRpcSelectorThreadCount() { - return rpcSelectorThreadCount; - } - - public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) { - this.rpcSelectorThreadCount = rpcSelectorThreadCount; - } - - public int getRpcMinConcurrentClientNum() { - return rpcMinConcurrentClientNum; - } - - public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) { - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } @@ -3338,6 +3315,14 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) { this.maxClientNumForEachNode = maxClientNumForEachNode; } + public int getMaxIdleClientNumForEachNode() { + return maxIdleClientNumForEachNode; + } + + public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + } + public int getSelectorNumOfClientManager() { return selectorNumOfClientManager; } @@ -3484,14 +3469,6 @@ public void setCoordinatorReadExecutorSize(int coordinatorReadExecutorSize) { this.coordinatorReadExecutorSize = coordinatorReadExecutorSize; } - public int getCoordinatorWriteExecutorSize() { - return coordinatorWriteExecutorSize; - } - - public void setCoordinatorWriteExecutorSize(int coordinatorWriteExecutorSize) { - this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize; - } - public TEndPoint getAddressAndPort() { return new TEndPoint(rpcAddress, rpcPort); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e965611016de9..b01fca1a88f82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -287,13 +287,23 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException String.valueOf(conf.getMaxClientNumForEachNode())) .trim())); - conf.setSelectorNumOfClientManager( + int dnMaxIdleClientNumForEachNode = Integer.parseInt( - properties - .getProperty( - "dn_selector_thread_count_of_client_manager", - String.valueOf(conf.getSelectorNumOfClientManager())) - .trim())); + properties.getProperty( + "dn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(conf.getMaxIdleClientNumForEachNode()))); + if (dnMaxIdleClientNumForEachNode >= 0) { + conf.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode); + } + + int dnSelectorNumOfClientManager = + Integer.parseInt( + properties.getProperty( + "dn_selector_thread_nums_of_client_manager", + String.valueOf(conf.getSelectorNumOfClientManager()))); + if (dnSelectorNumOfClientManager > 0) { + conf.setSelectorNumOfClientManager(dnSelectorNumOfClientManager); + } conf.setRpcPort( Integer.parseInt( @@ -775,28 +785,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException properties.getProperty( "0.13_data_insert_adapt", String.valueOf(conf.isEnable13DataInsertAdapt())))); - int rpcSelectorThreadNum = - Integer.parseInt( - properties.getProperty( - "dn_rpc_selector_thread_count", - Integer.toString(conf.getRpcSelectorThreadCount()).trim())); - if (rpcSelectorThreadNum <= 0) { - rpcSelectorThreadNum = 1; - } - - conf.setRpcSelectorThreadCount(rpcSelectorThreadNum); - - int minConcurrentClientNum = - Integer.parseInt( - properties.getProperty( - "dn_rpc_min_concurrent_client_num", - Integer.toString(conf.getRpcMinConcurrentClientNum()).trim())); - if (minConcurrentClientNum <= 0) { - minConcurrentClientNum = Runtime.getRuntime().availableProcessors(); - } - - conf.setRpcMinConcurrentClientNum(minConcurrentClientNum); - int maxConcurrentClientNum = Integer.parseInt( properties.getProperty( @@ -1024,11 +1012,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException properties.getProperty( "coordinator_read_executor_size", Integer.toString(conf.getCoordinatorReadExecutorSize())))); - conf.setCoordinatorWriteExecutorSize( - Integer.parseInt( - properties.getProperty( - "coordinator_write_executor_size", - Integer.toString(conf.getCoordinatorWriteExecutorSize())))); // Commons commonDescriptor.loadCommonProps(properties); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 7dbc720a4a852..fa8e1efc6d8ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -123,8 +123,6 @@ private static ConsensusConfig buildConsensusConfig() { .setRpc( RPC.newBuilder() .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS()) - .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount()) - .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum()) .setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum()) .setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable()) .setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager()) @@ -151,8 +149,6 @@ private static ConsensusConfig buildConsensusConfig() { PipeConsensusConfig.RPC .newBuilder() .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS()) - .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount()) - .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum()) .setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum()) .setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable()) .setThriftServerAwaitTimeForStopService( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java index b5f5df430129f..102b1d2cbad94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java @@ -61,32 +61,4 @@ public GenericKeyedObjectPool createClientPool return clientPool; } } - - public static class ClusterDeletionConfigNodeClientPoolFactory - implements IClientPoolFactory { - - @Override - public GenericKeyedObjectPool createClientPool( - ClientManager manager) { - GenericKeyedObjectPool clientPool = - new GenericKeyedObjectPool<>( - new ConfigNodeClient.Factory( - manager, - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10) - .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable()) - .setSelectorNumOfAsyncClientManager( - conf.getSelectorNumOfClientManager() / 10 > 0 - ? conf.getSelectorNumOfClientManager() / 10 - : 1) - .build()), - new ClientPoolProperty.Builder() - .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode()) - .build() - .getConfig()); - ClientManagerMetrics.getInstance() - .registerClientManager(this.getClass().getSimpleName(), clientPool); - return clientPool; - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java index b6a218a732e2b..9ac6b70841136 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; import org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager; import org.apache.iotdb.commons.client.request.TestConnectionUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,10 @@ public class DnToCnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DnToCnInternalServiceAsyncRequestManager.class); + public DnToCnInternalServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java index dd56e1366c03b..29c3f76790716 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestManager; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,12 +39,17 @@ public class DataNodeExternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeExternalServiceAsyncRequestManager.class); + private DataNodeExternalServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java index ab08d83f2645b..e4d190e1571a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestManager; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,14 +38,17 @@ public class DataNodeMPPServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeMPPServiceAsyncRequestManager.class); - public DataNodeMPPServiceAsyncRequestManager() {} + public DataNodeMPPServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java index 88766458b649d..c2ca62cf15d48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; import org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,10 @@ public class DnToDnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DnToDnInternalServiceAsyncRequestManager.class); + private DnToDnInternalServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 8afdfce7d5c65..2f173861f22e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -92,10 +92,10 @@ public class Coordinator { ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory( + CONFIG.getSelectorNumOfClientManager())); private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private final ExecutorService dispatchExecutor; @@ -109,7 +109,6 @@ public class Coordinator { private Coordinator() { this.queryExecutionMap = new ConcurrentHashMap<>(); this.executor = getQueryExecutor(); - this.writeOperationExecutor = getWriteExecutor(); this.scheduledExecutor = getScheduledExecutor(); int dispatchThreadNum = Math.max(20, Runtime.getRuntime().availableProcessors() * 2); this.dispatchExecutor = @@ -220,8 +219,6 @@ private IQueryExecution createQueryExecutionForTreeModel( TreeModelPlanner treeModelPlanner = new TreeModelPlanner( statement, - executor, - writeOperationExecutor, scheduledExecutor, partitionFetcher, schemaFetcher, @@ -248,12 +245,6 @@ private ExecutorService getQueryExecutor() { coordinatorReadExecutorSize, ThreadName.MPP_COORDINATOR_EXECUTOR_POOL.getName()); } - private ExecutorService getWriteExecutor() { - int coordinatorWriteExecutorSize = CONFIG.getCoordinatorWriteExecutorSize(); - return IoTDBThreadPoolFactory.newFixedThreadPool( - coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName()); - } - private ScheduledExecutorService getScheduledExecutor() { return IoTDBThreadPoolFactory.newScheduledThreadPool( COORDINATOR_SCHEDULED_EXECUTOR_SIZE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 023f7e3379368..d582832864a8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -143,7 +143,6 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; -import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.Coordinator; @@ -306,13 +305,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance(); - /** FIXME Consolidate this clientManager with the upper one. */ - private static final IClientManager - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER = - new IClientManager.Factory() - .createClientManager( - new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory()); - private static final class ClusterConfigTaskExecutorHolder { private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor(); @@ -1598,7 +1590,7 @@ public SettableFuture deactivateSchemaTemplate( req.setPathPatternTree( serializePatternListToByteBuffer(deactivateTemplateStatement.getPathPatternList())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -1699,7 +1691,7 @@ public SettableFuture alterSchemaTemplate( alterSchemaTemplateStatement.getOperationType(), alterSchemaTemplateStatement.getTemplateAlterInfo())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -1754,7 +1746,7 @@ public SettableFuture unsetSchemaTemplate( req.setTemplateName(unsetSchemaTemplateStatement.getTemplateName()); req.setPath(unsetSchemaTemplateStatement.getPath().getFullPath()); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2406,7 +2398,7 @@ public SettableFuture deleteTimeSeries( queryId, serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList())); try (ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2452,7 +2444,7 @@ public SettableFuture deleteLogicalView( queryId, serializePatternListToByteBuffer(deleteLogicalViewStatement.getPathPatternList())); try (ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2541,7 +2533,7 @@ public SettableFuture renameLogicalView( new TDeleteLogicalViewReq( queryId, serializePatternListToByteBuffer(Collections.singletonList(oldName))); try (ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2613,7 +2605,7 @@ public SettableFuture alterLogicalView( new TAlterLogicalViewReq( context.getQueryId().getId(), ByteBuffer.wrap(stream.toByteArray())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2675,7 +2667,7 @@ public TSStatus alterLogicalViewByPipe( .setIsGeneratedByPipe(shouldMarkAsPipeRequest); TSStatus tsStatus; try (ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { do { try { tsStatus = client.alterLogicalView(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index 1da701a8e3529..2812d81cf27fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -48,15 +48,12 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; public class TreeModelPlanner implements IPlanner { private final Statement statement; - private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private final IPartitionFetcher partitionFetcher; @@ -71,8 +68,6 @@ public class TreeModelPlanner implements IPlanner { public TreeModelPlanner( Statement statement, - ExecutorService executor, - ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, @@ -80,8 +75,6 @@ public TreeModelPlanner( IClientManager asyncInternalServiceClientManager) { this.statement = statement; - this.executor = executor; - this.writeOperationExecutor = writeOperationExecutor; this.scheduledExecutor = scheduledExecutor; this.partitionFetcher = partitionFetcher; this.schemaFetcher = schemaFetcher; @@ -134,8 +127,6 @@ public IScheduler doSchedule( stateMachine, distributedPlan, context.getQueryType(), - executor, - writeOperationExecutor, scheduledExecutor, syncInternalServiceClientManager, asyncInternalServiceClientManager); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java index 33f1bd3a19455..283e480ae38f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -74,8 +73,6 @@ public ClusterScheduler( QueryStateMachine stateMachine, DistributedQueryPlan distributedQueryPlan, QueryType queryType, - ExecutorService executor, - ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IClientManager syncInternalServiceClientManager, IClientManager @@ -88,8 +85,6 @@ public ClusterScheduler( new FragmentInstanceDispatcherImpl( queryType, queryContext, - executor, - writeOperationExecutor, syncInternalServiceClientManager, asyncInternalServiceClientManager); if (queryType == QueryType.READ) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 41d325a5761a5..3312444f8822e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -65,7 +65,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -81,8 +80,6 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final QueryType type; private final MPPQueryContext queryContext; private final String localhostIpAddr; @@ -104,15 +101,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { public FragmentInstanceDispatcherImpl( QueryType type, MPPQueryContext queryContext, - ExecutorService executor, - ExecutorService writeOperationExecutor, IClientManager syncInternalServiceClientManager, IClientManager asyncInternalServiceClientManager) { this.type = type; this.queryContext = queryContext; - this.executor = executor; - this.writeOperationExecutor = writeOperationExecutor; this.syncInternalServiceClientManager = syncInternalServiceClientManager; this.asyncInternalServiceClientManager = asyncInternalServiceClientManager; this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index c7862e4886ec4..5447bcf5fd4ad 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -465,9 +465,10 @@ cn_rpc_max_concurrent_client_num=3000 cn_connection_timeout_ms=60000 # selector thread (TAsyncClientManager) nums for async thread in a clientManager +# When <= 0, use max(1, CPU core number / 4). # effectiveMode: restart # Datatype: int -cn_selector_thread_nums_of_client_manager=1 +cn_selector_thread_nums_of_client_manager=0 # The maximum number of clients that can be allocated for a node in a clientManager. # when the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked @@ -476,6 +477,14 @@ cn_selector_thread_nums_of_client_manager=1 # Datatype: int cn_max_client_count_for_each_node_in_client_manager=1000 +# The maximum number of idle clients that can be retained for a node in a clientManager. +# When the number of idle clients to a single node exceeds this number, excess idle clients will be evicted. +# Idle clients are determined by a time threshold (default 1 minute of inactivity). +# 0 means no idle clients will be retained, connections are destroyed immediately upon return. +# effectiveMode: restart +# Datatype: int +# cn_max_idle_client_count_for_each_node_in_client_manager=1000 + # The maximum session idle time. unit: ms # Idle sessions are the ones that performs neither query or non-query operations for a period of time # Set to 0 to disable session timeout @@ -494,16 +503,6 @@ dn_rpc_thrift_compression_enable=false # this feature is under development, set this as false before it is done. dn_rpc_advanced_compression_enable=false -# the number of rpc selector -# effectiveMode: restart -# Datatype: int -dn_rpc_selector_thread_count=1 - -# The min number of concurrent clients that can be connected to the dataNode. -# effectiveMode: restart -# Datatype: int -dn_rpc_min_concurrent_client_num=1 - # The maximum number of concurrent clients that can be connected to the dataNode. # effectiveMode: restart # Datatype: int @@ -525,9 +524,10 @@ dn_thrift_init_buffer_size=1024 dn_connection_timeout_ms=60000 # selector thread (TAsyncClientManager) nums for async thread in a clientManager +# When <= 0, use max(1, CPU core number / 4). # effectiveMode: restart # Datatype: int -dn_selector_thread_count_of_client_manager=1 +dn_selector_thread_nums_of_client_manager=0 # The maximum number of clients that can be allocated for a node in a clientManager. # When the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked @@ -536,6 +536,14 @@ dn_selector_thread_count_of_client_manager=1 # Datatype: int dn_max_client_count_for_each_node_in_client_manager=1000 +# The maximum number of idle clients that can be retained for a node in a clientManager. +# When the number of idle clients to a single node exceeds this number, excess idle clients will be evicted. +# Idle clients are determined by a time threshold (default 1 minute of inactivity). +# 0 means no idle clients will be retained, connections are destroyed immediately upon return. +# effectiveMode: restart +# Datatype: int +# dn_max_idle_client_count_for_each_node_in_client_manager=1000 + #################### ### REST Service Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index 3ff47a2c5e1d6..a70a6aa911d93 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -72,6 +72,12 @@ public GenericKeyedObjectPool createCli public static class AsyncConfigNodeInternalServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncConfigNodeInternalServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -82,7 +88,7 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .build(), ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -120,6 +126,12 @@ public GenericKeyedObjectPool crea public static class AsyncDataNodeInternalServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeInternalServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -130,7 +142,8 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) + .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -145,6 +158,12 @@ public GenericKeyedObjectPool cre public static class AsyncDataNodeExternalServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeExternalServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -155,7 +174,7 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -170,6 +189,12 @@ public GenericKeyedObjectPool cre public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncConfigNodeHeartbeatServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -181,7 +206,7 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -196,6 +221,13 @@ public GenericKeyedObjectPool c public static class AsyncDataNodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { + + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeHeartbeatServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -206,7 +238,7 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -246,6 +278,13 @@ public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeMPPDataExchangeServiceClientPoolFactory( + int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( @@ -257,7 +296,7 @@ public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .build(), ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -323,6 +362,13 @@ public GenericKeyedObjectPool cre public static class AsyncAINodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { + + private final int selectorNumOfAsyncClientManager; + + public AsyncAINodeHeartbeatServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -333,7 +379,7 @@ public GenericKeyedObjectPool createClientP new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java index 5bb7c22ee4994..1f818afe543da 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.client.property; +import org.apache.iotdb.commons.conf.CommonDescriptor; + import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; import java.time.Duration; @@ -49,7 +51,11 @@ public static class Builder { * the maximum number of clients that can be allocated for a node. When some clients are idle * for more than {@code maxIdleTimeForClient}, they will be cleaned up. */ - private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxClientNumForEachNode = + CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode(); + + private int maxIdleClientNumForEachNode = + CommonDescriptor.getInstance().getConfig().getMaxIdleClientNumForEachNode(); /** * the minimum amount of time a client may sit idle in the pool before it is eligible for @@ -74,6 +80,11 @@ public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) { return this; } + public Builder setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + return this; + } + public Builder setMinIdleTimeForClient(long minIdleTimeForClient) { this.minIdleTimeForClient = minIdleTimeForClient; return this; @@ -87,7 +98,7 @@ public Builder setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns) { public ClientPoolProperty build() { GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig<>(); poolConfig.setMaxTotalPerKey(maxClientNumForEachNode); - poolConfig.setMaxIdlePerKey(maxClientNumForEachNode); + poolConfig.setMaxIdlePerKey(maxIdleClientNumForEachNode); poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRuns)); poolConfig.setMinEvictableIdleTime(Duration.ofMillis(minIdleTimeForClient)); poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs)); @@ -105,5 +116,6 @@ private DefaultProperty() {} public static final long MIN_IDLE_TIME_FOR_CLIENT_MS = TimeUnit.MINUTES.toMillis(1); public static final long TIME_BETWEEN_EVICTION_RUNS_MS = TimeUnit.MINUTES.toMillis(1); public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 1000; + public static final int MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE = 1000; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java index f8fe16166a8c8..f157ee7df936f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java @@ -117,7 +117,10 @@ private DefaultProperty() {} public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false; public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20); public static final int CONNECTION_NEVER_TIMEOUT_MS = 0; - public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1; + public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; public static final boolean PRINT_LOG_WHEN_ENCOUNTER_EXCEPTION = true; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java index 3cca39bb635da..0290d33d3a46b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java @@ -53,15 +53,15 @@ public abstract class AsyncRequestManager { private static final int MAX_RETRY_NUM = 6; - protected AsyncRequestManager() { - initClientManager(); + protected AsyncRequestManager(int selectorNumOfAsyncClientManager) { + initClientManager(selectorNumOfAsyncClientManager); actionMapBuilder = ImmutableMap.builder(); initActionMapBuilder(); this.actionMap = this.actionMapBuilder.build(); checkActionMapCompleteness(); } - protected abstract void initClientManager(); + protected abstract void initClientManager(int selectorNumOfAsyncClientManager); protected abstract void initActionMapBuilder(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java index 791a1e5df0e96..3b50c29fba6bf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java @@ -28,12 +28,18 @@ public abstract class ConfigNodeInternalServiceAsyncRequestManager extends AsyncRequestManager< RequestType, TConfigNodeLocation, AsyncConfigNodeInternalServiceClient> { + + protected ConfigNodeInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java index fcb1b01857df8..722d4f241ebe1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java @@ -28,12 +28,18 @@ public abstract class DataNodeInternalServiceRequestManager extends AsyncRequestManager< RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> { + + protected DataNodeInternalServiceRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 696b6b8ce07fb..4f5ad140e9919 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -44,7 +44,6 @@ public enum ThreadName { MPP_COORDINATOR_EXECUTOR_POOL("MPP-Coordinator-Executor"), DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"), DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"), - MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"), ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"), // -------------------------- Compaction -------------------------- COMPACTION_WORKER("Compaction-Worker"), @@ -221,7 +220,6 @@ public enum ThreadName { MPP_COORDINATOR_EXECUTOR_POOL, DATANODE_INTERNAL_RPC_SERVICE, DATANODE_INTERNAL_RPC_PROCESSOR, - MPP_COORDINATOR_WRITE_EXECUTOR, ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL)); private static final Set compactionThreadNames = new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK, COMPACTION_SCHEDULE)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 22e282d85a83f..e1f52ba65ff6d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -146,13 +146,18 @@ public class CommonConfig { * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its * clients. */ - private int selectorNumOfClientManager = 1; + private int selectorNumOfClientManager = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE; + /** What will the system do when unrecoverable error occurs. */ private HandleSystemErrorStrategy handleSystemErrorStrategy = HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY; @@ -630,6 +635,14 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) { this.maxClientNumForEachNode = maxClientNumForEachNode; } + public int getMaxIdleClientNumForEachNode() { + return maxIdleClientNumForEachNode; + } + + public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + } + HandleSystemErrorStrategy getHandleSystemErrorStrategy() { return handleSystemErrorStrategy; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 961d4fbb8f248..ab7d949044d0b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -128,13 +128,16 @@ public void loadCommonProps(TrimProperties properties) throws IOException { "cn_connection_timeout_ms", String.valueOf(config.getCnConnectionTimeoutInMS())) .trim())); - config.setSelectorNumOfClientManager( + int cnSelectorNumOfClientManager = Integer.parseInt( properties .getProperty( "cn_selector_thread_nums_of_client_manager", String.valueOf(config.getSelectorNumOfClientManager())) - .trim())); + .trim()); + if (cnSelectorNumOfClientManager > 0) { + config.setSelectorNumOfClientManager(cnSelectorNumOfClientManager); + } config.setMaxClientNumForEachNode( Integer.parseInt( @@ -144,6 +147,17 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getMaxClientNumForEachNode())) .trim())); + int cnMaxIdleClientNumForEachNode = + Integer.parseInt( + properties + .getProperty( + "cn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(config.getMaxIdleClientNumForEachNode())) + .trim()); + if (cnMaxIdleClientNumForEachNode >= 0) { + config.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode); + } + config.setDnConnectionTimeoutInMS( Integer.parseInt( properties @@ -159,13 +173,16 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.isRpcThriftCompressionEnabled())) .trim())); - config.setSelectorNumOfClientManager( + int dnSelectorNumOfClientManager = Integer.parseInt( properties .getProperty( "dn_selector_thread_nums_of_client_manager", String.valueOf(config.getSelectorNumOfClientManager())) - .trim())); + .trim()); + if (dnSelectorNumOfClientManager > 0) { + config.setSelectorNumOfClientManager(dnSelectorNumOfClientManager); + } config.setMaxClientNumForEachNode( Integer.parseInt( @@ -175,6 +192,17 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getMaxClientNumForEachNode())) .trim())); + int dnMaxIdleClientNumForEachNode = + Integer.parseInt( + properties + .getProperty( + "dn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(config.getMaxIdleClientNumForEachNode())) + .trim()); + if (dnMaxIdleClientNumForEachNode >= 0) { + config.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode); + } + config.setHandleSystemErrorStrategy( HandleSystemErrorStrategy.valueOf( properties diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java index 70c32ca4801d8..7296a0b3f78ca 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java @@ -214,6 +214,7 @@ public void evictionTest() throws Exception { manager, new ThriftClientProperty.Builder().build()), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(maxClientForEachNode) + .setMaxIdleClientNumForEachNode(maxClientForEachNode) .setMinIdleTimeForClient(minIdleDuration) .setTimeBetweenEvictionRuns(evictionRunsDuration) .build() @@ -294,6 +295,7 @@ public void maxTotalTest() throws Exception { manager, new ThriftClientProperty.Builder().build()), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(maxTotalClientForEachNode) + .setMaxIdleClientNumForEachNode(maxTotalClientForEachNode) .setWaitClientTimeoutMs(waitClientTimeoutMs) .build() .getConfig()); @@ -369,6 +371,7 @@ public void maxWaitClientTimeoutTest() throws Exception { new ClientPoolProperty.Builder() .setWaitClientTimeoutMs(waitClientTimeoutMS) .setMaxClientNumForEachNode(maxTotalClientForEachNode) + .setMaxIdleClientNumForEachNode(maxTotalClientForEachNode) .build() .getConfig()); } @@ -617,7 +620,13 @@ public GenericKeyedObjectPool crea new ThriftClientProperty.Builder() .setConnectionTimeoutMs(CONNECTION_TIMEOUT) .build()), - new ClientPoolProperty.Builder().build().getConfig()); + new ClientPoolProperty.Builder() + .setMaxClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE) + .setMaxIdleClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE) + .build() + .getConfig()); } } @@ -632,7 +641,13 @@ public GenericKeyedObjectPool cre manager, new ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), - new ClientPoolProperty.Builder().build().getConfig()); + new ClientPoolProperty.Builder() + .setMaxClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE) + .setMaxIdleClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE) + .build() + .getConfig()); } } }