Skip to content

Commit 018baa7

Browse files
committed
WIP new release concurrency system
1 parent f507be3 commit 018baa7

10 files changed

Lines changed: 26 additions & 501 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "RuntimeEnvironment" ADD COLUMN "concurrencyLimitBurstFactor" DECIMAL(4,2) NOT NULL DEFAULT 2.00;

internal-packages/database/prisma/schema.prisma

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,9 @@ model RuntimeEnvironment {
247247
///A memorable code for the environment
248248
shortcode String
249249
250-
maximumConcurrencyLimit Int @default(5)
251-
paused Boolean @default(false)
250+
maximumConcurrencyLimit Int @default(5)
251+
concurrencyLimitBurstFactor Decimal @default("2.00") @db.Decimal(4, 2)
252+
paused Boolean @default(false)
252253
253254
autoEnableInternalSources Boolean @default(true)
254255

internal-packages/database/src/transaction.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ export type PrismaClientOrTransaction = PrismaClient | PrismaTransactionClient;
99

1010
export type PrismaReplicaClient = Omit<PrismaClient, "$transaction">;
1111

12+
export const Decimal = Prisma.Decimal;
13+
1214
function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is PrismaTransactionClient {
1315
return !("$transaction" in prisma);
1416
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ export class RunEngine {
275275
resources,
276276
executionSnapshotSystem: this.executionSnapshotSystem,
277277
enqueueSystem: this.enqueueSystem,
278-
releaseConcurrencySystem: this.releaseConcurrencySystem,
279278
});
280279

281280
this.ttlSystem = new TtlSystem({

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,13 @@ export class CheckpointSystem {
195195
checkpointId: taskRunCheckpoint.id,
196196
});
197197

198-
this.$.logger.debug("Refilling token bucket for release concurrency queue", {
198+
this.$.logger.debug("Releasing concurrency for run because it was checkpointed", {
199199
snapshot,
200+
newSnapshot,
200201
});
201202

203+
await this.releaseConcurrencySystem.releaseConcurrency(run);
204+
202205
return {
203206
ok: true as const,
204207
...executionResultFromSnapshot(newSnapshot),
@@ -228,6 +231,13 @@ export class CheckpointSystem {
228231
runnerId,
229232
});
230233

234+
this.$.logger.debug("Releasing concurrency for run because it was checkpointed", {
235+
snapshot,
236+
newSnapshot,
237+
});
238+
239+
await this.releaseConcurrencySystem.releaseConcurrency(run);
240+
231241
return {
232242
ok: true as const,
233243
...executionResultFromSnapshot(newSnapshot),

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,17 @@ export type WaitpointSystemOptions = {
2020
resources: SystemResources;
2121
executionSnapshotSystem: ExecutionSnapshotSystem;
2222
enqueueSystem: EnqueueSystem;
23-
releaseConcurrencySystem: ReleaseConcurrencySystem;
2423
};
2524

2625
export class WaitpointSystem {
2726
private readonly $: SystemResources;
2827
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
29-
private readonly releaseConcurrencySystem: ReleaseConcurrencySystem;
3028
private readonly enqueueSystem: EnqueueSystem;
3129

3230
constructor(private readonly options: WaitpointSystemOptions) {
3331
this.$ = options.resources;
3432
this.executionSnapshotSystem = options.executionSnapshotSystem;
3533
this.enqueueSystem = options.enqueueSystem;
36-
this.releaseConcurrencySystem = options.releaseConcurrencySystem;
3734
}
3835

3936
shouldReleaseConcurrencyOnWaitpointForQueue(queue: TaskQueue) {
@@ -456,29 +453,6 @@ export class WaitpointSystem {
456453

457454
// Let the worker know immediately, so it can suspend the run
458455
await sendNotificationToWorker({ runId, snapshot, eventBus: this.$.eventBus });
459-
460-
if (isRunBlocked) {
461-
//release concurrency
462-
const run = await this.$.prisma.taskRun.findFirst({
463-
where: { id: runId },
464-
select: {
465-
id: true,
466-
organizationId: true,
467-
lockedQueueReleaseConcurrencyOnWaitpoint: true,
468-
},
469-
});
470-
471-
if (!run) {
472-
this.$.logger.error(
473-
"WaitpointSystem.blockRunWithWaitpoint(): Run not found, cannot release concurrency",
474-
{
475-
runId,
476-
}
477-
);
478-
} else {
479-
await this.releaseConcurrencySystem.releaseConcurrency(run, releaseConcurrency);
480-
}
481-
}
482456
}
483457

484458
if (timeout) {
@@ -557,6 +531,7 @@ export class WaitpointSystem {
557531
id: true,
558532
type: true,
559533
maximumConcurrencyLimit: true,
534+
concurrencyLimitBurstFactor: true,
560535
project: { select: { id: true } },
561536
organization: { select: { id: true } },
562537
},

0 commit comments

Comments
 (0)