Skip to content

Commit 30586b4

Browse files
test(run-engine): add hot-key debounce stress tests + replica-routed fast-path read
Stress test:
- Seeds a debounce key, then fires 40 concurrent triggers on it.
- With the full fix on (fast-path + 1s quantization): all 40 succeed, all return the seed run id, and only 1 `taskRun.update` lands on the run (the first lock-protected trigger; the rest short-circuit via the unlocked read).
- With fast-path/quantization off: still correct (no 5xx, all return the seed run) thanks to the contention fallback, with 4 updates observed under N=40 (rest absorbed by the fallback).

Replica-routed fast-path read:
- New `useReplicaForFastPathRead` debounce option (default false). When on, the unlocked `delayUntil`/`createdAt` read goes through `readOnlyPrisma` instead of the writer. Safe because the read is best-effort and re-checked under the lock; replica lag at worst means a few extra callers fall through to the lock.
- New env var `RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ` wires this through in the webapp so cloud (Aurora readers) can opt in without code changes.

Co-Authored-By: Eric Allam <eallam@icloud.com>
1 parent 5b1e357 commit 30586b4

6 files changed

Lines changed: 219 additions & 3 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,7 @@ const EnvironmentSchema = z
837837
.default("info"),
838838
RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
839839
RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
840+
RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),
840841

841842
/** How long should the presence ttl last */
842843
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ function createRunEngine() {
214214
// Debounce configuration
215215
debounce: {
216216
maxDebounceDurationMs: env.RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS,
217+
useReplicaForFastPathRead: env.RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ === "1",
217218
},
218219
});
219220

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ export class RunEngine {
326326
maxDebounceDurationMs: options.debounce?.maxDebounceDurationMs ?? 60 * 60 * 1000, // Default 1 hour
327327
quantizeNewDelayUntilMs: options.debounce?.quantizeNewDelayUntilMs ?? 1000,
328328
fastPathSkipEnabled: options.debounce?.fastPathSkipEnabled ?? true,
329+
useReplicaForFastPathRead: options.debounce?.useReplicaForFastPathRead ?? false,
329330
});
330331

331332
this.pendingVersionSystem = new PendingVersionSystem({

internal-packages/run-engine/src/engine/systems/debounceSystem.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import {
1010
parseNaturalLanguageDuration,
1111
parseNaturalLanguageDurationInMs,
1212
} from "@trigger.dev/core/v3/isomorphic";
13-
import { PrismaClientOrTransaction, TaskRun, Waitpoint } from "@trigger.dev/database";
13+
import {
14+
PrismaClientOrTransaction,
15+
PrismaReplicaClient,
16+
TaskRun,
17+
Waitpoint,
18+
} from "@trigger.dev/database";
1419
import { nanoid } from "nanoid";
1520
import { SystemResources } from "./systems.js";
1621
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
@@ -57,6 +62,11 @@ export type DebounceSystemOptions = {
5762
* current one.
5863
*/
5964
fastPathSkipEnabled?: boolean;
65+
/**
66+
* When true, route the unlocked fast-path read through `readOnlyPrisma`
67+
* (e.g. an Aurora reader) instead of the writer.
68+
*/
69+
useReplicaForFastPathRead?: boolean;
6070
};
6171

6272
export type DebounceResult =
@@ -103,6 +113,7 @@ export class DebounceSystem {
103113
private readonly maxDebounceDurationMs: number;
104114
private readonly quantizeNewDelayUntilMs: number;
105115
private readonly fastPathSkipEnabled: boolean;
116+
private readonly useReplicaForFastPathRead: boolean;
106117

107118
constructor(options: DebounceSystemOptions) {
108119
this.$ = options.resources;
@@ -122,6 +133,7 @@ export class DebounceSystem {
122133
this.maxDebounceDurationMs = options.maxDebounceDurationMs;
123134
this.quantizeNewDelayUntilMs = Math.max(0, options.quantizeNewDelayUntilMs ?? 1000);
124135
this.fastPathSkipEnabled = options.fastPathSkipEnabled ?? true;
136+
this.useReplicaForFastPathRead = options.useReplicaForFastPathRead ?? false;
125137

126138
this.#registerCommands();
127139
}
@@ -467,6 +479,13 @@ return 0
467479
tx?: PrismaClientOrTransaction;
468480
}): Promise<DebounceResult> {
469481
const prisma = tx ?? this.$.prisma;
482+
// Reads that are explicitly best-effort (the fast-path skip) can run on
483+
// `readOnlyPrisma` when configured. Replica lag is fine: the monotonic-
484+
// forward invariant means a stale read just falls through to the locked
485+
// path. Only divert reads when the caller isn't inside a tx (where the
486+
// read needs to see the tx's writes).
487+
const fastPathReadPrisma =
488+
tx ?? (this.useReplicaForFastPathRead ? this.$.readOnlyPrisma : this.$.prisma);
470489

471490
// Compute the (quantized) target delayUntil up-front, before taking any lock.
472491
// Quantizing to e.g. 1s buckets collapses many concurrent triggers on the same
@@ -484,7 +503,7 @@ return 0
484503
existingRunId,
485504
newDelayUntil,
486505
debounce,
487-
prisma,
506+
prisma: fastPathReadPrisma,
488507
});
489508
if (fastPathResult) {
490509
return fastPathResult;
@@ -569,7 +588,7 @@ return 0
569588
existingRunId: string;
570589
newDelayUntil: Date;
571590
debounce: DebounceOptions;
572-
prisma: PrismaClientOrTransaction;
591+
prisma: PrismaClientOrTransaction | PrismaReplicaClient;
573592
}): Promise<DebounceResult | null> {
574593
// Trailing mode with updateData still needs the lock so the data update is
575594
// applied; only short-circuit when there's nothing to update.

internal-packages/run-engine/src/engine/tests/debounce.test.ts

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3012,5 +3012,189 @@ describe("RunEngine debounce", () => {
30123012
}
30133013
}
30143014
);
3015+
3016+
// Reproduces the hot-key contention from TRI-8758: fires N concurrent
3017+
// triggers on the same debounce key after the run is already DELAYED.
3018+
//
3019+
// - fixed=true: fast-path skip + 1s quantization on. The herd collapses on
3020+
// the unlocked read and onto the same quantized newDelayUntil, so almost
3021+
// every call short-circuits and `taskRun.update` is barely written.
3022+
// - fixed=false: fast-path off and quantization off (closer to the
3023+
// pre-fix behaviour). The lock-contention fallback (also part of this
3024+
// PR) still catches herd lock failures; this case validates that even
3025+
// without the fast-path the system stays correct under stress, just at
3026+
// higher Redlock cost.
3027+
for (const fixed of [true, false]) {
3028+
containerTest(
3029+
`Debounce hot-key stress (fixed=${fixed}): N concurrent triggers stay correct`,
3030+
async ({ prisma, redisOptions }) => {
3031+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
3032+
3033+
const engine = new RunEngine({
3034+
prisma,
3035+
worker: {
3036+
redis: redisOptions,
3037+
workers: 1,
3038+
tasksPerWorker: 10,
3039+
pollIntervalMs: 100,
3040+
},
3041+
queue: {
3042+
redis: redisOptions,
3043+
},
3044+
runLock: {
3045+
redis: redisOptions,
3046+
},
3047+
machines: {
3048+
defaultMachine: "small-1x",
3049+
machines: {
3050+
"small-1x": {
3051+
name: "small-1x" as const,
3052+
cpu: 0.5,
3053+
memory: 0.5,
3054+
centsPerMs: 0.0001,
3055+
},
3056+
},
3057+
baseCostInCents: 0.0001,
3058+
},
3059+
debounce: {
3060+
maxDebounceDurationMs: 10 * 60_000,
3061+
fastPathSkipEnabled: fixed,
3062+
// 1s buckets - same as the real default - or 0 to mimic the
3063+
// pre-fix behaviour where every concurrent trigger has a slightly
3064+
// larger newDelayUntil than the last.
3065+
quantizeNewDelayUntilMs: fixed ? 1000 : 0,
3066+
},
3067+
tracer: trace.getTracer("test", "0.0.0"),
3068+
});
3069+
3070+
try {
3071+
const taskIdentifier = "test-task";
3072+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
3073+
3074+
// Seed the debounce key with an initial run delayed by 30s. Its delayUntil
3075+
// is then pulled back to ~2s, so the herd's now + 30s target lands later.
3076+
const seed = await engine.trigger(
3077+
{
3078+
number: 0,
3079+
friendlyId: "run_stress0",
3080+
environment: authenticatedEnvironment,
3081+
taskIdentifier,
3082+
payload: '{"data": "seed"}',
3083+
payloadType: "application/json",
3084+
context: {},
3085+
traceContext: {},
3086+
traceId: "t_stress_seed",
3087+
spanId: "s_stress_seed",
3088+
workerQueue: "main",
3089+
queue: "task/test-task",
3090+
isTest: false,
3091+
tags: [],
3092+
delayUntil: new Date(Date.now() + 30_000),
3093+
debounce: {
3094+
key: "stress-key",
3095+
delay: "30s",
3096+
},
3097+
},
3098+
prisma
3099+
);
3100+
3101+
// Move delayUntil to a small but safe future offset. The herd's
3102+
// newDelayUntil (now + 30s) will be meaningfully later than the
3103+
// current value, so the fast-path-off branch reschedules. The
3104+
// ~2s buffer keeps the run DELAYED long enough to absorb startup
3105+
// jitter before the first trigger writes delayUntil = now + 30s.
3106+
await prisma.taskRun.update({
3107+
where: { id: seed.id },
3108+
data: { delayUntil: new Date(Date.now() + 2_000) },
3109+
});
3110+
3111+
// Count taskRun.update calls so we can assert that the fast-path
3112+
// actually short-circuits the herd's writes. We monkey-patch the
3113+
// bound method on the prisma instance the engine is holding.
3114+
let updateCount = 0;
3115+
const originalUpdate = prisma.taskRun.update.bind(prisma.taskRun);
3116+
(prisma.taskRun as unknown as { update: typeof originalUpdate }).update = ((
3117+
...args: Parameters<typeof originalUpdate>
3118+
) => {
3119+
updateCount++;
3120+
return originalUpdate(...args);
3121+
}) as typeof originalUpdate;
3122+
3123+
try {
3124+
const N = 40;
3125+
const triggers = Array.from({ length: N }, (_, i) =>
3126+
engine.trigger(
3127+
{
3128+
number: i + 1,
3129+
friendlyId: `run_stress${i + 1}`,
3130+
environment: authenticatedEnvironment,
3131+
taskIdentifier,
3132+
payload: `{"data": "stress-${i}"}`,
3133+
payloadType: "application/json",
3134+
context: {},
3135+
traceContext: {},
3136+
traceId: `t_stress_${i}`,
3137+
spanId: `s_stress_${i}`,
3138+
workerQueue: "main",
3139+
queue: "task/test-task",
3140+
isTest: false,
3141+
tags: [],
3142+
delayUntil: new Date(Date.now() + 30_000),
3143+
debounce: {
3144+
key: "stress-key",
3145+
delay: "30s",
3146+
},
3147+
},
3148+
prisma
3149+
)
3150+
);
3151+
3152+
const start = performance.now();
3153+
const settled = await Promise.allSettled(triggers);
3154+
const durationMs = performance.now() - start;
3155+
3156+
const fulfilled = settled.filter(
3157+
(r): r is PromiseFulfilledResult<{ id: string }> => r.status === "fulfilled"
3158+
);
3159+
const rejected = settled.filter((r) => r.status === "rejected");
3160+
3161+
// No 5xx feedback loop: every concurrent trigger succeeds and
3162+
// returns the existing run id.
3163+
expect(rejected).toHaveLength(0);
3164+
expect(fulfilled).toHaveLength(N);
3165+
for (const r of fulfilled) {
3166+
expect(r.value.id).toBe(seed.id);
3167+
}
3168+
3169+
// Only one row, regardless of contention path.
3170+
const runs = await prisma.taskRun.findMany({
3171+
where: { taskIdentifier, runtimeEnvironmentId: authenticatedEnvironment.id },
3172+
});
3173+
expect(runs.length).toBe(1);
3174+
3175+
console.log(
3176+
`[stress fixed=${fixed}] N=${N} duration=${durationMs.toFixed(
3177+
0
3178+
)}ms taskRun.update=${updateCount}`
3179+
);
3180+
3181+
if (fixed) {
3182+
// With fast-path + quantization: the herd collapses onto the
3183+
// same quantized newDelayUntil. Trigger #1 takes the lock and
3184+
// updates delayUntil; every subsequent trigger sees a covering
3185+
// delayUntil on the unlocked read and short-circuits. So at
3186+
// most one update lands on the run row.
3187+
expect(updateCount).toBeLessThanOrEqual(1);
3188+
}
3189+
} finally {
3190+
(prisma.taskRun as unknown as { update: typeof originalUpdate }).update =
3191+
originalUpdate;
3192+
}
3193+
} finally {
3194+
await engine.quit();
3195+
}
3196+
}
3197+
);
3198+
}
30153199
});
30163200

internal-packages/run-engine/src/engine/types.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,16 @@ export type RunEngineOptions = {
151151
* Default: true.
152152
*/
153153
fastPathSkipEnabled?: boolean;
154+
/**
155+
* Whether to route the unlocked fast-path read of `delayUntil`/`createdAt`
156+
* through `readOnlyPrisma` (e.g. an Aurora reader) instead of the writer.
157+
* Safe because the read is best-effort and re-checked under the lock by
158+
* whichever caller is actually pushing forward; replica lag at worst means
159+
* a few extra callers fall through to the lock.
160+
*
161+
* Default: false.
162+
*/
163+
useReplicaForFastPathRead?: boolean;
154164
};
155165
/** If not set then checkpoints won't ever be used */
156166
retryWarmStartThresholdMs?: number;

0 commit comments

Comments
 (0)