Skip to content

Commit 363f066

Browse files
committed
Delayed run system
1 parent 63b43ff commit 363f066

5 files changed

Lines changed: 187 additions & 138 deletions

File tree

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,27 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
5656
assertExhaustive(error.code);
5757
}
5858
}
59+
60+
export class ServiceValidationError extends Error {
61+
constructor(
62+
message: string,
63+
public status?: number
64+
) {
65+
super(message);
66+
this.name = "ServiceValidationError";
67+
}
68+
}
69+
70+
export class NotImplementedError extends Error {
71+
constructor(message: string) {
72+
console.error("This isn't implemented", { message });
73+
super(message);
74+
}
75+
}
76+
77+
export class RunDuplicateIdempotencyKeyError extends Error {
78+
constructor(message: string) {
79+
super(message);
80+
this.name = "RunDuplicateIdempotencyKeyError";
81+
}
82+
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 18 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ import { SystemResources } from "./systems/systems.js";
5252
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5353
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5454
import { workerCatalog } from "./workerCatalog.js";
55+
import {
56+
NotImplementedError,
57+
RunDuplicateIdempotencyKeyError,
58+
ServiceValidationError,
59+
} from "./errors.js";
60+
import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
5561

5662
export class RunEngine {
5763
private runLockRedis: Redis;
@@ -75,6 +81,7 @@ export class RunEngine {
7581
batchSystem: BatchSystem;
7682
enqueueSystem: EnqueueSystem;
7783
checkpointSystem: CheckpointSystem;
84+
delayedRunSystem: DelayedRunSystem;
7885

7986
constructor(private readonly options: RunEngineOptions) {
8087
this.prisma = options.prisma;
@@ -159,7 +166,7 @@ export class RunEngine {
159166
});
160167
},
161168
enqueueDelayedRun: async ({ payload }) => {
162-
await this.#enqueueDelayedRun({ runId: payload.runId });
169+
await this.delayedRunSystem.enqueueDelayedRun({ runId: payload.runId });
163170
},
164171
},
165172
}).start();
@@ -248,6 +255,11 @@ export class RunEngine {
248255
executionSnapshotSystem: this.executionSnapshotSystem,
249256
});
250257

258+
this.delayedRunSystem = new DelayedRunSystem({
259+
resources,
260+
enqueueSystem: this.enqueueSystem,
261+
});
262+
251263
this.waitpointSystem = new WaitpointSystem({
252264
resources,
253265
executionSnapshotSystem: this.executionSnapshotSystem,
@@ -789,47 +801,11 @@ export class RunEngine {
789801
delayUntil: Date;
790802
tx?: PrismaClientOrTransaction;
791803
}): Promise<TaskRun> {
792-
const prisma = tx ?? this.prisma;
793-
return startSpan(
794-
this.tracer,
795-
"rescheduleRun",
796-
async () => {
797-
return await this.runLock.lock([runId], 5_000, async () => {
798-
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
799-
800-
//if the run isn't just created then we can't reschedule it
801-
if (snapshot.executionStatus !== "RUN_CREATED") {
802-
throw new ServiceValidationError("Cannot reschedule a run that is not delayed");
803-
}
804-
805-
const updatedRun = await prisma.taskRun.update({
806-
where: {
807-
id: runId,
808-
},
809-
data: {
810-
delayUntil: delayUntil,
811-
executionSnapshots: {
812-
create: {
813-
engine: "V2",
814-
executionStatus: "RUN_CREATED",
815-
description: "Delayed run was rescheduled to a future date",
816-
runStatus: "EXPIRED",
817-
environmentId: snapshot.environmentId,
818-
environmentType: snapshot.environmentType,
819-
},
820-
},
821-
},
822-
});
823-
824-
await this.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil);
825-
826-
return updatedRun;
827-
});
828-
},
829-
{
830-
attributes: { runId },
831-
}
832-
);
804+
return this.delayedRunSystem.rescheduleDelayedRun({
805+
runId,
806+
delayUntil,
807+
tx,
808+
});
833809
}
834810

835811
async lengthOfEnvQueue(environment: MinimalAuthenticatedEnvironment): Promise<number> {
@@ -1365,53 +1341,6 @@ export class RunEngine {
13651341
});
13661342
}
13671343

1368-
async #enqueueDelayedRun({ runId }: { runId: string }) {
1369-
const run = await this.prisma.taskRun.findFirst({
1370-
where: { id: runId },
1371-
include: {
1372-
runtimeEnvironment: {
1373-
include: {
1374-
project: true,
1375-
organization: true,
1376-
},
1377-
},
1378-
},
1379-
});
1380-
1381-
if (!run) {
1382-
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
1383-
}
1384-
1385-
// Now we need to enqueue the run into the RunQueue
1386-
await this.enqueueSystem.enqueueRun({
1387-
run,
1388-
env: run.runtimeEnvironment,
1389-
timestamp: run.createdAt.getTime() - run.priorityMs,
1390-
batchId: run.batchId ?? undefined,
1391-
});
1392-
1393-
await this.prisma.taskRun.update({
1394-
where: { id: runId },
1395-
data: {
1396-
status: "PENDING",
1397-
queuedAt: new Date(),
1398-
},
1399-
});
1400-
1401-
if (run.ttl) {
1402-
const expireAt = parseNaturalLanguageDuration(run.ttl);
1403-
1404-
if (expireAt) {
1405-
await this.worker.enqueue({
1406-
id: `expireRun:${runId}`,
1407-
job: "expireRun",
1408-
payload: { runId },
1409-
availableAt: expireAt,
1410-
});
1411-
}
1412-
}
1413-
}
1414-
14151344
async #queueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
14161345
//It could be a lot of runs, so we will process them in a batch
14171346
//if there are still more to process we will enqueue this function again
@@ -1636,27 +1565,3 @@ export class RunEngine {
16361565
return `master-background-worker:${backgroundWorkerId}`;
16371566
}
16381567
}
1639-
1640-
export class ServiceValidationError extends Error {
1641-
constructor(
1642-
message: string,
1643-
public status?: number
1644-
) {
1645-
super(message);
1646-
this.name = "ServiceValidationError";
1647-
}
1648-
}
1649-
1650-
class NotImplementedError extends Error {
1651-
constructor(message: string) {
1652-
console.error("This isn't implemented", { message });
1653-
super(message);
1654-
}
1655-
}
1656-
1657-
export class RunDuplicateIdempotencyKeyError extends Error {
1658-
constructor(message: string) {
1659-
super(message);
1660-
this.name = "RunDuplicateIdempotencyKeyError";
1661-
}
1662-
}

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ import { CheckpointInput, CreateCheckpointResult, ExecutionResult } from "@trigg
22
import { CheckpointId } from "@trigger.dev/core/v3/isomorphic";
33
import { PrismaClientOrTransaction } from "@trigger.dev/database";
44
import { sendNotificationToWorker } from "../eventBus.js";
5-
import { ServiceValidationError } from "../index.js";
65
import { isCheckpointable, isPendingExecuting } from "../statuses.js";
76
import {
87
getLatestExecutionSnapshot,
98
executionResultFromSnapshot,
109
ExecutionSnapshotSystem,
1110
} from "./executionSnapshotSystem.js";
1211
import { SystemResources } from "./systems.js";
12+
import { ServiceValidationError } from "../errors.js";
1313

1414
export type CheckpointSystemOptions = {
1515
resources: SystemResources;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { startSpan } from "@internal/tracing";
2+
import { SystemResources } from "./systems.js";
3+
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
4+
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
5+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
6+
import { EnqueueSystem } from "./enqueueSystem.js";
7+
import { ServiceValidationError } from "../errors.js";
8+
9+
export type DelayedRunSystemOptions = {
10+
resources: SystemResources;
11+
enqueueSystem: EnqueueSystem;
12+
};
13+
14+
export class DelayedRunSystem {
15+
private readonly $: SystemResources;
16+
private readonly enqueueSystem: EnqueueSystem;
17+
18+
constructor(private readonly options: DelayedRunSystemOptions) {
19+
this.$ = options.resources;
20+
this.enqueueSystem = options.enqueueSystem;
21+
}
22+
23+
/**
24+
* Reschedules a delayed run where the run hasn't been queued yet
25+
*/
26+
async rescheduleDelayedRun({
27+
runId,
28+
delayUntil,
29+
tx,
30+
}: {
31+
runId: string;
32+
delayUntil: Date;
33+
tx?: PrismaClientOrTransaction;
34+
}): Promise<TaskRun> {
35+
const prisma = tx ?? this.$.prisma;
36+
return startSpan(
37+
this.$.tracer,
38+
"rescheduleDelayedRun",
39+
async () => {
40+
return await this.$.runLock.lock([runId], 5_000, async () => {
41+
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
42+
43+
//if the run isn't just created then we can't reschedule it
44+
if (snapshot.executionStatus !== "RUN_CREATED") {
45+
throw new ServiceValidationError("Cannot reschedule a run that is not delayed");
46+
}
47+
48+
const updatedRun = await prisma.taskRun.update({
49+
where: {
50+
id: runId,
51+
},
52+
data: {
53+
delayUntil: delayUntil,
54+
executionSnapshots: {
55+
create: {
56+
engine: "V2",
57+
executionStatus: "RUN_CREATED",
58+
description: "Delayed run was rescheduled to a future date",
59+
runStatus: "EXPIRED",
60+
environmentId: snapshot.environmentId,
61+
environmentType: snapshot.environmentType,
62+
},
63+
},
64+
},
65+
});
66+
67+
await this.$.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil);
68+
69+
return updatedRun;
70+
});
71+
},
72+
{
73+
attributes: { runId },
74+
}
75+
);
76+
}
77+
78+
async enqueueDelayedRun({ runId }: { runId: string }) {
79+
const run = await this.$.prisma.taskRun.findFirst({
80+
where: { id: runId },
81+
include: {
82+
runtimeEnvironment: {
83+
include: {
84+
project: true,
85+
organization: true,
86+
},
87+
},
88+
},
89+
});
90+
91+
if (!run) {
92+
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
93+
}
94+
95+
// Now we need to enqueue the run into the RunQueue
96+
await this.enqueueSystem.enqueueRun({
97+
run,
98+
env: run.runtimeEnvironment,
99+
timestamp: run.createdAt.getTime() - run.priorityMs,
100+
batchId: run.batchId ?? undefined,
101+
});
102+
103+
await this.$.prisma.taskRun.update({
104+
where: { id: runId },
105+
data: {
106+
status: "PENDING",
107+
queuedAt: new Date(),
108+
},
109+
});
110+
111+
if (run.ttl) {
112+
const expireAt = parseNaturalLanguageDuration(run.ttl);
113+
114+
if (expireAt) {
115+
await this.$.worker.enqueue({
116+
id: `expireRun:${runId}`,
117+
job: "expireRun",
118+
payload: { runId },
119+
availableAt: expireAt,
120+
});
121+
}
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)