From 2d5297f0e11585a9c8eb331d974615791881ba2b Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Mon, 4 Jan 2021 15:23:19 -0800 Subject: [PATCH 1/6] Pass at least one key to script executions This allows Redis to route our execution to the correct node when running in a Redis Cluster. --- redis_queue.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/redis_queue.go b/redis_queue.go index f61524f..48f6cdf 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -29,6 +29,13 @@ type RedisQueue interface { MetricsExporter } +// scriptKey returns a slice of strings containing at least one of the keys to +// be used by a script. This allows Redis route our script execution to the +// correct node in the event we're using a namespace. +func scriptKey(ns, queueID string) []string { + return []string{strings.Join([]string{ns, "queue", queueID}, ":")} +} + // NewRedisQueue creates a new queue stored in redis. func NewRedisQueue(client redis.UniversalClient) RedisQueue { enqueueScript := redis.NewScript(` @@ -189,7 +196,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { args[2+3*i+1] = job.ID args[2+3*i+2] = jobm } - return q.enqueueScript.Run(context.Background(), q.client, nil, args...).Err() + return q.enqueueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) { @@ -205,7 +212,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro if err != nil { return nil, err } - res, err := q.dequeueScript.Run(context.Background(), q.client, nil, + res, err := q.dequeueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), opt.Namespace, opt.QueueID, opt.At.Unix(), @@ -249,7 +256,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error { for i, job := range jobs { args[2+i] = job.ID } - return q.ackScript.Run(context.Background(), q.client, nil, 
args...).Err() + return q.ackScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) { @@ -265,7 +272,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) for i, jobID := range jobIDs { args[1+i] = jobID } - res, err := q.findScript.Run(context.Background(), q.client, nil, args...).Result() + res, err := q.findScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() if err != nil { return nil, err } From 414947a7f3a4bbc498e0648445731b97c5a507ac Mon Sep 17 00:00:00 2001 From: James Palawaga Date: Thu, 10 Nov 2022 11:37:59 -0500 Subject: [PATCH 2/6] Remove context deadline Quick fix. We use the heartbeat middleware which allows long-running jobs to work. However, the context is also ending up canceled which is causing other issues. --- worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker.go b/worker.go index 2e070c6..db3fcaa 100644 --- a/worker.go +++ b/worker.go @@ -157,8 +157,8 @@ func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFun } handle := func(job *Job, o *DequeueOptions) error { - ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) - defer cancel() + //ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) + //defer cancel() return h(ctx, job, o) } for _, mw := range opt.HandleMiddleware { From 2e0cbc010959ee16a9625decf7f06c7234ad4d07 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Fri, 15 Mar 2024 12:13:16 -0700 Subject: [PATCH 3/6] Only update jobs with a later queued_at timestamp --- redis_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis_queue.go b/redis_queue.go index ef9931b..ecc93a4 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -64,7 +64,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { table.insert(zadd_args, at) 
table.insert(zadd_args, job_key) end - return redis.call("zadd", queue_key, unpack(zadd_args)) + return redis.call("zadd", queue_key, "gt", unpack(zadd_args)) `) dequeueScript := redis.NewScript(` From 4c851607f33debf16e7ec492ed14b38f49d4ca49 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Mon, 8 Dec 2025 09:47:14 -0800 Subject: [PATCH 4/6] Add support for promoting queued jobs --- job.go | 17 +++++++ redis_queue.go | 40 +++++++++++++++++ redis_queue_test.go | 105 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+) diff --git a/job.go b/job.go index be4e221..329a293 100644 --- a/job.go +++ b/job.go @@ -240,3 +240,20 @@ func (opt *FindOptions) Validate() error { type BulkJobFinder interface { BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error) } + +// PromoteOptions specifies how a job is promoted in the queue. +type PromoteOptions struct { + Namespace string + QueueID string +} + +// Validate validates PromoteOptions. +func (opt *PromoteOptions) Validate() error { + if opt.Namespace == "" { + return ErrEmptyNamespace + } + if opt.QueueID == "" { + return ErrEmptyQueueID + } + return nil +} diff --git a/redis_queue.go b/redis_queue.go index ecc93a4..0c58025 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -33,6 +33,14 @@ type redisQueue struct { metricScript *redis.Script } +// JobPromoter can update a job's score in the queue to make it immediately +// eligible for dequeuing without re-enqueuing the entire job. +type JobPromoter interface { + // PromoteJob updates the job's score to time.Now(). Only affects jobs + // that exist and have scores <= now (won't demote jobs being processed). + PromoteJob(jobID string, opt *PromoteOptions) error +} + // RedisQueue implements Queue with other additional capabilities type RedisQueue interface { Queue @@ -40,6 +48,7 @@ type RedisQueue interface { BulkDequeuer BulkJobFinder MetricsExporter + JobPromoter } // NewRedisQueue creates a new queue stored in redis. 
@@ -342,6 +351,37 @@ func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*J return jobs, nil } +func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { + err := opt.Validate() + if err != nil { + return err + } + + queueKey := opt.Namespace + ":queue:" + opt.QueueID + jobKey := opt.Namespace + ":job:" + jobID + + // ZADD with both XX and GT flags: + // - XX: Only update existing members (don't resurrect completed jobs) + // - GT: Only update if new score > current score (don't demote processing jobs) + // + // Safety guarantees: + // 1. If job was completed and removed: XX prevents re-adding it + // 2. If job is being processed (score = now + invisibleSec): GT prevents demotion + // 3. If job is pending (score <= now): Both flags allow promotion + return q.client.ZAddArgs( + context.Background(), + queueKey, + redis.ZAddArgs{ + XX: true, // Only update existing + GT: true, // Only if new score is greater + Members: []redis.Z{{ + Score: float64(time.Now().Unix()), + Member: jobKey, + }}, + }, + ).Err() +} + func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { err := opt.Validate() if err != nil { diff --git a/redis_queue_test.go b/redis_queue_test.go index e97c7e4..eb5d0ad 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -351,3 +351,108 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(jobCount), count) } + +func TestRedisQueuePromoteJob(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + // Enqueue two jobs with old timestamps (in the past) + job1 := NewJob() + job1.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago + job2 := NewJob() + job2.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago + + opts := &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + } + + err := q.Enqueue(job1, opts) + require.NoError(t, err) + 
err = q.Enqueue(job2, opts) + require.NoError(t, err) + + // Check initial score of job2 (should be old timestamp) + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job2.ID) + initialScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, float64(job2.EnqueuedAt.Unix()), initialScore) + + // Promote job2 + beforePromote := time.Now().Unix() + err = q.PromoteJob(job2.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + afterPromote := time.Now().Unix() + + // Check that job2's score was updated to now + newScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.GreaterOrEqual(t, int64(newScore), beforePromote) + require.LessOrEqual(t, int64(newScore), afterPromote) + + // Promote non-existent job should not error (XX flag prevents adding) + err = q.PromoteJob("non-existent-job", &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + + // Verify non-existent job was not added to queue + exists, err := client.ZScore(context.Background(), queueKey, "{ns1}:job:non-existent-job").Result() + require.Error(t, err) // redis.Nil error expected + require.Equal(t, float64(0), exists) +} + +func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + // Enqueue a job + job := NewJob() + job.EnqueuedAt = time.Now() + + opts := &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + } + + err := q.Enqueue(job, opts) + require.NoError(t, err) + + // Dequeue the job (this sets score to now + invisibleSec) + dequeueOpts := &DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, // 60 seconds + } + dequeuedJob, err := q.Dequeue(dequeueOpts) + require.NoError(t, err) + 
require.Equal(t, job.ID, dequeuedJob.ID) + + // Check that job's score is now + invisibleSec + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + // Try to promote the job (should not demote it because of GT flag) + err = q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + + // Verify score hasn't changed (GT flag prevented demotion) + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterDequeue, scoreAfterPromote) +} From 33d8963acfa6dfcb9f6f33c9e55ed5ef036f2a94 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Wed, 13 May 2026 21:07:21 -0700 Subject: [PATCH 5/6] Per-job AllowPromotion opts out of ZADD GT guard (#4) Enqueue and PromoteJob have both used ZADD XX GT since #3 to preserve the deferral guarantee for jobs enqueued with deterministic IDs: once a schedule sits at time T in the future, a duplicate enqueue at T' < T must be a no-op, and PromoteJob must not demote a job whose score has been bumped to now + InvisibleSec by Dequeue. That is the right semantic for dedup-style jobs that may race their own re-enqueue. It is the wrong semantic for two cases that have grown up around the queue since: 1. Worker retry rescheduling. When a handler returns an error the retry middleware computes a backoff delay and calls Enqueue with score = now + delay. With GT, that score is rejected because the Dequeue invisibility mark (now + InvisibleSec, typically 60s) is greater. The configured Backoff is effectively dead for any value less than InvisibleSec; gated handlers re-run only on the InvisibleSec cadence regardless of how short the backoff is. 2. Subqueue PromoteOnAck. 
The subqueue middleware advances the next gated job after the prior handler Acks by calling PromoteJob on that job's ID. With GT, the score (still sitting at the InvisibleSec mark from its last dequeue/gated cycle) is also rejected and the next job continues to wait out its full invisibility window. Add a per-job AllowPromotion flag. Default false preserves today's GT semantics so dedup-deferral jobs are unaffected; setting true causes Enqueue to use plain ZADD (no GT) so backoff can lower the score, and causes PromoteJob to use ZADD XX (without GT) so the next gated subqueue entry can be advanced. The flag is read-only after the job is initially enqueued. The Enqueue Lua script splits the per-job arg list into two ZADD calls (one with gt, one without) so a mixed BulkEnqueue stays atomic. PromoteJob does an HGET to read the flag before issuing the ZADD; this adds one round-trip per promotion but keeps the API stable. --- bench/worker_bench_test.go | 12 +- http/server_test.go | 52 ++-- job.go | 25 ++ middleware/concurrent/dequeuer_test.go | 16 +- middleware/concurrent/local_dequeuer_test.go | 2 +- middleware/discard/after_test.go | 2 +- middleware/discard/invalid_payload_test.go | 2 +- middleware/discard/max_retry_test.go | 2 +- middleware/heartbeat/heartbeater_test.go | 6 +- middleware/logrus/logger_test.go | 4 +- middleware/prometheus/metrics_test.go | 8 +- middleware/recovery/catch_panic_test.go | 2 +- middleware/unique/enqueuer_test.go | 16 +- redis_queue.go | 137 ++++++--- redis_queue_test.go | 299 +++++++++++++++---- redistest/client.go | 58 +++- sidekiq/queue_test.go | 6 +- worker_test.go | 92 +++--- 18 files changed, 537 insertions(+), 204 deletions(-) diff --git a/bench/worker_bench_test.go b/bench/worker_bench_test.go index f6e2204..739361b 100644 --- a/bench/worker_bench_test.go +++ b/bench/worker_bench_test.go @@ -34,10 +34,10 @@ func BenchmarkWorkerRunJob(b *testing.B) { b.StopTimer() for n := 0; n < b.N; n++ { - require.NoError(b, redistest.Reset(client)) + 
require.NoError(b, redistest.Reset(client, "{ns-bench}")) wp := work.NewWorkerPoolWithOptions( - struct{}{}, 1, "{ns1}", pool, + struct{}{}, 1, "{ns-bench}", pool, work.WorkerPoolOptions{ SleepBackoffs: []int64{1000}, }, @@ -49,7 +49,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { return nil }) - enqueuer := work.NewEnqueuer("{ns1}", pool) + enqueuer := work.NewEnqueuer("{ns-bench}", pool) for i := 0; i < k; i++ { _, err := enqueuer.Enqueue("test", nil) require.NoError(b, err) @@ -68,11 +68,11 @@ func BenchmarkWorkerRunJob(b *testing.B) { b.StopTimer() for n := 0; n < b.N; n++ { - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-bench}")) queue := work2.NewRedisQueue(client) w := work2.NewWorker(&work2.WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-bench}", Queue: queue, }) var wg sync.WaitGroup @@ -93,7 +93,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { job := work2.NewJob() err := queue.Enqueue(job, &work2.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-bench}", QueueID: "test", }) require.NoError(b, err) diff --git a/http/server_test.go b/http/server_test.go index 2069c8c..cfef907 100644 --- a/http/server_test.go +++ b/http/server_test.go @@ -16,7 +16,7 @@ import ( func TestServer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-http}")) q := work.NewRedisQueue(client) srv := NewServer(&ServerOptions{ @@ -89,15 +89,15 @@ func TestServer(t *testing.T) { }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + 
respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", }, { reqMethod: "GET", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", }, { // bad duration @@ -122,91 +122,91 @@ func TestServer(t *testing.T) { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "payload": "payload1", "delay": "10s" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: 
"{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", }, { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "payload": "payload1" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", }, { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "id": "id1", "payload": "payload1" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: 
"{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { // same job id reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "id": "id1", "payload": "payload2" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", }, { reqMethod: "GET", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: 
"{\"namespace\":\"{ns-http}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1", respCode: 500, respBody: "{\"error\":\"work: empty queue id\"}\n", }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&queue_id=q1&job_id=id1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", }, } { var reqBody io.Reader diff --git a/job.go b/job.go index 329a293..de8cc17 100644 --- a/job.go +++ b/job.go @@ -30,6 +30,31 @@ type Job struct { Retries int64 `msgpack:"retries"` // If the job previously fails, LastError will be populated with error string. 
LastError string `msgpack:"last_error"` + + // AllowPromotion controls whether Enqueue and PromoteJob may lower this + // job's score. + // + // When false (default), both operations use ZADD XX GT semantics: once a + // job is scheduled at time T, a subsequent Enqueue with score T' < T is + // a no-op, and PromoteJob cannot reduce the score. This preserves the + // "deferral" guarantee relied on by deterministic-ID jobs that may be + // enqueued multiple times concurrently (a dedup pattern) and prevents + // PromoteJob from demoting a job whose score has been bumped to + // now + InvisibleSec by Dequeue. + // + // When true, both operations use ZADD XX (no GT). This lets an + // explicit caller lower the score, for example in an opt-in retry flow + // where backoff should override Dequeue's InvisibleSec mark, and lets + // PromoteJob advance a scheduled-but-pending job to now. The caller is + // responsible for ensuring this is safe — typically that the job has a + // unique ID and is not relied on for dedup-deferral. + // + // This field is write-only at the Go API: it is persisted to Redis on + // Enqueue and consulted server-side by Enqueue and PromoteJob, but it + // is NOT rehydrated onto jobs returned by Dequeue or BulkFind — those + // always observe the zero value. PromoteJob reads the persisted value + // directly from Redis, so callers do not need to round-trip it. + AllowPromotion bool `msgpack:"-" json:",omitempty"` } // InvalidJobPayloadError wraps json or msgpack decoding error. 
diff --git a/middleware/concurrent/dequeuer_test.go b/middleware/concurrent/dequeuer_test.go index 805c7c3..e903e65 100644 --- a/middleware/concurrent/dequeuer_test.go +++ b/middleware/concurrent/dequeuer_test.go @@ -15,7 +15,7 @@ import ( func TestDequeuer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-concurrent}")) var called int h1 := func(*work.DequeueOptions) (*work.Job, error) { @@ -38,7 +38,7 @@ func TestDequeuer(t *testing.T) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -75,7 +75,7 @@ func TestDequeuer(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -87,7 +87,7 @@ func TestDequeuer(t *testing.T) { require.EqualValues(t, opt.At.Unix()+60, z[0].Score) require.EqualValues(t, opt.At.Unix()+60, z[1].Score) - require.NoError(t, client.ZRem(context.Background(), "{ns1}:lock:q1", "w1").Err()) + require.NoError(t, client.ZRem(context.Background(), "{ns-concurrent}:lock:q1", "w1").Err()) optLater := *opt optLater.At = opt.At.Add(10 * time.Second) // worker 0 is locked already @@ -105,7 +105,7 @@ func TestDequeuer(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -137,7 +137,7 @@ func TestDequeuer(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -155,7 +155,7 @@ func BenchmarkConcurrency(b *testing.B) { client := redistest.NewClient() defer client.Close() - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-concurrent}")) var called int h1 := 
func(*work.DequeueOptions) (*work.Job, error) { @@ -178,7 +178,7 @@ func BenchmarkConcurrency(b *testing.B) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, diff --git a/middleware/concurrent/local_dequeuer_test.go b/middleware/concurrent/local_dequeuer_test.go index b3971a0..988db93 100644 --- a/middleware/concurrent/local_dequeuer_test.go +++ b/middleware/concurrent/local_dequeuer_test.go @@ -30,7 +30,7 @@ func TestLocalDequeuer(t *testing.T) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, diff --git a/middleware/discard/after_test.go b/middleware/discard/after_test.go index e07bf83..9f8105c 100644 --- a/middleware/discard/after_test.go +++ b/middleware/discard/after_test.go @@ -12,7 +12,7 @@ import ( func TestAfter(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } d := After(time.Minute) diff --git a/middleware/discard/invalid_payload_test.go b/middleware/discard/invalid_payload_test.go index 5ef2c8c..3f706ae 100644 --- a/middleware/discard/invalid_payload_test.go +++ b/middleware/discard/invalid_payload_test.go @@ -10,7 +10,7 @@ import ( func TestInvalidPayload(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } h := InvalidPayload(func(*work.Job, *work.DequeueOptions) error { diff --git a/middleware/discard/max_retry_test.go b/middleware/discard/max_retry_test.go index c8b15a7..b5257f4 100644 --- a/middleware/discard/max_retry_test.go +++ b/middleware/discard/max_retry_test.go @@ -11,7 +11,7 @@ import ( func TestMaxRetry(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } d := MaxRetry(1) diff --git a/middleware/heartbeat/heartbeater_test.go 
b/middleware/heartbeat/heartbeater_test.go index 98e1b56..23acc57 100644 --- a/middleware/heartbeat/heartbeater_test.go +++ b/middleware/heartbeat/heartbeater_test.go @@ -14,11 +14,11 @@ import ( func TestHeartbeater(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-heartbeat}")) job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-heartbeat}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -41,7 +41,7 @@ func TestHeartbeater(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-heartbeat}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", diff --git a/middleware/logrus/logger_test.go b/middleware/logrus/logger_test.go index effb308..b192e89 100644 --- a/middleware/logrus/logger_test.go +++ b/middleware/logrus/logger_test.go @@ -11,7 +11,7 @@ import ( func TestHandleFuncLogger(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-logrus}", QueueID: "q1", } h := HandleFuncLogger(func(*work.Job, *work.DequeueOptions) error { @@ -31,7 +31,7 @@ func TestHandleFuncLogger(t *testing.T) { func TestEnqueueFuncLogger(t *testing.T) { job := work.NewJob() opt := &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-logrus}", QueueID: "q1", } h := EnqueueFuncLogger(func(*work.Job, *work.EnqueueOptions) error { diff --git a/middleware/prometheus/metrics_test.go b/middleware/prometheus/metrics_test.go index f593e89..6921a57 100644 --- a/middleware/prometheus/metrics_test.go +++ b/middleware/prometheus/metrics_test.go @@ -20,7 +20,7 @@ func TestHandleFuncMetrics(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", QueueID: "q1", } h := HandleFuncMetrics(func(*work.Job, *work.DequeueOptions) error { @@ -58,7 +58,7 @@ func TestEnqueueFuncMetrics(t 
*testing.T) { job := work.NewJob() opt := &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", QueueID: "q1", } h := EnqueueFuncMetrics(func(*work.Job, *work.EnqueueOptions) error { @@ -93,10 +93,10 @@ func TestExportWorkerMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-prometheus}")) w := work.NewWorker(&work.WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", Queue: work.NewRedisQueue(client), }) err = w.Register("test", diff --git a/middleware/recovery/catch_panic_test.go b/middleware/recovery/catch_panic_test.go index 0f3e3f2..1b8eb16 100644 --- a/middleware/recovery/catch_panic_test.go +++ b/middleware/recovery/catch_panic_test.go @@ -10,7 +10,7 @@ import ( func TestCatchPanic(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-recovery}", QueueID: "q1", } h := CatchPanic(func(*work.Job, *work.DequeueOptions) error { diff --git a/middleware/unique/enqueuer_test.go b/middleware/unique/enqueuer_test.go index 3c217e7..2b52c0a 100644 --- a/middleware/unique/enqueuer_test.go +++ b/middleware/unique/enqueuer_test.go @@ -13,7 +13,7 @@ import ( func TestEnqueuerBypass(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -30,7 +30,7 @@ func TestEnqueuerBypass(t *testing.T) { for i := 0; i < 3; i++ { job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -41,7 +41,7 @@ func TestEnqueuerBypass(t *testing.T) { func TestEnqueuer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, 
"{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -58,7 +58,7 @@ func TestEnqueuer(t *testing.T) { for i := 0; i < 3; i++ { job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -66,10 +66,10 @@ func TestEnqueuer(t *testing.T) { require.Equal(t, 1, called) for i := 0; i < 3; i++ { - require.NoError(t, client.Del(context.Background(), "{ns1}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) + require.NoError(t, client.Del(context.Background(), "{ns-unique}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -82,7 +82,7 @@ func BenchmarkEnqueuer(b *testing.B) { client := redistest.NewClient() defer client.Close() - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -101,7 +101,7 @@ func BenchmarkEnqueuer(b *testing.B) { for n := 0; n < b.N; n++ { job := work.NewJob() h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) } diff --git a/redis_queue.go b/redis_queue.go index 0c58025..bfb7245 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -10,8 +10,7 @@ import ( "github.com/redis/go-redis/v9" ) -func batchSlice(n int) [][]int { - const size = 1000 +func batchSliceWithSize(n int, size int) [][]int { var batches [][]int for i := 0; i < n; i += size { j := i + size @@ -23,6 +22,11 @@ func batchSlice(n int) [][]int { return batches } +func batchSlice(n int) [][]int { + const size = 1000 + return batchSliceWithSize(n, size) +} + type redisQueue struct { client redis.UniversalClient @@ -30,6 +34,7 @@ type redisQueue struct { dequeueScript *redis.Script ackScript *redis.Script findScript 
*redis.Script + promoteScript *redis.Script metricScript *redis.Script } @@ -58,22 +63,46 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { local queue_id = ARGV[2] local queue_key = table.concat({ns, "queue", queue_id}, ":") - local zadd_args = {} + -- Per-job AllowPromotion (passed alongside each job) selects between + -- two ZADD variants. Guarded entries use ZADD ... gt so a duplicate + -- enqueue cannot demote an already-deferred deterministic-ID job; + -- promoted entries use plain ZADD so an explicit opt-in caller can + -- lower the score below the InvisibleSec mark set by Dequeue. + local guarded_args = {} + local promoted_args = {} - for i = 3,table.getn(ARGV),3 do + for i = 3,table.getn(ARGV),4 do local at = tonumber(ARGV[i]) - local job_id = ARGV[i+1] - local jobm = ARGV[i+2] + local requested_allow_promotion = ARGV[i+1] + local job_id = ARGV[i+2] + local jobm = ARGV[i+3] local job_key = table.concat({ns, "job", job_id}, ":") + local allow_promotion = redis.call("hget", job_key, "allow_promotion") + if allow_promotion == false then + allow_promotion = requested_allow_promotion + redis.call("hset", job_key, "allow_promotion", allow_promotion) + end -- update job fields redis.call("hset", job_key, "msgpack", jobm) - -- enqueue - table.insert(zadd_args, at) - table.insert(zadd_args, job_key) + if allow_promotion == "1" then + table.insert(promoted_args, at) + table.insert(promoted_args, job_key) + else + table.insert(guarded_args, at) + table.insert(guarded_args, job_key) + end + end + + local added = 0 + if table.getn(guarded_args) > 0 then + added = added + tonumber(redis.call("zadd", queue_key, "gt", unpack(guarded_args))) + end + if table.getn(promoted_args) > 0 then + added = added + tonumber(redis.call("zadd", queue_key, unpack(promoted_args))) end - return redis.call("zadd", queue_key, "gt", unpack(zadd_args)) + return added `) dequeueScript := redis.NewScript(` @@ -155,6 +184,30 @@ func NewRedisQueue(client redis.UniversalClient) 
RedisQueue { return ret `) + // PromoteJob is read-then-write (HGET allow_promotion, then ZADD) + // done atomically in a single script so an Ack + re-enqueue between + // the two operations cannot let a stale "AllowPromotion=true" read + // demote a freshly-enqueued job whose owner did not opt in. + promoteScript := redis.NewScript(` + local ns = ARGV[1] + local queue_id = ARGV[2] + local job_id = ARGV[3] + local at = ARGV[4] + local queue_key = table.concat({ns, "queue", queue_id}, ":") + local job_key = table.concat({ns, "job", job_id}, ":") + + -- XX always: never (re-)add a job whose hash is gone (Ack'd or never + -- enqueued). GT only when the job did not opt into promotion — without + -- GT, an explicit caller can lower the score from Dequeue's + -- InvisibleSec mark back down to now. + local allow_promotion = redis.call("hget", job_key, "allow_promotion") + if allow_promotion == "1" then + return redis.call("zadd", queue_key, "XX", at, job_key) + else + return redis.call("zadd", queue_key, "XX", "GT", at, job_key) + end + `) + metricScript := redis.NewScript(` local ns = ARGV[1] local queue_id = ARGV[2] @@ -183,6 +236,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { dequeueScript: dequeueScript, ackScript: ackScript, findScript: findScript, + promoteScript: promoteScript, metricScript: metricScript, } } @@ -192,7 +246,11 @@ func (q *redisQueue) Enqueue(job *Job, opt *EnqueueOptions) error { } func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { - for _, batch := range batchSlice(len(jobs)) { + // Keep the total script ARGV count within the previous 3-args-per-job budget. + // bulkEnqueueSmallBatch now sends 4 values per job, so the maximum safe batch + // size is 1000 * 3 / 4 = 750 jobs. 
+ const bulkEnqueueBatchSize = 750 + for _, batch := range batchSliceWithSize(len(jobs), bulkEnqueueBatchSize) { err := q.bulkEnqueueSmallBatch(jobs[batch[0]:batch[1]], opt) if err != nil { return err @@ -209,7 +267,7 @@ func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) err if len(jobs) == 0 { return nil } - args := make([]interface{}, 2+3*len(jobs)) + args := make([]interface{}, 2+4*len(jobs)) args[0] = opt.Namespace args[1] = opt.QueueID for i, job := range jobs { @@ -217,9 +275,14 @@ func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) err if err != nil { return err } - args[2+3*i] = job.EnqueuedAt.Unix() - args[2+3*i+1] = job.ID - args[2+3*i+2] = jobm + args[2+4*i] = job.EnqueuedAt.Unix() + if job.AllowPromotion { + args[2+4*i+1] = "1" + } else { + args[2+4*i+1] = "0" + } + args[2+4*i+2] = job.ID + args[2+4*i+3] = jobm } return q.enqueueScript.Run(context.Background(), q.client, []string{opt.Namespace}, args...).Err() } @@ -357,29 +420,33 @@ func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { return err } - queueKey := opt.Namespace + ":queue:" + opt.QueueID - jobKey := opt.Namespace + ":job:" + jobID - - // ZADD with both XX and GT flags: - // - XX: Only update existing members (don't resurrect completed jobs) - // - GT: Only update if new score > current score (don't demote processing jobs) + // promoteScript reads the separately-stored AllowPromotion metadata and + // performs the ZADD atomically. With AllowPromotion=false (default), the + // ZADD keeps the GT guard and is effectively a no-op for any job whose + // score sits in the future (either deferred via dedup or in-flight via + // Dequeue's InvisibleSec mark). With AllowPromotion=true, the caller + // has asserted that the job's calling pattern is safe to demote — the + // typical use case is a subqueue handler middleware that promotes the + // next gated job after the prior handler Acks. // - // Safety guarantees: - // 1. 
If job was completed and removed: XX prevents re-adding it - // 2. If job is being processed (score = now + invisibleSec): GT prevents demotion - // 3. If job is pending (score <= now): Both flags allow promotion - return q.client.ZAddArgs( + // If the job is no longer stored (already Ack'd or never enqueued), + // the script's XX flag prevents (re-)adding it, so a missing hash is a + // no-op rather than an error. + err = q.promoteScript.Run( context.Background(), - queueKey, - redis.ZAddArgs{ - XX: true, // Only update existing - GT: true, // Only if new score is greater - Members: []redis.Z{{ - Score: float64(time.Now().Unix()), - Member: jobKey, - }}, - }, + q.client, + []string{opt.Namespace}, + opt.Namespace, + opt.QueueID, + jobID, + time.Now().Unix(), ).Err() + if errors.Is(err, redis.Nil) { + // ZADD XX with no member added returns nil; go-redis surfaces this + // as redis.Nil from the script. Treat it as a successful no-op. + return nil + } + return err } func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { diff --git a/redis_queue_test.go b/redis_queue_test.go index eb5d0ad..a4983ea 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -14,7 +14,7 @@ import ( func TestRedisQueueEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -26,23 +26,24 @@ func TestRedisQueueEnqueue(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + 
"allow_promotion": "0", + "msgpack": string(jobm), }, h) jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", }) require.NoError(t, err) require.Len(t, jobs, 2) @@ -52,12 +53,12 @@ func TestRedisQueueEnqueue(t *testing.T) { jobs[0].LastError = "hello world" err = q.Enqueue(jobs[0], &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) jobs, err = q.BulkFind([]string{job.ID}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", }) require.NoError(t, err) require.Len(t, jobs, 1) @@ -66,7 +67,7 @@ func TestRedisQueueEnqueue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -77,14 +78,14 @@ func TestRedisQueueEnqueue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) err = q.Enqueue(job.Delay(time.Minute), &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -98,7 +99,7 @@ func TestRedisQueueEnqueue(t *testing.T) { func TestRedisQueueDequeue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -108,10 +109,10 @@ func TestRedisQueueDequeue(t *testing.T) { job := NewJob() err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) @@ -119,7 +120,7 @@ func TestRedisQueueDequeue(t *testing.T) { 
now := job.EnqueuedAt.Add(123 * time.Second) jobDequeued, err := q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 0, @@ -129,7 +130,7 @@ func TestRedisQueueDequeue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -140,7 +141,7 @@ func TestRedisQueueDequeue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) jobDequeued, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 60, @@ -153,12 +154,13 @@ func TestRedisQueueDequeue(t *testing.T) { jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -170,7 +172,7 @@ func TestRedisQueueDequeue(t *testing.T) { // empty _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 60, @@ -182,7 +184,7 @@ func TestRedisQueueDequeue(t *testing.T) { func TestRedisQueueDequeueDeletedJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -194,25 +196,26 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) jobm, err := 
marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) require.NoError(t, client.Del(context.Background(), jobKey).Err()) _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt, InvisibleSec: 60, @@ -221,7 +224,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -233,22 +236,22 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { func TestRedisQueueAck(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -263,14 +266,14 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 1, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -283,7 +286,7 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 0, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) @@ -292,36 +295,36 @@ func TestRedisQueueAck(t *testing.T) { func TestRedisQueueGetQueueMetrics(t *testing.T) { client := redistest.NewClient() defer 
client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) m, err := q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt, }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) + require.Equal(t, "{ns-work}", m.Namespace) require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) require.True(t, 0 < m.Latency && m.Latency < time.Minute) m, err = q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt.Add(-time.Second), }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) + require.Equal(t, "{ns-work}", m.Namespace) require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) @@ -331,7 +334,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { func TestRedisQueueBulkEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) const jobCount = 100000 @@ -342,12 +345,12 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { } err := q.BulkEnqueue(jobs, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - count, err := client.ZCard(context.Background(), "{ns1}:queue:q1").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:q1").Result() require.NoError(t, err) require.Equal(t, int64(jobCount), count) } @@ -355,7 +358,7 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { func TestRedisQueuePromoteJob(t *testing.T) { client := redistest.NewClient() 
defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) // Enqueue two jobs with old timestamps (in the past) @@ -365,7 +368,7 @@ func TestRedisQueuePromoteJob(t *testing.T) { job2.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago opts := &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", } @@ -375,8 +378,8 @@ func TestRedisQueuePromoteJob(t *testing.T) { require.NoError(t, err) // Check initial score of job2 (should be old timestamp) - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job2.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job2.ID) initialScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) require.Equal(t, float64(job2.EnqueuedAt.Unix()), initialScore) @@ -404,15 +407,167 @@ func TestRedisQueuePromoteJob(t *testing.T) { require.NoError(t, err) // Verify non-existent job was not added to queue - exists, err := client.ZScore(context.Background(), queueKey, "{ns1}:job:non-existent-job").Result() + exists, err := client.ZScore(context.Background(), queueKey, "{ns-work}:job:non-existent-job").Result() require.Error(t, err) // redis.Nil error expected require.Equal(t, float64(0), exists) } +func TestRedisQueueEnqueueGuardDoesNotDemote(t *testing.T) { + // AllowPromotion defaults to false. A second Enqueue of the same job + // with an earlier score must be a no-op so deterministic-ID dedup + // jobs cannot have their deferred run time clobbered. 
+ client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + job := NewJob() + deferredAt := time.Now().Add(time.Minute) + job.EnqueuedAt = deferredAt + require.NoError(t, q.Enqueue(job, opts)) + + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + scoreAfterFirst, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.EqualValues(t, deferredAt.Unix(), scoreAfterFirst) + + // Re-enqueue the same ID with an earlier score (the dedup pattern). + job.EnqueuedAt = time.Now() + job.AllowPromotion = true + require.NoError(t, q.Enqueue(job, opts)) + + scoreAfterSecond, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterFirst, scoreAfterSecond, "GT must reject the earlier-score re-enqueue when AllowPromotion is false") + + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterFirst, scoreAfterPromote, "no-op re-enqueue must not flip AllowPromotion") +} + +func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { + // AllowPromotion=true opts the job out of the GT guard so an explicit + // opt-in retry flow can lower the score from the Dequeue InvisibleSec + // mark back down to now + backoff. 
+ client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + job := NewJob() + job.AllowPromotion = true + deferredAt := time.Now().Add(time.Minute) + job.EnqueuedAt = deferredAt + require.NoError(t, q.Enqueue(job, opts)) + + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + + // Re-enqueue the same ID with an earlier score. + earlierAt := time.Now() + job.EnqueuedAt = earlierAt + require.NoError(t, q.Enqueue(job, opts)) + + scoreAfter, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.EqualValues(t, earlierAt.Unix(), scoreAfter, "plain ZADD must lower the score when AllowPromotion is true") +} + +func TestRedisQueueBulkEnqueueMixedAllowPromotion(t *testing.T) { + // A single BulkEnqueue containing both guarded (AllowPromotion=false) + // and promotable (AllowPromotion=true) jobs must apply the right + // semantic to each, atomically. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + guarded := NewJob() + deferredAt := time.Now().Add(time.Minute) + guarded.EnqueuedAt = deferredAt + + promotable := NewJob() + promotable.AllowPromotion = true + promotable.EnqueuedAt = deferredAt + + require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) + + // Re-enqueue both with an earlier score. 
+ earlierAt := time.Now() + guarded.EnqueuedAt = earlierAt + promotable.EnqueuedAt = earlierAt + require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) + + queueKey := "{ns-work}:queue:q1" + + guardedScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns-work}:job:%s", guarded.ID)).Result() + require.NoError(t, err) + require.EqualValues(t, deferredAt.Unix(), guardedScore, "guarded job must retain its later score") + + promotableScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns-work}:job:%s", promotable.ID)).Result() + require.NoError(t, err) + require.EqualValues(t, earlierAt.Unix(), promotableScore, "promotable job must accept the earlier score") +} + +func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { + // PromoteJob on a job with AllowPromotion=true must be able to advance + // the score even when the job is currently sitting at the InvisibleSec + // mark from a prior Dequeue — that is precisely the state the subqueue + // PromoteOnAck path needs to advance. 
+ client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + job := NewJob() + job.AllowPromotion = true + job.EnqueuedAt = time.Now() + require.NoError(t, q.Enqueue(job, opts)) + + dequeued, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + }) + require.NoError(t, err) + require.Equal(t, job.ID, dequeued.ID) + + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + beforePromote := time.Now().Unix() + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Less(t, int64(scoreAfterPromote), int64(scoreAfterDequeue), "PromoteJob must lower the score for AllowPromotion=true jobs") + require.GreaterOrEqual(t, int64(scoreAfterPromote), beforePromote) +} + func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) // Enqueue a job @@ -420,7 +575,7 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { job.EnqueuedAt = time.Now() opts := &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", } @@ -429,7 +584,7 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { // Dequeue the job (this sets score to now + invisibleSec) dequeueOpts := &DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, // 60 seconds @@ -439,8 +594,8 @@ func 
TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { require.Equal(t, job.ID, dequeuedJob.ID) // Check that job's score is now + invisibleSec - queueKey := "{ns1}:queue:q1" - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() require.NoError(t, err) @@ -456,3 +611,45 @@ func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { require.NoError(t, err) require.Equal(t, scoreAfterDequeue, scoreAfterPromote) } + +func TestRedisQueuePromoteJobMissingAllowPromotionDoesNotDemote(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + job := NewJob() + job.EnqueuedAt = time.Now() + + opts := &EnqueueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + } + + require.NoError(t, q.Enqueue(job, opts)) + + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + require.NoError(t, client.HDel(context.Background(), jobKey, "allow_promotion").Err()) + + dequeuedJob, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + }) + require.NoError(t, err) + require.Equal(t, job.ID, dequeuedJob.ID) + + queueKey := "{ns-work}:queue:q1" + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterDequeue, scoreAfterPromote) +} diff --git a/redistest/client.go b/redistest/client.go index 89ca770..0a62f34 100644 --- a/redistest/client.go +++ b/redistest/client.go @@ -23,12 +23,56 @@ func NewClient() redis.UniversalClient { }) } -// Reset 
is used to clear redis for next test. -func Reset(client redis.UniversalClient) error { - if cc, ok := client.(*redis.ClusterClient); ok { - return cc.ForEachMaster(context.Background(), func(ctx context.Context, c *redis.Client) error { - return c.FlushAll(ctx).Err() - }) +// Reset deletes keys belonging to the given namespaces so the caller starts +// from a clean slate. Scoping cleanup to namespaces lets tests in different +// packages run in parallel against the same Redis without one test's reset +// wiping another test's in-progress data (which a FlushAll would do). +// +// Passing no namespaces falls back to FlushAll for legacy callers. +func Reset(client redis.UniversalClient, namespaces ...string) error { + ctx := context.Background() + if len(namespaces) == 0 { + if cc, ok := client.(*redis.ClusterClient); ok { + return cc.ForEachMaster(ctx, func(ctx context.Context, c *redis.Client) error { + return c.FlushAll(ctx).Err() + }) + } + return client.FlushAll(ctx).Err() } - return client.FlushAll(context.Background()).Err() + + deleteMatching := func(ctx context.Context, c redis.Cmdable, pattern string) error { + var cursor uint64 + for { + keys, next, err := c.Scan(ctx, cursor, pattern, 1000).Result() + if err != nil { + return err + } + if len(keys) > 0 { + if err := c.Del(ctx, keys...).Err(); err != nil { + return err + } + } + if next == 0 { + return nil + } + cursor = next + } + } + + for _, ns := range namespaces { + pattern := ns + ":*" + if cc, ok := client.(*redis.ClusterClient); ok { + err := cc.ForEachMaster(ctx, func(ctx context.Context, c *redis.Client) error { + return deleteMatching(ctx, c, pattern) + }) + if err != nil { + return err + } + continue + } + if err := deleteMatching(ctx, client, pattern); err != nil { + return err + } + } + return nil } diff --git a/sidekiq/queue_test.go b/sidekiq/queue_test.go index 2a2a8f2..44ab979 100644 --- a/sidekiq/queue_test.go +++ b/sidekiq/queue_test.go @@ -14,7 +14,7 @@ import ( func 
TestSidekiqQueueExternalEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) job := work.NewJob() @@ -44,7 +44,7 @@ func TestSidekiqQueueExternalEnqueue(t *testing.T) { func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) job := work.NewJob() @@ -80,7 +80,7 @@ func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { func TestSidekiqQueueExternalBulkEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) const jobCount = 100000 diff --git a/worker_test.go b/worker_test.go index 039c8ad..4d7acb5 100644 --- a/worker_test.go +++ b/worker_test.go @@ -16,10 +16,10 @@ import ( func TestWorkerStartStop(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test", @@ -41,10 +41,10 @@ func TestWorkerStartStop(t *testing.T) { func TestWorkerExportMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test", @@ -60,7 +60,7 @@ func TestWorkerExportMetrics(t *testing.T) { all, err := w.ExportMetrics() require.NoError(t, err) require.Len(t, all.Queue, 1) - require.Equal(t, all.Queue[0].Namespace, 
"{ns1}") + require.Equal(t, all.Queue[0].Namespace, "{ns-work}") require.Equal(t, all.Queue[0].QueueID, "test") } @@ -98,14 +98,14 @@ func waitEmpty(client redis.UniversalClient, key string, timeout time.Duration) func TestWorkerRunJobMultiQueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) type message struct { Text string } w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test1", @@ -147,7 +147,7 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "test1", }) require.NoError(t, err) @@ -159,32 +159,32 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "test2", }) require.NoError(t, err) } - count, err := client.ZCard(context.Background(), "{ns1}:queue:test1").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:test1").Result() require.NoError(t, err) require.EqualValues(t, 3, count) - count, err = client.ZCard(context.Background(), "{ns1}:queue:test2").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test2").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:test1", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:test1", 10*time.Second) require.NoError(t, err) - err = waitEmpty(client, "{ns1}:queue:test2", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:test2", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:test1").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test1").Result() 
require.NoError(t, err) require.EqualValues(t, 0, count) - count, err = client.ZCard(context.Background(), "{ns1}:queue:test2").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test2").Result() require.NoError(t, err) require.EqualValues(t, 0, count) } @@ -192,10 +192,10 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { func TestWorkerRunJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("success", @@ -237,22 +237,22 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "success", }) require.NoError(t, err) } - count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:success", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:success", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 0, count) @@ -262,28 +262,28 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", }) require.NoError(t, err) } - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 3, count) 
w.Start() - err = waitEmpty(client, "{ns1}:queue:failure", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:failure", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", At: time.Now().Add(time.Hour), InvisibleSec: 3600, @@ -299,28 +299,28 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", }) require.NoError(t, err) } - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:panic", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:panic", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", At: time.Now().Add(time.Hour), InvisibleSec: 3600, @@ -334,43 +334,43 @@ func TestWorkerRunJob(t *testing.T) { func TestWorkerRunOnce(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) job := NewJob() err := NewRedisQueue(client).Enqueue(job, &EnqueueOptions{ - Namespace: 
"{ns1}", + Namespace: "{ns-work}", QueueID: "success", }) require.NoError(t, err) - count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 1, count) job2 := NewJob() err = NewRedisQueue(client).Enqueue(job2, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", }) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 1, count) job3 := NewJob() err = NewRedisQueue(client).Enqueue(job3, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", }) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 1, count) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) @@ -384,7 +384,7 @@ func TestWorkerRunOnce(t *testing.T) { ) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 0, count) @@ -399,7 +399,7 @@ func TestWorkerRunOnce(t *testing.T) { require.Error(t, err) require.Equal(t, "no reason", err.Error()) - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 1, count) @@ -414,7 +414,7 @@ func TestWorkerRunOnce(t *testing.T) { require.Error(t, err) require.True(t, strings.HasPrefix(err.Error(), 
"panic: unexpected")) - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 1, count) } @@ -428,11 +428,11 @@ func TestWrappedHandlerError(t *testing.T) { func TestRetry(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) job := NewJob() opt := &DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", InvisibleSec: 10, } @@ -448,7 +448,7 @@ func TestRetry(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -480,7 +480,7 @@ func TestRetry(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", From d21704b7a96bdb93b2435e8dc7f561c3ea36c984 Mon Sep 17 00:00:00 2001 From: James Palawaga Date: Fri, 15 May 2026 15:29:23 -0400 Subject: [PATCH 6/6] Allow heartbeater to exit after wrapped func panic In the old code, it was possible for the wrapped function call to panic, which would cause the heartbeating goroutine to never stop refreshing the call. This moves the important bits into a defer statement which will definitely be run, allowing the heartbeater to close. This is motivated by seeing a heartbeater run indefinitely, always updating the visibility of a job and never allowing it to actually be consumed or retried. 
--- middleware/heartbeat/heartbeater.go | 13 ++++--- middleware/heartbeat/heartbeater_test.go | 49 ++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/middleware/heartbeat/heartbeater.go b/middleware/heartbeat/heartbeater.go index 4403290..b21e729 100644 --- a/middleware/heartbeat/heartbeater.go +++ b/middleware/heartbeat/heartbeater.go @@ -54,12 +54,13 @@ func Heartbeater(hopts *HeartbeaterOptions) work.HandleMiddleware { } }() - err := f(job, opt) - cancel() - <-done - job.UpdatedAt = copiedJob.UpdatedAt - job.EnqueuedAt = copiedJob.EnqueuedAt - return err + defer func() { + cancel() + <-done + job.UpdatedAt = copiedJob.UpdatedAt + job.EnqueuedAt = copiedJob.EnqueuedAt + }() + return f(job, opt) } } } diff --git a/middleware/heartbeat/heartbeater_test.go b/middleware/heartbeat/heartbeater_test.go index 23acc57..c56665c 100644 --- a/middleware/heartbeat/heartbeater_test.go +++ b/middleware/heartbeat/heartbeater_test.go @@ -2,6 +2,8 @@ package heartbeat import ( "context" + "errors" + "sync/atomic" "testing" "time" @@ -50,3 +52,50 @@ func TestHeartbeater(t *testing.T) { require.Len(t, z, 1) require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) } + +type countingQueue struct { + count atomic.Int64 +} + +func (q *countingQueue) Enqueue(*work.Job, *work.EnqueueOptions) error { + q.count.Add(1) + return nil +} + +func (q *countingQueue) Dequeue(*work.DequeueOptions) (*work.Job, error) { + return nil, errors.New("not implemented") +} + +func (q *countingQueue) Ack(*work.Job, *work.AckOptions) error { + return errors.New("not implemented") +} + +func TestHeartbeaterStopsAfterPanic(t *testing.T) { + queue := &countingQueue{} + job := work.NewJob() + opt := &work.DequeueOptions{ + Namespace: "{ns-heartbeat-panic}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + } + + hb := Heartbeater(&HeartbeaterOptions{ + Queue: queue, + InvisibleSec: 30, + IntervalSec: 1, + }) + + h := hb(func(*work.Job, *work.DequeueOptions) error { + 
panic("boom") + }) + + require.Panics(t, func() { + _ = h(job, opt) + }) + require.Equal(t, job.EnqueuedAt.Unix(), job.UpdatedAt.Unix()+30) + require.NotEqual(t, job.CreatedAt, job.UpdatedAt) + + time.Sleep(1100 * time.Millisecond) + require.EqualValues(t, 1, queue.count.Load()) +}