diff --git a/Cargo.lock b/Cargo.lock index 387eb531..c27ca44a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2677,9 +2677,9 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.8.13" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60dfb8c1b03c3f6e800a91eca7daea05205dd87f63b8d70b50b7e2211a2e0be2" +checksum = "d1c4808101306172a10683c4b9b905f891bbc2d8567496c8b82c45ca54965cc8" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 6e618f47..49e41e1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ sentry = { version = "0.41.0", default-features = false, features = [ "tracing", "logs" ] } -sentry_protos = "0.8.13" +sentry_protos = "0.11.0" serde = "1.0.214" serde_bytes = "0.11" serde_yaml = "0.9.34" diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 3c72d7e8..26ce42c6 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -121,7 +121,12 @@ async fn set_status(num_activations: u32, num_workers: u32) { for task_id in 0..num_activations { if task_id % num_workers == worker_idx { store - .set_status(&format!("id_{task_id}"), InflightActivationStatus::Complete) + .set_status( + &format!("id_{task_id}"), + InflightActivationStatus::Complete, + None, + None, + ) .await .unwrap(); } diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index e8133318..8c577338 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk[http2]>=2.43.0", - "sentry-protos>=0.8.13", + "sentry-protos>=0.10.0", "confluent_kafka>=2.3.0", "cronsim>=2.6", "grpcio>=1.67.0", diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index 326ddd96..800e8b14 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -68,3 +68,5 @@ class 
ProcessingResult: status: TaskActivationStatus.ValueType host: str receive_timestamp: float + max_attempts: int | None = None + delay_on_retry: int | None = None diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 2a1c56b2..6b9e1ae0 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -445,6 +445,8 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, + max_attempts=processing_result.max_attempts, + delay_on_retry=processing_result.delay_on_retry, ) try: @@ -566,6 +568,8 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=None, + max_attempts=processing_result.max_attempts, + delay_on_retry=processing_result.delay_on_retry, ) retries = 0 diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 31e38069..c9e8b5d7 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -378,6 +378,19 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, + # Send max_attempts and delay_on_retry if this is a retry. + # Don't send it on every task as this codepath is relatively + # unoptimized on the broker side. 
+ max_attempts=( + task_func.retry._times + 1 + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), + delay_on_retry=( + task_func.retry._delay + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), ) ) diff --git a/pyproject.toml b/pyproject.toml index 12fcf328..8a839d4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dev = [ "orjson>=3.10.10", "protobuf>=5.28.3", "pyyaml>=6.0.2", - "sentry-protos>=0.2.0", + "sentry-protos>=0.11.0", "flake8>=7.3.0", "isort>=5.13.2", "mypy>=1.17.1", diff --git a/src/config.rs b/src/config.rs index fd9b11e8..5e5cfeaa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -129,6 +129,11 @@ pub struct Config { /// The location to the DLQ private key file pub kafka_deadletter_ssl_key_location: Option, + /// The topic to publish retry task activations to. + /// When set, retries go to this topic instead of kafka_topic. + /// Required for raw_mode where the main topic has other consumers. 
+ pub kafka_retry_topic: Option, + /// The default number of partitions for a topic pub default_topic_partitions: i32, @@ -371,6 +376,7 @@ impl Default for Config { kafka_deadletter_ssl_ca_location: None, kafka_deadletter_ssl_certificate_location: None, kafka_deadletter_ssl_key_location: None, + kafka_retry_topic: None, default_topic_partitions: 1, kafka_session_timeout_ms: 6000, kafka_auto_commit_interval_ms: 5000, diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 092d3503..2fb872e4 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -118,6 +118,8 @@ impl InflightActivationStore for MockStore { &self, _id: &str, _status: InflightActivationStatus, + _max_attempts: Option, + _delay_on_retry: Option, ) -> Result, Error> { unimplemented!() } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 9d669c5e..92f0f0e7 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -106,7 +106,16 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } - if let Some(ref tx) = self.update_tx { + let max_attempts = request.get_ref().max_attempts; + let delay_on_retry = request.get_ref().delay_on_retry; + + // Use batching channel if available and we don't need to update retry state. + // If max_attempts or delay_on_retry is Some, we can't use batching API to update the + // activation, and have to fall back to individual set_status. 
+ if let Some(ref tx) = self.update_tx + && max_attempts.is_none() + && delay_on_retry.is_none() + { tx.send((id, status)) .await .map_err(|_| Status::internal("Status update channel closed"))?; @@ -115,7 +124,11 @@ impl ConsumerService for TaskbrokerServer { return Ok(Response::new(SetTaskStatusResponse { task: None })); } - match self.store.set_status(&id, status).await { + match self + .store + .set_status(&id, status, max_attempts, delay_on_retry) + .await + { Ok(Some(_)) => metrics::counter!( "grpc_server.set_status", "result" => "ok", diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 86bc343e..633b7ac1 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -86,6 +86,8 @@ async fn test_set_task_status(#[case] adapter: &str) { id: "test_task".to_string(), status: 5, // Complete fetch_next_task: None, + max_attempts: None, + delay_on_retry: None, }; let response = service.set_task_status(Request::new(request)).await; @@ -113,6 +115,8 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) { id: "test_task".to_string(), status: 1, // Invalid fetch_next_task: None, + max_attempts: None, + delay_on_retry: None, }; let response = service.set_task_status(Request::new(request)).await; @@ -266,6 +270,8 @@ async fn test_set_task_status_success(#[case] adapter: &str) { namespace: None, application: None, }), + max_attempts: None, + delay_on_retry: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -306,6 +312,8 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) { application: Some("hammers".into()), namespace: None, }), + max_attempts: None, + delay_on_retry: None, }; let response = service.set_task_status(Request::new(request)).await; @@ -352,6 +360,8 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { application: Some("no-matches".into()), namespace: None, }), + max_attempts: None, + delay_on_retry: None, }; let 
response = service.set_task_status(Request::new(request)).await; @@ -386,6 +396,8 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte application: None, namespace: Some(namespace), }), + max_attempts: None, + delay_on_retry: None, }; let response = service.set_task_status(Request::new(request)).await; @@ -433,6 +445,8 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str) namespace: None, application: None, }), + max_attempts: None, + delay_on_retry: None, })) .await .unwrap(); @@ -476,6 +490,8 @@ async fn test_set_task_status_update_channel_closed_returns_internal() { id: "id_0".to_string(), status: 5, fetch_next_task: None, + max_attempts: None, + delay_on_retry: None, })) .await; diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 0a6d0919..8f7a73e1 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Error; +use rdkafka::Message; use rdkafka::message::OwnedMessage; use crate::config::Config; @@ -12,6 +13,8 @@ use super::deserialize_raw::{self, RawConfig}; pub struct DeserializeConfig { activation_config: DeserializeActivationConfig, raw_config: Option, + /// Retry topic always contains activations, even in raw_mode. + retry_topic: Option, } impl DeserializeConfig { @@ -19,6 +22,7 @@ impl DeserializeConfig { Self { activation_config: DeserializeActivationConfig::from_config(config), raw_config: RawConfig::from_config(config), + retry_topic: config.kafka_retry_topic.clone(), } } } @@ -26,13 +30,23 @@ impl DeserializeConfig { /// Create a unified deserializer that handles both normal and raw modes. /// In raw mode, raw Kafka bytes are wrapped into a TaskActivation. /// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos. +/// Messages from the retry topic are always deserialized as activations. 
pub fn new( config: DeserializeConfig, ) -> impl Fn(Arc) -> Result { let raw_deserializer = config.raw_config.map(deserialize_raw::new); let activation_deserializer = deserialize_activation::new(config.activation_config); + let retry_topic = config.retry_topic; move |msg: Arc| { + // Messages from the retry topic are always activations + if let Some(ref retry_topic) = retry_topic + && msg.topic() == retry_topic + { + return activation_deserializer(msg); + } + + // For main topic: use raw deserializer in raw_mode, else activation deserializer if let Some(ref raw_deserializer) = raw_deserializer { raw_deserializer(msg) } else { diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 803f09a0..6e6bd742 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -44,6 +44,13 @@ impl RawConfig { application ); + if let Some(ref retry_topic) = config.kafka_retry_topic { + assert!( + retry_topic != &config.kafka_topic, + "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" + ); + } + Some(Self { namespace: config .raw_namespace diff --git a/src/main.rs b/src/main.rs index c3648a6d..5095cd3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -86,11 +86,21 @@ async fn main() -> Result<(), Error> { if config.create_missing_topics { let kafka_client_config = config.kafka_consumer_config(); create_missing_topics( - kafka_client_config, + kafka_client_config.clone(), &config.kafka_topic, config.default_topic_partitions, ) .await?; + + // Create retry topic if configured + if let Some(ref retry_topic) = config.kafka_retry_topic { + create_missing_topics( + kafka_client_config, + retry_topic, + config.default_topic_partitions, + ) + .await?; + } } if config.full_vacuum_on_start { @@ -158,11 +168,19 @@ async fn main() -> Result<(), Error> { let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); + + // Build list of topics to consume from + let mut 
topics_to_consume = vec![consumer_config.kafka_topic.clone()]; + if let Some(ref retry_topic) = consumer_config.kafka_retry_topic { + topics_to_consume.push(retry_topic.clone()); + } + async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need // an outer select here like the other tasks. + let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect(); start_consumer( - &[&consumer_config.kafka_topic], + &topic_refs, &consumer_config.kafka_consumer_config(), consumer_store.clone(), processing_strategy!({ diff --git a/src/push/tests.rs b/src/push/tests.rs index 888b127d..e3f78a9f 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -100,6 +100,8 @@ impl InflightActivationStore for MockStore { &self, _id: &str, _status: InflightActivationStatus, + _max_attempts: Option, + _delay_on_retry: Option, ) -> anyhow::Result> { Ok(None) } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 27ce276f..9d6cf9d2 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -5,13 +5,14 @@ use std::time::Instant; use sqlx::ConnectOptions; use sqlx::pool::PoolConnection; use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; -use sqlx::{FromRow, Pool, Postgres, QueryBuilder}; +use sqlx::{FromRow, Pool, Postgres, QueryBuilder, Transaction}; use anyhow::{Error, anyhow}; use async_backtrace::framed; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use prost::Message; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -183,10 +184,23 @@ impl PostgresActivationStore { ) -> Result, Error> { let start = Instant::now(); let conn = self.write_pool.acquire().await?; - metrics::histogram!("postgres.write.acquire_conn", "fn" => caller).record(start.elapsed()); + metrics::histogram!("postgres.write.acquire_conn", 
"fn" => caller, "mode" => "conn") + .record(start.elapsed()); Ok(conn) } + #[framed] + async fn begin_write_tx_metric( + &self, + caller: &'static str, + ) -> Result, Error> { + let start = Instant::now(); + let tx = self.write_pool.begin().await?; + metrics::histogram!("postgres.write.acquire_conn", "fn" => caller, "mode" => "begin") + .record(start.elapsed()); + Ok(tx) + } + #[framed] pub async fn new(config: PostgresActivationStoreConfig) -> Result { if config.run_migrations { @@ -630,27 +644,66 @@ impl InflightActivationStore for PostgresActivationStore { }) } - /// Update the status of a specific activation + /// Update the status of a specific activation. + /// If max_attempts or delay_on_retry is provided (for Retry status), also updates the activation's retry_state. #[instrument(skip_all)] #[framed] async fn set_status( &self, id: &str, status: InflightActivationStatus, + max_attempts: Option, + delay_on_retry: Option, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; + let mut tx = self.begin_write_tx_metric("set_status").await?; + let result: Option = sqlx::query_as( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", ) .bind(status.to_string()) .bind(id) - .fetch_optional(&mut *conn) + .fetch_optional(&mut *tx) .await?; - let Some(row) = result else { + let Some(mut row) = result else { return Ok(None); }; + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + let mut needs_update = false; + let retry_state = activation.retry_state.get_or_insert_default(); + + // Only update the blob if max_attempts or delay_on_retry actually changed. This + // should rarely happen after the first retry, since both values come from the + // task's retry decorator, which stays constant across retries. + // For raw topics, retry_state starts as None so we create it on first retry. 
+ if let Some(max_attempts) = max_attempts + && retry_state.max_attempts != max_attempts + { + retry_state.max_attempts = max_attempts; + needs_update = true; + } + + if let Some(delay_on_retry) = delay_on_retry + && retry_state.delay_on_retry != Some(delay_on_retry) + { + retry_state.delay_on_retry = Some(delay_on_retry); + needs_update = true; + } + + if needs_update { + let updated_activation = activation.encode_to_vec(); + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(&updated_activation) + .bind(id) + .execute(&mut *tx) + .await?; + + row.activation = updated_activation; + } + + tx.commit().await?; + Ok(Some(row.into())) } diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index de457de4..8712a0c0 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -7,7 +7,7 @@ use sqlx::sqlite::{ SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqliteRow, SqliteSynchronous, }; -use sqlx::{ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite}; +use sqlx::{ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite, Transaction}; use anyhow::{Error, anyhow}; use async_trait::async_trait; @@ -20,7 +20,8 @@ use libsqlite3_sys::{ SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED, SQLITE_OK, sqlite3_db_status, }; -use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use prost::Message; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -184,10 +185,22 @@ impl SqliteActivationStore { ) -> Result, Error> { let start = Instant::now(); let conn = self.write_pool.acquire().await?; - metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller).record(start.elapsed()); + metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller, "mode" => "conn") + .record(start.elapsed()); Ok(conn) } + async fn begin_write_tx_metric( + &self, + caller: 
&'static str, + ) -> Result, Error> { + let start = Instant::now(); + let tx = self.write_pool.begin().await?; + metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller, "mode" => "begin") + .record(start.elapsed()); + Ok(tx) + } + async fn emit_db_status_metrics(&self) { if !self.config.enable_sqlite_status_metrics { return; @@ -683,26 +696,65 @@ impl InflightActivationStore for SqliteActivationStore { Ok(result.get::("count") as usize) } - /// Update the status of a specific activation + /// Update the status of a specific activation. + /// If max_attempts or delay_on_retry is provided (for Retry status), also updates the activation's retry_state. #[instrument(skip_all)] async fn set_status( &self, id: &str, status: InflightActivationStatus, + max_attempts: Option, + delay_on_retry: Option, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; + let mut tx = self.begin_write_tx_metric("set_status").await?; + let result: Option = sqlx::query_as( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", ) .bind(status) .bind(id) - .fetch_optional(&mut *conn) + .fetch_optional(&mut *tx) .await?; - let Some(row) = result else { + let Some(mut row) = result else { return Ok(None); }; + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + let mut needs_update = false; + let retry_state = activation.retry_state.get_or_insert_default(); + + // Only update the blob if max_attempts or delay_on_retry actually changed. This + // should rarely happen after the first retry, since both values come from the + // task's retry decorator, which stays constant across retries. + // For raw topics, retry_state starts as None so we create it on first retry. 
+ if let Some(max_attempts) = max_attempts + && retry_state.max_attempts != max_attempts + { + retry_state.max_attempts = max_attempts; + needs_update = true; + } + + if let Some(delay_on_retry) = delay_on_retry + && retry_state.delay_on_retry != Some(delay_on_retry) + { + retry_state.delay_on_retry = Some(delay_on_retry); + needs_update = true; + } + + if needs_update { + let updated_activation = activation.encode_to_vec(); + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(&updated_activation) + .bind(id) + .execute(&mut *tx) + .await?; + + row.activation = updated_activation; + } + + tx.commit().await?; + Ok(Some(row.into())) } diff --git a/src/store/tests.rs b/src/store/tests.rs index 18df83aa..b9ca408c 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -132,15 +132,15 @@ async fn test_count_depths(#[case] adapter: &str) { assert!(store.store(batch).await.is_ok()); store - .set_status("id_0", InflightActivationStatus::Processing) + .set_status("id_0", InflightActivationStatus::Processing, None, None) .await .unwrap(); store - .set_status("id_1", InflightActivationStatus::Delay) + .set_status("id_1", InflightActivationStatus::Delay, None, None) .await .unwrap(); store - .set_status("id_2", InflightActivationStatus::Complete) + .set_status("id_2", InflightActivationStatus::Complete, None, None) .await .unwrap(); @@ -739,7 +739,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None, None) .await .is_ok() ); @@ -755,7 +755,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Pending) + .set_status("id_0", InflightActivationStatus::Pending, None, None) .await .is_ok() ); @@ -769,13 +769,13 @@ async fn test_set_activation_status(#[case] adapter: &str) { .await; assert!( store - .set_status("id_0", 
InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None, None) .await .is_ok() ); assert!( store - .set_status("id_1", InflightActivationStatus::Failure) + .set_status("id_1", InflightActivationStatus::Failure, None, None) .await .is_ok() ); @@ -797,7 +797,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { ); let result = store - .set_status("not_there", InflightActivationStatus::Complete) + .set_status("not_there", InflightActivationStatus::Complete, None, None) .await; assert!(result.is_ok(), "no query error"); @@ -805,7 +805,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { assert!(activation.is_none(), "no activation found"); let result = store - .set_status("id_0", InflightActivationStatus::Complete) + .set_status("id_0", InflightActivationStatus::Complete, None, None) .await; assert!(result.is_ok(), "no query error"); @@ -837,7 +837,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None, None) .await .is_ok() ); @@ -852,7 +852,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Pending) + .set_status("id_0", InflightActivationStatus::Pending, None, None) .await .is_ok() ); @@ -866,13 +866,13 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { .await; assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None, None) .await .is_ok() ); assert!( store - .set_status("id_1", InflightActivationStatus::Failure) + .set_status("id_1", InflightActivationStatus::Failure, None, None) .await .is_ok() ); @@ -896,7 +896,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { ); let result = store - .set_status("not_there", 
InflightActivationStatus::Complete) + .set_status("not_there", InflightActivationStatus::Complete, None, None) .await; assert!(result.is_ok(), "no query error"); @@ -904,7 +904,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { assert!(activation.is_none(), "no activation found"); let result = store - .set_status("id_0", InflightActivationStatus::Complete) + .set_status("id_0", InflightActivationStatus::Complete, None, None) .await; assert!(result.is_ok(), "no query error"); @@ -985,7 +985,7 @@ async fn test_get_retry_activations(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Retry) + .set_status("id_0", InflightActivationStatus::Retry, None, None) .await .is_ok() ); @@ -1001,7 +1001,7 @@ async fn test_get_retry_activations(#[case] adapter: &str) { assert!( store - .set_status("id_1", InflightActivationStatus::Retry) + .set_status("id_1", InflightActivationStatus::Retry, None, None) .await .is_ok() ); diff --git a/src/store/traits.rs b/src/store/traits.rs index d8bdb5e0..8c584c02 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -71,11 +71,14 @@ pub trait InflightActivationStore: Send + Sync { /// Record successful push. async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>; - /// Update the status of a specific activation + /// Update the status of a specific activation. + /// If max_attempts or delay_on_retry is provided (for Retry status), also updates the activation's retry_state. async fn set_status( &self, id: &str, status: InflightActivationStatus, + max_attempts: Option, + delay_on_retry: Option, ) -> Result, Error>; /// Update the status of multiple activations in one batch. diff --git a/src/upkeep.rs b/src/upkeep.rs index 5336f063..23c8f00d 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -151,20 +151,26 @@ pub async fn do_upkeep( // 1. 
Handle retry tasks let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { + // Use retry topic if configured, otherwise fall back to main topic + let retry_target_topic = config + .kafka_retry_topic + .as_ref() + .unwrap_or(&config.kafka_topic); + // 2. Append retries to kafka let deliveries = retries .into_iter() .map(|inflight| { let producer = producer.clone(); let config = config.clone(); + let target_topic = retry_target_topic.clone(); async move { let activation = TaskActivation::decode(&inflight.activation as &[u8]).unwrap(); let serialized = create_retry_activation(&activation).encode_to_vec(); let delivery = producer .send( - FutureRecord::<(), Vec>::to(&config.kafka_topic) - .payload(&serialized), + FutureRecord::<(), Vec>::to(&target_topic).payload(&serialized), Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)), ) .await; diff --git a/uv.lock b/uv.lock index 6380aaaa..853115a2 100644 --- a/uv.lock +++ b/uv.lock @@ -599,7 +599,7 @@ wheels = [ [[package]] name = "sentry-protos" -version = "0.8.13" +version = "0.11.0" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "grpc-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -607,7 +607,7 @@ dependencies = [ { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.8.13-py3-none-any.whl", hash = "sha256:8cebc86dbb20cea0157f488e0509bb854b8ec03f840ceb09445b2c4c2ee27d4f" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.11.0-py3-none-any.whl", hash = "sha256:0b32997afc1d21241cb996602de3a06d25cd33efc437e1874c1c66fb98534918" }, ] [[package]] @@ -696,7 +696,7 @@ dev = [ { name = "pytest", specifier = ">=9.0.3" }, { name = "pyyaml", specifier = ">=6.0.2" }, { name = "sentry-devenv", specifier = ">=1.22.2" }, - { name = "sentry-protos", specifier = ">=0.2.0" }, + 
{ name = "sentry-protos", specifier = ">=0.11.0" }, { name = "types-protobuf", specifier = ">=5.27.0.20240626,<6.0.0" }, { name = "types-pyyaml", specifier = ">=6.0.12.20241230" }, ] @@ -753,7 +753,7 @@ requires-dist = [ { name = "redis", specifier = ">=3.4.1" }, { name = "redis-py-cluster", specifier = ">=2.1.0" }, { name = "sentry-arroyo", specifier = ">=2.38.7" }, - { name = "sentry-protos", specifier = ">=0.8.13" }, + { name = "sentry-protos", specifier = ">=0.10.0" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.43.0" }, { name = "setuptools", marker = "extra == 'examples'", specifier = ">=80.0" }, { name = "zstandard", specifier = ">=0.18.0" },