diff --git a/rust_snuba/src/auto_restart.rs b/rust_snuba/src/auto_restart.rs new file mode 100644 index 00000000000..cd4e577623f --- /dev/null +++ b/rust_snuba/src/auto_restart.rs @@ -0,0 +1,138 @@ +use std::time::Duration; + +use crate::runtime_config; + +/// Default interval between consumer restarts when periodic restart is enabled +/// but no explicit interval is configured: 15 minutes. +pub const DEFAULT_RESTART_INTERVAL_SECS: u64 = 900; + +fn is_truthy(value: &str) -> bool { + matches!(value.trim().to_ascii_lowercase().as_str(), "1" | "true") +} + +/// Returns the interval after which the consumer should gracefully shut down so +/// that it can be restarted and reconnect to Kafka, or `None` when periodic +/// restart is not enabled for `consumer_group`. +/// +/// This is gated behind a feature flag and controlled by two runtime config +/// keys (read from `snuba.state`, so they can be toggled live without a +/// redeploy): +/// +/// * `kafka_consumer_periodic_restart__` — the feature flag. +/// Periodic restart is only enabled when this is set to a truthy value +/// (`"1"` or `"true"`). Defaults to disabled. +/// * `kafka_consumer_periodic_restart_interval_secs__` — the +/// number of seconds between restarts. Must be a positive integer; defaults +/// to [`DEFAULT_RESTART_INTERVAL_SECS`] (15 minutes) when unset or invalid. +pub fn get_restart_interval(consumer_group: &str) -> Option { + let enabled = runtime_config::get_str_config( + format!("kafka_consumer_periodic_restart__{consumer_group}").as_str(), + ) + .ok() + .flatten() + .map(|value| is_truthy(&value)) + .unwrap_or(false); + + if !enabled { + return None; + } + + let interval_secs = runtime_config::get_str_config( + format!("kafka_consumer_periodic_restart_interval_secs__{consumer_group}").as_str(), + ) + .ok() + .flatten() + .and_then(|value| value.parse::().ok()) + .filter(|&secs| secs > 0) + .unwrap_or(DEFAULT_RESTART_INTERVAL_SECS); + + Some(Duration::from_secs(interval_secs)) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn clear(consumer_group: &str) { + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{consumer_group}"), + None, + ); + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{consumer_group}"), + None, + ); + } + + #[test] + fn test_disabled_by_default() { + crate::testutils::initialize_python(); + assert_eq!(get_restart_interval("periodic_restart_default_test"), None); + } + + #[test] + fn test_enabled_uses_default_interval() { + crate::testutils::initialize_python(); + let group = "periodic_restart_enabled_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{group}"), + Some("1"), + ); + assert_eq!( + get_restart_interval(group), + Some(Duration::from_secs(DEFAULT_RESTART_INTERVAL_SECS)) + ); + } + + #[test] + fn test_enabled_with_custom_interval() { + crate::testutils::initialize_python(); + let group = "periodic_restart_custom_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{group}"), + Some("true"), + ); + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{group}"), + Some("60"), + ); + assert_eq!(get_restart_interval(group), Some(Duration::from_secs(60))); + } + + #[test] + fn test_invalid_interval_falls_back_to_default() { + crate::testutils::initialize_python(); + let group = "periodic_restart_invalid_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart__{group}"), + Some("1"), + ); + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{group}"), + Some("garbage"), + ); + assert_eq!( + get_restart_interval(group), + Some(Duration::from_secs(DEFAULT_RESTART_INTERVAL_SECS)) + ); + } + + #[test] + fn test_interval_without_flag_is_ignored() { + crate::testutils::initialize_python(); + let group = "periodic_restart_no_flag_test"; + let _guard = scopeguard::guard((), |_| clear(group)); + + runtime_config::patch_str_config_for_test( + &format!("kafka_consumer_periodic_restart_interval_secs__{group}"), + Some("60"), + ); + assert_eq!(get_restart_interval(group), None); + } +} diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ff3e33b52d0..14eac41df2a 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -1,4 +1,6 @@ -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; use std::time::Duration; use chrono::{DateTime, Utc}; @@ -6,11 +8,12 @@ use chrono::{DateTime, Utc}; use sentry_arroyo::backends::kafka::config::KafkaConfig; use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::counter; use sentry_arroyo::metrics; use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; -use sentry_arroyo::processing::StreamProcessor; +use sentry_arroyo::processing::{ProcessorHandle, StreamProcessor}; use sentry_arroyo::types::Topic; use pyo3::prelude::*; @@ -18,6 +21,7 @@ use pyo3::types::PyBytes; use sentry_options::init_with_schemas; +use crate::auto_restart; use crate::config; use crate::factory_v2::ConsumerStrategyFactoryV2; use crate::logging::{setup_logging, setup_sentry}; @@ -74,6 +78,14 @@ pub fn consumer( }) } +/// Outcome of a single consumer run inside the restart loop, used to resolve +/// the race between the periodic-restart timer firing and the run ending for +/// another reason (e.g. `stop_at_timestamp`). Whichever side records its +/// outcome first via compare-and-swap wins. +const OUTCOME_UNDECIDED: u8 = 0; +const OUTCOME_RESTART: u8 = 1; +const OUTCOME_STOP: u8 = 2; + #[allow(clippy::too_many_arguments)] pub fn consumer_impl( consumer_group: &str, @@ -99,21 +111,11 @@ pub fn consumer_impl( init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]) .expect("failed to initialize sentry-options"); + // Parse the config once up front for process-wide, one-time setup (logging, + // Sentry, metrics, etc.). The consumer itself is (re)built from a fresh + // parse inside the restart loop below, so a periodic restart can reconnect + // to Kafka without tearing the whole process down. let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); - let max_batch_size = consumer_config.max_batch_size; - let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); - let max_batch_size_calculation = consumer_config.max_batch_size_calculation; - - let batch_write_timeout = match batch_write_timeout_ms { - Some(timeout_ms) => { - if timeout_ms >= consumer_config.max_batch_time_ms { - Some(Duration::from_millis(timeout_ms)) - } else { - None - } - } - None => None, - }; for storage in &consumer_config.storages { tracing::info!( @@ -133,8 +135,6 @@ pub fn consumer_impl( let mut _sentry_guard = None; - let env_config = consumer_config.env.clone(); - // setup sentry if let Some(dsn) = consumer_config.env.sentry_dsn { tracing::debug!(sentry_dsn = dsn); @@ -164,162 +164,343 @@ pub fn consumer_impl( procspawn::init(); } - let first_storage = consumer_config.storages[0].clone(); - - tracing::info!( - storage = first_storage.name, - "Starting consumer for {:?}", - first_storage.name, - ); - - let config = KafkaConfig::new_consumer_config( - vec![], - consumer_group.to_owned(), - auto_offset_reset.parse().expect( - "Invalid value for `auto_offset_reset`. Valid values: `error`, `earliest`, `latest`", - ), - !no_strict_offset_reset, - max_poll_interval_ms, - Some(consumer_config.raw_topic.broker_config), - ); - - let logical_topic_name = consumer_config.raw_topic.logical_topic_name; - - // XXX: this variable must live for the lifetime of the entire consumer. we should do something - // to ensure this statically, such as use actual Rust lifetimes or ensuring the runtime stays - // alive by storing it inside of the DlqPolicy - let dlq_concurrency_config = ConcurrencyConfig::new(10); - - // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be - // writing to the DLQ topics in prod. - - let blq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| { - let mut overrides = dlq_topic_config.broker_config.clone(); - overrides.insert("message.max.bytes".to_string(), "10000000".to_string()); // 10 MB, broker max - KafkaConfig::new_producer_config(vec![], Some(overrides)) - }); - - let dlq_topic = consumer_config - .dlq_topic - .as_ref() - .map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name)); - - let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| { - let producer = KafkaProducer::new(KafkaConfig::new_producer_config( - vec![], - Some(dlq_topic_config.broker_config), - )); - - let kafka_dlq_producer = Box::new(KafkaDlqProducer::new( - producer, - Topic::new(&dlq_topic_config.physical_topic_name), - )); - - let handle = dlq_concurrency_config.handle(); - DlqPolicy::new( - handle, - kafka_dlq_producer, - DlqLimit { - max_invalid_ratio: None, - max_consecutive_count: None, - }, - max_dlq_buffer_length, - ) - }); - - let commit_log_producer = if let Some(topic_config) = consumer_config.commit_log_topic { - let producer_config = - KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); - let producer = KafkaProducer::new(producer_config); - Some(( - Arc::new(producer), - Topic::new(&topic_config.physical_topic_name), - )) - } else { - None - }; - - let replacements_config = if let Some(topic_config) = consumer_config.replacements_topic { - let producer_config = - KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); - Some(( - producer_config, - Topic::new(&topic_config.physical_topic_name), - )) - } else { - None - }; - - let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); - - let mut rebalance_delay_secs = consumer_config + // The "quantized rebalance" delay may be set statically in the consumer + // config and/or overridden live via runtime config. We capture the static + // value once (it comes from the immutable startup config) and refresh the + // runtime override on every restart in the loop below, so live updates take + // effect without a full process restart. + let static_rebalance_delay_secs = consumer_config .raw_topic .quantized_rebalance_consumer_group_delay_secs; - let config_rebalance_delay_secs = rebalancing::get_rebalance_delay_secs(consumer_group); - if let Some(secs) = config_rebalance_delay_secs { - rebalance_delay_secs = Some(secs); - } - if let Some(secs) = rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs) + + // `shutdown_requested` is set by the Ctrl-C handler to permanently stop the + // process. Whether a given run should restart (vs exit) is tracked per + // iteration via an `OUTCOME_*` atomic in the loop below. + let shutdown_requested = Arc::new(AtomicBool::new(false)); + + // The Ctrl-C handler can only be installed once per process, but the + // consumer (and therefore its `ProcessorHandle`) is rebuilt on every + // restart. We share the live handle through a mutex so the handler always + // signals the consumer that is currently running. The current rebalance + // delay is shared the same way so the handler quantizes its "leave" using + // the latest configured value. + let current_handle: Arc>> = Arc::new(Mutex::new(None)); + let rebalance_delay_secs = Arc::new(Mutex::new( + rebalancing::get_rebalance_delay_secs(consumer_group).or(static_rebalance_delay_secs), + )); + + { + let shutdown_requested = shutdown_requested.clone(); + let current_handle = current_handle.clone(); + let rebalance_delay_secs = rebalance_delay_secs.clone(); + ctrlc::set_handler(move || { + shutdown_requested.store(true, Ordering::SeqCst); + // Copy the delay out and release the lock before the (potentially + // multi-minute) sleep, so the main loop never blocks waiting for + // this handler to release the lock. + let delay = *rebalance_delay_secs.lock().unwrap(); + if let Some(secs) = delay { + rebalancing::delay_kafka_rebalance(secs); + } + if let Some(handle) = current_handle.lock().unwrap().as_mut() { + handle.signal_shutdown(); + } + }) + .expect("Error setting Ctrl-C handler"); } - let factory = ConsumerStrategyFactoryV2 { - storage_config: first_storage, - env_config, - logical_topic_name, - max_batch_size, - max_batch_time, - max_batch_size_calculation, - processing_concurrency: ConcurrencyConfig::new(concurrency), - clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), - commitlog_concurrency: ConcurrencyConfig::new(2), - replacements_concurrency: ConcurrencyConfig::new(4), - async_inserts, - python_max_queue_depth, - use_rust_processor, - health_check_file: health_check_file.map(ToOwned::to_owned), - enforce_schema, - commit_log_producer, - replacements_config, - physical_consumer_group: consumer_group.to_owned(), - physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name), - accountant_topic_config: consumer_config.accountant_topic, - stop_at_timestamp, - batch_write_timeout, - join_timeout_ms, - health_check: health_check.to_string(), - use_row_binary, - blq_producer_config: blq_producer_config.clone(), - blq_topic: dlq_topic, - }; + // Restart loop. Each iteration builds a fresh consumer (reconnecting to + // Kafka) and runs it until it shuts down. A shutdown caused by the periodic + // restart timer loops back around to rebuild the consumer; any other + // graceful shutdown (Ctrl-C, `stop_at_timestamp`, ...) exits the loop. + let mut first_run = true; + loop { + // Refresh the rebalance delay from runtime config so live updates take + // effect on each restart, and publish it for the Ctrl-C handler. + let current_rebalance_delay_secs = + rebalancing::get_rebalance_delay_secs(consumer_group).or(static_rebalance_delay_secs); + *rebalance_delay_secs.lock().unwrap() = current_rebalance_delay_secs; + + let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); + let max_batch_size = consumer_config.max_batch_size; + let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); + let max_batch_size_calculation = consumer_config.max_batch_size_calculation; + + let batch_write_timeout = match batch_write_timeout_ms { + Some(timeout_ms) => { + if timeout_ms >= consumer_config.max_batch_time_ms { + Some(Duration::from_millis(timeout_ms)) + } else { + None + } + } + None => None, + }; - let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); + let env_config = consumer_config.env.clone(); + let first_storage = consumer_config.storages[0].clone(); - let mut handle = processor.get_handle(); + tracing::info!( + storage = first_storage.name, + "Starting consumer for {:?}", + first_storage.name, + ); - match rebalance_delay_secs { - Some(secs) => { - ctrlc::set_handler(move || { + let config = KafkaConfig::new_consumer_config( + vec![], + consumer_group.to_owned(), + auto_offset_reset.parse().expect( + "Invalid value for `auto_offset_reset`. Valid values: `error`, `earliest`, `latest`", + ), + !no_strict_offset_reset, + max_poll_interval_ms, + Some(consumer_config.raw_topic.broker_config), + ); + + let logical_topic_name = consumer_config.raw_topic.logical_topic_name; + + // XXX: this variable must live for the lifetime of the entire consumer. we should do something + // to ensure this statically, such as use actual Rust lifetimes or ensuring the runtime stays + // alive by storing it inside of the DlqPolicy + let dlq_concurrency_config = ConcurrencyConfig::new(10); + + // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be + // writing to the DLQ topics in prod. + + let blq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| { + let mut overrides = dlq_topic_config.broker_config.clone(); + overrides.insert("message.max.bytes".to_string(), "10000000".to_string()); // 10 MB, broker max + KafkaConfig::new_producer_config(vec![], Some(overrides)) + }); + + let dlq_topic = consumer_config + .dlq_topic + .as_ref() + .map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name)); + + let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| { + let producer = KafkaProducer::new(KafkaConfig::new_producer_config( + vec![], + Some(dlq_topic_config.broker_config), + )); + + let kafka_dlq_producer = Box::new(KafkaDlqProducer::new( + producer, + Topic::new(&dlq_topic_config.physical_topic_name), + )); + + let handle = dlq_concurrency_config.handle(); + DlqPolicy::new( + handle, + kafka_dlq_producer, + DlqLimit { + max_invalid_ratio: None, + max_consecutive_count: None, + }, + max_dlq_buffer_length, + ) + }); + + let commit_log_producer = if let Some(topic_config) = consumer_config.commit_log_topic { + let producer_config = + KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); + let producer = KafkaProducer::new(producer_config); + Some(( + Arc::new(producer), + Topic::new(&topic_config.physical_topic_name), + )) + } else { + None + }; + + let replacements_config = if let Some(topic_config) = consumer_config.replacements_topic { + let producer_config = + KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config)); + Some(( + producer_config, + Topic::new(&topic_config.physical_topic_name), + )) + } else { + None + }; + + let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); + + let factory = ConsumerStrategyFactoryV2 { + storage_config: first_storage, + env_config, + logical_topic_name, + max_batch_size, + max_batch_time, + max_batch_size_calculation, + processing_concurrency: ConcurrencyConfig::new(concurrency), + clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), + commitlog_concurrency: ConcurrencyConfig::new(2), + replacements_concurrency: ConcurrencyConfig::new(4), + async_inserts, + python_max_queue_depth, + use_rust_processor, + health_check_file: health_check_file.map(ToOwned::to_owned), + enforce_schema, + commit_log_producer, + replacements_config, + physical_consumer_group: consumer_group.to_owned(), + physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name), + accountant_topic_config: consumer_config.accountant_topic, + stop_at_timestamp, + batch_write_timeout, + join_timeout_ms, + health_check: health_check.to_string(), + use_row_binary, + blq_producer_config: blq_producer_config.clone(), + blq_topic: dlq_topic, + }; + + let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); + + // Publish this consumer's handle so the Ctrl-C handler signals the live + // consumer. + let handle = processor.get_handle(); + *current_handle.lock().unwrap() = Some(handle.clone()); + + // If a shutdown was requested before we managed to (re)build the + // consumer, stop now instead of starting it. + if shutdown_requested.load(Ordering::SeqCst) { + break; + } + + // Quantize the initial join when configured, matching the original + // startup behaviour. We only do this on the first run: on a periodic + // restart the timer below already quantized the corresponding "leave", + // so delaying the rejoin too would keep the consumer offline for two + // delay periods instead of one. + if first_run { + if let Some(secs) = current_rebalance_delay_secs { rebalancing::delay_kafka_rebalance(secs); - handle.signal_shutdown(); - }) - .expect("Error setting Ctrl-C handler"); + } + // A shutdown may have arrived during the delay; stop now instead of + // starting the consumer and spawning a needless restart timer. + if shutdown_requested.load(Ordering::SeqCst) { + break; + } } - None => { - ctrlc::set_handler(move || { - handle.signal_shutdown(); - }) - .expect("Error setting Ctrl-C handler"); + first_run = false; + + // Tracks whether this run ends in a restart or a stop. The timer and the + // main thread both record their outcome via compare-and-swap, so a timer + // firing concurrently with a non-restart shutdown can't turn it into a + // restart (and vice versa) — whichever records first wins. + let restart_outcome = Arc::new(AtomicU8::new(OUTCOME_UNDECIDED)); + + // Periodically restart the consumer to reconnect to Kafka when the + // periodic-restart feature flag is enabled for this consumer group. + // After the configured interval (default 15 minutes) we signal a + // graceful shutdown; the restart loop then rebuilds the consumer, which + // re-establishes its Kafka connection. See `crate::auto_restart` for the + // controlling runtime config keys. + // + // The timer waits on a channel rather than a bare sleep so that, when + // the run ends for any reason, we can stop and join it below instead of + // leaking a sleeping thread. + let restart_timer = auto_restart::get_restart_interval(consumer_group).map(|interval| { + tracing::info!( + consumer_group, + interval_secs = interval.as_secs(), + "Periodic Kafka reconnect enabled; consumer will restart after the configured interval", + ); + let (stop_tx, stop_rx) = mpsc::channel::<()>(); + let mut restart_handle = handle.clone(); + let restart_outcome = restart_outcome.clone(); + let consumer_group = consumer_group.to_owned(); + let join = thread::spawn(move || { + // Wake up either when the interval elapses or when we are asked + // to stop (the run ended for another reason). + if stop_rx.recv_timeout(interval) != Err(mpsc::RecvTimeoutError::Timeout) { + return; + } + // Honor the same rebalance quantization the Ctrl-C handler uses + // so a periodic restart does not trigger extra Kafka rebalances + // for groups that also enable quantized rebalancing. We wait on + // the stop channel rather than sleeping, so that if the run ends + // for another reason during this delay we can bail immediately + // instead of blocking the join below. + if let Some(secs) = current_rebalance_delay_secs { + let delay = rebalancing::quantized_rebalance_delay(secs); + if stop_rx.recv_timeout(delay) != Err(mpsc::RecvTimeoutError::Timeout) { + return; + } + } + // Only signal a restart if the main thread hasn't already + // decided this run is stopping for another reason. + if restart_outcome + .compare_exchange( + OUTCOME_UNDECIDED, + OUTCOME_RESTART, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .is_ok() + { + tracing::info!( + consumer_group, + interval_secs = interval.as_secs(), + "Periodic restart interval elapsed; signaling shutdown to reconnect to Kafka", + ); + counter!("periodic_restart"); + restart_handle.signal_shutdown(); + } + }); + (stop_tx, join) + }); + + let result = processor.run(); + + // Record that this run is stopping unless the timer already claimed a + // restart. Doing this before stopping the timer ensures a timer that + // fires concurrently with a non-restart shutdown won't trigger a + // restart. + let _ = restart_outcome.compare_exchange( + OUTCOME_UNDECIDED, + OUTCOME_STOP, + Ordering::SeqCst, + Ordering::SeqCst, + ); + + // The run has ended: stop the restart timer (if it hasn't already fired) + // and join it so we never leave an orphaned sleeping thread behind. + if let Some((stop_tx, join)) = restart_timer { + drop(stop_tx); + let _ = join.join(); } - } - if let Err(error) = processor.run() { - let error: &dyn std::error::Error = &error; - tracing::error!("{:?}", error); - 1 - } else { - 0 + // This processor has stopped; clear the shared handle so the Ctrl-C + // handler doesn't signal a defunct processor while the next one is being + // built. The next iteration republishes a fresh handle before running. + *current_handle.lock().unwrap() = None; + + match result { + Ok(()) => { + if shutdown_requested.load(Ordering::SeqCst) { + break; + } + if restart_outcome.load(Ordering::SeqCst) == OUTCOME_RESTART { + counter!("consumer_reconnect"); + tracing::info!("Restarting consumer to reconnect to Kafka"); + continue; + } + // Graceful termination that was not a periodic restart (e.g. + // `stop_at_timestamp`): exit the loop. + break; + } + Err(error) => { + let error: &dyn std::error::Error = &error; + tracing::error!("{:?}", error); + return 1; + } + } } + + 0 } mod exceptions { diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index 7fd2bedde6f..6784c6746a5 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -2,6 +2,7 @@ pub(crate) const SNUBA_SCHEMA: &str = include_str!("../../sentry-options/schemas/snuba/schema.json"); mod accepted_outcomes_consumer; +mod auto_restart; mod config; mod consumer; mod factory_v2; diff --git a/rust_snuba/src/rebalancing.rs b/rust_snuba/src/rebalancing.rs index 9be9d840f6a..0ebedcd4878 100644 --- a/rust_snuba/src/rebalancing.rs +++ b/rust_snuba/src/rebalancing.rs @@ -2,6 +2,28 @@ use crate::runtime_config; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +/// Computes how long to wait so that the next rebalance step lands on a +/// `configured_delay_secs` "tick". See [`delay_kafka_rebalance`] for why we +/// quantize rebalances. Exposed separately so callers that need an +/// interruptible wait (e.g. a thread that may be asked to stop early) can wait +/// on this duration themselves instead of using the blocking sleep below. +pub fn quantized_rebalance_delay(configured_delay_secs: u64) -> Duration { + // A delay of zero means "no quantization". Guard against it explicitly: + // it's a valid configuration, and the modulo below would otherwise divide + // by zero and panic. + if configured_delay_secs == 0 { + return Duration::ZERO; + } + + let current_time = SystemTime::now(); + let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) { + Ok(duration) => duration.as_secs(), + Err(_) => 0, + } % configured_delay_secs; + + Duration::from_secs(configured_delay_secs - time_elapsed_in_slot) +} + pub fn delay_kafka_rebalance(configured_delay_secs: u64) { /* * Introduces a configurable delay to the consumer topic @@ -13,19 +35,10 @@ pub fn delay_kafka_rebalance(configured_delay_secs: u64) { * fewer "stop the world rebalancing" occurrences and more time * for the consumer group to stabilize and make progress. */ - let current_time = SystemTime::now(); - let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) { - Ok(duration) => duration.as_secs(), - Err(_) => 0, - } % configured_delay_secs; - tracing::info!( - "Delaying rebalance by {} seconds", - configured_delay_secs - time_elapsed_in_slot - ); + let delay = quantized_rebalance_delay(configured_delay_secs); + tracing::info!("Delaying rebalance by {} seconds", delay.as_secs()); - thread::sleep(Duration::from_secs( - configured_delay_secs - time_elapsed_in_slot, - )); + thread::sleep(delay); } pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option { @@ -41,6 +54,18 @@ pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option { mod tests { use super::*; + #[test] + fn test_quantized_rebalance_delay_zero_does_not_panic() { + assert_eq!(quantized_rebalance_delay(0), Duration::ZERO); + } + + #[test] + fn test_quantized_rebalance_delay_is_within_the_configured_window() { + let delay = quantized_rebalance_delay(60); + assert!(delay > Duration::ZERO); + assert!(delay <= Duration::from_secs(60)); + } + #[test] fn test_delay_config() { // teardown, even when the test fails