Add client-side stream-liveness watchdog with reconnect-on-stale#3718
Add client-side stream-liveness watchdog with reconnect-on-stale#3718tylerhawkes wants to merge 1 commit into
Conversation
|
Claude finished @tylerhawkes's task —— View job Code Review — June 6, 2026 07:43 UTCI've reviewed the watchdog implementation and reconnect logic. Most of the significant issues have already been identified by macroscopeapp and addressed. Here are additional observations not yet mentioned: 1. Error handling asymmetry in callback streams (Minor)In all three
The current behavior is: only This is by design per the PR description, but worth noting: a persistent series of retryable errors would be surfaced to the callback rather than triggering stream recreation. Consider whether callbacks should receive infrastructure errors vs. only message-processing errors. 2. Proptest coverage could be stronger (Minor)
Consider adding a proptest variant with 3. RAII guard implementation is correct (Positive note)
4. Test coverage gap for creation-failure-after-healthy (Medium)The reconnect loops at Err(e) => {
if ready.is_some() { break Err(e); } // startup failure
tracing::warn!("...will retry"); // post-healthy failure
tokio::select! { ... }
continue;
}There's no test coverage for the "post-healthy creation failure during reconnect" path. The existing A test that:
...would validate this critical resilience path. 5. Code quality observations (Positive)
SummaryThe implementation is solid and most issues have been addressed. The main unresolved review comments from macroscopeapp relate to error-handling edge cases in the reconnect loops, which are worth addressing before merge. The test coverage is strong but could benefit from covering the "creation failure during reconnect" path. Recommendation: Address the outstanding macroscopeapp comments regarding infinite retry loops for non-
|
| tokio::select! { | ||
| _ = cancel.cancelled() => break, | ||
| next = stream.next() => match next { | ||
| Some(convo) => convo_callback(convo), | ||
| None => break, | ||
| } | ||
| _ = watchdog::watchdog_reconnect_delay() => {} | ||
| } |
There was a problem hiding this comment.
🟡 Medium subscriptions/mod.rs:491
After a watchdog trip, the reconnect path drops the StreamConversations and waits in watchdog_reconnect_delay() before creating a new one. Since StreamConversations subscribes to a BroadcastStream of LocalEvents, any NewGroup events broadcast during that delay are lost permanently — the new subscription has no replay. This causes convo_callback to silently miss locally created conversations whenever a reconnect coincides with group creation. Consider subscribing before the delay so the subscription is active during the reconnect window.
- if cancelled || !stale {
- break;
- }
- tracing::debug!("`stream_conversations` went stale; reconnecting");
- tokio::select! {
- _ = cancel.cancelled() => break,
- _ = watchdog::watchdog_reconnect_delay() => {}
+ if cancelled || !stale {
+ break;
+ }
+ tracing::debug!("`stream_conversations` went stale; reconnecting");
+ // Reconnect delay before re-subscribing risks missing
+ // LocalEvents::NewGroup broadcasts during the window.
+ // Consider restructuring to ensure subscription exists before delay.🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file crates/xmtp_mls/src/subscriptions/mod.rs around lines 491-494:
After a watchdog trip, the reconnect path drops the `StreamConversations` and waits in `watchdog_reconnect_delay()` before creating a new one. Since `StreamConversations` subscribes to a `BroadcastStream` of `LocalEvents`, any `NewGroup` events broadcast during that delay are lost permanently — the new subscription has no replay. This causes `convo_callback` to silently miss locally created conversations whenever a reconnect coincides with group creation. Consider subscribing before the delay so the subscription is active during the reconnect window.
ApprovabilityVerdict: Needs human review 3 blocking correctness issues found. New feature adding stream-liveness watchdog with automatic reconnect behavior - a significant change to core streaming infrastructure. Multiple unresolved review comments identify medium-severity concerns including potential endless retry loops and lost events during reconnect windows. You can customize Macroscope's approvability policy. Learn more. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3718 +/- ##
==========================================
+ Coverage 84.23% 84.29% +0.06%
==========================================
Files 408 409 +1
Lines 59219 59495 +276
==========================================
+ Hits 49884 50153 +269
- Misses 9335 9342 +7 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
| callback(message); | ||
| } | ||
|
|
||
| if !stale { |
| /// deadline is always measured from the most recent activity. | ||
| timer: Option<BoxDynFuture<'static, ()>>, | ||
| /// Once tripped, the stream is finished and must not be polled again. | ||
| tripped: bool, |
There was a problem hiding this comment.
this lends itself well to implement the FusedStream trait
There was a problem hiding this comment.
Done — implements FusedStream now; is_terminated() flips once tripped, whether by a stale trip or the inner stream ending.
| /// An idle stream trips after the timeout: one `StreamStale`, then termination. | ||
| #[xmtp_common::test(unwrap_try = true)] | ||
| async fn trips_on_idle() { | ||
| let inner = stream::pending::<Result<u8, SubscribeError>>(); |
There was a problem hiding this comment.
with isolated stream impls like this it can be useful to make the tests synchronous and instead call the streams poll() impl manually. futures-test includes some nice test utilities for this as well. sync fns would also open a possibility of running the test against proptest more easily
mostly personal preference tho
There was a problem hiding this comment.
Done — pulled the idle timer behind an injectable factory so the combinator polls synchronously. Rewrote the unit tests to drive poll_next by hand via futures-test, and added a proptest over arbitrary item sequences (every item in order → one StreamStale → fused). Real-clock coverage stays in one smoke test + the integration tests.
33d96e6 to
59ad0d1
Compare
59ad0d1 to
5346977
Compare
42587c8 to
ab4dad0
Compare
| .await | ||
| { | ||
| Ok(stream) => stream, | ||
| Err(e) => { |
There was a problem hiding this comment.
🟡 Medium subscriptions/mod.rs:580
After the first successful subscription, non-StreamStale errors from StreamAllMessages::new are retried forever instead of terminating. Permanent failures (DB/query errors from find_groups or StreamGroupMessages::from_cow) trigger endless reconnect loops; callers never receive on_close or a terminal error, violating the contract that only SubscribeError::StreamStale should reconnect.
Also found in 1 other location(s)
crates/xmtp_mls/src/groups/subscriptions.rs:173
The new startup-failure path at
break Err(e)does not actually make subscription setup fail for normal callers.stream_messages_with_callbackonly reports readiness through theoneshotsender, butxmtp_common::StreamHandle::wait_for_ready()ignores a dropped sender (stream_handles.rs:234-237). So if the firstStreamGroupMessages::newfails, the task exits withErr(e)and dropstx, yet callers that only dowait_for_ready()still see apparent success and continue with a stream that never started.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/mod.rs around line 580:
After the first successful subscription, non-`StreamStale` errors from `StreamAllMessages::new` are retried forever instead of terminating. Permanent failures (DB/query errors from `find_groups` or `StreamGroupMessages::from_cow`) trigger endless reconnect loops; callers never receive `on_close` or a terminal error, violating the contract that only `SubscribeError::StreamStale` should reconnect.
Also found in 1 other location(s):
- crates/xmtp_mls/src/groups/subscriptions.rs:173 -- The new startup-failure path at `break Err(e)` does not actually make subscription setup fail for normal callers. `stream_messages_with_callback` only reports readiness through the `oneshot` sender, but `xmtp_common::StreamHandle::wait_for_ready()` ignores a dropped sender (`stream_handles.rs:234-237`). So if the first `StreamGroupMessages::new` fails, the task exits with `Err(e)` and drops `tx`, yet callers that only do `wait_for_ready()` still see apparent success and continue with a stream that never started.
ab4dad0 to
7d4ff6d
Compare
7d4ff6d to
ff26fb7
Compare
| /// a stream that errors or trips right after creation, or a fast creation failure — capping | ||
| /// a tight reconnect loop to one attempt per floor. The jitter de-syncs clients that are all | ||
| /// tight-looping against the same upstream. | ||
| pub(crate) async fn watchdog_reconnect_delay_since(attempt_started: Instant) { |
There was a problem hiding this comment.
🟢 Low subscriptions/watchdog.rs:147
watchdog_reconnect_delay_since computes reconnect_base + rand_jitter(reconnect_jitter) using unbounded environment variables, so a large XMTP_STREAM_WATCHDOG_RECONNECT_BASE_SECS can overflow Duration::MAX. std::time::Duration addition panics on overflow, so a misconfigured environment variable crashes the process on the first reconnect attempt.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/watchdog.rs around line 147:
`watchdog_reconnect_delay_since` computes `reconnect_base + rand_jitter(reconnect_jitter)` using unbounded environment variables, so a large `XMTP_STREAM_WATCHDOG_RECONNECT_BASE_SECS` can overflow `Duration::MAX`. `std::time::Duration` addition panics on overflow, so a misconfigured environment variable crashes the process on the first reconnect attempt.
What
A client-side stream-liveness floor: a
WatchdogStreamcombinator that wraps each subscription with an idle timer (reset on every item). If nothing arrives within the idle timeout (default 300s, env-overridable), it yields a singleSubscribeError::StreamStaleand terminates; the consume loops treat that as a signal to re-subscribe from the persisted cursor instead of hanging forever. A silently-dead stream becomes a recoverable reconnect.Why
The v3 path has no server-side keepalive, so an idle subscription whose transport wedges open — e.g. an L7 proxy keeps answering HTTP/2 pings while the backend stream is gone — delivers neither an error nor a close. The consumer just hangs. This is the client-side liveness floor; it's layered with the keepalive work (#3715) and, longer term, a server-side heartbeat (XIP-83) that lets the idle timeout drop well below 300s.
How
subscriptions/watchdog.rs— theWatchdogStream<S>combinator (lazy-armed idle timer via an injectable factory overBoxDynFuture, cross-platform native/wasm; implementsFusedStreamonce tripped) + a test-only idle-timeout override.WatchdogConfig(LazyLock), each knob env-overridable — see Config below. Reconnect is throttled, not delayed: it measures from when each attempt began, so a stream that was healthy for the whole idle timeout reconnects immediately (a couple seconds of backoff on top of a multi-minute idle is pointless), while a tight loop — a fast creation failure or a stream that trips right after creation — is paced to ≥ one attempt per floor. The floor uses a real random jitter draw (not wall-clock low bits) to de-sync clients tight-looping against the same upstream.SubscribeError::StreamStale(retryable) wired into the reconnect-on-stale loops instream_all_messages_with_callback,stream_conversations_with_callback, and the groupstream_messages_with_callback— all three cancellation-token aware. A clean server close or cancellation stops the loop; onlyStreamStalereconnects. A stream-creation failure propagates as a startup error before the first successful subscription, and is retried with backoff (like a stale-trip) after it — so a transient reconnect-time hiccup no longer silently terminates the subscription (addresses the review's finding Refactor to separate wasm bindings from rust crate #1).Config (env vars, captured once at startup; safe defaults)
XMTP_STREAM_WATCHDOG_IDLE_TIMEOUT_SECS(default300) — idle timeout before a stale trip. A forgiving server-side proxy can raise it; a latency-sensitive mobile client can lower it.XMTP_STREAM_WATCHDOG_RECONNECT_BASE_SECS(default1) — minimum interval between reconnect attempts (the throttle floor). A normal long-idle trip reconnects immediately; this only paces a tight loop.XMTP_STREAM_WATCHDOG_RECONNECT_JITTER_MS(default1000) — random jitter added to the throttle floor, to de-sync clients tight-looping against the same upstream (0disables).Tests (all pass; no flakiness across repeated runs)
futures-test, fully deterministic with no real clock: pass-through-then-trip-once, clean-end-is-not-a-trip, no-trip-until-the-timer-fires.StreamStale, then terminated — fused forever after.new_uses_a_real_timer— smoke test that the production constructor wires a real wall-clock timer.WatchdogConfigenv parsing (present / explicit-zero / garbage / missing) and jitter-stays-in-bounds.watchdog_trips_on_idle_real_stream— a real idle v3 stream goes silent and the watchdog trips + terminates (validates the no-server-keepalive premise against the live transport).watchdog_reconnect_keeps_stream_alive— a message sent after a stale-trip still arrives, proving the consume loop re-subscribed (confirmed via logs).just lint-rustgreen; native + wasm compile.Notes / caveats
StreamStaletriggers reconnect. Transport errors still terminate as before (pre-existing behavior, unchanged).StreamClosed/StreamOpenedevent pair). The server heartbeat removes this.LocalEventsbroadcast receiver, which has no replay. Network welcomes are caught up from the cursor, but a locally created group during the reconnect window is not re-emitted (the caller already holds that group). Documented in code; removed by the later refactor that wraps the network sub below the broadcast.Merge gate: full CI streaming suite green.
🤖 Generated with Claude Code
Note
Add client-side stream-liveness watchdog with auto-reconnect on stale
WatchdogStreaminsubscriptions/watchdog.rs, a stream combinator that emits a singleSubscribeError::StreamStaleafter a configurable idle timeout, then terminates — allowing callers to detect silent hangs.stream_conversations_with_callback,stream_all_messages_with_callback, andstream_messages_with_callbackto wrap their inner streams inWatchdogStreamand reconnect automatically on stale events.watchdog_reconnect_delay_since, which applies a jitter-based floor delay to prevent tight reconnect loops.Macroscope summarized ff26fb7.