Skip to content
12 changes: 6 additions & 6 deletions bench/worker_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func BenchmarkWorkerRunJob(b *testing.B) {
b.StopTimer()

for n := 0; n < b.N; n++ {
require.NoError(b, redistest.Reset(client))
require.NoError(b, redistest.Reset(client, "{ns-bench}"))

wp := work.NewWorkerPoolWithOptions(
struct{}{}, 1, "{ns1}", pool,
struct{}{}, 1, "{ns-bench}", pool,
work.WorkerPoolOptions{
SleepBackoffs: []int64{1000},
},
Expand All @@ -49,7 +49,7 @@ func BenchmarkWorkerRunJob(b *testing.B) {
return nil
})

enqueuer := work.NewEnqueuer("{ns1}", pool)
enqueuer := work.NewEnqueuer("{ns-bench}", pool)
for i := 0; i < k; i++ {
_, err := enqueuer.Enqueue("test", nil)
require.NoError(b, err)
Expand All @@ -68,11 +68,11 @@ func BenchmarkWorkerRunJob(b *testing.B) {
b.StopTimer()

for n := 0; n < b.N; n++ {
require.NoError(b, redistest.Reset(client))
require.NoError(b, redistest.Reset(client, "{ns-bench}"))

queue := work2.NewRedisQueue(client)
w := work2.NewWorker(&work2.WorkerOptions{
Namespace: "{ns1}",
Namespace: "{ns-bench}",
Queue: queue,
})
var wg sync.WaitGroup
Expand All @@ -93,7 +93,7 @@ func BenchmarkWorkerRunJob(b *testing.B) {
job := work2.NewJob()

err := queue.Enqueue(job, &work2.EnqueueOptions{
Namespace: "{ns1}",
Namespace: "{ns-bench}",
QueueID: "test",
})
require.NoError(b, err)
Expand Down
52 changes: 26 additions & 26 deletions http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestServer(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
require.NoError(t, redistest.Reset(client, "{ns-http}"))
q := work.NewRedisQueue(client)

srv := NewServer(&ServerOptions{
Expand Down Expand Up @@ -89,15 +89,15 @@ func TestServer(t *testing.T) {
},
{
reqMethod: "DELETE",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
},
{
reqMethod: "GET",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n",
},
{
// bad duration
Expand All @@ -122,91 +122,91 @@ func TestServer(t *testing.T) {
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"payload": "payload1",
"delay": "10s"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n",
},
{
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"payload": "payload1"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
},
{
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"id": "id1",
"payload": "payload1"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
// same job id
reqMethod: "POST",
reqURL: "http://example.com/jobs",
reqBody: `{
"namespace": "{ns1}",
"namespace": "{ns-http}",
"queue_id": "q1",
"id": "id1",
"payload": "payload2"
}`,
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n",
},
{
reqMethod: "GET",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "DELETE",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1",
respCode: 500,
respBody: "{\"error\":\"work: empty queue id\"}\n",
},
{
reqMethod: "DELETE",
reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1&job_id=id1",
reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&queue_id=q1&job_id=id1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}",
},
{
reqMethod: "GET",
reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1",
reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1",
respCode: 200,
respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n",
},
} {
var reqBody io.Reader
Expand Down
42 changes: 42 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,31 @@ type Job struct {
Retries int64 `msgpack:"retries"`
// If the job previously fails, LastError will be populated with error string.
LastError string `msgpack:"last_error"`

// AllowPromotion controls whether Enqueue and PromoteJob may lower this
// job's score.
//
// When false (default), both operations use ZADD XX GT semantics: once a
// job is scheduled at time T, a subsequent Enqueue with score T' < T is
// a no-op, and PromoteJob cannot reduce the score. This preserves the
// "deferral" guarantee relied on by deterministic-ID jobs that may be
// enqueued multiple times concurrently (a dedup pattern) and prevents
// PromoteJob from demoting a job whose score has been bumped to
// now + InvisibleSec by Dequeue.
//
// When true, both operations use ZADD XX (no GT). This lets an
// explicit caller lower the score, for example in an opt-in retry flow
// where backoff should override Dequeue's InvisibleSec mark, and lets
// PromoteJob advance a scheduled-but-pending job to now. The caller is
// responsible for ensuring this is safe — typically that the job has a
// unique ID and is not relied on for dedup-deferral.
//
// This field is write-only at the Go API: it is persisted to Redis on
// Enqueue and consulted server-side by Enqueue and PromoteJob, but it
// is NOT rehydrated onto jobs returned by Dequeue or BulkFind — those
// always observe the zero value. PromoteJob reads the persisted value
// directly from Redis, so callers do not need to round-trip it.
AllowPromotion bool `msgpack:"-" json:",omitempty"`
}

// InvalidJobPayloadError wraps json or msgpack decoding error.
Expand Down Expand Up @@ -240,3 +265,20 @@ func (opt *FindOptions) Validate() error {
type BulkJobFinder interface {
BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error)
}

// PromoteOptions specifies how a job is promoted in the queue.
type PromoteOptions struct {
Namespace string
QueueID string
}

// Validate validates PromoteOptions.
func (opt *PromoteOptions) Validate() error {
if opt.Namespace == "" {
return ErrEmptyNamespace
}
if opt.QueueID == "" {
return ErrEmptyQueueID
}
return nil
}
16 changes: 8 additions & 8 deletions middleware/concurrent/dequeuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestDequeuer(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
require.NoError(t, redistest.Reset(client, "{ns-concurrent}"))

var called int
h1 := func(*work.DequeueOptions) (*work.Job, error) {
Expand All @@ -38,7 +38,7 @@ func TestDequeuer(t *testing.T) {
}

opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-concurrent}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestDequeuer(t *testing.T) {

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:lock:q1",
"{ns-concurrent}:lock:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -87,7 +87,7 @@ func TestDequeuer(t *testing.T) {
require.EqualValues(t, opt.At.Unix()+60, z[0].Score)
require.EqualValues(t, opt.At.Unix()+60, z[1].Score)

require.NoError(t, client.ZRem(context.Background(), "{ns1}:lock:q1", "w1").Err())
require.NoError(t, client.ZRem(context.Background(), "{ns-concurrent}:lock:q1", "w1").Err())
optLater := *opt
optLater.At = opt.At.Add(10 * time.Second)
// worker 0 is locked already
Expand All @@ -105,7 +105,7 @@ func TestDequeuer(t *testing.T) {

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:lock:q1",
"{ns-concurrent}:lock:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestDequeuer(t *testing.T) {

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:lock:q1",
"{ns-concurrent}:lock:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
Expand All @@ -155,7 +155,7 @@ func BenchmarkConcurrency(b *testing.B) {

client := redistest.NewClient()
defer client.Close()
require.NoError(b, redistest.Reset(client))
require.NoError(b, redistest.Reset(client, "{ns-concurrent}"))

var called int
h1 := func(*work.DequeueOptions) (*work.Job, error) {
Expand All @@ -178,7 +178,7 @@ func BenchmarkConcurrency(b *testing.B) {
}

opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-concurrent}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand Down
2 changes: 1 addition & 1 deletion middleware/concurrent/local_dequeuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestLocalDequeuer(t *testing.T) {
}

opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-concurrent}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
Expand Down
2 changes: 1 addition & 1 deletion middleware/discard/after_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestAfter(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-discard}",
QueueID: "q1",
}
d := After(time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion middleware/discard/invalid_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestInvalidPayload(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-discard}",
QueueID: "q1",
}
h := InvalidPayload(func(*work.Job, *work.DequeueOptions) error {
Expand Down
2 changes: 1 addition & 1 deletion middleware/discard/max_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestMaxRetry(t *testing.T) {
job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
Namespace: "{ns-discard}",
QueueID: "q1",
}
d := MaxRetry(1)
Expand Down
13 changes: 7 additions & 6 deletions middleware/heartbeat/heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ func Heartbeater(hopts *HeartbeaterOptions) work.HandleMiddleware {
}
}()

err := f(job, opt)
cancel()
<-done
job.UpdatedAt = copiedJob.UpdatedAt
job.EnqueuedAt = copiedJob.EnqueuedAt
return err
defer func() {
cancel()
<-done
job.UpdatedAt = copiedJob.UpdatedAt
job.EnqueuedAt = copiedJob.EnqueuedAt
}()
return f(job, opt)
}
}
}
Loading