diff --git a/migrations/postgres/0004_add_topic_age_contention.sql b/migrations/postgres/0004_add_topic_age_contention.sql new file mode 100644 index 00000000..3eabcbb1 --- /dev/null +++ b/migrations/postgres/0004_add_topic_age_contention.sql @@ -0,0 +1,13 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN topic TEXT NOT NULL DEFAULT ''; + +-- Contention filter for the upkeep queries, which filter by status + +-- (topic,partition) and have no ORDER BY, so they bitmap-scan this index. +-- Replaces idx_activation_partition (partition), which a followup drops. +CREATE INDEX idx_topic_partition + ON inflight_taskactivations (topic, partition); + +-- Age-based drain branch (added_at < threshold). +-- SQLite has an equivalent in its (status, added_at, ...) index, but we want +-- to keep this index separate from status as added_at is immutable. +CREATE INDEX idx_added_at + ON inflight_taskactivations (added_at); diff --git a/src/config/mod.rs b/src/config/mod.rs index e5e8a414..55efb5e8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -29,11 +29,15 @@ pub mod validate; use deprecated::DeprecatedConfig; use kafka::{ClusterConfig, TopicConfig}; use raw::RawModeConfig; -use store::DatabaseAdapter; /// Used to identify whether a configuration option was provided by a user, or whether it is a default. const DEFAULT_CONFIG_PROVIDER: &str = "TaskbrokerConfig"; +/// The default Kafka topic used when none is configured (the historical +/// zero-config `taskworker` default). Also used as the default `topic` for +/// activations that aren't built from a Kafka message (e.g. in tests). +pub const DEFAULT_TOPIC: &str = "taskworker"; + /// How the taskbroker delivers tasks to workers. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] @@ -403,7 +407,6 @@ impl Config { pub(crate) fn normalize_and_validate(&mut self) -> Result<()> { const DEFAULT_CLUSTER: &str = "default"; const DEADLETTER_CLUSTER: &str = "deadletter"; - const DEFAULT_TOPIC: &str = "taskworker"; const DEFAULT_CLUSTER_ADDRESS: &str = "127.0.0.1:9092"; const DEFAULT_CONSUMER_GROUP: &str = "taskworker"; @@ -630,23 +633,6 @@ impl Config { // Validate at least one consumable topic. let consumable = self.consumable_topics()?; - // Multi-topic consumption is only supported on the sqlite adapter for - // now. The postgres adapter filters claims by a single shared partition - // list, but those partition numbers aren't unique across topics, so the - // filter would mix partitions from different topics together. Note this - // filtering exists only to avoid lock contention between brokers, not for - // correctness; supporting multi-topic on postgres means reworking how we - // avoid that contention (e.g. filtering by (topic, partition) or another - // mechanism entirely). Reject the combination here, before any consumer - // spawns. - if consumable.len() > 1 && self.store.adapter == DatabaseAdapter::Postgres { - return Err(anyhow!( - "multi-topic consumption ({} consumable topics) is not supported with the \ - postgres database adapter; use the sqlite adapter or a single consumable topic", - consumable.len() - )); - } - // The deadletter topic must be a declared topic so the producer can // resolve its cluster. In legacy mode it was added above; in the new // format the user must declare it (produce-only) in kafka_topics. @@ -909,7 +895,8 @@ mod tests { use crate::logging::LogFormat; use crate::{Args, Run}; - use super::{Config, DatabaseAdapter, DeliveryMode}; + use super::store::DatabaseAdapter; + use super::{Config, DeliveryMode}; #[test] fn test_default() { @@ -1701,57 +1688,6 @@ kafka_clusters: }); } - #[test] - fn test_multi_topic_rejected_on_postgres() { - Jail::expect_with(|jail| { - // Multiple consumable topics are allowed on sqlite but rejected on - // postgres, whose claim filtering can't distinguish partitions - // across topics. - jail.create_file( - "config.yaml", - r#" -database_adapter: postgres -kafka_deadletter_topic: tasks-dlq -kafka_retry_topic: tasks-retry - -kafka_topics: - profiles: - cluster: my-cluster - consumer_group: taskbroker-profiles - subscriptions: - cluster: my-cluster - consumer_group: taskbroker-subscriptions - tasks-retry: - cluster: my-cluster - consumer_group: taskbroker-retry - produce_only: true - tasks-dlq: - cluster: my-cluster - consumer_group: taskbroker-dlq - produce_only: true - -kafka_clusters: - my-cluster: - address: 10.0.0.1:9092 -"#, - )?; - - let args = Args { - run: Run::Broker, - config: Some("config.yaml".to_owned()), - }; - let err = Config::from_args(&args).unwrap_err(); - assert!( - err.to_string() - .contains("not supported with the postgres database adapter"), - "unexpected error: {}", - err - ); - - Ok(()) - }); - } - #[test] fn test_multi_topic_allows_one_consumable_with_produce_only() { Jail::expect_with(|jail| { diff --git a/src/config/store.rs b/src/config/store.rs index 30ce9c5f..bef22143 100644 --- a/src/config/store.rs +++ b/src/config/store.rs @@ -181,6 +181,12 @@ pub struct StoreConfig { /// are extended by. This helps reduce broker deadline resets when /// brokers are under load, or there are small networking delays. pub processing_deadline_grace_sec: u64, + + /// Age-based contention drain threshold in seconds (postgres only). + /// Activations older than this bypass the (topic, partition) contention + /// filter so any broker can drain orphaned rows left by rebalances or + /// topic/partition moves. + pub contention_drain_age_sec: u64, } impl Default for StoreConfig { @@ -200,6 +206,7 @@ impl Default for StoreConfig { max_processing_count: 2048, max_processing_attempts: 5, processing_deadline_grace_sec: 3, + contention_drain_age_sec: 60, } } } diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index ec4e6457..c8aa11cc 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -10,7 +10,7 @@ use crate::config::Config; use crate::config::fetch::FetchConfig; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, FailedTasksForwarder}; +use crate::store::types::{BucketRange, FailedTasksForwarder, TopicPartition}; use crate::test_utils::make_activations; use super::*; @@ -49,9 +49,9 @@ impl MockStore { #[async_trait] impl ActivationStore for MockStore { - fn assign_partitions(&self, _partitions: Vec) -> Result<(), Error> { - Ok(()) - } + fn assign_partitions(&self, _partitions: &mut dyn Iterator) {} + + fn revoke_partitions(&self, _partitions: &mut dyn Iterator) {} async fn vacuum_db(&self) -> Result<(), Error> { unimplemented!() diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index 1548ef0c..6ea4350d 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -29,6 +29,7 @@ use futures::{Stream, StreamExt, future, pin_mut}; use tracing::{debug, error, info, instrument, warn}; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; pub async fn start_consumer( topics: &[&str], @@ -41,10 +42,10 @@ pub async fn start_consumer( ) -> Result<(), Error> { let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel(); let (event_sender, event_receiver) = unbounded_channel(); - // Each consumer subscribes to a single topic; join defensively in case that - // ever changes. Used as the `topic` tag on this consumer's metrics. - let topic = topics.join(","); - let context = KafkaContext::new(event_sender.clone(), topic.clone()); + // Metrics tag only. Ownership keys come per-topic from the assignment `tpl`, + // never from this string — so a multi-topic consumer stays correct. + let topics_tag = topics.join(","); + let context = KafkaContext::new(event_sender.clone(), topics_tag.clone()); let consumer: Arc> = Arc::new( kafka_client_config .create_with_context(context) @@ -57,14 +58,14 @@ pub async fn start_consumer( handle_shutdown_signals(event_sender.clone()); poll_consumer_client(consumer.clone(), client_shutdown_receiver); - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); handle_events( consumer, event_receiver, activation_store, client_shutdown_sender, spawn_actors, - topic, + topics_tag, ) .await } @@ -114,14 +115,14 @@ pub fn poll_consumer_client( pub struct KafkaContext { event_sender: UnboundedSender<(Event, SyncSender<()>)>, /// The topic(s) this consumer is subscribed to, used as a metric tag. - topic: String, + topics_tag: String, } impl KafkaContext { - pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topic: String) -> Self { + pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topics_tag: String) -> Self { Self { event_sender, - topic, + topics_tag, } } } @@ -151,7 +152,7 @@ impl ConsumerContext for KafkaContext { info!("Rendezvous complete"); metrics::counter!( "arroyo.consumer.partitions_assigned.count", - "topic" => self.topic.clone(), + "topic" => self.topics_tag.clone(), ) .increment(tpl.count() as u64); } @@ -173,7 +174,7 @@ impl ConsumerContext for KafkaContext { info!("Rendezvous complete"); metrics::counter!( "arroyo.consumer.partitions_revoked.count", - "topic" => self.topic.clone(), + "topic" => self.topics_tag.clone(), ) .increment(tpl.count() as u64); } @@ -352,7 +353,7 @@ pub async fn handle_events( Arc>, &BTreeSet<(String, i32)>, ) -> ActorHandles, - topic: String, + topics_tag: String, ) -> Result<(), anyhow::Error> { const CALLBACK_DURATION: Duration = Duration::from_secs(4); @@ -379,14 +380,9 @@ pub async fn handle_events( info!("Received event: {:?}", event); state = match (state, event) { (ConsumerState::Ready, Event::Assign(tpl)) => { - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()) + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()) .set(tpl.len() as f64); - // Note: This assumes we only process one topic per consumer. - let mut partitions = Vec::::new(); - for (_, partition) in tpl.iter() { - partitions.push(*partition); - } - activation_store.assign_partitions(partitions).unwrap(); + activation_store.assign_partitions(&mut tpl.iter().map(TopicPartition::from)); ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { @@ -401,17 +397,17 @@ pub async fn handle_events( tpl == revoked, "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - activation_store.assign_partitions(vec![]).unwrap(); + activation_store.revoke_partitions(&mut revoked.iter().map(TopicPartition::from)); handles.shutdown(CALLBACK_DURATION).await; - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); ConsumerState::Ready } - (ConsumerState::Consuming(handles, _), Event::Shutdown) => { - activation_store.assign_partitions(vec![]).unwrap(); + (ConsumerState::Consuming(handles, tpl), Event::Shutdown) => { + activation_store.revoke_partitions(&mut tpl.iter().map(TopicPartition::from)); handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); ConsumerState::Stopped } (ConsumerState::Stopped, _) => { diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 11b3bf97..8ccb88a9 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -96,6 +96,7 @@ pub fn new( id: activation.id.clone(), activation: payload.to_vec(), status, + topic: msg.topic().to_owned(), partition: msg.partition(), offset: msg.offset(), added_at: Utc::now(), diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 0eadbb82..cd62cc70 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -192,6 +192,7 @@ pub fn new(config: RawConfig) -> impl Fn(&OwnedMessage) -> Result) -> Result<()> { - Ok(()) - } + fn assign_partitions(&self, _partitions: &mut dyn Iterator) {} + + fn revoke_partitions(&self, _partitions: &mut dyn Iterator) {} async fn claim_activations( &self, diff --git a/src/store/activation.rs b/src/store/activation.rs index 97ba171c..c15bdbef 100644 --- a/src/store/activation.rs +++ b/src/store/activation.rs @@ -104,6 +104,12 @@ pub struct Activation { #[builder(default = ActivationStatus::Pending)] pub status: ActivationStatus, + /// The Kafka topic the activation was received from. Paired with `partition` + /// to scope contention filtering, since partition indices overlap across + /// topics. Defaults to [`crate::config::DEFAULT_TOPIC`] in tests. + #[builder(setter(into), default = crate::config::DEFAULT_TOPIC.to_owned())] + pub topic: String, + /// The partition the activation was received from #[builder(default = 0)] pub partition: i32, diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 8ac21131..c71513a1 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::str::FromStr; use std::sync::RwLock; use std::time::Instant; @@ -23,7 +23,7 @@ use crate::push::compute_claim_duration_ms; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::retry::retry_query; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder, TopicPartition}; /// Run migrations. pub async fn migrate(config: &StoreConfig) -> Result<()> { @@ -100,6 +100,7 @@ pub async fn migrate(config: &StoreConfig) -> Result<()> { struct TableRow<'a> { pub id: Cow<'a, str>, pub activation: Cow<'a, [u8]>, + pub topic: Cow<'a, str>, pub partition: i32, pub offset: i64, pub added_at: DateTime, @@ -124,6 +125,7 @@ impl<'a> From<&'a Activation> for TableRow<'a> { Self { id: Cow::Borrowed(&value.id), activation: Cow::Borrowed(&value.activation), + topic: Cow::Borrowed(&value.topic), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -153,6 +155,7 @@ impl From> for Activation { id: value.id.into_owned(), activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), + topic: value.topic.into_owned(), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -181,6 +184,7 @@ impl FromRow<'_, PgRow> for TableRow<'static> { Ok(Self { id: Cow::Owned(row.try_get::("id")?), activation: Cow::Owned(row.try_get::, _>("activation")?), + topic: Cow::Owned(row.try_get::("topic")?), partition: row.try_get("partition")?, offset: row.try_get("offset")?, added_at: row.try_get("added_at")?, @@ -244,7 +248,9 @@ pub struct PostgresStore { read_pool: PgPool, write_pool: PgPool, config: StoreConfig, - partitions: RwLock>, + /// (topic, partition) pairs assigned to this broker. Contention queries + /// filter by these pairs across all assigned topics. + partitions: RwLock>, claim_duration_ms: u64, } @@ -296,29 +302,50 @@ impl PostgresStore { read_pool, write_pool, config: config.store.clone(), - partitions: RwLock::new(vec![]), + partitions: RwLock::new(BTreeSet::new()), claim_duration_ms: compute_claim_duration_ms(config), }) } - /// Add the partition condition to the query builder in a thread-safe manner + /// Restrict a query to the (topic, partition) pairs this broker owns. + /// + /// rows older than `contention_drain_age_sec` bypass the filter so any broker can drain + /// orphaned activations (left by rebalances or topic/partition moves). This only affects + /// contention, not correctness. + /// + /// With no partitions assigned (e.g. before the first rebalance) the query is + /// left unfiltered. fn add_partition_condition( &self, query_builder: &mut QueryBuilder, first_condition: bool, ) { let partitions = self.partitions.read().unwrap(); + if partitions.is_empty() { + return; + } + let condition = if first_condition { "WHERE" } else { "AND" }; - if !partitions.is_empty() { - query_builder.push(" "); - query_builder.push(condition); - query_builder.push(" partition IN ("); - let mut separated = query_builder.separated(", "); - for partition in partitions.iter() { - separated.push_bind(*partition); + query_builder.push(" "); + query_builder.push(condition); + query_builder.push(" ((topic, partition) IN ("); + let mut first = true; + for tp in partitions.iter() { + if !first { + query_builder.push(", "); } + first = false; + query_builder.push("("); + query_builder.push_bind(tp.topic.clone()); + query_builder.push(", "); + query_builder.push_bind(tp.partition); query_builder.push(")"); } + query_builder.push(") OR added_at < "); + let drain_cutoff = + Utc::now() - chrono::Duration::seconds(self.config.contention_drain_age_sec as i64); + query_builder.push_bind(drain_cutoff); + query_builder.push(")"); } } @@ -365,6 +392,7 @@ impl ActivationStore for PostgresStore { " SELECT id, activation, + topic, partition, kafka_offset AS offset, added_at, @@ -399,11 +427,15 @@ impl ActivationStore for PostgresStore { Ok(Some(row.into())) } - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + fn assign_partitions(&self, partitions: &mut dyn Iterator) { + self.partitions.write().unwrap().extend(partitions); + } + + fn revoke_partitions(&self, partitions: &mut dyn Iterator) { let mut write_guard = self.partitions.write().unwrap(); - write_guard.clear(); - write_guard.extend(partitions); - Ok(()) + for tp in partitions { + write_guard.remove(&tp); + } } #[instrument(skip_all)] @@ -420,6 +452,7 @@ impl ActivationStore for PostgresStore { ( id, activation, + topic, partition, kafka_offset, added_at, @@ -448,6 +481,7 @@ impl ActivationStore for PostgresStore { Cow::Borrowed(bytes) => b.push_bind(bytes), Cow::Owned(bytes) => b.push_bind(bytes), }; + b.push_bind(row.topic); b.push_bind(row.partition); b.push_bind(row.offset); b.push_bind(row.added_at); @@ -775,36 +809,51 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] - async fn count_depths_per_partition(&self) -> Result, Error> { - let assigned: Vec = self.partitions.read().unwrap().clone(); + async fn count_depths_per_partition( + &self, + ) -> Result, Error> { + // Per-owned-(topic, partition) gauge: scoped to owned pairs only (no drain + // escape) — depths for partitions this broker doesn't own would be + // meaningless. Grouping by (topic, partition) keeps same-index partitions + // from different topics distinct. + let assigned: Vec = + self.partitions.read().unwrap().iter().cloned().collect(); if assigned.is_empty() { return Ok(HashMap::new()); } let mut query_builder = QueryBuilder::new( - "SELECT partition, + "SELECT topic, partition, COUNT(*) FILTER (WHERE status = 'Pending'), COUNT(*) FILTER (WHERE status = 'Delay'), COUNT(*) FILTER (WHERE status = 'Claimed'), COUNT(*) FILTER (WHERE status = 'Processing') - FROM inflight_taskactivations WHERE partition IN (", + FROM inflight_taskactivations WHERE (topic, partition) IN (", ); - let mut separated = query_builder.separated(", "); - for partition in &assigned { - separated.push_bind(*partition); + let mut first = true; + for tp in &assigned { + if !first { + query_builder.push(", "); + } + first = false; + query_builder.push("("); + query_builder.push_bind(tp.topic.clone()); + query_builder.push(", "); + query_builder.push_bind(tp.partition); + query_builder.push(")"); } - query_builder.push(") GROUP BY partition"); + query_builder.push(") GROUP BY topic, partition"); - let rows: Vec<(i32, i64, i64, i64, i64)> = query_builder + let rows: Vec<(String, i32, i64, i64, i64, i64)> = query_builder .build_query_as() .fetch_all(&self.read_pool) .await?; - let mut counts: HashMap = rows + let mut counts: HashMap = rows .into_iter() - .map(|(partition, pending, delay, claimed, processing)| { + .map(|(topic, partition, pending, delay, claimed, processing)| { ( - partition, + TopicPartition::new(topic, partition), DepthCounts { pending: pending as usize, delay: delay as usize, @@ -815,8 +864,8 @@ impl ActivationStore for PostgresStore { }) .collect(); - for partition in &assigned { - counts.entry(*partition).or_insert(DepthCounts { + for key in &assigned { + counts.entry(key.clone()).or_insert(DepthCounts { pending: 0, delay: 0, claimed: 0, @@ -981,6 +1030,7 @@ impl ActivationStore for PostgresStore { let mut query_builder = QueryBuilder::new( "SELECT id, activation, + topic, partition, kafka_offset AS offset, added_at, diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 7b361813..605739e6 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -30,7 +30,7 @@ use crate::config::store::StoreConfig; use crate::push::compute_claim_duration_ms; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, FailedTasksForwarder}; +use crate::store::types::{BucketRange, FailedTasksForwarder, TopicPartition}; /// Database representation of an [`Activation`], used for both reads and /// writes. @@ -96,6 +96,11 @@ impl From> for Activation { id: value.id.into_owned(), activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), + // sqlite owns its whole DB and never filters by topic, so it has no + // contention problem and stores no `topic` column. We deliberately + // leave the sqlite schema untouched and default to DEFAULT_TOPIC on + // read-back. + topic: crate::config::DEFAULT_TOPIC.to_owned(), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -460,11 +465,15 @@ impl ActivationStore for SqliteStore { Ok(Some(row.into())) } - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + fn assign_partitions(&self, partitions: &mut dyn Iterator) { // sqlite owns its whole DB regardless of partition assignment, so this // is a no-op. Fires once per consumer, hence debug rather than warn. - debug!("assign_partitions: {:?}", partitions); - Ok(()) + debug!("assign_partitions: {:?}", partitions.collect::>()); + } + + fn revoke_partitions(&self, partitions: &mut dyn Iterator) { + // No-op for the same reason as `assign_partitions`. + debug!("revoke_partitions: {:?}", partitions.collect::>()); } #[instrument(skip_all)] diff --git a/src/store/tests.rs b/src/store/tests.rs index 96bce6ad..07b1d0fb 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -11,11 +11,12 @@ use tempfile::TempDir; use tokio::sync::broadcast; use tokio::task::JoinSet; -use crate::config::Config; use crate::config::store::{SqliteConfig, StoreConfig}; +use crate::config::{Config, DEFAULT_TOPIC}; use crate::store::activation::{ActivationBuilder, ActivationStatus}; use crate::store::adapters::sqlite::{SqliteStore, create_sqlite_pool}; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; use crate::test_utils::{ StatusCount, TaskActivationBuilder, assert_counts, create_integration_config, create_test_store, generate_temp_filename, generate_unique_namespace, make_activations, @@ -159,7 +160,11 @@ async fn test_count_depths_per_partition_postgres() { // Assign three partitions; partition 2 will have no activations and must // appear in the result with zero counts (zero-fill behavior). - store.assign_partitions(vec![0, 1, 2]).unwrap(); + store.assign_partitions( + &mut [0, 1, 2] + .into_iter() + .map(|p| TopicPartition::new(DEFAULT_TOPIC, p)), + ); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -197,13 +202,17 @@ async fn test_count_depths_per_partition_postgres() { let depths = store.count_depths_per_partition().await.unwrap(); - let p0 = depths.get(&0).expect("partition 0 missing"); + let p0 = depths + .get(&TopicPartition::new(DEFAULT_TOPIC, 0)) + .expect("partition 0 missing"); assert_eq!(p0.pending, 2, "partition 0 pending"); assert_eq!(p0.processing, 1, "partition 0 processing"); assert_eq!(p0.delay, 0, "partition 0 delay"); assert_eq!(p0.claimed, 0, "partition 0 claimed"); - let p1 = depths.get(&1).expect("partition 1 missing"); + let p1 = depths + .get(&TopicPartition::new(DEFAULT_TOPIC, 1)) + .expect("partition 1 missing"); assert_eq!(p1.pending, 0, "partition 1 pending"); assert_eq!(p1.delay, 1, "partition 1 delay"); assert_eq!(p1.processing, 0, "partition 1 processing"); @@ -211,7 +220,7 @@ async fn test_count_depths_per_partition_postgres() { // Zero-fill: partition 2 is assigned but has no rows. let p2 = depths - .get(&2) + .get(&TopicPartition::new(DEFAULT_TOPIC, 2)) .expect("partition 2 missing (zero-fill failed)"); assert_eq!(p2.pending, 0, "partition 2 pending"); assert_eq!(p2.delay, 0, "partition 2 delay"); @@ -221,6 +230,142 @@ async fn test_count_depths_per_partition_postgres() { store.remove_db().await.unwrap(); } +/// Multi-topic: partition indices overlap across topics, so claims must filter +/// by (topic, partition). A broker assigned only topic-a/partition-0 must not +/// claim a fresh topic-b/partition-0 row, even though the partition index is the +/// same. +#[tokio::test] +async fn test_multi_topic_partition_scoping_postgres() { + let store = create_test_store("postgres").await; + + // Replace the default assignment from `create_test_store` with topic-a only. + store.revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("topic-a", 0))); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + let make = |id: &str, topic: &str| { + ActivationBuilder::new() + .id(id.to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .topic(topic.to_string()) + .partition(0) + .added_at(now) + .received_at(now) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()) + }; + assert!( + store + .store(&[make("a0", "topic-a"), make("b0", "topic-b")]) + .await + .is_ok() + ); + + // Only topic-a/partition-0 is owned, so only "a0" is claimable. + let claimed = store + .claim_activations_for_push(Some(10), None) + .await + .unwrap(); + assert_eq!(claimed.len(), 1); + assert_eq!(claimed[0].id, "a0"); + + // After also owning topic-b, "b0" becomes claimable. + store.assign_partitions(&mut std::iter::once(TopicPartition::new("topic-b", 0))); + let claimed = store + .claim_activations_for_push(Some(10), None) + .await + .unwrap(); + assert_eq!(claimed.len(), 1); + assert_eq!(claimed[0].id, "b0"); + + store.remove_db().await.unwrap(); +} + +/// Age-based drain: a row whose (topic, partition) this broker doesn't own is +/// still claimed once it is older than `contention_drain_age_sec`, so orphaned +/// activations drain without operator intervention. A fresh unowned row is not. +#[tokio::test] +async fn test_age_based_drain_claims_orphan_postgres() { + let store = create_test_store("postgres").await; + // Owns an unrelated topic/partition, so neither row matches by ownership. + store.revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + let make = |id: &str, added_at| { + ActivationBuilder::new() + .id(id.to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .topic("orphan-topic".to_string()) + .partition(99) + .added_at(added_at) + .received_at(now) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()) + }; + // "old" is past the 60s default threshold; "fresh" is not. + let old_added_at = now - chrono::Duration::seconds(120); + assert!( + store + .store(&[make("old", old_added_at), make("fresh", now)]) + .await + .is_ok() + ); + + let claimed = store + .claim_activations_for_push(Some(10), None) + .await + .unwrap(); + assert_eq!(claimed.len(), 1, "only the old orphan should drain"); + assert_eq!(claimed[0].id, "old"); + + store.remove_db().await.unwrap(); +} + +/// Age-based drain also applies to upkeep: an orphaned Delay row past its +/// delay_until is released to Pending once older than the threshold, even though +/// this broker doesn't own its (topic, partition). +#[tokio::test] +async fn test_age_based_drain_upkeep_postgres() { + let store = create_test_store("postgres").await; + store.revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + let delayed = ActivationBuilder::new() + .id("orphan_delayed".to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .topic("orphan-topic".to_string()) + .partition(99) + .status(ActivationStatus::Delay) + .delay_until(now - chrono::Duration::seconds(5)) + .added_at(now - chrono::Duration::seconds(120)) + .received_at(now) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()); + assert!(store.store(&[delayed]).await.is_ok()); + + let released = store.handle_delay_until().await.unwrap(); + assert_eq!(released, 1, "old orphaned delay row should be released"); + assert_eq!( + store + .get_by_id("orphan_delayed") + .await + .unwrap() + .unwrap() + .status, + ActivationStatus::Pending + ); + + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/store/traits.rs b/src/store/traits.rs index e8f06e01..eb8e8fee 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -7,7 +7,7 @@ use tokio::join; use tracing::warn; use crate::store::activation::{Activation, ActivationStatus}; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder, TopicPartition}; #[async_trait] pub trait ActivationStore: Send + Sync { @@ -15,7 +15,12 @@ pub trait ActivationStore: Send + Sync { /// Store a batch of activations async fn store(&self, batch: &[Activation]) -> Result; - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error>; + /// Add the given (topic, partition) pairs to the set this broker owns. Pairs + /// accumulate across topics and consumers sharing the store. + fn assign_partitions(&self, partitions: &mut dyn Iterator); + + /// Remove the given (topic, partition) pairs from the set this broker owns. + fn revoke_partitions(&self, partitions: &mut dyn Iterator); /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. /// If `mark_activation_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`. @@ -130,12 +135,17 @@ pub trait ActivationStore: Send + Sync { }) } - /// Queue depths grouped by partition for upkeep gauges. The default - /// implementation returns the flat total under sentinel partition -1 for - /// stores that aren't partition-aware. - async fn count_depths_per_partition(&self) -> Result, Error> { + /// Queue depths grouped by (topic, partition) for upkeep gauges. The default + /// implementation returns the flat total under the default topic and sentinel + /// partition -1 for stores that aren't partition-aware. + async fn count_depths_per_partition( + &self, + ) -> Result, Error> { let total = self.count_depths().await?; - Ok(HashMap::from([(-1, total)])) + Ok(HashMap::from([( + TopicPartition::new(crate::config::DEFAULT_TOPIC, -1), + total, + )])) } /// Set the processing deadline for a specific activation diff --git a/src/store/types.rs b/src/store/types.rs index 0f308fdb..7a490cdb 100644 --- a/src/store/types.rs +++ b/src/store/types.rs @@ -1,5 +1,35 @@ pub type BucketRange = (i16, i16); +/// A Kafka topic paired with one of its partition indices. Partition indices +/// overlap across topics, so contention filtering and per-partition gauges must +/// be keyed by the (topic, partition) pair rather than the partition alone. +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TopicPartition { + pub topic: String, + pub partition: i32, +} + +impl TopicPartition { + pub fn new(topic: impl Into, partition: i32) -> Self { + Self { + topic: topic.into(), + partition, + } + } +} + +impl std::fmt::Display for TopicPartition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.topic, self.partition) + } +} + +impl From<&(String, i32)> for TopicPartition { + fn from((topic, partition): &(String, i32)) -> Self { + Self::new(topic.clone(), *partition) + } +} + pub struct FailedTasksForwarder { pub to_discard: Vec<(String, Vec)>, pub to_deadletter: Vec<(String, Vec)>, diff --git a/src/test_utils.rs b/src/test_utils.rs index 30ef3eba..f9499601 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -15,13 +15,14 @@ use prost_types::Timestamp; use sentry_protos::taskbroker::v1::{self, OnAttemptsExceeded, RetryState, TaskActivation}; use uuid::Uuid; -use crate::config::Config; use crate::config::deprecated::DeprecatedConfig; use crate::config::store::{PgConfig, StoreConfig}; +use crate::config::{Config, DEFAULT_TOPIC}; use crate::store::activation::{Activation, ActivationBuilder, ActivationStatus}; use crate::store::adapters::postgres::{self, PostgresStore}; use crate::store::adapters::sqlite::SqliteStore; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; /// msgpack encoding of an empty map (`{}`), used as default task parameters in tests. pub const EMPTY_MSGPACK_MAP: &[u8] = &[0x80]; @@ -286,7 +287,7 @@ pub async fn create_test_store(adapter: &str) -> Arc { let store = Arc::new(PostgresStore::new(&config).await.unwrap()) as Arc; - store.assign_partitions(vec![0]).unwrap(); + store.assign_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); store } _ => panic!("Invalid adapter: {}", adapter), diff --git a/src/upkeep.rs b/src/upkeep.rs index fab354bc..9f096000 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -21,6 +21,7 @@ use crate::SERVICE_NAME; use crate::config::Config; use crate::runtime_config::RuntimeConfigManager; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; /// The upkeep task that periodically performs upkeep /// on the activation store @@ -46,7 +47,7 @@ pub async fn upkeep( let mut last_run = Instant::now(); let mut last_vacuum = Instant::now(); let mut last_backtrace_log = Instant::now(); - let mut emitted_partitions: HashSet = HashSet::new(); + let mut emitted_partitions: HashSet = HashSet::new(); loop { select! { _ = timer.tick() => { @@ -129,7 +130,7 @@ pub async fn do_upkeep( startup_time: DateTime, runtime_config_manager: Arc, last_vacuum: &mut Instant, - emitted_partitions: &mut HashSet, + emitted_partitions: &mut HashSet, ) -> UpkeepResults { let current_time = Utc::now(); let upkeep_start = Instant::now(); @@ -499,28 +500,30 @@ pub async fn do_upkeep( // without a partition filter still see the global total via tag sum. // Zero out gauges for partitions we emitted last cycle but no longer own. if let Ok(depths) = depth_counts { - let current: HashSet = depths.keys().copied().collect(); + let current: HashSet = depths.keys().cloned().collect(); - for partition in emitted_partitions.difference(¤t) { - let partition = partition.to_string(); - metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) + for tp in emitted_partitions.difference(¤t) { + let topic = tp.topic.clone(); + let partition = tp.partition.to_string(); + metrics::gauge!("upkeep.current_pending_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); - metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_claimed_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); - metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_processing_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); - metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition).set(0.0); + metrics::gauge!("upkeep.current_delayed_tasks", "topic" => topic, "partition" => partition).set(0.0); } - for (partition, counts) in &depths { - let partition = partition.to_string(); - metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) + for (tp, counts) in &depths { + let topic = tp.topic.clone(); + let partition = tp.partition.to_string(); + metrics::gauge!("upkeep.current_pending_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.pending as f64); - metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_claimed_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.claimed as f64); - metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_processing_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.processing as f64); - metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition) + metrics::gauge!("upkeep.current_delayed_tasks", "topic" => topic, "partition" => partition) .set(counts.delay as f64); }