Skip to content

Commit 1887eb2

Browse files
feat: add isReplay to run context
Add isReplay boolean to the run context, following the same pattern as isTest. The value is derived from the existing replayedFromTaskRunFriendlyId database field - no schema migration needed. Changes: - Add isReplay to TaskRun and V3TaskRun schemas (default false) - Add RUN_IS_REPLAY semantic attribute - Propagate isReplay through dequeue and attempt systems - Add isReplay to DequeuedMessage and LazyAttemptPayload schemas Co-Authored-By: nick <55853254+nicktrn@users.noreply.github.com>
1 parent 5693b62 commit 1887eb2

7 files changed

Lines changed: 59 additions & 48 deletions

File tree

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,7 @@ export class DequeueSystem {
607607
id: lockedTaskRun.id,
608608
friendlyId: lockedTaskRun.friendlyId,
609609
isTest: lockedTaskRun.isTest,
610+
isReplay: !!lockedTaskRun.replayedFromTaskRunFriendlyId,
610611
machine: machinePreset,
611612
attemptNumber: nextAttemptNumber,
612613
// Keeping this for backwards compatibility, but really this should be called workerQueue

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ export class RunAttemptSystem {
196196
machinePreset: true,
197197
runTags: true,
198198
isTest: true,
199+
replayedFromTaskRunFriendlyId: true,
199200
idempotencyKey: true,
200201
idempotencyKeyOptions: true,
201202
startedAt: true,
@@ -232,9 +233,9 @@ export class RunAttemptSystem {
232233
run.lockedById
233234
? this.#resolveTaskRunExecutionTask(run.lockedById)
234235
: Promise.resolve({
235-
id: run.taskIdentifier,
236-
filePath: "unknown",
237-
}),
236+
id: run.taskIdentifier,
237+
filePath: "unknown",
238+
}),
238239
this.#resolveTaskRunExecutionQueue({
239240
lockedQueueId: run.lockedQueueId ?? undefined,
240241
queueName: run.queue,
@@ -245,13 +246,13 @@ export class RunAttemptSystem {
245246
run.lockedById
246247
? this.#resolveTaskRunExecutionMachinePreset(run.lockedById, run.machinePreset)
247248
: Promise.resolve(
248-
getMachinePreset({
249-
defaultMachine: this.options.machines.defaultMachine,
250-
machines: this.options.machines.machines,
251-
config: undefined,
252-
run,
253-
})
254-
),
249+
getMachinePreset({
250+
defaultMachine: this.options.machines.defaultMachine,
251+
machines: this.options.machines.machines,
252+
config: undefined,
253+
run,
254+
})
255+
),
255256
run.lockedById
256257
? this.#resolveTaskRunExecutionDeployment(run.lockedById)
257258
: Promise.resolve(undefined),
@@ -262,6 +263,7 @@ export class RunAttemptSystem {
262263
id: run.friendlyId,
263264
tags: run.runTags,
264265
isTest: run.isTest,
266+
isReplay: !!run.replayedFromTaskRunFriendlyId,
265267
createdAt: run.createdAt,
266268
startedAt: run.startedAt ?? run.createdAt,
267269
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
@@ -426,6 +428,7 @@ export class RunAttemptSystem {
426428
payloadType: true,
427429
runTags: true,
428430
isTest: true,
431+
replayedFromTaskRunFriendlyId: true,
429432
idempotencyKey: true,
430433
idempotencyKeyOptions: true,
431434
startedAt: true,
@@ -459,8 +462,9 @@ export class RunAttemptSystem {
459462
run,
460463
snapshot: {
461464
executionStatus: "EXECUTING",
462-
description: `Attempt created, starting execution${isWarmStart ? " (warm start)" : ""
463-
}`,
465+
description: `Attempt created, starting execution${
466+
isWarmStart ? " (warm start)" : ""
467+
}`,
464468
},
465469
previousSnapshotId: latestSnapshot.id,
466470
environmentId: latestSnapshot.environmentId,
@@ -574,6 +578,7 @@ export class RunAttemptSystem {
574578
createdAt: updatedRun.createdAt,
575579
tags: updatedRun.runTags,
576580
isTest: updatedRun.isTest,
581+
isReplay: !!updatedRun.replayedFromTaskRunFriendlyId,
577582
idempotencyKey: getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
578583
idempotencyKeyScope: extractIdempotencyKeyScope(updatedRun),
579584
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
@@ -618,8 +623,8 @@ export class RunAttemptSystem {
618623
deployment,
619624
batch: updatedRun.batchId
620625
? {
621-
id: BatchId.toFriendlyId(updatedRun.batchId),
622-
}
626+
id: BatchId.toFriendlyId(updatedRun.batchId),
627+
}
623628
: undefined,
624629
};
625630

@@ -1387,8 +1392,8 @@ export class RunAttemptSystem {
13871392
error,
13881393
bulkActionGroupIds: bulkActionId
13891394
? {
1390-
push: bulkActionId,
1391-
}
1395+
push: bulkActionId,
1396+
}
13921397
: undefined,
13931398
...(usageUpdate && {
13941399
usageDurationMs: usageUpdate.usageDurationMs,
@@ -1876,26 +1881,26 @@ export class RunAttemptSystem {
18761881
const result = await this.cache.queues.swr(cacheKey, async () => {
18771882
const queue = params.lockedQueueId
18781883
? await this.$.readOnlyPrisma.taskQueue.findFirst({
1879-
where: {
1880-
id: params.lockedQueueId,
1881-
},
1882-
select: {
1883-
id: true,
1884-
friendlyId: true,
1885-
name: true,
1886-
},
1887-
})
1884+
where: {
1885+
id: params.lockedQueueId,
1886+
},
1887+
select: {
1888+
id: true,
1889+
friendlyId: true,
1890+
name: true,
1891+
},
1892+
})
18881893
: await this.$.readOnlyPrisma.taskQueue.findFirst({
1889-
where: {
1890-
runtimeEnvironmentId: params.runtimeEnvironmentId,
1891-
name: params.queueName,
1892-
},
1893-
select: {
1894-
id: true,
1895-
friendlyId: true,
1896-
name: true,
1897-
},
1898-
});
1894+
where: {
1895+
runtimeEnvironmentId: params.runtimeEnvironmentId,
1896+
name: params.queueName,
1897+
},
1898+
select: {
1899+
id: true,
1900+
friendlyId: true,
1901+
name: true,
1902+
},
1903+
});
18991904

19001905
if (!queue) {
19011906
// Return synthetic queue so run/span view still loads (e.g. createFailedTaskRun with fallback queue)
@@ -2068,13 +2073,13 @@ export class RunAttemptSystem {
20682073
if (environmentType !== "DEVELOPMENT") {
20692074
const machinePreset = machinePresetName
20702075
? machinePresetFromName(
2071-
this.options.machines.machines,
2072-
machinePresetName as MachinePresetName
2073-
)
2076+
this.options.machines.machines,
2077+
machinePresetName as MachinePresetName
2078+
)
20742079
: machinePresetFromName(
2075-
this.options.machines.machines,
2076-
this.options.machines.defaultMachine
2077-
);
2080+
this.options.machines.machines,
2081+
this.options.machines.defaultMachine
2082+
);
20782083

20792084
costInCents = currentCostInCents + attemptDurationMs * machinePreset.centsPerMs;
20802085
}
@@ -2084,7 +2089,6 @@ export class RunAttemptSystem {
20842089
costInCents,
20852090
};
20862091
}
2087-
20882092
}
20892093

20902094
export function safeParseGitMeta(git: unknown): GitMeta | undefined {

packages/core/src/v3/schemas/common.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ export const TaskRun = z.object({
215215
payloadType: z.string(),
216216
tags: z.array(z.string()),
217217
isTest: z.boolean().default(false),
218+
isReplay: z.boolean().default(false),
218219
createdAt: z.coerce.date(),
219220
startedAt: z.coerce.date().default(() => new Date()),
220221
/** The user-provided idempotency key (not the hash) */
@@ -378,6 +379,7 @@ export const V3TaskRun = z.object({
378379
payloadType: z.string(),
379380
tags: z.array(z.string()),
380381
isTest: z.boolean().default(false),
382+
isReplay: z.boolean().default(false),
381383
createdAt: z.coerce.date(),
382384
startedAt: z.coerce.date().default(() => new Date()),
383385
/** The user-provided idempotency key (not the hash) */
@@ -538,13 +540,13 @@ export type WaitpointTokenResult = z.infer<typeof WaitpointTokenResult>;
538540

539541
export type WaitpointTokenTypedResult<T> =
540542
| {
541-
ok: true;
542-
output: T;
543-
}
543+
ok: true;
544+
output: T;
545+
}
544546
| {
545-
ok: false;
546-
error: Error;
547-
};
547+
ok: false;
548+
error: Error;
549+
};
548550

549551
export const SerializedError = z.object({
550552
message: z.string(),

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ export const DequeuedMessage = z.object({
277277
id: z.string(),
278278
friendlyId: z.string(),
279279
isTest: z.boolean(),
280+
isReplay: z.boolean().default(false),
280281
machine: MachinePreset,
281282
attemptNumber: z.number(),
282283
masterQueue: z.string(),

packages/core/src/v3/schemas/schemas.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ export const TaskRunExecutionLazyAttemptPayload = z.object({
292292
attemptCount: z.number().optional(),
293293
messageId: z.string(),
294294
isTest: z.boolean(),
295+
isReplay: z.boolean().default(false),
295296
traceContext: z.record(z.unknown()),
296297
environment: z.record(z.string()).optional(),
297298
metrics: TaskRunExecutionMetrics.optional(),

packages/core/src/v3/semanticInternalAttributes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export const SemanticInternalAttributes = {
1212
ATTEMPT_NUMBER: "ctx.attempt.number",
1313
RUN_ID: "ctx.run.id",
1414
RUN_IS_TEST: "ctx.run.isTest",
15+
RUN_IS_REPLAY: "ctx.run.isReplay",
1516
ORIGINAL_RUN_ID: "$original_run_id",
1617
BATCH_ID: "ctx.batch.id",
1718
TASK_SLUG: "ctx.task.id",

packages/core/src/v3/taskContext/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ export class TaskContextAPI {
9494
[SemanticInternalAttributes.QUEUE_ID]: this.ctx.queue.id,
9595
[SemanticInternalAttributes.RUN_ID]: this.ctx.run.id,
9696
[SemanticInternalAttributes.RUN_IS_TEST]: this.ctx.run.isTest,
97+
[SemanticInternalAttributes.RUN_IS_REPLAY]: this.ctx.run.isReplay,
9798
[SemanticInternalAttributes.BATCH_ID]: this.ctx.batch?.id,
9899
[SemanticInternalAttributes.IDEMPOTENCY_KEY]: this.ctx.run.idempotencyKey,
99100
};

0 commit comments

Comments
 (0)