Skip to content

Commit ed84b11

Browse files
committed
Delayed run system
1 parent f46d7f0 commit ed84b11

5 files changed

Lines changed: 187 additions & 138 deletions

File tree

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,27 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
5656
assertExhaustive(error.code);
5757
}
5858
}
59+
60+
export class ServiceValidationError extends Error {
61+
constructor(
62+
message: string,
63+
public status?: number
64+
) {
65+
super(message);
66+
this.name = "ServiceValidationError";
67+
}
68+
}
69+
70+
export class NotImplementedError extends Error {
71+
constructor(message: string) {
72+
console.error("This isn't implemented", { message });
73+
super(message);
74+
}
75+
}
76+
77+
export class RunDuplicateIdempotencyKeyError extends Error {
78+
constructor(message: string) {
79+
super(message);
80+
this.name = "RunDuplicateIdempotencyKeyError";
81+
}
82+
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 18 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ import { SystemResources } from "./systems/systems.js";
5252
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5353
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5454
import { workerCatalog } from "./workerCatalog.js";
55+
import {
56+
NotImplementedError,
57+
RunDuplicateIdempotencyKeyError,
58+
ServiceValidationError,
59+
} from "./errors.js";
60+
import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
5561

5662
export class RunEngine {
5763
private runLockRedis: Redis;
@@ -75,6 +81,7 @@ export class RunEngine {
7581
batchSystem: BatchSystem;
7682
enqueueSystem: EnqueueSystem;
7783
checkpointSystem: CheckpointSystem;
84+
delayedRunSystem: DelayedRunSystem;
7885

7986
constructor(private readonly options: RunEngineOptions) {
8087
this.prisma = options.prisma;
@@ -159,7 +166,7 @@ export class RunEngine {
159166
});
160167
},
161168
enqueueDelayedRun: async ({ payload }) => {
162-
await this.#enqueueDelayedRun({ runId: payload.runId });
169+
await this.delayedRunSystem.enqueueDelayedRun({ runId: payload.runId });
163170
},
164171
},
165172
}).start();
@@ -248,6 +255,11 @@ export class RunEngine {
248255
executionSnapshotSystem: this.executionSnapshotSystem,
249256
});
250257

258+
this.delayedRunSystem = new DelayedRunSystem({
259+
resources,
260+
enqueueSystem: this.enqueueSystem,
261+
});
262+
251263
this.waitpointSystem = new WaitpointSystem({
252264
resources,
253265
executionSnapshotSystem: this.executionSnapshotSystem,
@@ -789,47 +801,11 @@ export class RunEngine {
789801
delayUntil: Date;
790802
tx?: PrismaClientOrTransaction;
791803
}): Promise<TaskRun> {
792-
const prisma = tx ?? this.prisma;
793-
return startSpan(
794-
this.tracer,
795-
"rescheduleRun",
796-
async () => {
797-
return await this.runLock.lock([runId], 5_000, async () => {
798-
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
799-
800-
//if the run isn't just created then we can't reschedule it
801-
if (snapshot.executionStatus !== "RUN_CREATED") {
802-
throw new ServiceValidationError("Cannot reschedule a run that is not delayed");
803-
}
804-
805-
const updatedRun = await prisma.taskRun.update({
806-
where: {
807-
id: runId,
808-
},
809-
data: {
810-
delayUntil: delayUntil,
811-
executionSnapshots: {
812-
create: {
813-
engine: "V2",
814-
executionStatus: "RUN_CREATED",
815-
description: "Delayed run was rescheduled to a future date",
816-
runStatus: "EXPIRED",
817-
environmentId: snapshot.environmentId,
818-
environmentType: snapshot.environmentType,
819-
},
820-
},
821-
},
822-
});
823-
824-
await this.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil);
825-
826-
return updatedRun;
827-
});
828-
},
829-
{
830-
attributes: { runId },
831-
}
832-
);
804+
return this.delayedRunSystem.rescheduleDelayedRun({
805+
runId,
806+
delayUntil,
807+
tx,
808+
});
833809
}
834810

835811
async lengthOfEnvQueue(environment: MinimalAuthenticatedEnvironment): Promise<number> {
@@ -1353,53 +1329,6 @@ export class RunEngine {
13531329
});
13541330
}
13551331

1356-
async #enqueueDelayedRun({ runId }: { runId: string }) {
1357-
const run = await this.prisma.taskRun.findFirst({
1358-
where: { id: runId },
1359-
include: {
1360-
runtimeEnvironment: {
1361-
include: {
1362-
project: true,
1363-
organization: true,
1364-
},
1365-
},
1366-
},
1367-
});
1368-
1369-
if (!run) {
1370-
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
1371-
}
1372-
1373-
// Now we need to enqueue the run into the RunQueue
1374-
await this.enqueueSystem.enqueueRun({
1375-
run,
1376-
env: run.runtimeEnvironment,
1377-
timestamp: run.createdAt.getTime() - run.priorityMs,
1378-
batchId: run.batchId ?? undefined,
1379-
});
1380-
1381-
await this.prisma.taskRun.update({
1382-
where: { id: runId },
1383-
data: {
1384-
status: "PENDING",
1385-
queuedAt: new Date(),
1386-
},
1387-
});
1388-
1389-
if (run.ttl) {
1390-
const expireAt = parseNaturalLanguageDuration(run.ttl);
1391-
1392-
if (expireAt) {
1393-
await this.worker.enqueue({
1394-
id: `expireRun:${runId}`,
1395-
job: "expireRun",
1396-
payload: { runId },
1397-
availableAt: expireAt,
1398-
});
1399-
}
1400-
}
1401-
}
1402-
14031332
async #queueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
14041333
//It could be a lot of runs, so we will process them in a batch
14051334
//if there are still more to process we will enqueue this function again
@@ -1624,27 +1553,3 @@ export class RunEngine {
16241553
return `master-background-worker:${backgroundWorkerId}`;
16251554
}
16261555
}
1627-
1628-
export class ServiceValidationError extends Error {
1629-
constructor(
1630-
message: string,
1631-
public status?: number
1632-
) {
1633-
super(message);
1634-
this.name = "ServiceValidationError";
1635-
}
1636-
}
1637-
1638-
class NotImplementedError extends Error {
1639-
constructor(message: string) {
1640-
console.error("This isn't implemented", { message });
1641-
super(message);
1642-
}
1643-
}
1644-
1645-
export class RunDuplicateIdempotencyKeyError extends Error {
1646-
constructor(message: string) {
1647-
super(message);
1648-
this.name = "RunDuplicateIdempotencyKeyError";
1649-
}
1650-
}

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ import { CheckpointInput, CreateCheckpointResult, ExecutionResult } from "@trigg
22
import { CheckpointId } from "@trigger.dev/core/v3/isomorphic";
33
import { PrismaClientOrTransaction } from "@trigger.dev/database";
44
import { sendNotificationToWorker } from "../eventBus.js";
5-
import { ServiceValidationError } from "../index.js";
65
import { isCheckpointable, isPendingExecuting } from "../statuses.js";
76
import {
87
getLatestExecutionSnapshot,
98
executionResultFromSnapshot,
109
ExecutionSnapshotSystem,
1110
} from "./executionSnapshotSystem.js";
1211
import { SystemResources } from "./systems.js";
12+
import { ServiceValidationError } from "../errors.js";
1313

1414
export type CheckpointSystemOptions = {
1515
resources: SystemResources;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { startSpan } from "@internal/tracing";
2+
import { SystemResources } from "./systems.js";
3+
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
4+
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
5+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
6+
import { EnqueueSystem } from "./enqueueSystem.js";
7+
import { ServiceValidationError } from "../errors.js";
8+
9+
export type DelayedRunSystemOptions = {
10+
resources: SystemResources;
11+
enqueueSystem: EnqueueSystem;
12+
};
13+
14+
export class DelayedRunSystem {
15+
private readonly $: SystemResources;
16+
private readonly enqueueSystem: EnqueueSystem;
17+
18+
constructor(private readonly options: DelayedRunSystemOptions) {
19+
this.$ = options.resources;
20+
this.enqueueSystem = options.enqueueSystem;
21+
}
22+
23+
/**
24+
* Reschedules a delayed run where the run hasn't been queued yet
25+
*/
26+
async rescheduleDelayedRun({
27+
runId,
28+
delayUntil,
29+
tx,
30+
}: {
31+
runId: string;
32+
delayUntil: Date;
33+
tx?: PrismaClientOrTransaction;
34+
}): Promise<TaskRun> {
35+
const prisma = tx ?? this.$.prisma;
36+
return startSpan(
37+
this.$.tracer,
38+
"rescheduleDelayedRun",
39+
async () => {
40+
return await this.$.runLock.lock([runId], 5_000, async () => {
41+
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
42+
43+
//if the run isn't just created then we can't reschedule it
44+
if (snapshot.executionStatus !== "RUN_CREATED") {
45+
throw new ServiceValidationError("Cannot reschedule a run that is not delayed");
46+
}
47+
48+
const updatedRun = await prisma.taskRun.update({
49+
where: {
50+
id: runId,
51+
},
52+
data: {
53+
delayUntil: delayUntil,
54+
executionSnapshots: {
55+
create: {
56+
engine: "V2",
57+
executionStatus: "RUN_CREATED",
58+
description: "Delayed run was rescheduled to a future date",
59+
runStatus: "EXPIRED",
60+
environmentId: snapshot.environmentId,
61+
environmentType: snapshot.environmentType,
62+
},
63+
},
64+
},
65+
});
66+
67+
await this.$.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil);
68+
69+
return updatedRun;
70+
});
71+
},
72+
{
73+
attributes: { runId },
74+
}
75+
);
76+
}
77+
78+
async enqueueDelayedRun({ runId }: { runId: string }) {
79+
const run = await this.$.prisma.taskRun.findFirst({
80+
where: { id: runId },
81+
include: {
82+
runtimeEnvironment: {
83+
include: {
84+
project: true,
85+
organization: true,
86+
},
87+
},
88+
},
89+
});
90+
91+
if (!run) {
92+
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
93+
}
94+
95+
// Now we need to enqueue the run into the RunQueue
96+
await this.enqueueSystem.enqueueRun({
97+
run,
98+
env: run.runtimeEnvironment,
99+
timestamp: run.createdAt.getTime() - run.priorityMs,
100+
batchId: run.batchId ?? undefined,
101+
});
102+
103+
await this.$.prisma.taskRun.update({
104+
where: { id: runId },
105+
data: {
106+
status: "PENDING",
107+
queuedAt: new Date(),
108+
},
109+
});
110+
111+
if (run.ttl) {
112+
const expireAt = parseNaturalLanguageDuration(run.ttl);
113+
114+
if (expireAt) {
115+
await this.$.worker.enqueue({
116+
id: `expireRun:${runId}`,
117+
job: "expireRun",
118+
payload: { runId },
119+
availableAt: expireAt,
120+
});
121+
}
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)