diff --git a/bench/worker_bench_test.go b/bench/worker_bench_test.go index f6e2204..739361b 100644 --- a/bench/worker_bench_test.go +++ b/bench/worker_bench_test.go @@ -34,10 +34,10 @@ func BenchmarkWorkerRunJob(b *testing.B) { b.StopTimer() for n := 0; n < b.N; n++ { - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-bench}")) wp := work.NewWorkerPoolWithOptions( - struct{}{}, 1, "{ns1}", pool, + struct{}{}, 1, "{ns-bench}", pool, work.WorkerPoolOptions{ SleepBackoffs: []int64{1000}, }, @@ -49,7 +49,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { return nil }) - enqueuer := work.NewEnqueuer("{ns1}", pool) + enqueuer := work.NewEnqueuer("{ns-bench}", pool) for i := 0; i < k; i++ { _, err := enqueuer.Enqueue("test", nil) require.NoError(b, err) @@ -68,11 +68,11 @@ func BenchmarkWorkerRunJob(b *testing.B) { b.StopTimer() for n := 0; n < b.N; n++ { - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-bench}")) queue := work2.NewRedisQueue(client) w := work2.NewWorker(&work2.WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-bench}", Queue: queue, }) var wg sync.WaitGroup @@ -93,7 +93,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { job := work2.NewJob() err := queue.Enqueue(job, &work2.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-bench}", QueueID: "test", }) require.NoError(b, err) diff --git a/http/server_test.go b/http/server_test.go index 2069c8c..cfef907 100644 --- a/http/server_test.go +++ b/http/server_test.go @@ -16,7 +16,7 @@ import ( func TestServer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-http}")) q := work.NewRedisQueue(client) srv := NewServer(&ServerOptions{ @@ -89,15 +89,15 @@ func TestServer(t *testing.T) { }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", }, { reqMethod: "GET", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=xxx", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", }, { // bad duration @@ -122,91 +122,91 @@ func TestServer(t *testing.T) { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "payload": "payload1", "delay": "10s" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", }, { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "payload": "payload1" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"[a-z0-9-]{36}\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", }, { reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "id": "id1", "payload": "payload1" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { // same job id reqMethod: "POST", reqURL: "http://example.com/jobs", reqBody: `{ - "namespace": "{ns1}", + "namespace": "{ns-http}", "queue_id": "q1", "id": "id1", "payload": "payload2" }`, respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", }, { reqMethod: "GET", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"status\":\"ready\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&job_id=id1", respCode: 500, respBody: "{\"error\":\"work: empty queue id\"}\n", }, { reqMethod: "DELETE", - reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1&job_id=id1", + reqURL: "http://example.com/jobs?namespace=%7Bns-http%7D&queue_id=q1&job_id=id1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"job\":{\"ID\":\"id1\",\"CreatedAt\":\"[TZ0-9:-]+\",\"UpdatedAt\":\"[TZ0-9:-]+\",\"EnqueuedAt\":\"[TZ0-9:-]+\",\"Payload\":\"InBheWxvYWQxIg==\",\"Retries\":0,\"LastError\":\"\"}}", }, { reqMethod: "GET", - reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + reqURL: "http://example.com/metrics?namespace=%7Bns-http%7D&queue_id=q1", respCode: 200, - respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + respBody: "{\"namespace\":\"{ns-http}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", }, } { var reqBody io.Reader diff --git a/job.go b/job.go index be4e221..de8cc17 100644 --- a/job.go +++ b/job.go @@ -30,6 +30,31 @@ type Job struct { Retries int64 `msgpack:"retries"` // If the job previously fails, LastError will be populated with error string. LastError string `msgpack:"last_error"` + + // AllowPromotion controls whether Enqueue and PromoteJob may lower this + // job's score. + // + // When false (default), both operations use ZADD XX GT semantics: once a + // job is scheduled at time T, a subsequent Enqueue with score T' < T is + // a no-op, and PromoteJob cannot reduce the score. This preserves the + // "deferral" guarantee relied on by deterministic-ID jobs that may be + // enqueued multiple times concurrently (a dedup pattern) and prevents + // PromoteJob from demoting a job whose score has been bumped to + // now + InvisibleSec by Dequeue. + // + // When true, both operations use ZADD XX (no GT). This lets an + // explicit caller lower the score, for example in an opt-in retry flow + // where backoff should override Dequeue's InvisibleSec mark, and lets + // PromoteJob advance a scheduled-but-pending job to now. The caller is + // responsible for ensuring this is safe — typically that the job has a + // unique ID and is not relied on for dedup-deferral. + // + // This field is write-only at the Go API: it is persisted to Redis on + // Enqueue and consulted server-side by Enqueue and PromoteJob, but it + // is NOT rehydrated onto jobs returned by Dequeue or BulkFind — those + // always observe the zero value. PromoteJob reads the persisted value + // directly from Redis, so callers do not need to round-trip it. + AllowPromotion bool `msgpack:"-" json:",omitempty"` } // InvalidJobPayloadError wraps json or msgpack decoding error. @@ -240,3 +265,20 @@ func (opt *FindOptions) Validate() error { type BulkJobFinder interface { BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error) } + +// PromoteOptions specifies how a job is promoted in the queue. +type PromoteOptions struct { + Namespace string + QueueID string +} + +// Validate validates PromoteOptions. +func (opt *PromoteOptions) Validate() error { + if opt.Namespace == "" { + return ErrEmptyNamespace + } + if opt.QueueID == "" { + return ErrEmptyQueueID + } + return nil +} diff --git a/middleware/concurrent/dequeuer_test.go b/middleware/concurrent/dequeuer_test.go index 805c7c3..e903e65 100644 --- a/middleware/concurrent/dequeuer_test.go +++ b/middleware/concurrent/dequeuer_test.go @@ -15,7 +15,7 @@ import ( func TestDequeuer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-concurrent}")) var called int h1 := func(*work.DequeueOptions) (*work.Job, error) { @@ -38,7 +38,7 @@ func TestDequeuer(t *testing.T) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -75,7 +75,7 @@ func TestDequeuer(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -87,7 +87,7 @@ func TestDequeuer(t *testing.T) { require.EqualValues(t, opt.At.Unix()+60, z[0].Score) require.EqualValues(t, opt.At.Unix()+60, z[1].Score) - require.NoError(t, client.ZRem(context.Background(), "{ns1}:lock:q1", "w1").Err()) + require.NoError(t, client.ZRem(context.Background(), "{ns-concurrent}:lock:q1", "w1").Err()) optLater := *opt optLater.At = opt.At.Add(10 * time.Second) // worker 0 is locked already @@ -105,7 +105,7 @@ func TestDequeuer(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -137,7 +137,7 @@ func TestDequeuer(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:lock:q1", + "{ns-concurrent}:lock:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -155,7 +155,7 @@ func BenchmarkConcurrency(b *testing.B) { client := redistest.NewClient() defer client.Close() - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-concurrent}")) var called int h1 := func(*work.DequeueOptions) (*work.Job, error) { @@ -178,7 +178,7 @@ func BenchmarkConcurrency(b *testing.B) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, diff --git a/middleware/concurrent/local_dequeuer_test.go b/middleware/concurrent/local_dequeuer_test.go index b3971a0..988db93 100644 --- a/middleware/concurrent/local_dequeuer_test.go +++ b/middleware/concurrent/local_dequeuer_test.go @@ -30,7 +30,7 @@ func TestLocalDequeuer(t *testing.T) { } opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-concurrent}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, diff --git a/middleware/discard/after_test.go b/middleware/discard/after_test.go index e07bf83..9f8105c 100644 --- a/middleware/discard/after_test.go +++ b/middleware/discard/after_test.go @@ -12,7 +12,7 @@ import ( func TestAfter(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } d := After(time.Minute) diff --git a/middleware/discard/invalid_payload_test.go b/middleware/discard/invalid_payload_test.go index 5ef2c8c..3f706ae 100644 --- a/middleware/discard/invalid_payload_test.go +++ b/middleware/discard/invalid_payload_test.go @@ -10,7 +10,7 @@ import ( func TestInvalidPayload(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } h := InvalidPayload(func(*work.Job, *work.DequeueOptions) error { diff --git a/middleware/discard/max_retry_test.go b/middleware/discard/max_retry_test.go index c8b15a7..b5257f4 100644 --- a/middleware/discard/max_retry_test.go +++ b/middleware/discard/max_retry_test.go @@ -11,7 +11,7 @@ import ( func TestMaxRetry(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-discard}", QueueID: "q1", } d := MaxRetry(1) diff --git a/middleware/heartbeat/heartbeater.go b/middleware/heartbeat/heartbeater.go index 4403290..b21e729 100644 --- a/middleware/heartbeat/heartbeater.go +++ b/middleware/heartbeat/heartbeater.go @@ -54,12 +54,13 @@ func Heartbeater(hopts *HeartbeaterOptions) work.HandleMiddleware { } }() - err := f(job, opt) - cancel() - <-done - job.UpdatedAt = copiedJob.UpdatedAt - job.EnqueuedAt = copiedJob.EnqueuedAt - return err + defer func() { + cancel() + <-done + job.UpdatedAt = copiedJob.UpdatedAt + job.EnqueuedAt = copiedJob.EnqueuedAt + }() + return f(job, opt) } } } diff --git a/middleware/heartbeat/heartbeater_test.go b/middleware/heartbeat/heartbeater_test.go index 98e1b56..c56665c 100644 --- a/middleware/heartbeat/heartbeater_test.go +++ b/middleware/heartbeat/heartbeater_test.go @@ -2,6 +2,8 @@ package heartbeat import ( "context" + "errors" + "sync/atomic" "testing" "time" @@ -14,11 +16,11 @@ import ( func TestHeartbeater(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-heartbeat}")) job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-heartbeat}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, @@ -41,7 +43,7 @@ func TestHeartbeater(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-heartbeat}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -50,3 +52,50 @@ func TestHeartbeater(t *testing.T) { require.Len(t, z, 1) require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) } + +type countingQueue struct { + count atomic.Int64 +} + +func (q *countingQueue) Enqueue(*work.Job, *work.EnqueueOptions) error { + q.count.Add(1) + return nil +} + +func (q *countingQueue) Dequeue(*work.DequeueOptions) (*work.Job, error) { + return nil, errors.New("not implemented") +} + +func (q *countingQueue) Ack(*work.Job, *work.AckOptions) error { + return errors.New("not implemented") +} + +func TestHeartbeaterStopsAfterPanic(t *testing.T) { + queue := &countingQueue{} + job := work.NewJob() + opt := &work.DequeueOptions{ + Namespace: "{ns-heartbeat-panic}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + } + + hb := Heartbeater(&HeartbeaterOptions{ + Queue: queue, + InvisibleSec: 30, + IntervalSec: 1, + }) + + h := hb(func(*work.Job, *work.DequeueOptions) error { + panic("boom") + }) + + require.Panics(t, func() { + _ = h(job, opt) + }) + require.Equal(t, job.EnqueuedAt.Unix(), job.UpdatedAt.Unix()+30) + require.NotEqual(t, job.CreatedAt, job.UpdatedAt) + + time.Sleep(1100 * time.Millisecond) + require.EqualValues(t, 1, queue.count.Load()) +} diff --git a/middleware/logrus/logger_test.go b/middleware/logrus/logger_test.go index effb308..b192e89 100644 --- a/middleware/logrus/logger_test.go +++ b/middleware/logrus/logger_test.go @@ -11,7 +11,7 @@ import ( func TestHandleFuncLogger(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-logrus}", QueueID: "q1", } h := HandleFuncLogger(func(*work.Job, *work.DequeueOptions) error { @@ -31,7 +31,7 @@ func TestHandleFuncLogger(t *testing.T) { func TestEnqueueFuncLogger(t *testing.T) { job := work.NewJob() opt := &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-logrus}", QueueID: "q1", } h := EnqueueFuncLogger(func(*work.Job, *work.EnqueueOptions) error { diff --git a/middleware/prometheus/metrics_test.go b/middleware/prometheus/metrics_test.go index f593e89..6921a57 100644 --- a/middleware/prometheus/metrics_test.go +++ b/middleware/prometheus/metrics_test.go @@ -20,7 +20,7 @@ func TestHandleFuncMetrics(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", QueueID: "q1", } h := HandleFuncMetrics(func(*work.Job, *work.DequeueOptions) error { @@ -58,7 +58,7 @@ func TestEnqueueFuncMetrics(t *testing.T) { job := work.NewJob() opt := &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", QueueID: "q1", } h := EnqueueFuncMetrics(func(*work.Job, *work.EnqueueOptions) error { @@ -93,10 +93,10 @@ func TestExportWorkerMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-prometheus}")) w := work.NewWorker(&work.WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-prometheus}", Queue: work.NewRedisQueue(client), }) err = w.Register("test", diff --git a/middleware/recovery/catch_panic_test.go b/middleware/recovery/catch_panic_test.go index 0f3e3f2..1b8eb16 100644 --- a/middleware/recovery/catch_panic_test.go +++ b/middleware/recovery/catch_panic_test.go @@ -10,7 +10,7 @@ import ( func TestCatchPanic(t *testing.T) { job := work.NewJob() opt := &work.DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-recovery}", QueueID: "q1", } h := CatchPanic(func(*work.Job, *work.DequeueOptions) error { diff --git a/middleware/unique/enqueuer_test.go b/middleware/unique/enqueuer_test.go index 3c217e7..2b52c0a 100644 --- a/middleware/unique/enqueuer_test.go +++ b/middleware/unique/enqueuer_test.go @@ -13,7 +13,7 @@ import ( func TestEnqueuerBypass(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -30,7 +30,7 @@ func TestEnqueuerBypass(t *testing.T) { for i := 0; i < 3; i++ { job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -41,7 +41,7 @@ func TestEnqueuerBypass(t *testing.T) { func TestEnqueuer(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -58,7 +58,7 @@ func TestEnqueuer(t *testing.T) { for i := 0; i < 3; i++ { job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -66,10 +66,10 @@ func TestEnqueuer(t *testing.T) { require.Equal(t, 1, called) for i := 0; i < 3; i++ { - require.NoError(t, client.Del(context.Background(), "{ns1}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) + require.NoError(t, client.Del(context.Background(), "{ns-unique}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) job := work.NewJob() err := h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) require.NoError(t, err) @@ -82,7 +82,7 @@ func BenchmarkEnqueuer(b *testing.B) { client := redistest.NewClient() defer client.Close() - require.NoError(b, redistest.Reset(client)) + require.NoError(b, redistest.Reset(client, "{ns-unique}")) enq := Enqueuer(&EnqueuerOptions{ Client: client, @@ -101,7 +101,7 @@ func BenchmarkEnqueuer(b *testing.B) { for n := 0; n < b.N; n++ { job := work.NewJob() h(job, &work.EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-unique}", QueueID: "q1", }) } diff --git a/redis_queue.go b/redis_queue.go index ef9931b..bfb7245 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -10,8 +10,7 @@ import ( "github.com/redis/go-redis/v9" ) -func batchSlice(n int) [][]int { - const size = 1000 +func batchSliceWithSize(n int, size int) [][]int { var batches [][]int for i := 0; i < n; i += size { j := i + size @@ -23,6 +22,11 @@ func batchSlice(n int) [][]int { return batches } +func batchSlice(n int) [][]int { + const size = 1000 + return batchSliceWithSize(n, size) +} + type redisQueue struct { client redis.UniversalClient @@ -30,9 +34,18 @@ type redisQueue struct { dequeueScript *redis.Script ackScript *redis.Script findScript *redis.Script + promoteScript *redis.Script metricScript *redis.Script } +// JobPromoter can update a job's score in the queue to make it immediately +// eligible for dequeuing without re-enqueuing the entire job. +type JobPromoter interface { + // PromoteJob updates the job's score to time.Now(). Only affects jobs + // that exist and have scores <= now (won't demote jobs being processed). + PromoteJob(jobID string, opt *PromoteOptions) error +} + // RedisQueue implements Queue with other additional capabilities type RedisQueue interface { Queue @@ -40,6 +53,7 @@ type RedisQueue interface { BulkDequeuer BulkJobFinder MetricsExporter + JobPromoter } // NewRedisQueue creates a new queue stored in redis. @@ -49,22 +63,46 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { local queue_id = ARGV[2] local queue_key = table.concat({ns, "queue", queue_id}, ":") - local zadd_args = {} + -- Per-job AllowPromotion (passed alongside each job) selects between + -- two ZADD variants. Guarded entries use ZADD ... gt so a duplicate + -- enqueue cannot demote an already-deferred deterministic-ID job; + -- promoted entries use plain ZADD so an explicit opt-in caller can + -- lower the score below the InvisibleSec mark set by Dequeue. + local guarded_args = {} + local promoted_args = {} - for i = 3,table.getn(ARGV),3 do + for i = 3,table.getn(ARGV),4 do local at = tonumber(ARGV[i]) - local job_id = ARGV[i+1] - local jobm = ARGV[i+2] + local requested_allow_promotion = ARGV[i+1] + local job_id = ARGV[i+2] + local jobm = ARGV[i+3] local job_key = table.concat({ns, "job", job_id}, ":") + local allow_promotion = redis.call("hget", job_key, "allow_promotion") + if allow_promotion == false then + allow_promotion = requested_allow_promotion + redis.call("hset", job_key, "allow_promotion", allow_promotion) + end -- update job fields redis.call("hset", job_key, "msgpack", jobm) - -- enqueue - table.insert(zadd_args, at) - table.insert(zadd_args, job_key) + if allow_promotion == "1" then + table.insert(promoted_args, at) + table.insert(promoted_args, job_key) + else + table.insert(guarded_args, at) + table.insert(guarded_args, job_key) + end + end + + local added = 0 + if table.getn(guarded_args) > 0 then + added = added + tonumber(redis.call("zadd", queue_key, "gt", unpack(guarded_args))) + end + if table.getn(promoted_args) > 0 then + added = added + tonumber(redis.call("zadd", queue_key, unpack(promoted_args))) end - return redis.call("zadd", queue_key, unpack(zadd_args)) + return added `) dequeueScript := redis.NewScript(` @@ -146,6 +184,30 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { return ret `) + // PromoteJob is read-then-write (HGET allow_promotion, then ZADD) + // done atomically in a single script so an Ack + re-enqueue between + // the two operations cannot let a stale "AllowPromotion=true" read + // demote a freshly-enqueued job whose owner did not opt in. + promoteScript := redis.NewScript(` + local ns = ARGV[1] + local queue_id = ARGV[2] + local job_id = ARGV[3] + local at = ARGV[4] + local queue_key = table.concat({ns, "queue", queue_id}, ":") + local job_key = table.concat({ns, "job", job_id}, ":") + + -- XX always: never (re-)add a job whose hash is gone (Ack'd or never + -- enqueued). GT only when the job did not opt into promotion — without + -- GT, an explicit caller can lower the score from Dequeue's + -- InvisibleSec mark back down to now. + local allow_promotion = redis.call("hget", job_key, "allow_promotion") + if allow_promotion == "1" then + return redis.call("zadd", queue_key, "XX", at, job_key) + else + return redis.call("zadd", queue_key, "XX", "GT", at, job_key) + end + `) + metricScript := redis.NewScript(` local ns = ARGV[1] local queue_id = ARGV[2] @@ -174,6 +236,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { dequeueScript: dequeueScript, ackScript: ackScript, findScript: findScript, + promoteScript: promoteScript, metricScript: metricScript, } } @@ -183,7 +246,11 @@ func (q *redisQueue) Enqueue(job *Job, opt *EnqueueOptions) error { } func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { - for _, batch := range batchSlice(len(jobs)) { + // Keep the total script ARGV count within the previous 3-args-per-job budget. + // bulkEnqueueSmallBatch now sends 4 values per job, so the maximum safe batch + // size is 1000 * 3 / 4 = 750 jobs. + const bulkEnqueueBatchSize = 750 + for _, batch := range batchSliceWithSize(len(jobs), bulkEnqueueBatchSize) { err := q.bulkEnqueueSmallBatch(jobs[batch[0]:batch[1]], opt) if err != nil { return err @@ -200,7 +267,7 @@ func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) err if len(jobs) == 0 { return nil } - args := make([]interface{}, 2+3*len(jobs)) + args := make([]interface{}, 2+4*len(jobs)) args[0] = opt.Namespace args[1] = opt.QueueID for i, job := range jobs { @@ -208,9 +275,14 @@ func (q *redisQueue) bulkEnqueueSmallBatch(jobs []*Job, opt *EnqueueOptions) err if err != nil { return err } - args[2+3*i] = job.EnqueuedAt.Unix() - args[2+3*i+1] = job.ID - args[2+3*i+2] = jobm + args[2+4*i] = job.EnqueuedAt.Unix() + if job.AllowPromotion { + args[2+4*i+1] = "1" + } else { + args[2+4*i+1] = "0" + } + args[2+4*i+2] = job.ID + args[2+4*i+3] = jobm } return q.enqueueScript.Run(context.Background(), q.client, []string{opt.Namespace}, args...).Err() } @@ -342,6 +414,41 @@ func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*J return jobs, nil } +func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { + err := opt.Validate() + if err != nil { + return err + } + + // promoteScript reads the separately-stored AllowPromotion metadata and + // performs the ZADD atomically. With AllowPromotion=false (default), the + // ZADD keeps the GT guard and is effectively a no-op for any job whose + // score sits in the future (either deferred via dedup or in-flight via + // Dequeue's InvisibleSec mark). With AllowPromotion=true, the caller + // has asserted that the job's calling pattern is safe to demote — the + // typical use case is a subqueue handler middleware that promotes the + // next gated job after the prior handler Acks. + // + // If the job is no longer stored (already Ack'd or never enqueued), + // the script's XX flag prevents (re-)adding it, so a missing hash is a + // no-op rather than an error. + err = q.promoteScript.Run( + context.Background(), + q.client, + []string{opt.Namespace}, + opt.Namespace, + opt.QueueID, + jobID, + time.Now().Unix(), + ).Err() + if errors.Is(err, redis.Nil) { + // ZADD XX with no member added returns nil; go-redis surfaces this + // as redis.Nil from the script. Treat it as a successful no-op. + return nil + } + return err +} + func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { err := opt.Validate() if err != nil { diff --git a/redis_queue_test.go b/redis_queue_test.go index e97c7e4..a4983ea 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -14,7 +14,7 @@ import ( func TestRedisQueueEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -26,23 +26,24 @@ func TestRedisQueueEnqueue(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", }) require.NoError(t, err) require.Len(t, jobs, 2) @@ -52,12 +53,12 @@ func TestRedisQueueEnqueue(t *testing.T) { jobs[0].LastError = "hello world" err = q.Enqueue(jobs[0], &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) jobs, err = q.BulkFind([]string{job.ID}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", }) require.NoError(t, err) require.Len(t, jobs, 1) @@ -66,7 +67,7 @@ func TestRedisQueueEnqueue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -77,14 +78,14 @@ func TestRedisQueueEnqueue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) err = q.Enqueue(job.Delay(time.Minute), &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -98,7 +99,7 @@ func TestRedisQueueEnqueue(t *testing.T) { func TestRedisQueueDequeue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -108,10 +109,10 @@ func TestRedisQueueDequeue(t *testing.T) { job := NewJob() err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) @@ -119,7 +120,7 @@ func TestRedisQueueDequeue(t *testing.T) { now := job.EnqueuedAt.Add(123 * time.Second) jobDequeued, err := q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 0, @@ -129,7 +130,7 @@ func TestRedisQueueDequeue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -140,7 +141,7 @@ func TestRedisQueueDequeue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) jobDequeued, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 60, @@ -153,12 +154,13 @@ func TestRedisQueueDequeue(t *testing.T) { jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -170,7 +172,7 @@ func TestRedisQueueDequeue(t *testing.T) { // empty _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: now, InvisibleSec: 60, @@ -182,7 +184,7 @@ func TestRedisQueueDequeue(t *testing.T) { func TestRedisQueueDequeueDeletedJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) type message struct { @@ -194,25 +196,26 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) jobm, err := marshal(job) require.NoError(t, err) require.Equal(t, map[string]string{ - "msgpack": string(jobm), + "allow_promotion": "0", + "msgpack": string(jobm), }, h) require.NoError(t, client.Del(context.Background(), jobKey).Err()) _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt, InvisibleSec: 60, @@ -221,7 +224,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -233,22 +236,22 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { func TestRedisQueueAck(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -263,14 +266,14 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 1, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -283,7 +286,7 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 0, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) @@ -292,36 +295,36 @@ func TestRedisQueueAck(t *testing.T) { func TestRedisQueueGetQueueMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) m, err := q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt, }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) + require.Equal(t, "{ns-work}", m.Namespace) require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) require.True(t, 0 < m.Latency && m.Latency < time.Minute) m, err = q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", At: job.EnqueuedAt.Add(-time.Second), }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) + require.Equal(t, "{ns-work}", m.Namespace) require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) @@ -331,7 +334,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { func TestRedisQueueBulkEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) q := NewRedisQueue(client) const jobCount = 100000 @@ -342,12 +345,311 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { } err := q.BulkEnqueue(jobs, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", }) require.NoError(t, err) - count, err := client.ZCard(context.Background(), "{ns1}:queue:q1").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:q1").Result() require.NoError(t, err) require.Equal(t, int64(jobCount), count) } + +func TestRedisQueuePromoteJob(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + // Enqueue two jobs with old timestamps (in the past) + job1 := NewJob() + job1.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago + job2 := NewJob() + job2.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago + + opts := &EnqueueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + } + + err := q.Enqueue(job1, opts) + require.NoError(t, err) + err = q.Enqueue(job2, opts) + require.NoError(t, err) + + // Check initial score of job2 (should be old timestamp) + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job2.ID) + initialScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, float64(job2.EnqueuedAt.Unix()), initialScore) + + // Promote job2 + beforePromote := time.Now().Unix() + err = q.PromoteJob(job2.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + afterPromote := time.Now().Unix() + + // Check that job2's score was updated to now + newScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.GreaterOrEqual(t, int64(newScore), beforePromote) + require.LessOrEqual(t, int64(newScore), afterPromote) + + // Promote non-existent job should not error (XX flag prevents adding) + err = q.PromoteJob("non-existent-job", &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + + // Verify non-existent job was not added to queue + exists, err := client.ZScore(context.Background(), queueKey, "{ns-work}:job:non-existent-job").Result() + require.Error(t, err) // redis.Nil error expected + require.Equal(t, float64(0), exists) +} + +func TestRedisQueueEnqueueGuardDoesNotDemote(t *testing.T) { + // AllowPromotion defaults to false. A second Enqueue of the same job + // with an earlier score must be a no-op so deterministic-ID dedup + // jobs cannot have their deferred run time clobbered. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + job := NewJob() + deferredAt := time.Now().Add(time.Minute) + job.EnqueuedAt = deferredAt + require.NoError(t, q.Enqueue(job, opts)) + + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + scoreAfterFirst, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.EqualValues(t, deferredAt.Unix(), scoreAfterFirst) + + // Re-enqueue the same ID with an earlier score (the dedup pattern). + job.EnqueuedAt = time.Now() + job.AllowPromotion = true + require.NoError(t, q.Enqueue(job, opts)) + + scoreAfterSecond, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterFirst, scoreAfterSecond, "GT must reject the earlier-score re-enqueue when AllowPromotion is false") + + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterFirst, scoreAfterPromote, "no-op re-enqueue must not flip AllowPromotion") +} + +func TestRedisQueueEnqueueAllowPromotionDemotes(t *testing.T) { + // AllowPromotion=true opts the job out of the GT guard so an explicit + // opt-in retry flow can lower the score from the Dequeue InvisibleSec + // mark back down to now + backoff. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + job := NewJob() + job.AllowPromotion = true + deferredAt := time.Now().Add(time.Minute) + job.EnqueuedAt = deferredAt + require.NoError(t, q.Enqueue(job, opts)) + + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + + // Re-enqueue the same ID with an earlier score. + earlierAt := time.Now() + job.EnqueuedAt = earlierAt + require.NoError(t, q.Enqueue(job, opts)) + + scoreAfter, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.EqualValues(t, earlierAt.Unix(), scoreAfter, "plain ZADD must lower the score when AllowPromotion is true") +} + +func TestRedisQueueBulkEnqueueMixedAllowPromotion(t *testing.T) { + // A single BulkEnqueue containing both guarded (AllowPromotion=false) + // and promotable (AllowPromotion=true) jobs must apply the right + // semantic to each, atomically. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + guarded := NewJob() + deferredAt := time.Now().Add(time.Minute) + guarded.EnqueuedAt = deferredAt + + promotable := NewJob() + promotable.AllowPromotion = true + promotable.EnqueuedAt = deferredAt + + require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) + + // Re-enqueue both with an earlier score. + earlierAt := time.Now() + guarded.EnqueuedAt = earlierAt + promotable.EnqueuedAt = earlierAt + require.NoError(t, q.BulkEnqueue([]*Job{guarded, promotable}, opts)) + + queueKey := "{ns-work}:queue:q1" + + guardedScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns-work}:job:%s", guarded.ID)).Result() + require.NoError(t, err) + require.EqualValues(t, deferredAt.Unix(), guardedScore, "guarded job must retain its later score") + + promotableScore, err := client.ZScore(context.Background(), queueKey, fmt.Sprintf("{ns-work}:job:%s", promotable.ID)).Result() + require.NoError(t, err) + require.EqualValues(t, earlierAt.Unix(), promotableScore, "promotable job must accept the earlier score") +} + +func TestRedisQueuePromoteJobAllowPromotionDemotes(t *testing.T) { + // PromoteJob on a job with AllowPromotion=true must be able to advance + // the score even when the job is currently sitting at the InvisibleSec + // mark from a prior Dequeue — that is precisely the state the subqueue + // PromoteOnAck path needs to advance. + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + opts := &EnqueueOptions{Namespace: "{ns-work}", QueueID: "q1"} + + job := NewJob() + job.AllowPromotion = true + job.EnqueuedAt = time.Now() + require.NoError(t, q.Enqueue(job, opts)) + + dequeued, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + }) + require.NoError(t, err) + require.Equal(t, job.ID, dequeued.ID) + + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + beforePromote := time.Now().Unix() + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Less(t, int64(scoreAfterPromote), int64(scoreAfterDequeue), "PromoteJob must lower the score for AllowPromotion=true jobs") + require.GreaterOrEqual(t, int64(scoreAfterPromote), beforePromote) +} + +func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + // Enqueue a job + job := NewJob() + job.EnqueuedAt = time.Now() + + opts := &EnqueueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + } + + err := q.Enqueue(job, opts) + require.NoError(t, err) + + // Dequeue the job (this sets score to now + invisibleSec) + dequeueOpts := &DequeueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, // 60 seconds + } + dequeuedJob, err := q.Dequeue(dequeueOpts) + require.NoError(t, err) + require.Equal(t, job.ID, dequeuedJob.ID) + + // Check that job's score is now + invisibleSec + queueKey := "{ns-work}:queue:q1" + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + // Try to promote the job (should not demote it because of GT flag) + err = q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + + // Verify score hasn't changed (GT flag prevented demotion) + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterDequeue, scoreAfterPromote) +} + +func TestRedisQueuePromoteJobMissingAllowPromotionDoesNotDemote(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client, "{ns-work}")) + q := NewRedisQueue(client) + + job := NewJob() + job.EnqueuedAt = time.Now() + + opts := &EnqueueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + } + + require.NoError(t, q.Enqueue(job, opts)) + + jobKey := fmt.Sprintf("{ns-work}:job:%s", job.ID) + require.NoError(t, client.HDel(context.Background(), jobKey, "allow_promotion").Err()) + + dequeuedJob, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns-work}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + }) + require.NoError(t, err) + require.Equal(t, job.ID, dequeuedJob.ID) + + queueKey := "{ns-work}:queue:q1" + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + require.NoError(t, q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + })) + + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterDequeue, scoreAfterPromote) +} diff --git a/redistest/client.go b/redistest/client.go index 89ca770..0a62f34 100644 --- a/redistest/client.go +++ b/redistest/client.go @@ -23,12 +23,56 @@ func NewClient() redis.UniversalClient { }) } -// Reset is used to clear redis for next test. -func Reset(client redis.UniversalClient) error { - if cc, ok := client.(*redis.ClusterClient); ok { - return cc.ForEachMaster(context.Background(), func(ctx context.Context, c *redis.Client) error { - return c.FlushAll(ctx).Err() - }) +// Reset deletes keys belonging to the given namespaces so the caller starts +// from a clean slate. Scoping cleanup to namespaces lets tests in different +// packages run in parallel against the same Redis without one test's reset +// wiping another test's in-progress data (which a FlushAll would do). +// +// Passing no namespaces falls back to FlushAll for legacy callers. +func Reset(client redis.UniversalClient, namespaces ...string) error { + ctx := context.Background() + if len(namespaces) == 0 { + if cc, ok := client.(*redis.ClusterClient); ok { + return cc.ForEachMaster(ctx, func(ctx context.Context, c *redis.Client) error { + return c.FlushAll(ctx).Err() + }) + } + return client.FlushAll(ctx).Err() } - return client.FlushAll(context.Background()).Err() + + deleteMatching := func(ctx context.Context, c redis.Cmdable, pattern string) error { + var cursor uint64 + for { + keys, next, err := c.Scan(ctx, cursor, pattern, 1000).Result() + if err != nil { + return err + } + if len(keys) > 0 { + if err := c.Del(ctx, keys...).Err(); err != nil { + return err + } + } + if next == 0 { + return nil + } + cursor = next + } + } + + for _, ns := range namespaces { + pattern := ns + ":*" + if cc, ok := client.(*redis.ClusterClient); ok { + err := cc.ForEachMaster(ctx, func(ctx context.Context, c *redis.Client) error { + return deleteMatching(ctx, c, pattern) + }) + if err != nil { + return err + } + continue + } + if err := deleteMatching(ctx, client, pattern); err != nil { + return err + } + } + return nil } diff --git a/sidekiq/queue_test.go b/sidekiq/queue_test.go index 2a2a8f2..44ab979 100644 --- a/sidekiq/queue_test.go +++ b/sidekiq/queue_test.go @@ -14,7 +14,7 @@ import ( func TestSidekiqQueueExternalEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) job := work.NewJob() @@ -44,7 +44,7 @@ func TestSidekiqQueueExternalEnqueue(t *testing.T) { func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) job := work.NewJob() @@ -80,7 +80,7 @@ func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { func TestSidekiqQueueExternalBulkEnqueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{sidekiq}")) q := NewQueue(client) const jobCount = 100000 diff --git a/worker.go b/worker.go index 2e070c6..db3fcaa 100644 --- a/worker.go +++ b/worker.go @@ -157,8 +157,8 @@ func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFun } handle := func(job *Job, o *DequeueOptions) error { - ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) - defer cancel() + //ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) + //defer cancel() return h(ctx, job, o) } for _, mw := range opt.HandleMiddleware { diff --git a/worker_test.go b/worker_test.go index 039c8ad..4d7acb5 100644 --- a/worker_test.go +++ b/worker_test.go @@ -16,10 +16,10 @@ import ( func TestWorkerStartStop(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test", @@ -41,10 +41,10 @@ func TestWorkerStartStop(t *testing.T) { func TestWorkerExportMetrics(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test", @@ -60,7 +60,7 @@ func TestWorkerExportMetrics(t *testing.T) { all, err := w.ExportMetrics() require.NoError(t, err) require.Len(t, all.Queue, 1) - require.Equal(t, all.Queue[0].Namespace, "{ns1}") + require.Equal(t, all.Queue[0].Namespace, "{ns-work}") require.Equal(t, all.Queue[0].QueueID, "test") } @@ -98,14 +98,14 @@ func waitEmpty(client redis.UniversalClient, key string, timeout time.Duration) func TestWorkerRunJobMultiQueue(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) type message struct { Text string } w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("test1", @@ -147,7 +147,7 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "test1", }) require.NoError(t, err) @@ -159,32 +159,32 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "test2", }) require.NoError(t, err) } - count, err := client.ZCard(context.Background(), "{ns1}:queue:test1").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:test1").Result() require.NoError(t, err) require.EqualValues(t, 3, count) - count, err = client.ZCard(context.Background(), "{ns1}:queue:test2").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test2").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:test1", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:test1", 10*time.Second) require.NoError(t, err) - err = waitEmpty(client, "{ns1}:queue:test2", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:test2", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:test1").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test1").Result() require.NoError(t, err) require.EqualValues(t, 0, count) - count, err = client.ZCard(context.Background(), "{ns1}:queue:test2").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:test2").Result() require.NoError(t, err) require.EqualValues(t, 0, count) } @@ -192,10 +192,10 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { func TestWorkerRunJob(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) err := w.Register("success", @@ -237,22 +237,22 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "success", }) require.NoError(t, err) } - count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:success", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:success", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 0, count) @@ -262,28 +262,28 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", }) require.NoError(t, err) } - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:failure", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:failure", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", At: time.Now().Add(time.Hour), InvisibleSec: 3600, @@ -299,28 +299,28 @@ func TestWorkerRunJob(t *testing.T) { require.NoError(t, err) err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", }) require.NoError(t, err) } - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 3, count) w.Start() - err = waitEmpty(client, "{ns1}:queue:panic", 10*time.Second) + err = waitEmpty(client, "{ns-work}:queue:panic", 10*time.Second) require.NoError(t, err) w.Stop() - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", At: time.Now().Add(time.Hour), InvisibleSec: 3600, @@ -334,43 +334,43 @@ func TestWorkerRunJob(t *testing.T) { func TestWorkerRunOnce(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) job := NewJob() err := NewRedisQueue(client).Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "success", }) require.NoError(t, err) - count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err := client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 1, count) job2 := NewJob() err = NewRedisQueue(client).Enqueue(job2, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "failure", }) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 1, count) job3 := NewJob() err = NewRedisQueue(client).Enqueue(job3, &EnqueueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "panic", }) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 1, count) w := NewWorker(&WorkerOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", Queue: NewRedisQueue(client), }) @@ -384,7 +384,7 @@ func TestWorkerRunOnce(t *testing.T) { ) require.NoError(t, err) - count, err = client.ZCard(context.Background(), "{ns1}:queue:success").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:success").Result() require.NoError(t, err) require.EqualValues(t, 0, count) @@ -399,7 +399,7 @@ func TestWorkerRunOnce(t *testing.T) { require.Error(t, err) require.Equal(t, "no reason", err.Error()) - count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:failure").Result() require.NoError(t, err) require.EqualValues(t, 1, count) @@ -414,7 +414,7 @@ func TestWorkerRunOnce(t *testing.T) { require.Error(t, err) require.True(t, strings.HasPrefix(err.Error(), "panic: unexpected")) - count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() + count, err = client.ZCard(context.Background(), "{ns-work}:queue:panic").Result() require.NoError(t, err) require.EqualValues(t, 1, count) } @@ -428,11 +428,11 @@ func TestWrappedHandlerError(t *testing.T) { func TestRetry(t *testing.T) { client := redistest.NewClient() defer client.Close() - require.NoError(t, redistest.Reset(client)) + require.NoError(t, redistest.Reset(client, "{ns-work}")) job := NewJob() opt := &DequeueOptions{ - Namespace: "{ns1}", + Namespace: "{ns-work}", QueueID: "q1", InvisibleSec: 10, } @@ -448,7 +448,7 @@ func TestRetry(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -480,7 +480,7 @@ func TestRetry(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "{ns-work}:queue:q1", &redis.ZRangeBy{ Min: "-inf", Max: "+inf",