diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index d32107f0..1a6232b2 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -147,6 +147,10 @@ impl ActivationStore for MockStore { unimplemented!() } + async fn delete_activation_batch(&self, _ids: &[String]) -> Result { + unimplemented!() + } + async fn get_retry_activations(&self) -> Result, Error> { unimplemented!() } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 715041f4..ef39c507 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use anyhow::Result; +use anyhow::{Error, Result}; use chrono::Utc; use prost::Message; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; @@ -256,6 +256,27 @@ impl ConsumerService for TaskbrokerServer { } } + // Completed tasks are immediately deleted from the store, instead of being updated. All the relevant upkeep metrics are also updated. + let completed_ids = batches + .remove(&ActivationStatus::Complete) + .unwrap_or_default(); + if !completed_ids.is_empty() { + let requested = completed_ids.len() as u64; + let status = ActivationStatus::Complete; + metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); + let result = self.store.delete_activation_batch(&completed_ids).await; + let (affected, err_msg) = handle_batch_update(result, requested, status); + if !err_msg.is_empty() { + error!("Failed to delete completed activations: {:?}", err_msg); + metrics::histogram!("grpc_server.set_batch_activation_status.duration") + .record(start_time.elapsed()); + return Err(Status::internal("Failed to set batch activation status")); + } else { + metrics::counter!("upkeep.task.state_transition", "state" => "completed") + .increment(affected); + } + } + // For each status and batch of IDs, update the status for those IDs. In the case of a failure, return an error to the worker. // Track and log a warning if the number of rows updated is less than the number of IDs requested. metrics::histogram!("grpc_server.set_batch_activation_status.num_batches") @@ -263,31 +284,13 @@ impl ConsumerService for TaskbrokerServer { for (status, ids) in batches { let requested = ids.len() as u64; metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); - match self.store.set_status_batch(&ids, status).await { - Ok(affected) => { - metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string()) - .record((requested - affected) as f64); - if affected < requested { - metrics::histogram!( - "grpc_server.set_batch_activation_status.partial", - "status" => status.to_string() - ) - .record((requested - affected) as f64); - warn!( - ?status, - requested, affected, "Updated fewer rows than IDs requested in batch" - ); - } - metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected); - metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected); - } - Err(e) => { - metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested); - metrics::histogram!("grpc_server.set_batch_activation_status.duration") - .record(start_time.elapsed()); - error!("Failed to set batch activation status: {:?}", e); - return Err(Status::internal("Failed to set batch activation status")); - } + let result = self.store.set_status_batch(&ids, status).await; + let (_, err_msg) = handle_batch_update(result, requested, status); + if !err_msg.is_empty() { + error!("Failed to set batch activation status: {:?}", err_msg); + metrics::histogram!("grpc_server.set_batch_activation_status.duration") + .record(start_time.elapsed()); + return Err(Status::internal("Failed to set batch activation status")); } } @@ -401,3 +404,34 @@ pub async fn flush_updates(store: Arc, buffer: &mut Vec, + requested: u64, + status: ActivationStatus, +) -> (u64, String) { + match result { + Ok(affected) => { + metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string()) + .record((requested - affected) as f64); + if affected < requested { + metrics::histogram!( + "grpc_server.set_batch_activation_status.partial", + "status" => status.to_string() + ) + .record((requested - affected) as f64); + warn!( + ?status, + requested, affected, "Updated fewer rows than IDs requested in batch" + ); + } + metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected); + metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected); + (affected, String::new()) // No error message + } + Err(e) => { + metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested); + (requested, e.to_string()) + } + } +} diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index d503b36c..720c6a7a 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -470,8 +470,12 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str) } #[tokio::test] -async fn test_set_task_status_update_channel_closed_returns_internal() { - let store = create_test_store("sqlite").await; +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +#[allow(deprecated)] +async fn test_set_task_status_update_channel_closed_returns_internal(#[case] adapter: &str) { + let store = create_test_store(adapter).await; let config = create_config(); let (update_tx, update_rx) = mpsc::channel::(8); @@ -553,10 +557,10 @@ async fn test_set_batch_activation_status_success(#[case] adapter: &str) { .await; assert!(response.is_ok()); - let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists"); - assert_eq!(row0.status, ActivationStatus::Complete); - let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists"); - assert_eq!(row1.status, ActivationStatus::Complete); + let row0 = store.get_by_id("id_0").await.unwrap(); + assert!(row0.is_none(), "row should be deleted"); + let row1 = store.get_by_id("id_1").await.unwrap(); + assert!(row1.is_none(), "row should be deleted"); let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists"); assert_eq!(row2.status, ActivationStatus::Failure); } @@ -584,9 +588,12 @@ async fn test_set_batch_activation_status_empty(#[case] adapter: &str) { } #[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] #[allow(deprecated)] -async fn test_set_batch_activation_status_invalid_status() { - let store = create_test_store("sqlite").await; +async fn test_set_batch_activation_status_invalid_status(#[case] adapter: &str) { + let store = create_test_store(adapter).await; let config = create_config(); let service = TaskbrokerServer { @@ -704,10 +711,10 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) { .await; assert!(response.is_ok()); - let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists"); - assert_eq!(row0.status, ActivationStatus::Complete); - let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists"); - assert_eq!(row1.status, ActivationStatus::Complete); + let row0 = store.get_by_id("id_0").await.unwrap(); + assert!(row0.is_none(), "row should be deleted"); + let row1 = store.get_by_id("id_1").await.unwrap(); + assert!(row1.is_none(), "row should be deleted"); let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists"); assert_eq!(row2.status, ActivationStatus::Retry); } diff --git a/src/push/tests.rs b/src/push/tests.rs index ce1013b6..df821065 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -107,6 +107,10 @@ impl ActivationStore for MockStore { Ok(()) } + async fn delete_activation_batch(&self, _ids: &[String]) -> Result { + Ok(0) + } + async fn vacuum_db(&self) -> Result<()> { Ok(()) } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index cd3c4d4a..cb8c1c31 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -943,6 +943,22 @@ impl ActivationStore for PostgresStore { .await } + #[instrument(skip_all)] + #[framed] + async fn delete_activation_batch(&self, ids: &[String]) -> Result { + retry_query(&self.config.retry, "delete_activation_batch", || async { + let mut conn = self + .acquire_write_conn_metric("delete_activation_batch") + .await?; + let result = sqlx::query("DELETE FROM inflight_taskactivations WHERE id = ANY($1)") + .bind(ids) + .execute(&mut *conn) + .await?; + Ok(result.rows_affected()) + }) + .await + } + #[instrument(skip_all)] #[framed] async fn get_retry_activations(&self) -> Result, Error> { diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index cdd051aa..d7bebbc9 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -896,6 +896,27 @@ impl ActivationStore for SqliteStore { Ok(()) } + #[instrument(skip_all)] + async fn delete_activation_batch(&self, ids: &[String]) -> Result { + let mut conn = self + .acquire_write_conn_metric("delete_activation_batch") + .await?; + + let mut query_builder = QueryBuilder::new("DELETE FROM inflight_taskactivations "); + query_builder.push(" WHERE id IN ("); + + let mut separated = query_builder.separated(", "); + + for id in ids.iter() { + separated.push_bind(id); + } + + separated.push_unseparated(")"); + + let result = query_builder.build().execute(&mut *conn).await?; + Ok(result.rows_affected()) + } + #[instrument(skip_all)] async fn get_retry_activations(&self) -> Result, Error> { Ok(sqlx::query_as( diff --git a/src/store/traits.rs b/src/store/traits.rs index db5b1825..e8f06e01 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -148,6 +148,9 @@ pub trait ActivationStore: Send + Sync { /// Delete an activation by id async fn delete_activation(&self, id: &str) -> Result<(), Error>; + /// Delete a batch of activations by id + async fn delete_activation_batch(&self, ids: &[String]) -> Result; + /// DATABASE OPERATIONS /// Trigger incremental vacuum to reclaim free pages in the database async fn vacuum_db(&self) -> Result<(), Error>;