Skip to content

Commit ebf0b6b

Browse files
committed
waiting for worker system
1 parent 51f02ac commit ebf0b6b

2 files changed

Lines changed: 114 additions & 77 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 12 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
5252
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5353
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5454
import { workerCatalog } from "./workerCatalog.js";
55+
import { WaitingForWorkerSystem } from "./systems/waitingForWorkerSystem.js";
5556

5657
export class RunEngine {
5758
private runLockRedis: Redis;
@@ -77,6 +78,7 @@ export class RunEngine {
7778
checkpointSystem: CheckpointSystem;
7879
delayedRunSystem: DelayedRunSystem;
7980
ttlSystem: TtlSystem;
81+
waitingForWorkerSystem: WaitingForWorkerSystem;
8082

8183
constructor(private readonly options: RunEngineOptions) {
8284
this.prisma = options.prisma;
@@ -150,7 +152,9 @@ export class RunEngine {
150152
});
151153
},
152154
queueRunsWaitingForWorker: async ({ payload }) => {
153-
await this.#queueRunsWaitingForWorker({ backgroundWorkerId: payload.backgroundWorkerId });
155+
await this.waitingForWorkerSystem.enqueueRunsWaitingForWorker({
156+
backgroundWorkerId: payload.backgroundWorkerId,
157+
});
154158
},
155159
tryCompleteBatch: async ({ payload }) => {
156160
await this.batchSystem.performCompleteBatch({ batchId: payload.batchId });
@@ -255,6 +259,11 @@ export class RunEngine {
255259
enqueueSystem: this.enqueueSystem,
256260
});
257261

262+
this.waitingForWorkerSystem = new WaitingForWorkerSystem({
263+
resources,
264+
enqueueSystem: this.enqueueSystem,
265+
});
266+
258267
this.waitpointSystem = new WaitpointSystem({
259268
resources,
260269
executionSnapshotSystem: this.executionSnapshotSystem,
@@ -771,10 +780,8 @@ export class RunEngine {
771780
}: {
772781
backgroundWorkerId: string;
773782
}): Promise<void> {
774-
//we want this to happen in the background
775-
await this.worker.enqueue({
776-
job: "queueRunsWaitingForWorker",
777-
payload: { backgroundWorkerId },
783+
return this.waitingForWorkerSystem.enqueueRunsWaitingForWorker({
784+
backgroundWorkerId,
778785
});
779786
}
780787

@@ -1225,78 +1232,6 @@ export class RunEngine {
12251232
}
12261233
}
12271234

1228-
async #queueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
1229-
//It could be a lot of runs, so we will process them in a batch
1230-
//if there are still more to process we will enqueue this function again
1231-
const maxCount = this.options.queueRunsWaitingForWorkerBatchSize ?? 200;
1232-
1233-
const backgroundWorker = await this.prisma.backgroundWorker.findFirst({
1234-
where: {
1235-
id: backgroundWorkerId,
1236-
},
1237-
include: {
1238-
runtimeEnvironment: {
1239-
include: {
1240-
project: true,
1241-
organization: true,
1242-
},
1243-
},
1244-
tasks: true,
1245-
},
1246-
});
1247-
1248-
if (!backgroundWorker) {
1249-
this.logger.error("#queueRunsWaitingForWorker: background worker not found", {
1250-
id: backgroundWorkerId,
1251-
});
1252-
return;
1253-
}
1254-
1255-
const runsWaitingForDeploy = await this.prisma.taskRun.findMany({
1256-
where: {
1257-
runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
1258-
projectId: backgroundWorker.projectId,
1259-
status: "WAITING_FOR_DEPLOY",
1260-
taskIdentifier: {
1261-
in: backgroundWorker.tasks.map((task) => task.slug),
1262-
},
1263-
},
1264-
orderBy: {
1265-
createdAt: "asc",
1266-
},
1267-
take: maxCount + 1,
1268-
});
1269-
1270-
//none to process
1271-
if (!runsWaitingForDeploy.length) return;
1272-
1273-
for (const run of runsWaitingForDeploy) {
1274-
await this.prisma.$transaction(async (tx) => {
1275-
const updatedRun = await tx.taskRun.update({
1276-
where: {
1277-
id: run.id,
1278-
},
1279-
data: {
1280-
status: "PENDING",
1281-
},
1282-
});
1283-
await this.enqueueSystem.enqueueRun({
1284-
run: updatedRun,
1285-
env: backgroundWorker.runtimeEnvironment,
1286-
//add to the queue using the original run created time
1287-
//this should ensure they're in the correct order in the queue
1288-
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
1289-
tx,
1290-
});
1291-
});
1292-
}
1293-
1294-
//enqueue more if needed
1295-
if (runsWaitingForDeploy.length > maxCount) {
1296-
await this.queueRunsWaitingForWorker({ backgroundWorkerId });
1297-
}
1298-
}
1299-
13001235
//#endregion
13011236

13021237
//#region Heartbeat
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { EnqueueSystem } from "./enqueueSystem.js";
2+
import { SystemResources } from "./systems.js";
3+
4+
export type WaitingForWorkerSystemOptions = {
5+
resources: SystemResources;
6+
enqueueSystem: EnqueueSystem;
7+
queueRunsWaitingForWorkerBatchSize?: number;
8+
};
9+
10+
export class WaitingForWorkerSystem {
11+
private readonly $: SystemResources;
12+
private readonly enqueueSystem: EnqueueSystem;
13+
14+
constructor(private readonly options: WaitingForWorkerSystemOptions) {
15+
this.$ = options.resources;
16+
this.enqueueSystem = options.enqueueSystem;
17+
}
18+
19+
async enqueueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
20+
//It could be a lot of runs, so we will process them in a batch
21+
//if there are still more to process we will enqueue this function again
22+
const maxCount = this.options.queueRunsWaitingForWorkerBatchSize ?? 200;
23+
24+
const backgroundWorker = await this.$.prisma.backgroundWorker.findFirst({
25+
where: {
26+
id: backgroundWorkerId,
27+
},
28+
include: {
29+
runtimeEnvironment: {
30+
include: {
31+
project: true,
32+
organization: true,
33+
},
34+
},
35+
tasks: true,
36+
},
37+
});
38+
39+
if (!backgroundWorker) {
40+
this.$.logger.error("#queueRunsWaitingForWorker: background worker not found", {
41+
id: backgroundWorkerId,
42+
});
43+
return;
44+
}
45+
46+
const runsWaitingForDeploy = await this.$.prisma.taskRun.findMany({
47+
where: {
48+
runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
49+
projectId: backgroundWorker.projectId,
50+
status: "WAITING_FOR_DEPLOY",
51+
taskIdentifier: {
52+
in: backgroundWorker.tasks.map((task) => task.slug),
53+
},
54+
},
55+
orderBy: {
56+
createdAt: "asc",
57+
},
58+
take: maxCount + 1,
59+
});
60+
61+
//none to process
62+
if (!runsWaitingForDeploy.length) return;
63+
64+
for (const run of runsWaitingForDeploy) {
65+
await this.$.prisma.$transaction(async (tx) => {
66+
const updatedRun = await tx.taskRun.update({
67+
where: {
68+
id: run.id,
69+
},
70+
data: {
71+
status: "PENDING",
72+
},
73+
});
74+
await this.enqueueSystem.enqueueRun({
75+
run: updatedRun,
76+
env: backgroundWorker.runtimeEnvironment,
77+
//add to the queue using the original run created time
78+
//this should ensure they're in the correct order in the queue
79+
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
80+
tx,
81+
});
82+
});
83+
}
84+
85+
//enqueue more if needed
86+
if (runsWaitingForDeploy.length > maxCount) {
87+
await this.scheduleEnqueueRunsWaitingForWorker({ backgroundWorkerId });
88+
}
89+
}
90+
91+
async scheduleEnqueueRunsWaitingForWorker({
92+
backgroundWorkerId,
93+
}: {
94+
backgroundWorkerId: string;
95+
}): Promise<void> {
96+
//we want this to happen in the background
97+
await this.$.worker.enqueue({
98+
job: "queueRunsWaitingForWorker",
99+
payload: { backgroundWorkerId },
100+
});
101+
}
102+
}

0 commit comments

Comments
 (0)