Skip to content

Commit 2048b72

Browse files
committed
go to QUEUE_EXECUTING state if reacquiring concurrency doesn't work
1 parent 61c0834 commit 2048b72

1 file changed

Lines changed: 45 additions & 23 deletions

File tree

  • internal-packages/run-engine/src/engine

internal-packages/run-engine/src/engine/index.ts

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3346,6 +3346,7 @@ export class RunEngine {
33463346
timestamp: number;
33473347
tx?: PrismaClientOrTransaction;
33483348
snapshot?: {
3349+
status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
33493350
description?: string;
33503351
};
33513352
batchId?: string;
@@ -3362,9 +3363,9 @@ export class RunEngine {
33623363

33633364
return await this.runLock.lock([run.id], 5000, async (signal) => {
33643365
const newSnapshot = await this.#createExecutionSnapshot(prisma, {
3365-
run: run,
3366+
run,
33663367
snapshot: {
3367-
executionStatus: "QUEUED",
3368+
executionStatus: snapshot?.status ?? "QUEUED",
33683369
description: snapshot?.description ?? "Run was QUEUED",
33693370
},
33703371
batchId,
@@ -3531,29 +3532,50 @@ export class RunEngine {
35313532

35323533
//run is still executing, send a message to the worker
35333534
if (isExecuting(snapshot.executionStatus)) {
3534-
const newSnapshot = await this.#createExecutionSnapshot(this.prisma, {
3535-
run: {
3536-
id: runId,
3537-
status: snapshot.runStatus,
3538-
attemptNumber: snapshot.attemptNumber,
3539-
},
3540-
snapshot: {
3541-
executionStatus: "EXECUTING",
3542-
description: "Run was continued, whilst still executing.",
3543-
},
3544-
environmentId: snapshot.environmentId,
3545-
environmentType: snapshot.environmentType,
3546-
batchId: snapshot.batchId ?? undefined,
3547-
completedWaitpoints: blockingWaitpoints.map((b) => ({
3548-
id: b.waitpoint.id,
3549-
index: b.batchIndex ?? undefined,
3550-
})),
3551-
});
3535+
const result = await this.runQueue.reacquireConcurrency(
3536+
run.runtimeEnvironment.organization.id,
3537+
runId
3538+
);
35523539

3553-
//we reacquire the concurrency if it's still running because we're not going to be dequeuing (which also does this)
3554-
await this.runQueue.reacquireConcurrency(run.runtimeEnvironment.organization.id, runId);
3540+
if (result) {
3541+
const newSnapshot = await this.#createExecutionSnapshot(this.prisma, {
3542+
run: {
3543+
id: runId,
3544+
status: snapshot.runStatus,
3545+
attemptNumber: snapshot.attemptNumber,
3546+
},
3547+
snapshot: {
3548+
executionStatus: "EXECUTING",
3549+
description: "Run was continued, whilst still executing.",
3550+
},
3551+
environmentId: snapshot.environmentId,
3552+
environmentType: snapshot.environmentType,
3553+
batchId: snapshot.batchId ?? undefined,
3554+
completedWaitpoints: blockingWaitpoints.map((b) => ({
3555+
id: b.waitpoint.id,
3556+
index: b.batchIndex ?? undefined,
3557+
})),
3558+
});
35553559

3556-
await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot });
3560+
await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot });
3561+
} else {
3562+
// Because we cannot reacquire the concurrency, we need to enqueue the run again
3563+
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
3564+
await this.#enqueueRun({
3565+
run,
3566+
env: run.runtimeEnvironment,
3567+
timestamp: run.createdAt.getTime() - run.priorityMs,
3568+
snapshot: {
3569+
status: "QUEUED_EXECUTING",
3570+
description: "Run can continue, but is waiting for concurrency",
3571+
},
3572+
batchId: snapshot.batchId ?? undefined,
3573+
completedWaitpoints: blockingWaitpoints.map((b) => ({
3574+
id: b.waitpoint.id,
3575+
index: b.batchIndex ?? undefined,
3576+
})),
3577+
});
3578+
}
35573579
} else {
35583580
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
35593581
// TODO: We're screwed, should probably fail the run immediately

0 commit comments

Comments
 (0)