Commit ba67b53

WIP run engine systems
1 parent b59f200 commit ba67b53

11 files changed

Lines changed: 2698 additions & 2065 deletions

internal-packages/run-engine/README.md

Lines changed: 116 additions & 5 deletions
@@ -24,6 +24,7 @@ It is responsible for:
 Many operations on the run are "atomic" in the sense that only a single operation can mutate them at a time. We use RedLock to create a distributed lock to ensure this. Postgres locking is not enough on its own because we have multiple API instances and Redis is used for the queue.

 There are race conditions we need to deal with:
+
 - When checkpointing the run continues to execute until the checkpoint has been stored. At the same time the run continues and the checkpoint can become irrelevant if the waitpoint is completed. Both can happen at the same time, so we must lock the run and protect against outdated checkpoints.

 ## Run execution
@@ -41,6 +42,7 @@ We can also store invalid states by setting an error. These invalid states are p
 ## Workers

 A worker is a server that runs tasks. There are two types of workers:
+
 - Hosted workers (serverless, managed and cloud-only)
 - Self-hosted workers

@@ -67,6 +69,7 @@ If there's only a `workerGroup`, we can just `dequeueFromMasterQueue()` to get r
 This is a fair multi-tenant queue. It is designed to fairly select runs, respect concurrency limits, and have high throughput. It provides visibility into the current concurrency for the env, org, etc.

 It has built-in reliability features:
+
 - When nacking we increment the `attempt` and if it continually fails we will move it to a Dead Letter Queue (DLQ).
 - If a run is in the DLQ you can redrive it.
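The nack and redrive behaviour described in this hunk can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the real queue is Redis-backed, and `SketchQueue`, `maxAttempts`, and the choice to reset `attempt` on redrive are all hypothetical and not taken from the commit.

```typescript
// Hypothetical in-memory model of nack / DLQ / redrive. Not the RunQueue API.
type QueuedRun = { runId: string; attempt: number };

class SketchQueue {
  readonly pending: QueuedRun[] = [];
  readonly deadLetter: QueuedRun[] = [];
  private readonly maxAttempts: number;

  constructor(maxAttempts: number) {
    this.maxAttempts = maxAttempts;
  }

  enqueue(runId: string): void {
    this.pending.push({ runId, attempt: 0 });
  }

  dequeue(): QueuedRun | undefined {
    return this.pending.shift();
  }

  // Nack: count the failed attempt, then either requeue or dead-letter the run.
  nack(run: QueuedRun): "requeued" | "dead-lettered" {
    const next: QueuedRun = { runId: run.runId, attempt: run.attempt + 1 };
    if (next.attempt >= this.maxAttempts) {
      this.deadLetter.push(next);
      return "dead-lettered";
    }
    this.pending.push(next);
    return "requeued";
  }

  // Redrive: move a run from the DLQ back to the queue.
  // Assumption: the attempt counter starts again from zero.
  redrive(runId: string): boolean {
    const index = this.deadLetter.findIndex((r) => r.runId === runId);
    if (index === -1) return false;
    this.deadLetter.splice(index, 1);
    this.pending.push({ runId, attempt: 0 });
    return true;
  }
}
```

With `maxAttempts` set to 2, a run is requeued on its first nack and dead-lettered on its second, after which `redrive` puts it back on the pending list.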

@@ -87,30 +90,34 @@ A single Waitpoint can block many runs, the same waitpoint can only block a run
 They can have output data associated with them, e.g. the finished run payload. That includes an error, e.g. a failed run.

 There are currently three types:
-- `RUN` which gets completed when the associated run completes. Every run has an `associatedWaitpoint` that matches the lifetime of the run.
-- `DATETIME` which gets completed when the datetime is reached.
-- `MANUAL` which gets completed when that event occurs.
+
+- `RUN` which gets completed when the associated run completes. Every run has an `associatedWaitpoint` that matches the lifetime of the run.
+- `DATETIME` which gets completed when the datetime is reached.
+- `MANUAL` which gets completed when that event occurs.

 Waitpoints can have an idempotencyKey which stops them from being created multiple times. This is especially useful for event waitpoints, where you don't want to create a new waitpoint for the same event twice.

 ### `wait.for()` or `wait.until()`
+
 Wait for a future time, then continue. We should add the option to pass an `idempotencyKey` so a second attempt doesn't wait again. By default it would wait again.

 ```ts
 //Note if the idempotency key is a string, it will get prefixed with the run id.
 //you can explicitly pass in an idempotency key created with the global scope.
-await wait.until(new Date('2022-01-01T00:00:00Z'), { idempotencyKey: "first-wait" });
-await wait.until(new Date('2022-01-01T00:00:00Z'), { idempotencyKey: "second-wait" });
+await wait.until(new Date("2022-01-01T00:00:00Z"), { idempotencyKey: "first-wait" });
+await wait.until(new Date("2022-01-01T00:00:00Z"), { idempotencyKey: "second-wait" });
 ```

 ### `triggerAndWait()` or `batchTriggerAndWait()`
+
 Trigger and then wait for run(s) to finish. If the run fails it will still continue but with the errors so the developer can decide what to do.

 ### The `trigger` `delay` option

 When triggering a run and passing the `delay` option, we use a `DATETIME` waitpoint to block the run from starting.

 ### `wait.forRequest()`
+
 Wait until a request has been received at the URL that you are given. This is useful for pausing a run and then continuing it again when some external event occurs on another service. For example, Replicate have an API where they will callback when their work is complete.

 ### `wait.forWaitpoint(waitpointId)`
@@ -155,6 +162,7 @@ When `trigger` is called the run is added to the queue. We only dequeue when the
 When `trigger` is called, we check if the rate limit has been exceeded. If it has then we ignore the trigger. The run is thrown away and an appropriate error is returned.

 This is useful:
+
 - To prevent abuse.
 - To control how many executions a user can do (using a `key` with rate limiting).
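To make the rate limiting described in this hunk concrete, here is a minimal fixed-window limiter keyed by a string. It is a sketch, not the engine's implementation: the class name, the fixed-window strategy, and the explicit `now` parameter (used so the example is deterministic) are all assumptions.

```typescript
// Hypothetical fixed-window rate limiter; the README does not say which
// algorithm the engine uses.
class FixedWindowRateLimiter {
  private readonly windows = new Map<string, { windowStart: number; count: number }>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns true when the trigger is allowed, false when it should be
  // rejected (the run is thrown away and an error is returned to the caller).
  tryAcquire(key: string, now: number): boolean {
    const entry = this.windows.get(key);
    if (!entry || now - entry.windowStart >= this.windowMs) {
      // First trigger for this key, or the previous window has expired.
      this.windows.set(key, { windowStart: now, count: 1 });
      return true;
    }
    if (entry.count >= this.limit) return false;
    entry.count += 1;
    return true;
  }
}
```

Passing a per-user `key` gives each user an independent budget, which matches the "control how many executions a user can do" use case listed above.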

@@ -163,6 +171,7 @@ This is useful:
 When `trigger` is called, we prevent too many runs happening in a period by collapsing into a single run. This is done by discarding some runs in a period.

 This is useful:
+
 - To prevent too many runs happening in a short period.

 We should mark the run as `"DELAYED"` with the correct `delayUntil` time. This will allow the user to see that the run is delayed and why.
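The collapsing behaviour in this hunk could look roughly like the sketch below. The README does not say which trigger survives or whether the delay is extended, so this variant makes two assumptions: the first run is kept, and each discarded trigger pushes its `delayUntil` back by one period. `Debouncer` and `DelayedRun` are hypothetical names.

```typescript
// Hypothetical debounce model: triggers with the same key that arrive before
// `delayUntil` are discarded and collapsed into the existing delayed run.
type DelayedRun = { id: string; status: "DELAYED"; delayUntil: number };

class Debouncer {
  private readonly runs = new Map<string, DelayedRun>();
  private readonly periodMs: number;
  private nextId = 1;

  constructor(periodMs: number) {
    this.periodMs = periodMs;
  }

  trigger(key: string, now: number): { run: DelayedRun; discarded: boolean } {
    const existing = this.runs.get(key);
    if (existing && now < existing.delayUntil) {
      // Assumption: a collapsed trigger extends the delay of the kept run.
      existing.delayUntil = now + this.periodMs;
      return { run: existing, discarded: true };
    }
    const run: DelayedRun = {
      id: `run_${this.nextId++}`,
      status: "DELAYED",
      delayUntil: now + this.periodMs,
    };
    this.runs.set(key, run);
    return { run, discarded: false };
  }
}
```

Returning the surviving run together with a `discarded` flag is one way to let the caller show the user that the run is delayed and why.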
@@ -172,6 +181,7 @@ We should mark the run as `"DELAYED"` with the correct `delayUntil` time. This w
 When `trigger` is called the run is added to the queue. We only run them when they don't exceed the limit in that time period, by controlling the timing of when they are dequeued.

 This is useful:
+
 - To prevent too many runs happening in a short period.
 - To control how many executions a user can do (using a `key` with throttling).
 - When you need to execute every run but not too many in a short period, e.g. avoiding rate limits.
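Unlike rate limiting, throttling keeps every run and only controls when each one is dequeued. One way to sketch that is to compute the earliest dequeue time per run so that no more than `limit` runs share a window. The `Throttler` class and its scheduling rule are assumptions for illustration, not the engine's algorithm.

```typescript
// Hypothetical throttle model: every run is kept, but each one is assigned
// the earliest time at which it may be dequeued.
class Throttler {
  // Per-key dequeue times, in non-decreasing order. A real implementation
  // would trim old entries; this sketch keeps them all for simplicity.
  private readonly scheduled = new Map<string, number[]>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Assumes `now` never decreases between calls for the same key.
  schedule(key: string, now: number): number {
    const slots = this.scheduled.get(key) ?? [];
    // The run `limit` places earlier decides when a slot frees up.
    const blocker: number | undefined =
      slots.length >= this.limit ? slots[slots.length - this.limit] : undefined;
    const at = blocker === undefined ? now : Math.max(now, blocker + this.windowMs);
    slots.push(at);
    this.scheduled.set(key, slots);
    return at;
  }
}
```

With a limit of 2 per 1000 ms, five runs triggered at time 0 would be dequeued at 0, 0, 1000, 1000 and 2000.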
@@ -181,9 +191,110 @@ This is useful:
 When `trigger` is called, we batch the runs together. This means the payload of the run is an array of items, each being a single payload.

 This is useful:
+
 - For performance, as it reduces the number of runs in the system.
 - It can be useful when using 3rd party APIs that support batching.

 ## Emitting events

 The Run Engine emits events using its `eventBus`. This is used for runs completing, failing, or things that any workers should be aware of.
+
+# RunEngine System Architecture
+
+The RunEngine is composed of several specialized systems that handle different aspects of task execution and management. Below is a diagram showing the relationships between these systems.
+
+```mermaid
+graph TD
+    RE[RunEngine]
+    DS[DequeueSystem]
+    RAS[RunAttemptSystem]
+    ESS[ExecutionSnapshotSystem]
+    WS[WaitpointSystem]
+    BS[BatchSystem]
+
+    %% Core Dependencies
+    RE --> DS
+    RE --> RAS
+    RE --> ESS
+    RE --> WS
+    RE --> BS
+
+    %% System Dependencies
+    DS --> ESS
+    DS --> RAS
+
+    RAS --> ESS
+    RAS --> WS
+    RAS --> BS
+
+    %% Shared Resources
+    subgraph Resources
+        PRI[(Prisma)]
+        LOG[Logger]
+        TRC[Tracer]
+        RQ[RunQueue]
+        RL[RunLocker]
+        EB[EventBus]
+        WRK[Worker]
+    end
+
+    %% Resource Dependencies
+    RE -.-> Resources
+    DS -.-> PRI & LOG & TRC & RQ & RL
+    RAS -.-> PRI & LOG & TRC & RL & EB & RQ & WRK
+    ESS -.-> PRI & LOG & TRC & WRK & EB
+    WS -.-> PRI & LOG & TRC & WRK & EB
+    BS -.-> PRI & LOG & TRC & WRK
+```
+
+## System Responsibilities
+
+### DequeueSystem
+
+- Handles dequeuing of tasks from master queues
+- Manages resource allocation and constraints
+- Handles task deployment verification
+
+### RunAttemptSystem
+
+- Manages run attempt lifecycle
+- Handles success/failure scenarios
+- Manages retries and cancellations
+- Coordinates with other systems for run completion
+
+### ExecutionSnapshotSystem
+
+- Creates and manages execution snapshots
+- Tracks run state and progress
+- Manages heartbeats for active runs
+- Maintains execution history
+
+### WaitpointSystem
+
+- Manages waitpoints for task synchronization
+- Handles waitpoint completion
+- Coordinates blocked runs
+
+### BatchSystem
+
+- Manages batch operations
+- Handles batch completion
+- Coordinates batch-related task runs
+
+## Shared Resources
+
+- **Prisma**: Database access
+- **Logger**: Logging functionality
+- **Tracer**: Tracing and monitoring
+- **RunQueue**: Task queue management
+- **RunLocker**: Run locking mechanism
+- **EventBus**: Event communication
+- **Worker**: Background task execution
+
+## Key Interactions
+
+1. **RunEngine** orchestrates all systems and holds shared resources
+2. **DequeueSystem** works closely with **RunAttemptSystem** for task execution
+3. **RunAttemptSystem** coordinates with **WaitpointSystem** and **BatchSystem** for run completion
+4. **ExecutionSnapshotSystem** is used by all other systems to track state
+5. All systems share common resources but have specific responsibilities
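The dependency edges in the diagram added by this hunk suggest constructor injection: the engine builds each system once and hands it to the systems that depend on it. The sketch below is an assumption about how that wiring might look. Only the system names and the edges come from the README; every constructor shape, the `SystemResources` type, and the `RunEngineSketch` class are hypothetical.

```typescript
// Hypothetical wiring; constructor shapes are NOT taken from the commit.
interface Logger {
  debug(message: string): void;
}

// Stand-in for the shared resources (Prisma, Tracer, RunQueue, etc. omitted).
type SystemResources = { logger: Logger };

class ExecutionSnapshotSystem {
  readonly resources: SystemResources;
  constructor(resources: SystemResources) {
    this.resources = resources;
  }
}

class WaitpointSystem {
  readonly resources: SystemResources;
  constructor(resources: SystemResources) {
    this.resources = resources;
  }
}

class BatchSystem {
  readonly resources: SystemResources;
  constructor(resources: SystemResources) {
    this.resources = resources;
  }
}

type RunAttemptSystemOptions = {
  resources: SystemResources;
  executionSnapshotSystem: ExecutionSnapshotSystem;
  waitpointSystem: WaitpointSystem;
  batchSystem: BatchSystem;
};

class RunAttemptSystem {
  readonly options: RunAttemptSystemOptions;
  constructor(options: RunAttemptSystemOptions) {
    this.options = options;
  }
}

type DequeueSystemOptions = {
  resources: SystemResources;
  executionSnapshotSystem: ExecutionSnapshotSystem;
  runAttemptSystem: RunAttemptSystem;
};

class DequeueSystem {
  readonly options: DequeueSystemOptions;
  constructor(options: DequeueSystemOptions) {
    this.options = options;
  }
}

// The engine owns the resources and builds systems in dependency order:
// leaves first (ESS, WS, BS), then RAS, then DS.
class RunEngineSketch {
  readonly executionSnapshotSystem: ExecutionSnapshotSystem;
  readonly waitpointSystem: WaitpointSystem;
  readonly batchSystem: BatchSystem;
  readonly runAttemptSystem: RunAttemptSystem;
  readonly dequeueSystem: DequeueSystem;

  constructor(resources: SystemResources) {
    this.executionSnapshotSystem = new ExecutionSnapshotSystem(resources);
    this.waitpointSystem = new WaitpointSystem(resources);
    this.batchSystem = new BatchSystem(resources);
    this.runAttemptSystem = new RunAttemptSystem({
      resources,
      executionSnapshotSystem: this.executionSnapshotSystem,
      waitpointSystem: this.waitpointSystem,
      batchSystem: this.batchSystem,
    });
    this.dequeueSystem = new DequeueSystem({
      resources,
      executionSnapshotSystem: this.executionSnapshotSystem,
      runAttemptSystem: this.runAttemptSystem,
    });
  }
}
```

Building the leaf systems first means every system receives the same shared instances, so state tracked through the snapshot system is visible to all of its consumers.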

internal-packages/run-engine/src/engine/eventBus.ts

Lines changed: 31 additions & 0 deletions
@@ -1,6 +1,7 @@
 import { TaskRunExecutionStatus, TaskRunStatus } from "@trigger.dev/database";
 import { AuthenticatedEnvironment } from "../shared/index.js";
 import { FlushedRunMetadata, TaskRunError } from "@trigger.dev/core/v3";
+import { EventEmitter } from "events";

 export type EventBusEvents = {
   runAttemptStarted: [
@@ -178,3 +179,33 @@ export type EventBusEvents = {
 };

 export type EventBusEventArgs<T extends keyof EventBusEvents> = EventBusEvents[T];
+
+export type EventBus = EventEmitter<EventBusEvents>;
+
+/**
+ * Sends a notification that a run has changed and we need to fetch the latest run state.
+ * The worker will call `getRunExecutionData` via the API and act accordingly.
+ */
+export async function sendNotificationToWorker({
+  runId,
+  snapshot,
+  eventBus,
+}: {
+  runId: string;
+  snapshot: {
+    id: string;
+    executionStatus: TaskRunExecutionStatus;
+  };
+  eventBus: EventBus;
+}) {
+  eventBus.emit("workerNotification", {
+    time: new Date(),
+    run: {
+      id: runId,
+    },
+    snapshot: {
+      id: snapshot.id,
+      executionStatus: snapshot.executionStatus,
+    },
+  });
+}
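The helper above only covers the emitting side. For context, the receiving side might look like the sketch below, which subscribes to `workerNotification` on a plain Node.js `EventEmitter`. The payload shape mirrors the diff, but the `WorkerNotification` type name, the `"EXECUTING"` status value, the ids, and the listener body are placeholders chosen for illustration.

```typescript
import { EventEmitter } from "node:events";

// Payload shape copied from the emit call in the diff; the type name and
// the plain `string` status are simplifications for this sketch.
type WorkerNotification = {
  time: Date;
  run: { id: string };
  snapshot: { id: string; executionStatus: string };
};

const eventBus = new EventEmitter();

// Hypothetical consumer: record which runs need their state re-fetched.
const runsToRefetch: string[] = [];
eventBus.on("workerNotification", (notification: WorkerNotification) => {
  runsToRefetch.push(notification.run.id);
});

// Same payload structure as `sendNotificationToWorker` emits.
eventBus.emit("workerNotification", {
  time: new Date(),
  run: { id: "run_123" }, // placeholder id
  snapshot: { id: "snapshot_456", executionStatus: "EXECUTING" }, // placeholder values
});
```

Because `emit` calls listeners synchronously, `runsToRefetch` already contains the run id once the `emit` call returns.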

internal-packages/run-engine/src/engine/executionSnapshots.ts

Lines changed: 0 additions & 131 deletions
This file was deleted.
