Skip to content

Commit ff71575

Browse files
authored
feat: allow configuring TCP listen backlog (#4355)
Linkerd proxy inbound listeners currently use a fixed TCP accept backlog (observed as 128 via `ss -ltnp`) that cannot be configured by operators. In high-traffic environments, especially during Kubernetes rollouts where many sidecars simultaneously establish new outbound connections to newly-ready pods, this fixed backlog can become a limiting factor.

When a connection burst exceeds the proxy’s accept queue capacity, incoming connections are temporarily dropped or delayed at the TCP level, leading to short-lived connection failures such as:

```
{"timestamp":"2025-12-12T19:55:11.333411Z","level":"WARN","fields":{"message":"Failed to connect","error":"connect timed out after 1s"},"target":"linkerd_reconnect","threadId":"ThreadId(1)"}
```

Because the proxy backlog is not configurable or documented, operators have no direct way to tune Linkerd for services that experience high fan-in or connection storms (for example during rollouts, autoscaling events, or traffic rebalancing).

This commit introduces support for two new environment variables:

- `LINKERD2_PROXY_INBOUND_TCP_LISTEN_BACKLOG`
- `LINKERD2_PROXY_OUTBOUND_TCP_LISTEN_BACKLOG`

These can be configured using the `proxy.additionalEnv` field in the Linkerd Helm chart.

Signed-off-by: Aurel Canciu <aurel.canciu@nexhealth.com>
1 parent 1ec95c7 commit ff71575

9 files changed

Lines changed: 159 additions & 16 deletions

File tree

linkerd/app/core/src/config.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub use crate::exp_backoff::ExponentialBackoff;
22
use crate::{
33
proxy::http::{h1, h2},
44
svc::{queue, ExtractParam, Param},
5-
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout},
5+
transport::{Backlog, DualListenAddr, Keepalive, ListenAddr, UserTimeout},
66
};
77
use std::time::Duration;
88

@@ -11,6 +11,7 @@ pub struct ServerConfig {
1111
pub addr: DualListenAddr,
1212
pub keepalive: Keepalive,
1313
pub user_timeout: UserTimeout,
14+
pub backlog: Backlog,
1415
pub http2: h2::ServerParams,
1516
}
1617

@@ -84,3 +85,9 @@ impl Param<UserTimeout> for ServerConfig {
8485
self.user_timeout
8586
}
8687
}
88+
89+
// Exposes the server's configured TCP listen backlog as a stack parameter,
// in the same way the `UserTimeout` impl above exposes `user_timeout`.
impl Param<Backlog> for ServerConfig {
90+
fn param(&self) -> Backlog {
91+
// `Backlog` derives `Copy`, so the field is handed out by value.
self.backlog
92+
}
93+
}

linkerd/app/inbound/src/test_util.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use linkerd_app_core::{
88
http::{h1, h2},
99
tap,
1010
},
11-
transport::{DualListenAddr, Keepalive, UserTimeout},
11+
transport::{Backlog, DualListenAddr, Keepalive, UserTimeout},
1212
ProxyRuntime,
1313
};
1414
pub use linkerd_app_test as support;
@@ -59,6 +59,7 @@ pub fn default_config() -> Config {
5959
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
6060
keepalive: Keepalive(None),
6161
user_timeout: UserTimeout(None),
62+
backlog: Backlog::default(),
6263
http2: h2::ServerParams::default(),
6364
},
6465
connect: config::ConnectConfig {

linkerd/app/integration/src/proxy.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use super::*;
22
use linkerd_app_core::{
33
svc::Param,
44
transport::{
5-
listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr, UserTimeout,
5+
listen, orig_dst, Backlog, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr,
6+
UserTimeout,
67
},
78
Result,
89
};
@@ -70,7 +71,7 @@ struct MockDualOrigDst {
7071

7172
impl<T> listen::Bind<T> for MockOrigDst
7273
where
73-
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr>,
74+
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr> + Param<Backlog>,
7475
{
7576
type Addrs = orig_dst::Addrs;
7677
type BoundAddrs = Local<ServerAddr>;
@@ -120,7 +121,7 @@ impl fmt::Debug for MockOrigDst {
120121

121122
impl<T> listen::Bind<T> for MockDualOrigDst
122123
where
123-
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr>,
124+
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr> + Param<Backlog>,
124125
{
125126
type Addrs = orig_dst::Addrs;
126127
type BoundAddrs = (Local<ServerAddr>, Option<Local<ServerAddr>>);

linkerd/app/outbound/src/test_util.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use linkerd_app_core::{
77
http::{h1, h2},
88
tap,
99
},
10-
transport::{DualListenAddr, Keepalive, UserTimeout},
10+
transport::{Backlog, DualListenAddr, Keepalive, UserTimeout},
1111
IpMatch, IpNet, ProxyRuntime,
1212
};
1313
pub use linkerd_app_test as support;
@@ -27,6 +27,7 @@ pub(crate) fn default_config() -> Config {
2727
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
2828
keepalive: Keepalive(None),
2929
user_timeout: UserTimeout(None),
30+
backlog: Backlog::default(),
3031
http2: h2::ServerParams::default(),
3132
},
3233
connect: config::ConnectConfig {

linkerd/app/src/env.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd_app_core::{
55
control::{Config as ControlConfig, ControlAddr},
66
proxy::http::{h1, h2},
77
tls,
8-
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout},
8+
transport::{Backlog, DualListenAddr, Keepalive, ListenAddr, UserTimeout},
99
AddrMatch, Conditional, IpNet,
1010
};
1111
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, time::Duration};
@@ -136,6 +136,9 @@ const ENV_OUTBOUND_ACCEPT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_U
136136
const ENV_INBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_CONNECT_USER_TIMEOUT";
137137
const ENV_OUTBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_USER_TIMEOUT";
138138

139+
const ENV_INBOUND_TCP_LISTEN_BACKLOG: &str = "LINKERD2_PROXY_INBOUND_TCP_LISTEN_BACKLOG";
140+
const ENV_OUTBOUND_TCP_LISTEN_BACKLOG: &str = "LINKERD2_PROXY_OUTBOUND_TCP_LISTEN_BACKLOG";
141+
139142
const ENV_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_MAX_IDLE_CONNS_PER_ENDPOINT";
140143
const ENV_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str =
141144
"LINKERD2_PROXY_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT";
@@ -389,6 +392,14 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
389392
let inbound_accept_keepalive = parse(strings, ENV_INBOUND_ACCEPT_KEEPALIVE, parse_duration);
390393
let outbound_accept_keepalive = parse(strings, ENV_OUTBOUND_ACCEPT_KEEPALIVE, parse_duration);
391394

395+
let inbound_tcp_listen_backlog =
396+
parse(strings, ENV_INBOUND_TCP_LISTEN_BACKLOG, parse_number::<u32>);
397+
let outbound_tcp_listen_backlog = parse(
398+
strings,
399+
ENV_OUTBOUND_TCP_LISTEN_BACKLOG,
400+
parse_number::<u32>,
401+
);
402+
392403
let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration);
393404
let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration);
394405

@@ -500,10 +511,12 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
500511

501512
let keepalive = Keepalive(outbound_accept_keepalive?);
502513
let user_timeout = UserTimeout(outbound_accept_user_timeout?);
514+
let backlog = Backlog(outbound_tcp_listen_backlog?);
503515
let server = ServerConfig {
504516
addr,
505517
keepalive,
506518
user_timeout,
519+
backlog,
507520
http2: http2::parse_server(strings, "LINKERD2_PROXY_OUTBOUND_SERVER_HTTP2")?,
508521
};
509522
let discovery_idle_timeout =
@@ -592,10 +605,12 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
592605
);
593606
let keepalive = Keepalive(inbound_accept_keepalive?);
594607
let user_timeout = UserTimeout(inbound_accept_user_timeout?);
608+
let backlog = Backlog(inbound_tcp_listen_backlog?);
595609
let server = ServerConfig {
596610
addr,
597611
keepalive,
598612
user_timeout,
613+
backlog,
599614
http2: http2::parse_server(strings, "LINKERD2_PROXY_INBOUND_SERVER_HTTP2")?,
600615
};
601616
let discovery_idle_timeout =
@@ -815,6 +830,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
815830
addr: DualListenAddr(admin_listener_addr, None),
816831
keepalive: inbound.proxy.server.keepalive,
817832
user_timeout: inbound.proxy.server.user_timeout,
833+
backlog: inbound.proxy.server.backlog,
818834
http2: inbound.proxy.server.http2.clone(),
819835
},
820836

@@ -869,6 +885,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
869885
addr: DualListenAddr(addr, None),
870886
keepalive: inbound.proxy.server.keepalive,
871887
user_timeout: inbound.proxy.server.user_timeout,
888+
backlog: inbound.proxy.server.backlog,
872889
http2: inbound.proxy.server.http2.clone(),
873890
},
874891
})

linkerd/meshtls/tests/util.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use linkerd_meshtls::{self as meshtls, watch};
1111
use linkerd_proxy_transport::{
1212
addrs::*,
1313
listen::{Addrs, Bind, BindTcp},
14-
ConnectTcp, Keepalive, UserTimeout,
14+
Backlog, ConnectTcp, Keepalive, UserTimeout,
1515
};
1616
use linkerd_stack::{
1717
layer::Layer, service_fn, ExtractParam, InsertParam, NewService, Param, ServiceExt,
@@ -408,6 +408,11 @@ impl Param<UserTimeout> for Server {
408408
UserTimeout(None)
409409
}
410410
}
411+
// The test server does not request a specific listen backlog. With `BindTcp`,
// `Backlog(None)` selects the `std::net::TcpListener::bind` path.
impl Param<Backlog> for Server {
412+
fn param(&self) -> Backlog {
413+
Backlog(None)
414+
}
415+
}
411416

412417
// === impl ServerParams ===
413418

linkerd/proxy/transport/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ impl From<UserTimeout> for Option<Duration> {
4545
}
4646
}
4747

48+
/// The TCP listen backlog to request when binding a listener.
///
/// `Some(n)` is passed to `TcpSocket::listen` by `BindTcp`; `None` (the
/// `Default`) makes `BindTcp` fall back to `std::net::TcpListener::bind`
/// without specifying a backlog.
#[derive(Copy, Clone, Debug, Default)]
49+
pub struct Backlog(pub Option<u32>);
50+
4851
// Misc.
4952

5053
fn set_nodelay_or_warn(socket: &TcpStream) {

linkerd/proxy/transport/src/listen.rs

Lines changed: 108 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod dual_bind;
22

3-
use crate::{addrs::*, Keepalive, UserTimeout};
3+
use crate::{addrs::*, Backlog, Keepalive, UserTimeout};
44
use dual_bind::DualBind;
55
use futures::prelude::*;
66
use linkerd_error::Result;
@@ -71,7 +71,7 @@ impl BindTcp {
7171

7272
impl<T> Bind<T> for BindTcp
7373
where
74-
T: Param<ListenAddr> + Param<Keepalive> + Param<UserTimeout>,
74+
T: Param<ListenAddr> + Param<Keepalive> + Param<UserTimeout> + Param<Backlog>,
7575
{
7676
type Addrs = Addrs;
7777
type BoundAddrs = Local<ServerAddr>;
@@ -81,10 +81,36 @@ where
8181
fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> {
8282
let listen = {
8383
let ListenAddr(addr) = params.param();
84-
let l = std::net::TcpListener::bind(addr)?;
85-
// Ensure that O_NONBLOCK is set on the socket before using it with Tokio.
86-
l.set_nonblocking(true)?;
87-
tokio::net::TcpListener::from_std(l).expect("listener must be valid")
84+
let Backlog(backlog) = params.param();
85+
86+
match backlog {
87+
Some(backlog) => {
88+
// Use TcpSocket to configure a custom listen backlog.
89+
// TcpSocket::new_v4/v6 automatically sets O_NONBLOCK, which is
90+
// required for Tokio's async I/O operations.
91+
let socket = if addr.is_ipv4() {
92+
tokio::net::TcpSocket::new_v4()?
93+
} else {
94+
tokio::net::TcpSocket::new_v6()?
95+
};
96+
97+
// Enable SO_REUSEADDR to match std::net::TcpListener::bind
98+
// behavior. On Windows, std does not set SO_REUSEADDR to prevent
99+
// socket hijacking.
100+
#[cfg(not(windows))]
101+
socket.set_reuseaddr(true)?;
102+
103+
socket.bind(addr)?;
104+
socket.listen(backlog)?
105+
}
106+
None => {
107+
// No custom backlog configured; use std::net::TcpListener::bind
108+
// which applies platform-correct defaults.
109+
let l = std::net::TcpListener::bind(addr)?;
110+
l.set_nonblocking(true)?;
111+
tokio::net::TcpListener::from_std(l).expect("listener must be valid")
112+
}
113+
}
88114
};
89115
let server = Local(ServerAddr(listen.local_addr()?));
90116
let Keepalive(keepalive) = params.param();
@@ -138,3 +164,79 @@ impl Param<AddrPair> for Addrs {
138164
AddrPair(client, server)
139165
}
140166
}
167+
168+
// Tests for both branches of `BindTcp::bind`: an explicit backlog and no
// backlog. Each test only checks that a connection can be established and
// accepted; neither inspects the backlog value the socket ends up with.
#[cfg(test)]
169+
mod tests {
170+
use super::*;
171+
172+
// Minimal parameter type providing the four `Param` impls that
// `BindTcp`'s `Bind` impl requires.
#[derive(Clone)]
173+
struct TestParams {
174+
addr: ListenAddr,
175+
keepalive: Keepalive,
176+
user_timeout: UserTimeout,
177+
backlog: crate::Backlog,
178+
}
179+
180+
impl Param<ListenAddr> for TestParams {
181+
fn param(&self) -> ListenAddr {
182+
self.addr
183+
}
184+
}
185+
186+
impl Param<Keepalive> for TestParams {
187+
fn param(&self) -> Keepalive {
188+
self.keepalive
189+
}
190+
}
191+
192+
impl Param<UserTimeout> for TestParams {
193+
fn param(&self) -> UserTimeout {
194+
self.user_timeout
195+
}
196+
}
197+
198+
impl Param<crate::Backlog> for TestParams {
199+
fn param(&self) -> crate::Backlog {
200+
self.backlog
201+
}
202+
}
203+
204+
// Builds params for a loopback listener on port 0 with the given backlog;
// keepalive and user timeout are left unset.
fn params_with_backlog(backlog: Option<u32>) -> TestParams {
205+
TestParams {
206+
addr: ListenAddr("127.0.0.1:0".parse().unwrap()),
207+
keepalive: Keepalive(None),
208+
user_timeout: UserTimeout(None),
209+
backlog: crate::Backlog(backlog),
210+
}
211+
}
212+
213+
// `Some(1024)` takes the `TcpSocket` branch of `bind`.
#[tokio::test]
214+
async fn bind_with_custom_backlog() {
215+
let params = params_with_backlog(Some(1024));
216+
let bind = BindTcp::default();
217+
let (bound_addr, mut incoming) = bind.bind(&params).expect("failed to bind");
218+
219+
// Verify we can connect and accept through the listener.
220+
// Unwrap `Local(ServerAddr(addr))` to get the bound socket address.
let addr = bound_addr.0 .0;
221+
let connect = tokio::net::TcpStream::connect(addr);
222+
let accept = incoming.next();
223+
// Drive the connect and the accept concurrently on this task.
let (conn, accepted) = tokio::join!(connect, accept);
224+
conn.expect("failed to connect");
225+
accepted.expect("stream ended").expect("failed to accept");
226+
}
227+
228+
// `None` takes the `std::net::TcpListener::bind` branch of `bind`.
#[tokio::test]
229+
async fn bind_with_default_backlog() {
230+
let params = params_with_backlog(None);
231+
let bind = BindTcp::default();
232+
let (bound_addr, mut incoming) = bind.bind(&params).expect("failed to bind");
233+
234+
// Verify we can connect and accept through the listener.
235+
// Unwrap `Local(ServerAddr(addr))` to get the bound socket address.
let addr = bound_addr.0 .0;
236+
let connect = tokio::net::TcpStream::connect(addr);
237+
let accept = incoming.next();
238+
// Drive the connect and the accept concurrently on this task.
let (conn, accepted) = tokio::join!(connect, accept);
239+
conn.expect("failed to connect");
240+
accepted.expect("stream ended").expect("failed to accept");
241+
}
242+
}

linkerd/proxy/transport/src/listen/dual_bind.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr, UserTimeout};
1+
use crate::{addrs::DualListenAddr, listen::Bind, Backlog, Keepalive, ListenAddr, UserTimeout};
22
use futures::Stream;
33
use linkerd_error::Result;
44
use linkerd_stack::Param;
@@ -26,7 +26,7 @@ impl<B> From<B> for DualBind<B> {
2626

2727
impl<T, B> Bind<T> for DualBind<B>
2828
where
29-
T: Param<DualListenAddr> + Param<Keepalive> + Param<UserTimeout> + Clone,
29+
T: Param<DualListenAddr> + Param<Keepalive> + Param<UserTimeout> + Param<Backlog> + Clone,
3030
B: Bind<Listen<T>, Io = TcpStream> + Clone + 'static,
3131
{
3232
type Addrs = B::Addrs;
@@ -68,6 +68,12 @@ impl<T: Param<UserTimeout>> Param<UserTimeout> for Listen<T> {
6868
}
6969
}
7070

71+
// Forwards the backlog from the wrapped parent params unchanged. Only
// `ListenAddr` (below) comes from `Listen` itself rather than the parent.
impl<T: Param<Backlog>> Param<Backlog> for Listen<T> {
72+
fn param(&self) -> Backlog {
73+
self.parent.param()
74+
}
75+
}
76+
7177
impl<T> Param<ListenAddr> for Listen<T> {
7278
fn param(&self) -> ListenAddr {
7379
ListenAddr(self.addr)

0 commit comments

Comments
 (0)