Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ All notable changes to this project are documented in this file.

## [Unreleased]

- **chore**: improve `FromRequest` api adding `Option<T>` ([#745](https://github.com/geofmureithi/apalis/pull/745))
- **chore (api)!**: metadata as a key-value store ([#747](https://github.com/geofmureithi/apalis/pull/747))
- **chore**: improve `FromRequest` api adding `Option<T>` ([#746](https://github.com/geofmureithi/apalis/pull/746))
- **chore**: bump to v1.0.0 rc.9 ([#744](https://github.com/geofmureithi/apalis/pull/744))
- **feat (api)!**: remove queue input in expose endpoints ([#741](https://github.com/geofmureithi/apalis/pull/741))
- **feat**: idempotency for sql tasks ([#736](https://github.com/geofmureithi/apalis/pull/736))
- **chore**: bump to v1.0.0 rc.8 ([#734](https://github.com/geofmureithi/apalis/pull/734))
- **feat**: idempotency for tasks ([#726](https://github.com/apalis-dev/apalis/pull/726))
- **fix(tracing)**: improve OpenTelemetry context propagation across worker tracing layers ([#716](https://github.com/apalis-dev/apalis/pull/716))
- **deps(deps)**: bump sentry-* from 0.46.2 to 0.47.0 ([#715](https://github.com/apalis-dev/apalis/pull/715))
- **deps(deps)**: bump sentry-\* from 0.46.2 to 0.47.0 ([#715](https://github.com/apalis-dev/apalis/pull/715))
- **chore**: bump to v1.0.0 rc.7 ([#714](https://github.com/apalis-dev/apalis/pull/714))
- **chore**: bump to v1.0.0 rc.6 ([#705](https://github.com/apalis-dev/apalis/pull/705))
- **fix (workflow)**: remove Display constraints in workflow service ([#704](https://github.com/apalis-dev/apalis/pull/704))
Expand Down
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 28 additions & 7 deletions apalis-core/src/backend/impls/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
use crate::backend::BackendExt;
use crate::backend::codec::IdentityCodec;
use crate::features_table;
use crate::task::extensions::Extensions;
use crate::task::metadata::{MetadataExt, MetadataStore};
use crate::{
backend::{Backend, TaskStream},
task::{
Expand Down Expand Up @@ -109,25 +109,44 @@ pub type BoxedReceiver<Args, Ctx> = Pin<Box<dyn Stream<Item = Task<Args, Ctx, Ra
ListWorkers => not_supported("List all workers registered with the backend"),
ListTasks => not_supported("List all tasks in the backend"),
}]
pub struct MemoryStorage<Args, Ctx = Extensions> {
pub struct MemoryStorage<Args, Ctx = MemoryContext> {
pub(super) sender: MemorySink<Args, Ctx>,
pub(super) receiver: BoxedReceiver<Args, Ctx>,
}

impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
impl<Args: Send + 'static> Default for MemoryStorage<Args, MemoryContext> {
fn default() -> Self {
Self::new()
}
}

impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
/// Store extra context related to task metadata
#[derive(Debug, Clone, Default)]
pub struct MemoryContext {
metadata: MetadataStore,
}

impl MetadataExt for MemoryContext {
fn metadata(&self) -> &MetadataStore {
&self.metadata
}

fn metadata_mut(&mut self) -> &mut MetadataStore {
&mut self.metadata
}
}

impl<Args: Send + 'static> MemoryStorage<Args, MemoryContext> {
/// Create a new in-memory storage
#[must_use]
pub fn new() -> Self {
let (sender, receiver) = unbounded();
let sender = Box::new(sender)
as Box<
dyn Sink<Task<Args, Extensions, RandomId>, Error = SendError> + Send + Sync + Unpin,
dyn Sink<Task<Args, MemoryContext, RandomId>, Error = SendError>
+ Send
+ Sync
+ Unpin,
>;
Self {
sender: MemorySink {
Expand Down Expand Up @@ -170,7 +189,7 @@ impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
}
}

type ArcMemorySink<Args, Ctx = Extensions> = Arc<
type ArcMemorySink<Args, Ctx = MemoryContext> = Arc<
Mutex<
Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
>,
Expand All @@ -179,7 +198,7 @@ type ArcMemorySink<Args, Ctx = Extensions> = Arc<
type ArcIdempotencySet = Arc<Mutex<HashSet<String>>>;

/// Memory sink for sending tasks to the in-memory backend
pub struct MemorySink<Args, Ctx = Extensions> {
pub struct MemorySink<Args, Ctx = MemoryContext> {
pub(super) inner: ArcMemorySink<Args, Ctx>,
pub(super) idempotency_keys: ArcIdempotencySet,
}
Expand Down Expand Up @@ -226,6 +245,8 @@ impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
) -> Result<(), Self::Error> {
let this = self.get_mut();

let _ = item.parts.data.get_or_insert(MetadataStore::default());

// Ensure task id exists
item.parts
.task_id
Expand Down
21 changes: 14 additions & 7 deletions apalis-core/src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@
//! ```
//!
use crate::task::{
Parts, Task, attempt::Attempt, extensions::Extensions, metadata::MetadataExt, status::Status,
Parts, Task,
attempt::Attempt,
extensions::Extensions,
metadata::{Metadata, MetadataExt},
status::Status,
task_id::TaskId,
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{
fmt::Debug,
time::{Duration, SystemTime, UNIX_EPOCH},
};

/// Builder for creating [`Task`] instances with optional configuration
#[derive(Debug)]
Expand Down Expand Up @@ -87,13 +94,13 @@ impl<Args, Ctx, IdType> TaskBuilder<Args, Ctx, IdType> {

/// Insert a value into the task's ctx context
#[must_use]
pub fn meta<M>(mut self, value: M) -> Self
pub fn meta<M>(mut self, value: &M) -> Self
where
Ctx: MetadataExt<M>,
Ctx: MetadataExt,
M: Metadata,
M::Error: Debug,
{
self.ctx
.inject(value)
.unwrap_or_else(|_| panic!("Failed to inject item into context"));
self.ctx.inject(value).expect("Could not add Metadata");
self
}

Expand Down
59 changes: 47 additions & 12 deletions apalis-core/src/task/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::fmt;
use std::hash::{BuildHasherDefault, Hasher};

use crate::task::data::MissingDataError;
use crate::task::metadata::MetadataExt;

type AnyMap = HashMap<TypeId, Box<dyn AnyClone + Send + Sync>, BuildHasherDefault<IdHasher>>;

Expand Down Expand Up @@ -256,6 +255,53 @@ impl Extensions {
}
}
}

/// Get a mutable reference to the value of type `T`,
/// inserting `value` if it does not already exist.
///
/// # Example
///
/// ```
/// # use apalis_core::task::extensions::Extensions;
/// let mut ext = Extensions::new();
///
/// let value = ext.get_or_insert(String::from("Hello"));
/// value.push_str(" World");
///
/// assert_eq!(ext.get::<String>().unwrap(), "Hello World");
/// ```
pub fn get_or_insert<T>(&mut self, value: T) -> &mut T
where
T: Clone + Send + Sync + 'static,
{
self.get_or_insert_with(|| value)
}

/// Get a mutable reference to the value of type `T`,
/// inserting the result of `f` if it does not already exist.
///
/// # Example
///
/// ```
/// # use apalis_core::task::extensions::Extensions;
/// let mut ext = Extensions::new();
///
/// let value = ext.get_or_insert_with(|| String::from("Hello"));
/// value.push_str(" World");
///
/// assert_eq!(ext.get::<String>().unwrap(), "Hello World");
/// ```
pub fn get_or_insert_with<T, F>(&mut self, f: F) -> &mut T
where
T: Clone + Send + Sync + 'static,
F: FnOnce() -> T,
{
if self.get::<T>().is_none() {
self.insert(f());
}

self.get_mut::<T>().expect("value was just inserted")
}
}

impl fmt::Debug for Extensions {
Expand Down Expand Up @@ -295,17 +341,6 @@ impl Clone for Box<dyn AnyClone + Send + Sync> {
}
}

impl<T: Clone + Send + Sync + 'static> MetadataExt<T> for Extensions {
type Error = MissingDataError;
fn inject(&mut self, value: T) -> Result<(), Self::Error> {
self.insert(value);
Ok(())
}
fn extract(&self) -> Result<T, Self::Error> {
Ok(self.get_checked::<T>()?.clone())
}
}

#[test]
fn test_extensions() {
#[derive(Clone, Debug, PartialEq)]
Expand Down
Loading
Loading