Skip to content

Commit d5e7f6b

Browse files
committed
WIP
1 parent 6bf580a commit d5e7f6b

4 files changed

Lines changed: 33 additions & 2 deletions

File tree

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export class DefaultQueueManager implements QueueManager {
2727
): Promise<QueueProperties> {
2828
let queueName: string;
2929
let lockedQueueId: string | undefined;
30+
let lockedQueueReleaseConcurrencyOnWaitpoint: boolean | undefined;
3031

3132
// Determine queue name based on lockToVersion and provided options
3233
if (lockedBackgroundWorker) {
@@ -53,6 +54,9 @@ export class DefaultQueueManager implements QueueManager {
5354
// Use the validated queue name directly
5455
queueName = specifiedQueue.name;
5556
lockedQueueId = specifiedQueue.id;
57+
lockedQueueReleaseConcurrencyOnWaitpoint =
58+
typeof specifiedQueue.concurrencyLimit === "undefined" ||
59+
specifiedQueue.releaseConcurrencyOnWaitpoint;
5660
} else {
5761
// No specific queue name provided, use the default queue for the task on the locked worker
5862
const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({
@@ -91,6 +95,9 @@ export class DefaultQueueManager implements QueueManager {
9195
// Use the task's default queue name
9296
queueName = lockedTask.queue.name;
9397
lockedQueueId = lockedTask.queue.id;
98+
lockedQueueReleaseConcurrencyOnWaitpoint =
99+
typeof lockedTask.queue.concurrencyLimit === "undefined" ||
100+
lockedTask.queue.releaseConcurrencyOnWaitpoint;
94101
}
95102
} else {
96103
// Task is not locked to a specific version, use regular logic
@@ -118,6 +125,7 @@ export class DefaultQueueManager implements QueueManager {
118125
return {
119126
queueName,
120127
lockedQueueId,
128+
lockedQueueReleaseConcurrencyOnWaitpoint,
121129
};
122130
}
123131

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export type QueueValidationResult =
5252
export type QueueProperties = {
5353
queueName: string;
5454
lockedQueueId?: string;
55+
lockedQueueReleaseConcurrencyOnWaitpoint?: boolean;
5556
};
5657

5758
export type LockedBackgroundWorker = Pick<
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
-- DropIndex
2+
DROP INDEX "SecretStore_key_idx";
3+
4+
-- DropIndex
5+
DROP INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx";
6+
7+
-- DropIndex
8+
DROP INDEX "TaskRun_runtimeEnvironmentId_id_idx";
9+
10+
-- AlterTable
11+
ALTER TABLE "TaskRun" ADD COLUMN "lockedQueueReleaseConcurrencyOnWaitpoint" BOOLEAN;
12+
13+
-- CreateIndex
14+
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);
15+
16+
-- CreateIndex
17+
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "TaskRun"("runtimeEnvironmentId", "id" DESC);
18+
19+
-- CreateIndex
20+
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "TaskRun"("runtimeEnvironmentId", "createdAt" DESC);

internal-packages/database/prisma/schema.prisma

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,9 +555,11 @@ model TaskRun {
555555
organizationId String?
556556
557557
// The specific queue this run is in
558-
queue String
558+
queue String
559559
// The queueId is set when the run is locked to a specific queue
560-
lockedQueueId String?
560+
lockedQueueId String?
561+
// Whether the locked queue specifies that concurrency should be released when a run is blocked by a waitpoint
562+
lockedQueueReleaseConcurrencyOnWaitpoint Boolean?
561563
562564
/// The main queue that this run is part of
563565
workerQueue String @default("main") @map("masterQueue")

0 commit comments

Comments
 (0)