feat(consumer): periodically restart the Rust consumer to reconnect to Kafka#8085

Open
phacops wants to merge 9 commits into master from claude/epic-bohr-dbnj39

phacops (Contributor) commented Jun 22, 2026

Summary

Adds an opt-in mechanism that periodically reconnects the Rust consumer (rust-consumer, the entrypoint every self-hosted consumer service runs) to Kafka. The behavior is gated behind a feature flag and the interval is controlled by a setting, defaulting to 15 minutes.

The reconnect happens in-process: when the interval elapses, the consumer is shut down gracefully and the run loop rebuilds the StreamProcessor (re-establishing the Kafka connection) and resumes consuming — the process is not torn down.

How it's configured

Both knobs are runtime config keys (read from snuba.state), so they can be toggled live without a redeploy — same pattern as the existing quantized_rebalance_consumer_group_delay_secs__<consumer_group> key:

| Key | Purpose | Default |
| --- | --- | --- |
| `kafka_consumer_periodic_restart__<consumer_group>` | Feature flag. Enables the behavior when set to a truthy value ("1" / "true"). | disabled |
| `kafka_consumer_periodic_restart_interval_secs__<consumer_group>` | Setting controlling seconds between reconnects. Must be a positive integer. | 900 (15 min) |

The interval is only honored when the feature flag is enabled; an invalid/missing interval falls back to the 15‑minute default.
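
A minimal sketch of the lookup rules described above, for illustration only. The `RuntimeConfig` map is a hypothetical stand-in for the `snuba.state` runtime config (the real lookup API is not shown in this PR), so this version takes the config as an extra parameter. Treating only the exact strings "1" and "true" as truthy is also an assumption.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Default interval: 900 seconds (15 minutes), per the table above.
const DEFAULT_RESTART_INTERVAL_SECS: u64 = 900;

/// Hypothetical stand-in for the runtime config store (`snuba.state`).
pub type RuntimeConfig = HashMap<String, String>;

/// Returns the reconnect interval when the flag is enabled for
/// `consumer_group`, or `None` when the flag is missing or not truthy.
pub fn get_restart_interval(config: &RuntimeConfig, consumer_group: &str) -> Option<Duration> {
    let flag_key = format!("kafka_consumer_periodic_restart__{consumer_group}");
    let enabled = config
        .get(&flag_key)
        .map(|v| matches!(v.trim(), "1" | "true"))
        .unwrap_or(false);
    if !enabled {
        // The interval key is ignored entirely while the flag is off.
        return None;
    }

    let interval_key =
        format!("kafka_consumer_periodic_restart_interval_secs__{consumer_group}");
    let secs = config
        .get(&interval_key)
        .and_then(|v| v.trim().parse::<u64>().ok())
        // Zero is not a positive integer, so it falls back to the default.
        .filter(|secs| *secs > 0)
        .unwrap_or(DEFAULT_RESTART_INTERVAL_SECS);
    Some(Duration::from_secs(secs))
}
```

With the flag set and no interval key, this returns 900 seconds; with the flag unset, it returns `None` regardless of the interval key.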

How it works

consumer_impl was reworked so that process-wide, one-time setup (logging, Sentry, metrics, procspawn, the Ctrl-C handler) runs once, and the consumer is built and run inside a restart loop:

  • A background timer (spawned per run when the flag is enabled) sleeps for the configured interval, sets a restart_triggered flag, and signals a graceful shutdown.
  • When processor.run() returns, the loop checks the flags:
    • periodic restart (restart_triggered) → rebuild the consumer and continue (reconnect, stay alive);
    • Ctrl-C (shutdown_requested) → exit the loop / process;
    • any other graceful termination (e.g. --stop-at-timestamp) → exit the loop.
  • Because the ProcessorHandle is recreated on each restart but the Ctrl-C handler can only be installed once, the handler signals the live consumer through a shared Mutex<Option<ProcessorHandle>>.
  • The reconnect honors the same delay_kafka_rebalance quantization the Ctrl-C path uses (on both leave and rejoin), so groups that also enable quantized rebalancing don't incur extra Kafka rebalances.
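
The loop decision in the bullets above can be sketched roughly as follows. This is not the code from `consumer.rs`: `LoopAction`, `next_action`, and `run_with_restarts` are hypothetical names, the consumer build/run step is reduced to a closure, and letting Ctrl-C take priority over a pending periodic restart is an assumption.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// What the loop does after one `run()` returns (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum LoopAction {
    Rebuild,
    Exit,
}

/// Decides the next step from the two flags named in the description.
pub fn next_action(restart_triggered: &AtomicBool, shutdown_requested: &AtomicBool) -> LoopAction {
    // Assumption: a Ctrl-C always wins over a pending periodic restart.
    if shutdown_requested.load(Ordering::SeqCst) {
        return LoopAction::Exit;
    }
    // `swap(false)` consumes the flag so the next run starts clean.
    if restart_triggered.swap(false, Ordering::SeqCst) {
        return LoopAction::Rebuild;
    }
    // Any other graceful termination (e.g. --stop-at-timestamp) exits.
    LoopAction::Exit
}

/// Calls `build_and_run` until the decision is `Exit`; returns the run count.
pub fn run_with_restarts<F: FnMut()>(
    mut build_and_run: F,
    restart_triggered: &AtomicBool,
    shutdown_requested: &AtomicBool,
) -> usize {
    let mut runs = 0;
    loop {
        build_and_run();
        runs += 1;
        if next_action(restart_triggered, shutdown_requested) == LoopAction::Exit {
            return runs;
        }
    }
}
```

One-time setup (logging, metrics, the Ctrl-C handler) would sit before the call to `run_with_restarts`, so only the closure body repeats on each reconnect.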

Emits a periodic_restart counter when a reconnect is signaled and a consumer_reconnect counter when the loop actually rebuilds the consumer.

Changes

  • New rust_snuba/src/auto_restart.rs: get_restart_interval(consumer_group) -> Option<Duration> reads the two runtime config keys and returns the interval when enabled (or None when disabled). Includes unit tests.
  • rust_snuba/src/consumer.rs: consumer_impl restructured into a one-time-setup + restart-loop shape as described above.

Testing

  • cargo check --lib
  • cargo clippy --lib ✅ (no new warnings)
  • rustfmt --check
  • cargo test --lib ✅ — all runnable tests pass (the 4 failures in my sandbox require ClickHouse/Redis/Python-state services and fail identically on the base commit; they pass in CI).

Notes

  • Default-off: existing consumers are unaffected until the flag is set for their consumer group.
  • A periodic reconnect triggers a Kafka group rebalance (the consumer leaves and rejoins). For groups that also use quantized rebalancing, both the leave and the rejoin are quantized to reduce churn.

🤖 Generated with Claude Code

https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo

…o Kafka

Adds an opt-in mechanism that gracefully shuts the Rust consumer down after
a configurable interval so it can be restarted and re-establish its Kafka
connection. This is gated behind a per-consumer-group feature flag and
defaults to a 15 minute interval.

The behavior is controlled by two runtime config keys (read from snuba.state,
so they can be toggled live without a redeploy):

- kafka_consumer_periodic_restart__<consumer_group>: feature flag, enables the
  behavior when set to a truthy value ("1"/"true"). Defaults to disabled.
- kafka_consumer_periodic_restart_interval_secs__<consumer_group>: seconds
  between restarts. Defaults to 900 (15 minutes) when unset/invalid.

When enabled, a background thread signals a graceful shutdown after the
configured interval; the process orchestrator then restarts the consumer,
which reconnects to Kafka. Emits a `periodic_restart` counter metric.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
phacops requested a review from a team as a code owner June 22, 2026 19:34
Comment thread rust_snuba/src/consumer.rs Outdated
claude added 2 commits June 22, 2026 19:38
Apply the same `delay_kafka_rebalance` quantization the Ctrl-C handler uses
before signaling shutdown from the periodic-restart thread, so consumer groups
that enable both periodic restart and quantized rebalancing don't trigger
extra Kafka rebalances.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo

Rework `consumer_impl` so the periodic reconnect rebuilds the `StreamProcessor`
in-process and resumes consuming, rather than exiting the process and relying on
the orchestrator to restart it.

Process-wide setup (logging, Sentry, metrics, procspawn, the Ctrl-C handler) now
runs once, and the consumer is built and run inside a restart loop. The periodic
restart timer sets a `restart_triggered` flag and signals a graceful shutdown;
when `run()` returns, the loop rebuilds the consumer (reconnecting to Kafka) and
continues. A Ctrl-C shutdown or any other graceful termination (e.g.
`stop_at_timestamp`) exits the loop instead. Because the handle is recreated on
each restart, the Ctrl-C handler (which can only be installed once) signals the
live consumer through a shared mutex.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
Comment thread rust_snuba/src/consumer.rs
Comment thread rust_snuba/src/consumer.rs Outdated
Comment thread rust_snuba/src/consumer.rs Outdated
Address review feedback on the in-process restart loop:

- Only quantize the join on the first run. On a periodic restart the timer
  already quantizes the corresponding "leave", so delaying the rejoin too kept
  the consumer offline for two delay periods instead of one.
- Re-check `shutdown_requested` after the first-run rebalance delay so a Ctrl-C
  arriving during the delay stops promptly instead of spawning a restart timer
  and entering `run()`.
- Make the restart timer wait on a channel instead of a bare sleep, and join it
  after the run ends, so it is never left orphaned when the consumer exits for
  another reason (e.g. `stop_at_timestamp`).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
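
The channel-based timer described in the commit above could look roughly like this. It is a sketch under assumptions: `RestartTimer`, `spawn`, and `stop_and_join` are hypothetical names, and the real timer also handles rebalance quantization, which is left out here. The point is that `recv_timeout` doubles as an interruptible sleep, so the main thread can wake and join the timer as soon as the run ends.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical wrapper around the per-run restart timer thread.
pub struct RestartTimer {
    stop_tx: Sender<()>,
    handle: JoinHandle<bool>,
}

impl RestartTimer {
    /// Spawns a thread that calls `on_elapsed` after `interval`, unless it
    /// is stopped first.
    pub fn spawn<F>(interval: Duration, on_elapsed: F) -> Self
    where
        F: FnOnce() + Send + 'static,
    {
        let (stop_tx, stop_rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || match stop_rx.recv_timeout(interval) {
            // The interval elapsed with no stop message: fire.
            Err(RecvTimeoutError::Timeout) => {
                on_elapsed();
                true
            }
            // Stop message received, or the sender was dropped: do not fire.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => false,
        });
        Self { stop_tx, handle }
    }

    /// Wakes the timer if it is still waiting, joins the thread, and
    /// reports whether the timer fired.
    pub fn stop_and_join(self) -> bool {
        // Sending fails only if the thread already exited; that is fine.
        let _ = self.stop_tx.send(());
        self.handle.join().unwrap_or(false)
    }
}
```

Calling `stop_and_join` right after `run()` returns means the timer thread never outlives the run it belongs to, whichever side ended the run.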
Comment thread rust_snuba/src/consumer.rs Outdated
Comment thread rust_snuba/src/consumer.rs
…erruptible

Address two review findings on the restart loop:

- Re-read the quantized rebalance delay from runtime config on every restart
  (and share it with the Ctrl-C handler) so live config updates take effect
  without a full process restart, instead of capturing a stale value once.
- Make the restart timer's leave-quantization wait on the stop channel rather
  than a bare `thread::sleep`, so when the run ends for another reason (e.g.
  `stop_at_timestamp`) we can wake and join the timer immediately instead of
  hanging the shutdown for up to `rebalance_delay_secs`.

Adds `rebalancing::quantized_rebalance_delay`, which computes the quantized
delay duration so callers needing an interruptible wait can use it directly;
`delay_kafka_rebalance` now sleeps on that duration.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo

cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 69c2a00.

Comment thread rust_snuba/src/consumer.rs Outdated
Comment thread rust_snuba/src/consumer.rs Outdated
Address two more review findings on the restart loop:

- Replace the unconditional `restart_triggered` flag with a per-run
  compare-and-swap outcome (`OUTCOME_RESTART`/`OUTCOME_STOP`). The timer only
  signals a restart if it claims the outcome first, and the main thread claims
  a stop before joining the timer, so a timer firing concurrently with a
  non-restart shutdown (e.g. `stop_at_timestamp`) can no longer turn it into a
  restart.
- Copy the rebalance delay out of the shared mutex and release the lock before
  the (potentially multi-minute) sleep in the Ctrl-C handler, so the main loop
  never blocks acquiring the same lock during shutdown.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
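
A compare-and-swap outcome like the one in the commit above might be sketched as follows. `OUTCOME_RESTART` and `OUTCOME_STOP` are named in the commit message; `OUTCOME_PENDING`, the numeric values, and the `claim` helper are assumptions made for this example.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

/// Hypothetical initial state: nobody has decided how this run ends yet.
pub const OUTCOME_PENDING: u8 = 0;
/// Claimed by the timer thread when the restart interval elapses.
pub const OUTCOME_RESTART: u8 = 1;
/// Claimed by the main thread before it joins the timer.
pub const OUTCOME_STOP: u8 = 2;

/// Tries to move the outcome from pending to `desired`.
/// Returns true only for the first caller; later claims leave it unchanged.
pub fn claim(outcome: &AtomicU8, desired: u8) -> bool {
    outcome
        .compare_exchange(OUTCOME_PENDING, desired, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
}
```

Because only one claim can succeed per run, a timer that fires just after the main thread has claimed `OUTCOME_STOP` sees its own claim fail and skips signaling a restart.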
Comment thread rust_snuba/src/rebalancing.rs
A rebalance delay of 0 (settable via runtime config or CLI) made
quantized_rebalance_delay panic on a divide-by-zero in the modulo, which in the
restart timer thread would fail silently. Treat 0 as "no quantization" and
return Duration::ZERO, protecting all callers (the timer, the Ctrl-C handler,
and the first-run join delay). Adds unit tests for the zero and normal cases.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
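
The zero guard described in the commit above can be illustrated with a small sketch. The function name comes from the commit message, but the signature here is an assumption: the current time is passed in as `now_secs` so the example is deterministic, and "wait until the next multiple of the delay" is assumed as the quantization rule.

```rust
use std::time::Duration;

/// Time to wait until the next `delay_secs` boundary.
/// `now_secs` is seconds since the Unix epoch (a parameter here for testability).
pub fn quantized_rebalance_delay(delay_secs: u64, now_secs: u64) -> Duration {
    // A delay of 0 means "no quantization"; without this guard the modulo
    // below would panic with a divide-by-zero.
    if delay_secs == 0 {
        return Duration::ZERO;
    }
    let elapsed_in_slot = now_secs % delay_secs;
    Duration::from_secs(delay_secs - elapsed_in_slot)
}
```

For example, with a 60-second delay and `now_secs = 125`, five seconds of the current slot have elapsed, so the result is 55 seconds.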
Comment thread rust_snuba/src/consumer.rs
claude and others added 2 commits June 22, 2026 20:24
Clear `current_handle` once a processor's run returns so the Ctrl-C handler
doesn't signal a defunct processor's handle while the next consumer is being
built. The next iteration republishes a fresh handle before running.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo
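
The publish/clear lifecycle of the shared handle described above might look like this sketch. `ProcessorHandle` here is a hypothetical stand-in for the real type, and `publish`, `clear`, and `live_handle` are illustrative helper names that do not come from the PR.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the consumer's `ProcessorHandle`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessorHandle {
    pub generation: u32,
}

/// Slot shared between the restart loop and the Ctrl-C handler.
pub type CurrentHandle = Arc<Mutex<Option<ProcessorHandle>>>;

/// Called by the loop just before running a freshly built consumer.
pub fn publish(current: &CurrentHandle, handle: ProcessorHandle) {
    *current.lock().unwrap() = Some(handle);
}

/// Called by the loop as soon as a run returns, so the handler cannot
/// signal a defunct processor while the next one is being built.
pub fn clear(current: &CurrentHandle) {
    *current.lock().unwrap() = None;
}

/// What the Ctrl-C handler would do: copy the handle out and release the
/// lock before doing anything slow with it.
pub fn live_handle(current: &CurrentHandle) -> Option<ProcessorHandle> {
    let guard = current.lock().unwrap();
    guard.clone()
}
```

Between `clear` and the next `publish`, `live_handle` returns `None`, so a Ctrl-C in that window has no stale consumer to signal.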