Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ public CommonConfig setMaxRowsInCteBuffer(int maxRows) {
return this;
}

/**
 * Stores the topology-probing switch under the {@code enable_topology_probing} property key.
 *
 * @param enableTopologyProbing flag to record; written as its {@code "true"}/{@code "false"}
 *     text form
 * @return this config, so calls can be chained
 */
@Override
public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
  final String flagText = Boolean.toString(enableTopologyProbing);
  setProperty("enable_topology_probing", flagText);
  return this;
}

// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,4 +699,11 @@ public CommonConfig setMaxRowsInCteBuffer(int maxRows) {
cnConfig.setMaxRowsInCteBuffer(maxRows);
return this;
}

/**
 * Forwards the topology-probing flag to both wrapped configs: {@code dnConfig} first, then
 * {@code cnConfig}.
 *
 * @param enableTopologyProbing flag value passed unchanged to each wrapped config
 * @return this config, so calls can be chained
 */
@Override
public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
dnConfig.setEnableTopologyProbing(enableTopologyProbing);
cnConfig.setEnableTopologyProbing(enableTopologyProbing);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,4 +487,9 @@ public CommonConfig setCteBufferSize(long cteBufferSize) {
/**
 * No-op in this implementation: {@code maxRows} is ignored and nothing is stored.
 *
 * @param maxRows unused here
 * @return this config, so calls can be chained
 */
public CommonConfig setMaxRowsInCteBuffer(int maxRows) {
return this;
}

/**
 * No-op in this implementation: {@code enableTopologyProbing} is ignored and nothing is stored.
 *
 * @param enableTopologyProbing unused here
 * @return this config, so calls can be chained
 */
@Override
public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,6 @@ default CommonConfig setDefaultDatabaseLevel(int defaultDatabaseLevel) {
CommonConfig setCteBufferSize(long cteBufferSize);

CommonConfig setMaxRowsInCteBuffer(int maxRows);

/**
 * Applies the {@code enableTopologyProbing} flag to this config.
 *
 * @param enableTopologyProbing the flag value to apply
 * @return a {@link CommonConfig}; presumably the receiver, to allow call chaining (verify against
 *     each implementation)
 */
CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static void setUp() throws Exception {
.setSchemaReplicationFactor(testReplicationFactor)
.setDataReplicationFactor(testReplicationFactor)
.setTimePartitionInterval(testTimePartitionInterval)
.setDefaultDataRegionGroupNumPerDatabase(testDataRegionGroupPerDatabase);
.setDefaultDataRegionGroupNumPerDatabase(testDataRegionGroupPerDatabase)
.setEnableTopologyProbing(true);
EnvFactory.getEnv().initClusterEnvironment(1, 3);
prepareTableData(createSqls);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum CnToDnAsyncRequestType {
SUBMIT_TEST_CONNECTION_TASK,
SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
TEST_CONNECTION,
PUSH_TOPOLOGY,

// Region Maintenance
CREATE_DATA_REGION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
Expand Down Expand Up @@ -407,6 +408,11 @@ protected void initActionMapBuilder() {
CnToDnAsyncRequestType.TEST_CONNECTION,
(req, client, handler) ->
client.testConnectionEmptyRPC((DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.PUSH_TOPOLOGY,
(req, client, handler) ->
client.updateClusterTopology(
(TUpdateClusterTopologyReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.INSERT_RECORD,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
dataNodeLocationMap,
(Map<Integer, TExternalServiceListResp>) responseMap,
countDownLatch);
case PUSH_TOPOLOGY:
case SET_TTL:
case CREATE_DATA_REGION:
case CREATE_SCHEMA_REGION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ public class ConfigNodeConfig {
/** Acceptable pause duration for Phi accrual failure detector */
private long failureDetectorPhiAcceptablePauseInMs = 10000;

/**
 * Whether to enable topology probing between DataNodes. Supports hot-reload; declared volatile.
 * Disabled by default.
 */
private volatile boolean enableTopologyProbing = false;

/** Base interval in ms for topology probing. Defaults to 5000 ms. */
private long topologyProbingBaseIntervalInMs = 5000;

/**
 * Ratio of probing timeout to probing interval. Defaults to 0.5. The value is expected to lie in
 * the open interval (0, 1), i.e. strictly greater than 0 and strictly less than 1.0.
 */
private double topologyProbingTimeoutRatio = 0.5;

/** The policy of cluster RegionGroups' leader distribution. */
private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;

Expand Down Expand Up @@ -1288,4 +1297,28 @@ public long getFailureDetectorPhiAcceptablePauseInMs() {
public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs;
}

/**
 * Reads the topology-probing switch.
 *
 * @return the current value of the {@code enableTopologyProbing} field
 */
public boolean isEnableTopologyProbing() {
return enableTopologyProbing;
}

/**
 * Overwrites the topology-probing switch. Only the field is updated here; no service is started
 * or stopped by this setter.
 *
 * @param enableTopologyProbing new value for the {@code enableTopologyProbing} field
 */
public void setEnableTopologyProbing(boolean enableTopologyProbing) {
this.enableTopologyProbing = enableTopologyProbing;
}

/**
 * Reads the base topology-probing interval.
 *
 * @return the current value of the {@code topologyProbingBaseIntervalInMs} field
 */
public long getTopologyProbingBaseIntervalInMs() {
return topologyProbingBaseIntervalInMs;
}

/**
 * Overwrites the base topology-probing interval. The value is stored as given; this setter
 * performs no range check.
 *
 * @param topologyProbingBaseIntervalInMs new value for the field
 */
public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalInMs) {
this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs;
}

/**
 * Reads the probing timeout-to-interval ratio.
 *
 * @return the current value of the {@code topologyProbingTimeoutRatio} field
 */
public double getTopologyProbingTimeoutRatio() {
return topologyProbingTimeoutRatio;
}

/**
 * Overwrites the probing timeout-to-interval ratio. The value is stored as given; this setter
 * performs no range check.
 *
 * @param topologyProbingTimeoutRatio new value for the field
 */
public void setTopologyProbingTimeoutRatio(double topologyProbingTimeoutRatio) {
this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,35 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
"failure_detector_phi_acceptable_pause_in_ms",
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));

conf.setEnableTopologyProbing(
Boolean.parseBoolean(
properties.getProperty(
"enable_topology_probing", String.valueOf(conf.isEnableTopologyProbing()))));

long topologyProbingBaseIntervalInMs =
Long.parseLong(
properties.getProperty(
"topology_probing_base_interval_in_ms",
String.valueOf(conf.getTopologyProbingBaseIntervalInMs())));
if (topologyProbingBaseIntervalInMs <= 0) {
throw new IOException(
"topology_probing_base_interval_in_ms must be positive, but got: "
+ topologyProbingBaseIntervalInMs);
}
conf.setTopologyProbingBaseIntervalInMs(topologyProbingBaseIntervalInMs);

double topologyProbingTimeoutRatio =
Double.parseDouble(
properties.getProperty(
"topology_probing_timeout_ratio",
String.valueOf(conf.getTopologyProbingTimeoutRatio())));
if (topologyProbingTimeoutRatio <= 0 || topologyProbingTimeoutRatio >= 1.0) {
throw new IOException(
"topology_probing_timeout_ratio must be in (0, 1), but got: "
+ topologyProbingTimeoutRatio);
}
conf.setTopologyProbingTimeoutRatio(topologyProbingTimeoutRatio);

String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
Expand Down Expand Up @@ -773,6 +802,8 @@ public void loadHotModifiedProps(TrimProperties properties) {
ConfigurationFileUtils.updateAppliedProperties(properties, true);
Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
.ifPresent(conf::setClusterName);
Optional.ofNullable(properties.getProperty("enable_topology_probing"))
.ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
}

public static ConfigNodeDescriptor getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public void notifyNotLeader() {
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync();
configManager.getLoadManager().stopTopologyService();
configManager.getLoadManager().stopLoadServices();
configManager.getProcedureManager().stopExecutor();
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
Expand Down Expand Up @@ -288,6 +289,10 @@ public void notifyLeaderReady() {
// Always start load services first
configManager.getLoadManager().startLoadServices();

if (CONF.isEnableTopologyProbing()) {
configManager.getLoadManager().startTopologyService();
}

// Start leader scheduling services
configManager.getProcedureManager().startExecutor();
threadPool.submit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,7 @@ public TSStatus setConfiguration(TSetConfigurationReq req) {
TrimProperties properties = new TrimProperties();
properties.putAll(req.getConfigs());

boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing();
if (configurationFileFound) {
File file = new File(url.getFile());
try {
Expand All @@ -1767,6 +1768,7 @@ public TSStatus setConfiguration(TSetConfigurationReq req) {
}
LOGGER.warn(msg);
}
handleTopologyProbingHotReload(wasTopologyProbingEnabled);
if (currentNodeId == req.getNodeId()) {
return tsStatus;
}
Expand All @@ -1778,6 +1780,18 @@ public TSStatus setConfiguration(TSetConfigurationReq req) {
return RpcUtils.squashResponseStatusList(statusList);
}

private void handleTopologyProbingHotReload(boolean wasEnabled) {
boolean isEnabled = CONF.isEnableTopologyProbing();
if (wasEnabled == isEnabled) {
return;
}
if (isEnabled && getConsensusManager().isLeader()) {
getLoadManager().startTopologyService();
} else if (!isEnabled) {
getLoadManager().stopTopologyService();
}
Comment thread
CRZbulabula marked this conversation as resolved.
}

@Override
public TSStatus startRepairData() {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public void startLoadServices() {
statisticsService.startLoadStatisticsService();
eventService.startEventService();
partitionBalancer.setupPartitionBalancer();
topologyService.startTopologyService();
}

public void stopLoadServices() {
Expand All @@ -162,6 +161,13 @@ public void stopLoadServices() {
loadCache.clearHeartbeatCache();
partitionBalancer.clearPartitionBalancer();
routeBalancer.clearRegionPriority();
}

/** Starts the topology service by delegating to {@code topologyService.startTopologyService()}. */
public void startTopologyService() {
topologyService.startTopologyService();
}

/** Stops the topology service by delegating to {@code topologyService.stopTopologyService()}. */
public void stopTopologyService() {
topologyService.stopTopologyService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
for (int fromId : latestTopology.keySet()) {
for (int toId : latestTopology.keySet()) {
boolean originReachable =
latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId);
topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId);
boolean newReachable =
latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId);
if (originReachable != newReachable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -168,13 +167,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() {
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
}

final Map<Integer, Set<Integer>> topologyMap =
configManager.getLoadManager().getLoadCache().getTopology();
if (topologyMap != null) {
heartbeatReq.setTopology(topologyMap);
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
}

// We broadcast the region operations list every 100 heartbeat loops
if (heartbeatCounter.get() % 100 == 0) {
heartbeatReq.setCurrentRegionOperations(
Expand Down
Loading
Loading