Skip to content

Commit aa47420

Browse files
committed
WIP new reacquire concurrency system
1 parent d1552f9 commit aa47420

12 files changed

Lines changed: 688 additions & 43 deletions

File tree

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- AlterEnum
2+
ALTER TYPE "TaskRunExecutionStatus"
3+
ADD
4+
VALUE 'QUEUED_EXECUTING';

internal-packages/database/prisma/schema.prisma

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2011,6 +2011,8 @@ enum TaskRunExecutionStatus {
20112011
RUN_CREATED
20122012
/// Run is in the RunQueue
20132013
QUEUED
2014+
/// Run is in the RunQueue, and is also executing. This happens when a run is continued cannot reacquire concurrency
2015+
QUEUED_EXECUTING
20142016
/// Run has been pulled from the queue, but isn't executing yet
20152017
PENDING_EXECUTING
20162018
/// Run is executing on a worker

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3908,6 +3908,9 @@ export class RunEngine {
39083908
case "QUEUED": {
39093909
throw new NotImplementedError("There shouldn't be a heartbeat for QUEUED");
39103910
}
3911+
case "QUEUED_EXECUTING": {
3912+
throw new NotImplementedError("There shouldn't be a heartbeat for QUEUED_EXECUTING");
3913+
}
39113914
case "PENDING_EXECUTING": {
39123915
//the run didn't start executing, we need to requeue it
39133916
const run = await prisma.taskRun.findFirst({

internal-packages/run-engine/src/engine/statuses.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { TaskRunExecutionStatus, TaskRunStatus } from "@trigger.dev/database";
22

33
export function isDequeueableExecutionStatus(status: TaskRunExecutionStatus): boolean {
4-
const dequeuableExecutionStatuses: TaskRunExecutionStatus[] = ["QUEUED"];
4+
const dequeuableExecutionStatuses: TaskRunExecutionStatus[] = ["QUEUED", "QUEUED_EXECUTING"];
55
return dequeuableExecutionStatuses.includes(status);
66
}
77

internal-packages/run-engine/src/engine/locking.test.ts renamed to internal-packages/run-engine/src/engine/tests/locking.test.ts

File renamed without changes.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export class MessageNotFoundError extends Error {
2+
constructor(messageId: string) {
3+
super(`Message not found: ${messageId}`);
4+
}
5+
}

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 81 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
type RedisOptions,
3030
type Result,
3131
} from "@internal/redis";
32+
import { MessageNotFoundError } from "./errors.js";
3233

3334
const SemanticAttributes = {
3435
QUEUE: "runqueue.queue",
@@ -530,12 +531,9 @@ export class RunQueue {
530531
});
531532

532533
return this.redis.releaseConcurrency(
533-
this.keys.messageKey(orgId, messageId),
534-
message.queue,
535534
this.keys.currentConcurrencyKeyFromQueue(message.queue),
536535
this.keys.envCurrentConcurrencyKeyFromQueue(message.queue),
537-
messageId,
538-
JSON.stringify(message.masterQueues)
536+
messageId
539537
);
540538
},
541539
{
@@ -556,11 +554,7 @@ export class RunQueue {
556554
const message = await this.readMessage(orgId, messageId);
557555

558556
if (!message) {
559-
this.logger.log(`[${this.name}].acknowledgeMessage() message not found`, {
560-
messageId,
561-
service: this.name,
562-
});
563-
return;
557+
throw new MessageNotFoundError(messageId);
564558
}
565559

566560
span.setAttributes({
@@ -570,14 +564,25 @@ export class RunQueue {
570564
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
571565
});
572566

573-
return this.redis.reacquireConcurrency(
574-
this.keys.messageKey(orgId, messageId),
575-
message.queue,
576-
this.keys.currentConcurrencyKeyFromQueue(message.queue),
577-
this.keys.envCurrentConcurrencyKeyFromQueue(message.queue),
567+
const queueCurrentConcurrencyKey = this.keys.currentConcurrencyKeyFromQueue(message.queue);
568+
const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
569+
const queueReserveConcurrencyKey = this.keys.reserveConcurrencyKeyFromQueue(message.queue);
570+
const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(message.queue);
571+
const queueConcurrencyLimitKey = this.keys.concurrencyLimitKeyFromQueue(message.queue);
572+
const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(message.queue);
573+
574+
const result = await this.redis.reacquireConcurrency(
575+
queueCurrentConcurrencyKey,
576+
envCurrentConcurrencyKey,
577+
queueReserveConcurrencyKey,
578+
envReserveConcurrencyKey,
579+
queueConcurrencyLimitKey,
580+
envConcurrencyLimitKey,
578581
messageId,
579-
JSON.stringify(message.masterQueues)
582+
String(this.options.defaultEnvConcurrency)
580583
);
584+
585+
return !!result;
581586
},
582587
{
583588
kind: SpanKind.CONSUMER,
@@ -1447,41 +1452,76 @@ redis.call('SREM', envCurrentConcurrencyKey, messageId)
14471452
});
14481453

14491454
this.redis.defineCommand("releaseConcurrency", {
1450-
numberOfKeys: 5,
1455+
numberOfKeys: 2,
14511456
lua: `
14521457
-- Keys:
1453-
local messageKey = KEYS[1]
1454-
local messageQueue = KEYS[2]
1455-
local concurrencyKey = KEYS[3]
1456-
local envCurrentConcurrencyKey = KEYS[4]
1457-
local envQueueKey = KEYS[5]
1458+
local queueCurrentConcurrencyKey = KEYS[1]
1459+
local envCurrentConcurrencyKey = KEYS[2]
14581460
14591461
-- Args:
14601462
local messageId = ARGV[1]
14611463
14621464
-- Update the concurrency keys
1463-
if concurrencyKey ~= "" then
1464-
redis.call('SREM', concurrencyKey, messageId)
1465-
end
1465+
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
14661466
redis.call('SREM', envCurrentConcurrencyKey, messageId)
14671467
`,
14681468
});
14691469

14701470
this.redis.defineCommand("reacquireConcurrency", {
1471-
numberOfKeys: 4,
1471+
numberOfKeys: 6,
14721472
lua: `
14731473
-- Keys:
1474-
local messageKey = KEYS[1]
1475-
local messageQueue = KEYS[2]
1476-
local concurrencyKey = KEYS[3]
1477-
local envCurrentConcurrencyKey = KEYS[4]
1474+
local queueCurrentConcurrencyKey = KEYS[1]
1475+
local envCurrentConcurrencyKey = KEYS[2]
1476+
local queueReserveConcurrencyKey = KEYS[3]
1477+
local envReserveConcurrencyKey = KEYS[4]
1478+
local queueConcurrencyLimitKey = KEYS[5]
1479+
local envConcurrencyLimitKey = KEYS[6]
14781480
14791481
-- Args:
14801482
local messageId = ARGV[1]
1483+
local defaultEnvConcurrencyLimit = ARGV[2]
1484+
1485+
-- Check if the message is already in either current concurrency set
1486+
local isInQueueConcurrency = redis.call('SISMEMBER', queueCurrentConcurrencyKey, messageId) == 1
1487+
local isInEnvConcurrency = redis.call('SISMEMBER', envCurrentConcurrencyKey, messageId) == 1
1488+
1489+
-- If it's already in both sets, we're done
1490+
if isInQueueConcurrency and isInEnvConcurrency then
1491+
return true
1492+
end
1493+
1494+
-- Check current env concurrency against the limit
1495+
local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0')
1496+
local envConcurrencyLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit)
1497+
local envReserveConcurrency = tonumber(redis.call('SCARD', envReserveConcurrencyKey) or '0')
1498+
local totalEnvConcurrencyLimit = envConcurrencyLimit + envReserveConcurrency
1499+
1500+
if envCurrentConcurrency >= totalEnvConcurrencyLimit then
1501+
return false
1502+
end
1503+
1504+
-- Check current queue concurrency against the limit
1505+
local queueCurrentConcurrency = tonumber(redis.call('SCARD', queueCurrentConcurrencyKey) or '0')
1506+
local queueConcurrencyLimit = math.min(tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), envConcurrencyLimit)
1507+
local queueReserveConcurrency = tonumber(redis.call('SCARD', queueReserveConcurrencyKey) or '0')
1508+
local totalQueueConcurrencyLimit = queueConcurrencyLimit + queueReserveConcurrency
1509+
1510+
if queueCurrentConcurrency >= totalQueueConcurrencyLimit then
1511+
return false
1512+
end
14811513
14821514
-- Update the concurrency keys
1483-
redis.call('SADD', concurrencyKey, messageId)
1515+
redis.call('SADD', queueCurrentConcurrencyKey, messageId)
14841516
redis.call('SADD', envCurrentConcurrencyKey, messageId)
1517+
1518+
-- Remove the message from the queue reserve concurrency set
1519+
redis.call('SREM', queueReserveConcurrencyKey, messageId)
1520+
1521+
-- Remove the message from the env reserve concurrency set
1522+
redis.call('SREM', envReserveConcurrencyKey, messageId)
1523+
1524+
return true
14851525
`,
14861526
});
14871527

@@ -1630,24 +1670,23 @@ declare module "@internal/redis" {
16301670
): Result<void, Context>;
16311671

16321672
releaseConcurrency(
1633-
messageKey: string,
1634-
messageQueue: string,
1635-
concurrencyKey: string,
1636-
envConcurrencyKey: string,
1673+
queueCurrentConcurrencyKey: string,
1674+
envCurrentConcurrencyKey: string,
16371675
messageId: string,
1638-
masterQueues: string,
16391676
callback?: Callback<void>
16401677
): Result<void, Context>;
16411678

16421679
reacquireConcurrency(
1643-
messageKey: string,
1644-
messageQueue: string,
1645-
concurrencyKey: string,
1646-
envConcurrencyKey: string,
1680+
queueCurrentConcurrencyKey: string,
1681+
envCurrentConcurrencyKey: string,
1682+
queueReserveConcurrencyKey: string,
1683+
envReserveConcurrencyKey: string,
1684+
queueConcurrencyLimitKey: string,
1685+
envConcurrencyLimitKey: string,
16471686
messageId: string,
1648-
masterQueues: string,
1649-
callback?: Callback<void>
1650-
): Result<void, Context>;
1687+
defaultEnvConcurrencyLimit: string,
1688+
callback?: Callback<string>
1689+
): Result<string, Context>;
16511690

16521691
updateGlobalConcurrencyLimits(
16531692
envConcurrencyLimitKey: string,

0 commit comments

Comments
 (0)