Skip to content

Commit 949f404

Browse files
committed
Make release concurrency system extremely simple, everything just releases all the time
1 parent d5e7f6b commit 949f404

13 files changed

Lines changed: 76 additions & 3259 deletions

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ export class DefaultQueueManager implements QueueManager {
5555
queueName = specifiedQueue.name;
5656
lockedQueueId = specifiedQueue.id;
5757
lockedQueueReleaseConcurrencyOnWaitpoint =
58-
typeof specifiedQueue.concurrencyLimit === "undefined" ||
59-
specifiedQueue.releaseConcurrencyOnWaitpoint;
58+
this.engine.shouldReleaseConcurrencyOnWaitpointForQueue(specifiedQueue);
6059
} else {
6160
// No specific queue name provided, use the default queue for the task on the locked worker
6261
const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,11 @@ export class RunEngineTriggerTaskService {
212212
})
213213
: undefined;
214214

215-
const { queueName, lockedQueueId } = await this.queueConcern.resolveQueueProperties(
216-
triggerRequest,
217-
lockedToBackgroundWorker ?? undefined
218-
);
215+
const { queueName, lockedQueueId, lockedQueueReleaseConcurrencyOnWaitpoint } =
216+
await this.queueConcern.resolveQueueProperties(
217+
triggerRequest,
218+
lockedToBackgroundWorker ?? undefined
219+
);
219220

220221
//upsert tags
221222
const tags = await createTags(
@@ -272,6 +273,7 @@ export class RunEngineTriggerTaskService {
272273
concurrencyKey: body.options?.concurrencyKey,
273274
queue: queueName,
274275
lockedQueueId,
276+
lockedQueueReleaseConcurrencyOnWaitpoint,
275277
workerQueue,
276278
isTest: body.options?.test ?? false,
277279
delayUntil,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
PrismaClient,
1818
PrismaClientOrTransaction,
1919
PrismaReplicaClient,
20+
TaskQueue,
2021
TaskRun,
2122
TaskRunExecutionSnapshot,
2223
Waitpoint,
@@ -241,34 +242,6 @@ export class RunEngine {
241242

242243
this.releaseConcurrencySystem = new ReleaseConcurrencySystem({
243244
resources,
244-
maxTokensRatio: options.releaseConcurrency?.maxTokensRatio,
245-
releasingsMaxAge: options.releaseConcurrency?.releasingsMaxAge,
246-
releasingsPollInterval: options.releaseConcurrency?.releasingsPollInterval,
247-
queueOptions:
248-
typeof options.releaseConcurrency?.disabled === "boolean" &&
249-
options.releaseConcurrency.disabled
250-
? undefined
251-
: {
252-
disableConsumers: options.releaseConcurrency?.disableConsumers,
253-
redis: {
254-
...options.queue.redis, // Use base queue redis options
255-
...options.releaseConcurrency?.redis, // Allow overrides
256-
keyPrefix: `${options.queue.redis.keyPrefix ?? ""}release-concurrency:`,
257-
},
258-
retry: {
259-
maxRetries: options.releaseConcurrency?.maxRetries ?? 5,
260-
backoff: {
261-
minDelay: options.releaseConcurrency?.backoff?.minDelay ?? 1000,
262-
maxDelay: options.releaseConcurrency?.backoff?.maxDelay ?? 10000,
263-
factor: options.releaseConcurrency?.backoff?.factor ?? 2,
264-
},
265-
},
266-
consumersCount: options.releaseConcurrency?.consumersCount ?? 1,
267-
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
268-
batchSize: options.releaseConcurrency?.batchSize ?? 10,
269-
tracer: this.tracer,
270-
meter: this.meter,
271-
},
272245
});
273246

274247
this.executionSnapshotSystem = new ExecutionSnapshotSystem({
@@ -332,6 +305,7 @@ export class RunEngine {
332305
runAttemptSystem: this.runAttemptSystem,
333306
machines: this.options.machines,
334307
releaseConcurrencySystem: this.releaseConcurrencySystem,
308+
waitpointSystem: this.waitpointSystem,
335309
});
336310
}
337311

@@ -361,6 +335,7 @@ export class RunEngine {
361335
workerQueue,
362336
queue,
363337
lockedQueueId,
338+
lockedQueueReleaseConcurrencyOnWaitpoint,
364339
isTest,
365340
delayUntil,
366341
queuedAt,
@@ -433,6 +408,7 @@ export class RunEngine {
433408
concurrencyKey,
434409
queue,
435410
lockedQueueId,
411+
lockedQueueReleaseConcurrencyOnWaitpoint,
436412
workerQueue,
437413
isTest,
438414
delayUntil,
@@ -1124,6 +1100,10 @@ export class RunEngine {
11241100
return this.raceSimulationSystem.registerRacepointForRun({ runId, waitInterval });
11251101
}
11261102

1103+
shouldReleaseConcurrencyOnWaitpointForQueue(queue: TaskQueue) {
1104+
return this.waitpointSystem.shouldReleaseConcurrencyOnWaitpointForQueue(queue);
1105+
}
1106+
11271107
async migrateLegacyMasterQueues() {
11281108
const workerGroups = await this.prisma.workerInstanceGroup.findMany({
11291109
where: {
@@ -1160,7 +1140,6 @@ export class RunEngine {
11601140
async quit() {
11611141
try {
11621142
//stop the run queue
1163-
await this.releaseConcurrencySystem.quit();
11641143
await this.runQueue.quit();
11651144
await this.worker.stop();
11661145
await this.runLock.quit();

0 commit comments

Comments
 (0)