Skip to content

Commit b59f200

Browse files
committed
Remove reserve concurrency system from run engine
1 parent: 0e75e57 · commit: b59f200

4 files changed

Lines changed: 30 additions & 281 deletions

File tree

internal-packages/redis/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ const defaultOptions: Partial<RedisOptions> = {
88
const delay = Math.min(times * 50, 1000);
99
return delay;
1010
},
11-
maxRetriesPerRequest: 20,
11+
maxRetriesPerRequest: process.env.VITEST ? 1 : 20,
1212
};
1313

1414
const logger = new Logger("Redis", "debug");

internal-packages/run-engine/src/engine/index.ts

Lines changed: 29 additions & 104 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ import { nanoid } from "nanoid";
5151
import { EventEmitter } from "node:events";
5252
import { z } from "zod";
5353
import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrategy.js";
54-
import { RunQueue, RunQueueReserveConcurrencyOptions } from "../run-queue/index.js";
54+
import { RunQueue } from "../run-queue/index.js";
5555
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
5656
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
5757
import { MAX_TASK_RUN_ATTEMPTS } from "./consts.js";
@@ -482,8 +482,6 @@ export class RunEngine {
482482
completedByTaskRunId: taskRun.id,
483483
});
484484

485-
let reserveConcurrencyOptions: RunQueueReserveConcurrencyOptions | undefined;
486-
487485
//triggerAndWait or batchTriggerAndWait
488486
if (resumeParentOnCompletion && parentTaskRunId) {
489487
//this will block the parent run from continuing until this waitpoint is completed (and removed)
@@ -497,23 +495,8 @@ export class RunEngine {
497495
workerId,
498496
runnerId,
499497
tx: prisma,
498+
releaseConcurrency: true, // TODO: This needs to use the release concurrency system
500499
});
501-
502-
const parentRun = await prisma.taskRun.findFirst({
503-
select: {
504-
queue: true,
505-
},
506-
where: {
507-
id: parentTaskRunId,
508-
},
509-
});
510-
511-
if (parentRun) {
512-
reserveConcurrencyOptions = {
513-
messageId: parentTaskRunId,
514-
recursiveQueue: parentRun?.queue === taskRun.queue,
515-
};
516-
}
517500
}
518501

519502
//Make sure lock extension succeeded
@@ -588,29 +571,16 @@ export class RunEngine {
588571
availableAt: taskRun.delayUntil,
589572
});
590573
} else {
591-
const { wasEnqueued, error } = await this.#enqueueRun({
574+
await this.#enqueueRun({
592575
run: taskRun,
593576
env: environment,
594577
timestamp: Date.now() - taskRun.priorityMs,
595578
workerId,
596579
runnerId,
597580
tx: prisma,
598-
reserveConcurrency: reserveConcurrencyOptions,
599581
});
600582

601-
if (error) {
602-
// Fail the run immediately
603-
taskRun = await prisma.taskRun.update({
604-
where: { id: taskRun.id },
605-
data: {
606-
status: runStatusFromError(error),
607-
completedAt: new Date(),
608-
error,
609-
},
610-
});
611-
}
612-
613-
if (wasEnqueued && taskRun.ttl) {
583+
if (taskRun.ttl) {
614584
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);
615585

616586
if (expireAt) {
@@ -1560,9 +1530,7 @@ export class RunEngine {
15601530
});
15611531

15621532
//remove it from the queue and release concurrency
1563-
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
1564-
messageId: run.parentTaskRunId ?? undefined,
1565-
});
1533+
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
15661534

15671535
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
15681536
if (isExecuting(latestSnapshot.executionStatus)) {
@@ -2653,6 +2621,7 @@ export class RunEngine {
26532621
async quit() {
26542622
try {
26552623
//stop the run queue
2624+
await this.releaseConcurrencyQueue.quit();
26562625
await this.runQueue.quit();
26572626
await this.worker.stop();
26582627
await this.runLock.quit();
@@ -2785,9 +2754,7 @@ export class RunEngine {
27852754
},
27862755
});
27872756

2788-
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId, {
2789-
messageId: updatedRun.parentTaskRunId ?? undefined,
2790-
});
2757+
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId);
27912758

27922759
if (!updatedRun.associatedWaitpoint) {
27932760
throw new ServiceValidationError("No associated waitpoint found", 400);
@@ -2930,9 +2897,7 @@ export class RunEngine {
29302897
});
29312898
const newSnapshot = await getLatestExecutionSnapshot(prisma, runId);
29322899

2933-
await this.runQueue.acknowledgeMessage(run.project.organizationId, runId, {
2934-
messageId: run.parentTaskRunId ?? undefined,
2935-
});
2900+
await this.runQueue.acknowledgeMessage(run.project.organizationId, runId);
29362901

29372902
// We need to manually emit this as we created the final snapshot as part of the task run update
29382903
this.eventBus.emit("executionSnapshotCreated", {
@@ -3281,9 +3246,7 @@ export class RunEngine {
32813246
throw new ServiceValidationError("No associated waitpoint found", 400);
32823247
}
32833248

3284-
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
3285-
messageId: run.parentTaskRunId ?? undefined,
3286-
});
3249+
await this.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
32873250

32883251
await this.completeWaitpoint({
32893252
id: run.associatedWaitpoint.id,
@@ -3327,7 +3290,6 @@ export class RunEngine {
33273290
completedWaitpoints,
33283291
workerId,
33293292
runnerId,
3330-
reserveConcurrency,
33313293
}: {
33323294
run: TaskRun;
33333295
env: MinimalAuthenticatedEnvironment;
@@ -3345,11 +3307,10 @@ export class RunEngine {
33453307
}[];
33463308
workerId?: string;
33473309
runnerId?: string;
3348-
reserveConcurrency?: RunQueueReserveConcurrencyOptions;
3349-
}): Promise<{ wasEnqueued: boolean; error?: TaskRunError }> {
3310+
}): Promise<void> {
33503311
const prisma = tx ?? this.prisma;
33513312

3352-
return await this.runLock.lock([run.id], 5000, async (signal) => {
3313+
await this.runLock.lock([run.id], 5000, async (signal) => {
33533314
const newSnapshot = await this.#createExecutionSnapshot(prisma, {
33543315
run,
33553316
snapshot: {
@@ -3370,7 +3331,7 @@ export class RunEngine {
33703331
masterQueues.push(run.secondaryMasterQueue);
33713332
}
33723333

3373-
const wasEnqueued = await this.runQueue.enqueueMessage({
3334+
await this.runQueue.enqueueMessage({
33743335
env,
33753336
masterQueues,
33763337
message: {
@@ -3385,21 +3346,7 @@ export class RunEngine {
33853346
timestamp,
33863347
attempt: 0,
33873348
},
3388-
reserveConcurrency,
33893349
});
3390-
3391-
if (!wasEnqueued) {
3392-
return {
3393-
wasEnqueued: false,
3394-
error: {
3395-
type: "INTERNAL_ERROR",
3396-
code: TaskRunErrorCodes.RECURSIVE_WAIT_DEADLOCK,
3397-
message: `This run will never execute because it was triggered recursively and the task has no remaining concurrency available`,
3398-
} satisfies TaskRunError,
3399-
};
3400-
}
3401-
3402-
return { wasEnqueued };
34033350
});
34043351
}
34053352

@@ -3614,54 +3561,32 @@ export class RunEngine {
36143561
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
36153562
}
36163563

3617-
let reserveConcurrency: RunQueueReserveConcurrencyOptions | undefined;
3618-
3619-
if (run.parentTaskRunId) {
3620-
const parentRun = await this.prisma.taskRun.findFirst({
3621-
where: { id: run.parentTaskRunId },
3622-
});
3623-
3624-
if (parentRun) {
3625-
reserveConcurrency = {
3626-
messageId: parentRun.id,
3627-
recursiveQueue: parentRun.queue === run.queue,
3628-
};
3629-
}
3630-
}
3631-
36323564
// Now we need to enqueue the run into the RunQueue
3633-
const { wasEnqueued, error } = await this.#enqueueRun({
3565+
await this.#enqueueRun({
36343566
run,
36353567
env: run.runtimeEnvironment,
36363568
timestamp: run.createdAt.getTime() - run.priorityMs,
3637-
reserveConcurrency,
36383569
batchId: run.batchId ?? undefined,
36393570
});
36403571

3641-
if (error) {
3642-
await this.#permanentlyFailRun({ runId, error, failedAt: new Date() });
3643-
}
3644-
3645-
if (wasEnqueued) {
3646-
await this.prisma.taskRun.update({
3647-
where: { id: runId },
3648-
data: {
3649-
status: "PENDING",
3650-
queuedAt: new Date(),
3651-
},
3652-
});
3572+
await this.prisma.taskRun.update({
3573+
where: { id: runId },
3574+
data: {
3575+
status: "PENDING",
3576+
queuedAt: new Date(),
3577+
},
3578+
});
36533579

3654-
if (run.ttl) {
3655-
const expireAt = parseNaturalLanguageDuration(run.ttl);
3580+
if (run.ttl) {
3581+
const expireAt = parseNaturalLanguageDuration(run.ttl);
36563582

3657-
if (expireAt) {
3658-
await this.worker.enqueue({
3659-
id: `expireRun:${runId}`,
3660-
job: "expireRun",
3661-
payload: { runId },
3662-
availableAt: expireAt,
3663-
});
3664-
}
3583+
if (expireAt) {
3584+
await this.worker.enqueue({
3585+
id: `expireRun:${runId}`,
3586+
job: "expireRun",
3587+
payload: { runId },
3588+
availableAt: expireAt,
3589+
});
36653590
}
36663591
}
36673592
}

0 commit comments

Comments
 (0)