From 385df96ae2a6a6ca727f429503c54fb1d90f8fd7 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 13:20:20 +0200 Subject: [PATCH 01/10] wip --- .../0004_add_topic_age_contention.sql | 17 +++ src/config/mod.rs | 78 +--------- src/config/store.rs | 8 + src/fetch/tests.rs | 2 +- src/kafka/consumer.rs | 6 +- src/kafka/deserialize_activation.rs | 1 + src/kafka/deserialize_raw.rs | 1 + src/push/tests.rs | 2 +- src/store/activation.rs | 7 + src/store/adapters/postgres.rs | 87 +++++++++-- src/store/adapters/sqlite.rs | 7 +- src/store/tests.rs | 142 +++++++++++++++++- src/store/traits.rs | 5 +- src/test_utils.rs | 4 +- 14 files changed, 268 insertions(+), 99 deletions(-) create mode 100644 migrations/postgres/0004_add_topic_age_contention.sql diff --git a/migrations/postgres/0004_add_topic_age_contention.sql b/migrations/postgres/0004_add_topic_age_contention.sql new file mode 100644 index 00000000..11239960 --- /dev/null +++ b/migrations/postgres/0004_add_topic_age_contention.sql @@ -0,0 +1,17 @@ +-- Multi-topic + age-based contention management (STREAM-1205). +-- +-- A `topic` column distinguishes activations across topics so that partition +-- indices, which overlap between topics, no longer collide. Contention queries +-- filter by (topic, partition) instead of partition alone. +ALTER TABLE inflight_taskactivations ADD COLUMN topic TEXT NOT NULL DEFAULT ''; + +-- Serves the contention branch of the claim/upkeep queries: +-- (topic, partition) IN (...) AND bucket BETWEEN ... +CREATE INDEX idx_topic_partition_bucket + ON inflight_taskactivations (topic, partition, bucket); + +-- Serves the age-based drain branch (added_at < threshold) that lets any broker +-- claim/maintain orphaned rows regardless of partition ownership. Non-partial so +-- it covers the drain branch for every status, not just Pending. 
+CREATE INDEX idx_added_at + ON inflight_taskactivations (added_at); diff --git a/src/config/mod.rs b/src/config/mod.rs index e5e8a414..55efb5e8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -29,11 +29,15 @@ pub mod validate; use deprecated::DeprecatedConfig; use kafka::{ClusterConfig, TopicConfig}; use raw::RawModeConfig; -use store::DatabaseAdapter; /// Used to identify whether a configuration option was provided by a user, or whether it is a default. const DEFAULT_CONFIG_PROVIDER: &str = "TaskbrokerConfig"; +/// The default Kafka topic used when none is configured (the historical +/// zero-config `taskworker` default). Also used as the default `topic` for +/// activations that aren't built from a Kafka message (e.g. in tests). +pub const DEFAULT_TOPIC: &str = "taskworker"; + /// How the taskbroker delivers tasks to workers. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] @@ -403,7 +407,6 @@ impl Config { pub(crate) fn normalize_and_validate(&mut self) -> Result<()> { const DEFAULT_CLUSTER: &str = "default"; const DEADLETTER_CLUSTER: &str = "deadletter"; - const DEFAULT_TOPIC: &str = "taskworker"; const DEFAULT_CLUSTER_ADDRESS: &str = "127.0.0.1:9092"; const DEFAULT_CONSUMER_GROUP: &str = "taskworker"; @@ -630,23 +633,6 @@ impl Config { // Validate at least one consumable topic. let consumable = self.consumable_topics()?; - // Multi-topic consumption is only supported on the sqlite adapter for - // now. The postgres adapter filters claims by a single shared partition - // list, but those partition numbers aren't unique across topics, so the - // filter would mix partitions from different topics together. Note this - // filtering exists only to avoid lock contention between brokers, not for - // correctness; supporting multi-topic on postgres means reworking how we - // avoid that contention (e.g. filtering by (topic, partition) or another - // mechanism entirely). 
Reject the combination here, before any consumer - // spawns. - if consumable.len() > 1 && self.store.adapter == DatabaseAdapter::Postgres { - return Err(anyhow!( - "multi-topic consumption ({} consumable topics) is not supported with the \ - postgres database adapter; use the sqlite adapter or a single consumable topic", - consumable.len() - )); - } - // The deadletter topic must be a declared topic so the producer can // resolve its cluster. In legacy mode it was added above; in the new // format the user must declare it (produce-only) in kafka_topics. @@ -909,7 +895,8 @@ mod tests { use crate::logging::LogFormat; use crate::{Args, Run}; - use super::{Config, DatabaseAdapter, DeliveryMode}; + use super::store::DatabaseAdapter; + use super::{Config, DeliveryMode}; #[test] fn test_default() { @@ -1701,57 +1688,6 @@ kafka_clusters: }); } - #[test] - fn test_multi_topic_rejected_on_postgres() { - Jail::expect_with(|jail| { - // Multiple consumable topics are allowed on sqlite but rejected on - // postgres, whose claim filtering can't distinguish partitions - // across topics. 
- jail.create_file( - "config.yaml", - r#" -database_adapter: postgres -kafka_deadletter_topic: tasks-dlq -kafka_retry_topic: tasks-retry - -kafka_topics: - profiles: - cluster: my-cluster - consumer_group: taskbroker-profiles - subscriptions: - cluster: my-cluster - consumer_group: taskbroker-subscriptions - tasks-retry: - cluster: my-cluster - consumer_group: taskbroker-retry - produce_only: true - tasks-dlq: - cluster: my-cluster - consumer_group: taskbroker-dlq - produce_only: true - -kafka_clusters: - my-cluster: - address: 10.0.0.1:9092 -"#, - )?; - - let args = Args { - run: Run::Broker, - config: Some("config.yaml".to_owned()), - }; - let err = Config::from_args(&args).unwrap_err(); - assert!( - err.to_string() - .contains("not supported with the postgres database adapter"), - "unexpected error: {}", - err - ); - - Ok(()) - }); - } - #[test] fn test_multi_topic_allows_one_consumable_with_produce_only() { Jail::expect_with(|jail| { diff --git a/src/config/store.rs b/src/config/store.rs index 30ce9c5f..f0cd73a8 100644 --- a/src/config/store.rs +++ b/src/config/store.rs @@ -181,6 +181,13 @@ pub struct StoreConfig { /// are extended by. This helps reduce broker deadline resets when /// brokers are under load, or there are small networking delays. pub processing_deadline_grace_sec: u64, + + /// Age-based contention drain threshold in seconds (postgres only). + /// Activations older than this are considered potentially orphaned and + /// bypass the (topic, partition) contention filter, so any broker can + /// claim and maintain them regardless of partition ownership. This drains + /// rows left behind by rebalances and topic/partition moves between pools. 
+ pub contention_drain_age_sec: u64, } impl Default for StoreConfig { @@ -200,6 +207,7 @@ impl Default for StoreConfig { max_processing_count: 2048, max_processing_attempts: 5, processing_deadline_grace_sec: 3, + contention_drain_age_sec: 60, } } } diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index ec4e6457..e32686d7 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -49,7 +49,7 @@ impl MockStore { #[async_trait] impl ActivationStore for MockStore { - fn assign_partitions(&self, _partitions: Vec) -> Result<(), Error> { + fn assign_partitions(&self, _topic: &str, _partitions: Vec) -> Result<(), Error> { Ok(()) } diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index 1548ef0c..c4e0e001 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -386,7 +386,7 @@ pub async fn handle_events( for (_, partition) in tpl.iter() { partitions.push(*partition); } - activation_store.assign_partitions(partitions).unwrap(); + activation_store.assign_partitions(&topic, partitions).unwrap(); ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { @@ -401,13 +401,13 @@ pub async fn handle_events( tpl == revoked, "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - activation_store.assign_partitions(vec![]).unwrap(); + activation_store.assign_partitions(&topic, vec![]).unwrap(); handles.shutdown(CALLBACK_DURATION).await; metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); ConsumerState::Ready } (ConsumerState::Consuming(handles, _), Event::Shutdown) => { - activation_store.assign_partitions(vec![]).unwrap(); + activation_store.assign_partitions(&topic, vec![]).unwrap(); handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 11b3bf97..8ccb88a9 100644 --- 
a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -96,6 +96,7 @@ pub fn new( id: activation.id.clone(), activation: payload.to_vec(), status, + topic: msg.topic().to_owned(), partition: msg.partition(), offset: msg.offset(), added_at: Utc::now(), diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 0eadbb82..cd62cc70 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -192,6 +192,7 @@ pub fn new(config: RawConfig) -> impl Fn(&OwnedMessage) -> Result) -> Result<()> { + fn assign_partitions(&self, _topic: &str, _partitions: Vec) -> Result<()> { Ok(()) } diff --git a/src/store/activation.rs b/src/store/activation.rs index 97ba171c..95290bab 100644 --- a/src/store/activation.rs +++ b/src/store/activation.rs @@ -104,6 +104,13 @@ pub struct Activation { #[builder(default = ActivationStatus::Pending)] pub status: ActivationStatus, + /// The Kafka topic the activation was received from. Used together with + /// `partition` to scope contention filtering, since partition indices + /// overlap across topics. Defaults to [`crate::config::DEFAULT_TOPIC`] when + /// not built from a Kafka message (e.g. in tests). 
+ #[builder(setter(into), default = crate::config::DEFAULT_TOPIC.to_owned())] + pub topic: String, + /// The partition the activation was received from #[builder(default = 0)] pub partition: i32, diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 8ac21131..0779dc6b 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; use std::sync::RwLock; use std::time::Instant; @@ -100,6 +100,7 @@ pub async fn migrate(config: &StoreConfig) -> Result<()> { struct TableRow<'a> { pub id: Cow<'a, str>, pub activation: Cow<'a, [u8]>, + pub topic: Cow<'a, str>, pub partition: i32, pub offset: i64, pub added_at: DateTime, @@ -124,6 +125,7 @@ impl<'a> From<&'a Activation> for TableRow<'a> { Self { id: Cow::Borrowed(&value.id), activation: Cow::Borrowed(&value.activation), + topic: Cow::Borrowed(&value.topic), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -153,6 +155,7 @@ impl From> for Activation { id: value.id.into_owned(), activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), + topic: value.topic.into_owned(), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -181,6 +184,7 @@ impl FromRow<'_, PgRow> for TableRow<'static> { Ok(Self { id: Cow::Owned(row.try_get::("id")?), activation: Cow::Owned(row.try_get::, _>("activation")?), + topic: Cow::Owned(row.try_get::("topic")?), partition: row.try_get("partition")?, offset: row.try_get("offset")?, added_at: row.try_get("added_at")?, @@ -244,7 +248,9 @@ pub struct PostgresStore { read_pool: PgPool, write_pool: PgPool, config: StoreConfig, - partitions: RwLock>, + /// Partitions assigned to this broker, keyed by topic. Contention queries + /// filter by the (topic, partition) pairs across all assigned topics. 
+ partitions: RwLock>>, claim_duration_ms: u64, } @@ -296,29 +302,57 @@ impl PostgresStore { read_pool, write_pool, config: config.store.clone(), - partitions: RwLock::new(vec![]), + partitions: RwLock::new(BTreeMap::new()), claim_duration_ms: compute_claim_duration_ms(config), }) } - /// Add the partition condition to the query builder in a thread-safe manner + /// Add the contention condition to the query builder in a thread-safe manner. + /// + /// Limits a query to the rows this broker is responsible for, by the + /// (topic, partition) pairs assigned to it. An age-based escape hatch is + /// OR-ed in: rows older than `contention_drain_age_sec` bypass the filter so + /// that orphaned activations (left behind by rebalances or topic/partition + /// moves between pools) drain automatically — any broker can claim and + /// maintain them. `topic`/`partition` aren't required for correctness (the + /// status update is atomic and claims use `FOR UPDATE SKIP LOCKED`), so the + /// escape only ever causes brief, bounded extra contention, never data loss. + /// + /// When no partitions are assigned (e.g. before the first rebalance) no + /// condition is added and the query sees the whole table, as before. 
fn add_partition_condition( &self, query_builder: &mut QueryBuilder, first_condition: bool, ) { let partitions = self.partitions.read().unwrap(); + if partitions.is_empty() { + return; + } + let condition = if first_condition { "WHERE" } else { "AND" }; - if !partitions.is_empty() { - query_builder.push(" "); - query_builder.push(condition); - query_builder.push(" partition IN ("); - let mut separated = query_builder.separated(", "); - for partition in partitions.iter() { - separated.push_bind(*partition); + query_builder.push(" "); + query_builder.push(condition); + query_builder.push(" ((topic, partition) IN ("); + let mut first = true; + for (topic, topic_partitions) in partitions.iter() { + for partition in topic_partitions.iter() { + if !first { + query_builder.push(", "); + } + first = false; + query_builder.push("("); + query_builder.push_bind(topic.clone()); + query_builder.push(", "); + query_builder.push_bind(*partition); + query_builder.push(")"); } - query_builder.push(")"); } + query_builder.push(") OR added_at < "); + let drain_cutoff = + Utc::now() - chrono::Duration::seconds(self.config.contention_drain_age_sec as i64); + query_builder.push_bind(drain_cutoff); + query_builder.push(")"); } } @@ -365,6 +399,7 @@ impl ActivationStore for PostgresStore { " SELECT id, activation, + topic, partition, kafka_offset AS offset, added_at, @@ -399,10 +434,15 @@ impl ActivationStore for PostgresStore { Ok(Some(row.into())) } - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error> { let mut write_guard = self.partitions.write().unwrap(); - write_guard.clear(); - write_guard.extend(partitions); + // Keep the map free of empty entries so `add_partition_condition` never + // emits an empty `IN ()`. An empty assignment clears the topic. 
+ if partitions.is_empty() { + write_guard.remove(topic); + } else { + write_guard.insert(topic.to_owned(), partitions); + } Ok(()) } @@ -420,6 +460,7 @@ impl ActivationStore for PostgresStore { ( id, activation, + topic, partition, kafka_offset, added_at, @@ -448,6 +489,7 @@ impl ActivationStore for PostgresStore { Cow::Borrowed(bytes) => b.push_bind(bytes), Cow::Owned(bytes) => b.push_bind(bytes), }; + b.push_bind(row.topic); b.push_bind(row.partition); b.push_bind(row.offset); b.push_bind(row.added_at); @@ -776,7 +818,19 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] async fn count_depths_per_partition(&self) -> Result, Error> { - let assigned: Vec = self.partitions.read().unwrap().clone(); + // Per-owned-partition gauge: intentionally partition-only (no age-based + // drain escape), since reporting depths for partitions this broker + // doesn't own would be meaningless. + // TODO(multi-topic): this groups by raw partition number, so partitions + // with the same index across different topics are merged in the gauge. + let assigned: Vec = self + .partitions + .read() + .unwrap() + .values() + .flatten() + .copied() + .collect(); if assigned.is_empty() { return Ok(HashMap::new()); } @@ -981,6 +1035,7 @@ impl ActivationStore for PostgresStore { let mut query_builder = QueryBuilder::new( "SELECT id, activation, + topic, partition, kafka_offset AS offset, added_at, diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 7b361813..da582dbe 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -96,6 +96,9 @@ impl From> for Activation { id: value.id.into_owned(), activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), + // sqlite owns its whole DB and never filters by topic, so the column + // isn't stored; report the default topic on read-back. 
+ topic: crate::config::DEFAULT_TOPIC.to_owned(), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -460,10 +463,10 @@ impl ActivationStore for SqliteStore { Ok(Some(row.into())) } - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error> { // sqlite owns its whole DB regardless of partition assignment, so this // is a no-op. Fires once per consumer, hence debug rather than warn. - debug!("assign_partitions: {:?}", partitions); + debug!("assign_partitions: {topic} {partitions:?}"); Ok(()) } diff --git a/src/store/tests.rs b/src/store/tests.rs index 96bce6ad..c7d590bd 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -11,8 +11,8 @@ use tempfile::TempDir; use tokio::sync::broadcast; use tokio::task::JoinSet; -use crate::config::Config; use crate::config::store::{SqliteConfig, StoreConfig}; +use crate::config::{Config, DEFAULT_TOPIC}; use crate::store::activation::{ActivationBuilder, ActivationStatus}; use crate::store::adapters::sqlite::{SqliteStore, create_sqlite_pool}; use crate::store::traits::ActivationStore; @@ -159,7 +159,9 @@ async fn test_count_depths_per_partition_postgres() { // Assign three partitions; partition 2 will have no activations and must // appear in the result with zero counts (zero-fill behavior). - store.assign_partitions(vec![0, 1, 2]).unwrap(); + store + .assign_partitions(DEFAULT_TOPIC, vec![0, 1, 2]) + .unwrap(); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -221,6 +223,142 @@ async fn test_count_depths_per_partition_postgres() { store.remove_db().await.unwrap(); } +/// Multi-topic: partition indices overlap across topics, so claims must filter +/// by (topic, partition). A broker assigned only topic-a/partition-0 must not +/// claim a fresh topic-b/partition-0 row, even though the partition index is the +/// same. 
+#[tokio::test] +async fn test_multi_topic_partition_scoping_postgres() { + let store = create_test_store("postgres").await; + + // Replace the default assignment from `create_test_store` with topic-a only. + store.assign_partitions(DEFAULT_TOPIC, vec![]).unwrap(); + store.assign_partitions("topic-a", vec![0]).unwrap(); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + let make = |id: &str, topic: &str| { + ActivationBuilder::new() + .id(id.to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .topic(topic.to_string()) + .partition(0) + .added_at(now) + .received_at(now) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()) + }; + assert!( + store + .store(&[make("a0", "topic-a"), make("b0", "topic-b")]) + .await + .is_ok() + ); + + // Only topic-a/partition-0 is owned, so only "a0" is claimable. + let claimed = store + .claim_activations_for_push(Some(10), None) + .await + .unwrap(); + assert_eq!(claimed.len(), 1); + assert_eq!(claimed[0].id, "a0"); + + // After also owning topic-b, "b0" becomes claimable. + store.assign_partitions("topic-b", vec![0]).unwrap(); + let claimed = store + .claim_activations_for_push(Some(10), None) + .await + .unwrap(); + assert_eq!(claimed.len(), 1); + assert_eq!(claimed[0].id, "b0"); + + store.remove_db().await.unwrap(); +} + +/// Age-based drain: a row whose (topic, partition) this broker doesn't own is +/// still claimed once it is older than `contention_drain_age_sec`, so orphaned +/// activations drain without operator intervention. A fresh unowned row is not. +#[tokio::test] +async fn test_age_based_drain_claims_orphan_postgres() { + let store = create_test_store("postgres").await; + // Owns an unrelated topic/partition, so neither row matches by ownership. 
+ store.assign_partitions(DEFAULT_TOPIC, vec![]).unwrap(); + store.assign_partitions("owned-topic", vec![0]).unwrap(); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + let make = |id: &str, added_at| { + ActivationBuilder::new() + .id(id.to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .topic("orphan-topic".to_string()) + .partition(99) + .added_at(added_at) + .received_at(now) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()) + }; + // "old" is past the 60s default threshold; "fresh" is not. + let old_added_at = now - chrono::Duration::seconds(120); + assert!( + store + .store(&[make("old", old_added_at), make("fresh", now)]) + .await + .is_ok() + ); + + let claimed = store + .claim_activations_for_push(Some(10), None) + .await + .unwrap(); + assert_eq!(claimed.len(), 1, "only the old orphan should drain"); + assert_eq!(claimed[0].id, "old"); + + store.remove_db().await.unwrap(); +} + +/// Age-based drain also applies to upkeep: an orphaned Delay row past its +/// delay_until is released to Pending once older than the threshold, even though +/// this broker doesn't own its (topic, partition). 
+#[tokio::test] +async fn test_age_based_drain_upkeep_postgres() { + let store = create_test_store("postgres").await; + store.assign_partitions(DEFAULT_TOPIC, vec![]).unwrap(); + store.assign_partitions("owned-topic", vec![0]).unwrap(); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + let delayed = ActivationBuilder::new() + .id("orphan_delayed".to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .topic("orphan-topic".to_string()) + .partition(99) + .status(ActivationStatus::Delay) + .delay_until(now - chrono::Duration::seconds(5)) + .added_at(now - chrono::Duration::seconds(120)) + .received_at(now) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()); + assert!(store.store(&[delayed]).await.is_ok()); + + let released = store.handle_delay_until().await.unwrap(); + assert_eq!(released, 1, "old orphaned delay row should be released"); + assert_eq!( + store + .get_by_id("orphan_delayed") + .await + .unwrap() + .unwrap() + .status, + ActivationStatus::Pending + ); + + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/store/traits.rs b/src/store/traits.rs index e8f06e01..350de738 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -15,7 +15,10 @@ pub trait ActivationStore: Send + Sync { /// Store a batch of activations async fn store(&self, batch: &[Activation]) -> Result; - fn assign_partitions(&self, partitions: Vec) -> Result<(), Error>; + /// Record the partitions currently assigned to this broker for `topic`. + /// Contention queries filter by the (topic, partition) pairs across all + /// assigned topics. Passing an empty `partitions` clears the topic. + fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error>; /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. 
/// If `mark_activation_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`. diff --git a/src/test_utils.rs b/src/test_utils.rs index 30ef3eba..49fdb12c 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -15,9 +15,9 @@ use prost_types::Timestamp; use sentry_protos::taskbroker::v1::{self, OnAttemptsExceeded, RetryState, TaskActivation}; use uuid::Uuid; -use crate::config::Config; use crate::config::deprecated::DeprecatedConfig; use crate::config::store::{PgConfig, StoreConfig}; +use crate::config::{Config, DEFAULT_TOPIC}; use crate::store::activation::{Activation, ActivationBuilder, ActivationStatus}; use crate::store::adapters::postgres::{self, PostgresStore}; use crate::store::adapters::sqlite::SqliteStore; @@ -286,7 +286,7 @@ pub async fn create_test_store(adapter: &str) -> Arc { let store = Arc::new(PostgresStore::new(&config).await.unwrap()) as Arc; - store.assign_partitions(vec![0]).unwrap(); + store.assign_partitions(DEFAULT_TOPIC, vec![0]).unwrap(); store } _ => panic!("Invalid adapter: {}", adapter), From 77a408dbfd1d57ee3e0ad2aeb9414681aac1fea0 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 13:38:54 +0200 Subject: [PATCH 02/10] wiop --- src/store/adapters/postgres.rs | 61 +++++++++++++++++++--------------- src/store/adapters/sqlite.rs | 9 +++-- src/store/tests.rs | 10 ++++-- src/store/traits.rs | 15 ++++++--- src/upkeep.rs | 28 ++++++++-------- 5 files changed, 74 insertions(+), 49 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 0779dc6b..d0686db1 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -817,48 +817,57 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] - async fn count_depths_per_partition(&self) -> Result, Error> { - // Per-owned-partition gauge: intentionally partition-only (no age-based - // drain escape), since reporting depths for 
partitions this broker - // doesn't own would be meaningless. - // TODO(multi-topic): this groups by raw partition number, so partitions - // with the same index across different topics are merged in the gauge. - let assigned: Vec = self - .partitions - .read() - .unwrap() - .values() - .flatten() - .copied() - .collect(); + async fn count_depths_per_partition( + &self, + ) -> Result, Error> { + // Per-owned-(topic, partition) gauge: intentionally scoped to owned + // (topic, partition) pairs (no age-based drain escape), since reporting + // depths for partitions this broker doesn't own would be meaningless. + // Grouping by (topic, partition) keeps partitions with the same index + // across different topics distinct in multi-topic mode. + let assigned: Vec<(String, i32)> = { + let partitions = self.partitions.read().unwrap(); + partitions + .iter() + .flat_map(|(topic, parts)| parts.iter().map(|p| (topic.clone(), *p))) + .collect() + }; if assigned.is_empty() { return Ok(HashMap::new()); } let mut query_builder = QueryBuilder::new( - "SELECT partition, + "SELECT topic, partition, COUNT(*) FILTER (WHERE status = 'Pending'), COUNT(*) FILTER (WHERE status = 'Delay'), COUNT(*) FILTER (WHERE status = 'Claimed'), COUNT(*) FILTER (WHERE status = 'Processing') - FROM inflight_taskactivations WHERE partition IN (", + FROM inflight_taskactivations WHERE (topic, partition) IN (", ); - let mut separated = query_builder.separated(", "); - for partition in &assigned { - separated.push_bind(*partition); + let mut first = true; + for (topic, partition) in &assigned { + if !first { + query_builder.push(", "); + } + first = false; + query_builder.push("("); + query_builder.push_bind(topic.clone()); + query_builder.push(", "); + query_builder.push_bind(*partition); + query_builder.push(")"); } - query_builder.push(") GROUP BY partition"); + query_builder.push(") GROUP BY topic, partition"); - let rows: Vec<(i32, i64, i64, i64, i64)> = query_builder + let rows: Vec<(String, i32, i64, 
i64, i64, i64)> = query_builder .build_query_as() .fetch_all(&self.read_pool) .await?; - let mut counts: HashMap = rows + let mut counts: HashMap<(String, i32), DepthCounts> = rows .into_iter() - .map(|(partition, pending, delay, claimed, processing)| { + .map(|(topic, partition, pending, delay, claimed, processing)| { ( - partition, + (topic, partition), DepthCounts { pending: pending as usize, delay: delay as usize, @@ -869,8 +878,8 @@ impl ActivationStore for PostgresStore { }) .collect(); - for partition in &assigned { - counts.entry(*partition).or_insert(DepthCounts { + for key in &assigned { + counts.entry(key.clone()).or_insert(DepthCounts { pending: 0, delay: 0, claimed: 0, diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index da582dbe..332fd0fb 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -96,8 +96,13 @@ impl From> for Activation { id: value.id.into_owned(), activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), - // sqlite owns its whole DB and never filters by topic, so the column - // isn't stored; report the default topic on read-back. + // The `topic` column only exists on postgres, where it scopes + // contention filtering across topics that share partition indices. + // sqlite owns its whole DB and never filters by topic, so it has no + // contention problem to solve and the column buys us nothing. This is + // a deliberate choice to leave the sqlite schema untouched for now + // rather than a limitation: we default to DEFAULT_TOPIC on read-back + // so the shared `Activation` struct is satisfied without a migration. 
topic: crate::config::DEFAULT_TOPIC.to_owned(), partition: value.partition, offset: value.offset, diff --git a/src/store/tests.rs b/src/store/tests.rs index c7d590bd..bfbaaf58 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -199,13 +199,17 @@ async fn test_count_depths_per_partition_postgres() { let depths = store.count_depths_per_partition().await.unwrap(); - let p0 = depths.get(&0).expect("partition 0 missing"); + let p0 = depths + .get(&(DEFAULT_TOPIC.to_owned(), 0)) + .expect("partition 0 missing"); assert_eq!(p0.pending, 2, "partition 0 pending"); assert_eq!(p0.processing, 1, "partition 0 processing"); assert_eq!(p0.delay, 0, "partition 0 delay"); assert_eq!(p0.claimed, 0, "partition 0 claimed"); - let p1 = depths.get(&1).expect("partition 1 missing"); + let p1 = depths + .get(&(DEFAULT_TOPIC.to_owned(), 1)) + .expect("partition 1 missing"); assert_eq!(p1.pending, 0, "partition 1 pending"); assert_eq!(p1.delay, 1, "partition 1 delay"); assert_eq!(p1.processing, 0, "partition 1 processing"); @@ -213,7 +217,7 @@ async fn test_count_depths_per_partition_postgres() { // Zero-fill: partition 2 is assigned but has no rows. let p2 = depths - .get(&2) + .get(&(DEFAULT_TOPIC.to_owned(), 2)) .expect("partition 2 missing (zero-fill failed)"); assert_eq!(p2.pending, 0, "partition 2 pending"); assert_eq!(p2.delay, 0, "partition 2 delay"); diff --git a/src/store/traits.rs b/src/store/traits.rs index 350de738..4172b87f 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -133,12 +133,17 @@ pub trait ActivationStore: Send + Sync { }) } - /// Queue depths grouped by partition for upkeep gauges. The default - /// implementation returns the flat total under sentinel partition -1 for - /// stores that aren't partition-aware. - async fn count_depths_per_partition(&self) -> Result, Error> { + /// Queue depths grouped by (topic, partition) for upkeep gauges. 
The default + /// implementation returns the flat total under the default topic and sentinel + /// partition -1 for stores that aren't partition-aware. + async fn count_depths_per_partition( + &self, + ) -> Result, Error> { let total = self.count_depths().await?; - Ok(HashMap::from([(-1, total)])) + Ok(HashMap::from([( + (crate::config::DEFAULT_TOPIC.to_owned(), -1), + total, + )])) } /// Set the processing deadline for a specific activation diff --git a/src/upkeep.rs b/src/upkeep.rs index fab354bc..2840952c 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -46,7 +46,7 @@ pub async fn upkeep( let mut last_run = Instant::now(); let mut last_vacuum = Instant::now(); let mut last_backtrace_log = Instant::now(); - let mut emitted_partitions: HashSet = HashSet::new(); + let mut emitted_partitions: HashSet<(String, i32)> = HashSet::new(); loop { select! { _ = timer.tick() => { @@ -129,7 +129,7 @@ pub async fn do_upkeep( startup_time: DateTime, runtime_config_manager: Arc, last_vacuum: &mut Instant, - emitted_partitions: &mut HashSet, + emitted_partitions: &mut HashSet<(String, i32)>, ) -> UpkeepResults { let current_time = Utc::now(); let upkeep_start = Instant::now(); @@ -499,28 +499,30 @@ pub async fn do_upkeep( // without a partition filter still see the global total via tag sum. // Zero out gauges for partitions we emitted last cycle but no longer own. 
if let Ok(depths) = depth_counts { - let current: HashSet<i32> = depths.keys().copied().collect(); + let current: HashSet<(String, i32)> = depths.keys().cloned().collect(); - for partition in emitted_partitions.difference(&current) { + for (topic, partition) in emitted_partitions.difference(&current) { + let topic = topic.clone(); let partition = partition.to_string(); - metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_pending_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); - metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_claimed_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); - metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_processing_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); - metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition).set(0.0); + metrics::gauge!("upkeep.current_delayed_tasks", "topic" => topic, "partition" => partition).set(0.0); } - for (partition, counts) in &depths { + for ((topic, partition), counts) in &depths { + let topic = topic.clone(); let partition = partition.to_string(); - metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_pending_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.pending as f64); - metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_claimed_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.claimed as f64); - metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone()) + metrics::gauge!("upkeep.current_processing_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.processing as
f64); - metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition) + metrics::gauge!("upkeep.current_delayed_tasks", "topic" => topic, "partition" => partition) .set(counts.delay as f64); } From 181f2fde2c7d82dcbb029c342efc3465a3b3180e Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 14:01:46 +0200 Subject: [PATCH 03/10] add TopicPartition --- src/store/adapters/postgres.rs | 64 +++++++++++++++------------------- src/store/tests.rs | 7 ++-- src/store/traits.rs | 10 +++--- src/store/types.rs | 24 +++++++++++++ src/upkeep.rs | 19 +++++----- 5 files changed, 71 insertions(+), 53 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index d0686db1..135af817 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -1,5 +1,5 @@ use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeSet, HashMap}; use std::str::FromStr; use std::sync::RwLock; use std::time::Instant; @@ -23,7 +23,7 @@ use crate::push::compute_claim_duration_ms; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::retry::retry_query; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder, TopicPartition}; /// Run migrations. pub async fn migrate(config: &StoreConfig) -> Result<()> { @@ -248,9 +248,9 @@ pub struct PostgresStore { read_pool: PgPool, write_pool: PgPool, config: StoreConfig, - /// Partitions assigned to this broker, keyed by topic. Contention queries - /// filter by the (topic, partition) pairs across all assigned topics. - partitions: RwLock>>, + /// (topic, partition) pairs assigned to this broker. Contention queries + /// filter by these pairs across all assigned topics. 
+ partitions: RwLock>, claim_duration_ms: u64, } @@ -302,7 +302,7 @@ impl PostgresStore { read_pool, write_pool, config: config.store.clone(), - partitions: RwLock::new(BTreeMap::new()), + partitions: RwLock::new(BTreeSet::new()), claim_duration_ms: compute_claim_duration_ms(config), }) } @@ -335,18 +335,16 @@ impl PostgresStore { query_builder.push(condition); query_builder.push(" ((topic, partition) IN ("); let mut first = true; - for (topic, topic_partitions) in partitions.iter() { - for partition in topic_partitions.iter() { - if !first { - query_builder.push(", "); - } - first = false; - query_builder.push("("); - query_builder.push_bind(topic.clone()); + for tp in partitions.iter() { + if !first { query_builder.push(", "); - query_builder.push_bind(*partition); - query_builder.push(")"); } + first = false; + query_builder.push("("); + query_builder.push_bind(tp.topic.clone()); + query_builder.push(", "); + query_builder.push_bind(tp.partition); + query_builder.push(")"); } query_builder.push(") OR added_at < "); let drain_cutoff = @@ -436,12 +434,13 @@ impl ActivationStore for PostgresStore { fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error> { let mut write_guard = self.partitions.write().unwrap(); - // Keep the map free of empty entries so `add_partition_condition` never - // emits an empty `IN ()`. An empty assignment clears the topic. - if partitions.is_empty() { - write_guard.remove(topic); - } else { - write_guard.insert(topic.to_owned(), partitions); + // Replace only this topic's pairs (see trait contract): drop the topic's + // existing entries, leaving other topics intact, then insert the new set. + // An empty `partitions` thus just clears the topic, keeping the set free + // of stale entries so `add_partition_condition` never emits `IN ()`. 
+ write_guard.retain(|tp| tp.topic != topic); + for partition in partitions { + write_guard.insert(TopicPartition::new(topic, partition)); } Ok(()) } @@ -819,19 +818,14 @@ impl ActivationStore for PostgresStore { #[framed] async fn count_depths_per_partition( &self, - ) -> Result, Error> { + ) -> Result, Error> { // Per-owned-(topic, partition) gauge: intentionally scoped to owned // (topic, partition) pairs (no age-based drain escape), since reporting // depths for partitions this broker doesn't own would be meaningless. // Grouping by (topic, partition) keeps partitions with the same index // across different topics distinct in multi-topic mode. - let assigned: Vec<(String, i32)> = { - let partitions = self.partitions.read().unwrap(); - partitions - .iter() - .flat_map(|(topic, parts)| parts.iter().map(|p| (topic.clone(), *p))) - .collect() - }; + let assigned: Vec = + self.partitions.read().unwrap().iter().cloned().collect(); if assigned.is_empty() { return Ok(HashMap::new()); } @@ -845,15 +839,15 @@ impl ActivationStore for PostgresStore { FROM inflight_taskactivations WHERE (topic, partition) IN (", ); let mut first = true; - for (topic, partition) in &assigned { + for tp in &assigned { if !first { query_builder.push(", "); } first = false; query_builder.push("("); - query_builder.push_bind(topic.clone()); + query_builder.push_bind(tp.topic.clone()); query_builder.push(", "); - query_builder.push_bind(*partition); + query_builder.push_bind(tp.partition); query_builder.push(")"); } query_builder.push(") GROUP BY topic, partition"); @@ -863,11 +857,11 @@ impl ActivationStore for PostgresStore { .fetch_all(&self.read_pool) .await?; - let mut counts: HashMap<(String, i32), DepthCounts> = rows + let mut counts: HashMap = rows .into_iter() .map(|(topic, partition, pending, delay, claimed, processing)| { ( - (topic, partition), + TopicPartition::new(topic, partition), DepthCounts { pending: pending as usize, delay: delay as usize, diff --git a/src/store/tests.rs 
b/src/store/tests.rs index bfbaaf58..34422836 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -16,6 +16,7 @@ use crate::config::{Config, DEFAULT_TOPIC}; use crate::store::activation::{ActivationBuilder, ActivationStatus}; use crate::store::adapters::sqlite::{SqliteStore, create_sqlite_pool}; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; use crate::test_utils::{ StatusCount, TaskActivationBuilder, assert_counts, create_integration_config, create_test_store, generate_temp_filename, generate_unique_namespace, make_activations, @@ -200,7 +201,7 @@ async fn test_count_depths_per_partition_postgres() { let depths = store.count_depths_per_partition().await.unwrap(); let p0 = depths - .get(&(DEFAULT_TOPIC.to_owned(), 0)) + .get(&TopicPartition::new(DEFAULT_TOPIC, 0)) .expect("partition 0 missing"); assert_eq!(p0.pending, 2, "partition 0 pending"); assert_eq!(p0.processing, 1, "partition 0 processing"); @@ -208,7 +209,7 @@ async fn test_count_depths_per_partition_postgres() { assert_eq!(p0.claimed, 0, "partition 0 claimed"); let p1 = depths - .get(&(DEFAULT_TOPIC.to_owned(), 1)) + .get(&TopicPartition::new(DEFAULT_TOPIC, 1)) .expect("partition 1 missing"); assert_eq!(p1.pending, 0, "partition 1 pending"); assert_eq!(p1.delay, 1, "partition 1 delay"); @@ -217,7 +218,7 @@ async fn test_count_depths_per_partition_postgres() { // Zero-fill: partition 2 is assigned but has no rows. 
let p2 = depths - .get(&(DEFAULT_TOPIC.to_owned(), 2)) + .get(&TopicPartition::new(DEFAULT_TOPIC, 2)) .expect("partition 2 missing (zero-fill failed)"); assert_eq!(p2.pending, 0, "partition 2 pending"); assert_eq!(p2.delay, 0, "partition 2 delay"); diff --git a/src/store/traits.rs b/src/store/traits.rs index 4172b87f..7604a383 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -7,7 +7,7 @@ use tokio::join; use tracing::warn; use crate::store::activation::{Activation, ActivationStatus}; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder, TopicPartition}; #[async_trait] pub trait ActivationStore: Send + Sync { @@ -15,9 +15,7 @@ pub trait ActivationStore: Send + Sync { /// Store a batch of activations async fn store(&self, batch: &[Activation]) -> Result; - /// Record the partitions currently assigned to this broker for `topic`. - /// Contention queries filter by the (topic, partition) pairs across all - /// assigned topics. Passing an empty `partitions` clears the topic. + /// Replace the set of partitions this broker owns for `topic`. fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error>; /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. @@ -138,10 +136,10 @@ pub trait ActivationStore: Send + Sync { /// partition -1 for stores that aren't partition-aware. async fn count_depths_per_partition( &self, - ) -> Result, Error> { + ) -> Result, Error> { let total = self.count_depths().await?; Ok(HashMap::from([( - (crate::config::DEFAULT_TOPIC.to_owned(), -1), + TopicPartition::new(crate::config::DEFAULT_TOPIC, -1), total, )])) } diff --git a/src/store/types.rs b/src/store/types.rs index 0f308fdb..6f596809 100644 --- a/src/store/types.rs +++ b/src/store/types.rs @@ -1,5 +1,29 @@ pub type BucketRange = (i16, i16); +/// A Kafka topic paired with one of its partition indices. 
Partition indices +/// overlap across topics, so contention filtering and per-partition gauges must +/// be keyed by the (topic, partition) pair rather than the partition alone. +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TopicPartition { + pub topic: String, + pub partition: i32, +} + +impl TopicPartition { + pub fn new(topic: impl Into, partition: i32) -> Self { + Self { + topic: topic.into(), + partition, + } + } +} + +impl std::fmt::Display for TopicPartition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.topic, self.partition) + } +} + pub struct FailedTasksForwarder { pub to_discard: Vec<(String, Vec)>, pub to_deadletter: Vec<(String, Vec)>, diff --git a/src/upkeep.rs b/src/upkeep.rs index 2840952c..9f096000 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -21,6 +21,7 @@ use crate::SERVICE_NAME; use crate::config::Config; use crate::runtime_config::RuntimeConfigManager; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; /// The upkeep task that periodically performs upkeep /// on the activation store @@ -46,7 +47,7 @@ pub async fn upkeep( let mut last_run = Instant::now(); let mut last_vacuum = Instant::now(); let mut last_backtrace_log = Instant::now(); - let mut emitted_partitions: HashSet<(String, i32)> = HashSet::new(); + let mut emitted_partitions: HashSet = HashSet::new(); loop { select! { _ = timer.tick() => { @@ -129,7 +130,7 @@ pub async fn do_upkeep( startup_time: DateTime, runtime_config_manager: Arc, last_vacuum: &mut Instant, - emitted_partitions: &mut HashSet<(String, i32)>, + emitted_partitions: &mut HashSet, ) -> UpkeepResults { let current_time = Utc::now(); let upkeep_start = Instant::now(); @@ -499,11 +500,11 @@ pub async fn do_upkeep( // without a partition filter still see the global total via tag sum. // Zero out gauges for partitions we emitted last cycle but no longer own. 
if let Ok(depths) = depth_counts { - let current: HashSet<(String, i32)> = depths.keys().cloned().collect(); + let current: HashSet<TopicPartition> = depths.keys().cloned().collect(); - for (topic, partition) in emitted_partitions.difference(&current) { - let topic = topic.clone(); - let partition = partition.to_string(); + for tp in emitted_partitions.difference(&current) { + let topic = tp.topic.clone(); + let partition = tp.partition.to_string(); metrics::gauge!("upkeep.current_pending_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(0.0); metrics::gauge!("upkeep.current_claimed_tasks", "topic" => topic.clone(), "partition" => partition.clone()) @@ -513,9 +514,9 @@ pub async fn do_upkeep( metrics::gauge!("upkeep.current_delayed_tasks", "topic" => topic, "partition" => partition).set(0.0); } - for ((topic, partition), counts) in &depths { - let topic = topic.clone(); - let partition = partition.to_string(); + for (tp, counts) in &depths { + let topic = tp.topic.clone(); + let partition = tp.partition.to_string(); metrics::gauge!("upkeep.current_pending_tasks", "topic" => topic.clone(), "partition" => partition.clone()) .set(counts.pending as f64); metrics::gauge!("upkeep.current_claimed_tasks", "topic" => topic.clone(), "partition" => partition.clone()) From 137b0ce9c5827822d26d61239c651477c1130df6 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 14:04:51 +0200 Subject: [PATCH 04/10] trim down verbose comments --- src/config/store.rs | 7 +++---- src/store/activation.rs | 7 +++---- src/store/adapters/postgres.rs | 34 +++++++++++++--------------------- src/store/adapters/sqlite.rs | 9 +++------ 4 files changed, 22 insertions(+), 35 deletions(-) diff --git a/src/config/store.rs b/src/config/store.rs index f0cd73a8..bef22143 100644 --- a/src/config/store.rs +++ b/src/config/store.rs @@ -183,10 +183,9 @@ pub struct StoreConfig { pub processing_deadline_grace_sec: u64, /// Age-based contention drain threshold in seconds (postgres only).
- /// Activations older than this are considered potentially orphaned and - /// bypass the (topic, partition) contention filter, so any broker can - /// claim and maintain them regardless of partition ownership. This drains - /// rows left behind by rebalances and topic/partition moves between pools. + /// Activations older than this bypass the (topic, partition) contention + /// filter so any broker can drain orphaned rows left by rebalances or + /// topic/partition moves. pub contention_drain_age_sec: u64, } diff --git a/src/store/activation.rs b/src/store/activation.rs index 95290bab..c15bdbef 100644 --- a/src/store/activation.rs +++ b/src/store/activation.rs @@ -104,10 +104,9 @@ pub struct Activation { #[builder(default = ActivationStatus::Pending)] pub status: ActivationStatus, - /// The Kafka topic the activation was received from. Used together with - /// `partition` to scope contention filtering, since partition indices - /// overlap across topics. Defaults to [`crate::config::DEFAULT_TOPIC`] when - /// not built from a Kafka message (e.g. in tests). + /// The Kafka topic the activation was received from. Paired with `partition` + /// to scope contention filtering, since partition indices overlap across + /// topics. Defaults to [`crate::config::DEFAULT_TOPIC`] in tests. #[builder(setter(into), default = crate::config::DEFAULT_TOPIC.to_owned())] pub topic: String, diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 135af817..6720de7b 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -307,19 +307,14 @@ impl PostgresStore { }) } - /// Add the contention condition to the query builder in a thread-safe manner. + /// Restrict a query to the (topic, partition) pairs this broker owns, OR-ing + /// in an age-based escape: rows older than `contention_drain_age_sec` bypass + /// the filter so any broker can drain orphaned activations (left by rebalances + /// or topic/partition moves). 
This only affects contention, not correctness — + /// claims use `FOR UPDATE SKIP LOCKED`. /// - /// Limits a query to the rows this broker is responsible for, by the - /// (topic, partition) pairs assigned to it. An age-based escape hatch is - /// OR-ed in: rows older than `contention_drain_age_sec` bypass the filter so - /// that orphaned activations (left behind by rebalances or topic/partition - /// moves between pools) drain automatically — any broker can claim and - /// maintain them. `topic`/`partition` aren't required for correctness (the - /// status update is atomic and claims use `FOR UPDATE SKIP LOCKED`), so the - /// escape only ever causes brief, bounded extra contention, never data loss. - /// - /// When no partitions are assigned (e.g. before the first rebalance) no - /// condition is added and the query sees the whole table, as before. + /// With no partitions assigned (e.g. before the first rebalance) the query is + /// left unfiltered. fn add_partition_condition( &self, query_builder: &mut QueryBuilder, @@ -434,10 +429,8 @@ impl ActivationStore for PostgresStore { fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error> { let mut write_guard = self.partitions.write().unwrap(); - // Replace only this topic's pairs (see trait contract): drop the topic's - // existing entries, leaving other topics intact, then insert the new set. - // An empty `partitions` thus just clears the topic, keeping the set free - // of stale entries so `add_partition_condition` never emits `IN ()`. + // Replace only this topic's pairs, leaving other topics intact. An empty + // `partitions` thus just clears the topic. 
write_guard.retain(|tp| tp.topic != topic); for partition in partitions { write_guard.insert(TopicPartition::new(topic, partition)); @@ -819,11 +812,10 @@ impl ActivationStore for PostgresStore { async fn count_depths_per_partition( &self, ) -> Result, Error> { - // Per-owned-(topic, partition) gauge: intentionally scoped to owned - // (topic, partition) pairs (no age-based drain escape), since reporting - // depths for partitions this broker doesn't own would be meaningless. - // Grouping by (topic, partition) keeps partitions with the same index - // across different topics distinct in multi-topic mode. + // Per-owned-(topic, partition) gauge: scoped to owned pairs only (no drain + // escape) — depths for partitions this broker doesn't own would be + // meaningless. Grouping by (topic, partition) keeps same-index partitions + // from different topics distinct. let assigned: Vec = self.partitions.read().unwrap().iter().cloned().collect(); if assigned.is_empty() { diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 332fd0fb..3560b23c 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -96,13 +96,10 @@ impl From> for Activation { id: value.id.into_owned(), activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), - // The `topic` column only exists on postgres, where it scopes - // contention filtering across topics that share partition indices. // sqlite owns its whole DB and never filters by topic, so it has no - // contention problem to solve and the column buys us nothing. This is - // a deliberate choice to leave the sqlite schema untouched for now - // rather than a limitation: we default to DEFAULT_TOPIC on read-back - // so the shared `Activation` struct is satisfied without a migration. + // contention problem and stores no `topic` column. We deliberately + // leave the sqlite schema untouched and default to DEFAULT_TOPIC on + // read-back. 
topic: crate::config::DEFAULT_TOPIC.to_owned(), partition: value.partition, offset: value.offset, From 1c5cc2052ff86b7d3d790f6832273060eab0693b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 14:20:52 +0200 Subject: [PATCH 05/10] trim down comments --- .../postgres/0004_add_topic_age_contention.sql | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/migrations/postgres/0004_add_topic_age_contention.sql b/migrations/postgres/0004_add_topic_age_contention.sql index 11239960..5ff1574d 100644 --- a/migrations/postgres/0004_add_topic_age_contention.sql +++ b/migrations/postgres/0004_add_topic_age_contention.sql @@ -1,17 +1,11 @@ --- Multi-topic + age-based contention management (STREAM-1205). --- --- A `topic` column distinguishes activations across topics so that partition --- indices, which overlap between topics, no longer collide. Contention queries --- filter by (topic, partition) instead of partition alone. ALTER TABLE inflight_taskactivations ADD COLUMN topic TEXT NOT NULL DEFAULT ''; --- Serves the contention branch of the claim/upkeep queries: --- (topic, partition) IN (...) AND bucket BETWEEN ... -CREATE INDEX idx_topic_partition_bucket - ON inflight_taskactivations (topic, partition, bucket); +-- Replaces idx_activation_partition (partition), which a followup drops. +CREATE INDEX idx_topic_partition + ON inflight_taskactivations (topic, partition); --- Serves the age-based drain branch (added_at < threshold) that lets any broker --- claim/maintain orphaned rows regardless of partition ownership. Non-partial so --- it covers the drain branch for every status, not just Pending. +-- Age-based drain branch (added_at < threshold). +-- SQLite has an equivalent in its (status, added_at, ...) index, but we want +-- to keep this index separate from status as added_at is immutable. 
CREATE INDEX idx_added_at ON inflight_taskactivations (added_at); From b946e5df0ed0b5b0d823d4da7e902e5a4a881963 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 14:35:08 +0200 Subject: [PATCH 06/10] add comment --- migrations/postgres/0004_add_topic_age_contention.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/migrations/postgres/0004_add_topic_age_contention.sql b/migrations/postgres/0004_add_topic_age_contention.sql index 5ff1574d..3eabcbb1 100644 --- a/migrations/postgres/0004_add_topic_age_contention.sql +++ b/migrations/postgres/0004_add_topic_age_contention.sql @@ -1,5 +1,7 @@ ALTER TABLE inflight_taskactivations ADD COLUMN topic TEXT NOT NULL DEFAULT ''; +-- Contention filter for the upkeep queries, which filter by status + +-- (topic,partition) and have no ORDER BY, so they bitmap-scan this index. -- Replaces idx_activation_partition (partition), which a followup drops. CREATE INDEX idx_topic_partition ON inflight_taskactivations (topic, partition); From b4ee020ffb98104c8d67695afecf314fbfd60722 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 15:02:10 +0200 Subject: [PATCH 07/10] use tpl for assign_partitions --- src/kafka/consumer.rs | 63 ++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index c4e0e001..baaf4506 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -41,10 +41,10 @@ pub async fn start_consumer( ) -> Result<(), Error> { let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel(); let (event_sender, event_receiver) = unbounded_channel(); - // Each consumer subscribes to a single topic; join defensively in case that - // ever changes. Used as the `topic` tag on this consumer's metrics. - let topic = topics.join(","); - let context = KafkaContext::new(event_sender.clone(), topic.clone()); + // Metrics tag only. 
Ownership keys come per-topic from the assignment `tpl`, + // never from this string — so a multi-topic consumer stays correct. + let topics_tag = topics.join(","); + let context = KafkaContext::new(event_sender.clone(), topics_tag.clone()); let consumer: Arc> = Arc::new( kafka_client_config .create_with_context(context) @@ -57,14 +57,14 @@ pub async fn start_consumer( handle_shutdown_signals(event_sender.clone()); poll_consumer_client(consumer.clone(), client_shutdown_receiver); - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); handle_events( consumer, event_receiver, activation_store, client_shutdown_sender, spawn_actors, - topic, + topics_tag, ) .await } @@ -114,14 +114,14 @@ pub fn poll_consumer_client( pub struct KafkaContext { event_sender: UnboundedSender<(Event, SyncSender<()>)>, /// The topic(s) this consumer is subscribed to, used as a metric tag. 
- topic: String, + topics_tag: String, } impl KafkaContext { - pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topic: String) -> Self { + pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topics_tag: String) -> Self { Self { event_sender, - topic, + topics_tag, } } } @@ -151,7 +151,7 @@ impl ConsumerContext for KafkaContext { info!("Rendezvous complete"); metrics::counter!( "arroyo.consumer.partitions_assigned.count", - "topic" => self.topic.clone(), + "topic" => self.topics_tag.clone(), ) .increment(tpl.count() as u64); } @@ -173,7 +173,7 @@ impl ConsumerContext for KafkaContext { info!("Rendezvous complete"); metrics::counter!( "arroyo.consumer.partitions_revoked.count", - "topic" => self.topic.clone(), + "topic" => self.topics_tag.clone(), ) .increment(tpl.count() as u64); } @@ -352,7 +352,7 @@ pub async fn handle_events( Arc>, &BTreeSet<(String, i32)>, ) -> ActorHandles, - topic: String, + topics_tag: String, ) -> Result<(), anyhow::Error> { const CALLBACK_DURATION: Duration = Duration::from_secs(4); @@ -379,14 +379,9 @@ pub async fn handle_events( info!("Received event: {:?}", event); state = match (state, event) { (ConsumerState::Ready, Event::Assign(tpl)) => { - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()) + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()) .set(tpl.len() as f64); - // Note: This assumes we only process one topic per consumer. 
- let mut partitions = Vec::::new(); - for (_, partition) in tpl.iter() { - partitions.push(*partition); - } - activation_store.assign_partitions(&topic, partitions).unwrap(); + assign_from_tpl(activation_store.as_ref(), &tpl); ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { @@ -401,17 +396,17 @@ pub async fn handle_events( tpl == revoked, "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - activation_store.assign_partitions(&topic, vec![]).unwrap(); + release_from_tpl(activation_store.as_ref(), &revoked); handles.shutdown(CALLBACK_DURATION).await; - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); ConsumerState::Ready } - (ConsumerState::Consuming(handles, _), Event::Shutdown) => { - activation_store.assign_partitions(&topic, vec![]).unwrap(); + (ConsumerState::Consuming(handles, tpl), Event::Shutdown) => { + release_from_tpl(activation_store.as_ref(), &tpl); handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); - metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); ConsumerState::Stopped } (ConsumerState::Stopped, _) => { @@ -425,6 +420,26 @@ pub async fn handle_events( Ok(()) } +/// Push a partition assignment into the store, grouped by topic. Each topic in +/// `tpl` replaces that topic's owned partitions; topics absent from `tpl` are +/// left untouched (so sibling consumers sharing the store aren't disturbed). 
+fn assign_from_tpl(store: &dyn ActivationStore, tpl: &BTreeSet<(String, i32)>) { + let mut by_topic: HashMap<&str, Vec> = HashMap::new(); + for (topic, partition) in tpl { + by_topic.entry(topic.as_str()).or_default().push(*partition); + } + for (topic, partitions) in by_topic { + store.assign_partitions(topic, partitions).unwrap(); + } +} + +/// Release every topic present in `tpl` from the store (assign no partitions). +fn release_from_tpl(store: &dyn ActivationStore, tpl: &BTreeSet<(String, i32)>) { + for topic in tpl.iter().map(|(t, _)| t.as_str()).collect::>() { + store.assign_partitions(topic, vec![]).unwrap(); + } +} + pub trait KafkaMessage { fn detach(&self) -> Result; } From 20a0ceea8238a0f028f96830ff4e70c62ec4e8bb Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 15:11:58 +0200 Subject: [PATCH 08/10] refactor store.assign_partitions for simplicity --- src/fetch/tests.rs | 14 ++++++++++++-- src/kafka/consumer.rs | 33 ++++++++++----------------------- src/push/tests.rs | 14 ++++++++++++-- src/store/adapters/postgres.rs | 21 +++++++++++++++------ src/store/adapters/sqlite.rs | 18 +++++++++++++++--- src/store/tests.rs | 34 ++++++++++++++++++++++++++-------- src/store/traits.rs | 14 ++++++++++++-- src/store/types.rs | 6 ++++++ src/test_utils.rs | 5 ++++- 9 files changed, 112 insertions(+), 47 deletions(-) diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index e32686d7..019abd2b 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -10,7 +10,7 @@ use crate::config::Config; use crate::config::fetch::FetchConfig; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, FailedTasksForwarder}; +use crate::store::types::{BucketRange, FailedTasksForwarder, TopicPartition}; use crate::test_utils::make_activations; use super::*; @@ -49,7 +49,17 @@ impl MockStore { #[async_trait] impl ActivationStore for MockStore { - fn assign_partitions(&self, 
_topic: &str, _partitions: Vec) -> Result<(), Error> { + fn assign_partitions( + &self, + _partitions: &mut dyn Iterator, + ) -> Result<(), Error> { + Ok(()) + } + + fn revoke_partitions( + &self, + _partitions: &mut dyn Iterator, + ) -> Result<(), Error> { Ok(()) } diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index baaf4506..b26d6dcb 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -29,6 +29,7 @@ use futures::{Stream, StreamExt, future, pin_mut}; use tracing::{debug, error, info, instrument, warn}; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; pub async fn start_consumer( topics: &[&str], @@ -381,7 +382,9 @@ pub async fn handle_events( (ConsumerState::Ready, Event::Assign(tpl)) => { metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()) .set(tpl.len() as f64); - assign_from_tpl(activation_store.as_ref(), &tpl); + activation_store + .assign_partitions(&mut tpl.iter().map(TopicPartition::from)) + .unwrap(); ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { @@ -396,13 +399,17 @@ pub async fn handle_events( tpl == revoked, "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - release_from_tpl(activation_store.as_ref(), &revoked); + activation_store + .revoke_partitions(&mut revoked.iter().map(TopicPartition::from)) + .unwrap(); handles.shutdown(CALLBACK_DURATION).await; metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); ConsumerState::Ready } (ConsumerState::Consuming(handles, tpl), Event::Shutdown) => { - release_from_tpl(activation_store.as_ref(), &tpl); + activation_store + .revoke_partitions(&mut tpl.iter().map(TopicPartition::from)) + .unwrap(); handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); @@ -420,26 +427,6 @@ pub async fn handle_events( Ok(()) } -/// Push a partition 
assignment into the store, grouped by topic. Each topic in -/// `tpl` replaces that topic's owned partitions; topics absent from `tpl` are -/// left untouched (so sibling consumers sharing the store aren't disturbed). -fn assign_from_tpl(store: &dyn ActivationStore, tpl: &BTreeSet<(String, i32)>) { - let mut by_topic: HashMap<&str, Vec> = HashMap::new(); - for (topic, partition) in tpl { - by_topic.entry(topic.as_str()).or_default().push(*partition); - } - for (topic, partitions) in by_topic { - store.assign_partitions(topic, partitions).unwrap(); - } -} - -/// Release every topic present in `tpl` from the store (assign no partitions). -fn release_from_tpl(store: &dyn ActivationStore, tpl: &BTreeSet<(String, i32)>) { - for topic in tpl.iter().map(|(t, _)| t.as_str()).collect::>() { - store.assign_partitions(topic, vec![]).unwrap(); - } -} - pub trait KafkaMessage { fn detach(&self) -> Result; } diff --git a/src/push/tests.rs b/src/push/tests.rs index e7455c3c..18506606 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -12,7 +12,7 @@ use crate::config::push::{PushConfig, PushQueueConfig}; use crate::push::updater::test_eager_updater; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::FailedTasksForwarder; +use crate::store::types::{FailedTasksForwarder, TopicPartition}; use crate::test_utils::make_activations; use crate::worker::test_worker_map; @@ -36,7 +36,17 @@ impl ActivationStore for MockStore { Ok(0) } - fn assign_partitions(&self, _topic: &str, _partitions: Vec) -> Result<()> { + fn assign_partitions( + &self, + _partitions: &mut dyn Iterator, + ) -> Result<()> { + Ok(()) + } + + fn revoke_partitions( + &self, + _partitions: &mut dyn Iterator, + ) -> Result<()> { Ok(()) } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 6720de7b..95ab9b34 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -427,13 +427,22 @@ 
impl ActivationStore for PostgresStore { Ok(Some(row.into())) } - fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error> { + fn assign_partitions( + &self, + partitions: &mut dyn Iterator, + ) -> Result<(), Error> { + let mut write_guard = self.partitions.write().unwrap(); + write_guard.extend(partitions); + Ok(()) + } + + fn revoke_partitions( + &self, + partitions: &mut dyn Iterator, + ) -> Result<(), Error> { let mut write_guard = self.partitions.write().unwrap(); - // Replace only this topic's pairs, leaving other topics intact. An empty - // `partitions` thus just clears the topic. - write_guard.retain(|tp| tp.topic != topic); - for partition in partitions { - write_guard.insert(TopicPartition::new(topic, partition)); + for tp in partitions { + write_guard.remove(&tp); } Ok(()) } diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 3560b23c..89453889 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -30,7 +30,7 @@ use crate::config::store::StoreConfig; use crate::push::compute_claim_duration_ms; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, FailedTasksForwarder}; +use crate::store::types::{BucketRange, FailedTasksForwarder, TopicPartition}; /// Database representation of an [`Activation`], used for both reads and /// writes. @@ -465,10 +465,22 @@ impl ActivationStore for SqliteStore { Ok(Some(row.into())) } - fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error> { + fn assign_partitions( + &self, + partitions: &mut dyn Iterator, + ) -> Result<(), Error> { // sqlite owns its whole DB regardless of partition assignment, so this // is a no-op. Fires once per consumer, hence debug rather than warn. 
- debug!("assign_partitions: {topic} {partitions:?}"); + debug!("assign_partitions: {:?}", partitions.collect::>()); + Ok(()) + } + + fn revoke_partitions( + &self, + partitions: &mut dyn Iterator, + ) -> Result<(), Error> { + // No-op for the same reason as `assign_partitions`. + debug!("revoke_partitions: {:?}", partitions.collect::>()); Ok(()) } diff --git a/src/store/tests.rs b/src/store/tests.rs index 34422836..25034002 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -161,7 +161,11 @@ async fn test_count_depths_per_partition_postgres() { // Assign three partitions; partition 2 will have no activations and must // appear in the result with zero counts (zero-fill behavior). store - .assign_partitions(DEFAULT_TOPIC, vec![0, 1, 2]) + .assign_partitions( + &mut [0, 1, 2] + .into_iter() + .map(|p| TopicPartition::new(DEFAULT_TOPIC, p)), + ) .unwrap(); let namespace = generate_unique_namespace(); @@ -237,8 +241,12 @@ async fn test_multi_topic_partition_scoping_postgres() { let store = create_test_store("postgres").await; // Replace the default assignment from `create_test_store` with topic-a only. - store.assign_partitions(DEFAULT_TOPIC, vec![]).unwrap(); - store.assign_partitions("topic-a", vec![0]).unwrap(); + store + .revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) + .unwrap(); + store + .assign_partitions(&mut std::iter::once(TopicPartition::new("topic-a", 0))) + .unwrap(); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -270,7 +278,9 @@ async fn test_multi_topic_partition_scoping_postgres() { assert_eq!(claimed[0].id, "a0"); // After also owning topic-b, "b0" becomes claimable. 
- store.assign_partitions("topic-b", vec![0]).unwrap(); + store + .assign_partitions(&mut std::iter::once(TopicPartition::new("topic-b", 0))) + .unwrap(); let claimed = store .claim_activations_for_push(Some(10), None) .await @@ -288,8 +298,12 @@ async fn test_multi_topic_partition_scoping_postgres() { async fn test_age_based_drain_claims_orphan_postgres() { let store = create_test_store("postgres").await; // Owns an unrelated topic/partition, so neither row matches by ownership. - store.assign_partitions(DEFAULT_TOPIC, vec![]).unwrap(); - store.assign_partitions("owned-topic", vec![0]).unwrap(); + store + .revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) + .unwrap(); + store + .assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))) + .unwrap(); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -330,8 +344,12 @@ async fn test_age_based_drain_claims_orphan_postgres() { #[tokio::test] async fn test_age_based_drain_upkeep_postgres() { let store = create_test_store("postgres").await; - store.assign_partitions(DEFAULT_TOPIC, vec![]).unwrap(); - store.assign_partitions("owned-topic", vec![0]).unwrap(); + store + .revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) + .unwrap(); + store + .assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))) + .unwrap(); let namespace = generate_unique_namespace(); let now = Utc::now(); diff --git a/src/store/traits.rs b/src/store/traits.rs index 7604a383..55c16247 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -15,8 +15,18 @@ pub trait ActivationStore: Send + Sync { /// Store a batch of activations async fn store(&self, batch: &[Activation]) -> Result; - /// Replace the set of partitions this broker owns for `topic`. - fn assign_partitions(&self, topic: &str, partitions: Vec) -> Result<(), Error>; + /// Add the given (topic, partition) pairs to the set this broker owns. 
Pairs + /// accumulate across topics and consumers sharing the store. + fn assign_partitions( + &self, + partitions: &mut dyn Iterator, + ) -> Result<(), Error>; + + /// Remove the given (topic, partition) pairs from the set this broker owns. + fn revoke_partitions( + &self, + partitions: &mut dyn Iterator, + ) -> Result<(), Error>; /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. /// If `mark_activation_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`. diff --git a/src/store/types.rs b/src/store/types.rs index 6f596809..7a490cdb 100644 --- a/src/store/types.rs +++ b/src/store/types.rs @@ -24,6 +24,12 @@ impl std::fmt::Display for TopicPartition { } } +impl From<&(String, i32)> for TopicPartition { + fn from((topic, partition): &(String, i32)) -> Self { + Self::new(topic.clone(), *partition) + } +} + pub struct FailedTasksForwarder { pub to_discard: Vec<(String, Vec)>, pub to_deadletter: Vec<(String, Vec)>, diff --git a/src/test_utils.rs b/src/test_utils.rs index 49fdb12c..9a577aea 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -22,6 +22,7 @@ use crate::store::activation::{Activation, ActivationBuilder, ActivationStatus}; use crate::store::adapters::postgres::{self, PostgresStore}; use crate::store::adapters::sqlite::SqliteStore; use crate::store::traits::ActivationStore; +use crate::store::types::TopicPartition; /// msgpack encoding of an empty map (`{}`), used as default task parameters in tests. 
pub const EMPTY_MSGPACK_MAP: &[u8] = &[0x80]; @@ -286,7 +287,9 @@ pub async fn create_test_store(adapter: &str) -> Arc { let store = Arc::new(PostgresStore::new(&config).await.unwrap()) as Arc; - store.assign_partitions(DEFAULT_TOPIC, vec![0]).unwrap(); + store + .assign_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) + .unwrap(); store } _ => panic!("Invalid adapter: {}", adapter), From 571b31c876d86bc673a437322da625cc220ea07a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 15:28:25 +0200 Subject: [PATCH 09/10] remove result return value from assign_partitions --- src/fetch/tests.rs | 14 ++---------- src/kafka/consumer.rs | 12 +++------- src/push/tests.rs | 14 ++---------- src/store/adapters/postgres.rs | 15 +++---------- src/store/adapters/sqlite.rs | 12 ++-------- src/store/tests.rs | 40 ++++++++++------------------------ src/store/traits.rs | 10 ++------- src/test_utils.rs | 4 +--- 8 files changed, 27 insertions(+), 94 deletions(-) diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 019abd2b..c8aa11cc 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -49,19 +49,9 @@ impl MockStore { #[async_trait] impl ActivationStore for MockStore { - fn assign_partitions( - &self, - _partitions: &mut dyn Iterator, - ) -> Result<(), Error> { - Ok(()) - } + fn assign_partitions(&self, _partitions: &mut dyn Iterator) {} - fn revoke_partitions( - &self, - _partitions: &mut dyn Iterator, - ) -> Result<(), Error> { - Ok(()) - } + fn revoke_partitions(&self, _partitions: &mut dyn Iterator) {} async fn vacuum_db(&self) -> Result<(), Error> { unimplemented!() diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index b26d6dcb..6ea4350d 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -382,9 +382,7 @@ pub async fn handle_events( (ConsumerState::Ready, Event::Assign(tpl)) => { metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()) .set(tpl.len() as f64); - 
activation_store - .assign_partitions(&mut tpl.iter().map(TopicPartition::from)) - .unwrap(); + activation_store.assign_partitions(&mut tpl.iter().map(TopicPartition::from)); ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { @@ -399,17 +397,13 @@ pub async fn handle_events( tpl == revoked, "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - activation_store - .revoke_partitions(&mut revoked.iter().map(TopicPartition::from)) - .unwrap(); + activation_store.revoke_partitions(&mut revoked.iter().map(TopicPartition::from)); handles.shutdown(CALLBACK_DURATION).await; metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0); ConsumerState::Ready } (ConsumerState::Consuming(handles, tpl), Event::Shutdown) => { - activation_store - .revoke_partitions(&mut tpl.iter().map(TopicPartition::from)) - .unwrap(); + activation_store.revoke_partitions(&mut tpl.iter().map(TopicPartition::from)); handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); diff --git a/src/push/tests.rs b/src/push/tests.rs index 18506606..b7f710aa 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -36,19 +36,9 @@ impl ActivationStore for MockStore { Ok(0) } - fn assign_partitions( - &self, - _partitions: &mut dyn Iterator, - ) -> Result<()> { - Ok(()) - } + fn assign_partitions(&self, _partitions: &mut dyn Iterator) {} - fn revoke_partitions( - &self, - _partitions: &mut dyn Iterator, - ) -> Result<()> { - Ok(()) - } + fn revoke_partitions(&self, _partitions: &mut dyn Iterator) {} async fn claim_activations( &self, diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 95ab9b34..71aa5e59 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -427,24 +427,15 @@ impl ActivationStore for PostgresStore { Ok(Some(row.into())) } - fn assign_partitions( - &self, - 
partitions: &mut dyn Iterator, - ) -> Result<(), Error> { - let mut write_guard = self.partitions.write().unwrap(); - write_guard.extend(partitions); - Ok(()) + fn assign_partitions(&self, partitions: &mut dyn Iterator) { + self.partitions.write().unwrap().extend(partitions); } - fn revoke_partitions( - &self, - partitions: &mut dyn Iterator, - ) -> Result<(), Error> { + fn revoke_partitions(&self, partitions: &mut dyn Iterator) { let mut write_guard = self.partitions.write().unwrap(); for tp in partitions { write_guard.remove(&tp); } - Ok(()) } #[instrument(skip_all)] diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 89453889..605739e6 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -465,23 +465,15 @@ impl ActivationStore for SqliteStore { Ok(Some(row.into())) } - fn assign_partitions( - &self, - partitions: &mut dyn Iterator, - ) -> Result<(), Error> { + fn assign_partitions(&self, partitions: &mut dyn Iterator) { // sqlite owns its whole DB regardless of partition assignment, so this // is a no-op. Fires once per consumer, hence debug rather than warn. debug!("assign_partitions: {:?}", partitions.collect::>()); - Ok(()) } - fn revoke_partitions( - &self, - partitions: &mut dyn Iterator, - ) -> Result<(), Error> { + fn revoke_partitions(&self, partitions: &mut dyn Iterator) { // No-op for the same reason as `assign_partitions`. debug!("revoke_partitions: {:?}", partitions.collect::>()); - Ok(()) } #[instrument(skip_all)] diff --git a/src/store/tests.rs b/src/store/tests.rs index 25034002..07b1d0fb 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -160,13 +160,11 @@ async fn test_count_depths_per_partition_postgres() { // Assign three partitions; partition 2 will have no activations and must // appear in the result with zero counts (zero-fill behavior). 
- store - .assign_partitions( - &mut [0, 1, 2] - .into_iter() - .map(|p| TopicPartition::new(DEFAULT_TOPIC, p)), - ) - .unwrap(); + store.assign_partitions( + &mut [0, 1, 2] + .into_iter() + .map(|p| TopicPartition::new(DEFAULT_TOPIC, p)), + ); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -241,12 +239,8 @@ async fn test_multi_topic_partition_scoping_postgres() { let store = create_test_store("postgres").await; // Replace the default assignment from `create_test_store` with topic-a only. - store - .revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) - .unwrap(); - store - .assign_partitions(&mut std::iter::once(TopicPartition::new("topic-a", 0))) - .unwrap(); + store.revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("topic-a", 0))); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -278,9 +272,7 @@ async fn test_multi_topic_partition_scoping_postgres() { assert_eq!(claimed[0].id, "a0"); // After also owning topic-b, "b0" becomes claimable. - store - .assign_partitions(&mut std::iter::once(TopicPartition::new("topic-b", 0))) - .unwrap(); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("topic-b", 0))); let claimed = store .claim_activations_for_push(Some(10), None) .await @@ -298,12 +290,8 @@ async fn test_multi_topic_partition_scoping_postgres() { async fn test_age_based_drain_claims_orphan_postgres() { let store = create_test_store("postgres").await; // Owns an unrelated topic/partition, so neither row matches by ownership. 
- store - .revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) - .unwrap(); - store - .assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))) - .unwrap(); + store.revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))); let namespace = generate_unique_namespace(); let now = Utc::now(); @@ -344,12 +332,8 @@ async fn test_age_based_drain_claims_orphan_postgres() { #[tokio::test] async fn test_age_based_drain_upkeep_postgres() { let store = create_test_store("postgres").await; - store - .revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) - .unwrap(); - store - .assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))) - .unwrap(); + store.revoke_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); + store.assign_partitions(&mut std::iter::once(TopicPartition::new("owned-topic", 0))); let namespace = generate_unique_namespace(); let now = Utc::now(); diff --git a/src/store/traits.rs b/src/store/traits.rs index 55c16247..eb8e8fee 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -17,16 +17,10 @@ pub trait ActivationStore: Send + Sync { /// Add the given (topic, partition) pairs to the set this broker owns. Pairs /// accumulate across topics and consumers sharing the store. - fn assign_partitions( - &self, - partitions: &mut dyn Iterator, - ) -> Result<(), Error>; + fn assign_partitions(&self, partitions: &mut dyn Iterator); /// Remove the given (topic, partition) pairs from the set this broker owns. - fn revoke_partitions( - &self, - partitions: &mut dyn Iterator, - ) -> Result<(), Error>; + fn revoke_partitions(&self, partitions: &mut dyn Iterator); /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. 
/// If `mark_activation_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`. diff --git a/src/test_utils.rs b/src/test_utils.rs index 9a577aea..f9499601 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -287,9 +287,7 @@ pub async fn create_test_store(adapter: &str) -> Arc { let store = Arc::new(PostgresStore::new(&config).await.unwrap()) as Arc; - store - .assign_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))) - .unwrap(); + store.assign_partitions(&mut std::iter::once(TopicPartition::new(DEFAULT_TOPIC, 0))); store } _ => panic!("Invalid adapter: {}", adapter), From c9c5bc25751383eb82eacf8889085afda1086bd0 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 23 Jun 2026 15:33:14 +0200 Subject: [PATCH 10/10] clean up comment --- src/store/adapters/postgres.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 71aa5e59..c71513a1 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -307,11 +307,11 @@ impl PostgresStore { }) } - /// Restrict a query to the (topic, partition) pairs this broker owns, OR-ing - /// in an age-based escape: rows older than `contention_drain_age_sec` bypass - /// the filter so any broker can drain orphaned activations (left by rebalances - /// or topic/partition moves). This only affects contention, not correctness — - /// claims use `FOR UPDATE SKIP LOCKED`. + /// Restrict a query to the (topic, partition) pairs this broker owns. + /// + /// rows older than `contention_drain_age_sec` bypass the filter so any broker can drain + /// orphaned activations (left by rebalances or topic/partition moves). This only affects + /// contention, not correctness. /// /// With no partitions assigned (e.g. before the first rebalance) the query is /// left unfiltered.