@@ -3108,87 +3108,86 @@ describe("RunEngine debounce", () => {
31083108 data : { delayUntil : new Date ( Date . now ( ) + 2_000 ) } ,
31093109 } ) ;
31103110
3111- // Count taskRun.update calls so we can assert that the fast-path
3112- // actually short-circuits the herd's writes. We monkey-patch the
3113- // bound method on the prisma instance the engine is holding.
3114- let updateCount = 0 ;
3115- const originalUpdate = prisma . taskRun . update . bind ( prisma . taskRun ) ;
3116- ( prisma . taskRun as unknown as { update : typeof originalUpdate } ) . update = ( (
3117- ...args : Parameters < typeof originalUpdate >
3118- ) => {
3119- updateCount ++ ;
3120- return originalUpdate ( ...args ) ;
3121- } ) as typeof originalUpdate ;
3122-
3123- try {
3124- const N = 40 ;
3125- const triggers = Array . from ( { length : N } , ( _ , i ) =>
3126- engine . trigger (
3127- {
3128- number : i + 1 ,
3129- friendlyId : `run_stress${ i + 1 } ` ,
3130- environment : authenticatedEnvironment ,
3131- taskIdentifier,
3132- payload : `{"data": "stress-${ i } "}` ,
3133- payloadType : "application/json" ,
3134- context : { } ,
3135- traceContext : { } ,
3136- traceId : `t_stress_${ i } ` ,
3137- spanId : `s_stress_${ i } ` ,
3138- workerQueue : "main" ,
3139- queue : "task/test-task" ,
3140- isTest : false ,
3141- tags : [ ] ,
3142- delayUntil : new Date ( Date . now ( ) + 30_000 ) ,
3143- debounce : {
3144- key : "stress-key" ,
3145- delay : "30s" ,
3146- } ,
3111+ // Subscribe to `runDelayRescheduled` so we can count how many times
3112+ // the herd actually pushed `delayUntil` forward. Each event corresponds
3113+ // to a successful reschedule under the lock - the fast-path/contention
3114+ // fallback paths skip the reschedule entirely. We use the engine's
3115+ // public eventBus, which is the same observable interface other tests
3116+ // in this repo (ttl, trigger, cancelling, waitpoints) use.
3117+ let rescheduleCount = 0 ;
3118+ engine . eventBus . on ( "runDelayRescheduled" , ( ) => {
3119+ rescheduleCount ++ ;
3120+ } ) ;
3121+
3122+ const N = 40 ;
3123+ const triggers = Array . from ( { length : N } , ( _ , i ) =>
3124+ engine . trigger (
3125+ {
3126+ number : i + 1 ,
3127+ friendlyId : `run_stress${ i + 1 } ` ,
3128+ environment : authenticatedEnvironment ,
3129+ taskIdentifier,
3130+ payload : `{"data": "stress-${ i } "}` ,
3131+ payloadType : "application/json" ,
3132+ context : { } ,
3133+ traceContext : { } ,
3134+ traceId : `t_stress_${ i } ` ,
3135+ spanId : `s_stress_${ i } ` ,
3136+ workerQueue : "main" ,
3137+ queue : "task/test-task" ,
3138+ isTest : false ,
3139+ tags : [ ] ,
3140+ delayUntil : new Date ( Date . now ( ) + 30_000 ) ,
3141+ debounce : {
3142+ key : "stress-key" ,
3143+ delay : "30s" ,
31473144 } ,
3148- prisma
3149- )
3150- ) ;
3151-
3152- const start = performance . now ( ) ;
3153- const settled = await Promise . allSettled ( triggers ) ;
3154- const durationMs = performance . now ( ) - start ;
3155-
3156- const fulfilled = settled . filter (
3157- ( r ) : r is PromiseFulfilledResult < { id : string } > => r . status === "fulfilled"
3158- ) ;
3159- const rejected = settled . filter ( ( r ) => r . status === "rejected" ) ;
3160-
3161- // No 5xx feedback loop: every concurrent trigger succeeds and
3162- // returns the existing run id.
3163- expect ( rejected ) . toHaveLength ( 0 ) ;
3164- expect ( fulfilled ) . toHaveLength ( N ) ;
3165- for ( const r of fulfilled ) {
3166- expect ( r . value . id ) . toBe ( seed . id ) ;
3167- }
3168-
3169- // Only one row, regardless of contention path.
3170- const runs = await prisma . taskRun . findMany ( {
3171- where : { taskIdentifier, runtimeEnvironmentId : authenticatedEnvironment . id } ,
3172- } ) ;
3173- expect ( runs . length ) . toBe ( 1 ) ;
3174-
3175- console . log (
3176- `[stress fixed=${ fixed } ] N=${ N } duration=${ durationMs . toFixed (
3177- 0
3178- ) } ms taskRun.update=${ updateCount } `
3179- ) ;
3180-
3181- if ( fixed ) {
3182- // With fast-path + quantization: the herd collapses onto the
3183- // same quantized newDelayUntil. Trigger #1 takes the lock and
3184- // updates delayUntil; every subsequent trigger sees a covering
3185- // delayUntil on the unlocked read and short-circuits. So at
3186- // most one update lands on the run row.
3187- expect ( updateCount ) . toBeLessThanOrEqual ( 1 ) ;
3188- }
3189- } finally {
3190- ( prisma . taskRun as unknown as { update : typeof originalUpdate } ) . update =
3191- originalUpdate ;
3145+ } ,
3146+ prisma
3147+ )
3148+ ) ;
3149+
3150+ const start = performance . now ( ) ;
3151+ const settled = await Promise . allSettled ( triggers ) ;
3152+ const durationMs = performance . now ( ) - start ;
3153+
3154+ const fulfilled = settled . filter (
3155+ ( r ) : r is PromiseFulfilledResult < { id : string } > => r . status === "fulfilled"
3156+ ) ;
3157+ const rejected = settled . filter ( ( r ) => r . status === "rejected" ) ;
3158+
3159+ // No 5xx feedback loop: every concurrent trigger succeeds and
3160+ // returns the existing run id.
3161+ expect ( rejected ) . toHaveLength ( 0 ) ;
3162+ expect ( fulfilled ) . toHaveLength ( N ) ;
3163+ for ( const r of fulfilled ) {
3164+ expect ( r . value . id ) . toBe ( seed . id ) ;
3165+ }
3166+
3167+ // Only one row, regardless of contention path.
3168+ const runs = await prisma . taskRun . findMany ( {
3169+ where : { taskIdentifier, runtimeEnvironmentId : authenticatedEnvironment . id } ,
3170+ } ) ;
3171+ expect ( runs . length ) . toBe ( 1 ) ;
3172+
3173+ // Wait briefly for any in-flight reschedule events to flush before
3174+ // asserting on the count. EventBus emit is synchronous here but
3175+ // settle a microtask just to be safe.
3176+ await new Promise ( ( resolve ) => setImmediate ( resolve ) ) ;
3177+
3178+ console . log (
3179+ `[stress fixed=${ fixed } ] N=${ N } duration=${ durationMs . toFixed (
3180+ 0
3181+ ) } ms reschedules=${ rescheduleCount } `
3182+ ) ;
3183+
3184+ if ( fixed ) {
3185+ // With fast-path + quantization: the herd collapses onto the
3186+ // same quantized newDelayUntil. Trigger #1 takes the lock and
3187+ // pushes delayUntil; every subsequent trigger sees a covering
3188+ // delayUntil on the unlocked read and short-circuits without
3189+ // emitting a reschedule. So at most one reschedule fires.
3190+ expect ( rescheduleCount ) . toBeLessThanOrEqual ( 1 ) ;
31923191 }
31933192 } finally {
31943193 await engine . quit ( ) ;
0 commit comments