Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl ActivationStore for MockStore {
unimplemented!()
}

/// Stub implementation for the mock store: panics via `unimplemented!()` if it is ever invoked.
async fn delete_activation_batch(&self, _ids: &[String]) -> Result<u64, Error> {
unimplemented!()
}

async fn get_retry_activations(&self) -> Result<Vec<Activation>, Error> {
unimplemented!()
}
Expand Down
86 changes: 60 additions & 26 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use anyhow::{Error, Result};
use chrono::Utc;
use prost::Message;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
Expand Down Expand Up @@ -256,38 +256,41 @@ impl ConsumerService for TaskbrokerServer {
}
}

// Completed tasks are immediately deleted from the store, instead of being updated. All the relevant upkeep metrics are also updated.
let completed_ids = batches
.remove(&ActivationStatus::Complete)
.unwrap_or_default();
if !completed_ids.is_empty() {
let requested = completed_ids.len() as u64;
let status = ActivationStatus::Complete;
metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64);
let result = self.store.delete_activation_batch(&completed_ids).await;
let (affected, err_msg) = handle_batch_update(result, requested, status);
if !err_msg.is_empty() {
error!("Failed to delete completed activations: {:?}", err_msg);
metrics::histogram!("grpc_server.set_batch_activation_status.duration")
.record(start_time.elapsed());
return Err(Status::internal("Failed to set batch activation status"));
} else {
metrics::counter!("upkeep.task.state_transition", "state" => "completed")
.increment(affected);
}
}

// For each status and batch of IDs, update the status for those IDs. In the case of a failure, return an error to the worker.
// Track and log a warning if the number of rows updated is less than the number of IDs requested.
metrics::histogram!("grpc_server.set_batch_activation_status.num_batches")
.record(batches.len() as f64);
for (status, ids) in batches {
let requested = ids.len() as u64;
metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64);
match self.store.set_status_batch(&ids, status).await {
Ok(affected) => {
metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string())
.record((requested - affected) as f64);
if affected < requested {
metrics::histogram!(
"grpc_server.set_batch_activation_status.partial",
"status" => status.to_string()
)
.record((requested - affected) as f64);
warn!(
?status,
requested, affected, "Updated fewer rows than IDs requested in batch"
);
}
metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected);
metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected);
}
Err(e) => {
metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested);
metrics::histogram!("grpc_server.set_batch_activation_status.duration")
.record(start_time.elapsed());
error!("Failed to set batch activation status: {:?}", e);
return Err(Status::internal("Failed to set batch activation status"));
}
let result = self.store.set_status_batch(&ids, status).await;
let (_, err_msg) = handle_batch_update(result, requested, status);
if !err_msg.is_empty() {
error!("Failed to set batch activation status: {:?}", err_msg);
metrics::histogram!("grpc_server.set_batch_activation_status.duration")
.record(start_time.elapsed());
return Err(Status::internal("Failed to set batch activation status"));
}
}

Expand Down Expand Up @@ -401,3 +404,34 @@ pub async fn flush_updates(store: Arc<dyn ActivationStore>, buffer: &mut Vec<Sta
}
}
}

fn handle_batch_update(
result: Result<u64, Error>,
requested: u64,
status: ActivationStatus,
) -> (u64, String) {
match result {
Ok(affected) => {
metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string())
.record((requested - affected) as f64);
Comment thread
evanh marked this conversation as resolved.
if affected < requested {
metrics::histogram!(
"grpc_server.set_batch_activation_status.partial",
"status" => status.to_string()
)
.record((requested - affected) as f64);
warn!(
?status,
requested, affected, "Updated fewer rows than IDs requested in batch"
);
}
metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(affected);
metrics::counter!("grpc_server.set_status", "result" => "skipped_in_batch", "status" => status.to_string()).increment(requested - affected);
(affected, String::new()) // No error message
}
Err(e) => {
metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested);
(requested, e.to_string())
}
}
}
31 changes: 19 additions & 12 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,12 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str)
}

#[tokio::test]
async fn test_set_task_status_update_channel_closed_returns_internal() {
let store = create_test_store("sqlite").await;
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status_update_channel_closed_returns_internal(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let config = create_config();

let (update_tx, update_rx) = mpsc::channel::<StatusUpdate>(8);
Expand Down Expand Up @@ -553,10 +557,10 @@ async fn test_set_batch_activation_status_success(#[case] adapter: &str) {
.await;
assert!(response.is_ok());

let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists");
assert_eq!(row0.status, ActivationStatus::Complete);
let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists");
assert_eq!(row1.status, ActivationStatus::Complete);
let row0 = store.get_by_id("id_0").await.unwrap();
assert!(row0.is_none(), "row should be deleted");
let row1 = store.get_by_id("id_1").await.unwrap();
assert!(row1.is_none(), "row should be deleted");
let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists");
assert_eq!(row2.status, ActivationStatus::Failure);
}
Expand Down Expand Up @@ -584,9 +588,12 @@ async fn test_set_batch_activation_status_empty(#[case] adapter: &str) {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_batch_activation_status_invalid_status() {
let store = create_test_store("sqlite").await;
async fn test_set_batch_activation_status_invalid_status(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let config = create_config();

let service = TaskbrokerServer {
Expand Down Expand Up @@ -704,10 +711,10 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) {
.await;
assert!(response.is_ok());

let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists");
assert_eq!(row0.status, ActivationStatus::Complete);
let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists");
assert_eq!(row1.status, ActivationStatus::Complete);
let row0 = store.get_by_id("id_0").await.unwrap();
assert!(row0.is_none(), "row should be deleted");
let row1 = store.get_by_id("id_1").await.unwrap();
assert!(row1.is_none(), "row should be deleted");
let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists");
assert_eq!(row2.status, ActivationStatus::Retry);
}
4 changes: 4 additions & 0 deletions src/push/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl ActivationStore for MockStore {
Ok(())
}

/// Stub implementation for the mock store: ignores the ids and always reports zero deleted rows.
async fn delete_activation_batch(&self, _ids: &[String]) -> Result<u64> {
Ok(0)
}

async fn vacuum_db(&self) -> Result<()> {
Ok(())
}
Expand Down
16 changes: 16 additions & 0 deletions src/store/adapters/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,22 @@ impl ActivationStore for PostgresStore {
.await
}

/// Deletes every activation whose id is contained in `ids` with a single
/// `DELETE ... WHERE id = ANY($1)` statement, run through `retry_query`.
///
/// Returns the number of rows the statement reports as affected.
#[instrument(skip_all)]
#[framed]
async fn delete_activation_batch(&self, ids: &[String]) -> Result<u64, Error> {
    retry_query(&self.config.retry, "delete_activation_batch", || async {
        let mut write_conn = self
            .acquire_write_conn_metric("delete_activation_batch")
            .await?;
        // The whole id slice is bound as one array parameter.
        let deleted = sqlx::query("DELETE FROM inflight_taskactivations WHERE id = ANY($1)")
            .bind(ids)
            .execute(&mut *write_conn)
            .await?
            .rows_affected();
        Ok(deleted)
    })
    .await
}

#[instrument(skip_all)]
#[framed]
async fn get_retry_activations(&self) -> Result<Vec<Activation>, Error> {
Expand Down
21 changes: 21 additions & 0 deletions src/store/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,27 @@ impl ActivationStore for SqliteStore {
Ok(())
}

/// Deletes every activation whose id appears in `ids` and returns how many rows were removed.
///
/// An empty `ids` slice short-circuits to `Ok(0)`, so no write connection is acquired
/// and no statement is issued for a no-op.
///
/// # Errors
/// Propagates failures from acquiring the write connection or executing the statement.
// NOTE(review): every id is bound as its own parameter, and SQLite caps the number of
// parameters per statement — confirm callers keep batches below that limit.
#[instrument(skip_all)]
async fn delete_activation_batch(&self, ids: &[String]) -> Result<u64, Error> {
    if ids.is_empty() {
        return Ok(0);
    }

    let mut conn = self
        .acquire_write_conn_metric("delete_activation_batch")
        .await?;

    let mut query_builder =
        QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE id IN (");

    // One bound placeholder per id, comma separated.
    let mut separated = query_builder.separated(", ");
    for id in ids {
        separated.push_bind(id);
    }
    separated.push_unseparated(")");

    let result = query_builder.build().execute(&mut *conn).await?;
    Ok(result.rows_affected())
}

#[instrument(skip_all)]
async fn get_retry_activations(&self) -> Result<Vec<Activation>, Error> {
Ok(sqlx::query_as(
Expand Down
3 changes: 3 additions & 0 deletions src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ pub trait ActivationStore: Send + Sync {
/// Delete an activation by id
async fn delete_activation(&self, id: &str) -> Result<(), Error>;

/// Delete a batch of activations by id.
/// On success the returned `u64` is the number of rows the store reports as deleted.
async fn delete_activation_batch(&self, ids: &[String]) -> Result<u64, Error>;

/// DATABASE OPERATIONS
/// Trigger incremental vacuum to reclaim free pages in the database
async fn vacuum_db(&self) -> Result<(), Error>;
Expand Down
Loading