Skip to content

Commit 7866e95

Browse files
committed
waiting for worker system
1 parent a4581f1 commit 7866e95

2 files changed

Lines changed: 114 additions & 77 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 12 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
5252
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5353
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5454
import { workerCatalog } from "./workerCatalog.js";
55+
import { WaitingForWorkerSystem } from "./systems/waitingForWorkerSystem.js";
5556

5657
export class RunEngine {
5758
private runLockRedis: Redis;
@@ -77,6 +78,7 @@ export class RunEngine {
7778
checkpointSystem: CheckpointSystem;
7879
delayedRunSystem: DelayedRunSystem;
7980
ttlSystem: TtlSystem;
81+
waitingForWorkerSystem: WaitingForWorkerSystem;
8082

8183
constructor(private readonly options: RunEngineOptions) {
8284
this.prisma = options.prisma;
@@ -150,7 +152,9 @@ export class RunEngine {
150152
});
151153
},
152154
queueRunsWaitingForWorker: async ({ payload }) => {
153-
await this.#queueRunsWaitingForWorker({ backgroundWorkerId: payload.backgroundWorkerId });
155+
await this.waitingForWorkerSystem.enqueueRunsWaitingForWorker({
156+
backgroundWorkerId: payload.backgroundWorkerId,
157+
});
154158
},
155159
tryCompleteBatch: async ({ payload }) => {
156160
await this.batchSystem.performCompleteBatch({ batchId: payload.batchId });
@@ -255,6 +259,11 @@ export class RunEngine {
255259
enqueueSystem: this.enqueueSystem,
256260
});
257261

262+
this.waitingForWorkerSystem = new WaitingForWorkerSystem({
263+
resources,
264+
enqueueSystem: this.enqueueSystem,
265+
});
266+
258267
this.waitpointSystem = new WaitpointSystem({
259268
resources,
260269
executionSnapshotSystem: this.executionSnapshotSystem,
@@ -771,10 +780,8 @@ export class RunEngine {
771780
}: {
772781
backgroundWorkerId: string;
773782
}): Promise<void> {
774-
//we want this to happen in the background
775-
await this.worker.enqueue({
776-
job: "queueRunsWaitingForWorker",
777-
payload: { backgroundWorkerId },
783+
return this.waitingForWorkerSystem.enqueueRunsWaitingForWorker({
784+
backgroundWorkerId,
778785
});
779786
}
780787

@@ -1237,78 +1244,6 @@ export class RunEngine {
12371244
}
12381245
}
12391246

1240-
async #queueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
1241-
//It could be a lot of runs, so we will process them in a batch
1242-
//if there are still more to process we will enqueue this function again
1243-
const maxCount = this.options.queueRunsWaitingForWorkerBatchSize ?? 200;
1244-
1245-
const backgroundWorker = await this.prisma.backgroundWorker.findFirst({
1246-
where: {
1247-
id: backgroundWorkerId,
1248-
},
1249-
include: {
1250-
runtimeEnvironment: {
1251-
include: {
1252-
project: true,
1253-
organization: true,
1254-
},
1255-
},
1256-
tasks: true,
1257-
},
1258-
});
1259-
1260-
if (!backgroundWorker) {
1261-
this.logger.error("#queueRunsWaitingForWorker: background worker not found", {
1262-
id: backgroundWorkerId,
1263-
});
1264-
return;
1265-
}
1266-
1267-
const runsWaitingForDeploy = await this.prisma.taskRun.findMany({
1268-
where: {
1269-
runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
1270-
projectId: backgroundWorker.projectId,
1271-
status: "WAITING_FOR_DEPLOY",
1272-
taskIdentifier: {
1273-
in: backgroundWorker.tasks.map((task) => task.slug),
1274-
},
1275-
},
1276-
orderBy: {
1277-
createdAt: "asc",
1278-
},
1279-
take: maxCount + 1,
1280-
});
1281-
1282-
//none to process
1283-
if (!runsWaitingForDeploy.length) return;
1284-
1285-
for (const run of runsWaitingForDeploy) {
1286-
await this.prisma.$transaction(async (tx) => {
1287-
const updatedRun = await tx.taskRun.update({
1288-
where: {
1289-
id: run.id,
1290-
},
1291-
data: {
1292-
status: "PENDING",
1293-
},
1294-
});
1295-
await this.enqueueSystem.enqueueRun({
1296-
run: updatedRun,
1297-
env: backgroundWorker.runtimeEnvironment,
1298-
//add to the queue using the original run created time
1299-
//this should ensure they're in the correct order in the queue
1300-
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
1301-
tx,
1302-
});
1303-
});
1304-
}
1305-
1306-
//enqueue more if needed
1307-
if (runsWaitingForDeploy.length > maxCount) {
1308-
await this.queueRunsWaitingForWorker({ backgroundWorkerId });
1309-
}
1310-
}
1311-
13121247
//#endregion
13131248

13141249
//#region Heartbeat
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { EnqueueSystem } from "./enqueueSystem.js";
2+
import { SystemResources } from "./systems.js";
3+
4+
export type WaitingForWorkerSystemOptions = {
5+
resources: SystemResources;
6+
enqueueSystem: EnqueueSystem;
7+
queueRunsWaitingForWorkerBatchSize?: number;
8+
};
9+
10+
export class WaitingForWorkerSystem {
11+
private readonly $: SystemResources;
12+
private readonly enqueueSystem: EnqueueSystem;
13+
14+
constructor(private readonly options: WaitingForWorkerSystemOptions) {
15+
this.$ = options.resources;
16+
this.enqueueSystem = options.enqueueSystem;
17+
}
18+
19+
async enqueueRunsWaitingForWorker({ backgroundWorkerId }: { backgroundWorkerId: string }) {
20+
//It could be a lot of runs, so we will process them in a batch
21+
//if there are still more to process we will enqueue this function again
22+
const maxCount = this.options.queueRunsWaitingForWorkerBatchSize ?? 200;
23+
24+
const backgroundWorker = await this.$.prisma.backgroundWorker.findFirst({
25+
where: {
26+
id: backgroundWorkerId,
27+
},
28+
include: {
29+
runtimeEnvironment: {
30+
include: {
31+
project: true,
32+
organization: true,
33+
},
34+
},
35+
tasks: true,
36+
},
37+
});
38+
39+
if (!backgroundWorker) {
40+
this.$.logger.error("#queueRunsWaitingForWorker: background worker not found", {
41+
id: backgroundWorkerId,
42+
});
43+
return;
44+
}
45+
46+
const runsWaitingForDeploy = await this.$.prisma.taskRun.findMany({
47+
where: {
48+
runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
49+
projectId: backgroundWorker.projectId,
50+
status: "WAITING_FOR_DEPLOY",
51+
taskIdentifier: {
52+
in: backgroundWorker.tasks.map((task) => task.slug),
53+
},
54+
},
55+
orderBy: {
56+
createdAt: "asc",
57+
},
58+
take: maxCount + 1,
59+
});
60+
61+
//none to process
62+
if (!runsWaitingForDeploy.length) return;
63+
64+
for (const run of runsWaitingForDeploy) {
65+
await this.$.prisma.$transaction(async (tx) => {
66+
const updatedRun = await tx.taskRun.update({
67+
where: {
68+
id: run.id,
69+
},
70+
data: {
71+
status: "PENDING",
72+
},
73+
});
74+
await this.enqueueSystem.enqueueRun({
75+
run: updatedRun,
76+
env: backgroundWorker.runtimeEnvironment,
77+
//add to the queue using the original run created time
78+
//this should ensure they're in the correct order in the queue
79+
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
80+
tx,
81+
});
82+
});
83+
}
84+
85+
//enqueue more if needed
86+
if (runsWaitingForDeploy.length > maxCount) {
87+
await this.scheduleEnqueueRunsWaitingForWorker({ backgroundWorkerId });
88+
}
89+
}
90+
91+
async scheduleEnqueueRunsWaitingForWorker({
92+
backgroundWorkerId,
93+
}: {
94+
backgroundWorkerId: string;
95+
}): Promise<void> {
96+
//we want this to happen in the background
97+
await this.$.worker.enqueue({
98+
job: "queueRunsWaitingForWorker",
99+
payload: { backgroundWorkerId },
100+
});
101+
}
102+
}

0 commit comments

Comments
 (0)