Skip to content

Add client-side stream-liveness watchdog with reconnect-on-stale#3718

Open
tylerhawkes wants to merge 1 commit into
mainfrom
tyler/stage0-stream-watchdog
Open

Add client-side stream-liveness watchdog with reconnect-on-stale#3718
tylerhawkes wants to merge 1 commit into
mainfrom
tyler/stage0-stream-watchdog

Conversation

@tylerhawkes
Copy link
Copy Markdown
Contributor

@tylerhawkes tylerhawkes commented Jun 4, 2026

What

A client-side stream-liveness floor: a WatchdogStream combinator that wraps each subscription with an idle timer (reset on every item). If nothing arrives within the idle timeout (default 300s, env-overridable), it yields a single SubscribeError::StreamStale and terminates; the consume loops treat that as a signal to re-subscribe from the persisted cursor instead of hanging forever. A silently-dead stream becomes a recoverable reconnect.

Why

The v3 path has no server-side keepalive, so an idle subscription whose transport wedges open — e.g. an L7 proxy keeps answering HTTP/2 pings while the backend stream is gone — delivers neither an error nor a close. The consumer just hangs. This is the client-side liveness floor; it's layered with the keepalive work (#3715) and, longer term, a server-side heartbeat (XIP-83) that lets the idle timeout drop well below 300s.

How

  • subscriptions/watchdog.rs — the WatchdogStream<S> combinator (lazy-armed idle timer via an injectable factory over BoxDynFuture, cross-platform native/wasm; implements FusedStream once tripped) + a test-only idle-timeout override.
  • Timing is read once at startup into a process-wide WatchdogConfig (LazyLock), each knob env-overridable — see Config below. Reconnect is throttled, not delayed: it measures from when each attempt began, so a stream that was healthy for the whole idle timeout reconnects immediately (a couple seconds of backoff on top of a multi-minute idle is pointless), while a tight loop — a fast creation failure or a stream that trips right after creation — is paced to ≥ one attempt per floor. The floor uses a real random jitter draw (not wall-clock low bits) to de-sync clients tight-looping against the same upstream.
  • SubscribeError::StreamStale (retryable) wired into the reconnect-on-stale loops in stream_all_messages_with_callback, stream_conversations_with_callback, and the group stream_messages_with_callback — all three cancellation-token aware. A clean server close or cancellation stops the loop; only StreamStale reconnects. A stream-creation failure propagates as a startup error before the first successful subscription, and is retried with backoff (like a stale-trip) after it — so a transient reconnect-time hiccup no longer silently terminates the subscription (addresses the review's finding Refactor to separate wasm bindings from rust crate #1).

Config (env vars, captured once at startup; safe defaults)

  • XMTP_STREAM_WATCHDOG_IDLE_TIMEOUT_SECS (default 300) — idle timeout before a stale trip. A forgiving server-side proxy can raise it; a latency-sensitive mobile client can lower it.
  • XMTP_STREAM_WATCHDOG_RECONNECT_BASE_SECS (default 1) — minimum interval between reconnect attempts (the throttle floor). A normal long-idle trip reconnects immediately; this only paces a tight loop.
  • XMTP_STREAM_WATCHDOG_RECONNECT_JITTER_MS (default 1000) — random jitter added to the throttle floor, to de-sync clients tight-looping against the same upstream (0 disables).

Tests (all pass; no flakiness across repeated runs)

  • Combinator state machine, now synchronous — the stream is polled by hand via futures-test, fully deterministic with no real clock: pass-through-then-trip-once, clean-end-is-not-a-trip, no-trip-until-the-timer-fires.
  • A proptest over arbitrary item sequences: every item is yielded in order, then exactly one StreamStale, then terminated — fused forever after.
  • new_uses_a_real_timer — smoke test that the production constructor wires a real wall-clock timer.
  • WatchdogConfig env parsing (present / explicit-zero / garbage / missing) and jitter-stays-in-bounds.
  • watchdog_trips_on_idle_real_stream — a real idle v3 stream goes silent and the watchdog trips + terminates (validates the no-server-keepalive premise against the live transport).
  • watchdog_reconnect_keeps_stream_alive — a message sent after a stale-trip still arrives, proving the consume loop re-subscribed (confirmed via logs).

just lint-rust green; native + wasm compile.

Notes / caveats

  • The watchdog wraps the outer (consume-loop) stream, not the inner network stream. A later phase with a server heartbeat relocates it below the heartbeat filter.
  • Only StreamStale triggers reconnect. Transport errors still terminate as before (pre-existing behavior, unchanged).
  • A healthy dormant stream reconnects ~every idle-timeout (a catch-up query + a StreamClosed/StreamOpened event pair). The server heartbeat removes this.
  • Conversation stream: a re-subscribe drops/recreates the local LocalEvents broadcast receiver, which has no replay. Network welcomes are caught up from the cursor, but a locally created group during the reconnect window is not re-emitted (the caller already holds that group). Documented in code; removed by the later refactor that wraps the network sub below the broadcast.

Merge gate: full CI streaming suite green.

🤖 Generated with Claude Code

Note

Add client-side stream-liveness watchdog with auto-reconnect on stale

  • Introduces WatchdogStream in subscriptions/watchdog.rs, a stream combinator that emits a single SubscribeError::StreamStale after a configurable idle timeout, then terminates — allowing callers to detect silent hangs.
  • Reworks stream_conversations_with_callback, stream_all_messages_with_callback, and stream_messages_with_callback to wrap their inner streams in WatchdogStream and reconnect automatically on stale events.
  • Reconnect pacing is handled by watchdog_reconnect_delay_since, which applies a jitter-based floor delay to prevent tight reconnect loops.
  • Watchdog timing is configurable via environment variables and supports thread-local overrides in tests.
  • Behavioral Change: readiness is now signaled after the first successful subscription (not immediately), and initial stream creation failures are surfaced to the caller rather than being retried silently.

Macroscope summarized ff26fb7.

@tylerhawkes tylerhawkes requested a review from a team as a code owner June 4, 2026 17:14
@claude
Copy link
Copy Markdown

claude Bot commented Jun 4, 2026

Claude finished @tylerhawkes's task —— View job


Code Review — June 6, 2026 07:43 UTC

I've reviewed the watchdog implementation and reconnect logic. Most of the significant issues have already been identified by macroscopeapp and addressed. Here are additional observations not yet mentioned:

1. Error handling asymmetry in callback streams (Minor)

In all three *_with_callback implementations (lines 622, 496, 207 in mod.rs/subscriptions.rs), non-StreamStale errors are passed to the callback and the stream continues consuming. This means:

  • A retryable ApiClient error from the underlying stream gets delivered to the callback but doesn't trigger reconnection
  • The callback receives errors that it may not be able to handle meaningfully (e.g., transient network failures)

The current behavior is: only StreamStale reconnects; all other errors (including retryable ones) are delivered to the callback and the stream continues until None or cancellation.

This is by design per the PR description, but worth noting: a persistent series of retryable errors would be surfaced to the callback rather than triggering stream recreation. Consider whether callbacks should receive infrastructure errors vs. only message-processing errors.

2. Proptest coverage could be stronger (Minor)

watchdog.rs:403 — The proptest validates the state machine for arbitrary item sequences, which is excellent. However, it doesn't test:

  • Sequences with errors (the stream is Result<T, SubscribeError>)
  • Timer reset on error items (errors are activity too)

Consider adding a proptest variant with items in proptest::collection::vec(proptest::result::maybe_err(any::<u8>(), ...), 0..32) to validate that error items also reset the idle timer.

3. RAII guard implementation is correct (Positive note)

stream_all/tests.rs:983-996 — The ResetIdleTimeout guard properly addresses macroscopeapp's earlier concern about test pollution. Well done.

4. Test coverage gap for creation-failure-after-healthy (Medium)

The reconnect loops at groups/subscriptions.rs:169, mod.rs:455, and mod.rs:578 all have this pattern:

Err(e) => {
    if ready.is_some() { break Err(e); }  // startup failure
    tracing::warn!("...will retry");       // post-healthy failure
    tokio::select! { ... }
    continue;
}

There's no test coverage for the "post-healthy creation failure during reconnect" path. The existing watchdog_reconnect_keeps_stream_alive test validates reconnect after a stale trip, but doesn't exercise the case where StreamGroupMessages::new / StreamConversations::new / StreamAllMessages::new fails during a reconnect attempt after being healthy.

A test that:

  1. Lets the stream become healthy
  2. Triggers a stale trip
  3. Causes the next new() call to fail (e.g., by dropping the API client temporarily)
  4. Confirms the loop retries with backoff instead of terminating

...would validate this critical resilience path.

5. Code quality observations (Positive)

  • Synchronous proptest approach is excellent — deterministic, fast, comprehensive
  • FusedStream implementation is correct
  • Env config parsing is robust (handles missing, zero, garbage)
  • Real jitter via rand::distr::Uniform prevents thundering herd
  • Documentation is thorough and honest about limitations (e.g., LocalEvents replay gap)

Summary

The implementation is solid and most issues have been addressed. The main unresolved review comments from macroscopeapp relate to error-handling edge cases in the reconnect loops, which are worth addressing before merge. The test coverage is strong but could benefit from covering the "creation failure during reconnect" path.

Recommendation: Address the outstanding macroscopeapp comments regarding infinite retry loops for non-StreamStale errors, then merge.


  • Read watchdog.rs implementation
  • Read mod.rs subscription changes
  • Read groups/subscriptions.rs changes
  • Analyze for issues not already mentioned
  • Post review feedback

Comment thread crates/xmtp_mls/src/subscriptions/stream_all/tests.rs Outdated
Comment thread crates/xmtp_mls/src/subscriptions/watchdog.rs Outdated
Comment thread crates/xmtp_mls/src/groups/subscriptions.rs Outdated
Comment on lines 491 to 494
tokio::select! {
_ = cancel.cancelled() => break,
next = stream.next() => match next {
Some(convo) => convo_callback(convo),
None => break,
}
_ = watchdog::watchdog_reconnect_delay() => {}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium subscriptions/mod.rs:491

After a watchdog trip, the reconnect path drops the StreamConversations and waits in watchdog_reconnect_delay() before creating a new one. Since StreamConversations subscribes to a BroadcastStream of LocalEvents, any NewGroup events broadcast during that delay are lost permanently — the new subscription has no replay. This causes convo_callback to silently miss locally created conversations whenever a reconnect coincides with group creation. Consider subscribing before the delay so the subscription is active during the reconnect window.

-                if cancelled || !stale {
-                    break;
-                }
-                tracing::debug!("`stream_conversations` went stale; reconnecting");
-                tokio::select! {
-                    _ = cancel.cancelled() => break,
-                    _ = watchdog::watchdog_reconnect_delay() => {}
+                if cancelled || !stale {
+                    break;
+                }
+                tracing::debug!("`stream_conversations` went stale; reconnecting");
+                // Reconnect delay before re-subscribing risks missing
+                // LocalEvents::NewGroup broadcasts during the window.
+                // Consider restructuring to ensure subscription exists before delay.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file crates/xmtp_mls/src/subscriptions/mod.rs around lines 491-494:

After a watchdog trip, the reconnect path drops the `StreamConversations` and waits in `watchdog_reconnect_delay()` before creating a new one. Since `StreamConversations` subscribes to a `BroadcastStream` of `LocalEvents`, any `NewGroup` events broadcast during that delay are lost permanently — the new subscription has no replay. This causes `convo_callback` to silently miss locally created conversations whenever a reconnect coincides with group creation. Consider subscribing before the delay so the subscription is active during the reconnect window.

@macroscopeapp
Copy link
Copy Markdown
Contributor

macroscopeapp Bot commented Jun 4, 2026

Approvability

Verdict: Needs human review

3 blocking correctness issues found. New feature adding stream-liveness watchdog with automatic reconnect behavior - a significant change to core streaming infrastructure. Multiple unresolved review comments identify medium-severity concerns including potential endless retry loops and lost events during reconnect windows.

You can customize Macroscope's approvability policy. Learn more.

@codecov
Copy link
Copy Markdown

codecov Bot commented Jun 4, 2026

Codecov Report

❌ Patch coverage is 81.58730% with 58 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.29%. Comparing base (3b9cad6) to head (ff26fb7).

Files with missing lines Patch % Lines
crates/xmtp_mls/src/subscriptions/mod.rs 61.62% 33 Missing ⚠️
crates/xmtp_mls/src/groups/subscriptions.rs 47.61% 22 Missing ⚠️
crates/xmtp_mls/src/subscriptions/watchdog.rs 98.39% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3718      +/-   ##
==========================================
+ Coverage   84.23%   84.29%   +0.06%     
==========================================
  Files         408      409       +1     
  Lines       59219    59495     +276     
==========================================
+ Hits        49884    50153     +269     
- Misses       9335     9342       +7     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

callback(message);
}

if !stale {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be if stale

/// deadline is always measured from the most recent activity.
timer: Option<BoxDynFuture<'static, ()>>,
/// Once tripped, the stream is finished and must not be polled again.
tripped: bool,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this lends itself well to implement the FusedStream trait

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — implements FusedStream now; is_terminated() flips once tripped, whether by a stale trip or the inner stream ending.

/// An idle stream trips after the timeout: one `StreamStale`, then termination.
#[xmtp_common::test(unwrap_try = true)]
async fn trips_on_idle() {
let inner = stream::pending::<Result<u8, SubscribeError>>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with isolated stream impls like this it can be useful to make the tests synchronous and instead call the streams poll() impl manually. futures-test includes some nice test utilities for this as well. sync fns would also open a possibility of running the test against proptest more easily

mostly personal preference tho

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — pulled the idle timer behind an injectable factory so the combinator polls synchronously. Rewrote the unit tests to drive poll_next by hand via futures-test, and added a proptest over arbitrary item sequences (every item in order → one StreamStale → fused). Real-clock coverage stays in one smoke test + the integration tests.

@tylerhawkes tylerhawkes force-pushed the tyler/stage0-stream-watchdog branch from 33d96e6 to 59ad0d1 Compare June 4, 2026 18:12
Comment thread crates/xmtp_mls/src/subscriptions/mod.rs Outdated
@tylerhawkes tylerhawkes force-pushed the tyler/stage0-stream-watchdog branch from 59ad0d1 to 5346977 Compare June 4, 2026 19:12
Comment thread crates/xmtp_mls/src/subscriptions/watchdog.rs Outdated
Comment thread crates/xmtp_mls/src/subscriptions/mod.rs
@tylerhawkes tylerhawkes force-pushed the tyler/stage0-stream-watchdog branch 2 times, most recently from 42587c8 to ab4dad0 Compare June 5, 2026 14:43
.await
{
Ok(stream) => stream,
Err(e) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium subscriptions/mod.rs:580

After the first successful subscription, non-StreamStale errors from StreamAllMessages::new are retried forever instead of terminating. Permanent failures (DB/query errors from find_groups or StreamGroupMessages::from_cow) trigger endless reconnect loops; callers never receive on_close or a terminal error, violating the contract that only SubscribeError::StreamStale should reconnect.

Also found in 1 other location(s)

crates/xmtp_mls/src/groups/subscriptions.rs:173

The new startup-failure path at break Err(e) does not actually make subscription setup fail for normal callers. stream_messages_with_callback only reports readiness through the oneshot sender, but xmtp_common::StreamHandle::wait_for_ready() ignores a dropped sender (stream_handles.rs:234-237). So if the first StreamGroupMessages::new fails, the task exits with Err(e) and drops tx, yet callers that only do wait_for_ready() still see apparent success and continue with a stream that never started.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/mod.rs around line 580:

After the first successful subscription, non-`StreamStale` errors from `StreamAllMessages::new` are retried forever instead of terminating. Permanent failures (DB/query errors from `find_groups` or `StreamGroupMessages::from_cow`) trigger endless reconnect loops; callers never receive `on_close` or a terminal error, violating the contract that only `SubscribeError::StreamStale` should reconnect.

Also found in 1 other location(s):
- crates/xmtp_mls/src/groups/subscriptions.rs:173 -- The new startup-failure path at `break Err(e)` does not actually make subscription setup fail for normal callers. `stream_messages_with_callback` only reports readiness through the `oneshot` sender, but `xmtp_common::StreamHandle::wait_for_ready()` ignores a dropped sender (`stream_handles.rs:234-237`). So if the first `StreamGroupMessages::new` fails, the task exits with `Err(e)` and drops `tx`, yet callers that only do `wait_for_ready()` still see apparent success and continue with a stream that never started.

@tylerhawkes tylerhawkes force-pushed the tyler/stage0-stream-watchdog branch from ab4dad0 to 7d4ff6d Compare June 6, 2026 05:07
@tylerhawkes tylerhawkes force-pushed the tyler/stage0-stream-watchdog branch from 7d4ff6d to ff26fb7 Compare June 6, 2026 07:42
/// a stream that errors or trips right after creation, or a fast creation failure — capping
/// a tight reconnect loop to one attempt per floor. The jitter de-syncs clients that are all
/// tight-looping against the same upstream.
pub(crate) async fn watchdog_reconnect_delay_since(attempt_started: Instant) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Low subscriptions/watchdog.rs:147

watchdog_reconnect_delay_since computes reconnect_base + rand_jitter(reconnect_jitter) using unbounded environment variables, so a large XMTP_STREAM_WATCHDOG_RECONNECT_BASE_SECS can overflow Duration::MAX. std::time::Duration addition panics on overflow, so a misconfigured environment variable crashes the process on the first reconnect attempt.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/watchdog.rs around line 147:

`watchdog_reconnect_delay_since` computes `reconnect_base + rand_jitter(reconnect_jitter)` using unbounded environment variables, so a large `XMTP_STREAM_WATCHDOG_RECONNECT_BASE_SECS` can overflow `Duration::MAX`. `std::time::Duration` addition panics on overflow, so a misconfigured environment variable crashes the process on the first reconnect attempt.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants