Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.tencent.trpc.core.rpc.Response;
import com.tencent.trpc.core.rpc.RpcClient;
import com.tencent.trpc.core.worker.WorkerPoolManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -35,15 +33,40 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Used to manage the list mapping of point-to-point clients generated through BackendConfig,
* and because the entire framework is based on a pull model to maintain the service IP of the server.
* Therefore, a scanner is added to remove long-unused clients.
* Used to manage the list mapping of point-to-point clients generated through BackendConfig.
* <p>
* Long-connection mode:
* <ul>
* <li>Clients are kept alive for the lifetime of the {@link BackendConfig}; no idle timeout
* scanner closes idle connections.</li>
* <li>A lightweight background timer periodically (every
* {@value #RECONNECT_CHECK_PERIOD_SECONDS}s) scans cached clients. If a client is found
* to be unavailable, its failure counter is incremented; once the counter reaches
* {@value #MAX_RECONNECT_FAILURES}, the client is closed and evicted, freeing memory and
* allowing the next request to rebuild a fresh long connection.</li>
* <li>When the underlying {@link RpcClient} closes itself (transport error or our own timer),
* the {@link RpcClient#closeFuture()} callback removes the cache entry.</li>
* <li>{@link #shutdownBackendConfig(BackendConfig)} / {@link #close()} still release clients
* explicitly.</li>
* </ul>
*/
public class RpcClusterClientManager {

private static final Logger logger = LoggerFactory.getLogger(RpcClusterClientManager.class);

/**
* Period of the reconnect-check timer in seconds.
*/
private static final int RECONNECT_CHECK_PERIOD_SECONDS = 30;
/**
* Maximum number of consecutive reconnect-check failures tolerated before the client is
* closed and evicted from the cache.
*/
private static final int MAX_RECONNECT_FAILURES = 5;

/**
* Cluster map, {@code Map<BackendConfig, Map<String, RpcClientProxy>>}
*/
Expand All @@ -52,14 +75,11 @@ public class RpcClusterClientManager {
* Is close flag
*/
private static final AtomicBoolean CLOSED_FLAG = new AtomicBoolean(false);

/**
* Prevent too many clients and perform periodic cleaning.
* Reconnect-check timer handle. Started lazily on first {@link #getOrCreateClient}.
*/
private static ScheduledFuture<?> cleanerFuture;

static {
cleanerFuture = startRpcClientCleaner();
}
private static volatile ScheduledFuture<?> reconnectCheckerFuture;

/**
* Shutdown a cluster.
Expand All @@ -82,80 +102,40 @@ public static void shutdownBackendConfig(BackendConfig backendConfig) {
}));
}

/**
* Used to periodically scan unused clients and release them.
* <p>Add judgment to determine whether to close the shared thread pool.</p>
*
* @return ScheduledFuture, a delayed result-bearing action that can be cancelled
*/
private static ScheduledFuture<?> startRpcClientCleaner() {
return Optional.ofNullable(WorkerPoolManager.getShareScheduler())
.map(ss -> {
if (ss.isShutdown()) {
return null;
}
return ss.scheduleAtFixedRate(() -> {
try {
scanUnusedClient();
} catch (Throwable ex) {
logger.error("RpcClientCleaner exception", ex);
}
}, 0, 15, TimeUnit.MINUTES);
}).orElse(null);
}

/**
* Scanning for unused clients.
*/
public static void scanUnusedClient() {
Map<BackendConfig, List<RpcClient>> unusedClientMap = Maps.newHashMap();
CLUSTER_MAP.forEach((bConfig, clusterMap) -> {
if (logger.isDebugEnabled()) {
logger.debug("RpcClusterClient scheduler report clusterName={}, naming={}, num of client is {}",
bConfig.getName(), bConfig.getNamingOptions().getServiceNaming(), clusterMap.keySet().size());
}
clusterMap.forEach((clientKey, clientValue) -> {
try {
if (isIdleTimeout(bConfig, clientValue)) {
Optional.ofNullable(clusterMap.remove(clientKey))
.ifPresent(rpcCli -> unusedClientMap.computeIfAbsent(bConfig, k -> new ArrayList<>())
.add(rpcCli));
}
} catch (Throwable ex) {
logger.error("RpcClientCleaner exception", ex);
}
});
});
unusedClientMap.forEach((bConfig, value) -> value.forEach(e -> {
try {
e.close();
} finally {
logger.warn("RpcClient in clusterName={}, naming={}, remove rpc client{}, due to unused time > {} ms",
bConfig.getName(), bConfig.getNamingOptions().getServiceNaming(),
e.getProtocolConfig().toSimpleString(), bConfig.getIdleTimeout());
}
}));
}

private static boolean isIdleTimeout(BackendConfig bConfig, RpcClientProxy clientProxy) {
long unusedNanosLimit = TimeUnit.MILLISECONDS.toNanos(bConfig.getIdleTimeout());
long lastUsedNanos = clientProxy.getLastUsedNanos();
return lastUsedNanos > 0 && unusedNanosLimit > 0 && (System.nanoTime() - lastUsedNanos) > unusedNanosLimit;
}

/**
* Get RpcClient based on BackendConfig. If RpcClient does not exist, create a new one and cache it.
* <p>The created client is a long-lived connection. To prevent memory leak, when the
* underlying client is closed (by itself or by the reconnect-check timer below), its entry
* in the cache is removed via the {@link RpcClient#closeFuture()} callback.</p>
*
* @param bConfig BackendConfig, configuration for the backend
* @param pConfig ProtocolConfig, configuration for the protocol
* @return RpcClient instance based on BackendConfig and ProtocolConfig
*/
public static RpcClient getOrCreateClient(BackendConfig bConfig, ProtocolConfig pConfig) {
Preconditions.checkNotNull(bConfig, "backendConfig can't not be null");
ensureReconnectCheckerStarted();
Map<String, RpcClientProxy> map = CLUSTER_MAP.computeIfAbsent(bConfig, k -> new ConcurrentHashMap<>());
RpcClientProxy rpcClientProxy = map.computeIfAbsent(pConfig.toUniqId(),
uniqId -> createRpcClientProxy(pConfig));
rpcClientProxy.updateLastUsedNanos();
String uniqId = pConfig.toUniqId();
RpcClientProxy rpcClientProxy = map.computeIfAbsent(uniqId,
k -> {
RpcClientProxy proxy = createRpcClientProxy(pConfig);
// When the underlying rpcClient closes (transport error or reconnect-check
// eviction), remove it from the cache to avoid memory leak. The next call
// will rebuild a new long connection on demand.
proxy.closeFuture().whenComplete((r, e) -> {
Map<String, RpcClientProxy> clusterMap = CLUSTER_MAP.get(bConfig);
if (clusterMap != null) {
// Only remove if the cached proxy is still the same instance.
clusterMap.remove(k, proxy);
}
if (logger.isDebugEnabled()) {
logger.debug("RpcClient closed, removed from cluster cache, backendConfig={}, client={}",
bConfig.toSimpleString(), proxy.getProtocolConfig().toSimpleString());
}
});
return proxy;
});
return rpcClientProxy;
}

Expand All @@ -174,15 +154,85 @@ private static RpcClientProxy createRpcClientProxy(ProtocolConfig protocolConfig
}
}

/**
* Lazily start the reconnect-check timer on first usage. Idempotent and thread-safe.
*/
private static void ensureReconnectCheckerStarted() {
if (reconnectCheckerFuture != null || CLOSED_FLAG.get()) {
return;
}
synchronized (RpcClusterClientManager.class) {
if (reconnectCheckerFuture != null || CLOSED_FLAG.get()) {
return;
}
try {
reconnectCheckerFuture = WorkerPoolManager.getShareScheduler().scheduleAtFixedRate(
RpcClusterClientManager::checkAndReconnect,
RECONNECT_CHECK_PERIOD_SECONDS,
RECONNECT_CHECK_PERIOD_SECONDS,
TimeUnit.SECONDS);
} catch (Throwable ex) {
logger.warn("Start reconnect-check timer failed, will fall back to lazy reconnect "
+ "on request path only", ex);
}
}
}

/**
* Periodic check: walk every cached client; for each one
* that is currently unavailable, increment its failure counter; once the counter reaches
* {@link #MAX_RECONNECT_FAILURES} the client is closed (which triggers
* {@code closeFuture} → cache eviction). Healthy clients have their counter reset.
* <p>The check itself does not actively send a heartbeat: it simply observes the current
* connection state. The transport's existing lazy reconnect (triggered by request path or
* by Netty's channelInactive event) takes care of re-establishing the long connection.</p>
*/
static void checkAndReconnect() {
if (CLOSED_FLAG.get()) {
return;
}
CLUSTER_MAP.forEach((bConfig, clusterMap) -> clusterMap.forEach((key, proxy) -> {
try {
if (proxy.isAvailable()) {
proxy.failureCount.set(0);
return;
}
int fails = proxy.failureCount.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("Reconnect-check: client {} not available, failureCount={}",
proxy.getProtocolConfig().toSimpleString(), fails);
}
if (fails >= MAX_RECONNECT_FAILURES) {
logger.warn("Reconnect-check: client {} unavailable for {} consecutive checks "
+ "(~{}s), closing and evicting from cache",
proxy.getProtocolConfig().toSimpleString(), fails,
fails * RECONNECT_CHECK_PERIOD_SECONDS);
try {
proxy.close();
} catch (Throwable ex) {
logger.error("Close stale client {} failed",
proxy.getProtocolConfig().toSimpleString(), ex);
}
}
} catch (Throwable ex) {
logger.error("Reconnect-check on client {} threw", key, ex);
}
}));
}

/**
* Close client
*/
public static void close() {
if (CLOSED_FLAG.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
try {
Optional.ofNullable(cleanerFuture).ifPresent(cf -> cf.cancel(Boolean.TRUE));
ScheduledFuture<?> f = reconnectCheckerFuture;
if (f != null) {
f.cancel(true);
reconnectCheckerFuture = null;
}
} catch (Exception ex) {
logger.error("clientCleanerFuture ", ex);
logger.error("Cancel reconnect-check timer failed", ex);
}
CLUSTER_MAP.forEach((config, clientProxyMap) -> clientProxyMap
.forEach((key, clientProxy) -> {
Expand All @@ -193,6 +243,7 @@ public static void close() {
ex);
}
}));
CLUSTER_MAP.clear();
}
}

Expand All @@ -218,7 +269,6 @@ public Class<T> getInterface() {

@Override
public CompletionStage<Response> invoke(Request request) {
rpcClient.updateLastUsedNanos();
return delegate.invoke(request);
}

Expand Down Expand Up @@ -255,22 +305,17 @@ public boolean equals(Object obj) {

private static class RpcClientProxy implements RpcClient {

private RpcClient delegate;

private volatile long lastUsedNanos = System.nanoTime();
private final RpcClient delegate;
/**
* Consecutive failure count observed by the reconnect-check timer; reset to 0 whenever
* the client is observed available.
*/
final AtomicInteger failureCount = new AtomicInteger(0);

RpcClientProxy(RpcClient delegate) {
this.delegate = delegate;
}

public void updateLastUsedNanos() {
lastUsedNanos = System.nanoTime();
}

public long getLastUsedNanos() {
return lastUsedNanos;
}

@Override
public void open() throws TRpcException {
delegate.open();
Expand Down Expand Up @@ -327,4 +372,4 @@ public boolean equals(Object obj) {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ public ConsumerInvokerProxy<T> createInvoker(ServiceInstance instance) {
value = invokerCache.get(key);
if (value == null || !value.isAvailable()) {
if (value != null && !value.isAvailable()) {
invokerCache.remove(key);
// CAS remove: only evict if the current cache slot is still the stale
// proxy we observed. Avoid blowing away a fresh proxy that another
// thread may have just installed (or that an earlier closeFuture hook
// would otherwise also race against).
invokerCache.remove(key, value);
}
RpcClient rpcClient;
try {
Expand All @@ -100,24 +104,30 @@ public ConsumerInvokerProxy<T> createInvoker(ServiceInstance instance) {
backendConfig.generateProtocolConfig(instance.getHost(), instance.getPort(),
backendConfig.getNetwork(), protocolType.getName(), backendConfig.getExtMap()));
ConsumerInvoker<T> originInvoker = rpcClient.createInvoker(consumerConfig);
value = new ConsumerInvokerProxy<>(FilterChain.buildConsumerChain(consumerConfig,
originInvoker), rpcClient);
invokerCache.put(key, value);
// When the rpcClient is cleaned up and closed during idle time,
// the corresponding map should also be cleaned up.
ConsumerInvokerProxy<T> created = new ConsumerInvokerProxy<>(
FilterChain.buildConsumerChain(consumerConfig, originInvoker), rpcClient);
invokerCache.put(key, created);
// Long-connection mode: the client is no longer evicted by an idle scanner.
// It is only closed on explicit shutdown or fatal transport error (or by
// the reconnect-check timer in RpcClusterClientManager). When that
// happens we still need to clean up the local invoker cache to avoid
// memory leak.
// Use CAS remove: if the cache slot has already been replaced by a newer
// proxy (e.g. after a series of rebuilds), this stale closeFuture must
// NOT evict that newer entry.
rpcClient.closeFuture().whenComplete((r, e) -> {
ConsumerInvokerProxy<T> remove = invokerCache.remove(key);
if (remove != null) {
logger.warn("Service [name=" + consumerConfig.getServiceInterface()
boolean removed = invokerCache.remove(key, created);
if (removed && logger.isDebugEnabled()) {
logger.debug("Service [name=" + consumerConfig.getServiceInterface()
.getName()
+ ", naming=" + backendConfig.getNamingOptions()
.getServiceNaming()
+ ")], remove rpc client invoker["
+ remove.getInvoker().getProtocolConfig().toSimpleString()
+ "], remove rpc client invoker["
+ created.getInvoker().getProtocolConfig().toSimpleString()
+ "], due to rpc client close");
}
});
return value;
return created;
} catch (Exception ex) {
throw TRpcException.newFrameException(ErrorCode.TRPC_INVOKE_UNKNOWN_ERR,
"Service(name=" + consumerConfig.getServiceInterface().getName()
Expand Down
Loading
Loading