
Commit 3aa8923

fix(breaker): prevent latent busy-spin in closed-state drain loop (#4481)
* fix(breaker): prevent latent busy-spin in closed-state drain loop

A latent busy-spin exists in the closed-state backoff drain loop:
`_ = self.rsps.recv() => continue` discards the recv() result, so when the
response channel closes, recv() returns None immediately on every iteration
and the loop spins, saturating a CPU core.

Currently this is unreachable because gate::Rx and mpsc::Sender live in the
same Gate<BroadcastClassification<...>> service struct and drop together in
a single synchronous destructor chain, so gate.lost() always fires when the
channel is closing.

However, gate::Rx is a watch::Receiver, which implements Clone. If a future
change cloned it for a legitimate purpose (monitoring gate state, exposing
it to an admin endpoint, observability, etc.), the clone would keep
gate.lost() from resolving after the endpoint service is dropped, while the
mpsc channel would close normally once in-flight requests drain. The
breaker task would then busy-spin instead of terminating. [NB: a clone
extends the lifetime of the receiver, and for gate.lost() to fire, the
internal Sender::closed() requires that all receivers are gone.]

Check the recv() result and propagate None as Err(()) to terminate the task
cleanly, mirroring the pattern already used in open() and probation(). This
makes the breaker self-contained, terminating on its own channel state
rather than depending on the fragility of the ownership structure, in which
gate::Rx and mpsc::Sender currently happen to share a destructor chain.

Signed-off-by: Alejandro Martinez Ruiz <amr@buoyant.io>

* test(breaker): regression test for channel close with surviving gate::Rx

Verify that the breaker task terminates when the response channel closes
while a clone of gate::Rx outlives the endpoint service. We build a minimal
Gate, clone gate::Rx, trip the breaker into the closed state via a 500
status, then let the service drop. The surviving gate::Rx clone prevents
gate.lost() from firing, so the breaker must terminate via the recv() call
returning None in the closed-state drain loop.

Without the channel-close check in closed(), this test would hang, spinning
on recv() returning None indefinitely.

Signed-off-by: Alejandro Martinez Ruiz <amr@buoyant.io>

---------

Signed-off-by: Alejandro Martinez Ruiz <amr@buoyant.io>
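The failure mode is easy to reproduce outside the proxy. Below is a minimal,
self-contained sketch against a plain tokio::sync::mpsc channel (illustrative
names, not linkerd types): once the sender drops, every recv() resolves
immediately with None, so an arm that discarded the result and continued
would spin at full CPU, while inspecting the Option lets the loop terminate
cleanly.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<&'static str>(1);
    drop(tx); // models the endpoint service dropping its mpsc::Sender

    loop {
        tokio::select! {
            // The buggy arm was `_ = rx.recv() => continue`: on a closed
            // channel recv() yields None instantly, so that arm wins the
            // select on every iteration and the loop never sleeps.
            rsp = rx.recv() => match rsp {
                Some(msg) => println!("drained: {msg}"),
                None => break, // channel closed: terminate instead of spinning
            },
        }
    }
    println!("drain loop exited on channel close");
}

In the patched drain loop, `rsp.ok_or(())?` achieves the same effect: None
becomes Err(()), which propagates out of run() and ends the task.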
1 parent 6a89cd7 commit 3aa8923

1 file changed: 120 additions & 3 deletions

linkerd/app/outbound/src/http/breaker/consecutive_failures.rs
@@ -76,8 +76,11 @@ impl ConsecutiveFailures {
         loop {
             tokio::select! {
                 _ = backoff.next() => break,
-                // Ignore responses while the breaker is shut.
-                _ = self.rsps.recv() => continue,
+                // Ignore responses while the breaker is shut, but
+                // terminate if the channel is closed. recv() on a
+                // closed channel returns None immediately, which would
+                // otherwise busy-spin this loop.
+                rsp = self.rsps.recv() => { rsp.ok_or(())?; },
                 _ = self.gate.lost() => return Err(()),
             }
         }
@@ -105,7 +108,12 @@ impl ConsecutiveFailures {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use tokio::time;
+    use linkerd_app_core::{
+        proxy::http::{self, classify::BroadcastClassification, BoxBody},
+        svc::{self, ServiceExt},
+        Error,
+    };
+    use tokio::{task::yield_now, time};
     use tokio_test::{assert_pending, task};
 
     #[tokio::test(flavor = "current_thread", start_paused = true)]
@@ -208,4 +216,113 @@ mod tests {
         assert_pending!(task.poll());
         assert!(params.gate.is_open());
     }
+
+    #[tokio::test(flavor = "current_thread", start_paused = true)]
+    async fn closed_state_terminates_on_channel_close() {
+        let _trace = linkerd_tracing::test::trace_init();
+
+        let (params, gate, rsps) = gate::Params::channel(1);
+        let send = |res: Result<http::StatusCode, http::StatusCode>| {
+            params
+                .responses
+                .try_send(classify::Class::Http(res))
+                .unwrap()
+        };
+
+        let backoff = ExponentialBackoff::try_new(
+            time::Duration::from_secs(1),
+            time::Duration::from_secs(100),
+            0.0,
+        )
+        .expect("backoff params are valid");
+
+        // max_failures=1: a single failure trips the breaker into closed state.
+        let breaker = ConsecutiveFailures::new(1, backoff, gate, rsps);
+        let mut task = task::spawn(breaker.run());
+
+        assert_pending!(task.poll());
+        assert!(params.gate.is_open());
+
+        // Trip the breaker into the closed state.
+        send(Err(http::StatusCode::INTERNAL_SERVER_ERROR));
+        assert_pending!(task.poll());
+        assert!(params.gate.is_shut(), "should be in closed state");
+
+        // Drop the response sender while in the closed state drain loop.
+        // The breaker must terminate rather than busy-spinning on None from
+        // the closed channel.
+        drop(params.responses);
+        assert!(task.poll().is_ready());
+    }
+
+    /// This test uses `tokio::spawn` (real task) instead of
+    /// `tokio_test::task::spawn` because the Gate + BroadcastClassification
+    /// service stack requires an actual task context for `oneshot()` to drive
+    /// response classification through the mpsc channel.
+    #[tokio::test(flavor = "current_thread", start_paused = true)]
+    async fn closed_state_terminates_when_service_drops_but_gate_rx_survives() {
+        let _trace = linkerd_tracing::test::trace_init();
+
+        let (params, gate, rsps) = gate::Params::channel(1);
+        // Keep an extra gate receiver alive to model a future stack change that
+        // retains gate state after the endpoint service itself is dropped.
+        let leaked_gate = params.gate.clone();
+
+        let backoff = ExponentialBackoff::try_new(
+            time::Duration::from_secs(1),
+            time::Duration::from_secs(100),
+            0.0,
+        )
+        .expect("backoff params are valid");
+
+        let breaker = ConsecutiveFailures::new(1, backoff, gate, rsps);
+        let breaker = tokio::spawn(breaker.run());
+
+        let svc = svc::Gate::new(
+            params.gate,
+            BroadcastClassification::<classify::Response, _>::new(
+                params.responses,
+                svc::mk(|_: http::Request<BoxBody>| async move {
+                    Ok::<_, Error>(
+                        http::Response::builder()
+                            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
+                            .body(BoxBody::default())
+                            .unwrap(),
+                    )
+                }),
+            ),
+        );
+
+        let req = http::Request::builder()
+            .extension(classify::Response::default())
+            .body(BoxBody::default())
+            .unwrap();
+        let rsp = svc.oneshot(req).await.expect("service must succeed");
+        drop(rsp);
+
+        for _ in 0..16 {
+            if leaked_gate.is_shut() {
+                break;
+            }
+            yield_now().await;
+        }
+        assert!(
+            leaked_gate.is_shut(),
+            "breaker should enter the closed state"
+        );
+
+        for _ in 0..16 {
+            if breaker.is_finished() {
+                break;
+            }
+            yield_now().await;
+        }
+
+        assert!(
+            breaker.is_finished(),
+            "breaker should terminate when the response channel closes, \
+             even if a gate::Rx clone survives"
+        );
+        breaker.await.expect("breaker task must not panic");
+    }
 }
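
The NB in the commit message rests on tokio::sync::watch semantics that are
worth seeing in isolation. A minimal sketch with plain tokio (not linkerd's
gate types): watch::Sender::closed() resolves only once every
watch::Receiver, clones included, has been dropped, which is exactly why a
surviving gate::Rx clone would keep gate.lost() pending.

use tokio::sync::watch;
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("open");
    let leaked = rx.clone(); // models a gate::Rx clone held elsewhere
    drop(rx); // the "endpoint service" drops its own receiver...

    // ...but closed() stays pending: one receiver clone is still alive.
    let pending = timeout(Duration::from_millis(50), tx.closed()).await;
    assert!(pending.is_err(), "closed() must not resolve yet");

    drop(leaked);
    tx.closed().await; // all receivers gone: now this resolves
    println!("sender observed all receivers dropped");
}

This is the scenario the second regression test constructs: the breaker can
no longer rely on gate.lost() and must notice the mpsc channel closing
instead.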
