Skip to content

Commit 936af50

Browse files
committed
go to QUEUE_EXECUTING state if reacquiring concurrency doesn't work
1 parent 8de78cb commit 936af50

1 file changed

Lines changed: 45 additions & 23 deletions

File tree

  • internal-packages/run-engine/src/engine

internal-packages/run-engine/src/engine/index.ts

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3334,6 +3334,7 @@ export class RunEngine {
33343334
timestamp: number;
33353335
tx?: PrismaClientOrTransaction;
33363336
snapshot?: {
3337+
status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
33373338
description?: string;
33383339
};
33393340
batchId?: string;
@@ -3350,9 +3351,9 @@ export class RunEngine {
33503351

33513352
return await this.runLock.lock([run.id], 5000, async (signal) => {
33523353
const newSnapshot = await this.#createExecutionSnapshot(prisma, {
3353-
run: run,
3354+
run,
33543355
snapshot: {
3355-
executionStatus: "QUEUED",
3356+
executionStatus: snapshot?.status ?? "QUEUED",
33563357
description: snapshot?.description ?? "Run was QUEUED",
33573358
},
33583359
batchId,
@@ -3519,29 +3520,50 @@ export class RunEngine {
35193520

35203521
//run is still executing, send a message to the worker
35213522
if (isExecuting(snapshot.executionStatus)) {
3522-
const newSnapshot = await this.#createExecutionSnapshot(this.prisma, {
3523-
run: {
3524-
id: runId,
3525-
status: snapshot.runStatus,
3526-
attemptNumber: snapshot.attemptNumber,
3527-
},
3528-
snapshot: {
3529-
executionStatus: "EXECUTING",
3530-
description: "Run was continued, whilst still executing.",
3531-
},
3532-
environmentId: snapshot.environmentId,
3533-
environmentType: snapshot.environmentType,
3534-
batchId: snapshot.batchId ?? undefined,
3535-
completedWaitpoints: blockingWaitpoints.map((b) => ({
3536-
id: b.waitpoint.id,
3537-
index: b.batchIndex ?? undefined,
3538-
})),
3539-
});
3523+
const result = await this.runQueue.reacquireConcurrency(
3524+
run.runtimeEnvironment.organization.id,
3525+
runId
3526+
);
35403527

3541-
//we reacquire the concurrency if it's still running because we're not going to be dequeuing (which also does this)
3542-
await this.runQueue.reacquireConcurrency(run.runtimeEnvironment.organization.id, runId);
3528+
if (result) {
3529+
const newSnapshot = await this.#createExecutionSnapshot(this.prisma, {
3530+
run: {
3531+
id: runId,
3532+
status: snapshot.runStatus,
3533+
attemptNumber: snapshot.attemptNumber,
3534+
},
3535+
snapshot: {
3536+
executionStatus: "EXECUTING",
3537+
description: "Run was continued, whilst still executing.",
3538+
},
3539+
environmentId: snapshot.environmentId,
3540+
environmentType: snapshot.environmentType,
3541+
batchId: snapshot.batchId ?? undefined,
3542+
completedWaitpoints: blockingWaitpoints.map((b) => ({
3543+
id: b.waitpoint.id,
3544+
index: b.batchIndex ?? undefined,
3545+
})),
3546+
});
35433547

3544-
await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot });
3548+
await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot });
3549+
} else {
3550+
// Because we cannot reacquire the concurrency, we need to enqueue the run again
3551+
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
3552+
await this.#enqueueRun({
3553+
run,
3554+
env: run.runtimeEnvironment,
3555+
timestamp: run.createdAt.getTime() - run.priorityMs,
3556+
snapshot: {
3557+
status: "QUEUED_EXECUTING",
3558+
description: "Run can continue, but is waiting for concurrency",
3559+
},
3560+
batchId: snapshot.batchId ?? undefined,
3561+
completedWaitpoints: blockingWaitpoints.map((b) => ({
3562+
id: b.waitpoint.id,
3563+
index: b.batchIndex ?? undefined,
3564+
})),
3565+
});
3566+
}
35453567
} else {
35463568
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
35473569
// TODO: We're screwed, should probably fail the run immediately

0 commit comments

Comments
 (0)