Feat/long link #141

Open

wardseptember wants to merge 5 commits into trpc-group:master from wardseptember:feat/long_link

Conversation

@wardseptember
Collaborator

No description provided.

@codecov

codecov Bot commented May 16, 2026

Codecov Report

❌ Patch coverage is 90.52632% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.20402%. Comparing base (063c0d2) to head (0b9b7e2).

Files with missing lines                               | Patch %   | Lines
...ent/trpc/core/cluster/RpcClusterClientManager.java  | 92.72727% | 4 Missing ⚠️
...ncent/trpc/core/cluster/def/DefClusterInvoker.java  | 55.55556% | 4 Missing ⚠️
.../trpc/transport/netty/NettyTcpClientTransport.java  | 0.00000%  | 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@                  Coverage Diff                  @@
##                master        #141         +/-   ##
=====================================================
+ Coverage     85.87630%   86.20402%   +0.32771%     
- Complexity        4327        4337         +10     
=====================================================
  Files              436         436                 
  Lines            14373       14381          +8     
  Branches          1287        1293          +6     
=====================================================
+ Hits             12343       12397         +54     
+ Misses            2030        1984         -46     
Files with missing lines                               | Coverage Δ
...t/trpc/core/transport/AbstractClientTransport.java  | 73.71795% <100.00000%> (ø)
...t/trpc/proto/http/client/Http2ConsumerInvoker.java  | 88.88889% <100.00000%> (+0.28130%) ⬆️
...encent/trpc/proto/http/client/Http2cRpcClient.java  | 86.95652% <100.00000%> (+5.70651%) ⬆️
...nt/trpc/proto/http/client/HttpConsumerInvoker.java  | 100.00000% <100.00000%> (ø)
.../tencent/trpc/proto/http/client/HttpRpcClient.java  | 90.62500% <100.00000%> (+6.41446%) ⬆️
...ncent/trpc/transport/netty/NettyClientHandler.java  | 100.00000% <ø> (ø)
...ncent/trpc/transport/netty/NettyServerHandler.java  | 100.00000% <ø> (ø)
.../trpc/transport/netty/NettyTcpServerTransport.java  | 84.25926% <100.00000%> (-0.56218%) ⬇️
.../trpc/transport/netty/NettyTcpClientTransport.java  | 96.96970% <0.00000%> (-0.25252%) ⬇️
...ent/trpc/core/cluster/RpcClusterClientManager.java  | 93.93939% <92.72727%> (+29.13939%) ⬆️
... and 1 more

... and 3 files with indirect coverage changes


@wardseptember
Collaborator Author

tRPC-Java Long-Lived Connection Refactoring: Detailed Summary

1. Requirements and Goals

1.1 Problems with the Original Design

The original tRPC-Java client used an "on-demand short connection + disconnect on idle" model:

  • RpcClusterClientManager had a built-in idleScanner (every N seconds it scanned all cached RpcClient instances and called close() on any that exceeded idleTimeout without being used)
  • Both the Netty client and server pipelines registered an IdleStateHandler (read/write idle events triggered userEventTriggered → the channel was actively closed)
  • Under high-concurrency, short-interval RPC workloads, the repeated connect / disconnect cycle wasted TCP three-way handshakes and TLS negotiation, and after an idle-triggered close the next request paid a cold-start latency penalty

1.2 Goals

  1. Truly long-lived connections: once established, a connection is kept open and never proactively closed
  2. No memory leaks: when the client or server goes offline abnormally, dead connection objects left in the cache must be reclaimable
  3. Keep the changes as simple and lightweight as possible
  4. Retain the idleTimeout field for compatibility (not removed, just no longer consulted)
  5. Both the trpc protocol and the http protocol must support long-lived connections
  6. Unit test coverage of the changed code should be as close to 100% as possible

2. Overall Architecture and Key Modules

┌──────────────────────────────────────────────────────────────────┐
│  RpcClusterClientManager  ← client cache + lightweight periodic reconnect checker
│      ↓ holds
│  RpcClientProxy (failureCount)  ← wrapper that counts consecutive unavailable checks
│      ↓ delegate
│  AbstractRpcClient → AbstractClientTransport (channels)
│      ↓
│  Netty Channel (no IdleStateHandler)
└──────────────────────────────────────────────────────────────────┘

HTTP side:
HttpRpcClient / Http2cRpcClient  ← evictExpired/evictIdle + lastUsedNanos
      ↓
PoolingHttpClientConnectionManager (Apache HttpClient)

3. Refactoring Details (Grouped by Layer)

3.1 Cluster layer: RpcClusterClientManager

This is the core of the change.

Removed

// Removed: scanned all clients every N seconds and closed those past the idle timeout
private static void scanIdleClient() { ... }

Added a lightweight timer: observation + failure-threshold cleanup

private static final int RECONNECT_CHECK_PERIOD_SECONDS = 30;
private static final int MAX_RECONNECT_FAILURES = 5;

private static volatile ScheduledFuture<?> reconnectCheckerFuture;

private static void ensureReconnectCheckerStarted() {
    // Double-checked locking: avoid starting twice; lazily started on the first getOrCreateClient call
    if (reconnectCheckerFuture != null || CLOSED_FLAG.get()) return;
    synchronized (RpcClusterClientManager.class) {
        if (reconnectCheckerFuture != null || CLOSED_FLAG.get()) return;
        try {
            reconnectCheckerFuture = WorkerPoolManager.getShareScheduler()
                    .scheduleAtFixedRate(
                        RpcClusterClientManager::checkAndReconnect,
                        RECONNECT_CHECK_PERIOD_SECONDS, RECONNECT_CHECK_PERIOD_SECONDS,
                        TimeUnit.SECONDS);
        } catch (Throwable ex) {
            // Even if the scheduler is unavailable, business traffic must not be blocked;
            // the request path lazily reconnects on its own
            logger.warn("Start reconnect-check timer failed, ...", ex);
        }
    }
}

static void checkAndReconnect() {
    if (CLOSED_FLAG.get()) return;
    CLUSTER_MAP.forEach((bConfig, clusterMap) -> clusterMap.forEach((key, proxy) -> {
        try {
            if (proxy.isAvailable()) {
                proxy.failureCount.set(0);   // healthy: reset the counter
                return;
            }
            int fails = proxy.failureCount.incrementAndGet();
            if (fails >= MAX_RECONNECT_FAILURES) {
                proxy.close();   // triggers closeFuture → cache eviction
            }
        } catch (Throwable ex) {
            logger.error("Reconnect-check on client {} threw", key, ex);
        }
    }));
}

Why it is designed this way

  • No active heartbeats: only the current isAvailable() state is observed, relying on the underlying transport having already marked the channel invalid on channelInactive; this avoids the complexity of a heartbeat protocol
  • 5 × 30s = 150s tolerance window: avoids evicting clients because of transient network jitter
  • close() only once the threshold is reached: close triggers closeFuture → the hook registered in getOrCreateClient removes the entry from CLUSTER_MAP, and the next request rebuilds the client naturally
  • Lazy start: the scheduled task is registered only when getOrCreateClient is actually called, so processes that never use the RPC client pay zero overhead
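
For reference, a minimal sketch of the proxy wrapper the checker above relies on. It assumes only what the snippets use (a failureCount counter plus delegation of isAvailable(), close(), and closeFuture()); the ClientLike interface and all other details are illustrative stand-ins, not the actual RpcClientProxy class.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: the shape of a proxy wrapper carrying the failureCount
// consulted by checkAndReconnect(), delegating everything else to the wrapped client.
final class RpcClientProxySketch {

    /** Stand-in for the subset of RpcClient used above. */
    interface ClientLike {
        boolean isAvailable();
        void close();
        CompletableFuture<Void> closeFuture();
    }

    final AtomicInteger failureCount = new AtomicInteger(0); // consecutive failed availability checks

    private final ClientLike delegate;

    RpcClientProxySketch(ClientLike delegate) {
        this.delegate = delegate;
    }

    boolean isAvailable() {
        return delegate.isAvailable();
    }

    void close() {
        // Closing the underlying client also completes closeFuture(), which is what
        // evicts this entry from CLUSTER_MAP (see the hook below).
        delegate.close();
    }

    CompletableFuture<Void> closeFuture() {
        return delegate.closeFuture();
    }
}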

Protecting cache entries from accidental removal (CAS)

proxy.closeFuture().whenComplete((r, e) -> {
    Map<String, RpcClientProxy> clusterMap = CLUSTER_MAP.get(bConfig);
    if (clusterMap != null) {
        clusterMap.remove(k, proxy);   // ← key point: CAS-style remove(key, value)
    }
});

Why remove(k, proxy) instead of remove(k)
By the time the old proxy's close hook fires asynchronously, a new proxy may already have been put into the cache (user closes explicitly → rebuilt immediately). An unconditional remove(key) would accidentally delete the new object, causing a memory leak and a failure on the next request. The CAS semantics guarantee that the entry is removed only if the cache still holds the same old proxy.
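
A small, self-contained illustration (not project code) of the ConcurrentMap.remove(key, value) semantics relied on here: the entry is removed only if the map still maps the key to that exact value.

import java.util.concurrent.ConcurrentHashMap;

// Demonstrates the conditional remove: it is a no-op once the key has been
// remapped to a newer value, so a late close hook cannot evict the new proxy.
public class ConditionalRemoveDemo {

    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
        Object oldProxy = new Object();
        Object newProxy = new Object();

        cache.put("backend-1", oldProxy);
        cache.put("backend-1", newProxy);          // rebuilt before the old close hook fires

        boolean removed = cache.remove("backend-1", oldProxy);
        System.out.println(removed);                             // false: nothing was removed
        System.out.println(cache.get("backend-1") == newProxy);  // true: the new proxy survives
    }
}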

idleTimeout field retained

The field is kept only for backward compatibility; it is no longer read in any code path.


3.2 ClusterInvoker layer: DefClusterInvoker

Problem (a bug in the original code)

// Original code: the closeFuture hook removed the entry unconditionally
invoker.getRpcClient().closeFuture().whenComplete((r, e) ->
    invokerCache.remove(uniqId)
);

The same risk of accidentally removing a freshly added entry exists here.

Fix

RpcClient rpcClient = ...;
ConsumerInvoker<T> invoker = rpcClient.createInvoker(consumerConfig);
ConsumerInvoker<T> existing = invokerCache.putIfAbsent(uniqId, invoker);
if (existing != null) {
    return existing;
}
// CAS remove: only delete the invoker registered by this call
rpcClient.closeFuture().whenComplete((r, e) ->
    invokerCache.remove(uniqId, invoker)
);
return invoker;

3.3 Transport layer: AbstractClientTransport

Problem: the channels field was originally an ArrayList

this.channels = Lists.newArrayListWithExpectedSize(config.getConnsPerAddr());

With long-lived connections, the list is read concurrently by multiple business threads (selecting a channel to send a request) and written by connection-setup / reconnect threads. ArrayList provides no memory-visibility guarantees, so there is a concurrency risk of reading a half-constructed object or a stale reference.

Changed to

this.channels = new CopyOnWriteArrayList<>();

Why CopyOnWriteArrayList

  • Reads vastly outnumber writes (writes only happen in startInternal / on reconnect after channelInactive)
  • Reads are essentially zero-cost with strong memory visibility (every write replaces the whole volatile Object[] elements array)
  • The upper bound on the number of long-lived connections (connsPerAddr) is enforced by the for loop in startInternal and is independent of the List implementation; a minimal access-pattern sketch follows below
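
A minimal sketch (not the transport code) of the read-mostly access pattern this switch targets: many reader threads pick a channel from a stable snapshot while a rare reconnect thread swaps out a dead entry. The Chan class and method names are illustrative.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Read-mostly pattern: hot-path readers iterate a consistent snapshot, the rare
// writer replaces a dead channel; every mutation copies the backing array, so
// readers always see fully constructed, safely published elements.
public class ChannelListSketch {

    static final class Chan {                    // illustrative stand-in for NettyChannel
        final String id;
        volatile boolean active = true;
        Chan(String id) { this.id = id; }
    }

    private final List<Chan> channels = new CopyOnWriteArrayList<>();

    Chan select() {                              // hot path: lock-free read
        for (Chan c : channels) {
            if (c.active) {
                return c;
            }
        }
        return null;                             // caller falls back to (re)connecting
    }

    void replace(Chan dead, Chan fresh) {        // rare path: reconnect thread
        channels.remove(dead);
        channels.add(fresh);
    }
}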

3.4 Netty layer

NettyTcpClientTransport / NettyTcpServerTransport

// Removed: neither the client nor the server pipeline registers an IdleStateHandler anymore
pipeline.addLast(new IdleStateHandler(idleTimeout, 0, 0, MILLISECONDS));

NettyClientHandler / NettyServerHandler

// Removed: dead code that actively closed the channel after an idle event
@Override public void userEventTriggered(...) {
    if (evt instanceof IdleStateEvent) ctx.close();
}

Why this change is mandatory
Even with the idle scan removed from RpcClusterClientManager, as long as Netty still registers an IdleStateHandler, connections are still closed by Netty itself after going idle, and the long-lived connection exists in name only.

Safety analysis

  • Removing the IdleStateHandler on the server side does not leak resources on the callee when the caller process goes down abnormally, because:
    • Normal close / process exit: TCP four-way teardown → Netty receives channelInactive → the channel is cleaned up
    • Physical network loss (cable pulled / IP drift): reclaimed via OS TCP keepalive (default ~2h); the memory held per connection is roughly (channel object + buffers) ≈ KB level, which is acceptable
  • The caller side relies on channelInactive plus the RpcClusterClientManager timer above as a backstop (close + cache eviction after 150s); an illustrative handler sketch follows below
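
The caller-side backstop assumes that channelInactive marks the connection unusable so that isAvailable() starts failing. A minimal illustrative Netty handler showing that pattern (this is not the project's actual NettyClientHandler; the ChannelState interface is a stand-in):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative only: flips an "alive" flag when the peer closes or the connection
// drops, so later availability checks see the dead connection without heartbeats.
public class ConnectionStateHandler extends ChannelInboundHandlerAdapter {

    /** Minimal stand-in for the transport-side channel wrapper. */
    public interface ChannelState {
        void markInactive();
    }

    private final ChannelState state;

    public ConnectionStateHandler(ChannelState state) {
        this.state = state;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        state.markInactive();        // subsequent isAvailable() checks return false
        super.channelInactive(ctx);  // keep propagating the event down the pipeline
    }
}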

3.5 Long-lived connections for the HTTP protocol

HTTP traffic goes through Apache HttpClient 4.x's PoolingHttpClientConnectionManager, and two problems have to be solved:

  1. Connections that have been idle for a long time, and that the server may already have closed, remain in the pool (leading to NoHttpResponseException)
  2. The HttpRpcClient instance itself must be reclaimable (a false-positive isAvailable() would prevent RpcClusterClientManager from evicting a dead instance)

HttpRpcClient / Http2cRpcClient

// (1) Pool-level periodic cleanup
HttpClientBuilder builder = HttpClients.custom()
    .setConnectionManager(connMgr)
    .evictExpiredConnections()
    .evictIdleConnections(60, TimeUnit.SECONDS)   // evict connections idle for 60s
    .setConnectionManagerShared(false);
connMgr.setValidateAfterInactivity(2000);          // revalidate a connection unused for 2s before reuse

// (2) Client-instance level: expose the last-used timestamp
private volatile long lastUsedNanos = System.nanoTime();
public void markUsed() { this.lastUsedNanos = System.nanoTime(); }

@Override
public boolean isAvailable() {
    if (super.isClosed()) return false;
    long idleNs = System.nanoTime() - lastUsedNanos;
    // unused for 10 minutes → treated as unavailable, so RpcClusterClientManager reclaims it after 5 failed checks
    return idleNs < TimeUnit.MINUTES.toNanos(10);
}

HttpConsumerInvoker / Http2ConsumerInvoker

public CompletionStage<Response> invoke(Request request) {
    rpcClient.markUsed();   // ← refresh liveness on every request
    ...
}

Why it is designed this way

  • HTTP/1.1 keep-alive is managed internally by HttpClient; the business layer should not close individual HTTP connections
  • evictIdleConnections + validateAfterInactivity guarantee, at the pool level, that requests are never sent over dead connections
  • lastUsedNanos + isAvailable() let the upper-layer timer recognize, at the instance level, a client that has been cold for 10 minutes and reclaim it, preventing HttpRpcClient instances from piling up

4. Complete Leak-Prevention Chain

Scenario A: the callee (server) goes down / its IP drifts
   ↓
Caller-side Netty receives channelInactive (TCP RST/FIN)
   ↓
AbstractClientTransport cleans up the corresponding NettyChannel
   ↓
Subsequent RpcClient.isAvailable() calls return false
   ↓
The 30s timer in RpcClusterClientManager observes the unavailable state
   ↓
After 5 consecutive failures (≈150s) → proxy.close()
   ↓
proxy.closeFuture() completes → CLUSTER_MAP.remove(uniqId, proxy)
   ↓
The memory is reclaimed by GC

Scenario B: the caller (client) goes down abnormally
   ↓
Process exits → the OS sends FIN to the callee
   ↓
Callee-side Netty receives channelInactive → the server-side channel is cleaned up
   ↓
(Cable-pulled scenario) falls back to OS TCP keepalive (~2h) as the backstop

5. Unit Test Coverage (5 test classes, 34 tests)

Test class                                     | Purpose                                                                        | Key technique
RpcClusterClientManagerTest (17)               | Main flows: getOrCreateClient / close / checkAndReconnect / proxy delegation   | Plain JUnit + stubs
DefClusterInvokerCloseFutureTest (5)           | CAS-remove semantics that prevent evicting the wrong entry                     | Plain JUnit
RpcClusterClientManagerLoggerLevelTest (1)     | Covers the 3 logger.isDebugEnabled()==false branches                           | log4j2 LoggerContext.updateLoggers() to change the level temporarily
RpcClusterClientManagerSchedulerRejectTest (1) | Covers the catch branch where the scheduler throws RejectedExecutionException  | PowerMock mockStatic(WorkerPoolManager.class)
HttpRpcClientLongLinkTest (10)                 | The three markUsed/isAvailable branches + Http2/Https subclasses               | Reflection + subclassing

Key unit-test design trade-offs

  1. PowerMock crashes on JDK 17 (MagicAccessorImpl restrictions) → the project is pinned to JDK 8, where PowerMock works
  2. Logger-branch coverage does not rely on PowerMock rewriting the static-final logger field (Whitebox.setInternalState cannot modify static-final under PowerMock 1.7.4 + JDK 8 either) → instead the standard log4j2 API is used to change the level: no reflection, no PowerMock, the simplest option (see the sketch after this list)
  3. Scheduler failure: the static method WorkerPoolManager.getShareScheduler() is mocked directly to return a ScheduledExecutorService that throws RejectedExecutionException
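
For the log4j2 approach in point 2, a minimal sketch (the class name LoggerLevelSwitcher is hypothetical, not the actual test helper) of temporarily changing a logger's level through the standard log4j2 core API, which makes isDebugEnabled() return false without any reflection:

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

// Sets a logger's level via the log4j2 configuration and returns the previous level,
// so a test can raise the level to WARN before the call and restore it afterwards.
public final class LoggerLevelSwitcher {

    private LoggerLevelSwitcher() {
    }

    public static Level setLevel(String loggerName, Level newLevel) {
        LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        Configuration config = ctx.getConfiguration();
        LoggerConfig loggerConfig = config.getLoggerConfig(loggerName);
        Level previous = loggerConfig.getLevel();
        loggerConfig.setLevel(newLevel);
        ctx.updateLoggers();   // push the change to all loggers bound to this config
        return previous;
    }
}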

Final coverage

Line coverage of RpcClusterClientManager is 93%; all 3 isDebugEnabled branches and the scheduler catch branch are covered. Remaining uncovered lines:

  • L165: the inner race check of the double-checked locking (defensive code)
  • L367: the getClass() != obj.getClass() branch in RpcClientProxy.equals (boilerplate)

6. List of Changed Files

Core changes (committed)

  • RpcClusterClientManager.java — main body of the refactoring
  • DefClusterInvoker.java — CAS remove fix
  • AbstractClientTransport.java — channels switched to CopyOnWriteArrayList
  • NettyTcpClientTransport.java / NettyTcpServerTransport.java — removed IdleStateHandler
  • NettyClientHandler.java / NettyServerHandler.java — removed the dead idle-close code
  • HttpRpcClient.java / Http2cRpcClient.java — evictIdle + lastUsedNanos + isAvailable
  • HttpConsumerInvoker.java / Http2ConsumerInvoker.java — markUsed

Unit tests

  • RpcClusterClientManagerTest.java (enhanced, 17 tests)
  • DefClusterInvokerCloseFutureTest.java (new, 5 tests)
  • RpcClusterClientManagerLoggerLevelTest.java (new, 1 test, added in this change)
  • RpcClusterClientManagerSchedulerRejectTest.java (new, 1 test, added in this change)
  • HttpRpcClientLongLinkTest.java (new, 10 tests)
