feat(consumer): periodically restart the Rust consumer to reconnect to Kafka#8085
Open
phacops wants to merge 9 commits into
…o Kafka
Adds an opt-in mechanism that gracefully shuts the Rust consumer down after
a configurable interval so it can be restarted and re-establish its Kafka
connection. This is gated behind a per-consumer-group feature flag and
defaults to a 15-minute interval.
The behavior is controlled by two runtime config keys (read from snuba.state,
so they can be toggled live without a redeploy):
- kafka_consumer_periodic_restart__<consumer_group>: feature flag, enables the
behavior when set to a truthy value ("1"/"true"). Defaults to disabled.
- kafka_consumer_periodic_restart_interval_secs__<consumer_group>: seconds
between restarts. Defaults to 900 (15 minutes) when unset/invalid.
When enabled, a background thread signals a graceful shutdown after the
configured interval; the process orchestrator then restarts the consumer,
which reconnects to Kafka. Emits a `periodic_restart` counter metric.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
untitaker
approved these changes
Jun 22, 2026
Apply the same `delay_kafka_rebalance` quantization the Ctrl-C handler uses before signaling shutdown from the periodic-restart thread, so consumer groups that enable both periodic restart and quantized rebalancing don't trigger extra Kafka rebalances.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
Rework `consumer_impl` so the periodic reconnect rebuilds the `StreamProcessor` in-process and resumes consuming, rather than exiting the process and relying on the orchestrator to restart it.
Process-wide setup (logging, Sentry, metrics, procspawn, the Ctrl-C handler) now runs once, and the consumer is built and run inside a restart loop. The periodic restart timer sets a `restart_triggered` flag and signals a graceful shutdown; when `run()` returns, the loop rebuilds the consumer (reconnecting to Kafka) and continues. A Ctrl-C shutdown or any other graceful termination (e.g. `stop_at_timestamp`) exits the loop instead.
Because the handle is recreated on each restart, the Ctrl-C handler (which can only be installed once) signals the live consumer through a shared mutex.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
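The loop shape described in this commit can be sketched roughly as follows. This is a minimal illustration, not the code in `consumer.rs`: `run_with_restarts`, the `build`/`run` closures, and the generic `P`/`H` types are hypothetical stand-ins for building a `StreamProcessor` and its handle. The sketch also empties the shared slot once a run returns, so a signal handler never sees a stale handle.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical helper: builds and runs a consumer in a loop, rebuilding it
/// whenever `restart_triggered` was set during the run that just ended.
/// Returns how many times the consumer was rebuilt.
pub fn run_with_restarts<P, H>(
    current_handle: &Mutex<Option<H>>,
    restart_triggered: &AtomicBool,
    mut build: impl FnMut() -> (P, H),
    mut run: impl FnMut(P),
) -> usize {
    let mut reconnects = 0;
    loop {
        // Build a fresh processor and publish its handle so a one-time
        // signal handler can always reach the live consumer.
        let (processor, handle) = build();
        *current_handle.lock().unwrap() = Some(handle);

        // Blocks until the processor shuts down, for whatever reason.
        run(processor);

        // The processor is gone; do not leave its handle published.
        *current_handle.lock().unwrap() = None;

        // `swap` reads the flag and resets it for the next iteration.
        // Anything other than a timer-triggered restart ends the loop.
        if !restart_triggered.swap(false, Ordering::SeqCst) {
            return reconnects;
        }
        reconnects += 1;
    }
}
```

Keeping the handle in a `Mutex<Option<H>>` means the signal handler captures the slot once and looks up whichever handle is current at the time it fires.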
Address review feedback on the in-process restart loop:
- Only quantize the join on the first run. On a periodic restart the timer already quantizes the corresponding "leave", so delaying the rejoin too kept the consumer offline for two delay periods instead of one.
- Re-check `shutdown_requested` after the first-run rebalance delay so a Ctrl-C arriving during the delay stops promptly instead of spawning a restart timer and entering `run()`.
- Make the restart timer wait on a channel instead of a bare sleep, and join it after the run ends, so it is never left orphaned when the consumer exits for another reason (e.g. `stop_at_timestamp`).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
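The third point, waiting on a channel rather than sleeping, might look something like the sketch below. `RestartTimer`, `spawn`, and `stop_and_join` are hypothetical names chosen for illustration; only `std::sync::mpsc` and `std::thread` are used, and the PR's actual timer may be structured differently.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical interruptible timer: fires `on_elapsed` after `interval`
/// unless it is stopped first.
pub struct RestartTimer {
    stop_tx: Sender<()>,
    handle: JoinHandle<bool>,
}

impl RestartTimer {
    pub fn spawn<F>(interval: Duration, on_elapsed: F) -> Self
    where
        F: FnOnce() + Send + 'static,
    {
        let (stop_tx, stop_rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || match stop_rx.recv_timeout(interval) {
            // Nothing arrived before the deadline: the interval elapsed.
            Err(RecvTimeoutError::Timeout) => {
                on_elapsed();
                true
            }
            // A stop message arrived, or the sender was dropped: exit quietly.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => false,
        });
        Self { stop_tx, handle }
    }

    /// Wakes the timer if it is still waiting, then joins the thread.
    /// Returns true if the timer had fired.
    pub fn stop_and_join(self) -> bool {
        // The send fails only if the thread already finished; that is fine.
        let _ = self.stop_tx.send(());
        self.handle.join().unwrap_or(false)
    }
}
```

Because `recv_timeout` returns as soon as a message arrives, joining the timer after the run ends costs no more than a thread wake-up, whereas a bare `thread::sleep` would hold the join for the remainder of the interval.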
MeredithAnya
approved these changes
Jun 22, 2026
…erruptible
Address two review findings on the restart loop:
- Re-read the quantized rebalance delay from runtime config on every restart (and share it with the Ctrl-C handler) so live config updates take effect without a full process restart, instead of capturing a stale value once.
- Make the restart timer's leave-quantization wait on the stop channel rather than a bare `thread::sleep`, so when the run ends for another reason (e.g. `stop_at_timestamp`) we can wake and join the timer immediately instead of hanging the shutdown for up to `rebalance_delay_secs`.
Adds `rebalancing::quantized_rebalance_delay`, which computes the quantized delay duration so callers needing an interruptible wait can use it directly; `delay_kafka_rebalance` now sleeps on that duration.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 69c2a00.
Address two more review findings on the restart loop:
- Replace the unconditional `restart_triggered` flag with a per-run compare-and-swap outcome (`OUTCOME_RESTART`/`OUTCOME_STOP`). The timer only signals a restart if it claims the outcome first, and the main thread claims a stop before joining the timer, so a timer firing concurrently with a non-restart shutdown (e.g. `stop_at_timestamp`) can no longer turn it into a restart.
- Copy the rebalance delay out of the shared mutex and release the lock before the (potentially multi-minute) sleep in the Ctrl-C handler, so the main loop never blocks acquiring the same lock during shutdown.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
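A first-claimant-wins outcome of the kind described here can be built on `AtomicU8::compare_exchange`. In the sketch below, `OUTCOME_RESTART` and `OUTCOME_STOP` are the names from the commit message, while `OUTCOME_PENDING`, `RunOutcome`, and `try_claim` are hypothetical names introduced for illustration, as are the numeric values.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Assumed encoding; the actual values in the PR are not shown here.
pub const OUTCOME_PENDING: u8 = 0;
pub const OUTCOME_RESTART: u8 = 1;
pub const OUTCOME_STOP: u8 = 2;

/// Hypothetical per-run outcome cell: created fresh for every run.
pub struct RunOutcome(AtomicU8);

impl RunOutcome {
    pub fn new() -> Self {
        Self(AtomicU8::new(OUTCOME_PENDING))
    }

    /// Attempts to claim the outcome. Only the first caller succeeds,
    /// because the exchange requires the cell to still be pending.
    pub fn try_claim(&self, outcome: u8) -> bool {
        self.0
            .compare_exchange(OUTCOME_PENDING, outcome, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }

    pub fn get(&self) -> u8 {
        self.0.load(Ordering::SeqCst)
    }
}
```

Under this scheme the timer would call `try_claim(OUTCOME_RESTART)` and signal shutdown only on success, while the main thread calls `try_claim(OUTCOME_STOP)` before joining the timer; whichever runs first decides the outcome and the other becomes a no-op.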
A rebalance delay of 0 (settable via runtime config or CLI) made `quantized_rebalance_delay` panic on a divide-by-zero in the modulo, which in the restart timer thread would fail silently. Treat 0 as "no quantization" and return `Duration::ZERO`, protecting all callers (the timer, the Ctrl-C handler, and the first-run join delay). Adds unit tests for the zero and normal cases.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
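The guard can be illustrated with a small pure function. This is a sketch under an assumption: that quantization means waiting until the next multiple of the delay on the Unix clock. The function name `quantized_delay` and its explicit `now_unix_secs` parameter are hypothetical, chosen so the arithmetic can be checked without reading the system clock; the PR's `quantized_rebalance_delay` may differ in signature and detail.

```rust
use std::time::Duration;

/// Hypothetical sketch: time to wait until the next slot boundary,
/// where slots are `rebalance_delay_secs` wide on the Unix clock.
pub fn quantized_delay(now_unix_secs: u64, rebalance_delay_secs: u64) -> Duration {
    // A delay of 0 means "no quantization". Without this check the
    // modulo below would panic with a division by zero.
    if rebalance_delay_secs == 0 {
        return Duration::ZERO;
    }
    let elapsed_in_slot = now_unix_secs % rebalance_delay_secs;
    Duration::from_secs(rebalance_delay_secs - elapsed_in_slot)
}
```

For example, with a 60-second delay and a clock reading of 125 seconds, 5 seconds of the current slot have elapsed, so the function returns 55 seconds.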
Clear `current_handle` once a processor's run returns so the Ctrl-C handler doesn't signal a defunct processor's handle while the next consumer is being built. The next iteration republishes a fresh handle before running.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo

Summary
Adds an opt-in mechanism that periodically reconnects the Rust consumer (`rust-consumer`, the entrypoint every self-hosted consumer service runs) to Kafka. The behavior is gated behind a feature flag and the interval is controlled by a setting, defaulting to 15 minutes.
The reconnect happens in-process: when the interval elapses, the consumer is shut down gracefully and the run loop rebuilds the `StreamProcessor` (re-establishing the Kafka connection) and resumes consuming; the process is not torn down.
How it's configured
Both knobs are runtime config keys (read from `snuba.state`), so they can be toggled live without a redeploy, the same pattern as the existing `quantized_rebalance_consumer_group_delay_secs__<consumer_group>` key:
- `kafka_consumer_periodic_restart__<consumer_group>`: feature flag; enables the behavior when set to a truthy value ("1"/"true"). Defaults to disabled.
- `kafka_consumer_periodic_restart_interval_secs__<consumer_group>`: seconds between reconnects. Defaults to `900` (15 min).
The interval is only honored when the feature flag is enabled; an invalid/missing interval falls back to the 15-minute default.
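How the two keys combine can be sketched as below. This is an illustration only: `restart_interval_from` and its `lookup` closure are hypothetical stand-ins (the PR's `get_restart_interval` reads runtime config itself), and treating an interval of 0 as invalid is an assumption not stated in the description.

```rust
use std::time::Duration;

/// Default from the PR description: 900 seconds (15 minutes).
const DEFAULT_RESTART_INTERVAL_SECS: u64 = 900;

/// Hypothetical sketch of the flag + interval resolution. `lookup` is an
/// assumed injection point standing in for the runtime config store.
pub fn restart_interval_from<F>(consumer_group: &str, lookup: F) -> Option<Duration>
where
    F: Fn(&str) -> Option<String>,
{
    // Feature flag: only "1" or "true" enable the behavior.
    let flag_key = format!("kafka_consumer_periodic_restart__{consumer_group}");
    let enabled = matches!(
        lookup(&flag_key).as_deref().map(str::trim),
        Some("1") | Some("true")
    );
    if !enabled {
        return None;
    }

    // Interval: fall back to the default when missing or unparsable.
    // Assumption: 0 is also treated as invalid to avoid a restart storm.
    let interval_key =
        format!("kafka_consumer_periodic_restart_interval_secs__{consumer_group}");
    let secs = lookup(&interval_key)
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|secs| *secs > 0)
        .unwrap_or(DEFAULT_RESTART_INTERVAL_SECS);
    Some(Duration::from_secs(secs))
}
```

Returning `Option<Duration>` lets the caller treat "disabled" and "enabled with some interval" as one value, so the interval key is never consulted unless the flag is on.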
How it works
`consumer_impl` was reworked so that process-wide, one-time setup (logging, Sentry, metrics, `procspawn`, the Ctrl-C handler) runs once, and the consumer is built and run inside a restart loop:
- When the interval elapses, the restart timer sets the `restart_triggered` flag and signals a graceful shutdown.
- When `processor.run()` returns, the loop checks the flags:
  - periodic restart (`restart_triggered`) → rebuild the consumer and `continue` (reconnect, stay alive);
  - Ctrl-C (`shutdown_requested`) → exit the loop / process;
  - any other graceful termination (e.g. `--stop-at-timestamp`) → exit the loop.
- Because the `ProcessorHandle` is recreated on each restart but the Ctrl-C handler can only be installed once, the handler signals the live consumer through a shared `Mutex<Option<ProcessorHandle>>`.
- The restart timer applies the same `delay_kafka_rebalance` quantization the Ctrl-C path uses (on both leave and rejoin), so groups that also enable quantized rebalancing don't incur extra Kafka rebalances.
Emits a `periodic_restart` counter when a reconnect is signaled and a `consumer_reconnect` counter when the loop actually rebuilds the consumer.
Changes
- `rust_snuba/src/auto_restart.rs`: `get_restart_interval(consumer_group) -> Option<Duration>` reads the two runtime config keys and returns the interval when enabled (or `None` when disabled). Includes unit tests.
- `rust_snuba/src/consumer.rs`: `consumer_impl` restructured into a one-time-setup + restart-loop shape as described above.
Testing
- `cargo check --lib` ✅
- `cargo clippy --lib` ✅ (no new warnings)
- `rustfmt --check` ✅
- `cargo test --lib` ✅: all runnable tests pass (the 4 failures in my sandbox require ClickHouse/Redis/Python-state services and fail identically on the base commit; they pass in CI).
Notes
🤖 Generated with Claude Code
https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo