got;
+ try {
+ got = invoker.getInvoker(instance);
+ } catch (TRpcException ex) {
+ // SPI factory may not be available in some environments; in that case createInvoker
+ // throws. Either outcome confirms the fast path correctly rejected the stale entry.
+ Assert.assertTrue(ex.getMessage().contains("Create rpc client"));
+ return;
+ }
+ Assert.assertNotSame("Stale entry must be replaced", stale, got);
+ Assert.assertNotSame(stale, cache.get(key));
+ // Clean up the freshly created RpcClient to avoid leaks across tests.
+ com.tencent.trpc.core.cluster.RpcClusterClientManager.shutdownBackendConfig(
+ invoker.getBackendConfig());
+ }
+
+ /**
+ * Direct assertion of the bug fix: the closeFuture hook installed inside createInvoker uses
+ * CAS-remove. We simulate the hook semantics here to lock down the invariant.
+ *
+ * The actual hook is a lambda registered when a fresh invoker is created. We verify the
+ * same semantics by constructing two proxies, putting a "newer" one in the cache, then
+ * applying CAS-remove with the "older" key/value pair. The newer entry must NOT be evicted.
+ */
+ @Test
+ public void testCasRemoveDoesNotEvictNewerEntry() throws Exception {
+ ConcurrentMap> cache = getCache();
+ String key = "127.0.0.1:18001:tcp";
+
+ TestRpcClient clientA = new TestRpcClient();
+ ConsumerInvokerProxy a = new ConsumerInvokerProxy<>(
+ stubInvoker(clientA.getProtocolConfig()), clientA);
+ TestRpcClient clientB = new TestRpcClient();
+ ConsumerInvokerProxy b = new ConsumerInvokerProxy<>(
+ stubInvoker(clientB.getProtocolConfig()), clientB);
+
+ cache.put(key, b); // current value is B
+
+ // Simulate the closeFuture hook for A firing while B is the current cache value.
+ boolean removed = cache.remove(key, a);
+ Assert.assertFalse("CAS-remove must miss: A is no longer the current value", removed);
+ Assert.assertSame(b, cache.get(key));
+
+ // Simulate B's hook firing → evicts.
+ Assert.assertTrue(cache.remove(key, b));
+ Assert.assertNull(cache.get(key));
+ }
+
+ /**
+ * Sanity: ConsumerInvokerProxy.isAvailable() reflects the underlying client.
+ */
+ @Test
+ public void testProxyIsAvailableTracksUnderlyingClient() {
+ TestRpcClient client = new TestRpcClient();
+ ConsumerInvokerProxy proxy = new ConsumerInvokerProxy<>(
+ stubInvoker(client.getProtocolConfig()), client);
+ Assert.assertTrue(proxy.isAvailable());
+ client.available.set(false);
+ Assert.assertFalse(proxy.isAvailable());
+ }
+
+ /**
+ * Sanity: ConsumerInvokerProxy.invoke fills CallInfo on the request and reports to the
+ * selector (best-effort; selector lookup may return null which is tolerated).
+ */
+ @Test
+ public void testProxyInvokeFillsCallInfoAndReports() {
+ TestRpcClient client = new TestRpcClient();
+ ConsumerInvokerProxy proxy = new ConsumerInvokerProxy<>(
+ stubInvoker(client.getProtocolConfig()), client);
+
+ com.tencent.trpc.core.rpc.def.DefRequest request = new com.tencent.trpc.core.rpc.def.DefRequest();
+ com.tencent.trpc.core.rpc.RpcInvocation invocation = new com.tencent.trpc.core.rpc.RpcInvocation();
+ invocation.setFunc("any");
+ request.setInvocation(invocation);
+
+ java.util.HashMap params = new java.util.HashMap<>();
+ params.put("container_name", "test-container");
+ params.put("set_division", "test-set");
+ ServiceInstance instance = new ServiceInstance("127.0.0.1", 18001, params);
+
+ Assert.assertNotNull(proxy.invoke(request, instance));
+ // The invoke wraps responses; the underlying stub returns a successful future.
+ }
+
+ /* ---------------------- helpers ---------------------- */
+
+ @SuppressWarnings("unchecked")
+ private ConcurrentMap> getCache() throws Exception {
+ Field f = DefClusterInvoker.class.getDeclaredField("invokerCache");
+ f.setAccessible(true);
+ return (ConcurrentMap>) f.get(invoker);
+ }
+
+ private ConsumerInvoker stubInvoker(ProtocolConfig pc) {
+ return new ConsumerInvoker() {
+ @Override
+ public Class getInterface() {
+ return GenericClient.class;
+ }
+
+ @Override
+ public CompletionStage invoke(Request request) {
+ return FutureUtils.newSuccessFuture(null);
+ }
+
+ @Override
+ public ConsumerConfig getConfig() {
+ return invoker.getConfig();
+ }
+
+ @Override
+ public ProtocolConfig getProtocolConfig() {
+ return pc;
+ }
+ };
+ }
+
+ /* ---------------------- mock ---------------------- */
+
+ private static class TestRpcClient implements RpcClient {
+
+ final AtomicBoolean available = new AtomicBoolean(true);
+ final AtomicBoolean closed = new AtomicBoolean(false);
+ final CloseFuture closeFuture = new CloseFuture<>();
+ private final ProtocolConfig protocolConfig = new ProtocolConfig();
+
+ @Override
+ public void open() throws TRpcException {
+ }
+
+ @Override
+ public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ closed.set(true);
+ closeFuture.complete(null);
+ }
+
+ @Override
+ public CloseFuture closeFuture() {
+ return closeFuture;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return available.get() && !closed.get();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public ProtocolConfig getProtocolConfig() {
+ return protocolConfig;
+ }
+ }
+}
diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java
index fd6b57ab4..6bf168870 100644
--- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java
+++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java
@@ -137,7 +137,11 @@ private Response handleResponse(Request request, SimpleHttpResponse simpleHttpRe
*/
private SimpleHttpResponse execute(Request request, int requestTimeout,
SimpleHttpRequest simpleHttpRequest) throws Exception {
- CloseableHttpAsyncClient httpAsyncClient = ((Http2cRpcClient) client).getHttpAsyncClient();
+ Http2cRpcClient http2cRpcClient = (Http2cRpcClient) client;
+ // Refresh idle counter so the cluster manager's reconnect-check timer keeps treating
+ // this client as healthy while it's actively serving requests.
+ http2cRpcClient.markUsed();
+ CloseableHttpAsyncClient httpAsyncClient = http2cRpcClient.getHttpAsyncClient();
Future httpResponseFuture = httpAsyncClient.execute(simpleHttpRequest,
new FutureCallback() {
@Override
diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java
index f00d9858d..4bbeb980e 100644
--- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java
+++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java
@@ -19,6 +19,7 @@
import com.tencent.trpc.core.rpc.AbstractRpcClient;
import com.tencent.trpc.core.rpc.ConsumerInvoker;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
@@ -29,10 +30,23 @@ public class Http2cRpcClient extends AbstractRpcClient {
private static final Logger logger = LoggerFactory.getLogger(HttpRpcClient.class);
+ /**
+ * If this client has not been used by any RPC for longer than this window, the periodic
+ * scanner in {@code RpcClusterClientManager} will treat it as unavailable and eventually
+ * close & evict it. The window is intentionally large so that any actively-used client is
+ * never affected. See {@link HttpRpcClient} for the same mechanism on the HTTP/1.1 path.
+ */
+ private static final long IDLE_UNAVAILABLE_THRESHOLD_NANOS = TimeUnit.MINUTES.toNanos(10);
+
/**
* Asynchronous HTTP client
*/
protected CloseableHttpAsyncClient httpAsyncClient;
+ /**
+ * Timestamp (System.nanoTime()) of the most recent RPC sent through this client. Updated by
+ * {@link Http2ConsumerInvoker} on each request.
+ */
+ private volatile long lastUsedNanos = System.nanoTime();
public Http2cRpcClient(ProtocolConfig config) {
setConfig(config);
@@ -74,6 +88,27 @@ public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) {
return new Http2ConsumerInvoker<>(this, consumerConfig, protocolConfig);
}
+ /**
+ * Record that this client just served (or is about to serve) an RPC. Called by
+ * {@link Http2ConsumerInvoker} on every request.
+ */
+ public void markUsed() {
+ lastUsedNanos = System.nanoTime();
+ }
+
+ /**
+ * Reports the client as unavailable if it has been idle longer than
+ * {@link #IDLE_UNAVAILABLE_THRESHOLD_NANOS}. This lets the cluster manager's periodic
+ * reconnect-check timer eventually evict orphaned clients (e.g. after backend IP rotation).
+ */
+ @Override
+ public boolean isAvailable() {
+ if (!super.isAvailable()) {
+ return false;
+ }
+ return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS;
+ }
+
public CloseableHttpAsyncClient getHttpAsyncClient() {
return httpAsyncClient;
}
diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java
index 7d101e8c6..4a6b5a318 100644
--- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java
+++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java
@@ -63,7 +63,11 @@ public HttpConsumerInvoker(HttpRpcClient client, ConsumerConfig config,
public Response send(Request request) throws Exception {
HttpPost httpPost = buildRequest(request);
- CloseableHttpClient httpClient = ((HttpRpcClient) client).getHttpClient();
+ HttpRpcClient httpRpcClient = (HttpRpcClient) client;
+ // Refresh idle counter so the cluster manager's reconnect-check timer keeps treating
+ // this client as healthy while it's actively serving requests.
+ httpRpcClient.markUsed();
+ CloseableHttpClient httpClient = httpRpcClient.getHttpClient();
try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) {
return handleResponse(request, httpResponse);
diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java
index 28c713206..8a508d095 100644
--- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java
+++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java
@@ -18,18 +18,54 @@
import com.tencent.trpc.core.rpc.AbstractRpcClient;
import com.tencent.trpc.core.rpc.ConsumerInvoker;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
/**
* HTTP protocol client.
+ * Long-connection mode: connections are pooled by Apache {@link PoolingHttpClientConnectionManager}
+ * and reused across requests via HTTP/1.1 keep-alive. Two safeguards are enabled by default to
+ * keep the pool healthy in long-running processes:
+ *
+ * - {@code validateAfterInactivity}: re-check a connection's liveness before reuse if it
+ * has been idle for a short period (avoids the classic "stale connection / NoHttpResponseException"
+ * when the server has half-closed an idle keep-alive connection);
+ * - {@code evictIdleConnections}: a small background thread evicts connections that have
+ * been idle longer than the configured limit, freeing OS file descriptors.
+ *
*/
public class HttpRpcClient extends AbstractRpcClient {
private static final Logger logger = LoggerFactory.getLogger(HttpRpcClient.class);
+ /**
+ * Validate a pooled connection before reuse if it has been idle for at least this many
+ * milliseconds. Cheap heuristic that catches most server-side half-closed keep-alive sockets.
+ */
+ private static final int VALIDATE_AFTER_INACTIVITY_MS = 2000;
+ /**
+ * Evict pooled connections that have been idle for longer than this duration.
+ */
+ private static final long EVICT_IDLE_CONNECTIONS_SECONDS = 60L;
+ /**
+ * If this client has not been used by any RPC for longer than this window, the periodic
+ * scanner in {@code RpcClusterClientManager} will treat it as unavailable. After
+ * a few consecutive unavailable observations the client gets closed and evicted from the
+ * cluster cache, which is how we reclaim {@link HttpRpcClient} instances orphaned by backend
+ * IP rotation (e.g. K8s pod IP drift). The window is intentionally large so that any
+ * actively-used client is never affected.
+ */
+ private static final long IDLE_UNAVAILABLE_THRESHOLD_NANOS =
+ java.util.concurrent.TimeUnit.MINUTES.toNanos(10);
+
private CloseableHttpClient httpClient;
+ /**
+ * Timestamp (System.nanoTime()) of the most recent RPC sent through this client. Updated by
+ * {@link HttpConsumerInvoker} on each send.
+ */
+ private volatile long lastUsedNanos = System.nanoTime();
public HttpRpcClient(ProtocolConfig config) {
setConfig(config);
@@ -44,7 +80,16 @@ protected void doOpen() {
// If there is only one route, the maximum number of connections for a single route is the same
// as the maximum number of connections for the entire connection pool.
cm.setDefaultMaxPerRoute(maxConns);
- httpClient = HttpClients.custom().setConnectionManager(cm).build();
+ // Re-validate idle pooled connections before reuse so we do not send a request through a
+ // socket the server has already half-closed.
+ cm.setValidateAfterInactivity(VALIDATE_AFTER_INACTIVITY_MS);
+ httpClient = HttpClients.custom()
+ .setConnectionManager(cm)
+ // Background eviction of stale & long-idle connections; keeps the pool tidy in
+ // long-running processes without affecting hot connections.
+ .evictExpiredConnections()
+ .evictIdleConnections(EVICT_IDLE_CONNECTIONS_SECONDS, TimeUnit.SECONDS)
+ .build();
}
@Override
@@ -64,6 +109,28 @@ public ConsumerInvoker createInvoker(ConsumerConfig consumerConfig) {
return new HttpConsumerInvoker<>(this, consumerConfig, protocolConfig);
}
+ /**
+ * Record that this client just served (or is about to serve) an RPC. Called by
+ * {@link HttpConsumerInvoker} on every request.
+ */
+ public void markUsed() {
+ lastUsedNanos = System.nanoTime();
+ }
+
+ /**
+ * Reports the client as unavailable if it has been idle longer than
+ * {@link #IDLE_UNAVAILABLE_THRESHOLD_NANOS}. This lets the cluster manager's periodic
+ * reconnect-check timer eventually evict orphaned clients (e.g. after backend IP rotation)
+ * even though Apache HttpClient itself has no notion of "remote permanently gone".
+ */
+ @Override
+ public boolean isAvailable() {
+ if (!super.isAvailable()) {
+ return false;
+ }
+ return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS;
+ }
+
public CloseableHttpClient getHttpClient() {
return httpClient;
}
diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpMultiPortNamingUrlConcurrentTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpMultiPortNamingUrlConcurrentTest.java
new file mode 100644
index 000000000..b4e400087
--- /dev/null
+++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpMultiPortNamingUrlConcurrentTest.java
@@ -0,0 +1,256 @@
+/*
+ * Tencent is pleased to support the open source community by making tRPC available.
+ *
+ * Copyright (C) 2023 Tencent.
+ * All rights reserved.
+ *
+ * If you have downloaded a copy of the tRPC source code from Tencent,
+ * please note that tRPC source code is licensed under the Apache 2.0 License,
+ * A copy of the Apache 2.0 License can be found in the LICENSE file.
+ */
+
+package com.tencent.trpc.proto.http;
+
+import static com.tencent.trpc.transport.http.common.Constants.HTTP_SCHEME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.tencent.trpc.core.common.ConfigManager;
+import com.tencent.trpc.core.common.config.BackendConfig;
+import com.tencent.trpc.core.common.config.ConsumerConfig;
+import com.tencent.trpc.core.common.config.ProviderConfig;
+import com.tencent.trpc.core.common.config.ServerConfig;
+import com.tencent.trpc.core.common.config.ServiceConfig;
+import com.tencent.trpc.core.rpc.RpcClientContext;
+import com.tencent.trpc.core.rpc.RpcContext;
+import com.tencent.trpc.core.utils.NetUtils;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import tests.service.GreeterService;
+import tests.service.HelloRequestProtocol.HelloRequest;
+import tests.service.HelloRequestProtocol.HelloResponse;
+import tests.service.TestBeanConvertWithGetMethodReq;
+import tests.service.TestBeanConvertWithGetMethodRsp;
+
+/**
+ * HTTP-protocol counterpart of the tRPC concurrent multi-port test in {@code trpc-proto-standard}.
+ *
+ * Setup: 10 standalone HTTP servers (jetty) on consecutive ports backed by
+ * {@link PortAwareGreeterServiceImpl} that tags each response with its own listening port.
+ * One shared {@link BackendConfig} lists all 10 endpoints in the comma-separated namingUrl;
+ * 100 concurrent threads × 1000 requests each fan-out via {@code ip://} random load balance.
+ *
+ * Final assertions:
+ *
+ * - every request succeeds and the echoed message matches the request payload,
+ * - every backend port is hit at least once,
+ * - distribution roughly balanced — random over N=10 with R=100000 trials,
+ * expected 10000 / port; tolerated range {@code [2000, 20000]}, far exceeding any
+ * realistic random outlier.
+ *
+ *
+ */
+public class HttpMultiPortNamingUrlConcurrentTest {
+
+ private static final int BASE_PORT = 18500;
+ private static final int SERVER_COUNT = 10;
+ private static final int THREAD_COUNT = 100;
+ private static final int CYCLE_PER_THREAD = 1000;
+
+ private static final int REQUEST_TIMEOUT_MS = 60_000;
+ private static final int MAX_CONNECTIONS = 20480;
+
+ private static ServerConfig serverConfig;
+
+ @BeforeClass
+ public static void startHttpServers() {
+ ConfigManager.stopTest();
+ ConfigManager.startTest();
+
+ HashMap providers = new HashMap<>();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ int port = BASE_PORT + i;
+ ProviderConfig pc = new ProviderConfig<>();
+ pc.setServiceInterface(GreeterService.class);
+ pc.setRef(new PortAwareGreeterServiceImpl(port));
+
+ ServiceConfig serviceConfig = new ServiceConfig();
+ serviceConfig.setName("multi-port-server-" + port);
+ serviceConfig.getProviderConfigs().add(pc);
+ serviceConfig.setIp(NetUtils.LOCAL_HOST);
+ serviceConfig.setPort(port);
+ serviceConfig.setProtocol(HTTP_SCHEME);
+ serviceConfig.setTransporter("jetty");
+ providers.put(serviceConfig.getName(), serviceConfig);
+ }
+
+ ServerConfig sc = new ServerConfig();
+ sc.setServiceMap(providers);
+ sc.setApp("http-multi-port-test");
+ sc.setLocalIp(NetUtils.LOCAL_HOST);
+ sc.init();
+ serverConfig = sc;
+ }
+
+ @AfterClass
+ public static void stopHttpServers() {
+ if (serverConfig != null) {
+ serverConfig.stop();
+ serverConfig = null;
+ }
+ ConfigManager.stopTest();
+ }
+
+ @Test
+ public void testHttpMultiPortNamingUrlConcurrent() throws InterruptedException {
+ // Build "ip://127.0.0.1:p1,127.0.0.1:p2,...,127.0.0.1:p10".
+ StringBuilder urlBuilder = new StringBuilder("ip://");
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i > 0) {
+ urlBuilder.append(',');
+ }
+ urlBuilder.append(NetUtils.LOCAL_HOST).append(':').append(BASE_PORT + i);
+ }
+ String namingUrl = urlBuilder.toString();
+
+ BackendConfig backendConfig = new BackendConfig();
+ backendConfig.setName("http-multi-port-client");
+ backendConfig.setNamingUrl(namingUrl);
+ backendConfig.setProtocol("http");
+ backendConfig.setRequestTimeout(REQUEST_TIMEOUT_MS);
+ backendConfig.setMaxConns(MAX_CONNECTIONS);
+ backendConfig.setConnsPerAddr(2);
+ backendConfig.setKeepAlive(true);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig<>();
+ consumerConfig.setServiceInterface(GreeterService.class);
+ consumerConfig.setBackendConfig(backendConfig);
+
+ try {
+ final GreeterService proxy = consumerConfig.getProxy();
+
+ // Per-port hit counter aggregated across all worker threads.
+ ConcurrentHashMap portHits = new ConcurrentHashMap<>();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ portHits.put(BASE_PORT + i, new AtomicInteger(0));
+ }
+
+ CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
+ TestResult[] results = new TestResult[THREAD_COUNT];
+ for (int t = 0; t < THREAD_COUNT; t++) {
+ final TestResult r = new TestResult();
+ results[t] = r;
+ final int threadIndex = t;
+ new Thread(() -> {
+ try {
+ for (int i = 0; i < CYCLE_PER_THREAD; i++) {
+ String reqPayload = "req-" + threadIndex + "-" + i;
+ RpcClientContext ctx = new RpcClientContext();
+ HelloResponse rsp = proxy.sayHello(ctx, HelloRequest.newBuilder()
+ .setMessage(reqPayload)
+ .build());
+ assertNotNull("response must not be null", rsp);
+ String message = rsp.getMessage();
+ // Server returns "|port=".
+ int sep = message.lastIndexOf("|port=");
+ assertTrue("response missing port marker: " + message, sep > 0);
+ String echoed = message.substring(0, sep);
+ int port = Integer.parseInt(message.substring(sep + "|port=".length()));
+ assertEquals("echoed payload must match request", reqPayload, echoed);
+ AtomicInteger counter = portHits.get(port);
+ assertTrue("response from unexpected port: " + port, counter != null);
+ counter.incrementAndGet();
+ }
+ r.succ = true;
+ } catch (Throwable ex) {
+ r.succ = false;
+ r.ex = ex;
+ ex.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ }, "http-concurrent-caller-" + t).start();
+ }
+
+ // 300s upper bound for HTTP — slower per-request than tRPC due to HTTP framing.
+ boolean done = latch.await(300, TimeUnit.SECONDS);
+ assertTrue("concurrent calls timed out before completion", done);
+
+ for (int i = 0; i < results.length; i++) {
+ TestResult r = results[i];
+ assertTrue("worker thread " + i + " failed: "
+ + (r.ex == null ? "" : r.ex.toString()), r.succ);
+ }
+
+ // ---- aggregate assertions ----
+ int totalRequests = THREAD_COUNT * CYCLE_PER_THREAD;
+ int sum = 0;
+ Set hitPorts = new HashSet<>();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ int port = BASE_PORT + i;
+ int hits = portHits.get(port).get();
+ sum += hits;
+ if (hits > 0) {
+ hitPorts.add(port);
+ }
+ // Random over 10 with 100000 trials → expected 10000/server.
+ // [2000, 20000] leaves >>3-sigma headroom; CI-safe.
+ assertTrue("port " + port + " never received a request", hits > 0);
+ assertTrue("port " + port + " too few hits: " + hits, hits >= 2000);
+ assertTrue("port " + port + " too many hits: " + hits, hits <= 20000);
+ }
+ assertEquals("total responses should equal total requests", totalRequests, sum);
+ assertEquals("all 10 backend ports must be hit", SERVER_COUNT, hitPorts.size());
+ } finally {
+ backendConfig.stop();
+ }
+ }
+
+ /**
+ * Service impl that tags every response with its own listening port so the test can verify
+ * the actual server that handled each request.
+ */
+ private static class PortAwareGreeterServiceImpl implements GreeterService {
+
+ private final int port;
+
+ PortAwareGreeterServiceImpl(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public HelloResponse sayHello(RpcContext context, HelloRequest request) {
+ String message = request.getMessage();
+ return HelloResponse.newBuilder()
+ .setMessage(message + "|port=" + port)
+ .build();
+ }
+
+ @Override
+ public String sayBlankHello(RpcContext context, HelloRequest request) {
+ return "";
+ }
+
+ @Override
+ public TestBeanConvertWithGetMethodRsp sayHelloNonPbType(RpcContext context,
+ TestBeanConvertWithGetMethodReq request) {
+ return new TestBeanConvertWithGetMethodRsp(request.getMessage(),
+ request.getStatus(), request.getComments());
+ }
+ }
+
+ private static class TestResult {
+
+ boolean succ;
+ Throwable ex;
+ }
+}
diff --git a/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientLongLinkTest.java b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientLongLinkTest.java
new file mode 100644
index 000000000..0b5fdc1b7
--- /dev/null
+++ b/trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/HttpRpcClientLongLinkTest.java
@@ -0,0 +1,244 @@
+/*
+ * Tencent is pleased to support the open source community by making tRPC available.
+ *
+ * Copyright (C) 2023 Tencent.
+ * All rights reserved.
+ *
+ * If you have downloaded a copy of the tRPC source code from Tencent,
+ * please note that tRPC source code is licensed under the Apache 2.0 License,
+ * A copy of the Apache 2.0 License can be found in the LICENSE file.
+ */
+
+package com.tencent.trpc.proto.http;
+
+import com.tencent.trpc.core.common.config.ProtocolConfig;
+import com.tencent.trpc.proto.http.client.Http2RpcClient;
+import com.tencent.trpc.proto.http.client.Http2cRpcClient;
+import com.tencent.trpc.proto.http.client.HttpRpcClient;
+import com.tencent.trpc.proto.http.client.HttpsRpcClient;
+import java.lang.reflect.Field;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Pure unit tests for the long-connection extensions on the HTTP RpcClient hierarchy:
+ * {@code markUsed}, the overridden {@code isAvailable}, and the idle-threshold heuristic.
+ *
+ * No Jetty server is started; we manipulate the internal {@code lastUsedNanos} field via
+ * reflection to drive the idle/active branches.
+ */
+public class HttpRpcClientLongLinkTest {
+
+ private static final long ELEVEN_MINUTES_NANOS = java.util.concurrent.TimeUnit.MINUTES.toNanos(11);
+
+ /**
+ * HttpRpcClient.isAvailable returns false when not started (lifecycle.isStarted() == false),
+ * regardless of lastUsedNanos.
+ */
+ @Test
+ public void testHttpRpcClientNotAvailableBeforeOpen() {
+ ProtocolConfig pc = newProtocolConfig();
+ HttpRpcClient client = new HttpRpcClient(pc);
+ // Lifecycle has not been started, so super.isAvailable() returns false.
+ Assert.assertFalse(client.isAvailable());
+ }
+
+ /**
+ * HttpRpcClient.isAvailable returns true when started AND recently used.
+ */
+ @Test
+ public void testHttpRpcClientAvailableWhenStartedAndFresh() {
+ ProtocolConfig pc = newProtocolConfig();
+ HttpRpcClient client = new HttpRpcClient(pc);
+ client.open();
+ try {
+ client.markUsed(); // refresh
+ Assert.assertTrue(client.isAvailable());
+ } finally {
+ client.close();
+ }
+ }
+
+ /**
+ * HttpRpcClient.isAvailable returns false when started but idle > 10 min.
+ */
+ @Test
+ public void testHttpRpcClientNotAvailableWhenIdleTooLong() throws Exception {
+ ProtocolConfig pc = newProtocolConfig();
+ HttpRpcClient client = new HttpRpcClient(pc);
+ client.open();
+ try {
+ // Force lastUsedNanos to "11 minutes ago".
+ setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS);
+ Assert.assertFalse(client.isAvailable());
+
+ // Recovering via markUsed restores availability.
+ client.markUsed();
+ Assert.assertTrue(client.isAvailable());
+ } finally {
+ client.close();
+ }
+ }
+
+ /**
+ * Http2cRpcClient mirrors the same logic.
+ */
+ @Test
+ public void testHttp2cRpcClientNotAvailableBeforeOpen() {
+ ProtocolConfig pc = newProtocolConfig();
+ Http2cRpcClient client = new Http2cRpcClient(pc);
+ Assert.assertFalse(client.isAvailable());
+ }
+
+ @Test
+ public void testHttp2cRpcClientAvailableWhenStartedAndFresh() {
+ ProtocolConfig pc = newProtocolConfig();
+ Http2cRpcClient client = new Http2cRpcClient(pc);
+ client.open();
+ try {
+ client.markUsed();
+ Assert.assertTrue(client.isAvailable());
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testHttp2cRpcClientNotAvailableWhenIdleTooLong() throws Exception {
+ ProtocolConfig pc = newProtocolConfig();
+ Http2cRpcClient client = new Http2cRpcClient(pc);
+ client.open();
+ try {
+ setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS);
+ Assert.assertFalse(client.isAvailable());
+ client.markUsed();
+ Assert.assertTrue(client.isAvailable());
+ } finally {
+ client.close();
+ }
+ }
+
+ /**
+ * doOpen wires the underlying httpClient. close() must release it without throwing.
+ */
+ @Test
+ public void testHttpRpcClientOpenCloseReleasesResources() {
+ ProtocolConfig pc = newProtocolConfig();
+ HttpRpcClient client = new HttpRpcClient(pc);
+ client.open();
+ Assert.assertNotNull(client.getHttpClient());
+ client.close();
+ Assert.assertTrue(client.isClosed());
+ // Idempotent close is safe.
+ client.close();
+ }
+
+ /**
+ * doOpen for Http2cRpcClient creates an httpAsyncClient and starts it.
+ */
+ @Test
+ public void testHttp2cRpcClientOpenCloseReleasesResources() {
+ ProtocolConfig pc = newProtocolConfig();
+ Http2cRpcClient client = new Http2cRpcClient(pc);
+ client.open();
+ Assert.assertNotNull(client.getHttpAsyncClient());
+ client.close();
+ Assert.assertTrue(client.isClosed());
+ client.close();
+ }
+
+ /**
+ * Http2RpcClient inherits markUsed/isAvailable from Http2cRpcClient. We don't need a real
+ * TLS context — we only verify the inherited methods behave the same on the subclass.
+ * Use reflection to flip lifecycle state to STARTED so that super.isAvailable() returns true.
+ */
+ @Test
+ public void testHttp2RpcClientInheritsIdleHeuristic() throws Exception {
+ ProtocolConfig pc = newProtocolConfig();
+ Http2RpcClient client = new Http2RpcClient(pc);
+ forceLifecycleStarted(client);
+ try {
+ client.markUsed();
+ Assert.assertTrue(client.isAvailable());
+ setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS);
+ Assert.assertFalse(client.isAvailable());
+ } finally {
+ forceLifecycleClosed(client);
+ }
+ }
+
+ /**
+ * HttpsRpcClient extends Http2RpcClient. Same inheritance check.
+ */
+ @Test
+ public void testHttpsRpcClientInheritsIdleHeuristic() throws Exception {
+ ProtocolConfig pc = newProtocolConfig();
+ HttpsRpcClient client = new HttpsRpcClient(pc);
+ forceLifecycleStarted(client);
+ try {
+ client.markUsed();
+ Assert.assertTrue(client.isAvailable());
+ setField(client, "lastUsedNanos", System.nanoTime() - ELEVEN_MINUTES_NANOS);
+ Assert.assertFalse(client.isAvailable());
+ } finally {
+ forceLifecycleClosed(client);
+ }
+ }
+
+ /* ---------------------- helpers ---------------------- */
+
+ private static ProtocolConfig newProtocolConfig() {
+ ProtocolConfig pc = new ProtocolConfig();
+ pc.setIp("127.0.0.1");
+ pc.setPort(0);
+ pc.setProtocol("http");
+ pc.setNetwork("tcp");
+ pc.setDefault();
+ return pc;
+ }
+
+ /**
+ * Bypass real doOpen — flip the embedded LifecycleObj to STARTED so isAvailable's
+ * super.isAvailable() check passes without spinning up actual HTTP infrastructure.
+ */
+ private static void forceLifecycleStarted(Object client) throws Exception {
+ Field lf = findField(client.getClass(), "lifecycleObj");
+ lf.setAccessible(true);
+ Object lifecycle = lf.get(client);
+ Field state = findField(lifecycle.getClass(), "state");
+ state.setAccessible(true);
+ // LifecycleState enum: STARTED ordinal lookup via reflection (avoid hard dependency).
+ Class> stateEnum = state.getType();
+ Object started = stateEnum.getMethod("valueOf", String.class).invoke(null, "STARTED");
+ state.set(lifecycle, started);
+ }
+
+ private static void forceLifecycleClosed(Object client) throws Exception {
+ Field lf = findField(client.getClass(), "lifecycleObj");
+ lf.setAccessible(true);
+ Object lifecycle = lf.get(client);
+ Field state = findField(lifecycle.getClass(), "state");
+ state.setAccessible(true);
+ Class> stateEnum = state.getType();
+ Object stopped = stateEnum.getMethod("valueOf", String.class).invoke(null, "STOPPED");
+ state.set(lifecycle, stopped);
+ }
+
+ private static void setField(Object target, String fieldName, Object value) throws Exception {
+ Field f = findField(target.getClass(), fieldName);
+ f.setAccessible(true);
+ f.set(target, value);
+ }
+
+ private static Field findField(Class> clazz, String name) throws NoSuchFieldException {
+ Class> c = clazz;
+ while (c != null) {
+ try {
+ return c.getDeclaredField(name);
+ } catch (NoSuchFieldException ignored) {
+ c = c.getSuperclass();
+ }
+ }
+ throw new NoSuchFieldException(name);
+ }
+}
diff --git a/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/MultiPortNamingUrlConcurrentTest.java b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/MultiPortNamingUrlConcurrentTest.java
new file mode 100644
index 000000000..3269332e7
--- /dev/null
+++ b/trpc-proto/trpc-proto-standard/src/test/java/com/tencent/trpc/proto/standard/concurrenttest/MultiPortNamingUrlConcurrentTest.java
@@ -0,0 +1,228 @@
+/*
+ * Tencent is pleased to support the open source community by making tRPC available.
+ *
+ * Copyright (C) 2023 Tencent.
+ * All rights reserved.
+ *
+ * If you have downloaded a copy of the tRPC source code from Tencent,
+ * please note that tRPC source code is licensed under the Apache 2.0 License,
+ * A copy of the Apache 2.0 License can be found in the LICENSE file.
+ */
+
+package com.tencent.trpc.proto.standard.concurrenttest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.ByteString;
+import com.tencent.trpc.core.common.ConfigManager;
+import com.tencent.trpc.core.common.config.BackendConfig;
+import com.tencent.trpc.core.common.config.ProviderConfig;
+import com.tencent.trpc.core.common.config.ServiceConfig;
+import com.tencent.trpc.core.rpc.RpcClientContext;
+import com.tencent.trpc.core.rpc.RpcServerContext;
+import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloRequest;
+import com.tencent.trpc.proto.standard.common.HelloRequestProtocol.HelloResponse;
+import com.tencent.trpc.proto.support.DefResponseFutureManager;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link BackendConfig#setNamingUrl(String)} configured with a comma-separated list
+ * of {@code ip:port} entries fan-outs requests to all backends (random load balance) under heavy
+ * concurrency.
+ *
+ * Setup: 10 standalone tRPC servers on consecutive ports; one shared {@link BackendConfig}
+ * whose namingUrl lists all 10 endpoints; 100 concurrent threads × 1000 requests each.
+ *
+ * Each server impl echoes back its own listening port so the client can group responses by the
+ * actually-served port. Final assertions:
+ *
+ * - every request succeeds with the exact echoed payload,
+ * - all 10 backend ports get hit at least once (proving namingUrl actually distributes
+ * traffic, not pinning to one endpoint),
+ * - distribution is roughly balanced — random selector over N=10 with R=100000 requests
+ * gives an expected 10000 per server; we tolerate {@code [2000, 20000]} per server which
+ * is a generous bound far above any realistic random outlier.
+ *
+ */
+public class MultiPortNamingUrlConcurrentTest {
+
+ private static final int BASE_TCP_PORT = 12500;
+ private static final int SERVER_COUNT = 10;
+ private static final int THREAD_COUNT = 100;
+ private static final int CYCLE_PER_THREAD = 1000;
+
+ private final List serviceConfigs = new ArrayList<>(SERVER_COUNT);
+
+ @Before
+ public void before() {
+ ConfigManager.stopTest();
+ ConfigManager.startTest();
+ startServers();
+ }
+
+ @After
+ public void stop() {
+ for (ServiceConfig serviceConfig : serviceConfigs) {
+ try {
+ serviceConfig.unExport();
+ } catch (Exception ignore) {
+ // ignore
+ }
+ }
+ serviceConfigs.clear();
+ ConfigManager.stopTest();
+ }
+
+ @Test
+ public void testMultiPortNamingUrlConcurrent() throws InterruptedException {
+ // Build the comma-separated namingUrl: "ip://127.0.0.1:p1,127.0.0.1:p2,..."
+ StringBuilder urlBuilder = new StringBuilder("ip://");
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i > 0) {
+ urlBuilder.append(',');
+ }
+ urlBuilder.append("127.0.0.1:").append(BASE_TCP_PORT + i);
+ }
+ String namingUrl = urlBuilder.toString();
+
+ BackendConfig backendConfig = new BackendConfig();
+ DefResponseFutureManager.reset();
+ backendConfig.setNamingUrl(namingUrl);
+ // One long connection per backend addr is enough; keeps the test deterministic.
+ backendConfig.setConnsPerAddr(5);
+ backendConfig.setNetwork("tcp");
+ // Generous client-side timeout so a slow JIT warm-up can't fail individual calls.
+ backendConfig.setRequestTimeout(60_000);
+
+ final ConcurrentTestServiceApi proxy = backendConfig.getProxy(ConcurrentTestServiceApi.class);
+
+ // Per-port hit counter aggregated across all threads.
+ ConcurrentHashMap portHits = new ConcurrentHashMap<>();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ portHits.put(BASE_TCP_PORT + i, new AtomicInteger(0));
+ }
+
+ CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
+ List results = new ArrayList<>(THREAD_COUNT);
+ for (int t = 0; t < THREAD_COUNT; t++) {
+ final TestResult r = new TestResult();
+ results.add(r);
+ final int threadIndex = t;
+ new Thread(() -> {
+ try {
+ for (int i = 0; i < CYCLE_PER_THREAD; i++) {
+ String reqPayload = "req-" + threadIndex + "-" + i;
+ RpcClientContext context = new RpcClientContext();
+ HelloResponse response = proxy.sayHello(context, HelloRequest.newBuilder()
+ .setMessage(ByteString.copyFromUtf8(reqPayload))
+ .build());
+ // Server impl returns "|port="
+ String message = response.getMessage().toStringUtf8();
+ int sep = message.lastIndexOf("|port=");
+ assertTrue("response missing port marker: " + message, sep > 0);
+ String echoed = message.substring(0, sep);
+ int port = Integer.parseInt(message.substring(sep + "|port=".length()));
+ assertEquals("echoed payload must match request", reqPayload, echoed);
+ AtomicInteger counter = portHits.get(port);
+ assertTrue("response from unexpected port: " + port, counter != null);
+ counter.incrementAndGet();
+ }
+ r.succ = true;
+ } catch (Throwable ex) {
+ r.succ = false;
+ r.ex = ex;
+ ex.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ }, "concurrent-caller-" + t).start();
+ }
+ // 200s upper bound; full run on a laptop usually finishes in a few seconds.
+ boolean done = latch.await(200, TimeUnit.SECONDS);
+ assertTrue("concurrent calls timed out before completion", done);
+
+ for (int i = 0; i < results.size(); i++) {
+ TestResult r = results.get(i);
+ assertTrue("worker thread " + i + " failed: "
+ + (r.ex == null ? "" : r.ex.toString()), r.succ);
+ }
+
+ // ---- final aggregate assertions ----
+ int totalRequests = THREAD_COUNT * CYCLE_PER_THREAD;
+ int sum = 0;
+ Set hitPorts = new HashSet<>();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ int port = BASE_TCP_PORT + i;
+ int hits = portHits.get(port).get();
+ sum += hits;
+ if (hits > 0) {
+ hitPorts.add(port);
+ }
+ // Random over 10 with 100000 trials → expected 10000/server.
+ // Lower bound 2000 / upper bound 20000 leaves >>3-sigma headroom; CI-safe.
+ assertTrue("port " + port + " never received a request", hits > 0);
+ assertTrue("port " + port + " too few hits: " + hits, hits >= 2000);
+ assertTrue("port " + port + " too many hits: " + hits, hits <= 20000);
+ }
+ assertEquals("total responses should equal total requests", totalRequests, sum);
+ assertEquals("all 10 backend ports must be hit", SERVER_COUNT, hitPorts.size());
+ }
+
+ private void startServers() {
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ int port = BASE_TCP_PORT + i;
+ ProviderConfig providerConfig = new ProviderConfig<>();
+ providerConfig.setRef(new PortAwareEchoServiceImpl(port));
+
+ ServiceConfig serviceConfig = new ServiceConfig();
+ serviceConfig.setIp("127.0.0.1");
+ serviceConfig.setNetwork("tcp");
+ serviceConfig.setPort(port);
+ serviceConfig.setEnableLinkTimeout(true);
+ // Generous server-side timeout to avoid spurious timeouts on slow CI.
+ serviceConfig.setRequestTimeout(60_000);
+ serviceConfig.addProviderConfig(providerConfig);
+ serviceConfig.export();
+ serviceConfigs.add(serviceConfig);
+ }
+ }
+
+ /**
+ * Service impl that tags every response with its own listening port so the test can verify
+ * the actual server that handled each request.
+ */
+ private static class PortAwareEchoServiceImpl implements ConcurrentTestService {
+
+ private final int port;
+
+ PortAwareEchoServiceImpl(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public HelloResponse sayHello(RpcServerContext context, HelloRequest request) {
+ String echoed = request.getMessage().toStringUtf8();
+ String tagged = echoed + "|port=" + port;
+ return HelloResponse.newBuilder()
+ .setMessage(ByteString.copyFromUtf8(tagged))
+ .build();
+ }
+ }
+
+ private static class TestResult {
+
+ boolean succ;
+ Throwable ex;
+ }
+}
diff --git a/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java b/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java
index 29460e42e..feb5497e9 100644
--- a/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java
+++ b/trpc-test/trpc-test-integration/src/integration-test/java/com/tencent/trpc/integration/test/transport/TransportIntegrationTest.java
@@ -102,15 +102,14 @@ public void testUdpToTcpNettyTransport() {
}
/**
- * Test for server-side idle-timeout
+ * Test for server-side idle-timeout.
+ * Long-connection mode: idle timeout no longer closes the connection. The framework
+ * keeps the connection alive regardless of how long it stays idle, so this case is
+ * intentionally left empty as a placeholder for the historical behaviour.
*/
@Test
public void testIdleTimeout() {
- assertThrows(RuntimeException.class, () ->
- tcpEchoAPI.delayedEcho(new RpcClientContext(), DelayedEchoRequest.newBuilder()
- .setMessage("timeout")
- .setDelaySeconds(2)
- .build()));
+ // No-op under long-connection mode: idleTimeout has no effect on the netty pipeline.
}
/**
diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java
index 26dce0165..cb847535e 100644
--- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java
+++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyClientHandler.java
@@ -20,8 +20,6 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
@io.netty.channel.ChannelHandler.Sharable
public class NettyClientHandler extends ChannelDuplexHandler {
@@ -105,24 +103,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- NettyChannel channel = NettyChannelManager.getOrAddChannel(ctx.channel(), config);
- try {
- // only close the channel in a TCP scenario.
- if (isTcp) {
- IdleState state = ((IdleStateEvent) evt).state();
- logger.warn("Idle event(state=" + state + ") trigger, close channel" + channel);
- channel.close();
- }
- } finally {
- NettyChannelManager.removeChannelIfDisconnected(ctx.channel());
- }
- }
- super.userEventTriggered(ctx, evt);
- }
-
public ConcurrentHashSet getChannelSet() {
return channelSet;
}
diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java
index 4b495560e..26eade523 100644
--- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java
+++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyServerHandler.java
@@ -20,8 +20,6 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -122,21 +120,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- NettyChannel channel = NettyChannelManager.getOrAddChannel(ctx.channel(), config);
- try {
- IdleState state = ((IdleStateEvent) evt).state();
- logger.warn("idle event[{}] trigger, close channel:{}", state, channel);
- channel.close();
- } finally {
- NettyChannelManager.removeChannelIfDisconnected(ctx.channel());
- }
- }
- super.userEventTriggered(ctx, evt);
- }
-
public ConcurrentMap getChannels() {
return clientChannels;
}
diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java
index 8bc913304..312ba4e76 100644
--- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java
+++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpClientTransport.java
@@ -11,8 +11,6 @@
package com.tencent.trpc.transport.netty;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
@@ -25,13 +23,15 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.CompletableFuture;
/**
- * A netty tcp ClientTransport
+ * A netty tcp ClientTransport.
+ * Long-connection mode: no IdleStateHandler is installed and idle detection is disabled.
+ * Connections are kept alive for the lifetime of the transport and only released when the
+ * transport is shut down or when the peer actively closes the connection.
*/
public class NettyTcpClientTransport extends NettyAbstractClientTransport {
@@ -67,18 +67,17 @@ protected void doOpen() {
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) {
-
- IdleStateHandler clientIdleHandler =
- new IdleStateHandler(0, config.getIdleTimeout(), 0, MILLISECONDS);
+ // Long-connection mode: do NOT install IdleStateHandler. The idleTimeout field
+ // is kept for backward compatibility but no longer takes effect on the netty
+ // pipeline.
ChannelPipeline p = ch.pipeline();
if (codec == null) {
- p.addLast("client-idle", clientIdleHandler).addLast("handler", clientHandler);
+ p.addLast("handler", clientHandler);
} else {
NettyCodecAdapter nettyCodec = NettyCodecAdapter
.createTcpCodecAdapter(codec, config);
p.addLast("encode", nettyCodec.getEncoder())
.addLast("decode", nettyCodec.getDecoder())
- .addLast("client-idle", clientIdleHandler)
.addLast("handler", clientHandler);
}
}
diff --git a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java
index 4af2e3cdd..d6e6d6c6e 100644
--- a/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java
+++ b/trpc-transport/trpc-transport-netty/src/main/java/com/tencent/trpc/transport/netty/NettyTcpServerTransport.java
@@ -11,8 +11,6 @@
package com.tencent.trpc.transport.netty;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.TransportException;
import com.tencent.trpc.core.logger.Logger;
@@ -40,7 +38,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
-import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Version;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashSet;
@@ -122,16 +119,14 @@ protected void doOpen() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
- IdleStateHandler idleHandler =
- new IdleStateHandler(0, 0, config.getIdleTimeout(), MILLISECONDS);
- if (codec == null) {
- p.addLast("server-idle", idleHandler);
- } else {
+ // Long-connection mode: do NOT install IdleStateHandler. The idleTimeout field
+ // is kept for backward compatibility but no longer takes effect on the netty
+ // pipeline. The server never proactively closes a client connection due to idle.
+ if (codec != null) {
NettyCodecAdapter nettyCodec = NettyCodecAdapter
.createTcpCodecAdapter(codec, config);
p.addLast("encode", nettyCodec.getEncoder())//
- .addLast("decode", nettyCodec.getDecoder())//
- .addLast("server-idle", idleHandler);
+ .addLast("decode", nettyCodec.getDecoder());
}
if (flushConsolidationSwitch) {
p.addLast("flushConsolidationHandlers",
diff --git a/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java b/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java
index 6f962d660..11a0073b7 100644
--- a/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java
+++ b/trpc-transport/trpc-transport-netty/src/test/java/com/tencent/trpc/transport/netty/NettyChannelHandlerTest.java
@@ -33,7 +33,8 @@ public void test() throws Exception {
new NettyClientHandler(new ChannelHandlerAdapter(), new ProtocolConfig(), true)
.userEventTriggered(new ChannelHandlerContextTest(channelTest2),
IdleStateEvent.WRITER_IDLE_STATE_EVENT);
- Assert.assertTrue(channelTest2.getIsClose() != null && channelTest2.isClose);
+ // Long-connection mode: client must NOT close the channel on idle event.
+ Assert.assertTrue(channelTest2.getIsClose() == null || !channelTest2.isClose);
ChannelTest channelTest3 = new ChannelTest();
channelTest3.setActive(true);
@@ -47,6 +48,7 @@ public void test() throws Exception {
new NettyServerHandler(new ChannelHandlerAdapter(), new ProtocolConfig(), true)
.userEventTriggered(new ChannelHandlerContextTest(channelTest4),
IdleStateEvent.WRITER_IDLE_STATE_EVENT);
- Assert.assertTrue(channelTest4.getIsClose() != null && channelTest4.isClose);
+ // Long-connection mode: server must NOT close the channel on idle event.
+ Assert.assertTrue(channelTest4.getIsClose() == null || !channelTest4.isClose);
}
}