Skip to content

Commit 51f02ac

Browse files
committed
ttl system
1 parent ed84b11 commit 51f02ac

3 files changed

Lines changed: 161 additions & 126 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 22 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,9 @@ import {
1111
MachineResources,
1212
RunExecutionData,
1313
StartRunAttemptResult,
14-
TaskRunError,
1514
TaskRunExecutionResult,
1615
} from "@trigger.dev/core/v3";
17-
import {
18-
BatchId,
19-
parseNaturalLanguageDuration,
20-
QueueId,
21-
RunId,
22-
WaitpointId,
23-
} from "@trigger.dev/core/v3/isomorphic";
16+
import { BatchId, QueueId, RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic";
2417
import {
2518
Prisma,
2619
PrismaClient,
@@ -35,12 +28,18 @@ import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrat
3528
import { RunQueue } from "../run-queue/index.js";
3629
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
3730
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
38-
import { EventBus, EventBusEvents, sendNotificationToWorker } from "./eventBus.js";
31+
import {
32+
NotImplementedError,
33+
RunDuplicateIdempotencyKeyError,
34+
ServiceValidationError,
35+
} from "./errors.js";
36+
import { EventBus, EventBusEvents } from "./eventBus.js";
3937
import { RunLocker } from "./locking.js";
4038
import { ReleaseConcurrencyTokenBucketQueue } from "./releaseConcurrencyTokenBucketQueue.js";
41-
import { canReleaseConcurrency, isExecuting } from "./statuses.js";
39+
import { canReleaseConcurrency } from "./statuses.js";
4240
import { BatchSystem } from "./systems/batchSystem.js";
4341
import { CheckpointSystem } from "./systems/checkpointSystem.js";
42+
import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
4443
import { DequeueSystem } from "./systems/dequeueSystem.js";
4544
import { EnqueueSystem } from "./systems/enqueueSystem.js";
4645
import {
@@ -49,15 +48,10 @@ import {
4948
} from "./systems/executionSnapshotSystem.js";
5049
import { RunAttemptSystem } from "./systems/runAttemptSystem.js";
5150
import { SystemResources } from "./systems/systems.js";
51+
import { TtlSystem } from "./systems/ttlSystem.js";
5252
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5353
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5454
import { workerCatalog } from "./workerCatalog.js";
55-
import {
56-
NotImplementedError,
57-
RunDuplicateIdempotencyKeyError,
58-
ServiceValidationError,
59-
} from "./errors.js";
60-
import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
6155

6256
export class RunEngine {
6357
private runLockRedis: Redis;
@@ -82,6 +76,7 @@ export class RunEngine {
8276
enqueueSystem: EnqueueSystem;
8377
checkpointSystem: CheckpointSystem;
8478
delayedRunSystem: DelayedRunSystem;
79+
ttlSystem: TtlSystem;
8580

8681
constructor(private readonly options: RunEngineOptions) {
8782
this.prisma = options.prisma;
@@ -131,7 +126,7 @@ export class RunEngine {
131126
logger: new Logger("RunEngineWorker", "debug"),
132127
jobs: {
133128
finishWaitpoint: async ({ payload }) => {
134-
await this.completeWaitpoint({
129+
await this.waitpointSystem.completeWaitpoint({
135130
id: payload.waitpointId,
136131
output: payload.error
137132
? {
@@ -145,7 +140,7 @@ export class RunEngine {
145140
await this.#handleStalledSnapshot(payload);
146141
},
147142
expireRun: async ({ payload }) => {
148-
await this.#expireRun({ runId: payload.runId });
143+
await this.ttlSystem.expireRun({ runId: payload.runId });
149144
},
150145
cancelRun: async ({ payload }) => {
151146
await this.runAttemptSystem.cancelRun({
@@ -266,6 +261,11 @@ export class RunEngine {
266261
enqueueSystem: this.enqueueSystem,
267262
});
268263

264+
this.ttlSystem = new TtlSystem({
265+
resources,
266+
waitpointSystem: this.waitpointSystem,
267+
});
268+
269269
this.batchSystem = new BatchSystem({
270270
resources,
271271
});
@@ -554,11 +554,9 @@ export class RunEngine {
554554

555555
if (taskRun.delayUntil) {
556556
// Schedule the run to be enqueued at the delayUntil time
557-
await this.worker.enqueue({
558-
id: `enqueueDelayedRun:${taskRun.id}`,
559-
job: "enqueueDelayedRun",
560-
payload: { runId: taskRun.id },
561-
availableAt: taskRun.delayUntil,
557+
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
558+
runId: taskRun.id,
559+
delayUntil: taskRun.delayUntil,
562560
});
563561
} else {
564562
await this.enqueueSystem.enqueueRun({
@@ -571,16 +569,7 @@ export class RunEngine {
571569
});
572570

573571
if (taskRun.ttl) {
574-
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);
575-
576-
if (expireAt) {
577-
await this.worker.enqueue({
578-
id: `expireRun:${taskRun.id}`,
579-
job: "expireRun",
580-
payload: { runId: taskRun.id },
581-
availableAt: expireAt,
582-
});
583-
}
572+
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
584573
}
585574
}
586575
});
@@ -1236,99 +1225,6 @@ export class RunEngine {
12361225
}
12371226
}
12381227

1239-
async #expireRun({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) {
1240-
const prisma = tx ?? this.prisma;
1241-
await this.runLock.lock([runId], 5_000, async () => {
1242-
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
1243-
1244-
//if we're executing then we won't expire the run
1245-
if (isExecuting(snapshot.executionStatus)) {
1246-
return;
1247-
}
1248-
1249-
//only expire "PENDING" runs
1250-
const run = await prisma.taskRun.findUnique({ where: { id: runId } });
1251-
1252-
if (!run) {
1253-
this.logger.debug("Could not find enqueued run to expire", {
1254-
runId,
1255-
});
1256-
return;
1257-
}
1258-
1259-
if (run.status !== "PENDING") {
1260-
this.logger.debug("Run cannot be expired because it's not in PENDING status", {
1261-
run,
1262-
});
1263-
return;
1264-
}
1265-
1266-
if (run.lockedAt) {
1267-
this.logger.debug("Run cannot be expired because it's locked, so will run", {
1268-
run,
1269-
});
1270-
return;
1271-
}
1272-
1273-
const error: TaskRunError = {
1274-
type: "STRING_ERROR",
1275-
raw: `Run expired because the TTL (${run.ttl}) was reached`,
1276-
};
1277-
1278-
const updatedRun = await prisma.taskRun.update({
1279-
where: { id: runId },
1280-
data: {
1281-
status: "EXPIRED",
1282-
completedAt: new Date(),
1283-
expiredAt: new Date(),
1284-
error,
1285-
executionSnapshots: {
1286-
create: {
1287-
engine: "V2",
1288-
executionStatus: "FINISHED",
1289-
description: "Run was expired because the TTL was reached",
1290-
runStatus: "EXPIRED",
1291-
environmentId: snapshot.environmentId,
1292-
environmentType: snapshot.environmentType,
1293-
},
1294-
},
1295-
},
1296-
select: {
1297-
id: true,
1298-
spanId: true,
1299-
ttl: true,
1300-
associatedWaitpoint: {
1301-
select: {
1302-
id: true,
1303-
},
1304-
},
1305-
runtimeEnvironment: {
1306-
select: {
1307-
organizationId: true,
1308-
},
1309-
},
1310-
createdAt: true,
1311-
completedAt: true,
1312-
taskEventStore: true,
1313-
parentTaskRunId: true,
1314-
},
1315-
});
1316-
1317-
await this.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId);
1318-
1319-
if (!updatedRun.associatedWaitpoint) {
1320-
throw new ServiceValidationError("No associated waitpoint found", 400);
1321-
}
1322-
1323-
await this.completeWaitpoint({
1324-
id: updatedRun.associatedWaitpoint.id,
1325-
output: { value: JSON.stringify(error), isError: true },
1326-
});
1327-
1328-
this.eventBus.emit("runExpired", { run: updatedRun, time: new Date() });
1329-
});
1330-
}
1331-
13321228
async #queueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
13331229
//It could be a lot of runs, so we will process them in a batch
13341230
//if there are still more to process we will enqueue this function again

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,13 @@ export class DelayedRunSystem {
121121
}
122122
}
123123
}
124+
125+
async scheduleDelayedRunEnqueuing({ runId, delayUntil }: { runId: string; delayUntil: Date }) {
126+
await this.$.worker.enqueue({
127+
id: `enqueueDelayedRun:${runId}`,
128+
job: "enqueueDelayedRun",
129+
payload: { runId },
130+
availableAt: delayUntil,
131+
});
132+
}
124133
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import { startSpan } from "@internal/tracing";
2+
import { SystemResources } from "./systems.js";
3+
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
4+
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
5+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
6+
import { ServiceValidationError } from "../errors.js";
7+
import { isExecuting } from "../statuses.js";
8+
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
9+
import { WaitpointSystem } from "./waitpointSystem.js";
10+
11+
export type TtlSystemOptions = {
12+
resources: SystemResources;
13+
waitpointSystem: WaitpointSystem;
14+
};
15+
16+
export class TtlSystem {
17+
private readonly $: SystemResources;
18+
private readonly waitpointSystem: WaitpointSystem;
19+
20+
constructor(private readonly options: TtlSystemOptions) {
21+
this.$ = options.resources;
22+
this.waitpointSystem = options.waitpointSystem;
23+
}
24+
25+
async expireRun({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) {
26+
const prisma = tx ?? this.$.prisma;
27+
await this.$.runLock.lock([runId], 5_000, async () => {
28+
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
29+
30+
//if we're executing then we won't expire the run
31+
if (isExecuting(snapshot.executionStatus)) {
32+
return;
33+
}
34+
35+
//only expire "PENDING" runs
36+
const run = await prisma.taskRun.findUnique({ where: { id: runId } });
37+
38+
if (!run) {
39+
this.$.logger.debug("Could not find enqueued run to expire", {
40+
runId,
41+
});
42+
return;
43+
}
44+
45+
if (run.status !== "PENDING") {
46+
this.$.logger.debug("Run cannot be expired because it's not in PENDING status", {
47+
run,
48+
});
49+
return;
50+
}
51+
52+
if (run.lockedAt) {
53+
this.$.logger.debug("Run cannot be expired because it's locked, so will run", {
54+
run,
55+
});
56+
return;
57+
}
58+
59+
const error: TaskRunError = {
60+
type: "STRING_ERROR",
61+
raw: `Run expired because the TTL (${run.ttl}) was reached`,
62+
};
63+
64+
const updatedRun = await prisma.taskRun.update({
65+
where: { id: runId },
66+
data: {
67+
status: "EXPIRED",
68+
completedAt: new Date(),
69+
expiredAt: new Date(),
70+
error,
71+
executionSnapshots: {
72+
create: {
73+
engine: "V2",
74+
executionStatus: "FINISHED",
75+
description: "Run was expired because the TTL was reached",
76+
runStatus: "EXPIRED",
77+
environmentId: snapshot.environmentId,
78+
environmentType: snapshot.environmentType,
79+
},
80+
},
81+
},
82+
select: {
83+
id: true,
84+
spanId: true,
85+
ttl: true,
86+
associatedWaitpoint: {
87+
select: {
88+
id: true,
89+
},
90+
},
91+
runtimeEnvironment: {
92+
select: {
93+
organizationId: true,
94+
},
95+
},
96+
createdAt: true,
97+
completedAt: true,
98+
taskEventStore: true,
99+
parentTaskRunId: true,
100+
},
101+
});
102+
103+
await this.$.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId);
104+
105+
if (!updatedRun.associatedWaitpoint) {
106+
throw new ServiceValidationError("No associated waitpoint found", 400);
107+
}
108+
109+
await this.waitpointSystem.completeWaitpoint({
110+
id: updatedRun.associatedWaitpoint.id,
111+
output: { value: JSON.stringify(error), isError: true },
112+
});
113+
114+
this.$.eventBus.emit("runExpired", { run: updatedRun, time: new Date() });
115+
});
116+
}
117+
118+
async scheduleExpireRun({ runId, ttl }: { runId: string; ttl: string }) {
119+
const expireAt = parseNaturalLanguageDuration(ttl);
120+
121+
if (expireAt) {
122+
await this.$.worker.enqueue({
123+
id: `expireRun:${runId}`,
124+
job: "expireRun",
125+
payload: { runId },
126+
availableAt: expireAt,
127+
});
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)