Skip to content

Commit 96901b4

Browse files
committed
move startRunAttempt to RunAttemptSystem
1 parent ba67b53 commit 96901b4

2 files changed

Lines changed: 326 additions & 270 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 9 additions & 269 deletions
Original file line number | Diff line number | Diff line change
@@ -267,6 +267,7 @@ export class RunEngine {
267267
executionSnapshotSystem: this.executionSnapshotSystem,
268268
batchSystem: this.batchSystem,
269269
waitpointSystem: this.waitpointSystem,
270+
machines: this.options.machines,
270271
});
271272

272273
this.dequeueSystem = new DequeueSystem({
@@ -703,275 +704,14 @@ export class RunEngine {
703704
isWarmStart?: boolean;
704705
tx?: PrismaClientOrTransaction;
705706
}): Promise<StartRunAttemptResult> {
706-
const prisma = tx ?? this.prisma;
707-
708-
return startSpan(
709-
this.tracer,
710-
"startRunAttempt",
711-
async (span) => {
712-
return this.runLock.lock([runId], 5000, async () => {
713-
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
714-
715-
if (latestSnapshot.id !== snapshotId) {
716-
//if there is a big delay between the snapshot and the attempt, the snapshot might have changed
717-
//we just want to log because elsewhere it should have been put back into a state where it can be attempted
718-
this.logger.warn(
719-
"RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring."
720-
);
721-
throw new ServiceValidationError("Snapshot changed", 409);
722-
}
723-
724-
const environment = await this.#getAuthenticatedEnvironmentFromRun(runId, prisma);
725-
if (!environment) {
726-
throw new ServiceValidationError("Environment not found", 404);
727-
}
728-
729-
const taskRun = await prisma.taskRun.findFirst({
730-
where: {
731-
id: runId,
732-
},
733-
include: {
734-
tags: true,
735-
lockedBy: {
736-
include: {
737-
worker: {
738-
select: {
739-
id: true,
740-
version: true,
741-
sdkVersion: true,
742-
cliVersion: true,
743-
supportsLazyAttempts: true,
744-
},
745-
},
746-
},
747-
},
748-
batchItems: {
749-
include: {
750-
batchTaskRun: true,
751-
},
752-
},
753-
},
754-
});
755-
756-
this.logger.debug("Creating a task run attempt", { taskRun });
757-
758-
if (!taskRun) {
759-
throw new ServiceValidationError("Task run not found", 404);
760-
}
761-
762-
span.setAttribute("projectId", taskRun.projectId);
763-
span.setAttribute("environmentId", taskRun.runtimeEnvironmentId);
764-
span.setAttribute("taskRunId", taskRun.id);
765-
span.setAttribute("taskRunFriendlyId", taskRun.friendlyId);
766-
767-
if (taskRun.status === "CANCELED") {
768-
throw new ServiceValidationError("Task run is cancelled", 400);
769-
}
770-
771-
if (!taskRun.lockedBy) {
772-
throw new ServiceValidationError("Task run is not locked", 400);
773-
}
774-
775-
const queue = await prisma.taskQueue.findUnique({
776-
where: {
777-
runtimeEnvironmentId_name: {
778-
runtimeEnvironmentId: environment.id,
779-
name: taskRun.queue,
780-
},
781-
},
782-
});
783-
784-
if (!queue) {
785-
throw new ServiceValidationError("Queue not found", 404);
786-
}
787-
788-
//increment the attempt number (start at 1)
789-
const nextAttemptNumber = (taskRun.attemptNumber ?? 0) + 1;
790-
791-
if (nextAttemptNumber > MAX_TASK_RUN_ATTEMPTS) {
792-
await this.runAttemptSystem.attemptFailed({
793-
runId: taskRun.id,
794-
snapshotId,
795-
completion: {
796-
ok: false,
797-
id: taskRun.id,
798-
error: {
799-
type: "INTERNAL_ERROR",
800-
code: "TASK_RUN_CRASHED",
801-
message: "Max attempts reached.",
802-
},
803-
},
804-
tx: prisma,
805-
});
806-
throw new ServiceValidationError("Max attempts reached", 400);
807-
}
808-
809-
this.eventBus.emit("runAttemptStarted", {
810-
time: new Date(),
811-
run: {
812-
id: taskRun.id,
813-
attemptNumber: nextAttemptNumber,
814-
baseCostInCents: taskRun.baseCostInCents,
815-
},
816-
organization: {
817-
id: environment.organization.id,
818-
},
819-
});
820-
821-
const result = await $transaction(
822-
prisma,
823-
async (tx) => {
824-
const run = await tx.taskRun.update({
825-
where: {
826-
id: taskRun.id,
827-
},
828-
data: {
829-
status: "EXECUTING",
830-
attemptNumber: nextAttemptNumber,
831-
executedAt: taskRun.attemptNumber === null ? new Date() : undefined,
832-
},
833-
include: {
834-
tags: true,
835-
lockedBy: {
836-
include: { worker: true },
837-
},
838-
},
839-
});
840-
841-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(tx, {
842-
run,
843-
snapshot: {
844-
executionStatus: "EXECUTING",
845-
description: `Attempt created, starting execution${
846-
isWarmStart ? " (warm start)" : ""
847-
}`,
848-
},
849-
environmentId: latestSnapshot.environmentId,
850-
environmentType: latestSnapshot.environmentType,
851-
workerId,
852-
runnerId,
853-
});
854-
855-
if (taskRun.ttl) {
856-
//don't expire the run, it's going to execute
857-
await this.worker.ack(`expireRun:${taskRun.id}`);
858-
}
859-
860-
return { run, snapshot: newSnapshot };
861-
},
862-
(error) => {
863-
this.logger.error("RunEngine.createRunAttempt(): prisma.$transaction error", {
864-
code: error.code,
865-
meta: error.meta,
866-
stack: error.stack,
867-
message: error.message,
868-
name: error.name,
869-
});
870-
throw new ServiceValidationError(
871-
"Failed to update task run and execution snapshot",
872-
500
873-
);
874-
}
875-
);
876-
877-
if (!result) {
878-
this.logger.error("RunEngine.createRunAttempt(): failed to create task run attempt", {
879-
runId: taskRun.id,
880-
nextAttemptNumber,
881-
});
882-
throw new ServiceValidationError("Failed to create task run attempt", 500);
883-
}
884-
885-
const { run, snapshot } = result;
886-
887-
const machinePreset = getMachinePreset({
888-
machines: this.options.machines.machines,
889-
defaultMachine: this.options.machines.defaultMachine,
890-
config: taskRun.lockedBy.machineConfig ?? {},
891-
run: taskRun,
892-
});
893-
894-
const metadata = await parsePacket({
895-
data: taskRun.metadata ?? undefined,
896-
dataType: taskRun.metadataType,
897-
});
898-
899-
const execution: TaskRunExecution = {
900-
task: {
901-
id: run.lockedBy!.slug,
902-
filePath: run.lockedBy!.filePath,
903-
exportName: run.lockedBy!.exportName,
904-
},
905-
attempt: {
906-
number: nextAttemptNumber,
907-
startedAt: latestSnapshot.updatedAt,
908-
/** @deprecated */
909-
id: "deprecated",
910-
/** @deprecated */
911-
backgroundWorkerId: "deprecated",
912-
/** @deprecated */
913-
backgroundWorkerTaskId: "deprecated",
914-
/** @deprecated */
915-
status: "deprecated",
916-
},
917-
run: {
918-
id: run.friendlyId,
919-
payload: run.payload,
920-
payloadType: run.payloadType,
921-
createdAt: run.createdAt,
922-
tags: run.tags.map((tag) => tag.name),
923-
isTest: run.isTest,
924-
idempotencyKey: run.idempotencyKey ?? undefined,
925-
startedAt: run.startedAt ?? run.createdAt,
926-
maxAttempts: run.maxAttempts ?? undefined,
927-
version: run.lockedBy!.worker.version,
928-
metadata,
929-
maxDuration: run.maxDurationInSeconds ?? undefined,
930-
/** @deprecated */
931-
context: undefined,
932-
/** @deprecated */
933-
durationMs: run.usageDurationMs,
934-
/** @deprecated */
935-
costInCents: run.costInCents,
936-
/** @deprecated */
937-
baseCostInCents: run.baseCostInCents,
938-
traceContext: run.traceContext as Record<string, string | undefined>,
939-
priority: run.priorityMs === 0 ? undefined : run.priorityMs / 1_000,
940-
},
941-
queue: {
942-
id: queue.friendlyId,
943-
name: queue.name,
944-
},
945-
environment: {
946-
id: environment.id,
947-
slug: environment.slug,
948-
type: environment.type,
949-
},
950-
organization: {
951-
id: environment.organization.id,
952-
slug: environment.organization.slug,
953-
name: environment.organization.title,
954-
},
955-
project: {
956-
id: environment.project.id,
957-
ref: environment.project.externalRef,
958-
slug: environment.project.slug,
959-
name: environment.project.name,
960-
},
961-
batch:
962-
taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun
963-
? { id: taskRun.batchItems[0].batchTaskRun.friendlyId }
964-
: undefined,
965-
machine: machinePreset,
966-
};
967-
968-
return { run, snapshot, execution };
969-
});
970-
},
971-
{
972-
attributes: { runId, snapshotId },
973-
}
974-
);
707+
return this.runAttemptSystem.startRunAttempt({
708+
runId,
709+
snapshotId,
710+
workerId,
711+
runnerId,
712+
isWarmStart,
713+
tx,
714+
});
975715
}
976716

977717
/** How a run is completed */

0 commit comments

Comments
 (0)