Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions migrations/postgres/0004_add_topic_age_contention.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Record the Kafka topic each activation was received from. Partition numbers
-- are not unique across topics, so contention filtering needs the
-- (topic, partition) pair rather than the partition alone.
-- Rows that exist before this migration are backfilled with the '' default.
-- NOTE(review): presumably no broker's (topic, partition) filter matches '',
-- so pre-existing rows would only be claimed through the age-based drain
-- branch -- confirm this is acceptable during rollout.
ALTER TABLE inflight_taskactivations ADD COLUMN topic TEXT NOT NULL DEFAULT '';

-- Contention filter for the upkeep queries, which filter by status +
-- (topic,partition) and have no ORDER BY, so they bitmap-scan this index.
-- Replaces idx_activation_partition (partition), which a followup drops.
-- NOTE(review): a plain CREATE INDEX blocks writes to the table while it
-- builds (this applies to idx_added_at below as well); confirm the table is
-- small enough when this migration runs.
CREATE INDEX idx_topic_partition
ON inflight_taskactivations (topic, partition);

-- Age-based drain branch (added_at < threshold).
-- SQLite has an equivalent in its (status, added_at, ...) index, but we want
-- to keep this index separate from status as added_at is immutable.
CREATE INDEX idx_added_at
ON inflight_taskactivations (added_at);
78 changes: 7 additions & 71 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ pub mod validate;
use deprecated::DeprecatedConfig;
use kafka::{ClusterConfig, TopicConfig};
use raw::RawModeConfig;
use store::DatabaseAdapter;

/// Used to identify whether a configuration option was provided by a user, or whether it is a default.
const DEFAULT_CONFIG_PROVIDER: &str = "TaskbrokerConfig";

/// The default Kafka topic used when none is configured (the historical
/// zero-config `taskworker` default). Also used as the default `topic` for
/// activations that aren't built from a Kafka message (e.g. in tests).
pub const DEFAULT_TOPIC: &str = "taskworker";

/// How the taskbroker delivers tasks to workers.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -403,7 +407,6 @@ impl Config {
pub(crate) fn normalize_and_validate(&mut self) -> Result<()> {
const DEFAULT_CLUSTER: &str = "default";
const DEADLETTER_CLUSTER: &str = "deadletter";
const DEFAULT_TOPIC: &str = "taskworker";
const DEFAULT_CLUSTER_ADDRESS: &str = "127.0.0.1:9092";
const DEFAULT_CONSUMER_GROUP: &str = "taskworker";

Expand Down Expand Up @@ -630,23 +633,6 @@ impl Config {
// Validate at least one consumable topic.
let consumable = self.consumable_topics()?;

// Multi-topic consumption is only supported on the sqlite adapter for
// now. The postgres adapter filters claims by a single shared partition
// list, but those partition numbers aren't unique across topics, so the
// filter would mix partitions from different topics together. Note this
// filtering exists only to avoid lock contention between brokers, not for
// correctness; supporting multi-topic on postgres means reworking how we
// avoid that contention (e.g. filtering by (topic, partition) or another
// mechanism entirely). Reject the combination here, before any consumer
// spawns.
if consumable.len() > 1 && self.store.adapter == DatabaseAdapter::Postgres {
return Err(anyhow!(
"multi-topic consumption ({} consumable topics) is not supported with the \
postgres database adapter; use the sqlite adapter or a single consumable topic",
consumable.len()
));
}

// The deadletter topic must be a declared topic so the producer can
// resolve its cluster. In legacy mode it was added above; in the new
// format the user must declare it (produce-only) in kafka_topics.
Expand Down Expand Up @@ -909,7 +895,8 @@ mod tests {
use crate::logging::LogFormat;
use crate::{Args, Run};

use super::{Config, DatabaseAdapter, DeliveryMode};
use super::store::DatabaseAdapter;
use super::{Config, DeliveryMode};

#[test]
fn test_default() {
Expand Down Expand Up @@ -1701,57 +1688,6 @@ kafka_clusters:
});
}

/// Loading a config that selects the postgres adapter and declares more than
/// one topic without `produce_only` must fail, and the error must mention the
/// postgres adapter.
#[test]
fn test_multi_topic_rejected_on_postgres() {
Jail::expect_with(|jail| {
// Multiple consumable topics are allowed on sqlite but rejected on
// postgres, whose claim filtering can't distinguish partitions
// across topics.
// Fixture: `profiles` and `subscriptions` are declared without
// `produce_only`; `tasks-retry` and `tasks-dlq` are produce-only.
jail.create_file(
"config.yaml",
r#"
database_adapter: postgres
kafka_deadletter_topic: tasks-dlq
kafka_retry_topic: tasks-retry

kafka_topics:
profiles:
cluster: my-cluster
consumer_group: taskbroker-profiles
subscriptions:
cluster: my-cluster
consumer_group: taskbroker-subscriptions
tasks-retry:
cluster: my-cluster
consumer_group: taskbroker-retry
produce_only: true
tasks-dlq:
cluster: my-cluster
consumer_group: taskbroker-dlq
produce_only: true

kafka_clusters:
my-cluster:
address: 10.0.0.1:9092
"#,
)?;

// Load the config the same way the broker binary would.
let args = Args {
run: Run::Broker,
config: Some("config.yaml".to_owned()),
};
// Loading must fail; `unwrap_err` panics if the config is accepted.
let err = Config::from_args(&args).unwrap_err();
// Match on a stable substring of the message rather than the full text.
assert!(
err.to_string()
.contains("not supported with the postgres database adapter"),
"unexpected error: {}",
err
);

Ok(())
});
}

#[test]
fn test_multi_topic_allows_one_consumable_with_produce_only() {
Jail::expect_with(|jail| {
Expand Down
7 changes: 7 additions & 0 deletions src/config/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ pub struct StoreConfig {
/// are extended by. This helps reduce broker deadline resets when
/// brokers are under load, or there are small networking delays.
pub processing_deadline_grace_sec: u64,

/// Age-based contention drain threshold in seconds (postgres only).
/// Activations older than this bypass the (topic, partition) contention
/// filter so any broker can drain orphaned rows left by rebalances or
/// topic/partition moves.
pub contention_drain_age_sec: u64,
}

impl Default for StoreConfig {
Expand All @@ -200,6 +206,7 @@ impl Default for StoreConfig {
max_processing_count: 2048,
max_processing_attempts: 5,
processing_deadline_grace_sec: 3,
contention_drain_age_sec: 60,
}
}
}
8 changes: 4 additions & 4 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::config::Config;
use crate::config::fetch::FetchConfig;
use crate::store::activation::{Activation, ActivationStatus};
use crate::store::traits::ActivationStore;
use crate::store::types::{BucketRange, FailedTasksForwarder};
use crate::store::types::{BucketRange, FailedTasksForwarder, TopicPartition};
use crate::test_utils::make_activations;

use super::*;
Expand Down Expand Up @@ -49,9 +49,9 @@ impl MockStore {

#[async_trait]
impl ActivationStore for MockStore {
fn assign_partitions(&self, _partitions: Vec<i32>) -> Result<(), Error> {
Ok(())
}
fn assign_partitions(&self, _partitions: &mut dyn Iterator<Item = TopicPartition>) {}

fn revoke_partitions(&self, _partitions: &mut dyn Iterator<Item = TopicPartition>) {}

async fn vacuum_db(&self) -> Result<(), Error> {
unimplemented!()
Expand Down
44 changes: 20 additions & 24 deletions src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use futures::{Stream, StreamExt, future, pin_mut};
use tracing::{debug, error, info, instrument, warn};

use crate::store::traits::ActivationStore;
use crate::store::types::TopicPartition;

pub async fn start_consumer(
topics: &[&str],
Expand All @@ -41,10 +42,10 @@ pub async fn start_consumer(
) -> Result<(), Error> {
let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel();
let (event_sender, event_receiver) = unbounded_channel();
// Each consumer subscribes to a single topic; join defensively in case that
// ever changes. Used as the `topic` tag on this consumer's metrics.
let topic = topics.join(",");
let context = KafkaContext::new(event_sender.clone(), topic.clone());
// Metrics tag only. Ownership keys come per-topic from the assignment `tpl`,
// never from this string — so a multi-topic consumer stays correct.
let topics_tag = topics.join(",");
let context = KafkaContext::new(event_sender.clone(), topics_tag.clone());
let consumer: Arc<StreamConsumer<KafkaContext>> = Arc::new(
kafka_client_config
.create_with_context(context)
Expand All @@ -57,14 +58,14 @@ pub async fn start_consumer(

handle_shutdown_signals(event_sender.clone());
poll_consumer_client(consumer.clone(), client_shutdown_receiver);
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0);
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0);
handle_events(
consumer,
event_receiver,
activation_store,
client_shutdown_sender,
spawn_actors,
topic,
topics_tag,
)
.await
}
Expand Down Expand Up @@ -114,14 +115,14 @@ pub fn poll_consumer_client(
pub struct KafkaContext {
event_sender: UnboundedSender<(Event, SyncSender<()>)>,
/// The topic(s) this consumer is subscribed to, used as a metric tag.
topic: String,
topics_tag: String,
}

impl KafkaContext {
pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topic: String) -> Self {
pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topics_tag: String) -> Self {
Self {
event_sender,
topic,
topics_tag,
}
}
}
Expand Down Expand Up @@ -151,7 +152,7 @@ impl ConsumerContext for KafkaContext {
info!("Rendezvous complete");
metrics::counter!(
"arroyo.consumer.partitions_assigned.count",
"topic" => self.topic.clone(),
"topic" => self.topics_tag.clone(),
)
.increment(tpl.count() as u64);
}
Expand All @@ -173,7 +174,7 @@ impl ConsumerContext for KafkaContext {
info!("Rendezvous complete");
metrics::counter!(
"arroyo.consumer.partitions_revoked.count",
"topic" => self.topic.clone(),
"topic" => self.topics_tag.clone(),
)
.increment(tpl.count() as u64);
}
Expand Down Expand Up @@ -352,7 +353,7 @@ pub async fn handle_events(
Arc<StreamConsumer<KafkaContext>>,
&BTreeSet<(String, i32)>,
) -> ActorHandles,
topic: String,
topics_tag: String,
) -> Result<(), anyhow::Error> {
const CALLBACK_DURATION: Duration = Duration::from_secs(4);

Expand All @@ -379,14 +380,9 @@ pub async fn handle_events(
info!("Received event: {:?}", event);
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone())
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone())
.set(tpl.len() as f64);
// Note: This assumes we only process one topic per consumer.
let mut partitions = Vec::<i32>::new();
for (_, partition) in tpl.iter() {
partitions.push(*partition);
}
activation_store.assign_partitions(partitions).unwrap();
activation_store.assign_partitions(&mut tpl.iter().map(TopicPartition::from));
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
Expand All @@ -401,17 +397,17 @@ pub async fn handle_events(
tpl == revoked,
"Revoked TPL should be equal to the subset of TPL we're consuming from"
);
activation_store.assign_partitions(vec![]).unwrap();
activation_store.revoke_partitions(&mut revoked.iter().map(TopicPartition::from));
handles.shutdown(CALLBACK_DURATION).await;
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0);
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0);
ConsumerState::Ready
}
Comment thread
untitaker marked this conversation as resolved.
(ConsumerState::Consuming(handles, _), Event::Shutdown) => {
activation_store.assign_partitions(vec![]).unwrap();
(ConsumerState::Consuming(handles, tpl), Event::Shutdown) => {
activation_store.revoke_partitions(&mut tpl.iter().map(TopicPartition::from));
handles.shutdown(CALLBACK_DURATION).await;
debug!("Signaling shutdown to client...");
shutdown_client.take();
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0);
metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topics_tag.clone()).set(0);
ConsumerState::Stopped
}
(ConsumerState::Stopped, _) => {
Expand Down
1 change: 1 addition & 0 deletions src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn new(
id: activation.id.clone(),
activation: payload.to_vec(),
status,
topic: msg.topic().to_owned(),
partition: msg.partition(),
offset: msg.offset(),
added_at: Utc::now(),
Expand Down
1 change: 1 addition & 0 deletions src/kafka/deserialize_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub fn new(config: RawConfig) -> impl Fn(&OwnedMessage) -> Result<Activation, Er
id,
activation: activation_bytes,
status: ActivationStatus::Pending,
topic: msg.topic().to_owned(),
partition: msg.partition(),
offset: msg.offset(),
added_at: now,
Expand Down
8 changes: 4 additions & 4 deletions src/push/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::config::push::{PushConfig, PushQueueConfig};
use crate::push::updater::test_eager_updater;
use crate::store::activation::{Activation, ActivationStatus};
use crate::store::traits::ActivationStore;
use crate::store::types::FailedTasksForwarder;
use crate::store::types::{FailedTasksForwarder, TopicPartition};
use crate::test_utils::make_activations;
use crate::worker::test_worker_map;

Expand All @@ -36,9 +36,9 @@ impl ActivationStore for MockStore {
Ok(0)
}

fn assign_partitions(&self, _partitions: Vec<i32>) -> Result<()> {
Ok(())
}
fn assign_partitions(&self, _partitions: &mut dyn Iterator<Item = TopicPartition>) {}

fn revoke_partitions(&self, _partitions: &mut dyn Iterator<Item = TopicPartition>) {}

async fn claim_activations(
&self,
Expand Down
6 changes: 6 additions & 0 deletions src/store/activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub struct Activation {
#[builder(default = ActivationStatus::Pending)]
pub status: ActivationStatus,

/// The Kafka topic the activation was received from. Paired with `partition`
/// to scope contention filtering, since partition indices overlap across
/// topics. Defaults to [`crate::config::DEFAULT_TOPIC`] when the builder is
/// not given a topic (e.g. in tests).
#[builder(setter(into), default = crate::config::DEFAULT_TOPIC.to_owned())]
pub topic: String,

/// The partition the activation was received from
#[builder(default = 0)]
pub partition: i32,
Expand Down
Loading
Loading