From d7ffc347448296d171b643499c2808283d437ad3 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 19:34:30 +0000 Subject: [PATCH 1/8] feat(consumer): periodically restart the Rust consumer to reconnect to Kafka Adds an opt-in mechanism that gracefully shuts the Rust consumer down after a configurable interval so it can be restarted and re-establish its Kafka connection. This is gated behind a per-consumer-group feature flag and defaults to a 15 minute interval. The behavior is controlled by two runtime config keys (read from snuba.state, so they can be toggled live without a redeploy): - kafka_consumer_periodic_restart__<consumer_group>: feature flag, enables the behavior when set to a truthy value ("1"/"true"). Defaults to disabled. - kafka_consumer_periodic_restart_interval_secs__<consumer_group>: seconds between restarts. Defaults to 900 (15 minutes) when unset/invalid. When enabled, a background thread signals a graceful shutdown after the configured interval; the process orchestrator then restarts the consumer, which reconnects to Kafka. Emits a `periodic_restart` counter metric. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/auto_restart.rs | 138 +++++++++++++++++++++++++++++++++ rust_snuba/src/consumer.rs | 29 +++++++ rust_snuba/src/lib.rs | 1 + 3 files changed, 168 insertions(+) create mode 100644 rust_snuba/src/auto_restart.rs diff --git a/rust_snuba/src/auto_restart.rs b/rust_snuba/src/auto_restart.rs new file mode 100644 index 00000000000..cd4e577623f --- /dev/null +++ b/rust_snuba/src/auto_restart.rs @@ -0,0 +1,138 @@ +use std::time::Duration; + +use crate::runtime_config; + +/// Default interval between consumer restarts when periodic restart is enabled +/// but no explicit interval is configured: 15 minutes. 
+pub const DEFAULT_RESTART_INTERVAL_SECS: u64 = 900; + +fn is_truthy(value: &str) -> bool { + matches!(value.trim().to_ascii_lowercase().as_str(), "1" | "true") +} + +/// Returns the interval after which the consumer should gracefully shut down so +/// that it can be restarted and reconnect to Kafka, or `None` when periodic +/// restart is not enabled for `consumer_group`. +/// +/// This is gated behind a feature flag and controlled by two runtime config +/// keys (read from `snuba.state`, so they can be toggled live without a +/// redeploy): +/// +/// * `kafka_consumer_periodic_restart__<consumer_group>` — the feature flag. +/// Periodic restart is only enabled when this is set to a truthy value +/// (`"1"` or `"true"`). Defaults to disabled. +/// * `kafka_consumer_periodic_restart_interval_secs__<consumer_group>` — the +/// number of seconds between restarts. Must be a positive integer; defaults +/// to [`DEFAULT_RESTART_INTERVAL_SECS`] (15 minutes) when unset or invalid. +pub fn get_restart_interval(consumer_group: &str) -> Option<Duration> { + let enabled = runtime_config::get_str_config( + format!("kafka_consumer_periodic_restart__{consumer_group}").as_str(), + ) + .ok() + .flatten() + .map(|value| is_truthy(&value)) + .unwrap_or(false); + + if !enabled { + return None; + } + + let interval_secs = runtime_config::get_str_config( + format!("kafka_consumer_periodic_restart_interval_secs__{consumer_group}").as_str(), + ) + .ok() + .flatten() + .and_then(|value| value.parse::<u64>().ok()) + .filter(|&secs| secs > 0) + .unwrap_or(DEFAULT_RESTART_INTERVAL_SECS); + + Some(Duration::from_secs(interval_secs)) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn clear(consumer_group: &str) { + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{consumer_group}"), + None, + ); + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{consumer_group}"), + None, + ); + } + + #[test] + fn test_disabled_by_default() { + 
crate::testutils::initialize_python(); + assert_eq!(get_restart_interval("periodic_restart_default_test"), None); + } + + #[test] + fn test_enabled_uses_default_interval() { + crate::testutils::initialize_python(); + let group = "periodic_restart_enabled_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{group}"), + Some("1"), + ); + assert_eq!( + get_restart_interval(group), + Some(Duration::from_secs(DEFAULT_RESTART_INTERVAL_SECS)) + ); + } + + #[test] + fn test_enabled_with_custom_interval() { + crate::testutils::initialize_python(); + let group = "periodic_restart_custom_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{group}"), + Some("true"), + ); + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{group}"), + Some("60"), + ); + assert_eq!(get_restart_interval(group), Some(Duration::from_secs(60))); + } + + #[test] + fn test_invalid_interval_falls_back_to_default() { + crate::testutils::initialize_python(); + let group = "periodic_restart_invalid_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{group}"), + Some("1"), + ); + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{group}"), + Some("garbage"), + ); + assert_eq!( + get_restart_interval(group), + Some(Duration::from_secs(DEFAULT_RESTART_INTERVAL_SECS)) + ); + } + + #[test] + fn test_interval_without_flag_is_ignored() { + crate::testutils::initialize_python(); + let group = "periodic_restart_no_flag_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{group}"), + 
Some("60"), + ); + assert_eq!(get_restart_interval(group), None); + } +} diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ff3e33b52d0..6f1e27e2d50 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::thread; use std::time::Duration; use chrono::{DateTime, Utc}; @@ -6,6 +7,7 @@ use chrono::{DateTime, Utc}; use sentry_arroyo::backends::kafka::config::KafkaConfig; use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::counter; use sentry_arroyo::metrics; use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; @@ -18,6 +20,7 @@ use pyo3::types::PyBytes; use sentry_options::init_with_schemas; +use crate::auto_restart; use crate::config; use crate::factory_v2::ConsumerStrategyFactoryV2; use crate::logging::{setup_logging, setup_sentry}; @@ -297,6 +300,32 @@ pub fn consumer_impl( let mut handle = processor.get_handle(); + // Periodically restart the consumer to reconnect to Kafka when the + // periodic-restart feature flag is enabled for this consumer group. After + // the configured interval (default 15 minutes) we signal a graceful + // shutdown; the process orchestrator then restarts the consumer, which + // re-establishes its Kafka connection. See `crate::auto_restart` for the + // controlling runtime config keys. 
+ if let Some(restart_interval) = auto_restart::get_restart_interval(consumer_group) { + tracing::info!( + consumer_group, + interval_secs = restart_interval.as_secs(), + "Periodic Kafka reconnect enabled; consumer will restart after the configured interval", + ); + let mut restart_handle = handle.clone(); + let consumer_group = consumer_group.to_owned(); + thread::spawn(move || { + thread::sleep(restart_interval); + tracing::info!( + consumer_group, + interval_secs = restart_interval.as_secs(), + "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + ); + counter!("periodic_restart"); + restart_handle.signal_shutdown(); + }); + } + match rebalance_delay_secs { Some(secs) => { ctrlc::set_handler(move || { diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index 7fd2bedde6f..6784c6746a5 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -2,6 +2,7 @@ pub(crate) const SNUBA_SCHEMA: &str = include_str!("../../sentry-options/schemas/snuba/schema.json"); mod accepted_outcomes_consumer; +mod auto_restart; mod config; mod consumer; mod factory_v2; From 53395995eda00c49980f4637f738d96522a2e973 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 19:38:05 +0000 Subject: [PATCH 2/8] fix(consumer): quantize rebalance on periodic restart Apply the same `delay_kafka_rebalance` quantization the Ctrl-C handler uses before signaling shutdown from the periodic-restart thread, so consumer groups that enable both periodic restart and quantized rebalancing don't trigger extra Kafka rebalances. 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/consumer.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 6f1e27e2d50..d381480503c 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -322,6 +322,12 @@ pub fn consumer_impl( "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", ); counter!("periodic_restart"); + // Honor the same rebalance quantization the Ctrl-C handler uses so a + // periodic restart does not trigger extra Kafka rebalances for + // consumer groups that also enable quantized rebalancing. + if let Some(secs) = rebalance_delay_secs { + rebalancing::delay_kafka_rebalance(secs); + } restart_handle.signal_shutdown(); }); } From 4488625e7a2f1dbcb9c8cfa679f10c858d37b4b5 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 19:49:57 +0000 Subject: [PATCH 3/8] ref(consumer): restart the consumer loop in-process instead of exiting Rework `consumer_impl` so the periodic reconnect rebuilds the `StreamProcessor` in-process and resumes consuming, rather than exiting the process and relying on the orchestrator to restart it. Process-wide setup (logging, Sentry, metrics, procspawn, the Ctrl-C handler) now runs once, and the consumer is built and run inside a restart loop. The periodic restart timer sets a `restart_triggered` flag and signals a graceful shutdown; when `run()` returns, the loop rebuilds the consumer (reconnecting to Kafka) and continues. A Ctrl-C shutdown or any other graceful termination (e.g. `stop_at_timestamp`) exits the loop instead. Because the handle is recreated on each restart, the Ctrl-C handler (which can only be installed once) signals the live consumer through a shared mutex. 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/consumer.rs | 422 +++++++++++++++++++++---------------- 1 file changed, 237 insertions(+), 185 deletions(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index d381480503c..961957188f3 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -1,4 +1,5 @@ -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -12,7 +13,7 @@ use sentry_arroyo::metrics; use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; -use sentry_arroyo::processing::StreamProcessor; +use sentry_arroyo::processing::{ProcessorHandle, StreamProcessor}; use sentry_arroyo::types::Topic; use pyo3::prelude::*; @@ -102,21 +103,11 @@ pub fn consumer_impl( init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]) .expect("failed to initialize sentry-options"); + // Parse the config once up front for process-wide, one-time setup (logging, + // Sentry, metrics, etc.). The consumer itself is (re)built from a fresh + // parse inside the restart loop below, so a periodic restart can reconnect + // to Kafka without tearing the whole process down. 
let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); - let max_batch_size = consumer_config.max_batch_size; - let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); - let max_batch_size_calculation = consumer_config.max_batch_size_calculation; - - let batch_write_timeout = match batch_write_timeout_ms { - Some(timeout_ms) => { - if timeout_ms >= consumer_config.max_batch_time_ms { - Some(Duration::from_millis(timeout_ms)) - } else { - None - } - } - None => None, - }; for storage in &consumer_config.storages { tracing::info!( @@ -136,8 +127,6 @@ pub fn consumer_impl( let mut _sentry_guard = None; - let env_config = consumer_config.env.clone(); - // setup sentry if let Some(dsn) = consumer_config.env.sentry_dsn { tracing::debug!(sentry_dsn = dsn); @@ -167,94 +156,6 @@ pub fn consumer_impl( procspawn::init(); } - let first_storage = consumer_config.storages[0].clone(); - - tracing::info!( - storage = first_storage.name, - "Starting consumer for {:?}", - first_storage.name, - ); - - let config = KafkaConfig::new_consumer_config( - vec![], - consumer_group.to_owned(), - auto_offset_reset.parse().expect( - "Invalid value for `auto_offset_reset`. Valid values: `error`, `earliest`, `latest`", - ), - !no_strict_offset_reset, - max_poll_interval_ms, - Some(consumer_config.raw_topic.broker_config), - ); - - let logical_topic_name = consumer_config.raw_topic.logical_topic_name; - - // XXX: this variable must live for the lifetime of the entire consumer. we should do something - // to ensure this statically, such as use actual Rust lifetimes or ensuring the runtime stays - // alive by storing it inside of the DlqPolicy - let dlq_concurrency_config = ConcurrencyConfig::new(10); - - // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be - // writing to the DLQ topics in prod. 
- - let blq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| { - let mut overrides = dlq_topic_config.broker_config.clone(); - overrides.insert("message.max.bytes".to_string(), "10000000".to_string()); // 10 MB, broker max - KafkaConfig::new_producer_config(vec![], Some(overrides)) - }); - - let dlq_topic = consumer_config - .dlq_topic - .as_ref() - .map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name)); - - let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| { - let producer = KafkaProducer::new(KafkaConfig::new_producer_config( - vec![], - Some(dlq_topic_config.broker_config), - )); - - let kafka_dlq_producer = Box::new(KafkaDlqProducer::new( - producer, - Topic::new(&dlq_topic_config.physical_topic_name), - )); - - let handle = dlq_concurrency_config.handle(); - DlqPolicy::new( - handle, - kafka_dlq_producer, - DlqLimit { - max_invalid_ratio: None, - max_consecutive_count: None, - }, - max_dlq_buffer_length, - ) - }); - - let commit_log_producer = if let Some(topic_config) = consumer_config.commit_log_topic { - let producer_config = - KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); - let producer = KafkaProducer::new(producer_config); - Some(( - Arc::new(producer), - Topic::new(&topic_config.physical_topic_name), - )) - } else { - None - }; - - let replacements_config = if let Some(topic_config) = consumer_config.replacements_topic { - let producer_config = - KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); - Some(( - producer_config, - Topic::new(&topic_config.physical_topic_name), - )) - } else { - None - }; - - let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); - let mut rebalance_delay_secs = consumer_config .raw_topic .quantized_rebalance_consumer_group_delay_secs; @@ -262,99 +163,250 @@ pub fn consumer_impl( if let Some(secs) = config_rebalance_delay_secs { rebalance_delay_secs = Some(secs); } - if let Some(secs) = 
rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs) - } - let factory = ConsumerStrategyFactoryV2 { - storage_config: first_storage, - env_config, - logical_topic_name, - max_batch_size, - max_batch_time, - max_batch_size_calculation, - processing_concurrency: ConcurrencyConfig::new(concurrency), - clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), - commitlog_concurrency: ConcurrencyConfig::new(2), - replacements_concurrency: ConcurrencyConfig::new(4), - async_inserts, - python_max_queue_depth, - use_rust_processor, - health_check_file: health_check_file.map(ToOwned::to_owned), - enforce_schema, - commit_log_producer, - replacements_config, - physical_consumer_group: consumer_group.to_owned(), - physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name), - accountant_topic_config: consumer_config.accountant_topic, - stop_at_timestamp, - batch_write_timeout, - join_timeout_ms, - health_check: health_check.to_string(), - use_row_binary, - blq_producer_config: blq_producer_config.clone(), - blq_topic: dlq_topic, - }; + // `shutdown_requested` is set by the Ctrl-C handler to permanently stop the + // process. `restart_triggered` is set by the periodic-restart timer to ask + // the loop below to rebuild the consumer (reconnecting to Kafka) without + // exiting the process. + let shutdown_requested = Arc::new(AtomicBool::new(false)); + let restart_triggered = Arc::new(AtomicBool::new(false)); + + // The Ctrl-C handler can only be installed once per process, but the + // consumer (and therefore its `ProcessorHandle`) is rebuilt on every + // restart. We share the live handle through a mutex so the handler always + // signals the consumer that is currently running. 
+ let current_handle: Arc<Mutex<Option<ProcessorHandle>>> = Arc::new(Mutex::new(None)); + + { + let shutdown_requested = shutdown_requested.clone(); + let current_handle = current_handle.clone(); + ctrlc::set_handler(move || { + shutdown_requested.store(true, Ordering::SeqCst); + if let Some(secs) = rebalance_delay_secs { + rebalancing::delay_kafka_rebalance(secs); + } + if let Some(handle) = current_handle.lock().unwrap().as_mut() { + handle.signal_shutdown(); + } + }) + .expect("Error setting Ctrl-C handler"); + } - let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); + // Restart loop. Each iteration builds a fresh consumer (reconnecting to + // Kafka) and runs it until it shuts down. A shutdown caused by the periodic + // restart timer loops back around to rebuild the consumer; any other + // graceful shutdown (Ctrl-C, `stop_at_timestamp`, ...) exits the loop. + loop { + let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); + let max_batch_size = consumer_config.max_batch_size; + let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); + let max_batch_size_calculation = consumer_config.max_batch_size_calculation; + + let batch_write_timeout = match batch_write_timeout_ms { + Some(timeout_ms) => { + if timeout_ms >= consumer_config.max_batch_time_ms { + Some(Duration::from_millis(timeout_ms)) + } else { + None + } + } + None => None, + }; - let mut handle = processor.get_handle(); + let env_config = consumer_config.env.clone(); + let first_storage = consumer_config.storages[0].clone(); - // Periodically restart the consumer to reconnect to Kafka when the - // periodic-restart feature flag is enabled for this consumer group. After - // the configured interval (default 15 minutes) we signal a graceful - // shutdown; the process orchestrator then restarts the consumer, which - // re-establishes its Kafka connection. See `crate::auto_restart` for the - // controlling runtime config keys. 
- if let Some(restart_interval) = auto_restart::get_restart_interval(consumer_group) { tracing::info!( - consumer_group, - interval_secs = restart_interval.as_secs(), - "Periodic Kafka reconnect enabled; consumer will restart after the configured interval", + storage = first_storage.name, + "Starting consumer for {:?}", + first_storage.name, + ); + + let config = KafkaConfig::new_consumer_config( + vec![], + consumer_group.to_owned(), + auto_offset_reset.parse().expect( + "Invalid value for `auto_offset_reset`. Valid values: `error`, `earliest`, `latest`", + ), + !no_strict_offset_reset, + max_poll_interval_ms, + Some(consumer_config.raw_topic.broker_config), ); - let mut restart_handle = handle.clone(); - let consumer_group = consumer_group.to_owned(); - thread::spawn(move || { - thread::sleep(restart_interval); + + let logical_topic_name = consumer_config.raw_topic.logical_topic_name; + + // XXX: this variable must live for the lifetime of the entire consumer. we should do something + // to ensure this statically, such as use actual Rust lifetimes or ensuring the runtime stays + // alive by storing it inside of the DlqPolicy + let dlq_concurrency_config = ConcurrencyConfig::new(10); + + // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be + // writing to the DLQ topics in prod. 
+ + let blq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| { + let mut overrides = dlq_topic_config.broker_config.clone(); + overrides.insert("message.max.bytes".to_string(), "10000000".to_string()); // 10 MB, broker max + KafkaConfig::new_producer_config(vec![], Some(overrides)) + }); + + let dlq_topic = consumer_config + .dlq_topic + .as_ref() + .map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name)); + + let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| { + let producer = KafkaProducer::new(KafkaConfig::new_producer_config( + vec![], + Some(dlq_topic_config.broker_config), + )); + + let kafka_dlq_producer = Box::new(KafkaDlqProducer::new( + producer, + Topic::new(&dlq_topic_config.physical_topic_name), + )); + + let handle = dlq_concurrency_config.handle(); + DlqPolicy::new( + handle, + kafka_dlq_producer, + DlqLimit { + max_invalid_ratio: None, + max_consecutive_count: None, + }, + max_dlq_buffer_length, + ) + }); + + let commit_log_producer = if let Some(topic_config) = consumer_config.commit_log_topic { + let producer_config = + KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); + let producer = KafkaProducer::new(producer_config); + Some(( + Arc::new(producer), + Topic::new(&topic_config.physical_topic_name), + )) + } else { + None + }; + + let replacements_config = if let Some(topic_config) = consumer_config.replacements_topic { + let producer_config = + KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); + Some(( + producer_config, + Topic::new(&topic_config.physical_topic_name), + )) + } else { + None + }; + + let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); + + let factory = ConsumerStrategyFactoryV2 { + storage_config: first_storage, + env_config, + logical_topic_name, + max_batch_size, + max_batch_time, + max_batch_size_calculation, + processing_concurrency: ConcurrencyConfig::new(concurrency), + 
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), + commitlog_concurrency: ConcurrencyConfig::new(2), + replacements_concurrency: ConcurrencyConfig::new(4), + async_inserts, + python_max_queue_depth, + use_rust_processor, + health_check_file: health_check_file.map(ToOwned::to_owned), + enforce_schema, + commit_log_producer, + replacements_config, + physical_consumer_group: consumer_group.to_owned(), + physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name), + accountant_topic_config: consumer_config.accountant_topic, + stop_at_timestamp, + batch_write_timeout, + join_timeout_ms, + health_check: health_check.to_string(), + use_row_binary, + blq_producer_config: blq_producer_config.clone(), + blq_topic: dlq_topic, + }; + + let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); + + // Publish this consumer's handle so the Ctrl-C handler signals the live + // consumer. + let handle = processor.get_handle(); + *current_handle.lock().unwrap() = Some(handle.clone()); + + // If a shutdown was requested before we managed to (re)build the + // consumer, stop now instead of starting it. + if shutdown_requested.load(Ordering::SeqCst) { + break; + } + + // Quantize the (re)join when configured, matching the Ctrl-C/leave path. + if let Some(secs) = rebalance_delay_secs { + rebalancing::delay_kafka_rebalance(secs); + } + + // Periodically restart the consumer to reconnect to Kafka when the + // periodic-restart feature flag is enabled for this consumer group. + // After the configured interval (default 15 minutes) we signal a + // graceful shutdown; the restart loop then rebuilds the consumer, which + // re-establishes its Kafka connection. See `crate::auto_restart` for the + // controlling runtime config keys. 
+ if let Some(restart_interval) = auto_restart::get_restart_interval(consumer_group) { tracing::info!( consumer_group, interval_secs = restart_interval.as_secs(), - "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + "Periodic Kafka reconnect enabled; consumer will restart after the configured interval", ); - counter!("periodic_restart"); - // Honor the same rebalance quantization the Ctrl-C handler uses so a - // periodic restart does not trigger extra Kafka rebalances for - // consumer groups that also enable quantized rebalancing. - if let Some(secs) = rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs); - } - restart_handle.signal_shutdown(); - }); - } - - match rebalance_delay_secs { - Some(secs) => { - ctrlc::set_handler(move || { - rebalancing::delay_kafka_rebalance(secs); - handle.signal_shutdown(); - }) - .expect("Error setting Ctrl-C handler"); + let mut restart_handle = handle.clone(); + let restart_triggered = restart_triggered.clone(); + let consumer_group = consumer_group.to_owned(); + thread::spawn(move || { + thread::sleep(restart_interval); + tracing::info!( + consumer_group, + interval_secs = restart_interval.as_secs(), + "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + ); + counter!("periodic_restart"); + // Honor the same rebalance quantization the Ctrl-C handler uses + // so a periodic restart does not trigger extra Kafka rebalances + // for consumer groups that also enable quantized rebalancing. 
+ if let Some(secs) = rebalance_delay_secs { + rebalancing::delay_kafka_rebalance(secs); + } + restart_triggered.store(true, Ordering::SeqCst); + restart_handle.signal_shutdown(); + }); } - None => { - ctrlc::set_handler(move || { - handle.signal_shutdown(); - }) - .expect("Error setting Ctrl-C handler"); + + match processor.run() { + Ok(()) => { + if shutdown_requested.load(Ordering::SeqCst) { + break; + } + if restart_triggered.swap(false, Ordering::SeqCst) { + counter!("consumer_reconnect"); + tracing::info!("Restarting consumer to reconnect to Kafka"); + continue; + } + // Graceful termination that was not a periodic restart (e.g. + // `stop_at_timestamp`): exit the loop. + break; + } + Err(error) => { + let error: &dyn std::error::Error = &error; + tracing::error!("{:?}", error); + return 1; + } } } - if let Err(error) = processor.run() { - let error: &dyn std::error::Error = &error; - tracing::error!("{:?}", error); - 1 - } else { - 0 - } + 0 } mod exceptions { From 88c3af198a673891e92e2bff7928b286f2008b39 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 19:57:09 +0000 Subject: [PATCH 4/8] fix(consumer): tidy restart-loop rebalance delay and timer lifecycle Address review feedback on the in-process restart loop: - Only quantize the join on the first run. On a periodic restart the timer already quantizes the corresponding "leave", so delaying the rejoin too kept the consumer offline for two delay periods instead of one. - Re-check `shutdown_requested` after the first-run rebalance delay so a Ctrl-C arriving during the delay stops promptly instead of spawning a restart timer and entering `run()`. - Make the restart timer wait on a channel instead of a bare sleep, and join it after the run ends, so it is never left orphaned when the consumer exits for another reason (e.g. `stop_at_timestamp`). 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/consumer.rs | 76 +++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 961957188f3..0da9a772c46 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -1,5 +1,5 @@ use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; @@ -196,6 +196,7 @@ pub fn consumer_impl( // Kafka) and runs it until it shuts down. A shutdown caused by the periodic // restart timer loops back around to rebuild the consumer; any other // graceful shutdown (Ctrl-C, `stop_at_timestamp`, ...) exits the loop. + let mut first_run = true; loop { let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); let max_batch_size = consumer_config.max_batch_size; @@ -345,10 +346,22 @@ pub fn consumer_impl( break; } - // Quantize the (re)join when configured, matching the Ctrl-C/leave path. - if let Some(secs) = rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs); + // Quantize the initial join when configured, matching the original + // startup behaviour. We only do this on the first run: on a periodic + // restart the timer below already quantized the corresponding "leave", + // so delaying the rejoin too would keep the consumer offline for two + // delay periods instead of one. + if first_run { + if let Some(secs) = rebalance_delay_secs { + rebalancing::delay_kafka_rebalance(secs); + } + // A shutdown may have arrived during the delay; stop now instead of + // starting the consumer and spawning a needless restart timer. 
+ if shutdown_requested.load(Ordering::SeqCst) { + break; + } } + first_run = false; // Periodically restart the consumer to reconnect to Kafka when the // periodic-restart feature flag is enabled for this consumer group. @@ -356,35 +369,54 @@ pub fn consumer_impl( // graceful shutdown; the restart loop then rebuilds the consumer, which // re-establishes its Kafka connection. See `crate::auto_restart` for the // controlling runtime config keys. - if let Some(restart_interval) = auto_restart::get_restart_interval(consumer_group) { + // + // The timer waits on a channel rather than a bare sleep so that, when + // the run ends for any reason, we can stop and join it below instead of + // leaking a sleeping thread. + let restart_timer = auto_restart::get_restart_interval(consumer_group).map(|interval| { tracing::info!( consumer_group, - interval_secs = restart_interval.as_secs(), + interval_secs = interval.as_secs(), "Periodic Kafka reconnect enabled; consumer will restart after the configured interval", ); + let (stop_tx, stop_rx) = mpsc::channel::<()>(); let mut restart_handle = handle.clone(); let restart_triggered = restart_triggered.clone(); let consumer_group = consumer_group.to_owned(); - thread::spawn(move || { - thread::sleep(restart_interval); - tracing::info!( - consumer_group, - interval_secs = restart_interval.as_secs(), - "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", - ); - counter!("periodic_restart"); - // Honor the same rebalance quantization the Ctrl-C handler uses - // so a periodic restart does not trigger extra Kafka rebalances - // for consumer groups that also enable quantized rebalancing. - if let Some(secs) = rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs); + let join = thread::spawn(move || { + // Wake up either when the interval elapses or when we are asked + // to stop (the run ended for another reason). 
+ if let Err(mpsc::RecvTimeoutError::Timeout) = stop_rx.recv_timeout(interval) { + tracing::info!( + consumer_group, + interval_secs = interval.as_secs(), + "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + ); + counter!("periodic_restart"); + // Honor the same rebalance quantization the Ctrl-C handler + // uses so a periodic restart does not trigger extra Kafka + // rebalances for groups that also enable quantized + // rebalancing. + if let Some(secs) = rebalance_delay_secs { + rebalancing::delay_kafka_rebalance(secs); + } + restart_triggered.store(true, Ordering::SeqCst); + restart_handle.signal_shutdown(); } - restart_triggered.store(true, Ordering::SeqCst); - restart_handle.signal_shutdown(); }); + (stop_tx, join) + }); + + let result = processor.run(); + + // The run has ended: stop the restart timer (if it hasn't already fired) + // and join it so we never leave an orphaned sleeping thread behind. + if let Some((stop_tx, join)) = restart_timer { + drop(stop_tx); + let _ = join.join(); } - match processor.run() { + match result { Ok(()) => { if shutdown_requested.load(Ordering::SeqCst) { break; From 69c2a00eac640b86c6af1ca260b6d824af07e78d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 20:05:59 +0000 Subject: [PATCH 5/8] fix(consumer): refresh rebalance delay per restart and make timer interruptible Address two review findings on the restart loop: - Re-read the quantized rebalance delay from runtime config on every restart (and share it with the Ctrl-C handler) so live config updates take effect without a full process restart, instead of capturing a stale value once. - Make the restart timer's leave-quantization wait on the stop channel rather than a bare `thread::sleep`, so when the run ends for another reason (e.g. `stop_at_timestamp`) we can wake and join the timer immediately instead of hanging the shutdown for up to `rebalance_delay_secs`. 
Adds `rebalancing::quantized_rebalance_delay`, which computes the quantized delay duration so callers needing an interruptible wait can use it directly; `delay_kafka_rebalance` now sleeps on that duration. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/consumer.rs | 65 ++++++++++++++++++++++------------- rust_snuba/src/rebalancing.rs | 30 +++++++++------- 2 files changed, 60 insertions(+), 35 deletions(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 0da9a772c46..29f3ff2ee3e 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -156,13 +156,14 @@ pub fn consumer_impl( procspawn::init(); } - let mut rebalance_delay_secs = consumer_config + // The "quantized rebalance" delay may be set statically in the consumer + // config and/or overridden live via runtime config. We capture the static + // value once (it comes from the immutable startup config) and refresh the + // runtime override on every restart in the loop below, so live updates take + // effect without a full process restart. + let static_rebalance_delay_secs = consumer_config .raw_topic .quantized_rebalance_consumer_group_delay_secs; - let config_rebalance_delay_secs = rebalancing::get_rebalance_delay_secs(consumer_group); - if let Some(secs) = config_rebalance_delay_secs { - rebalance_delay_secs = Some(secs); - } // `shutdown_requested` is set by the Ctrl-C handler to permanently stop the // process. `restart_triggered` is set by the periodic-restart timer to ask @@ -174,15 +175,21 @@ pub fn consumer_impl( // The Ctrl-C handler can only be installed once per process, but the // consumer (and therefore its `ProcessorHandle`) is rebuilt on every // restart. We share the live handle through a mutex so the handler always - // signals the consumer that is currently running. + // signals the consumer that is currently running. 
The current rebalance + // delay is shared the same way so the handler quantizes its "leave" using + // the latest configured value. let current_handle: Arc<Mutex<Option<ProcessorHandle>>> = Arc::new(Mutex::new(None)); + let rebalance_delay_secs = Arc::new(Mutex::new( + rebalancing::get_rebalance_delay_secs(consumer_group).or(static_rebalance_delay_secs), + )); { let shutdown_requested = shutdown_requested.clone(); let current_handle = current_handle.clone(); + let rebalance_delay_secs = rebalance_delay_secs.clone(); ctrlc::set_handler(move || { shutdown_requested.store(true, Ordering::SeqCst); - if let Some(secs) = rebalance_delay_secs { + if let Some(secs) = *rebalance_delay_secs.lock().unwrap() { rebalancing::delay_kafka_rebalance(secs); } if let Some(handle) = current_handle.lock().unwrap().as_mut() { @@ -198,6 +205,12 @@ pub fn consumer_impl( // graceful shutdown (Ctrl-C, `stop_at_timestamp`, ...) exits the loop. let mut first_run = true; loop { + // Refresh the rebalance delay from runtime config so live updates take + // effect on each restart, and publish it for the Ctrl-C handler. + let current_rebalance_delay_secs = + rebalancing::get_rebalance_delay_secs(consumer_group).or(static_rebalance_delay_secs); + *rebalance_delay_secs.lock().unwrap() = current_rebalance_delay_secs; + let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); let max_batch_size = consumer_config.max_batch_size; let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); @@ -352,7 +365,7 @@ pub fn consumer_impl( // so delaying the rejoin too would keep the consumer offline for two // delay periods instead of one. 
if first_run { - if let Some(secs) = rebalance_delay_secs { + if let Some(secs) = current_rebalance_delay_secs { rebalancing::delay_kafka_rebalance(secs); } // A shutdown may have arrived during the delay; stop now instead of @@ -386,23 +399,29 @@ pub fn consumer_impl( let join = thread::spawn(move || { // Wake up either when the interval elapses or when we are asked // to stop (the run ended for another reason). - if let Err(mpsc::RecvTimeoutError::Timeout) = stop_rx.recv_timeout(interval) { - tracing::info!( - consumer_group, - interval_secs = interval.as_secs(), - "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", - ); - counter!("periodic_restart"); - // Honor the same rebalance quantization the Ctrl-C handler - // uses so a periodic restart does not trigger extra Kafka - // rebalances for groups that also enable quantized - // rebalancing. - if let Some(secs) = rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs); + if stop_rx.recv_timeout(interval) != Err(mpsc::RecvTimeoutError::Timeout) { + return; + } + tracing::info!( + consumer_group, + interval_secs = interval.as_secs(), + "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + ); + counter!("periodic_restart"); + // Honor the same rebalance quantization the Ctrl-C handler uses + // so a periodic restart does not trigger extra Kafka rebalances + // for groups that also enable quantized rebalancing. We wait on + // the stop channel rather than sleeping, so that if the run ends + // for another reason during this delay we can bail immediately + // instead of blocking the join below. 
+ if let Some(secs) = current_rebalance_delay_secs { + let delay = rebalancing::quantized_rebalance_delay(secs); + if stop_rx.recv_timeout(delay) != Err(mpsc::RecvTimeoutError::Timeout) { + return; } - restart_triggered.store(true, Ordering::SeqCst); - restart_handle.signal_shutdown(); } + restart_triggered.store(true, Ordering::SeqCst); + restart_handle.signal_shutdown(); }); (stop_tx, join) }); diff --git a/rust_snuba/src/rebalancing.rs b/rust_snuba/src/rebalancing.rs index 9be9d840f6a..11e293cc88f 100644 --- a/rust_snuba/src/rebalancing.rs +++ b/rust_snuba/src/rebalancing.rs @@ -2,6 +2,21 @@ use crate::runtime_config; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +/// Computes how long to wait so that the next rebalance step lands on a +/// `configured_delay_secs` "tick". See [`delay_kafka_rebalance`] for why we +/// quantize rebalances. Exposed separately so callers that need an +/// interruptible wait (e.g. a thread that may be asked to stop early) can wait +/// on this duration themselves instead of using the blocking sleep below. +pub fn quantized_rebalance_delay(configured_delay_secs: u64) -> Duration { + let current_time = SystemTime::now(); + let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) { + Ok(duration) => duration.as_secs(), + Err(_) => 0, + } % configured_delay_secs; + + Duration::from_secs(configured_delay_secs - time_elapsed_in_slot) +} + pub fn delay_kafka_rebalance(configured_delay_secs: u64) { /* * Introduces a configurable delay to the consumer topic @@ -13,19 +28,10 @@ pub fn delay_kafka_rebalance(configured_delay_secs: u64) { * fewer "stop the world rebalancing" occurrences and more time * for the consumer group to stabilize and make progress. 
*/ - let current_time = SystemTime::now(); - let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) { - Ok(duration) => duration.as_secs(), - Err(_) => 0, - } % configured_delay_secs; - tracing::info!( - "Delaying rebalance by {} seconds", - configured_delay_secs - time_elapsed_in_slot - ); + let delay = quantized_rebalance_delay(configured_delay_secs); + tracing::info!("Delaying rebalance by {} seconds", delay.as_secs()); - thread::sleep(Duration::from_secs( - configured_delay_secs - time_elapsed_in_slot, - )); + thread::sleep(delay); } pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option<u64> { From d9f08139116176635eccbd748320d12bc4c5fd23 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 20:14:27 +0000 Subject: [PATCH 6/8] fix(consumer): resolve restart/stop race and Ctrl-C lock-hold Address two more review findings on the restart loop: - Replace the unconditional `restart_triggered` flag with a per-run compare-and-swap outcome (`OUTCOME_RESTART`/`OUTCOME_STOP`). The timer only signals a restart if it claims the outcome first, and the main thread claims a stop before joining the timer, so a timer firing concurrently with a non-restart shutdown (e.g. `stop_at_timestamp`) can no longer turn it into a restart. - Copy the rebalance delay out of the shared mutex and release the lock before the (potentially multi-minute) sleep in the Ctrl-C handler, so the main loop never blocks acquiring the same lock during shutdown. 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/consumer.rs | 70 +++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 29f3ff2ee3e..ff82d4a2ed5 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; @@ -78,6 +78,14 @@ pub fn consumer( }) } +/// Outcome of a single consumer run inside the restart loop, used to resolve +/// the race between the periodic-restart timer firing and the run ending for +/// another reason (e.g. `stop_at_timestamp`). Whichever side records its +/// outcome first via compare-and-swap wins. +const OUTCOME_UNDECIDED: u8 = 0; +const OUTCOME_RESTART: u8 = 1; +const OUTCOME_STOP: u8 = 2; + #[allow(clippy::too_many_arguments)] pub fn consumer_impl( consumer_group: &str, @@ -166,11 +174,9 @@ pub fn consumer_impl( .quantized_rebalance_consumer_group_delay_secs; // `shutdown_requested` is set by the Ctrl-C handler to permanently stop the - // process. `restart_triggered` is set by the periodic-restart timer to ask - // the loop below to rebuild the consumer (reconnecting to Kafka) without - // exiting the process. + // process. Whether a given run should restart (vs exit) is tracked per + // iteration via an `OUTCOME_*` atomic in the loop below. 
let shutdown_requested = Arc::new(AtomicBool::new(false)); - let restart_triggered = Arc::new(AtomicBool::new(false)); // The Ctrl-C handler can only be installed once per process, but the // consumer (and therefore its `ProcessorHandle`) is rebuilt on every @@ -189,7 +195,11 @@ pub fn consumer_impl( let rebalance_delay_secs = rebalance_delay_secs.clone(); ctrlc::set_handler(move || { shutdown_requested.store(true, Ordering::SeqCst); - if let Some(secs) = *rebalance_delay_secs.lock().unwrap() { + // Copy the delay out and release the lock before the (potentially + // multi-minute) sleep, so the main loop never blocks waiting for + // this handler to release the lock. + let delay = *rebalance_delay_secs.lock().unwrap(); + if let Some(secs) = delay { rebalancing::delay_kafka_rebalance(secs); } if let Some(handle) = current_handle.lock().unwrap().as_mut() { @@ -376,6 +386,12 @@ pub fn consumer_impl( } first_run = false; + // Tracks whether this run ends in a restart or a stop. The timer and the + // main thread both record their outcome via compare-and-swap, so a timer + // firing concurrently with a non-restart shutdown can't turn it into a + // restart (and vice versa) — whichever records first wins. + let restart_outcome = Arc::new(AtomicU8::new(OUTCOME_UNDECIDED)); + // Periodically restart the consumer to reconnect to Kafka when the // periodic-restart feature flag is enabled for this consumer group. 
// After the configured interval (default 15 minutes) we signal a @@ -394,7 +410,7 @@ pub fn consumer_impl( ); let (stop_tx, stop_rx) = mpsc::channel::<()>(); let mut restart_handle = handle.clone(); - let restart_triggered = restart_triggered.clone(); + let restart_outcome = restart_outcome.clone(); let consumer_group = consumer_group.to_owned(); let join = thread::spawn(move || { // Wake up either when the interval elapses or when we are asked @@ -402,12 +418,6 @@ pub fn consumer_impl( if stop_rx.recv_timeout(interval) != Err(mpsc::RecvTimeoutError::Timeout) { return; } - tracing::info!( - consumer_group, - interval_secs = interval.as_secs(), - "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", - ); - counter!("periodic_restart"); // Honor the same rebalance quantization the Ctrl-C handler uses // so a periodic restart does not trigger extra Kafka rebalances // for groups that also enable quantized rebalancing. We wait on @@ -420,14 +430,42 @@ pub fn consumer_impl( return; } } - restart_triggered.store(true, Ordering::SeqCst); - restart_handle.signal_shutdown(); + // Only signal a restart if the main thread hasn't already + // decided this run is stopping for another reason. + if restart_outcome + .compare_exchange( + OUTCOME_UNDECIDED, + OUTCOME_RESTART, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .is_ok() + { + tracing::info!( + consumer_group, + interval_secs = interval.as_secs(), + "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + ); + counter!("periodic_restart"); + restart_handle.signal_shutdown(); + } }); (stop_tx, join) }); let result = processor.run(); + // Record that this run is stopping unless the timer already claimed a + // restart. Doing this before stopping the timer ensures a timer that + // fires concurrently with a non-restart shutdown won't trigger a + // restart. 
+ let _ = restart_outcome.compare_exchange( + OUTCOME_UNDECIDED, + OUTCOME_STOP, + Ordering::SeqCst, + Ordering::SeqCst, + ); + // The run has ended: stop the restart timer (if it hasn't already fired) // and join it so we never leave an orphaned sleeping thread behind. if let Some((stop_tx, join)) = restart_timer { @@ -440,7 +478,7 @@ pub fn consumer_impl( if shutdown_requested.load(Ordering::SeqCst) { break; } - if restart_triggered.swap(false, Ordering::SeqCst) { + if restart_outcome.load(Ordering::SeqCst) == OUTCOME_RESTART { counter!("consumer_reconnect"); tracing::info!("Restarting consumer to reconnect to Kafka"); continue; From 330ef8c5e7959c75179cb4e9058056bf874cdfe4 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 20:18:53 +0000 Subject: [PATCH 7/8] fix(rebalancing): guard quantized_rebalance_delay against a zero delay A rebalance delay of 0 (settable via runtime config or CLI) made quantized_rebalance_delay panic on a divide-by-zero in the modulo, which in the restart timer thread would fail silently. Treat 0 as "no quantization" and return Duration::ZERO, protecting all callers (the timer, the Ctrl-C handler, and the first-run join delay). Adds unit tests for the zero and normal cases. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/rebalancing.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/rust_snuba/src/rebalancing.rs b/rust_snuba/src/rebalancing.rs index 11e293cc88f..0ebedcd4878 100644 --- a/rust_snuba/src/rebalancing.rs +++ b/rust_snuba/src/rebalancing.rs @@ -8,6 +8,13 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// interruptible wait (e.g. a thread that may be asked to stop early) can wait /// on this duration themselves instead of using the blocking sleep below. pub fn quantized_rebalance_delay(configured_delay_secs: u64) -> Duration { + // A delay of zero means "no quantization". 
Guard against it explicitly: + // it's a valid configuration, and the modulo below would otherwise divide + // by zero and panic. + if configured_delay_secs == 0 { + return Duration::ZERO; + } + let current_time = SystemTime::now(); let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) { Ok(duration) => duration.as_secs(), @@ -47,6 +54,18 @@ pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option<u64> { mod tests { use super::*; + #[test] + fn test_quantized_rebalance_delay_zero_does_not_panic() { + assert_eq!(quantized_rebalance_delay(0), Duration::ZERO); + } + + #[test] + fn test_quantized_rebalance_delay_is_within_the_configured_window() { + let delay = quantized_rebalance_delay(60); + assert!(delay > Duration::ZERO); + assert!(delay <= Duration::from_secs(60)); + } + #[test] fn test_delay_config() { // teardown, even when the test fails From 332962a09d43daa7ee7e5b1bef2a517db1a9ee86 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 20:24:17 +0000 Subject: [PATCH 8/8] fix(consumer): clear shared handle after a run ends Clear `current_handle` once a processor's run returns so the Ctrl-C handler doesn't signal a defunct processor's handle while the next consumer is being built. The next iteration republishes a fresh handle before running. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01JXUY9wZfDEGZRMuBUM1AKo --- rust_snuba/src/consumer.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ff82d4a2ed5..14eac41df2a 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -473,6 +473,11 @@ pub fn consumer_impl( let _ = join.join(); } + // This processor has stopped; clear the shared handle so the Ctrl-C + // handler doesn't signal a defunct processor while the next one is being + // built. The next iteration republishes a fresh handle before running. 
+ *current_handle.lock().unwrap() = None; + match result { Ok(()) => { if shutdown_requested.load(Ordering::SeqCst) {