Skip to content

Commit 7d11e82

Browse files
committed
Remove reserve concurrency system from run engine
1 parent 8c66ec3 commit 7d11e82

4 files changed

Lines changed: 30 additions & 281 deletions

File tree

internal-packages/redis/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ const defaultOptions: Partial<RedisOptions> = {
88
const delay = Math.min(times * 50, 1000);
99
return delay;
1010
},
11-
maxRetriesPerRequest: 20,
11+
maxRetriesPerRequest: process.env.VITEST ? 1 : 20,
1212
};
1313

1414
const logger = new Logger("Redis", "debug");

internal-packages/run-engine/src/engine/index.ts

Lines changed: 29 additions & 104 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ import { nanoid } from "nanoid";
5151
import { EventEmitter } from "node:events";
5252
import { z } from "zod";
5353
import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrategy.js";
54-
import { RunQueue, RunQueueReserveConcurrencyOptions } from "../run-queue/index.js";
54+
import { RunQueue } from "../run-queue/index.js";
5555
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
5656
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
5757
import { MAX_TASK_RUN_ATTEMPTS } from "./consts.js";
@@ -482,8 +482,6 @@ export class RunEngine {
482482
completedByTaskRunId: taskRun.id,
483483
});
484484

485-
let reserveConcurrencyOptions: RunQueueReserveConcurrencyOptions | undefined;
486-
487485
//triggerAndWait or batchTriggerAndWait
488486
if (resumeParentOnCompletion && parentTaskRunId) {
489487
//this will block the parent run from continuing until this waitpoint is completed (and removed)
@@ -497,23 +495,8 @@ export class RunEngine {
497495
workerId,
498496
runnerId,
499497
tx: prisma,
498+
releaseConcurrency: true, // TODO: This needs to use the release concurrency system
500499
});
501-
502-
const parentRun = await prisma.taskRun.findFirst({
503-
select: {
504-
queue: true,
505-
},
506-
where: {
507-
id: parentTaskRunId,
508-
},
509-
});
510-
511-
if (parentRun) {
512-
reserveConcurrencyOptions = {
513-
messageId: parentTaskRunId,
514-
recursiveQueue: parentRun?.queue === taskRun.queue,
515-
};
516-
}
517500
}
518501

519502
//Make sure lock extension succeeded
@@ -588,29 +571,16 @@ export class RunEngine {
588571
availableAt: taskRun.delayUntil,
589572
});
590573
} else {
591-
const { wasEnqueued, error } = await this.#enqueueRun({
574+
await this.#enqueueRun({
592575
run: taskRun,
593576
env: environment,
594577
timestamp: Date.now() - taskRun.priorityMs,
595578
workerId,
596579
runnerId,
597580
tx: prisma,
598-
reserveConcurrency: reserveConcurrencyOptions,
599581
});
600582

601-
if (error) {
602-
// Fail the run immediately
603-
taskRun = await prisma.taskRun.update({
604-
where: { id: taskRun.id },
605-
data: {
606-
status: runStatusFromError(error),
607-
completedAt: new Date(),
608-
error,
609-
},
610-
});
611-
}
612-
613-
if (wasEnqueued && taskRun.ttl) {
583+
if (taskRun.ttl) {
614584
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);
615585

616586
if (expireAt) {
@@ -1560,9 +1530,7 @@ export class RunEngine {
15601530
});
15611531

15621532
//remove it from the queue and release concurrency
1563-
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
1564-
messageId: run.parentTaskRunId ?? undefined,
1565-
});
1533+
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
15661534

15671535
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
15681536
if (isExecuting(latestSnapshot.executionStatus)) {
@@ -2665,6 +2633,7 @@ export class RunEngine {
26652633
async quit() {
26662634
try {
26672635
//stop the run queue
2636+
await this.releaseConcurrencyQueue.quit();
26682637
await this.runQueue.quit();
26692638
await this.worker.stop();
26702639
await this.runLock.quit();
@@ -2797,9 +2766,7 @@ export class RunEngine {
27972766
},
27982767
});
27992768

2800-
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId, {
2801-
messageId: updatedRun.parentTaskRunId ?? undefined,
2802-
});
2769+
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId);
28032770

28042771
if (!updatedRun.associatedWaitpoint) {
28052772
throw new ServiceValidationError("No associated waitpoint found", 400);
@@ -2942,9 +2909,7 @@ export class RunEngine {
29422909
});
29432910
const newSnapshot = await getLatestExecutionSnapshot(prisma, runId);
29442911

2945-
await this.runQueue.acknowledgeMessage(run.project.organizationId, runId, {
2946-
messageId: run.parentTaskRunId ?? undefined,
2947-
});
2912+
await this.runQueue.acknowledgeMessage(run.project.organizationId, runId);
29482913

29492914
// We need to manually emit this as we created the final snapshot as part of the task run update
29502915
this.eventBus.emit("executionSnapshotCreated", {
@@ -3293,9 +3258,7 @@ export class RunEngine {
32933258
throw new ServiceValidationError("No associated waitpoint found", 400);
32943259
}
32953260

3296-
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
3297-
messageId: run.parentTaskRunId ?? undefined,
3298-
});
3261+
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
32993262

33003263
await this.completeWaitpoint({
33013264
id: run.associatedWaitpoint.id,
@@ -3339,7 +3302,6 @@ export class RunEngine {
33393302
completedWaitpoints,
33403303
workerId,
33413304
runnerId,
3342-
reserveConcurrency,
33433305
}: {
33443306
run: TaskRun;
33453307
env: MinimalAuthenticatedEnvironment;
@@ -3357,11 +3319,10 @@ export class RunEngine {
33573319
}[];
33583320
workerId?: string;
33593321
runnerId?: string;
3360-
reserveConcurrency?: RunQueueReserveConcurrencyOptions;
3361-
}): Promise<{ wasEnqueued: boolean; error?: TaskRunError }> {
3322+
}): Promise<void> {
33623323
const prisma = tx ?? this.prisma;
33633324

3364-
return await this.runLock.lock([run.id], 5000, async (signal) => {
3325+
await this.runLock.lock([run.id], 5000, async (signal) => {
33653326
const newSnapshot = await this.#createExecutionSnapshot(prisma, {
33663327
run,
33673328
snapshot: {
@@ -3382,7 +3343,7 @@ export class RunEngine {
33823343
masterQueues.push(run.secondaryMasterQueue);
33833344
}
33843345

3385-
const wasEnqueued = await this.runQueue.enqueueMessage({
3346+
await this.runQueue.enqueueMessage({
33863347
env,
33873348
masterQueues,
33883349
message: {
@@ -3397,21 +3358,7 @@ export class RunEngine {
33973358
timestamp,
33983359
attempt: 0,
33993360
},
3400-
reserveConcurrency,
34013361
});
3402-
3403-
if (!wasEnqueued) {
3404-
return {
3405-
wasEnqueued: false,
3406-
error: {
3407-
type: "INTERNAL_ERROR",
3408-
code: TaskRunErrorCodes.RECURSIVE_WAIT_DEADLOCK,
3409-
message: `This run will never execute because it was triggered recursively and the task has no remaining concurrency available`,
3410-
} satisfies TaskRunError,
3411-
};
3412-
}
3413-
3414-
return { wasEnqueued };
34153362
});
34163363
}
34173364

@@ -3626,54 +3573,32 @@ export class RunEngine {
36263573
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
36273574
}
36283575

3629-
let reserveConcurrency: RunQueueReserveConcurrencyOptions | undefined;
3630-
3631-
if (run.parentTaskRunId) {
3632-
const parentRun = await this.prisma.taskRun.findFirst({
3633-
where: { id: run.parentTaskRunId },
3634-
});
3635-
3636-
if (parentRun) {
3637-
reserveConcurrency = {
3638-
messageId: parentRun.id,
3639-
recursiveQueue: parentRun.queue === run.queue,
3640-
};
3641-
}
3642-
}
3643-
36443576
// Now we need to enqueue the run into the RunQueue
3645-
const { wasEnqueued, error } = await this.#enqueueRun({
3577+
await this.#enqueueRun({
36463578
run,
36473579
env: run.runtimeEnvironment,
36483580
timestamp: run.createdAt.getTime() - run.priorityMs,
3649-
reserveConcurrency,
36503581
batchId: run.batchId ?? undefined,
36513582
});
36523583

3653-
if (error) {
3654-
await this.#permanentlyFailRun({ runId, error, failedAt: new Date() });
3655-
}
3656-
3657-
if (wasEnqueued) {
3658-
await this.prisma.taskRun.update({
3659-
where: { id: runId },
3660-
data: {
3661-
status: "PENDING",
3662-
queuedAt: new Date(),
3663-
},
3664-
});
3584+
await this.prisma.taskRun.update({
3585+
where: { id: runId },
3586+
data: {
3587+
status: "PENDING",
3588+
queuedAt: new Date(),
3589+
},
3590+
});
36653591

3666-
if (run.ttl) {
3667-
const expireAt = parseNaturalLanguageDuration(run.ttl);
3592+
if (run.ttl) {
3593+
const expireAt = parseNaturalLanguageDuration(run.ttl);
36683594

3669-
if (expireAt) {
3670-
await this.worker.enqueue({
3671-
id: `expireRun:${runId}`,
3672-
job: "expireRun",
3673-
payload: { runId },
3674-
availableAt: expireAt,
3675-
});
3676-
}
3595+
if (expireAt) {
3596+
await this.worker.enqueue({
3597+
id: `expireRun:${runId}`,
3598+
job: "expireRun",
3599+
payload: { runId },
3600+
availableAt: expireAt,
3601+
});
36773602
}
36783603
}
36793604
}

0 commit comments

Comments
 (0)