From cf221e3e251f2169a4d93c017cb84b816497c64c Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 19 Jun 2026 09:57:46 -0400 Subject: [PATCH 1/2] feat: Immediately delete completed tasks Previously activations that completed successfully would have their status updated, and then the upkeep thread would remove all the completed activations in batches. Now that the status update is happening in batches, the completed activations can also be immediately deleted instead of waiting for the upkeep thread. --- src/fetch/tests.rs | 4 ++ src/grpc/server.rs | 83 +++++++++++++++++++++++----------- src/grpc/server_tests.rs | 31 ++++++++----- src/push/tests.rs | 4 ++ src/store/adapters/postgres.rs | 16 +++++++ src/store/adapters/sqlite.rs | 21 +++++++++ src/store/traits.rs | 3 ++ 7 files changed, 124 insertions(+), 38 deletions(-) diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index d32107f0..1a6232b2 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -147,6 +147,10 @@ impl ActivationStore for MockStore { unimplemented!() } + async fn delete_activation_batch(&self, _ids: &[String]) -> Result { + unimplemented!() + } + async fn get_retry_activations(&self) -> Result, Error> { unimplemented!() } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 715041f4..cc4826dd 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use anyhow::Result; +use anyhow::{Error, Result}; use chrono::Utc; use prost::Message; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; @@ -256,6 +256,22 @@ impl ConsumerService for TaskbrokerServer { } } + // Completed tasks are immediately deleted from the store, instead of being updated. All the relevant upkeep metrics are also updated. + let completed_ids = batches + .remove(&ActivationStatus::Complete) + .unwrap_or_default(); + if !completed_ids.is_empty() { + let requested = completed_ids.len() as u64; + let status = ActivationStatus::Complete; + metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); + let result = self.store.delete_activation_batch(&completed_ids).await; + let err_msg = handle_batch_update(result, requested, status, start_time); + if !err_msg.is_empty() { + error!("Failed to delete completed activations: {:?}", err_msg); + return Err(Status::internal("Failed to set batch activation status")); + } + } + // For each status and batch of IDs, update the status for those IDs. In the case of a failure, return an error to the worker. // Track and log a warning if the number of rows updated is less than the number of IDs requested. metrics::histogram!("grpc_server.set_batch_activation_status.num_batches") @@ -263,31 +279,11 @@ impl ConsumerService for TaskbrokerServer { for (status, ids) in batches { let requested = ids.len() as u64; metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); - match self.store.set_status_batch(&ids, status).await { - Ok(affected) => { - metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string()) - .record((requested - affected) as f64); - if affected < requested { - metrics::histogram!( - "grpc_server.set_batch_activation_status.partial", - "status" => status.to_string() - ) - .record((requested - affected) as f64); - warn!( - ?status, - requested, affected, "Updated fewer rows than IDs requested in batch" - ); - } - metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected); - metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected); - } - Err(e) => { - metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested); - metrics::histogram!("grpc_server.set_batch_activation_status.duration") - .record(start_time.elapsed()); - error!("Failed to set batch activation status: {:?}", e); - return Err(Status::internal("Failed to set batch activation status")); - } + let result = self.store.set_status_batch(&ids, status).await; + let err_msg = handle_batch_update(result, requested, status, start_time); + if !err_msg.is_empty() { + error!("Failed to set batch activation status: {:?}", err_msg); + return Err(Status::internal("Failed to set batch activation status")); } } @@ -401,3 +397,38 @@ pub async fn flush_updates(store: Arc, buffer: &mut Vec, + requested: u64, + status: ActivationStatus, + start_time: Instant, +) -> String { + match result { + Ok(affected) => { + metrics::histogram!("upkeep.remove_completed").record(affected as f64); + metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string()) + .record((requested - affected) as f64); + if affected < requested { + metrics::histogram!( + "grpc_server.set_batch_activation_status.partial", + "status" => status.to_string() + ) + .record((requested - affected) as f64); + warn!( + ?status, + requested, affected, "Updated fewer rows than IDs requested in batch" + ); + } + metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected); + metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected); + String::new() // No error message + } + Err(e) => { + metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested); + metrics::histogram!("grpc_server.set_batch_activation_status.duration") + .record(start_time.elapsed()); + e.to_string() + } + } +} diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index d503b36c..720c6a7a 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -470,8 +470,12 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str) } #[tokio::test] -async fn test_set_task_status_update_channel_closed_returns_internal() { - let store = create_test_store("sqlite").await; +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +#[allow(deprecated)] +async fn test_set_task_status_update_channel_closed_returns_internal(#[case] adapter: &str) { + let store = create_test_store(adapter).await; let config = create_config(); let (update_tx, update_rx) = mpsc::channel::(8); @@ -553,10 +557,10 @@ async fn test_set_batch_activation_status_success(#[case] adapter: &str) { .await; assert!(response.is_ok()); - let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists"); - assert_eq!(row0.status, ActivationStatus::Complete); - let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists"); - assert_eq!(row1.status, ActivationStatus::Complete); + let row0 = store.get_by_id("id_0").await.unwrap(); + assert!(row0.is_none(), "row should be deleted"); + let row1 = store.get_by_id("id_1").await.unwrap(); + assert!(row1.is_none(), "row should be deleted"); let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists"); assert_eq!(row2.status, ActivationStatus::Failure); } @@ -584,9 +588,12 @@ async fn test_set_batch_activation_status_empty(#[case] adapter: &str) { } #[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] #[allow(deprecated)] -async fn test_set_batch_activation_status_invalid_status() { - let store = create_test_store("sqlite").await; +async fn test_set_batch_activation_status_invalid_status(#[case] adapter: &str) { + let store = create_test_store(adapter).await; let config = create_config(); let service = TaskbrokerServer { @@ -704,10 +711,10 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) { .await; assert!(response.is_ok()); - let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists"); - assert_eq!(row0.status, ActivationStatus::Complete); - let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists"); - assert_eq!(row1.status, ActivationStatus::Complete); + let row0 = store.get_by_id("id_0").await.unwrap(); + assert!(row0.is_none(), "row should be deleted"); + let row1 = store.get_by_id("id_1").await.unwrap(); + assert!(row1.is_none(), "row should be deleted"); let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists"); assert_eq!(row2.status, ActivationStatus::Retry); } diff --git a/src/push/tests.rs b/src/push/tests.rs index ce1013b6..df821065 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -107,6 +107,10 @@ impl ActivationStore for MockStore { Ok(()) } + async fn delete_activation_batch(&self, _ids: &[String]) -> Result { + Ok(0) + } + async fn vacuum_db(&self) -> Result<()> { Ok(()) } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index cd3c4d4a..cb8c1c31 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -943,6 +943,22 @@ impl ActivationStore for PostgresStore { .await } + #[instrument(skip_all)] + #[framed] + async fn delete_activation_batch(&self, ids: &[String]) -> Result { + retry_query(&self.config.retry, "delete_activation_batch", || async { + let mut conn = self + .acquire_write_conn_metric("delete_activation_batch") + .await?; + let result = sqlx::query("DELETE FROM inflight_taskactivations WHERE id = ANY($1)") + .bind(ids) + .execute(&mut *conn) + .await?; + Ok(result.rows_affected()) + }) + .await + } + #[instrument(skip_all)] #[framed] async fn get_retry_activations(&self) -> Result, Error> { diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index cdd051aa..d7bebbc9 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -896,6 +896,27 @@ impl ActivationStore for SqliteStore { Ok(()) } + #[instrument(skip_all)] + async fn delete_activation_batch(&self, ids: &[String]) -> Result { + let mut conn = self + .acquire_write_conn_metric("delete_activation_batch") + .await?; + + let mut query_builder = QueryBuilder::new("DELETE FROM inflight_taskactivations "); + query_builder.push(" WHERE id IN ("); + + let mut separated = query_builder.separated(", "); + + for id in ids.iter() { + separated.push_bind(id); + } + + separated.push_unseparated(")"); + + let result = query_builder.build().execute(&mut *conn).await?; + Ok(result.rows_affected()) + } + #[instrument(skip_all)] async fn get_retry_activations(&self) -> Result, Error> { Ok(sqlx::query_as( diff --git a/src/store/traits.rs b/src/store/traits.rs index db5b1825..e8f06e01 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -148,6 +148,9 @@ pub trait ActivationStore: Send + Sync { /// Delete an activation by id async fn delete_activation(&self, id: &str) -> Result<(), Error>; + /// Delete a batch of activations by id + async fn delete_activation_batch(&self, ids: &[String]) -> Result; + /// DATABASE OPERATIONS /// Trigger incremental vacuum to reclaim free pages in the database async fn vacuum_db(&self) -> Result<(), Error>; From 65c579e81effe83901c431d225e855cb714fa197 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 19 Jun 2026 10:08:20 -0400 Subject: [PATCH 2/2] fix metric --- src/grpc/server.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index cc4826dd..ef39c507 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -265,10 +265,15 @@ impl ConsumerService for TaskbrokerServer { let status = ActivationStatus::Complete; metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); let result = self.store.delete_activation_batch(&completed_ids).await; - let err_msg = handle_batch_update(result, requested, status, start_time); + let (affected, err_msg) = handle_batch_update(result, requested, status); if !err_msg.is_empty() { error!("Failed to delete completed activations: {:?}", err_msg); + metrics::histogram!("grpc_server.set_batch_activation_status.duration") + .record(start_time.elapsed()); return Err(Status::internal("Failed to set batch activation status")); + } else { + metrics::counter!("upkeep.task.state_transition", "state" => "completed") + .increment(affected); } } @@ -280,9 +285,11 @@ impl ConsumerService for TaskbrokerServer { let requested = ids.len() as u64; metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); let result = self.store.set_status_batch(&ids, status).await; - let err_msg = handle_batch_update(result, requested, status, start_time); + let (_, err_msg) = handle_batch_update(result, requested, status); if !err_msg.is_empty() { error!("Failed to set batch activation status: {:?}", err_msg); + metrics::histogram!("grpc_server.set_batch_activation_status.duration") + .record(start_time.elapsed()); return Err(Status::internal("Failed to set batch activation status")); } } @@ -402,11 +409,9 @@ fn handle_batch_update( result: Result, requested: u64, status: ActivationStatus, - start_time: Instant, -) -> String { +) -> (u64, String) { match result { Ok(affected) => { - metrics::histogram!("upkeep.remove_completed").record(affected as f64); metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string()) .record((requested - affected) as f64); if affected < requested { @@ -422,13 +427,11 @@ fn handle_batch_update( } metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected); metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected); - String::new() // No error message + (affected, String::new()) // No error message } Err(e) => { metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested); - metrics::histogram!("grpc_server.set_batch_activation_status.duration") - .record(start_time.elapsed()); - e.to_string() + (requested, e.to_string()) } } }