@@ -267,6 +267,7 @@ export class RunEngine {
267267 executionSnapshotSystem : this . executionSnapshotSystem ,
268268 batchSystem : this . batchSystem ,
269269 waitpointSystem : this . waitpointSystem ,
270+ machines : this . options . machines ,
270271 } ) ;
271272
272273 this . dequeueSystem = new DequeueSystem ( {
@@ -703,275 +704,14 @@ export class RunEngine {
703704 isWarmStart ?: boolean ;
704705 tx ?: PrismaClientOrTransaction ;
705706 } ) : Promise < StartRunAttemptResult > {
706- const prisma = tx ?? this . prisma ;
707-
708- return startSpan (
709- this . tracer ,
710- "startRunAttempt" ,
711- async ( span ) => {
712- return this . runLock . lock ( [ runId ] , 5000 , async ( ) => {
713- const latestSnapshot = await getLatestExecutionSnapshot ( prisma , runId ) ;
714-
715- if ( latestSnapshot . id !== snapshotId ) {
716- //if there is a big delay between the snapshot and the attempt, the snapshot might have changed
717- //we just want to log because elsewhere it should have been put back into a state where it can be attempted
718- this . logger . warn (
719- "RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring."
720- ) ;
721- throw new ServiceValidationError ( "Snapshot changed" , 409 ) ;
722- }
723-
724- const environment = await this . #getAuthenticatedEnvironmentFromRun( runId , prisma ) ;
725- if ( ! environment ) {
726- throw new ServiceValidationError ( "Environment not found" , 404 ) ;
727- }
728-
729- const taskRun = await prisma . taskRun . findFirst ( {
730- where : {
731- id : runId ,
732- } ,
733- include : {
734- tags : true ,
735- lockedBy : {
736- include : {
737- worker : {
738- select : {
739- id : true ,
740- version : true ,
741- sdkVersion : true ,
742- cliVersion : true ,
743- supportsLazyAttempts : true ,
744- } ,
745- } ,
746- } ,
747- } ,
748- batchItems : {
749- include : {
750- batchTaskRun : true ,
751- } ,
752- } ,
753- } ,
754- } ) ;
755-
756- this . logger . debug ( "Creating a task run attempt" , { taskRun } ) ;
757-
758- if ( ! taskRun ) {
759- throw new ServiceValidationError ( "Task run not found" , 404 ) ;
760- }
761-
762- span . setAttribute ( "projectId" , taskRun . projectId ) ;
763- span . setAttribute ( "environmentId" , taskRun . runtimeEnvironmentId ) ;
764- span . setAttribute ( "taskRunId" , taskRun . id ) ;
765- span . setAttribute ( "taskRunFriendlyId" , taskRun . friendlyId ) ;
766-
767- if ( taskRun . status === "CANCELED" ) {
768- throw new ServiceValidationError ( "Task run is cancelled" , 400 ) ;
769- }
770-
771- if ( ! taskRun . lockedBy ) {
772- throw new ServiceValidationError ( "Task run is not locked" , 400 ) ;
773- }
774-
775- const queue = await prisma . taskQueue . findUnique ( {
776- where : {
777- runtimeEnvironmentId_name : {
778- runtimeEnvironmentId : environment . id ,
779- name : taskRun . queue ,
780- } ,
781- } ,
782- } ) ;
783-
784- if ( ! queue ) {
785- throw new ServiceValidationError ( "Queue not found" , 404 ) ;
786- }
787-
788- //increment the attempt number (start at 1)
789- const nextAttemptNumber = ( taskRun . attemptNumber ?? 0 ) + 1 ;
790-
791- if ( nextAttemptNumber > MAX_TASK_RUN_ATTEMPTS ) {
792- await this . runAttemptSystem . attemptFailed ( {
793- runId : taskRun . id ,
794- snapshotId,
795- completion : {
796- ok : false ,
797- id : taskRun . id ,
798- error : {
799- type : "INTERNAL_ERROR" ,
800- code : "TASK_RUN_CRASHED" ,
801- message : "Max attempts reached." ,
802- } ,
803- } ,
804- tx : prisma ,
805- } ) ;
806- throw new ServiceValidationError ( "Max attempts reached" , 400 ) ;
807- }
808-
809- this . eventBus . emit ( "runAttemptStarted" , {
810- time : new Date ( ) ,
811- run : {
812- id : taskRun . id ,
813- attemptNumber : nextAttemptNumber ,
814- baseCostInCents : taskRun . baseCostInCents ,
815- } ,
816- organization : {
817- id : environment . organization . id ,
818- } ,
819- } ) ;
820-
821- const result = await $transaction (
822- prisma ,
823- async ( tx ) => {
824- const run = await tx . taskRun . update ( {
825- where : {
826- id : taskRun . id ,
827- } ,
828- data : {
829- status : "EXECUTING" ,
830- attemptNumber : nextAttemptNumber ,
831- executedAt : taskRun . attemptNumber === null ? new Date ( ) : undefined ,
832- } ,
833- include : {
834- tags : true ,
835- lockedBy : {
836- include : { worker : true } ,
837- } ,
838- } ,
839- } ) ;
840-
841- const newSnapshot = await this . executionSnapshotSystem . createExecutionSnapshot ( tx , {
842- run,
843- snapshot : {
844- executionStatus : "EXECUTING" ,
845- description : `Attempt created, starting execution${
846- isWarmStart ? " (warm start)" : ""
847- } `,
848- } ,
849- environmentId : latestSnapshot . environmentId ,
850- environmentType : latestSnapshot . environmentType ,
851- workerId,
852- runnerId,
853- } ) ;
854-
855- if ( taskRun . ttl ) {
856- //don't expire the run, it's going to execute
857- await this . worker . ack ( `expireRun:${ taskRun . id } ` ) ;
858- }
859-
860- return { run, snapshot : newSnapshot } ;
861- } ,
862- ( error ) => {
863- this . logger . error ( "RunEngine.createRunAttempt(): prisma.$transaction error" , {
864- code : error . code ,
865- meta : error . meta ,
866- stack : error . stack ,
867- message : error . message ,
868- name : error . name ,
869- } ) ;
870- throw new ServiceValidationError (
871- "Failed to update task run and execution snapshot" ,
872- 500
873- ) ;
874- }
875- ) ;
876-
877- if ( ! result ) {
878- this . logger . error ( "RunEngine.createRunAttempt(): failed to create task run attempt" , {
879- runId : taskRun . id ,
880- nextAttemptNumber,
881- } ) ;
882- throw new ServiceValidationError ( "Failed to create task run attempt" , 500 ) ;
883- }
884-
885- const { run, snapshot } = result ;
886-
887- const machinePreset = getMachinePreset ( {
888- machines : this . options . machines . machines ,
889- defaultMachine : this . options . machines . defaultMachine ,
890- config : taskRun . lockedBy . machineConfig ?? { } ,
891- run : taskRun ,
892- } ) ;
893-
894- const metadata = await parsePacket ( {
895- data : taskRun . metadata ?? undefined ,
896- dataType : taskRun . metadataType ,
897- } ) ;
898-
899- const execution : TaskRunExecution = {
900- task : {
901- id : run . lockedBy ! . slug ,
902- filePath : run . lockedBy ! . filePath ,
903- exportName : run . lockedBy ! . exportName ,
904- } ,
905- attempt : {
906- number : nextAttemptNumber ,
907- startedAt : latestSnapshot . updatedAt ,
908- /** @deprecated */
909- id : "deprecated" ,
910- /** @deprecated */
911- backgroundWorkerId : "deprecated" ,
912- /** @deprecated */
913- backgroundWorkerTaskId : "deprecated" ,
914- /** @deprecated */
915- status : "deprecated" ,
916- } ,
917- run : {
918- id : run . friendlyId ,
919- payload : run . payload ,
920- payloadType : run . payloadType ,
921- createdAt : run . createdAt ,
922- tags : run . tags . map ( ( tag ) => tag . name ) ,
923- isTest : run . isTest ,
924- idempotencyKey : run . idempotencyKey ?? undefined ,
925- startedAt : run . startedAt ?? run . createdAt ,
926- maxAttempts : run . maxAttempts ?? undefined ,
927- version : run . lockedBy ! . worker . version ,
928- metadata,
929- maxDuration : run . maxDurationInSeconds ?? undefined ,
930- /** @deprecated */
931- context : undefined ,
932- /** @deprecated */
933- durationMs : run . usageDurationMs ,
934- /** @deprecated */
935- costInCents : run . costInCents ,
936- /** @deprecated */
937- baseCostInCents : run . baseCostInCents ,
938- traceContext : run . traceContext as Record < string , string | undefined > ,
939- priority : run . priorityMs === 0 ? undefined : run . priorityMs / 1_000 ,
940- } ,
941- queue : {
942- id : queue . friendlyId ,
943- name : queue . name ,
944- } ,
945- environment : {
946- id : environment . id ,
947- slug : environment . slug ,
948- type : environment . type ,
949- } ,
950- organization : {
951- id : environment . organization . id ,
952- slug : environment . organization . slug ,
953- name : environment . organization . title ,
954- } ,
955- project : {
956- id : environment . project . id ,
957- ref : environment . project . externalRef ,
958- slug : environment . project . slug ,
959- name : environment . project . name ,
960- } ,
961- batch :
962- taskRun . batchItems [ 0 ] && taskRun . batchItems [ 0 ] . batchTaskRun
963- ? { id : taskRun . batchItems [ 0 ] . batchTaskRun . friendlyId }
964- : undefined ,
965- machine : machinePreset ,
966- } ;
967-
968- return { run, snapshot, execution } ;
969- } ) ;
970- } ,
971- {
972- attributes : { runId, snapshotId } ,
973- }
974- ) ;
707+ return this . runAttemptSystem . startRunAttempt ( {
708+ runId,
709+ snapshotId,
710+ workerId,
711+ runnerId,
712+ isWarmStart,
713+ tx,
714+ } ) ;
975715 }
976716
977717 /** How a run is completed */
0 commit comments