Skip to content

Commit 9aeb843

Browse files
authored
feat: supersedes option to job queue concurrency controls (#15179)
Merge first: #15177 Adds `supersedes` option to concurrency controls allowing newer jobs to automatically delete older pending jobs with the same concurrency key. This enables "last queued wins" behavior for scenarios where only the latest state matters. ## Usage ```typescript concurrency: { key: ({ input }) => `generate:${input.documentId}`, exclusive: true, supersedes: true, // Newer jobs delete older pending ones (not yet completed and did not start processing yet) } ``` **Example scenario:** User rapidly edits a document triggering 5 regeneration jobs. Only the last job runs - intermediate pending jobs are automatically deleted. ## Important Only **pending** jobs are deleted. Running jobs complete normally and the new job waits.
1 parent 01f90c9 commit 9aeb843

8 files changed

Lines changed: 415 additions & 15 deletions

File tree

docs/jobs-queue/workflows.mdx

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ The `concurrency` option accepts either a function (shorthand) or an object with
426426
**Shorthand (function only):**
427427

428428
```ts
429-
// Exclusive concurrency is enabled by default
429+
// Exclusive defaults to true, supersedes defaults to false
430430
concurrency: ({ input }) => `my-key:${input.resourceId}`
431431
```
432432

@@ -441,42 +441,83 @@ concurrency: {
441441
// Only one job with this key can run at a time
442442
// @default true
443443
exclusive: true,
444+
445+
// Delete older pending jobs when a new job is queued
446+
// @default false
447+
supersedes: false,
444448
}
445449
```
446450

447-
#### Use Cases
451+
#### Common Patterns
448452

449-
**1. Document processing (prevent race conditions):**
453+
**1. Exclusive only (preserve all jobs):**
450454

451455
```ts
452-
concurrency: ({ input }) => `process:${input.documentId}`
456+
concurrency: {
457+
key: ({ input }) => `process:${input.documentId}`,
458+
exclusive: true,
459+
supersedes: false, // All jobs run, just not in parallel
460+
}
453461
```
454462

455-
Multiple updates to the same document will be processed one at a time, in the order they were queued.
463+
Use when every job represents unique work that must complete (e.g., processing distinct versions of a document).
456464

457-
**2. Per-user operations:**
465+
**2. Exclusive + Supersedes (last queued wins):**
458466

459467
```ts
460-
concurrency: ({ input }) => `user:${input.userId}`
468+
concurrency: {
469+
key: ({ input }) => `generate:${input.documentId}`,
470+
exclusive: true,
471+
supersedes: true, // Only latest job runs
472+
}
461473
```
462474

463-
Ensures only one job per user runs at a time, preventing conflicts when a user triggers multiple actions quickly.
475+
Use when only the latest state matters (e.g., regenerating embeddings after rapid edits - intermediate states can be skipped).
464476

465-
**3. External API calls (respect rate limits):**
477+
**3. Queue-specific concurrency:**
466478

467479
```ts
468-
concurrency: ({ input }) => `api:${input.resourceId}`
480+
concurrency: {
481+
key: ({ input, queue }) => `${queue}:sync:${input.resourceId}`,
482+
}
469483
```
470484

471-
Prevents parallel API calls for the same resource, useful when external services don't handle concurrent updates well.
485+
Include the queue name to allow the same resource to be processed concurrently in different queues.
486+
487+
#### Supersedes Behavior
472488

473-
**4. Queue-specific concurrency:**
489+
When `supersedes: true` is set, newly queued jobs will automatically delete older pending (not yet running) jobs with the same concurrency key:
490+
491+
**Example scenario:**
492+
493+
```
494+
1. Job A queued → pending
495+
2. Job B queued → A deleted, B pending
496+
3. Job C queued → B deleted, C pending
497+
4. Only Job C runs
498+
```
499+
500+
**Configuration:**
474501

475502
```ts
476-
concurrency: ({ input, queue }) => `${queue}:process:${input.documentId}`
503+
concurrency: {
504+
key: ({ input }) => `generate:${input.documentId}`,
505+
exclusive: true, // Still enforced
506+
supersedes: true, // Delete older pending jobs
507+
}
477508
```
478509

479-
Include the queue name in the key to allow the same resource to be processed concurrently in different queues (e.g., `emails` queue vs `default` queue).
510+
**When to use:**
511+
512+
- Data regeneration (embeddings, thumbnails) after rapid edits
513+
- Report generation where only the latest parameters matter
514+
- Any scenario where intermediate pending jobs are made obsolete by newer ones
515+
516+
**Important notes:**
517+
518+
- Only **pending** jobs (not yet running) are deleted
519+
- If a job is already **running**, it completes normally and the new job waits
520+
- Without `exclusive: true`, supersedes still deletes pending jobs but won't prevent parallel execution
480521

481522
#### Important Considerations
482523

packages/payload/src/queues/config/types/workflowTypes.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export type JobTaskStatus = {
136136
*/
137137
export type ConcurrencyConfig<TInput = object> =
138138
| ((args: { input: TInput; queue: string }) => string)
139-
// Shorthand: key function only, exclusive defaults to true
139+
// Shorthand: key function only, exclusive defaults to true, supersedes defaults to false
140140
| {
141141
/**
142142
* Only one job with this key can run at a time.
@@ -150,6 +150,13 @@ export type ConcurrencyConfig<TInput = object> =
150150
* The queue name is provided to allow for queue-specific concurrency keys if needed.
151151
*/
152152
key: (args: { input: TInput; queue: string }) => string
153+
/**
154+
* When a new job is queued, delete older pending (not yet running) jobs with the same key.
155+
* Already-running jobs are not affected.
156+
* Useful when only the latest state matters (e.g., regenerating data after multiple rapid edits).
157+
* @default false
158+
*/
159+
supersedes?: boolean
153160
}
154161

155162
export type WorkflowConfig<

packages/payload/src/queues/localAPI.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
161161
// Compute concurrency key from workflow or task config (only if feature is enabled)
162162
if (payload.config.jobs?.enableConcurrencyControl) {
163163
let concurrencyKey: null | string = null
164+
let supersedes = false
164165
const queueName = queue || 'default'
165166

166167
if (args.workflow) {
@@ -171,6 +172,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
171172
concurrencyKey = concurrencyConfig({ input: args.input, queue: queueName })
172173
} else {
173174
concurrencyKey = concurrencyConfig.key({ input: args.input, queue: queueName })
175+
supersedes = concurrencyConfig.supersedes ?? false
174176
}
175177
}
176178
} else if (args.task) {
@@ -181,12 +183,43 @@ export const getJobsLocalAPI = (payload: Payload) => ({
181183
concurrencyKey = concurrencyConfig({ input: args.input, queue: queueName })
182184
} else {
183185
concurrencyKey = concurrencyConfig.key({ input: args.input, queue: queueName })
186+
supersedes = concurrencyConfig.supersedes ?? false
184187
}
185188
}
186189
}
187190

188191
if (concurrencyKey) {
189192
data.concurrencyKey = concurrencyKey
193+
194+
// If supersedes is enabled, delete older pending jobs with the same key
195+
if (supersedes) {
196+
if (payload.config.jobs.runHooks) {
197+
await payload.delete({
198+
collection: jobsCollectionSlug,
199+
depth: 0,
200+
disableTransaction: true,
201+
where: {
202+
and: [
203+
{ concurrencyKey: { equals: concurrencyKey } },
204+
{ processing: { equals: false } },
205+
{ completedAt: { exists: false } },
206+
],
207+
},
208+
})
209+
} else {
210+
await payload.db.deleteMany({
211+
collection: jobsCollectionSlug,
212+
req,
213+
where: {
214+
and: [
215+
{ concurrencyKey: { equals: concurrencyKey } },
216+
{ processing: { equals: false } },
217+
{ completedAt: { exists: false } },
218+
],
219+
},
220+
})
221+
}
222+
}
190223
}
191224
}
192225

payload.db

348 KB
Binary file not shown.

test/queues/getConfig.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLev
3737
import { selfCancelWorkflow } from './workflows/selfCancel.js'
3838
import { subTaskWorkflow } from './workflows/subTask.js'
3939
import { subTaskFailsWorkflow } from './workflows/subTaskFails.js'
40+
import { supersedesConcurrencyWorkflow } from './workflows/supersedesConcurrency.js'
4041
import { updatePostWorkflow } from './workflows/updatePost.js'
4142
import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
4243
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
@@ -177,6 +178,7 @@ export const getConfig: () => Partial<Config> = () => ({
177178
exclusiveConcurrencyWorkflow,
178179
noConcurrencyWorkflow,
179180
queueSpecificConcurrencyWorkflow,
181+
supersedesConcurrencyWorkflow,
180182
],
181183
},
182184
editor: lexicalEditor(),

0 commit comments

Comments
 (0)