Skip to content

Commit b28453f

Browse files
cratelyn and unleashed authored
fix(app/core): do not immediately retry negative ttl errors (#4450)
`linkerd_app_core::control` provides utilities used by the data plane to communicate with the linkerd control plane. this includes, among other features such as load-balancing and configurable settings like connection timeout durations, an error recovery mechanism that respects a DNS record's negative TTL. as of today, we do this within an inline, anonymous closure. this commit pulls this business logic out of an inline closure, and into an explicit pair of structures. ResolveRecover is the Recover implementation that handles identifying the proper backoff strategy, when presented with a given boxed error. ResolveBackoff is the structure that acts as the sum type that encompasses either a TTL-driven interval, or an exponential backoff. see also #4449, which introduces some additional guardrails to prevent panicking if a negative ttl of zero is encountered. as part of this code motion, this commit inserts a call to `tokio::time::Interval::reset()` into the `Recover` implementation that extracts negative TTLs from `hickory_resolver` errors. this means that, upon resolution errors with a negative TTL, we will no longer immediately retry, and instead wait for the prescribed time before attempting once more. introducing test coverage for this is difficult because we cannot create a `ResolveError` ourselves, and introducing e.g. a trait to inject here would incur an excessive amount of boilerplate and complexity. to provide assurance that this is correct, see this small playground example, in which we poll an `Interval` with and without this call to `reset()`. note that after calling `reset()`, the interval will no longer immediately return `Poll::Ready(_)` upon the first call to `tick()`.
```rust
#[tokio::main]
async fn main() {
    let duration = std::time::Duration::from_secs(1);
    let mut interval = tokio::time::interval(duration);
    // interval.reset();
    let start = std::time::Instant::now();
    for i in 1..5 {
        interval.tick().await;
        let elapsed = start.elapsed().as_millis();
        println!("#{i} - {elapsed}ms")
    }
}
```

```
; cargo run
#1 - 1ms
#2 - 1001ms
#3 - 2001ms
#4 - 3001ms
```

with a reset, to avoid first poll being ready:

```rust
#[tokio::main]
async fn main() {
    let duration = std::time::Duration::from_secs(1);
    let mut interval = tokio::time::interval(duration);
    interval.reset();
    let start = std::time::Instant::now();
    for i in 1..5 {
        interval.tick().await;
        let elapsed = start.elapsed().as_millis();
        println!("#{i} - {elapsed}ms")
    }
}
```

```
; cargo run
#1 - 1001ms
#2 - 2001ms
#3 - 3001ms
#4 - 4001ms
```

Signed-off-by: katelyn martin <kate@buoyant.io>
Co-authored-by: Alejandro Martinez Ruiz <alex@flawedcode.org>
1 parent 921ad6d commit b28453f

1 file changed

Lines changed: 110 additions & 32 deletions

File tree

linkerd/app/core/src/control.rs

Lines changed: 110 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ use crate::{
22
classify, config, dns, identity, metrics, proxy::http, svc, tls, transport::ConnectTcp, Addr,
33
Error,
44
};
5-
use futures::future::Either;
65
use linkerd_metrics::prom;
76
use std::fmt;
87
use tokio::time;
9-
use tokio_stream::{wrappers::IntervalStream, StreamExt};
10-
use tracing::{info_span, warn};
8+
use tracing::info_span;
119

1210
#[derive(Clone, Debug)]
1311
pub struct Config {
@@ -106,26 +104,6 @@ impl Config {
106104
let addr = self.addr;
107105
tracing::trace!(%addr, "Building");
108106

109-
// When a DNS resolution fails, log the error and use the TTL, if there
110-
// is one, to drive re-resolution attempts.
111-
let resolve_backoff = {
112-
let backoff = self.connect.backoff;
113-
move |error: Error| {
114-
warn!(error, "Failed to resolve control-plane component");
115-
if let Some(e) = crate::errors::cause_ref::<dns::ResolveError>(&*error) {
116-
if let Some(ttl) = e.negative_ttl() {
117-
return Ok::<_, Error>(Either::Left(
118-
IntervalStream::new(time::interval(ttl)).map(|_| ()),
119-
));
120-
}
121-
}
122-
123-
// If the error didn't give us a TTL, use the default jittered
124-
// backoff.
125-
Ok(Either::Right(backoff.stream()))
126-
}
127-
};
128-
129107
let client = svc::stack(ConnectTcp::new(
130108
self.connect.keepalive,
131109
self.connect.user_timeout,
@@ -151,7 +129,11 @@ impl Config {
151129

152130
let balance = endpoint
153131
.lift_new()
154-
.push(self::balance::layer(metrics.balance, dns, resolve_backoff))
132+
.push(self::balance::layer(
133+
metrics.balance,
134+
dns,
135+
self.connect.backoff,
136+
))
155137
.push(legacy_metrics.to_layer::<classify::Response, _, _>())
156138
.push(classify::NewClassify::layer_default());
157139

@@ -251,20 +233,24 @@ mod balance {
251233
proxy::{dns_resolve::DnsResolve, http, resolve::recover},
252234
svc, tls,
253235
};
236+
use futures::Stream;
237+
use linkerd_error::Recover;
238+
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
254239
use linkerd_stack::ExtractParam;
255-
use std::net::SocketAddr;
240+
use std::{net::SocketAddr, pin::Pin, task};
241+
use tokio_stream::wrappers::IntervalStream;
256242

257243
pub(super) type Metrics = http::balance::MetricFamilies<Labels>;
258244

259-
pub fn layer<B, R: Clone, N>(
245+
pub(super) type Resolve = recover::Resolve<ResolveRecover, DnsResolve>;
246+
247+
pub fn layer<B, N>(
260248
metrics: Metrics,
261249
dns: dns::Resolver,
262-
recover: R,
263-
) -> impl svc::Layer<
264-
N,
265-
Service = http::NewBalance<B, Params, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
266-
> {
267-
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
250+
backoff: ExponentialBackoff,
251+
) -> impl svc::Layer<N, Service = http::NewBalance<B, Params, Resolve, NewIntoTarget<N>>> {
252+
let resolve = recover::Resolve::new(ResolveRecover::new(backoff), DnsResolve::new(dns));
253+
268254
svc::layer::mk(move |inner| {
269255
http::NewBalance::new(
270256
NewIntoTarget { inner },
@@ -293,6 +279,98 @@ mod balance {
293279
addr: String,
294280
}
295281

282+
/// A [`Recover`] implementation that respects DNS resolution errors.
283+
///
284+
/// When a DNS resolution fails, this will log the error and use the TTL, if there is one,
285+
/// to drive re-resolution attempts. It defaults to an exponential backoff with jitter for
286+
/// other errors.
287+
#[derive(Clone)]
288+
pub(super) struct ResolveRecover {
289+
backoff: ExponentialBackoff,
290+
}
291+
292+
/// A [`Stream`] used for control-plane client's error recovery.
293+
#[pin_project::pin_project(project = ResolveBackoffProj)]
294+
pub(super) enum ResolveBackoff {
295+
/// A DNS-resolution error occurred.
296+
///
297+
/// This will backoff at a regular interval according to the negative TTL.
298+
NegativeTtl(#[pin] IntervalStream),
299+
/// A jittered exponential backoff.
300+
ExponentialBackoff(#[pin] ExponentialBackoffStream),
301+
}
302+
303+
// === impl ResolveRecover ===
304+
305+
impl ResolveRecover {
306+
/// Returns a new [`ResolveRecover`].
307+
pub fn new(config: ExponentialBackoff) -> Self {
308+
Self { backoff: config }
309+
}
310+
}
311+
312+
impl Recover for ResolveRecover {
313+
type Backoff = ResolveBackoff;
314+
fn recover(
315+
&self,
316+
error: linkerd_error::Error,
317+
) -> Result<Self::Backoff, linkerd_error::Error> {
318+
let ResolveRecover { backoff } = self;
319+
320+
tracing::warn!(error, "Failed to resolve control-plane component");
321+
322+
// If we are recovering due to a DNS resolution error, check for a negative TTL.
323+
if let Some(e) = crate::errors::cause_ref::<dns::ResolveError>(&*error) {
324+
if let Some(ttl) = e.negative_ttl() {
325+
let mut interval = tokio::time::interval(ttl);
326+
327+
// `tokio::time::interval()`'s documentation states that the first tick
328+
// completes immediately. To respect the prescribed negative-ttl and avoid
329+
// immediately retrying, we reset the interval before returning it.
330+
interval.reset();
331+
332+
let stream = IntervalStream::new(interval);
333+
return Ok(ResolveBackoff::NegativeTtl(stream));
334+
}
335+
}
336+
337+
// If the error didn't give us a TTL, use the default jittered backoff.
338+
Ok(ResolveBackoff::ExponentialBackoff(backoff.stream()))
339+
}
340+
}
341+
342+
// === impl ResolveBackoff ===
343+
344+
impl Stream for ResolveBackoff {
345+
/// Items carry no data; each `()` marks the end of one backoff period.
346+
type Item = ();
347+
348+
fn poll_next(
349+
self: Pin<&mut Self>,
350+
cx: &mut task::Context<'_>,
351+
) -> task::Poll<Option<Self::Item>> {
352+
use ResolveBackoffProj::{ExponentialBackoff, NegativeTtl};
353+
354+
match self.project() {
355+
NegativeTtl(stream) => {
356+
// We can discard the `Instant`s that IntervalStream yields.
357+
let discard = |opt: Option<_>| opt.map(drop);
358+
stream.poll_next(cx).map(discard)
359+
}
360+
ExponentialBackoff(stream) => stream.poll_next(cx),
361+
}
362+
}
363+
364+
fn size_hint(&self) -> (usize, Option<usize>) {
365+
use ResolveBackoff::{ExponentialBackoff, NegativeTtl};
366+
367+
match self {
368+
NegativeTtl(stream) => stream.size_hint(),
369+
ExponentialBackoff(stream) => stream.size_hint(),
370+
}
371+
}
372+
}
373+
296374
// === impl NewIntoTarget ===
297375

298376
impl<N: svc::NewService<ControlAddr>> svc::NewService<ControlAddr> for NewIntoTarget<N> {

0 commit comments

Comments
 (0)