11import {
2+ type MachinePresetName ,
23 conditionallyImportPacket ,
3- IOPacket ,
44 parsePacket ,
5- RunTags ,
6- stringifyIO ,
75} from "@trigger.dev/core/v3" ;
8- import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization" ;
9- import { TaskRun } from "@trigger.dev/database" ;
6+ import { type TaskRun } from "@trigger.dev/database" ;
107import { findEnvironmentById } from "~/models/runtimeEnvironment.server" ;
118import { getTagsForRunId } from "~/models/taskRunTag.server" ;
129import { logger } from "~/services/logger.server" ;
1310import { BaseService } from "./baseService.server" ;
1411import { OutOfEntitlementError , TriggerTaskService } from "./triggerTask.server" ;
12+ import { type RunOptionsData } from "../testTask" ;
1513
1614type OverrideOptions = {
1715 environmentId ?: string ;
18- payload ?: string ;
19- } ;
16+ payload ?: unknown ;
17+ metadata ?: unknown ;
18+ } & RunOptionsData ;
2019
2120export class ReplayTaskRunService extends BaseService {
22- public async call ( existingTaskRun : TaskRun , overrideOptions ? : OverrideOptions ) {
21+ public async call ( existingTaskRun : TaskRun , overrideOptions : OverrideOptions = { } ) {
2322 const authenticatedEnvironment = await findEnvironmentById (
24- overrideOptions ? .environmentId ?? existingTaskRun . runtimeEnvironmentId
23+ overrideOptions . environmentId ?? existingTaskRun . runtimeEnvironmentId
2524 ) ;
2625 if ( ! authenticatedEnvironment ) {
2726 return ;
@@ -36,57 +35,41 @@ export class ReplayTaskRunService extends BaseService {
3635 taskRunFriendlyId : existingTaskRun . friendlyId ,
3736 } ) ;
3837
39- let payloadPacket : IOPacket ;
40-
41- if ( overrideOptions ?. payload ) {
42- if ( existingTaskRun . payloadType === "application/super+json" ) {
43- const newPayload = await replaceSuperJsonPayload (
44- existingTaskRun . payload ,
45- overrideOptions . payload
46- ) ;
47- payloadPacket = await stringifyIO ( newPayload ) ;
48- } else {
49- payloadPacket = await conditionallyImportPacket ( {
50- data : overrideOptions . payload ,
51- dataType : existingTaskRun . payloadType ,
52- } ) ;
53- }
54- } else {
55- payloadPacket = await conditionallyImportPacket ( {
38+ const getExistingPayload = async ( ) => {
39+ const existingPayloadPacket = await conditionallyImportPacket ( {
5640 data : existingTaskRun . payload ,
5741 dataType : existingTaskRun . payloadType ,
5842 } ) ;
59- }
6043
61- const parsedPayload =
62- payloadPacket . dataType === "application/json"
63- ? await parsePacket ( payloadPacket )
64- : payloadPacket . data ;
65-
66- logger . info ( "Replaying task run payload" , {
67- taskRunId : existingTaskRun . id ,
68- taskRunFriendlyId : existingTaskRun . friendlyId ,
69- payloadPacketType : payloadPacket . dataType ,
70- } ) ;
44+ return existingPayloadPacket . dataType === "application/json"
45+ ? await parsePacket ( existingPayloadPacket )
46+ : existingPayloadPacket . data ;
47+ } ;
7148
72- const metadata = existingTaskRun . seedMetadata
73- ? await parsePacket ( {
74- data : existingTaskRun . seedMetadata ,
75- dataType : existingTaskRun . seedMetadataType ,
76- } )
77- : undefined ;
49+ const payload = overrideOptions . payload ?? ( await getExistingPayload ( ) ) ;
50+ const metadata =
51+ overrideOptions . metadata ??
52+ ( existingTaskRun . seedMetadata
53+ ? await parsePacket ( {
54+ data : existingTaskRun . seedMetadata ,
55+ dataType : existingTaskRun . seedMetadataType ,
56+ } )
57+ : undefined ) ;
7858
7959 try {
80- const tags = await getTagsForRunId ( {
81- friendlyId : existingTaskRun . friendlyId ,
82- environmentId : authenticatedEnvironment . id ,
83- } ) ;
60+ const tags =
61+ overrideOptions . tags ??
62+ (
63+ await getTagsForRunId ( {
64+ friendlyId : existingTaskRun . friendlyId ,
65+ environmentId : authenticatedEnvironment . id ,
66+ } )
67+ ) ?. map ( ( t ) => t . name ) ;
8468
85- //get the queue from the original run, so we can use the same settings on the replay
8669 const taskQueue = await this . _prisma . taskQueue . findFirst ( {
8770 where : {
8871 runtimeEnvironmentId : authenticatedEnvironment . id ,
89- name : existingTaskRun . queue ,
72+ name : overrideOptions . queue ?? existingTaskRun . queue ,
9073 } ,
9174 } ) ;
9275
@@ -95,18 +78,34 @@ export class ReplayTaskRunService extends BaseService {
9578 existingTaskRun . taskIdentifier ,
9679 authenticatedEnvironment ,
9780 {
98- payload : parsedPayload ,
81+ payload,
9982 options : {
10083 queue : taskQueue
10184 ? {
10285 name : taskQueue . name ,
10386 }
10487 : undefined ,
105- concurrencyKey : existingTaskRun . concurrencyKey ?? undefined ,
10688 test : existingTaskRun . isTest ,
107- payloadType : payloadPacket . dataType ,
108- tags : tags ?. map ( ( t ) => t . name ) as RunTags ,
109- metadata,
89+ tags,
90+ metadata : metadata ,
91+ delay : overrideOptions . delaySeconds
92+ ? new Date ( Date . now ( ) + overrideOptions . delaySeconds * 1000 )
93+ : undefined ,
94+ ttl : overrideOptions . ttlSeconds ,
95+ idempotencyKey : overrideOptions . idempotencyKey ,
96+ idempotencyKeyTTL : overrideOptions . idempotencyKeyTTLSeconds
97+ ? `${ overrideOptions . idempotencyKeyTTLSeconds } s`
98+ : undefined ,
99+ concurrencyKey :
100+ overrideOptions . concurrencyKey ?? existingTaskRun . concurrencyKey ?? undefined ,
101+ maxAttempts : overrideOptions . maxAttempts ,
102+ maxDuration : overrideOptions . maxDurationSeconds ,
103+ machine :
104+ overrideOptions . machine ??
105+ ( existingTaskRun . machinePreset as MachinePresetName ) ??
106+ undefined ,
107+ lockToVersion :
108+ overrideOptions . version === "latest" ? undefined : overrideOptions . version ,
110109 } ,
111110 } ,
112111 {
0 commit comments