diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/cluster/RpcClusterClientManager.java b/trpc-core/src/main/java/com/tencent/trpc/core/cluster/RpcClusterClientManager.java index 320b8b0f5..76c516387 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/cluster/RpcClusterClientManager.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/cluster/RpcClusterClientManager.java @@ -25,8 +25,6 @@ import com.tencent.trpc.core.rpc.Response; import com.tencent.trpc.core.rpc.RpcClient; import com.tencent.trpc.core.worker.WorkerPoolManager; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -35,15 +33,40 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** - * Used to manage the list mapping of point-to-point clients generated through BackendConfig, - * and because the entire framework is based on a pull model to maintain the service IP of the server. - * Therefore, a scanner is added to remove long-unused clients. + * Used to manage the list mapping of point-to-point clients generated through BackendConfig. + *

+ * Long-connection mode: + *

*/ public class RpcClusterClientManager { private static final Logger logger = LoggerFactory.getLogger(RpcClusterClientManager.class); + + /** + * Period of the reconnect-check timer in seconds. + */ + private static final int RECONNECT_CHECK_PERIOD_SECONDS = 30; + /** + * Maximum number of consecutive reconnect-check failures tolerated before the client is + * closed and evicted from the cache. + */ + private static final int MAX_RECONNECT_FAILURES = 5; + /** * Cluster map, {@code Map>} */ @@ -52,14 +75,11 @@ public class RpcClusterClientManager { * Is close flag */ private static final AtomicBoolean CLOSED_FLAG = new AtomicBoolean(false); + /** - * Prevent too many clients and perform periodic cleaning. + * Reconnect-check timer handle. Started lazily on first {@link #getOrCreateClient}. */ - private static ScheduledFuture cleanerFuture; - - static { - cleanerFuture = startRpcClientCleaner(); - } + private static volatile ScheduledFuture reconnectCheckerFuture; /** * Shutdown a cluster. @@ -82,69 +102,11 @@ public static void shutdownBackendConfig(BackendConfig backendConfig) { })); } - /** - * Used to periodically scan unused clients and release them. - *

Add judgment to determine whether to close the shared thread pool.

- * - * @return ScheduledFuture, a delayed result-bearing action that can be cancelled - */ - private static ScheduledFuture startRpcClientCleaner() { - return Optional.ofNullable(WorkerPoolManager.getShareScheduler()) - .map(ss -> { - if (ss.isShutdown()) { - return null; - } - return ss.scheduleAtFixedRate(() -> { - try { - scanUnusedClient(); - } catch (Throwable ex) { - logger.error("RpcClientCleaner exception", ex); - } - }, 0, 15, TimeUnit.MINUTES); - }).orElse(null); - } - - /** - * Scanning for unused clients. - */ - public static void scanUnusedClient() { - Map> unusedClientMap = Maps.newHashMap(); - CLUSTER_MAP.forEach((bConfig, clusterMap) -> { - if (logger.isDebugEnabled()) { - logger.debug("RpcClusterClient scheduler report clusterName={}, naming={}, num of client is {}", - bConfig.getName(), bConfig.getNamingOptions().getServiceNaming(), clusterMap.keySet().size()); - } - clusterMap.forEach((clientKey, clientValue) -> { - try { - if (isIdleTimeout(bConfig, clientValue)) { - Optional.ofNullable(clusterMap.remove(clientKey)) - .ifPresent(rpcCli -> unusedClientMap.computeIfAbsent(bConfig, k -> new ArrayList<>()) - .add(rpcCli)); - } - } catch (Throwable ex) { - logger.error("RpcClientCleaner exception", ex); - } - }); - }); - unusedClientMap.forEach((bConfig, value) -> value.forEach(e -> { - try { - e.close(); - } finally { - logger.warn("RpcClient in clusterName={}, naming={}, remove rpc client{}, due to unused time > {} ms", - bConfig.getName(), bConfig.getNamingOptions().getServiceNaming(), - e.getProtocolConfig().toSimpleString(), bConfig.getIdleTimeout()); - } - })); - } - - private static boolean isIdleTimeout(BackendConfig bConfig, RpcClientProxy clientProxy) { - long unusedNanosLimit = TimeUnit.MILLISECONDS.toNanos(bConfig.getIdleTimeout()); - long lastUsedNanos = clientProxy.getLastUsedNanos(); - return lastUsedNanos > 0 && unusedNanosLimit > 0 && (System.nanoTime() - lastUsedNanos) > unusedNanosLimit; - } - /** * Get RpcClient based on BackendConfig. If RpcClient does not exist, create a new one and cache it. + *

The created client is a long-lived connection. To prevent memory leak, when the + * underlying client is closed (by itself or by the reconnect-check timer below), its entry + * in the cache is removed via the {@link RpcClient#closeFuture()} callback.

* * @param bConfig BackendConfig, configuration for the backend * @param pConfig ProtocolConfig, configuration for the protocol @@ -152,10 +114,28 @@ private static boolean isIdleTimeout(BackendConfig bConfig, RpcClientProxy clien */ public static RpcClient getOrCreateClient(BackendConfig bConfig, ProtocolConfig pConfig) { Preconditions.checkNotNull(bConfig, "backendConfig can't not be null"); + ensureReconnectCheckerStarted(); Map map = CLUSTER_MAP.computeIfAbsent(bConfig, k -> new ConcurrentHashMap<>()); - RpcClientProxy rpcClientProxy = map.computeIfAbsent(pConfig.toUniqId(), - uniqId -> createRpcClientProxy(pConfig)); - rpcClientProxy.updateLastUsedNanos(); + String uniqId = pConfig.toUniqId(); + RpcClientProxy rpcClientProxy = map.computeIfAbsent(uniqId, + k -> { + RpcClientProxy proxy = createRpcClientProxy(pConfig); + // When the underlying rpcClient closes (transport error or reconnect-check + // eviction), remove it from the cache to avoid memory leak. The next call + // will rebuild a new long connection on demand. + proxy.closeFuture().whenComplete((r, e) -> { + Map clusterMap = CLUSTER_MAP.get(bConfig); + if (clusterMap != null) { + // Only remove if the cached proxy is still the same instance. + clusterMap.remove(k, proxy); + } + if (logger.isDebugEnabled()) { + logger.debug("RpcClient closed, removed from cluster cache, backendConfig={}, client={}", + bConfig.toSimpleString(), proxy.getProtocolConfig().toSimpleString()); + } + }); + return proxy; + }); return rpcClientProxy; } @@ -174,15 +154,85 @@ private static RpcClientProxy createRpcClientProxy(ProtocolConfig protocolConfig } } + /** + * Lazily start the reconnect-check timer on first usage. Idempotent and thread-safe. + */ + private static void ensureReconnectCheckerStarted() { + if (reconnectCheckerFuture != null || CLOSED_FLAG.get()) { + return; + } + synchronized (RpcClusterClientManager.class) { + if (reconnectCheckerFuture != null || CLOSED_FLAG.get()) { + return; + } + try { + reconnectCheckerFuture = WorkerPoolManager.getShareScheduler().scheduleAtFixedRate( + RpcClusterClientManager::checkAndReconnect, + RECONNECT_CHECK_PERIOD_SECONDS, + RECONNECT_CHECK_PERIOD_SECONDS, + TimeUnit.SECONDS); + } catch (Throwable ex) { + logger.warn("Start reconnect-check timer failed, will fall back to lazy reconnect " + + "on request path only", ex); + } + } + } + + /** + * Periodic check: walk every cached client; for each one + * that is currently unavailable, increment its failure counter; once the counter reaches + * {@link #MAX_RECONNECT_FAILURES} the client is closed (which triggers + * {@code closeFuture} → cache eviction). Healthy clients have their counter reset. + *

The check itself does not actively send a heartbeat: it simply observes the current + * connection state. The transport's existing lazy reconnect (triggered by request path or + * by Netty's channelInactive event) takes care of re-establishing the long connection.

+ */ + static void checkAndReconnect() { + if (CLOSED_FLAG.get()) { + return; + } + CLUSTER_MAP.forEach((bConfig, clusterMap) -> clusterMap.forEach((key, proxy) -> { + try { + if (proxy.isAvailable()) { + proxy.failureCount.set(0); + return; + } + int fails = proxy.failureCount.incrementAndGet(); + if (logger.isDebugEnabled()) { + logger.debug("Reconnect-check: client {} not available, failureCount={}", + proxy.getProtocolConfig().toSimpleString(), fails); + } + if (fails >= MAX_RECONNECT_FAILURES) { + logger.warn("Reconnect-check: client {} unavailable for {} consecutive checks " + + "(~{}s), closing and evicting from cache", + proxy.getProtocolConfig().toSimpleString(), fails, + fails * RECONNECT_CHECK_PERIOD_SECONDS); + try { + proxy.close(); + } catch (Throwable ex) { + logger.error("Close stale client {} failed", + proxy.getProtocolConfig().toSimpleString(), ex); + } + } + } catch (Throwable ex) { + logger.error("Reconnect-check on client {} threw", key, ex); + } + })); + } + /** * Close client */ public static void close() { if (CLOSED_FLAG.compareAndSet(Boolean.FALSE, Boolean.TRUE)) { try { - Optional.ofNullable(cleanerFuture).ifPresent(cf -> cf.cancel(Boolean.TRUE)); + ScheduledFuture f = reconnectCheckerFuture; + if (f != null) { + f.cancel(true); + reconnectCheckerFuture = null; + } } catch (Exception ex) { - logger.error("clientCleanerFuture ", ex); + logger.error("Cancel reconnect-check timer failed", ex); } CLUSTER_MAP.forEach((config, clientProxyMap) -> clientProxyMap .forEach((key, clientProxy) -> { @@ -193,6 +243,7 @@ public static void close() { ex); } })); + CLUSTER_MAP.clear(); } } @@ -218,7 +269,6 @@ public Class getInterface() { @Override public CompletionStage invoke(Request request) { - rpcClient.updateLastUsedNanos(); return delegate.invoke(request); } @@ -255,22 +305,17 @@ public boolean equals(Object obj) { private static class RpcClientProxy implements RpcClient { - private RpcClient delegate; - - private volatile long lastUsedNanos = System.nanoTime(); + private final RpcClient delegate; + /** + * Consecutive failure count observed by the reconnect-check timer; reset to 0 whenever + * the client is observed available. + */ + final AtomicInteger failureCount = new AtomicInteger(0); RpcClientProxy(RpcClient delegate) { this.delegate = delegate; } - public void updateLastUsedNanos() { - lastUsedNanos = System.nanoTime(); - } - - public long getLastUsedNanos() { - return lastUsedNanos; - } - @Override public void open() throws TRpcException { delegate.open(); @@ -327,4 +372,4 @@ public boolean equals(Object obj) { } } -} \ No newline at end of file +} diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/cluster/def/DefClusterInvoker.java b/trpc-core/src/main/java/com/tencent/trpc/core/cluster/def/DefClusterInvoker.java index bf3155f18..d53cb4817 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/cluster/def/DefClusterInvoker.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/cluster/def/DefClusterInvoker.java @@ -90,7 +90,11 @@ public ConsumerInvokerProxy createInvoker(ServiceInstance instance) { value = invokerCache.get(key); if (value == null || !value.isAvailable()) { if (value != null && !value.isAvailable()) { - invokerCache.remove(key); + // CAS remove: only evict if the current cache slot is still the stale + // proxy we observed. Avoid blowing away a fresh proxy that another + // thread may have just installed (or that an earlier closeFuture hook + // would otherwise also race against). + invokerCache.remove(key, value); } RpcClient rpcClient; try { @@ -100,24 +104,30 @@ public ConsumerInvokerProxy createInvoker(ServiceInstance instance) { backendConfig.generateProtocolConfig(instance.getHost(), instance.getPort(), backendConfig.getNetwork(), protocolType.getName(), backendConfig.getExtMap())); ConsumerInvoker originInvoker = rpcClient.createInvoker(consumerConfig); - value = new ConsumerInvokerProxy<>(FilterChain.buildConsumerChain(consumerConfig, - originInvoker), rpcClient); - invokerCache.put(key, value); - // When the rpcClient is cleaned up and closed during idle time, - // the corresponding map should also be cleaned up. + ConsumerInvokerProxy created = new ConsumerInvokerProxy<>( + FilterChain.buildConsumerChain(consumerConfig, originInvoker), rpcClient); + invokerCache.put(key, created); + // Long-connection mode: the client is no longer evicted by an idle scanner. + // It is only closed on explicit shutdown or fatal transport error (or by + // the reconnect-check timer in RpcClusterClientManager). When that + // happens we still need to clean up the local invoker cache to avoid + // memory leak. + // Use CAS remove: if the cache slot has already been replaced by a newer + // proxy (e.g. after a series of rebuilds), this stale closeFuture must + // NOT evict that newer entry. rpcClient.closeFuture().whenComplete((r, e) -> { - ConsumerInvokerProxy remove = invokerCache.remove(key); - if (remove != null) { - logger.warn("Service [name=" + consumerConfig.getServiceInterface() + boolean removed = invokerCache.remove(key, created); + if (removed && logger.isDebugEnabled()) { + logger.debug("Service [name=" + consumerConfig.getServiceInterface() .getName() + ", naming=" + backendConfig.getNamingOptions() .getServiceNaming() - + ")], remove rpc client invoker[" - + remove.getInvoker().getProtocolConfig().toSimpleString() + + "], remove rpc client invoker[" + + created.getInvoker().getProtocolConfig().toSimpleString() + "], due to rpc client close"); } }); - return value; + return created; } catch (Exception ex) { throw TRpcException.newFrameException(ErrorCode.TRPC_INVOKE_UNKNOWN_ERR, "Service(name=" + consumerConfig.getServiceInterface().getName() diff --git a/trpc-core/src/main/java/com/tencent/trpc/core/transport/AbstractClientTransport.java b/trpc-core/src/main/java/com/tencent/trpc/core/transport/AbstractClientTransport.java index 3c37a455d..0ec94dff6 100644 --- a/trpc-core/src/main/java/com/tencent/trpc/core/transport/AbstractClientTransport.java +++ b/trpc-core/src/main/java/com/tencent/trpc/core/transport/AbstractClientTransport.java @@ -11,7 +11,6 @@ package com.tencent.trpc.core.transport; -import com.google.common.collect.Lists; import com.tencent.trpc.core.common.LifecycleBase; import com.tencent.trpc.core.common.config.ProtocolConfig; import com.tencent.trpc.core.exception.LifecycleException; @@ -24,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -64,7 +64,11 @@ public abstract class AbstractClientTransport implements ClientTransport { */ protected AtomicInteger channelIdx = new AtomicInteger(0); /** - * List of channels. + * Channel pool. Backed by {@link CopyOnWriteArrayList} so that the slot publication done by + * {@link #ensureChannelActive} (under {@link #connLock}) is visible to concurrent readers in + * {@link #getChannel0} with volatile semantics, eliminating the data race that an + * {@link java.util.ArrayList} would have. Size is bounded by {@code connsPerAddr}; writes + * are infrequent (slot rebuild on disconnect) so the COW copy cost is negligible. */ protected List channels; /** @@ -80,7 +84,7 @@ public AbstractClientTransport(ProtocolConfig config, ChannelHandler handler, this.config = Objects.requireNonNull(config, "config is null"); this.handler = Objects.requireNonNull(handler, "handler is null"); this.codec = clientCodec; - this.channels = Lists.newArrayListWithExpectedSize(config.getConnsPerAddr()); + this.channels = new CopyOnWriteArrayList<>(); } /** diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterClientManagerTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterClientManagerTest.java index 7ee1943da..efcc02627 100644 --- a/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterClientManagerTest.java +++ b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterClientManagerTest.java @@ -12,6 +12,10 @@ package com.tencent.trpc.core.cluster; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import com.tencent.trpc.core.common.config.BackendConfig; import com.tencent.trpc.core.common.config.ConsumerConfig; @@ -21,12 +25,31 @@ import com.tencent.trpc.core.rpc.ConsumerInvoker; import com.tencent.trpc.core.rpc.RpcClient; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class RpcClusterClientManagerTest { + @Before + public void setUp() { + // ensure clean state across tests (tests may have flipped CLOSED_FLAG) + RpcClusterClientManager.reset(); + } + + @After + public void tearDown() throws Exception { + // Clear cluster cache to keep tests independent. + Field field = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + field.setAccessible(true); + ((Map) field.get(null)).clear(); + RpcClusterClientManager.reset(); + } + @Test public void test() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException, InterruptedException { @@ -40,9 +63,12 @@ public void test() throws IllegalArgumentException, IllegalAccessException, NoSu field.setAccessible(true); Map clusterMap = (Map) field.get(null); assertEquals(1, clusterMap.get(backendConfig).size()); + // Long-connection mode: idle scanning is disabled, the client should still be cached after sleep. Thread.sleep(10); - RpcClusterClientManager.scanUnusedClient(); - assertEquals(0, clusterMap.get(backendConfig).size()); + assertEquals(1, clusterMap.get(backendConfig).size()); + // Explicit shutdown should release the cached client. + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + assertNull(clusterMap.get(backendConfig)); BackendConfig backend = new BackendConfig(); backend.setNamingUrl("ip://127.0.0.1:8081"); RpcClusterClientManager.getOrCreateClient(backend, config); @@ -50,19 +76,18 @@ public void test() throws IllegalArgumentException, IllegalAccessException, NoSu } @Test - public void testDebugLog() throws Exception { + public void testDebugLog() { BackendConfig backendConfig = new BackendConfig(); backendConfig.setIdleTimeout(100000); backendConfig.setNamingUrl("ip://127.0.0.1:8082"); ProtocolConfigTest config = new ProtocolConfigTest(); RpcClient rpcClient = RpcClusterClientManager.getOrCreateClient(backendConfig, config); Assert.assertNotNull(rpcClient); - RpcClusterClientManager.scanUnusedClient(); RpcClusterClientManager.shutdownBackendConfig(backendConfig); } @Test - public void testGetOrCreateClientTwice() throws Exception { + public void testGetOrCreateClientTwice() { BackendConfig backendConfig = new BackendConfig(); backendConfig.setIdleTimeout(100000); backendConfig.setNamingUrl("ip://127.0.0.1:8083"); @@ -71,11 +96,13 @@ public void testGetOrCreateClientTwice() throws Exception { RpcClient rpcClient2 = RpcClusterClientManager.getOrCreateClient(backendConfig, config); Assert.assertNotNull(rpcClient1); Assert.assertNotNull(rpcClient2); + // Same key should return the same proxy instance (cache hit). + Assert.assertSame(rpcClient1, rpcClient2); RpcClusterClientManager.shutdownBackendConfig(backendConfig); } @Test - public void testClose() throws Exception { + public void testClose() { BackendConfig backendConfig = new BackendConfig(); backendConfig.setIdleTimeout(100000); backendConfig.setNamingUrl("ip://127.0.0.1:8084"); @@ -83,6 +110,8 @@ public void testClose() throws Exception { RpcClient rpcClient = RpcClusterClientManager.getOrCreateClient(backendConfig, config); Assert.assertNotNull(rpcClient); RpcClusterClientManager.close(); + // close() is idempotent, second call should be a no-op. + RpcClusterClientManager.close(); RpcClusterClientManager.reset(); } @@ -95,27 +124,324 @@ public void testShutdownNonExistBackend() { @Test public void testScanWithEmptyCluster() { - RpcClusterClientManager.scanUnusedClient(); + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9998"); + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + } + + /** + * Triggers shutdownBackendConfig's catch branch: a client whose close() throws. + */ + @Test + public void testShutdownBackendConfigWhenCloseThrows() throws Exception { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9001"); + ProtocolConfigTest config = new ProtocolConfigTest(); + config.failOnClose = true; + RpcClusterClientManager.getOrCreateClient(backendConfig, config); + // Should swallow exception and complete normally. + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + Field field = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + field.setAccessible(true); + Map map = (Map) field.get(null); + assertNull(map.get(backendConfig)); + } + + /** + * Triggers close()'s catch branch when the client throws on close. + */ + @Test + public void testCloseWhenClientCloseThrows() { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9002"); + ProtocolConfigTest config = new ProtocolConfigTest(); + config.failOnClose = true; + RpcClusterClientManager.getOrCreateClient(backendConfig, config); + // Should not propagate the exception out. + RpcClusterClientManager.close(); + RpcClusterClientManager.reset(); + } + + /** + * createRpcClientProxy: when open() throws, the partially-created proxy should be closed + * to avoid resource leak, and the exception should propagate. + */ + @Test + public void testCreateClientWhenOpenThrows() { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9003"); + ProtocolConfigTest config = new ProtocolConfigTest(); + config.failOnOpen = true; + try { + RpcClusterClientManager.getOrCreateClient(backendConfig, config); + Assert.fail("expected exception"); + } catch (RuntimeException expected) { + // expected + } + } + + /** + * After CLOSED_FLAG is set, getOrCreateClient must reject new client creation. + */ + @Test + public void testGetOrCreateClientAfterClose() { + RpcClusterClientManager.close(); + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9004"); + ProtocolConfigTest config = new ProtocolConfigTest(); + try { + RpcClusterClientManager.getOrCreateClient(backendConfig, config); + Assert.fail("expected IllegalArgumentException"); + } catch (IllegalArgumentException expected) { + // expected + } finally { + RpcClusterClientManager.reset(); + } + } + + /** + * Direct invocation of checkAndReconnect: client is healthy, failureCount stays at 0. + */ + @Test + public void testCheckAndReconnectHealthyClient() throws Exception { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9005"); + ProtocolConfigTest config = new ProtocolConfigTest(); + RpcClient client = RpcClusterClientManager.getOrCreateClient(backendConfig, config); + invokeCheckAndReconnect(); + // Healthy client must not be evicted. + Field field = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + field.setAccessible(true); + Map clusterMap = (Map) field.get(null); + assertEquals(1, clusterMap.get(backendConfig).size()); + assertEquals(0, getFailureCount(client)); + RpcClusterClientManager.shutdownBackendConfig(backendConfig); } + /** + * Direct invocation: client unavailable, failureCount accumulates and after MAX_RECONNECT_FAILURES + * the client is closed and evicted from the cache via the closeFuture hook. + */ + @Test + public void testCheckAndReconnectUnavailableTriggersEviction() throws Exception { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9006"); + ProtocolConfigTest config = new ProtocolConfigTest(); + config.available = false; + RpcClient client = RpcClusterClientManager.getOrCreateClient(backendConfig, config); + + Field field = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + field.setAccessible(true); + Map clusterMap = (Map) field.get(null); + assertEquals(1, clusterMap.get(backendConfig).size()); + + // Run 4 times: failureCount grows but no eviction yet. + for (int i = 0; i < 4; i++) { + invokeCheckAndReconnect(); + } + assertEquals(4, getFailureCount(client)); + assertEquals(1, clusterMap.get(backendConfig).size()); + + // 5th run hits MAX_RECONNECT_FAILURES, triggers proxy.close() → closeFuture → + // CLUSTER_MAP.remove(uniqId, proxy). + invokeCheckAndReconnect(); + assertTrue(config.closed.get()); + Map sub = clusterMap.get(backendConfig); + // Either the inner map is now empty or the entry was removed. + if (sub != null) { + assertEquals(0, sub.size()); + } + } + + /** + * checkAndReconnect must early-return when CLOSED_FLAG is true. Also ensures the close branch + * inside checkAndReconnect handles client.close() throwing without breaking the loop. + */ + @Test + public void testCheckAndReconnectShortCircuitsOnClosed() throws Exception { + RpcClusterClientManager.close(); + // Should not throw. + invokeCheckAndReconnect(); + RpcClusterClientManager.reset(); + } + + /** + * Drives the catch branch of checkAndReconnect's per-proxy try block: proxy.close() throws. + */ + @Test + public void testCheckAndReconnectSwallowsCloseException() throws Exception { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9007"); + ProtocolConfigTest config = new ProtocolConfigTest(); + config.available = false; + config.failOnClose = true; + RpcClient client = RpcClusterClientManager.getOrCreateClient(backendConfig, config); + + for (int i = 0; i < 5; i++) { + invokeCheckAndReconnect(); + } + // Must NOT throw out of the timer loop. Failure count should reach >= MAX (5). + assertTrue(getFailureCount(client) >= 5); + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + } + + /** + * Exercises the RpcClientProxy delegate methods: open / createInvoker / closeFuture / + * isClosed / isAvailable / getProtocolConfig / equals / hashCode. + */ + @Test + public void testRpcClientProxyDelegation() { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9008"); + ProtocolConfigTest config = new ProtocolConfigTest(); + config.invokerSupplier = () -> new StubConsumerInvoker(); + RpcClient proxy = RpcClusterClientManager.getOrCreateClient(backendConfig, config); + + assertTrue(proxy.isAvailable()); + assertFalse(proxy.isClosed()); + assertNotNull(proxy.closeFuture()); + assertNotNull(proxy.getProtocolConfig()); + // createInvoker delegates and wraps with ConsumerInvokerProxy. + ConsumerConfig cc = new ConsumerConfig<>(); + ConsumerInvoker invoker = proxy.createInvoker(cc); + // The wrapped invoker delegates getInterface / getConfig / getProtocolConfig / invoke. + assertNotNull(invoker.getInterface()); + assertNotNull(invoker.getConfig()); + assertNotNull(invoker.getProtocolConfig()); + assertNotNull(invoker.invoke(null)); + + // getOrCreateClient with the same key must return the cached proxy (same ref). + RpcClient sameKey = RpcClusterClientManager.getOrCreateClient(backendConfig, config); + Assert.assertSame(proxy, sameKey); + Assert.assertEquals(proxy.hashCode(), sameKey.hashCode()); + Assert.assertEquals(proxy, sameKey); + Assert.assertNotEquals(proxy, null); + Assert.assertNotEquals(proxy, "string"); + + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + } + + /** + * Exercises ConsumerInvokerProxy.equals/hashCode through the client-created invoker chain. + */ + @Test + public void testConsumerInvokerProxyEquality() { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9009"); + ProtocolConfigTest config = new ProtocolConfigTest(); + // Always wrap the SAME delegate so the two outer ConsumerInvokerProxy instances are equal. + StubConsumerInvoker shared = new StubConsumerInvoker(); + config.invokerSupplier = () -> shared; + RpcClient proxy = RpcClusterClientManager.getOrCreateClient(backendConfig, config); + + ConsumerConfig cc = new ConsumerConfig<>(); + ConsumerInvoker a = proxy.createInvoker(cc); + ConsumerInvoker b = proxy.createInvoker(cc); + Assert.assertEquals(a, b); + Assert.assertEquals(a.hashCode(), b.hashCode()); + Assert.assertEquals(a, a); + Assert.assertNotEquals(a, null); + Assert.assertNotEquals(a, "string"); + + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + } + + /** + * Stub ConsumerInvoker for delegation/equality tests. + */ + private static class StubConsumerInvoker implements ConsumerInvoker { + + @Override + public Class getInterface() { + return Object.class; + } + + @Override + public java.util.concurrent.CompletionStage invoke( + com.tencent.trpc.core.rpc.Request request) { + return java.util.concurrent.CompletableFuture.completedFuture(null); + } + + @Override + public ConsumerConfig getConfig() { + return new ConsumerConfig<>(); + } + + @Override + public ProtocolConfig getProtocolConfig() { + return new ProtocolConfig(); + } + } + + /** + * Lazy timer start: the first getOrCreateClient triggers ensureReconnectCheckerStarted; the + * future field becomes non-null. Calling getOrCreateClient again must NOT replace it. + */ + @Test + public void testReconnectCheckerStartedLazilyAndOnce() throws Exception { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9010"); + ProtocolConfigTest config = new ProtocolConfigTest(); + RpcClusterClientManager.getOrCreateClient(backendConfig, config); + + Field f = RpcClusterClientManager.class.getDeclaredField("reconnectCheckerFuture"); + f.setAccessible(true); + Object first = f.get(null); + assertNotNull("timer should be started", first); + + // Second call must not replace it. + RpcClusterClientManager.getOrCreateClient(backendConfig, config); + Object second = f.get(null); + Assert.assertSame(first, second); + + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + } + + /* ---------------------- helpers ---------------------- */ + + private static void invokeCheckAndReconnect() throws Exception { + Method m = RpcClusterClientManager.class.getDeclaredMethod("checkAndReconnect"); + m.setAccessible(true); + m.invoke(null); + } + + private static int getFailureCount(RpcClient proxy) throws Exception { + Field f = proxy.getClass().getDeclaredField("failureCount"); + f.setAccessible(true); + return ((java.util.concurrent.atomic.AtomicInteger) f.get(proxy)).get(); + } + + /* ---------------------- mock ProtocolConfig ---------------------- */ + private static class ProtocolConfigTest extends ProtocolConfig { + boolean available = true; + boolean failOnOpen = false; + boolean failOnClose = false; + final AtomicBoolean closed = new AtomicBoolean(false); + java.util.function.Supplier> invokerSupplier; + @Override public RpcClient createClient() { return new RpcClient() { + private final CloseFuture closeFuture = new CloseFuture<>(); + @Override public void open() throws TRpcException { + if (failOnOpen) { + throw new RuntimeException("boom-open"); + } } @Override public boolean isClosed() { - return false; + return closed.get(); } @Override public boolean isAvailable() { - return true; + return available && !closed.get(); } @Override @@ -125,16 +451,27 @@ public ProtocolConfig getProtocolConfig() { @Override public void close() { + closed.set(true); + if (failOnClose) { + // Still complete the future first so cache eviction proceeds. + closeFuture.complete(null); + throw new RuntimeException("boom-close"); + } + closeFuture.complete(null); } + @SuppressWarnings({"unchecked", "rawtypes"}) @Override public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) { + if (invokerSupplier != null) { + return (ConsumerInvoker) invokerSupplier.get(); + } return null; } @Override public CloseFuture closeFuture() { - return new CloseFuture(); + return closeFuture; } }; } diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterLoggerLevelTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterLoggerLevelTest.java new file mode 100644 index 000000000..e7d586ac0 --- /dev/null +++ b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterLoggerLevelTest.java @@ -0,0 +1,198 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.cluster; + +import static org.junit.Assert.assertNull; + +import com.tencent.trpc.core.common.config.BackendConfig; +import com.tencent.trpc.core.common.config.ConsumerConfig; +import com.tencent.trpc.core.common.config.ProtocolConfig; +import com.tencent.trpc.core.exception.TRpcException; +import com.tencent.trpc.core.rpc.CloseFuture; +import com.tencent.trpc.core.rpc.ConsumerInvoker; +import com.tencent.trpc.core.rpc.RpcClient; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Covers the {@code logger.isDebugEnabled() == false} branches in + * {@link RpcClusterClientManager}. The test classpath's {@code log4j2.xml} configures the root + * logger at {@code DEBUG}, so the {@code true} branches are already exercised by other tests. + * + *

Strategy: temporarily raise the {@link RpcClusterClientManager} logger level to {@code INFO} + * via the standard log4j2 API (same API as {@code Log4j2LoggerProcessUnit}); run the same code + * paths; restore the level afterwards. No reflection / no PowerMock.

+ */ +public class RpcClusterLoggerLevelTest { + + private static final String TARGET_LOGGER = RpcClusterClientManager.class.getName(); + + private Level originalLevel; + + @Before + public void setUp() throws Exception { + RpcClusterClientManager.reset(); + clearClusterMap(); + // Snapshot original level and force INFO so isDebugEnabled() returns false. + originalLevel = setLoggerLevel(TARGET_LOGGER, Level.INFO); + } + + @After + public void tearDown() throws Exception { + // Restore original level (DEBUG inherited from root by default). + setLoggerLevel(TARGET_LOGGER, originalLevel); + clearClusterMap(); + RpcClusterClientManager.reset(); + } + + /** + * Drives all three {@code if (logger.isDebugEnabled())} blocks in the source with the flag + * evaluating to {@code false}, covering the previously-missed branches: + *
    + *
  • {@code shutdownBackendConfig} success path,
  • + *
  • {@code getOrCreateClient} closeFuture hook,
  • + *
  • {@code checkAndReconnect} per-client failure log.
  • + *
+ */ + @Test + public void testDebugBranchesWhenLoggerDisabled() throws Exception { + // Sanity check: after setLoggerLevel(INFO), the manager's logger must report + // isDebugEnabled() == false; otherwise our coverage assumption is wrong. + com.tencent.trpc.core.logger.Logger mgrLogger = + com.tencent.trpc.core.logger.LoggerFactory.getLogger(RpcClusterClientManager.class); + org.junit.Assert.assertFalse("logger.isDebugEnabled() must be false to cover the missed branch", + mgrLogger.isDebugEnabled()); + + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9100"); + StubProtocolConfig pConfig = new StubProtocolConfig(); + + // (1) Triggers getOrCreateClient → eventually closeFuture hook (via shutdown below) and + // (3) checkAndReconnect's failure-log path (since available is forced false next). + RpcClient client = RpcClusterClientManager.getOrCreateClient(backendConfig, pConfig); + org.junit.Assert.assertNotNull(client); + + pConfig.available = false; + invokeCheckAndReconnect(); + + // (2) shutdownBackendConfig success path. + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + Field f = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + f.setAccessible(true); + assertNull(((Map) f.get(null)).get(backendConfig)); + } + + /* ---------------- helpers ---------------- */ + + /** + * Set the level of {@code loggerName} via log4j2 API; returns the previous level so the + * caller can restore it. Mirrors {@code Log4j2LoggerProcessUnit#changeLoggerLevel}. + */ + private static Level setLoggerLevel(String loggerName, Level newLevel) { + LoggerContext ctx = (LoggerContext) LogManager.getContext(false); + Configuration config = ctx.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(loggerName); + Level previous = loggerConfig.getLevel(); + // If the resolved logger config is the parent (root), create a dedicated one so we don't + // accidentally raise the level for other tests sharing the root logger. + if (!loggerConfig.getName().equals(loggerName)) { + LoggerConfig dedicated = LoggerConfig.createLogger(false, newLevel, loggerName, + "true", new org.apache.logging.log4j.core.config.AppenderRef[0], null, config, + null); + config.addLogger(loggerName, dedicated); + } else { + loggerConfig.setLevel(newLevel); + } + ctx.updateLoggers(); + return previous; + } + + private static void invokeCheckAndReconnect() throws Exception { + Method m = RpcClusterClientManager.class.getDeclaredMethod("checkAndReconnect"); + m.setAccessible(true); + m.invoke(null); + } + + private static void clearClusterMap() throws Exception { + Field f = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + f.setAccessible(true); + ((Map) f.get(null)).clear(); + } + + /* ---------------- stubs ---------------- */ + + private static class StubProtocolConfig extends ProtocolConfig { + + volatile boolean available = true; + final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public RpcClient createClient() { + return new StubRpcClient(this); + } + } + + private static class StubRpcClient implements RpcClient { + + private final StubProtocolConfig owner; + private final CloseFuture closeFuture = new CloseFuture<>(); + + StubRpcClient(StubProtocolConfig owner) { + this.owner = owner; + } + + @Override + public void open() throws TRpcException { + } + + @Override + public boolean isClosed() { + return owner.closed.get(); + } + + @Override + public boolean isAvailable() { + return owner.available && !owner.closed.get(); + } + + @Override + public ProtocolConfig getProtocolConfig() { + return owner; + } + + @Override + public void close() { + owner.closed.set(true); + closeFuture.complete(null); + } + + @Override + public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) { + return null; + } + + @Override + public CloseFuture closeFuture() { + return closeFuture; + } + } +} diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterSchedulerRejectTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterSchedulerRejectTest.java new file mode 100644 index 000000000..c3b401bb8 --- /dev/null +++ b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/RpcClusterSchedulerRejectTest.java @@ -0,0 +1,169 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.cluster; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +import com.tencent.trpc.core.common.config.BackendConfig; +import com.tencent.trpc.core.common.config.ConsumerConfig; +import com.tencent.trpc.core.common.config.ProtocolConfig; +import com.tencent.trpc.core.exception.TRpcException; +import com.tencent.trpc.core.rpc.CloseFuture; +import com.tencent.trpc.core.rpc.ConsumerInvoker; +import com.tencent.trpc.core.rpc.RpcClient; +import com.tencent.trpc.core.worker.WorkerPoolManager; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Drives the {@code catch (Throwable)} branch in + * {@link RpcClusterClientManager#ensureReconnectCheckerStarted()}: when the shared scheduler + * rejects the periodic task, the manager must swallow the exception and leave + * {@code reconnectCheckerFuture} as {@code null}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({WorkerPoolManager.class}) +@PowerMockIgnore({"javax.management.*", "javax.security.*", "javax.ws.*", "javax.net.*"}) +public class RpcClusterSchedulerRejectTest { + + @Before + public void setUp() throws Exception { + RpcClusterClientManager.reset(); + clearReconnectCheckerFuture(); + clearClusterMap(); + } + + @After + public void tearDown() throws Exception { + clearReconnectCheckerFuture(); + clearClusterMap(); + RpcClusterClientManager.reset(); + } + + @Test + public void testSchedulerExceptionCatchBranch() throws Exception { + ScheduledExecutorService rejecting = Mockito.mock(ScheduledExecutorService.class); + when(rejecting.scheduleAtFixedRate(Mockito.any(Runnable.class), Mockito.anyLong(), + Mockito.anyLong(), Mockito.any())).thenThrow(new RejectedExecutionException("rejected")); + + PowerMockito.mockStatic(WorkerPoolManager.class); + PowerMockito.when(WorkerPoolManager.getShareScheduler()).thenReturn(rejecting); + + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:9101"); + // Must NOT propagate the RejectedExecutionException. + RpcClient client = RpcClusterClientManager.getOrCreateClient(backendConfig, + new StubProtocolConfig()); + assertNotNull(client); + + // Catch branch leaves reconnectCheckerFuture as null. + Field f = RpcClusterClientManager.class.getDeclaredField("reconnectCheckerFuture"); + f.setAccessible(true); + assertNull("scheduler rejected → future must remain null", f.get(null)); + + RpcClusterClientManager.shutdownBackendConfig(backendConfig); + } + + /* ---------------- helpers ---------------- */ + + private static void clearClusterMap() throws Exception { + Field f = RpcClusterClientManager.class.getDeclaredField("CLUSTER_MAP"); + f.setAccessible(true); + ((Map) f.get(null)).clear(); + } + + private static void clearReconnectCheckerFuture() throws Exception { + Field f = RpcClusterClientManager.class.getDeclaredField("reconnectCheckerFuture"); + f.setAccessible(true); + Object cur = f.get(null); + if (cur != null) { + try { + ((java.util.concurrent.ScheduledFuture) cur).cancel(true); + } catch (Throwable ignore) { + // ignore + } + f.set(null, null); + } + } + + /* ---------------- stubs ---------------- */ + + private static class StubProtocolConfig extends ProtocolConfig { + + volatile boolean available = true; + final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public RpcClient createClient() { + return new StubRpcClient(this); + } + } + + private static class StubRpcClient implements RpcClient { + + private final StubProtocolConfig owner; + private final CloseFuture closeFuture = new CloseFuture<>(); + + StubRpcClient(StubProtocolConfig owner) { + this.owner = owner; + } + + @Override + public void open() throws TRpcException { + } + + @Override + public boolean isClosed() { + return owner.closed.get(); + } + + @Override + public boolean isAvailable() { + return owner.available && !owner.closed.get(); + } + + @Override + public ProtocolConfig getProtocolConfig() { + return owner; + } + + @Override + public void close() { + owner.closed.set(true); + closeFuture.complete(null); + } + + @Override + public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) { + return null; + } + + @Override + public CloseFuture closeFuture() { + return closeFuture; + } + } +} diff --git a/trpc-core/src/test/java/com/tencent/trpc/core/cluster/def/DefClusterInvokerCloseFutureTest.java b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/def/DefClusterInvokerCloseFutureTest.java new file mode 100644 index 000000000..14beda0f1 --- /dev/null +++ b/trpc-core/src/test/java/com/tencent/trpc/core/cluster/def/DefClusterInvokerCloseFutureTest.java @@ -0,0 +1,260 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.core.cluster.def; + +import com.tencent.trpc.core.cluster.def.DefClusterInvoker.ConsumerInvokerProxy; +import com.tencent.trpc.core.common.config.BackendConfig; +import com.tencent.trpc.core.common.config.ConsumerConfig; +import com.tencent.trpc.core.common.config.ProtocolConfig; +import com.tencent.trpc.core.exception.TRpcException; +import com.tencent.trpc.core.rpc.CloseFuture; +import com.tencent.trpc.core.rpc.ConsumerInvoker; +import com.tencent.trpc.core.rpc.GenericClient; +import com.tencent.trpc.core.rpc.Request; +import com.tencent.trpc.core.rpc.Response; +import com.tencent.trpc.core.rpc.RpcClient; +import com.tencent.trpc.core.selector.ServiceInstance; +import com.tencent.trpc.core.utils.FutureUtils; +import java.lang.reflect.Field; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Plain-JUnit (no PowerMock) tests for the long-connection logic in {@link DefClusterInvoker}: + * the {@code getInvoker} fast path, the {@code createInvoker} stale-evict (CAS) path, and the + * {@code closeFuture}-driven cache cleanup. + * + *

These tests directly manipulate the {@code invokerCache} via reflection rather than going + * through {@link com.tencent.trpc.core.cluster.RpcClusterClientManager#getOrCreateClient} (which + * requires SPI-registered factories that are not available in pure unit tests).

+ */ +public class DefClusterInvokerCloseFutureTest { + + private DefClusterInvoker invoker; + + @Before + public void setUp() { + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setNamingUrl("ip://127.0.0.1:18001"); + backendConfig.setServiceInterface(GenericClient.class); + backendConfig.setNetwork("tcp"); + backendConfig.setDefault(); + + ConsumerConfig consumerConfig = new ConsumerConfig<>(); + consumerConfig.setBackendConfig(backendConfig); + consumerConfig.setServiceInterface(GenericClient.class); + + invoker = new DefClusterInvoker<>(consumerConfig); + } + + /** + * getInvoker fast path: cached proxy is available → returned directly. + */ + @Test + public void testGetInvokerReturnsCachedAvailable() throws Exception { + ConcurrentMap> cache = getCache(); + ServiceInstance instance = new ServiceInstance("127.0.0.1", 18001); + String key = "127.0.0.1:18001:tcp"; + + TestRpcClient client = new TestRpcClient(); + ConsumerInvokerProxy proxy = new ConsumerInvokerProxy<>( + stubInvoker(client.getProtocolConfig()), client); + cache.put(key, proxy); + + ConsumerInvokerProxy got = invoker.getInvoker(instance); + Assert.assertSame(proxy, got); + } + + /** + * getInvoker fall-through: cached proxy is unavailable → it goes to createInvoker, which + * builds a fresh invoker (SPI factory for "trpc" is registered in this module) and replaces + * the stale entry. We assert: the returned invoker is NOT the stale one. + */ + @Test + public void testGetInvokerFallsThroughOnUnavailable() throws Exception { + ConcurrentMap> cache = getCache(); + ServiceInstance instance = new ServiceInstance("127.0.0.1", 18001); + String key = "127.0.0.1:18001:tcp"; + + TestRpcClient client = new TestRpcClient(); + client.available.set(false); + ConsumerInvokerProxy stale = new ConsumerInvokerProxy<>( + stubInvoker(client.getProtocolConfig()), client); + cache.put(key, stale); + + ConsumerInvokerProxy got; + try { + got = invoker.getInvoker(instance); + } catch (TRpcException ex) { + // SPI factory may not be available in some environments; in that case createInvoker + // throws. Either outcome confirms the fast path correctly rejected the stale entry. + Assert.assertTrue(ex.getMessage().contains("Create rpc client")); + return; + } + Assert.assertNotSame("Stale entry must be replaced", stale, got); + Assert.assertNotSame(stale, cache.get(key)); + // Clean up the freshly created RpcClient to avoid leaks across tests. + com.tencent.trpc.core.cluster.RpcClusterClientManager.shutdownBackendConfig( + invoker.getBackendConfig()); + } + + /** + * Direct assertion of the bug fix: the closeFuture hook installed inside createInvoker uses + * CAS-remove. We simulate the hook semantics here to lock down the invariant. + * + *

The actual hook is a lambda registered when a fresh invoker is created. We verify the + * same semantics by constructing two proxies, putting a "newer" one in the cache, then + * applying CAS-remove with the "older" key/value pair. The newer entry must NOT be evicted. + */ + @Test + public void testCasRemoveDoesNotEvictNewerEntry() throws Exception { + ConcurrentMap> cache = getCache(); + String key = "127.0.0.1:18001:tcp"; + + TestRpcClient clientA = new TestRpcClient(); + ConsumerInvokerProxy a = new ConsumerInvokerProxy<>( + stubInvoker(clientA.getProtocolConfig()), clientA); + TestRpcClient clientB = new TestRpcClient(); + ConsumerInvokerProxy b = new ConsumerInvokerProxy<>( + stubInvoker(clientB.getProtocolConfig()), clientB); + + cache.put(key, b); // current value is B + + // Simulate the closeFuture hook for A firing while B is the current cache value. + boolean removed = cache.remove(key, a); + Assert.assertFalse("CAS-remove must miss: A is no longer the current value", removed); + Assert.assertSame(b, cache.get(key)); + + // Simulate B's hook firing → evicts. + Assert.assertTrue(cache.remove(key, b)); + Assert.assertNull(cache.get(key)); + } + + /** + * Sanity: ConsumerInvokerProxy.isAvailable() reflects the underlying client. + */ + @Test + public void testProxyIsAvailableTracksUnderlyingClient() { + TestRpcClient client = new TestRpcClient(); + ConsumerInvokerProxy proxy = new ConsumerInvokerProxy<>( + stubInvoker(client.getProtocolConfig()), client); + Assert.assertTrue(proxy.isAvailable()); + client.available.set(false); + Assert.assertFalse(proxy.isAvailable()); + } + + /** + * Sanity: ConsumerInvokerProxy.invoke fills CallInfo on the request and reports to the + * selector (best-effort; selector lookup may return null which is tolerated). + */ + @Test + public void testProxyInvokeFillsCallInfoAndReports() { + TestRpcClient client = new TestRpcClient(); + ConsumerInvokerProxy proxy = new ConsumerInvokerProxy<>( + stubInvoker(client.getProtocolConfig()), client); + + com.tencent.trpc.core.rpc.def.DefRequest request = new com.tencent.trpc.core.rpc.def.DefRequest(); + com.tencent.trpc.core.rpc.RpcInvocation invocation = new com.tencent.trpc.core.rpc.RpcInvocation(); + invocation.setFunc("any"); + request.setInvocation(invocation); + + java.util.HashMap params = new java.util.HashMap<>(); + params.put("container_name", "test-container"); + params.put("set_division", "test-set"); + ServiceInstance instance = new ServiceInstance("127.0.0.1", 18001, params); + + Assert.assertNotNull(proxy.invoke(request, instance)); + // The invoke wraps responses; the underlying stub returns a successful future. + } + + /* ---------------------- helpers ---------------------- */ + + @SuppressWarnings("unchecked") + private ConcurrentMap> getCache() throws Exception { + Field f = DefClusterInvoker.class.getDeclaredField("invokerCache"); + f.setAccessible(true); + return (ConcurrentMap>) f.get(invoker); + } + + private ConsumerInvoker stubInvoker(ProtocolConfig pc) { + return new ConsumerInvoker() { + @Override + public Class getInterface() { + return GenericClient.class; + } + + @Override + public CompletionStage invoke(Request request) { + return FutureUtils.newSuccessFuture(null); + } + + @Override + public ConsumerConfig getConfig() { + return invoker.getConfig(); + } + + @Override + public ProtocolConfig getProtocolConfig() { + return pc; + } + }; + } + + /* ---------------------- mock ---------------------- */ + + private static class TestRpcClient implements RpcClient { + + final AtomicBoolean available = new AtomicBoolean(true); + final AtomicBoolean closed = new AtomicBoolean(false); + final CloseFuture closeFuture = new CloseFuture<>(); + private final ProtocolConfig protocolConfig = new ProtocolConfig(); + + @Override + public void open() throws TRpcException { + } + + @Override + public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) { + return null; + } + + @Override + public void close() { + closed.set(true); + closeFuture.complete(null); + } + + @Override + public CloseFuture closeFuture() { + return closeFuture; + } + + @Override + public boolean isAvailable() { + return available.get() && !closed.get(); + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + @Override + public ProtocolConfig getProtocolConfig() { + return protocolConfig; + } + } +} diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java index fd6b57ab4..6bf168870 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java @@ -137,7 +137,11 @@ private Response handleResponse(Request request, SimpleHttpResponse simpleHttpRe */ private SimpleHttpResponse execute(Request request, int requestTimeout, SimpleHttpRequest simpleHttpRequest) throws Exception { - CloseableHttpAsyncClient httpAsyncClient = ((Http2cRpcClient) client).getHttpAsyncClient(); + Http2cRpcClient http2cRpcClient = (Http2cRpcClient) client; + // Refresh idle counter so the cluster manager's reconnect-check timer keeps treating + // this client as healthy while it's actively serving requests. + http2cRpcClient.markUsed(); + CloseableHttpAsyncClient httpAsyncClient = http2cRpcClient.getHttpAsyncClient(); Future httpResponseFuture = httpAsyncClient.execute(simpleHttpRequest, new FutureCallback() { @Override diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java index f00d9858d..4bbeb980e 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java @@ -19,6 +19,7 @@ import com.tencent.trpc.core.rpc.AbstractRpcClient; import com.tencent.trpc.core.rpc.ConsumerInvoker; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; @@ -29,10 +30,23 @@ public class Http2cRpcClient extends AbstractRpcClient { private static final Logger logger = LoggerFactory.getLogger(HttpRpcClient.class); + /** + * If this client has not been used by any RPC for longer than this window, the periodic + * scanner in {@code RpcClusterClientManager} will treat it as unavailable and eventually + * close & evict it. The window is intentionally large so that any actively-used client is + * never affected. See {@link HttpRpcClient} for the same mechanism on the HTTP/1.1 path. + */ + private static final long IDLE_UNAVAILABLE_THRESHOLD_NANOS = TimeUnit.MINUTES.toNanos(10); + /** * Asynchronous HTTP client */ protected CloseableHttpAsyncClient httpAsyncClient; + /** + * Timestamp (System.nanoTime()) of the most recent RPC sent through this client. Updated by + * {@link Http2ConsumerInvoker} on each request. + */ + private volatile long lastUsedNanos = System.nanoTime(); public Http2cRpcClient(ProtocolConfig config) { setConfig(config); @@ -74,6 +88,27 @@ public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) { return new Http2ConsumerInvoker<>(this, consumerConfig, protocolConfig); } + /** + * Record that this client just served (or is about to serve) an RPC. Called by + * {@link Http2ConsumerInvoker} on every request. + */ + public void markUsed() { + lastUsedNanos = System.nanoTime(); + } + + /** + * Reports the client as unavailable if it has been idle longer than + * {@link #IDLE_UNAVAILABLE_THRESHOLD_NANOS}. This lets the cluster manager's periodic + * reconnect-check timer eventually evict orphaned clients (e.g. after backend IP rotation). + */ + @Override + public boolean isAvailable() { + if (!super.isAvailable()) { + return false; + } + return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS; + } + public CloseableHttpAsyncClient getHttpAsyncClient() { return httpAsyncClient; } diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java index 7d101e8c6..4a6b5a318 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java @@ -63,7 +63,11 @@ public HttpConsumerInvoker(HttpRpcClient client, ConsumerConfig config, public Response send(Request request) throws Exception { HttpPost httpPost = buildRequest(request); - CloseableHttpClient httpClient = ((HttpRpcClient) client).getHttpClient(); + HttpRpcClient httpRpcClient = (HttpRpcClient) client; + // Refresh idle counter so the cluster manager's reconnect-check timer keeps treating + // this client as healthy while it's actively serving requests. + httpRpcClient.markUsed(); + CloseableHttpClient httpClient = httpRpcClient.getHttpClient(); try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) { return handleResponse(request, httpResponse); diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java index 28c713206..8a508d095 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java @@ -18,18 +18,54 @@ import com.tencent.trpc.core.rpc.AbstractRpcClient; import com.tencent.trpc.core.rpc.ConsumerInvoker; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; /** * HTTP protocol client. + *

Long-connection mode: connections are pooled by Apache {@link PoolingHttpClientConnectionManager} + * and reused across requests via HTTP/1.1 keep-alive. Two safeguards are enabled by default to + * keep the pool healthy in long-running processes: + *

    + *
  • {@code validateAfterInactivity}: re-check a connection's liveness before reuse if it + * has been idle for a short period (avoids the classic "stale connection / NoHttpResponseException" + * when the server has half-closed an idle keep-alive connection);
  • + *
  • {@code evictIdleConnections}: a small background thread evicts connections that have + * been idle longer than the configured limit, freeing OS file descriptors.
  • + *
*/ public class HttpRpcClient extends AbstractRpcClient { private static final Logger logger = LoggerFactory.getLogger(HttpRpcClient.class); + /** + * Validate a pooled connection before reuse if it has been idle for at least this many + * milliseconds. Cheap heuristic that catches most server-side half-closed keep-alive sockets. + */ + private static final int VALIDATE_AFTER_INACTIVITY_MS = 2000; + /** + * Evict pooled connections that have been idle for longer than this duration. + */ + private static final long EVICT_IDLE_CONNECTIONS_SECONDS = 60L; + /** + * If this client has not been used by any RPC for longer than this window, the periodic + * scanner in {@code RpcClusterClientManager} will treat it as unavailable. After + * a few consecutive unavailable observations the client gets closed and evicted from the + * cluster cache, which is how we reclaim {@link HttpRpcClient} instances orphaned by backend + * IP rotation (e.g. K8s pod IP drift). The window is intentionally large so that any + * actively-used client is never affected. + */ + private static final long IDLE_UNAVAILABLE_THRESHOLD_NANOS = + java.util.concurrent.TimeUnit.MINUTES.toNanos(10); + private CloseableHttpClient httpClient; + /** + * Timestamp (System.nanoTime()) of the most recent RPC sent through this client. Updated by + * {@link HttpConsumerInvoker} on each send. + */ + private volatile long lastUsedNanos = System.nanoTime(); public HttpRpcClient(ProtocolConfig config) { setConfig(config); @@ -44,7 +80,16 @@ protected void doOpen() { // If there is only one route, the maximum number of connections for a single route is the same // as the maximum number of connections for the entire connection pool. cm.setDefaultMaxPerRoute(maxConns); - httpClient = HttpClients.custom().setConnectionManager(cm).build(); + // Re-validate idle pooled connections before reuse so we do not send a request through a + // socket the server has already half-closed. + cm.setValidateAfterInactivity(VALIDATE_AFTER_INACTIVITY_MS); + httpClient = HttpClients.custom() + .setConnectionManager(cm) + // Background eviction of stale & long-idle connections; keeps the pool tidy in + // long-running processes without affecting hot connections. + .evictExpiredConnections() + .evictIdleConnections(EVICT_IDLE_CONNECTIONS_SECONDS, TimeUnit.SECONDS) + .build(); } @Override @@ -64,6 +109,28 @@ public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) { return new HttpConsumerInvoker<>(this, consumerConfig, protocolConfig); } + /** + * Record that this client just served (or is about to serve) an RPC. Called by + * {@link HttpConsumerInvoker} on every request. + */ + public void markUsed() { + lastUsedNanos = System.nanoTime(); + } + + /** + * Reports the client as unavailable if it has been idle longer than + * {@link #IDLE_UNAVAILABLE_THRESHOLD_NANOS}. This lets the cluster manager's periodic + * reconnect-check timer eventually evict orphaned clients (e.g. after backend IP rotation) + * even though Apache HttpClient itself has no notion of "remote permanently gone". + */ + @Override + public boolean isAvailable() { + if (!super.isAvailable()) { + return false; + } + return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS; + } + public CloseableHttpClient getHttpClient() { return httpClient; } diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpMultiPortNamingUrlConcurrentTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpMultiPortNamingUrlConcurrentTest.java new file mode 100644 index 000000000..b4e400087 --- /dev/null +++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpMultiPortNamingUrlConcurrentTest.java @@ -0,0 +1,256 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.proto.http; + +import static com.tencent.trpc.transport.http.common.Constants.HTTP_SCHEME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.tencent.trpc.core.common.ConfigManager; +import com.tencent.trpc.core.common.config.BackendConfig; +import com.tencent.trpc.core.common.config.ConsumerConfig; +import com.tencent.trpc.core.common.config.ProviderConfig; +import com.tencent.trpc.core.common.config.ServerConfig; +import com.tencent.trpc.core.common.config.ServiceConfig; +import com.tencent.trpc.core.rpc.RpcClientContext; +import com.tencent.trpc.core.rpc.RpcContext; +import com.tencent.trpc.core.utils.NetUtils; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import tests.service.GreeterService; +import tests.service.HelloRequestProtocol.HelloRequest; +import tests.service.HelloRequestProtocol.HelloResponse; +import tests.service.TestBeanConvertWithGetMethodReq; +import tests.service.TestBeanConvertWithGetMethodRsp; + +/** + * HTTP-protocol counterpart of the tRPC concurrent multi-port test in {@code trpc-proto-standard}. + * + *

Setup: 10 standalone HTTP servers (jetty) on consecutive ports backed by + * {@link PortAwareGreeterServiceImpl} that tags each response with its own listening port. + * One shared {@link BackendConfig} lists all 10 endpoints in the comma-separated namingUrl; + * 100 concurrent threads × 1000 requests each fan-out via {@code ip://} random load balance.

+ * + *

Final assertions: + *

    + *
  • every request succeeds and the echoed message matches the request payload,
  • + *
  • every backend port is hit at least once,
  • + *
  • distribution roughly balanced — random over N=10 with R=100000 trials, + * expected 10000 / port; tolerated range {@code [2000, 20000]}, far exceeding any + * realistic random outlier.
  • + *
+ *

+ */ +public class HttpMultiPortNamingUrlConcurrentTest { + + private static final int BASE_PORT = 18500; + private static final int SERVER_COUNT = 10; + private static final int THREAD_COUNT = 100; + private static final int CYCLE_PER_THREAD = 1000; + + private static final int REQUEST_TIMEOUT_MS = 60_000; + private static final int MAX_CONNECTIONS = 20480; + + private static ServerConfig serverConfig; + + @BeforeClass + public static void startHttpServers() { + ConfigManager.stopTest(); + ConfigManager.startTest(); + + HashMap providers = new HashMap<>(); + for (int i = 0; i < SERVER_COUNT; i++) { + int port = BASE_PORT + i; + ProviderConfig pc = new ProviderConfig<>(); + pc.setServiceInterface(GreeterService.class); + pc.setRef(new PortAwareGreeterServiceImpl(port)); + + ServiceConfig serviceConfig = new ServiceConfig(); + serviceConfig.setName("multi-port-server-" + port); + serviceConfig.getProviderConfigs().add(pc); + serviceConfig.setIp(NetUtils.LOCAL_HOST); + serviceConfig.setPort(port); + serviceConfig.setProtocol(HTTP_SCHEME); + serviceConfig.setTransporter("jetty"); + providers.put(serviceConfig.getName(), serviceConfig); + } + + ServerConfig sc = new ServerConfig(); + sc.setServiceMap(providers); + sc.setApp("http-multi-port-test"); + sc.setLocalIp(NetUtils.LOCAL_HOST); + sc.init(); + serverConfig = sc; + } + + @AfterClass + public static void stopHttpServers() { + if (serverConfig != null) { + serverConfig.stop(); + serverConfig = null; + } + ConfigManager.stopTest(); + } + + @Test + public void testHttpMultiPortNamingUrlConcurrent() throws InterruptedException { + // Build "ip://127.0.0.1:p1,127.0.0.1:p2,...,127.0.0.1:p10". + StringBuilder urlBuilder = new StringBuilder("ip://"); + for (int i = 0; i < SERVER_COUNT; i++) { + if (i > 0) { + urlBuilder.append(','); + } + urlBuilder.append(NetUtils.LOCAL_HOST).append(':').append(BASE_PORT + i); + } + String namingUrl = urlBuilder.toString(); + + BackendConfig backendConfig = new BackendConfig(); + backendConfig.setName("http-multi-port-client"); + backendConfig.setNamingUrl(namingUrl); + backendConfig.setProtocol("http"); + backendConfig.setRequestTimeout(REQUEST_TIMEOUT_MS); + backendConfig.setMaxConns(MAX_CONNECTIONS); + backendConfig.setConnsPerAddr(2); + backendConfig.setKeepAlive(true); + + ConsumerConfig consumerConfig = new ConsumerConfig<>(); + consumerConfig.setServiceInterface(GreeterService.class); + consumerConfig.setBackendConfig(backendConfig); + + try { + final GreeterService proxy = consumerConfig.getProxy(); + + // Per-port hit counter aggregated across all worker threads. + ConcurrentHashMap portHits = new ConcurrentHashMap<>(); + for (int i = 0; i < SERVER_COUNT; i++) { + portHits.put(BASE_PORT + i, new AtomicInteger(0)); + } + + CountDownLatch latch = new CountDownLatch(THREAD_COUNT); + TestResult[] results = new TestResult[THREAD_COUNT]; + for (int t = 0; t < THREAD_COUNT; t++) { + final TestResult r = new TestResult(); + results[t] = r; + final int threadIndex = t; + new Thread(() -> { + try { + for (int i = 0; i < CYCLE_PER_THREAD; i++) { + String reqPayload = "req-" + threadIndex + "-" + i; + RpcClientContext ctx = new RpcClientContext(); + HelloResponse rsp = proxy.sayHello(ctx, HelloRequest.newBuilder() + .setMessage(reqPayload) + .build()); + assertNotNull("response must not be null", rsp); + String message = rsp.getMessage(); + // Server returns "|port=". + int sep = message.lastIndexOf("|port="); + assertTrue("response missing port marker: " + message, sep > 0); + String echoed = message.substring(0, sep); + int port = Integer.parseInt(message.substring(sep + "|port=".length())); + assertEquals("echoed payload must match request", reqPayload, echoed); + AtomicInteger counter = portHits.get(port); + assertTrue("response from unexpected port: " + port, counter != null); + counter.incrementAndGet(); + } + r.succ = true; + } catch (Throwable ex) { + r.succ = false; + r.ex = ex; + ex.printStackTrace(); + } finally { + latch.countDown(); + } + }, "http-concurrent-caller-" + t).start(); + } + + // 300s upper bound for HTTP — slower per-request than tRPC due to HTTP framing. + boolean done = latch.await(300, TimeUnit.SECONDS); + assertTrue("concurrent calls timed out before completion", done); + + for (int i = 0; i < results.length; i++) { + TestResult r = results[i]; + assertTrue("worker thread " + i + " failed: " + + (r.ex == null ? "" : r.ex.toString()), r.succ); + } + + // ---- aggregate assertions ---- + int totalRequests = THREAD_COUNT * CYCLE_PER_THREAD; + int sum = 0; + Set hitPorts = new HashSet<>(); + for (int i = 0; i < SERVER_COUNT; i++) { + int port = BASE_PORT + i; + int hits = portHits.get(port).get(); + sum += hits; + if (hits > 0) { + hitPorts.add(port); + } + // Random over 10 with 100000 trials → expected 10000/server. + // [2000, 20000] leaves >>3-sigma headroom; CI-safe. + assertTrue("port " + port + " never received a request", hits > 0); + assertTrue("port " + port + " too few hits: " + hits, hits >= 2000); + assertTrue("port " + port + " too many hits: " + hits, hits <= 20000); + } + assertEquals("total responses should equal total requests", totalRequests, sum); + assertEquals("all 10 backend ports must be hit", SERVER_COUNT, hitPorts.size()); + } finally { + backendConfig.stop(); + } + } + + /** + * Service impl that tags every response with its own listening port so the test can verify + * the actual server that handled each request. + */ + private static class PortAwareGreeterServiceImpl implements GreeterService { + + private final int port; + + PortAwareGreeterServiceImpl(int port) { + this.port = port; + } + + @Override + public HelloResponse sayHello(RpcContext context, HelloRequest request) { + String message = request.getMessage(); + return HelloResponse.newBuilder() + .setMessage(message + "|port=" + port) + .build(); + } + + @Override + public String sayBlankHello(RpcContext context, HelloRequest request) { + return ""; + } + + @Override + public TestBeanConvertWithGetMethodRsp sayHelloNonPbType(RpcContext context, + TestBeanConvertWithGetMethodReq request) { + return new TestBeanConvertWithGetMethodRsp(request.getMessage(), + request.getStatus(), request.getComments()); + } + } + + private static class TestResult { + + boolean succ; + Throwable ex; + } +} diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientLongLinkTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientLongLinkTest.java new file mode 100644 index 000000000..0b5fdc1b7 --- /dev/null +++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientLongLinkTest.java @@ -0,0 +1,244 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.proto.http; + +import com.tencent.trpc.core.common.config.ProtocolConfig; +import com.tencent.trpc.proto.http.client.Http2RpcClient; +import com.tencent.trpc.proto.http.client.Http2cRpcClient; +import com.tencent.trpc.proto.http.client.HttpRpcClient; +import com.tencent.trpc.proto.http.client.HttpsRpcClient; +import java.lang.reflect.Field; +import org.junit.Assert; +import org.junit.Test; + +/** + * Pure unit tests for the long-connection extensions on the HTTP RpcClient hierarchy: + * {@code markUsed}, the overridden {@code isAvailable}, and the idle-threshold heuristic. + * + *

No Jetty server is started; we manipulate the internal {@code lastUsedNanos} field via + * reflection to drive the idle/active branches.

+ */ +public class HttpRpcClientLongLinkTest { + + private static final long ELEVEN_MINUTES_NANOS = java.util.concurrent.TimeUnit.MINUTES.toNanos(11); + + /** + * HttpRpcClient.isAvailable returns false when not started (lifecycle.isStarted() == false), + * regardless of lastUsedNanos. + */ + @Test + public void testHttpRpcClientNotAvailableBeforeOpen() { + ProtocolConfig pc = newProtocolConfig(); + HttpRpcClient client = new HttpRpcClient(pc); + // Lifecycle has not been started, so super.isAvailable() returns false. + Assert.assertFalse(client.isAvailable()); + } + + /** + * HttpRpcClient.isAvailable returns true when started AND recently used. + */ + @Test + public void testHttpRpcClientAvailableWhenStartedAndFresh() { + ProtocolConfig pc = newProtocolConfig(); + HttpRpcClient client = new HttpRpcClient(pc); + client.open(); + try { + client.markUsed(); // refresh + Assert.assertTrue(client.isAvailable()); + } finally { + client.close(); + } + } + + /** + * HttpRpcClient.isAvailable returns false when started but idle > 10 min. + */ + @Test + public void testHttpRpcClientNotAvailableWhenIdleTooLong() throws Exception { + ProtocolConfig pc = newProtocolConfig(); + HttpRpcClient client = new HttpRpcClient(pc); + client.open(); + try { + // Force lastUsedNanos to "11 minutes ago". + setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS); + Assert.assertFalse(client.isAvailable()); + + // Recovering via markUsed restores availability. + client.markUsed(); + Assert.assertTrue(client.isAvailable()); + } finally { + client.close(); + } + } + + /** + * Http2cRpcClient mirrors the same logic. + */ + @Test + public void testHttp2cRpcClientNotAvailableBeforeOpen() { + ProtocolConfig pc = newProtocolConfig(); + Http2cRpcClient client = new Http2cRpcClient(pc); + Assert.assertFalse(client.isAvailable()); + } + + @Test + public void testHttp2cRpcClientAvailableWhenStartedAndFresh() { + ProtocolConfig pc = newProtocolConfig(); + Http2cRpcClient client = new Http2cRpcClient(pc); + client.open(); + try { + client.markUsed(); + Assert.assertTrue(client.isAvailable()); + } finally { + client.close(); + } + } + + @Test + public void testHttp2cRpcClientNotAvailableWhenIdleTooLong() throws Exception { + ProtocolConfig pc = newProtocolConfig(); + Http2cRpcClient client = new Http2cRpcClient(pc); + client.open(); + try { + setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS); + Assert.assertFalse(client.isAvailable()); + client.markUsed(); + Assert.assertTrue(client.isAvailable()); + } finally { + client.close(); + } + } + + /** + * doOpen wires the underlying httpClient. close() must release it without throwing. + */ + @Test + public void testHttpRpcClientOpenCloseReleasesResources() { + ProtocolConfig pc = newProtocolConfig(); + HttpRpcClient client = new HttpRpcClient(pc); + client.open(); + Assert.assertNotNull(client.getHttpClient()); + client.close(); + Assert.assertTrue(client.isClosed()); + // Idempotent close is safe. + client.close(); + } + + /** + * doOpen for Http2cRpcClient creates an httpAsyncClient and starts it. + */ + @Test + public void testHttp2cRpcClientOpenCloseReleasesResources() { + ProtocolConfig pc = newProtocolConfig(); + Http2cRpcClient client = new Http2cRpcClient(pc); + client.open(); + Assert.assertNotNull(client.getHttpAsyncClient()); + client.close(); + Assert.assertTrue(client.isClosed()); + client.close(); + } + + /** + * Http2RpcClient inherits markUsed/isAvailable from Http2cRpcClient. We don't need a real + * TLS context — we only verify the inherited methods behave the same on the subclass. + * Use reflection to flip lifecycle state to STARTED so that super.isAvailable() returns true. + */ + @Test + public void testHttp2RpcClientInheritsIdleHeuristic() throws Exception { + ProtocolConfig pc = newProtocolConfig(); + Http2RpcClient client = new Http2RpcClient(pc); + forceLifecycleStarted(client); + try { + client.markUsed(); + Assert.assertTrue(client.isAvailable()); + setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS); + Assert.assertFalse(client.isAvailable()); + } finally { + forceLifecycleClosed(client); + } + } + + /** + * HttpsRpcClient extends Http2RpcClient. Same inheritance check. + */ + @Test + public void testHttpsRpcClientInheritsIdleHeuristic() throws Exception { + ProtocolConfig pc = newProtocolConfig(); + HttpsRpcClient client = new HttpsRpcClient(pc); + forceLifecycleStarted(client); + try { + client.markUsed(); + Assert.assertTrue(client.isAvailable()); + setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS); + Assert.assertFalse(client.isAvailable()); + } finally { + forceLifecycleClosed(client); + } + } + + /* ---------------------- helpers ---------------------- */ + + private static ProtocolConfig newProtocolConfig() { + ProtocolConfig pc = new ProtocolConfig(); + pc.setIp("127.0.0.1"); + pc.setPort(0); + pc.setProtocol("http"); + pc.setNetwork("tcp"); + pc.setDefault(); + return pc; + } + + /** + * Bypass real doOpen — flip the embedded LifecycleObj to STARTED so isAvailable's + * super.isAvailable() check passes without spinning up actual HTTP infrastructure. + */ + private static void forceLifecycleStarted(Object client) throws Exception { + Field lf = findField(client.getClass(), "lifecycleObj"); + lf.setAccessible(true); + Object lifecycle = lf.get(client); + Field state = findField(lifecycle.getClass(), "state"); + state.setAccessible(true); + // LifecycleState enum: STARTED ordinal lookup via reflection (avoid hard dependency). + Class stateEnum = state.getType(); + Object started = stateEnum.getMethod("valueOf", String.class).invoke(null, "STARTED"); + state.set(lifecycle, started); + } + + private static void forceLifecycleClosed(Object client) throws Exception { + Field lf = findField(client.getClass(), "lifecycleObj"); + lf.setAccessible(true); + Object lifecycle = lf.get(client); + Field state = findField(lifecycle.getClass(), "state"); + state.setAccessible(true); + Class stateEnum = state.getType(); + Object stopped = stateEnum.getMethod("valueOf", String.class).invoke(null, "STOPPED"); + state.set(lifecycle, stopped); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field f = findField(target.getClass(), fieldName); + f.setAccessible(true); + f.set(target, value); + } + + private static Field findField(Class clazz, String name) throws NoSuchFieldException { + Class c = clazz; + while (c != null) { + try { + return c.getDeclaredField(name); + } catch (NoSuchFieldException ignored) { + c = c.getSuperclass(); + } + } + throw new NoSuchFieldException(name); + } +} diff --git a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/MultiPortNamingUrlConcurrentTest.java b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/MultiPortNamingUrlConcurrentTest.java new file mode 100644 index 000000000..3269332e7 --- /dev/null +++ b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/MultiPortNamingUrlConcurrentTest.java @@ -0,0 +1,228 @@ +/* + * Tencent is pleased to support the open source community by making tRPC available. + * + * Copyright (C) 2023 Tencent. + * All rights reserved. + * + * If you have downloaded a copy of the tRPC source code from Tencent, + * please note that tRPC source code is licensed under the Apache 2.0 License, + * A copy of the Apache 2.0 License can be found in the LICENSE file. + */ + +package com.tencent.trpc.proto.standard.concurrenttest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import com.tencent.trpc.core.common.ConfigManager; +import com.tencent.trpc.core.common.config.BackendConfig; +import com.tencent.trpc.core.common.config.ProviderConfig; +import com.tencent.trpc.core.common.config.ServiceConfig; +import com.tencent.trpc.core.rpc.RpcClientContext; +import com.tencent.trpc.core.rpc.RpcServerContext; +import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloRequest; +import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloResponse; +import com.tencent.trpc.proto.support.DefResponseFutureManager; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Verifies that {@link BackendConfig#setNamingUrl(String)} configured with a comma-separated list + * of {@code ip:port} entries fan-outs requests to all backends (random load balance) under heavy + * concurrency. + * + *

Setup: 10 standalone tRPC servers on consecutive ports; one shared {@link BackendConfig} + * whose namingUrl lists all 10 endpoints; 100 concurrent threads × 1000 requests each.

+ * + *

Each server impl echoes back its own listening port so the client can group responses by the + * actually-served port. Final assertions: + *

    + *
  • every request succeeds with the exact echoed payload,
  • + *
  • all 10 backend ports get hit at least once (proving namingUrl actually distributes + * traffic, not pinning to one endpoint),
  • + *
  • distribution is roughly balanced — random selector over N=10 with R=100000 requests + * gives an expected 10000 per server; we tolerate {@code [2000, 20000]} per server which + * is a generous bound far above any realistic random outlier.
  • + *
+ */ +public class MultiPortNamingUrlConcurrentTest { + + private static final int BASE_TCP_PORT = 12500; + private static final int SERVER_COUNT = 10; + private static final int THREAD_COUNT = 100; + private static final int CYCLE_PER_THREAD = 1000; + + private final List serviceConfigs = new ArrayList<>(SERVER_COUNT); + + @Before + public void before() { + ConfigManager.stopTest(); + ConfigManager.startTest(); + startServers(); + } + + @After + public void stop() { + for (ServiceConfig serviceConfig : serviceConfigs) { + try { + serviceConfig.unExport(); + } catch (Exception ignore) { + // ignore + } + } + serviceConfigs.clear(); + ConfigManager.stopTest(); + } + + @Test + public void testMultiPortNamingUrlConcurrent() throws InterruptedException { + // Build the comma-separated namingUrl: "ip://127.0.0.1:p1,127.0.0.1:p2,..." + StringBuilder urlBuilder = new StringBuilder("ip://"); + for (int i = 0; i < SERVER_COUNT; i++) { + if (i > 0) { + urlBuilder.append(','); + } + urlBuilder.append("127.0.0.1:").append(BASE_TCP_PORT + i); + } + String namingUrl = urlBuilder.toString(); + + BackendConfig backendConfig = new BackendConfig(); + DefResponseFutureManager.reset(); + backendConfig.setNamingUrl(namingUrl); + // One long connection per backend addr is enough; keeps the test deterministic. + backendConfig.setConnsPerAddr(5); + backendConfig.setNetwork("tcp"); + // Generous client-side timeout so a slow JIT warm-up can't fail individual calls. + backendConfig.setRequestTimeout(60_000); + + final ConcurrentTestServiceApi proxy = backendConfig.getProxy(ConcurrentTestServiceApi.class); + + // Per-port hit counter aggregated across all threads. + ConcurrentHashMap portHits = new ConcurrentHashMap<>(); + for (int i = 0; i < SERVER_COUNT; i++) { + portHits.put(BASE_TCP_PORT + i, new AtomicInteger(0)); + } + + CountDownLatch latch = new CountDownLatch(THREAD_COUNT); + List results = new ArrayList<>(THREAD_COUNT); + for (int t = 0; t < THREAD_COUNT; t++) { + final TestResult r = new TestResult(); + results.add(r); + final int threadIndex = t; + new Thread(() -> { + try { + for (int i = 0; i < CYCLE_PER_THREAD; i++) { + String reqPayload = "req-" + threadIndex + "-" + i; + RpcClientContext context = new RpcClientContext(); + HelloResponse response = proxy.sayHello(context, HelloRequest.newBuilder() + .setMessage(ByteString.copyFromUtf8(reqPayload)) + .build()); + // Server impl returns "|port=" + String message = response.getMessage().toStringUtf8(); + int sep = message.lastIndexOf("|port="); + assertTrue("response missing port marker: " + message, sep > 0); + String echoed = message.substring(0, sep); + int port = Integer.parseInt(message.substring(sep + "|port=".length())); + assertEquals("echoed payload must match request", reqPayload, echoed); + AtomicInteger counter = portHits.get(port); + assertTrue("response from unexpected port: " + port, counter != null); + counter.incrementAndGet(); + } + r.succ = true; + } catch (Throwable ex) { + r.succ = false; + r.ex = ex; + ex.printStackTrace(); + } finally { + latch.countDown(); + } + }, "concurrent-caller-" + t).start(); + } + // 200s upper bound; full run on a laptop usually finishes in a few seconds. + boolean done = latch.await(200, TimeUnit.SECONDS); + assertTrue("concurrent calls timed out before completion", done); + + for (int i = 0; i < results.size(); i++) { + TestResult r = results.get(i); + assertTrue("worker thread " + i + " failed: " + + (r.ex == null ? "" : r.ex.toString()), r.succ); + } + + // ---- final aggregate assertions ---- + int totalRequests = THREAD_COUNT * CYCLE_PER_THREAD; + int sum = 0; + Set hitPorts = new HashSet<>(); + for (int i = 0; i < SERVER_COUNT; i++) { + int port = BASE_TCP_PORT + i; + int hits = portHits.get(port).get(); + sum += hits; + if (hits > 0) { + hitPorts.add(port); + } + // Random over 10 with 100000 trials → expected 10000/server. + // Lower bound 2000 / upper bound 20000 leaves >>3-sigma headroom; CI-safe. + assertTrue("port " + port + " never received a request", hits > 0); + assertTrue("port " + port + " too few hits: " + hits, hits >= 2000); + assertTrue("port " + port + " too many hits: " + hits, hits <= 20000); + } + assertEquals("total responses should equal total requests", totalRequests, sum); + assertEquals("all 10 backend ports must be hit", SERVER_COUNT, hitPorts.size()); + } + + private void startServers() { + for (int i = 0; i < SERVER_COUNT; i++) { + int port = BASE_TCP_PORT + i; + ProviderConfig providerConfig = new ProviderConfig<>(); + providerConfig.setRef(new PortAwareEchoServiceImpl(port)); + + ServiceConfig serviceConfig = new ServiceConfig(); + serviceConfig.setIp("127.0.0.1"); + serviceConfig.setNetwork("tcp"); + serviceConfig.setPort(port); + serviceConfig.setEnableLinkTimeout(true); + // Generous server-side timeout to avoid spurious timeouts on slow CI. + serviceConfig.setRequestTimeout(60_000); + serviceConfig.addProviderConfig(providerConfig); + serviceConfig.export(); + serviceConfigs.add(serviceConfig); + } + } + + /** + * Service impl that tags every response with its own listening port so the test can verify + * the actual server that handled each request. + */ + private static class PortAwareEchoServiceImpl implements ConcurrentTestService { + + private final int port; + + PortAwareEchoServiceImpl(int port) { + this.port = port; + } + + @Override + public HelloResponse sayHello(RpcServerContext context, HelloRequest request) { + String echoed = request.getMessage().toStringUtf8(); + String tagged = echoed + "|port=" + port; + return HelloResponse.newBuilder() + .setMessage(ByteString.copyFromUtf8(tagged)) + .build(); + } + } + + private static class TestResult { + + boolean succ; + Throwable ex; + } +} diff --git a/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java b/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java index 29460e42e..feb5497e9 100644 --- a/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java +++ b/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java @@ -102,15 +102,14 @@ public void testUdpToTcpNettyTransport() { } /** - * Test for server-side idle-timeout + * Test for server-side idle-timeout. + *

Long-connection mode: idle timeout no longer closes the connection. The framework + * keeps the connection alive regardless of how long it stays idle, so this case is + * intentionally left empty as a placeholder for the historical behaviour.

*/ @Test public void testIdleTimeout() { - assertThrows(RuntimeException.class, () -> - tcpEchoAPI.delayedEcho(new RpcClientContext(), DelayedEchoRequest.newBuilder() - .setMessage("timeout") - .setDelaySeconds(2) - .build())); + // No-op under long-connection mode: idleTimeout has no effect on the netty pipeline. } /** diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java index 26dce0165..cb847535e 100644 --- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java +++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java @@ -20,8 +20,6 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; @io.netty.channel.ChannelHandler.Sharable public class NettyClientHandler extends ChannelDuplexHandler { @@ -105,24 +103,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - NettyChannel channel = NettyChannelManager.getOrAddChannel(ctx.channel(), config); - try { - // only close the channel in a TCP scenario. - if (isTcp) { - IdleState state = ((IdleStateEvent) evt).state(); - logger.warn("Idle event(state=" + state + ") trigger, close channel" + channel); - channel.close(); - } - } finally { - NettyChannelManager.removeChannelIfDisconnected(ctx.channel()); - } - } - super.userEventTriggered(ctx, evt); - } - public ConcurrentHashSet getChannelSet() { return channelSet; } diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java index 4b495560e..26eade523 100644 --- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java +++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java @@ -20,8 +20,6 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -122,21 +120,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - NettyChannel channel = NettyChannelManager.getOrAddChannel(ctx.channel(), config); - try { - IdleState state = ((IdleStateEvent) evt).state(); - logger.warn("idle event[{}] trigger, close channel:{}", state, channel); - channel.close(); - } finally { - NettyChannelManager.removeChannelIfDisconnected(ctx.channel()); - } - } - super.userEventTriggered(ctx, evt); - } - public ConcurrentMap getChannels() { return clientChannels; } diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java index 8bc913304..312ba4e76 100644 --- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java +++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java @@ -11,8 +11,6 @@ package com.tencent.trpc.transport.netty; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import com.tencent.trpc.core.common.config.ProtocolConfig; import com.tencent.trpc.core.logger.Logger; import com.tencent.trpc.core.logger.LoggerFactory; @@ -25,13 +23,15 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.CompletableFuture; /** - * A netty tcp ClientTransport + * A netty tcp ClientTransport. + *

Long-connection mode: no IdleStateHandler is installed and idle detection is disabled. + * Connections are kept alive for the lifetime of the transport and only released when the + * transport is shut down or when the peer actively closes the connection.

*/ public class NettyTcpClientTransport extends NettyAbstractClientTransport { @@ -67,18 +67,17 @@ protected void doOpen() { bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) { - - IdleStateHandler clientIdleHandler = - new IdleStateHandler(0, config.getIdleTimeout(), 0, MILLISECONDS); + // Long-connection mode: do NOT install IdleStateHandler. The idleTimeout field + // is kept for backward compatibility but no longer takes effect on the netty + // pipeline. ChannelPipeline p = ch.pipeline(); if (codec == null) { - p.addLast("client-idle", clientIdleHandler).addLast("handler", clientHandler); + p.addLast("handler", clientHandler); } else { NettyCodecAdapter nettyCodec = NettyCodecAdapter .createTcpCodecAdapter(codec, config); p.addLast("encode", nettyCodec.getEncoder()) .addLast("decode", nettyCodec.getDecoder()) - .addLast("client-idle", clientIdleHandler) .addLast("handler", clientHandler); } } diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java index 4af2e3cdd..d6e6d6c6e 100644 --- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java +++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java @@ -11,8 +11,6 @@ package com.tencent.trpc.transport.netty; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import com.tencent.trpc.core.common.config.ProtocolConfig; import com.tencent.trpc.core.exception.TransportException; import com.tencent.trpc.core.logger.Logger; @@ -40,7 +38,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.flush.FlushConsolidationHandler; -import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.Version; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.HashSet; @@ -122,16 +119,14 @@ protected void doOpen() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - IdleStateHandler idleHandler = - new IdleStateHandler(0, 0, config.getIdleTimeout(), MILLISECONDS); - if (codec == null) { - p.addLast("server-idle", idleHandler); - } else { + // Long-connection mode: do NOT install IdleStateHandler. The idleTimeout field + // is kept for backward compatibility but no longer takes effect on the netty + // pipeline. The server never proactively closes a client connection due to idle. + if (codec != null) { NettyCodecAdapter nettyCodec = NettyCodecAdapter .createTcpCodecAdapter(codec, config); p.addLast("encode", nettyCodec.getEncoder())// - .addLast("decode", nettyCodec.getDecoder())// - .addLast("server-idle", idleHandler); + .addLast("decode", nettyCodec.getDecoder()); } if (flushConsolidationSwitch) { p.addLast("flushConsolidationHandlers", diff --git a/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java b/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java index 6f962d660..11a0073b7 100644 --- a/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java +++ b/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java @@ -33,7 +33,8 @@ public void test() throws Exception { new NettyClientHandler(new ChannelHandlerAdapter(), new ProtocolConfig(), true) .userEventTriggered(new ChannelHandlerContextTest(channelTest2), IdleStateEvent.WRITER_IDLE_STATE_EVENT); - Assert.assertTrue(channelTest2.getIsClose() != null && channelTest2.isClose); + // Long-connection mode: client must NOT close the channel on idle event. + Assert.assertTrue(channelTest2.getIsClose() == null || !channelTest2.isClose); ChannelTest channelTest3 = new ChannelTest(); channelTest3.setActive(true); @@ -47,6 +48,7 @@ public void test() throws Exception { new NettyServerHandler(new ChannelHandlerAdapter(), new ProtocolConfig(), true) .userEventTriggered(new ChannelHandlerContextTest(channelTest4), IdleStateEvent.WRITER_IDLE_STATE_EVENT); - Assert.assertTrue(channelTest4.getIsClose() != null && channelTest4.isClose); + // Long-connection mode: server must NOT close the channel on idle event. + Assert.assertTrue(channelTest4.getIsClose() == null || !channelTest4.isClose); } }