Skip to content

Commit 8de78cb

Browse files
committed
Implement reserve concurrency clearing when the child run is acked
1 parent aa47420 commit 8de78cb

3 files changed

Lines changed: 136 additions & 46 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,6 +1540,7 @@ export class RunEngine {
15401540
createdAt: true,
15411541
completedAt: true,
15421542
taskEventStore: true,
1543+
parentTaskRunId: true,
15431544
runtimeEnvironment: {
15441545
select: {
15451546
organizationId: true,
@@ -1559,7 +1560,9 @@ export class RunEngine {
15591560
});
15601561

15611562
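// Remove the run from the queue and release its concurrency; when the run has a parentTaskRunId, that ID is also cleared from the reserve concurrency sets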
1562-
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
1563+
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
1564+
messageId: run.parentTaskRunId ?? undefined,
1565+
});
15631566

15641567
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
15651568
if (isExecuting(latestSnapshot.executionStatus)) {
@@ -2778,10 +2781,13 @@ export class RunEngine {
27782781
createdAt: true,
27792782
completedAt: true,
27802783
taskEventStore: true,
2784+
parentTaskRunId: true,
27812785
},
27822786
});
27832787

2784-
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId);
2788+
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId, {
2789+
messageId: updatedRun.parentTaskRunId ?? undefined,
2790+
});
27852791

27862792
if (!updatedRun.associatedWaitpoint) {
27872793
throw new ServiceValidationError("No associated waitpoint found", 400);
@@ -2919,10 +2925,14 @@ export class RunEngine {
29192925
createdAt: true,
29202926
completedAt: true,
29212927
taskEventStore: true,
2928+
parentTaskRunId: true,
29222929
},
29232930
});
29242931
const newSnapshot = await getLatestExecutionSnapshot(prisma, runId);
2925-
await this.runQueue.acknowledgeMessage(run.project.organizationId, runId);
2932+
2933+
await this.runQueue.acknowledgeMessage(run.project.organizationId, runId, {
2934+
messageId: run.parentTaskRunId ?? undefined,
2935+
});
29262936

29272937
// We need to manually emit this as we created the final snapshot as part of the task run update
29282938
this.eventBus.emit("executionSnapshotCreated", {
@@ -3236,6 +3246,7 @@ export class RunEngine {
32363246
attemptNumber: true,
32373247
spanId: true,
32383248
batchId: true,
3249+
parentTaskRunId: true,
32393250
associatedWaitpoint: {
32403251
select: {
32413252
id: true,
@@ -3270,13 +3281,15 @@ export class RunEngine {
32703281
throw new ServiceValidationError("No associated waitpoint found", 400);
32713282
}
32723283

3284+
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
3285+
messageId: run.parentTaskRunId ?? undefined,
3286+
});
3287+
32733288
await this.completeWaitpoint({
32743289
id: run.associatedWaitpoint.id,
32753290
output: { value: JSON.stringify(error), isError: true },
32763291
});
32773292

3278-
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
3279-
32803293
this.eventBus.emit("runFailed", {
32813294
time: failedAt,
32823295
run: {

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 111 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -411,18 +411,20 @@ export class RunQueue {
411411
* This is done when the run is in a final state.
412412
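* @param orgId The organization ID, used together with messageId to read the stored message
* @param messageId The ID of the message (the run ID) to acknowledge
* @param reserveConcurrency Optional. When reserveConcurrency.messageId is set, that ID is also removed from the queue and env reserve concurrency sets
* @throws MessageNotFoundError when no stored message is found for messageId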
413413
*/
414-
public async acknowledgeMessage(orgId: string, messageId: string) {
414+
public async acknowledgeMessage(
415+
orgId: string,
416+
messageId: string,
417+
reserveConcurrency?: {
418+
messageId?: string;
419+
}
420+
) {
415421
return this.#trace(
416422
"acknowledgeMessage",
417423
async (span) => {
418424
const message = await this.readMessage(orgId, messageId);
419425

420426
if (!message) {
421-
this.logger.log(`[${this.name}].acknowledgeMessage() message not found`, {
422-
messageId,
423-
service: this.name,
424-
});
425-
return;
427+
throw new MessageNotFoundError(messageId);
426428
}
427429

428430
span.setAttributes({
@@ -434,6 +436,7 @@ export class RunQueue {
434436

435437
await this.#callAcknowledgeMessage({
436438
message,
439+
reserveConcurrency,
437440
});
438441
},
439442
{
@@ -936,7 +939,15 @@ export class RunQueue {
936939
};
937940
}
938941

939-
async #callAcknowledgeMessage({ message }: { message: OutputPayload }) {
942+
async #callAcknowledgeMessage({
943+
message,
944+
reserveConcurrency,
945+
}: {
946+
message: OutputPayload;
947+
reserveConcurrency?: {
948+
messageId?: string;
949+
};
950+
}) {
940951
const messageId = message.runId;
941952
const messageKey = this.keys.messageKey(message.orgId, messageId);
942953
const messageQueue = message.queue;
@@ -956,22 +967,37 @@ export class RunQueue {
956967
service: this.name,
957968
});
958969

959-
const queueReserveConcurrencyKey = this.keys.reserveConcurrencyKeyFromQueue(messageQueue);
960-
const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(messageQueue);
970+
if (!reserveConcurrency?.messageId) {
971+
return this.redis.acknowledgeMessage(
972+
messageKey,
973+
messageQueue,
974+
queueCurrentConcurrencyKey,
975+
envCurrentConcurrencyKey,
976+
envQueueKey,
977+
messageId,
978+
messageQueue,
979+
JSON.stringify(masterQueues),
980+
this.options.redis.keyPrefix ?? ""
981+
);
982+
} else {
983+
const queueReserveConcurrencyKey = this.keys.reserveConcurrencyKeyFromQueue(messageQueue);
984+
const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(messageQueue);
961985

962-
return this.redis.acknowledgeMessage(
963-
messageKey,
964-
messageQueue,
965-
queueCurrentConcurrencyKey,
966-
envCurrentConcurrencyKey,
967-
envQueueKey,
968-
queueReserveConcurrencyKey,
969-
envReserveConcurrencyKey,
970-
messageId,
971-
messageQueue,
972-
JSON.stringify(masterQueues),
973-
this.options.redis.keyPrefix ?? ""
974-
);
986+
return this.redis.acknowledgeMessageWithReserveConcurrency(
987+
messageKey,
988+
messageQueue,
989+
queueCurrentConcurrencyKey,
990+
envCurrentConcurrencyKey,
991+
envQueueKey,
992+
envReserveConcurrencyKey,
993+
queueReserveConcurrencyKey,
994+
messageId,
995+
messageQueue,
996+
JSON.stringify(masterQueues),
997+
this.options.redis.keyPrefix ?? "",
998+
reserveConcurrency.messageId
999+
);
1000+
}
9751001
}
9761002

9771003
async #callNackMessage({ message, retryAt }: { message: OutputPayload; retryAt?: number }) {
@@ -1323,12 +1349,52 @@ return {messageId, messageScore, messagePayload} -- Return message details
13231349
});
13241350

13251351
this.redis.defineCommand("acknowledgeMessage", {
1352+
numberOfKeys: 5,
1353+
lua: `
1354+
-- Keys:
1355+
local messageKey = KEYS[1]
1356+
local messageQueueKey = KEYS[2]
1357+
local queueCurrentConcurrencyKey = KEYS[3]
1358+
local envCurrentConcurrencyKey = KEYS[4]
1359+
local envQueueKey = KEYS[5]
1360+
1361+
-- Args:
1362+
local messageId = ARGV[1]
1363+
local messageQueueName = ARGV[2]
1364+
local parentQueues = cjson.decode(ARGV[3])
1365+
local keyPrefix = ARGV[4]
1366+
1367+
-- Remove the message from the message key
1368+
redis.call('DEL', messageKey)
1369+
1370+
-- Remove the message from the queue
1371+
redis.call('ZREM', messageQueueKey, messageId)
1372+
redis.call('ZREM', envQueueKey, messageId)
1373+
1374+
-- Rebalance the parent queues
1375+
local earliestMessage = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES')
1376+
for _, parentQueue in ipairs(parentQueues) do
1377+
local prefixedParentQueue = keyPrefix .. parentQueue
1378+
if #earliestMessage == 0 then
1379+
redis.call('ZREM', prefixedParentQueue, messageQueueName)
1380+
else
1381+
redis.call('ZADD', prefixedParentQueue, earliestMessage[2], messageQueueName)
1382+
end
1383+
end
1384+
1385+
-- Update the concurrency keys
1386+
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
1387+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1388+
`,
1389+
});
1390+
1391+
this.redis.defineCommand("acknowledgeMessageWithReserveConcurrency", {
13261392
numberOfKeys: 7,
13271393
lua: `
13281394
-- Keys:
13291395
local messageKey = KEYS[1]
1330-
local messageQueue = KEYS[2]
1331-
local concurrencyKey = KEYS[3]
1396+
local messageQueueKey = KEYS[2]
1397+
local queueCurrentConcurrencyKey = KEYS[3]
13321398
local envCurrentConcurrencyKey = KEYS[4]
13331399
local envQueueKey = KEYS[5]
13341400
local queueReserveConcurrencyKey = KEYS[6]
@@ -1339,16 +1405,17 @@ local messageId = ARGV[1]
13391405
local messageQueueName = ARGV[2]
13401406
local parentQueues = cjson.decode(ARGV[3])
13411407
local keyPrefix = ARGV[4]
1408+
local reserveMessageId = ARGV[5]
13421409
13431410
-- Remove the message from the message key
13441411
redis.call('DEL', messageKey)
13451412
13461413
-- Remove the message from the queue
1347-
redis.call('ZREM', messageQueue, messageId)
1414+
redis.call('ZREM', messageQueueKey, messageId)
13481415
redis.call('ZREM', envQueueKey, messageId)
13491416
13501417
-- Rebalance the parent queues
1351-
local earliestMessage = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES')
1418+
local earliestMessage = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES')
13521419
for _, parentQueue in ipairs(parentQueues) do
13531420
local prefixedParentQueue = keyPrefix .. parentQueue
13541421
if #earliestMessage == 0 then
@@ -1359,12 +1426,12 @@ for _, parentQueue in ipairs(parentQueues) do
13591426
end
13601427
13611428
-- Update the concurrency keys
1362-
redis.call('SREM', concurrencyKey, messageId)
1429+
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
13631430
redis.call('SREM', envCurrentConcurrencyKey, messageId)
13641431
13651432
-- Clear reserve concurrency
1366-
redis.call('SREM', queueReserveConcurrencyKey, messageId)
1367-
redis.call('SREM', envReserveConcurrencyKey, messageId)
1433+
redis.call('SREM', queueReserveConcurrencyKey, reserveMessageId)
1434+
redis.call('SREM', envReserveConcurrencyKey, reserveMessageId)
13681435
`,
13691436
});
13701437

@@ -1515,12 +1582,6 @@ end
15151582
redis.call('SADD', queueCurrentConcurrencyKey, messageId)
15161583
redis.call('SADD', envCurrentConcurrencyKey, messageId)
15171584
1518-
-- Remove the message from the queue reserve concurrency set
1519-
redis.call('SREM', queueReserveConcurrencyKey, messageId)
1520-
1521-
-- Remove the message from the env reserve concurrency set
1522-
redis.call('SREM', envReserveConcurrencyKey, messageId)
1523-
15241585
return true
15251586
`,
15261587
});
@@ -1631,12 +1692,26 @@ declare module "@internal/redis" {
16311692
concurrencyKey: string,
16321693
envConcurrencyKey: string,
16331694
envQueueKey: string,
1634-
queueReserveConcurrencyKey: string,
1695+
messageId: string,
1696+
messageQueueName: string,
1697+
masterQueues: string,
1698+
keyPrefix: string,
1699+
callback?: Callback<void>
1700+
): Result<void, Context>;
1701+
1702+
acknowledgeMessageWithReserveConcurrency(
1703+
messageKey: string,
1704+
messageQueue: string,
1705+
concurrencyKey: string,
1706+
envConcurrencyKey: string,
1707+
envQueueKey: string,
16351708
envReserveConcurrencyKey: string,
1709+
queueReserveConcurrencyKey: string,
16361710
messageId: string,
16371711
messageQueueName: string,
16381712
masterQueues: string,
16391713
keyPrefix: string,
1714+
reserveMessageId: string,
16401715
callback?: Callback<void>
16411716
): Result<void, Context>;
16421717

internal-packages/run-engine/src/run-queue/tests/ack.test.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ describe("RunQueue.acknowledgeMessage", () => {
274274
});
275275

276276
redisTest(
277-
"acknowledging a message clears reserve concurrency sets even when not dequeued",
277+
"acknowledging a message clears env reserve concurrency when recursive queue is false",
278278
async ({ redisContainer }) => {
279279
const queue = new RunQueue({
280280
...testOptions,
@@ -302,8 +302,8 @@ describe("RunQueue.acknowledgeMessage", () => {
302302
message: messageDev,
303303
masterQueues: ["main", envMasterQueue],
304304
reserveConcurrency: {
305-
messageId: messageDev.runId,
306-
recursiveQueue: true,
305+
messageId: "r1235",
306+
recursiveQueue: false,
307307
},
308308
});
309309

@@ -317,7 +317,7 @@ describe("RunQueue.acknowledgeMessage", () => {
317317
authenticatedEnvDev,
318318
messageDev.queue
319319
);
320-
expect(queueReserveConcurrency).toBe(1);
320+
expect(queueReserveConcurrency).toBe(0);
321321

322322
// Verify message is in queue before acknowledging
323323
const queueLengthBefore = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue);
@@ -327,7 +327,9 @@ describe("RunQueue.acknowledgeMessage", () => {
327327
expect(envQueueLengthBefore).toBe(1);
328328

329329
// Acknowledge the message before dequeuing
330-
await queue.acknowledgeMessage(messageDev.orgId, messageDev.runId);
330+
await queue.acknowledgeMessage(messageDev.orgId, messageDev.runId, {
331+
messageId: "r1235",
332+
});
331333

332334
// Verify reserve concurrency is cleared
333335
const envReserveConcurrencyAfter = await queue.reserveConcurrencyOfEnvironment(

0 commit comments

Comments
 (0)