@@ -15,6 +15,7 @@ import { nanoid } from "nanoid";
1515import { SystemResources } from "./systems.js" ;
1616import { ExecutionSnapshotSystem , getLatestExecutionSnapshot } from "./executionSnapshotSystem.js" ;
1717import { DelayedRunSystem } from "./delayedRunSystem.js" ;
18+ import { LockAcquisitionTimeoutError } from "../locking.js" ;
1819
1920export type DebounceOptions = {
2021 key : string ;
@@ -45,6 +46,17 @@ export type DebounceSystemOptions = {
4546 executionSnapshotSystem : ExecutionSnapshotSystem ;
4647 delayedRunSystem : DelayedRunSystem ;
4748 maxDebounceDurationMs : number ;
49+ /**
50+ * Bucket size in milliseconds used to quantize the newly computed `delayUntil`.
51+ * Set to 0 to disable quantization.
52+ */
53+ quantizeNewDelayUntilMs ?: number ;
54+ /**
55+ * When true, read the existing run's `delayUntil` outside the redlock and
56+ * short-circuit if the new (quantized) `delayUntil` is not later than the
57+ * current one.
58+ */
59+ fastPathSkipEnabled ?: boolean ;
4860} ;
4961
5062export type DebounceResult =
@@ -89,6 +101,8 @@ export class DebounceSystem {
89101 private readonly executionSnapshotSystem : ExecutionSnapshotSystem ;
90102 private readonly delayedRunSystem : DelayedRunSystem ;
91103 private readonly maxDebounceDurationMs : number ;
104+ private readonly quantizeNewDelayUntilMs : number ;
105+ private readonly fastPathSkipEnabled : boolean ;
92106
93107 constructor ( options : DebounceSystemOptions ) {
94108 this . $ = options . resources ;
@@ -106,6 +120,8 @@ export class DebounceSystem {
106120 this . executionSnapshotSystem = options . executionSnapshotSystem ;
107121 this . delayedRunSystem = options . delayedRunSystem ;
108122 this . maxDebounceDurationMs = options . maxDebounceDurationMs ;
123+ this . quantizeNewDelayUntilMs = Math . max ( 0 , options . quantizeNewDelayUntilMs ?? 1000 ) ;
124+ this . fastPathSkipEnabled = options . fastPathSkipEnabled ?? true ;
109125
110126 this . #registerCommands( ) ;
111127 }
@@ -450,9 +466,238 @@ return 0
450466 debounce : DebounceOptions ;
451467 tx ?: PrismaClientOrTransaction ;
452468 } ) : Promise < DebounceResult > {
453- return await this . $ . runLock . lock ( "handleDebounce" , [ existingRunId ] , async ( ) => {
454- const prisma = tx ?? this . $ . prisma ;
469+ const prisma = tx ?? this . $ . prisma ;
470+
471+ // Compute the (quantized) target delayUntil up-front, before taking any lock.
472+ // Quantizing to e.g. 1s buckets collapses many concurrent triggers on the same
473+ // hot debounce key onto the same target time, so the unlocked fast-path skip
474+ // below becomes effective and the redlock is not contended.
475+ const newDelayUntil = this . #computeQuantizedDelayUntil( debounce . delay ) ;
476+
477+ // Fast-path: read the current delayUntil outside the redlock and short-circuit
478+ // if our (quantized) newDelayUntil isn't later than what's already scheduled.
479+ // Safe because debounce is monotonic-forward only: a stale read either matches
480+ // reality or undershoots, both of which decay correctly (re-checked properly
481+ // inside the lock by whoever is actually pushing forward).
482+ if ( this . fastPathSkipEnabled && newDelayUntil ) {
483+ const fastPathResult = await this . #tryFastPathSkip( {
484+ existingRunId,
485+ newDelayUntil,
486+ debounce,
487+ prisma,
488+ } ) ;
489+ if ( fastPathResult ) {
490+ return fastPathResult ;
491+ }
492+ }
493+
494+ try {
495+ return await this . $ . runLock . lock ( "handleDebounce" , [ existingRunId ] , async ( ) => {
496+ return await this . #handleExistingRunLocked( {
497+ existingRunId,
498+ redisKey,
499+ environmentId,
500+ taskIdentifier,
501+ debounce,
502+ newDelayUntil,
503+ prisma,
504+ tx,
505+ } ) ;
506+ } ) ;
507+ } catch ( error ) {
508+ // Lock contention safety net: if we couldn't take the lock (redlock quorum
509+ // failure or our retry budget exhausted), fall in line with whoever is
510+ // actually updating the run instead of bubbling a 5xx to the SDK and
511+ // amplifying the herd via SDK retries. Debounce is best-effort - dropping
512+ // our contribution to delayUntil here is fine, the herd is updating it for
513+ // us.
514+ if ( this . #isLockContentionError( error ) ) {
515+ return await this . #handleLockContentionFallback( {
516+ existingRunId,
517+ debounce,
518+ error,
519+ prisma,
520+ } ) ;
521+ }
522+ throw error ;
523+ }
524+ }
525+
526+ /**
527+ * Parses the debounce delay and (optionally) quantizes it to a bucket boundary
528+ * by flooring the absolute timestamp. Quantization makes concurrent triggers on
529+ * the same key share a target time, which is what makes the unlocked fast-path
530+ * skip effective.
531+ */
532+ #computeQuantizedDelayUntil( delay : string ) : Date | null {
533+ const parsed = parseNaturalLanguageDuration ( delay ) ;
534+ if ( ! parsed ) {
535+ return null ;
536+ }
537+ if ( this . quantizeNewDelayUntilMs <= 0 ) {
538+ return parsed ;
539+ }
540+ const bucket = this . quantizeNewDelayUntilMs ;
541+ const quantized = Math . floor ( parsed . getTime ( ) / bucket ) * bucket ;
542+ return new Date ( quantized ) ;
543+ }
544+
545+ #isLockContentionError( error : unknown ) : boolean {
546+ if ( ! ( error instanceof Error ) ) return false ;
547+ return (
548+ error instanceof LockAcquisitionTimeoutError ||
549+ error . name === "LockAcquisitionTimeoutError" ||
550+ error . name === "ExecutionError" ||
551+ error . name === "ResourceLockedError"
552+ ) ;
553+ }
554+
555+ /**
556+ * Reads `delayUntil`/`status`/`createdAt` outside the redlock and
557+ * short-circuits if the existing scheduled time already covers our target.
558+ * Skips trailing-mode triggers that carry `updateData` since those still need
559+ * the lock to apply their data update. Also falls through when the run has
560+ * already exceeded its max debounce duration so the locked path can return
561+ * `max_duration_exceeded` and let the caller create a new run.
562+ */
563+ async #tryFastPathSkip( {
564+ existingRunId,
565+ newDelayUntil,
566+ debounce,
567+ prisma,
568+ } : {
569+ existingRunId : string ;
570+ newDelayUntil : Date ;
571+ debounce : DebounceOptions ;
572+ prisma : PrismaClientOrTransaction ;
573+ } ) : Promise < DebounceResult | null > {
574+ // Trailing mode with updateData still needs the lock so the data update is
575+ // applied; only short-circuit when there's nothing to update.
576+ if ( debounce . mode === "trailing" && debounce . updateData ) {
577+ return null ;
578+ }
579+
580+ const probe = await prisma . taskRun . findFirst ( {
581+ where : { id : existingRunId } ,
582+ select : { status : true , delayUntil : true , createdAt : true } ,
583+ } ) ;
584+ if ( ! probe || probe . status !== "DELAYED" || ! probe . delayUntil ) {
585+ return null ;
586+ }
587+ if ( newDelayUntil . getTime ( ) > probe . delayUntil . getTime ( ) ) {
588+ return null ;
589+ }
590+
591+ // Fall through to the lock path when newDelayUntil would exceed the run's
592+ // max debounce window so the caller can return max_duration_exceeded and
593+ // create a fresh run.
594+ let maxDurationMs = this . maxDebounceDurationMs ;
595+ if ( debounce . maxDelay ) {
596+ const parsedMaxDelay = parseNaturalLanguageDurationInMs ( debounce . maxDelay ) ;
597+ if ( parsedMaxDelay !== undefined ) {
598+ maxDurationMs = parsedMaxDelay ;
599+ }
600+ }
601+ const maxDelayUntilMs = probe . createdAt . getTime ( ) + maxDurationMs ;
602+ if ( newDelayUntil . getTime ( ) > maxDelayUntilMs ) {
603+ return null ;
604+ }
605+
606+ const fullRun = await prisma . taskRun . findFirst ( {
607+ where : { id : existingRunId } ,
608+ include : { associatedWaitpoint : true } ,
609+ } ) ;
610+ if ( ! fullRun || fullRun . status !== "DELAYED" ) {
611+ return null ;
612+ }
613+
614+ this . $ . logger . debug ( "handleExistingRun: fast-path skip, existing delayUntil already covers" , {
615+ existingRunId,
616+ debounceKey : debounce . key ,
617+ newDelayUntil,
618+ currentDelayUntil : fullRun . delayUntil ,
619+ } ) ;
620+
621+ return {
622+ status : "existing" ,
623+ run : fullRun ,
624+ waitpoint : fullRun . associatedWaitpoint ,
625+ } ;
626+ }
627+
628+ async #handleLockContentionFallback( {
629+ existingRunId,
630+ debounce,
631+ error,
632+ prisma,
633+ } : {
634+ existingRunId : string ;
635+ debounce : DebounceOptions ;
636+ error : unknown ;
637+ prisma : PrismaClientOrTransaction ;
638+ } ) : Promise < DebounceResult > {
639+ const fullRun = await prisma . taskRun . findFirst ( {
640+ where : { id : existingRunId } ,
641+ include : { associatedWaitpoint : true } ,
642+ } ) ;
455643
644+ if ( ! fullRun || fullRun . status !== "DELAYED" ) {
645+ // The run is no longer in a state we can safely return as "existing" -
646+ // re-throw so the caller surfaces the failure rather than silently
647+ // succeeding on a stale/terminated run.
648+ this . $ . logger . warn (
649+ "handleExistingRun: lock contention, but existing run no longer DELAYED - rethrowing" ,
650+ {
651+ existingRunId,
652+ debounceKey : debounce . key ,
653+ status : fullRun ?. status ,
654+ }
655+ ) ;
656+ throw error ;
657+ }
658+
659+ this . $ . logger . warn (
660+ "handleExistingRun: lock contention, returning existing run without rescheduling" ,
661+ {
662+ existingRunId,
663+ debounceKey : debounce . key ,
664+ currentDelayUntil : fullRun . delayUntil ,
665+ error : error instanceof Error ? error . message : String ( error ) ,
666+ errorName : error instanceof Error ? error . name : undefined ,
667+ }
668+ ) ;
669+
670+ return {
671+ status : "existing" ,
672+ run : fullRun ,
673+ waitpoint : fullRun . associatedWaitpoint ,
674+ } ;
675+ }
676+
677+ /**
678+ * Body of `handleExistingRun` that runs while holding the redlock on the run.
679+ * Receives the (possibly quantized) `newDelayUntil` precomputed by the caller.
680+ */
681+ async #handleExistingRunLocked( {
682+ existingRunId,
683+ redisKey,
684+ environmentId,
685+ taskIdentifier,
686+ debounce,
687+ newDelayUntil,
688+ prisma,
689+ tx,
690+ } : {
691+ existingRunId : string ;
692+ redisKey : string ;
693+ environmentId : string ;
694+ taskIdentifier : string ;
695+ debounce : DebounceOptions ;
696+ newDelayUntil : Date | null ;
697+ prisma : PrismaClientOrTransaction ;
698+ tx ?: PrismaClientOrTransaction ;
699+ } ) : Promise < DebounceResult > {
700+ {
456701 // Get the latest execution snapshot
457702 let snapshot ;
458703 try {
@@ -514,8 +759,6 @@ return 0
514759 } ) ;
515760 }
516761
517- // Calculate new delay - parseNaturalLanguageDuration returns a Date (now + duration)
518- const newDelayUntil = parseNaturalLanguageDuration ( debounce . delay ) ;
519762 if ( ! newDelayUntil ) {
520763 this . $ . logger . error ( "handleExistingRun: invalid delay duration" , {
521764 delay : debounce . delay ,
@@ -619,7 +862,7 @@ return 0
619862 run : updatedRun ,
620863 waitpoint : existingRun . associatedWaitpoint ,
621864 } ;
622- } ) ;
865+ }
623866 }
624867
625868 /**
0 commit comments