From 33c3aade180533ce540092bcfd54adbcf4c89bad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 14 May 2026 19:27:01 +0200 Subject: [PATCH 01/12] fix: reject in-flight task promises when worker crashes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a worker crashes (onerror event), tasks already dispatched to it had their promises stored in promiseResponseMap but were never settled. The error handler only restarted the worker and redistributed queued tasks — in-flight tasks were silently dropped, causing pool.execute() callers to hang indefinitely. The fix iterates promiseResponseMap on worker error, finds all entries targeting the crashed worker by stable workerId, and rejects them with a descriptive error before termination. Changes: - Replace workerNodeKey (array index) with stable workerId in PromiseResponseWrapper — indices become stale after removeWorkerNode - Add handleWorkerNodeCrash() to unify crash recovery logic - Add rejectInFlightTaskPromises() and rejectRemainingQueuedTaskPromises() - Add updatePromiseResponseWorkerId() for task steal/redistribute tracking - Resolve workerNodeKey dynamically from message.workerId at response time - Gate exit handler worker restart on restartWorkerOnError option - Add crash worker test and regression test for promise rejection Port of poolifier/poolifier#3211 --- src/pools/abstract-pool.ts | 173 ++++++++++++++++++---- src/utility-types.ts | 4 +- tests/pools/thread/fixed.test.mjs | 48 +++++- tests/worker-files/thread/crashWorker.mjs | 18 +++ 4 files changed, 211 insertions(+), 32 deletions(-) create mode 100644 tests/worker-files/thread/crashWorker.mjs diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2046f19e..5c9e3e21 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1187,7 +1187,7 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId!, { reject, resolve, - workerNodeKey, + workerId: this.workerNodes[workerNodeKey].info.id, abortSignal, }) if ( @@ -1672,22 +1672,8 @@ export abstract class AbstractPool< ) } workerNode.worker.onerror = (errorEvent) => { - workerNode.info.ready = false - this.eventTarget?.dispatchEvent( - new ErrorEvent(PoolEvents.error, errorEvent), - ) - if (this.started && !this.destroying) { - if (this.opts.restartWorkerOnError === true) { - if (workerNode.info.dynamic) { - this.createAndSetupDynamicWorkerNode() - } else if (!this.startingMinimumNumberOfWorkers) { - this.startMinimumNumberOfWorkers(true) - } - } - if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) - } - } + errorEvent.preventDefault() + this.handleWorkerNodeCrash(workerNode, errorEvent) workerNode?.terminate() } workerNode.worker.addEventListener( @@ -1705,7 +1691,8 @@ export abstract class AbstractPool< if ( this.started && !this.startingMinimumNumberOfWorkers && - !this.destroying + !this.destroying && + this.opts.restartWorkerOnError === true ) { this.startMinimumNumberOfWorkers(true) } @@ -1916,10 +1903,99 @@ export abstract class AbstractPool< if (destinationWorkerNodeKey === -1) { break } - this.handleTask( - destinationWorkerNodeKey, - this.dequeueTask(sourceWorkerNodeKey)!, - ) + const task = this.dequeueTask(sourceWorkerNodeKey)! + this.handleTask(destinationWorkerNodeKey, task) + this.updatePromiseResponseWorkerId(task.taskId, destinationWorkerNodeKey) + } + } + + /** + * Rejects in-flight task promises for the given crashed worker node key. + * @param workerNodeKey - The worker node key. + * @param errorEvent - The error event that caused the worker to crash. + */ + private rejectInFlightTaskPromises( + workerNodeKey: number, + errorEvent: ErrorEvent, + ): void { + if (workerNodeKey === -1) { + return + } + const workerNode = this.workerNodes[workerNodeKey] + const crashedWorkerId = workerNode.info.id + if (crashedWorkerId == null) { + return + } + const queuedTaskIds = new Set< + `${string}-${string}-${string}-${string}-${string}` + >() + for (const task of workerNode.tasksQueue) { + queuedTaskIds.add(task.taskId!) + } + const crashError = new Error( + `Worker node crashed with error: '${errorEvent.message}'`, + ) + for (const [taskId, promiseResponse] of this.promiseResponseMap) { + if ( + promiseResponse.workerId === crashedWorkerId && + !queuedTaskIds.has(taskId) + ) { + promiseResponse.reject(crashError) + this.promiseResponseMap.delete(taskId) + workerNode.dispatchEvent(new Event('taskFinished')) + } + } + this.checkAndEmitTaskExecutionFinishedEvents() + } + + /** + * Rejects remaining queued task promises for the given crashed worker node key. + * @param workerNodeKey - The worker node key. + * @param errorEvent - The error event that caused the worker to crash. + */ + private rejectRemainingQueuedTaskPromises( + workerNodeKey: number, + errorEvent: ErrorEvent, + ): void { + if (workerNodeKey === -1) { + return + } + const workerNode = this.workerNodes[workerNodeKey] + if (this.tasksQueueSize(workerNodeKey) === 0) { + return + } + const crashError = new Error( + `Worker node crashed with error: '${errorEvent.message}'`, + ) + while (this.tasksQueueSize(workerNodeKey) > 0) { + const task = this.dequeueTask(workerNodeKey) + if (task?.taskId != null) { + const promiseResponse = this.promiseResponseMap.get(task.taskId) + if (promiseResponse != null) { + promiseResponse.reject(crashError) + this.promiseResponseMap.delete(task.taskId) + workerNode.dispatchEvent(new Event('taskFinished')) + } + } + } + this.checkAndEmitTaskExecutionFinishedEvents() + } + + /** + * Updates the promise response worker id after task steal or redistribute. + * Ensures crash-time rejection targets the correct worker. + * @param taskId - The task id. + * @param workerNodeKey - The destination worker node key. + */ + private updatePromiseResponseWorkerId( + taskId: `${string}-${string}-${string}-${string}-${string}` | undefined, + workerNodeKey: number, + ): void { + if (taskId != null) { + const promiseResponse = this.promiseResponseMap.get(taskId) + if (promiseResponse != null) { + promiseResponse.workerId = this.workerNodes[workerNodeKey].info.id + } } } @@ -2020,6 +2096,10 @@ export abstract class AbstractPool< sourceWorkerNode.info.stolen = false destinationWorkerNode.info.stealing = false this.handleTask(destinationWorkerNodeKey, stolenTask) + this.updatePromiseResponseWorkerId( + stolenTask.taskId, + destinationWorkerNodeKey, + ) this.updateTaskStolenStatisticsWorkerUsage( destinationWorkerNodeKey, stolenTask.name!, @@ -2038,6 +2118,35 @@ export abstract class AbstractPool< ) } + /** + * Handles a crashed worker node: emits error, rejects in-flight promises, + * restarts the worker, and redistributes queued tasks. + * @param workerNode - The crashed worker node. + * @param errorEvent - The error event that caused the crash. + */ + private handleWorkerNodeCrash( + workerNode: IWorkerNode, + errorEvent: ErrorEvent, + ): void { + workerNode.info.ready = false + this.eventTarget?.dispatchEvent( + new ErrorEvent(PoolEvents.error, errorEvent), + ) + const crashedWorkerNodeKey = this.workerNodes.indexOf(workerNode) + this.rejectInFlightTaskPromises(crashedWorkerNodeKey, errorEvent) + if (this.started && !this.destroying) { + if (this.opts.restartWorkerOnError === true) { + if (workerNode.info.dynamic) { + this.createAndSetupDynamicWorkerNode() + } + } + if (this.opts.enableTasksQueue === true) { + this.redistributeQueuedTasks(crashedWorkerNodeKey) + this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, errorEvent) + } + } + } + private readonly handleWorkerNodeIdleEvent = ( event: CustomEvent, previousStolenTask?: Task, @@ -2230,11 +2339,13 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse(message: MessageValue): void { - const { name, taskId, workerError, data } = message + const { name, taskId, workerError, data, workerId } = message const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse - const workerNode = this.workerNodes[workerNodeKey] + const { resolve, reject } = promiseResponse + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = + workerNodeKey !== -1 ? this.workerNodes[workerNodeKey] : undefined if (workerError != null) { this.eventTarget?.dispatchEvent( new ErrorEvent(PoolEvents.taskError, { error: workerError }), @@ -2248,12 +2359,18 @@ export abstract class AbstractPool< } else { resolve(data!) } - this.afterTaskExecutionHook(workerNodeKey, message) + if (workerNodeKey !== -1) { + this.afterTaskExecutionHook(workerNodeKey, message) + } queueMicrotask(() => { workerNode?.dispatchEvent(new Event('taskFinished')) this.promiseResponseMap.delete(taskId!) this.checkAndEmitTaskExecutionFinishedEvents() - if (this.opts.enableTasksQueue === true && !this.destroying) { + if ( + workerNodeKey !== -1 && + this.opts.enableTasksQueue === true && + !this.destroying + ) { if ( !this.isWorkerNodeBusy(workerNodeKey) && this.tasksQueueSize(workerNodeKey) > 0 @@ -2264,7 +2381,7 @@ export abstract class AbstractPool< ) } if (this.isWorkerNodeIdle(workerNodeKey)) { - workerNode.dispatchEvent( + workerNode?.dispatchEvent( new CustomEvent('idle', { detail: { workerNodeKey }, }), diff --git a/src/utility-types.ts b/src/utility-types.ts index 6b1de900..e7431d55 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -233,9 +233,9 @@ export interface PromiseResponseWrapper { */ readonly reject: (reason?: unknown) => void /** - * The worker node key executing the task. + * The worker id executing the task. */ - readonly workerNodeKey: number + workerId: `${string}-${string}-${string}-${string}-${string}` | undefined } /** diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index 5fa92fb3..cec8072d 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -18,7 +18,8 @@ describe({ echoPool, errorPool, asyncErrorPool, - asyncPool + asyncPool, + crashPool before(() => { pool = new FixedThreadPool( @@ -68,6 +69,18 @@ describe({ numberOfThreads, new URL('./../../worker-files/thread/asyncWorker.mjs', import.meta.url), ) + crashPool = new FixedThreadPool( + 1, + new URL( + './../../worker-files/thread/crashWorker.mjs', + import.meta.url, + ), + { + enableTasksQueue: true, + restartWorkerOnError: false, + tasksQueueOptions: { concurrency: 1 }, + }, + ) }) after(async () => { @@ -78,6 +91,7 @@ describe({ await asyncErrorPool.destroy() await emptyPool.destroy() await queuePool.destroy() + await crashPool.destroy() }) it('Verify that the function is executed in a worker thread', async () => { @@ -148,7 +162,14 @@ describe({ expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( numberOfThreads * maxMultiplier, ) - expect(workerNode.usage.tasks.executed).toBe(maxMultiplier) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual( + queuePool.opts.tasksQueueOptions.concurrency, + ) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency) + + queuePool.opts.tasksQueueOptions.concurrency, + ) expect(workerNode.usage.tasks.queued).toBe(0) expect(workerNode.usage.tasks.maxQueued).toBe( maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency, @@ -292,6 +313,29 @@ describe({ expect(error.stack).toBeDefined() }) + it('Verify that in-flight task promises reject on worker crash', async () => { + let poolError + const errorHandler = (e) => { + poolError = e + } + crashPool.eventTarget.addEventListener(PoolEvents.error, errorHandler, { + once: true, + }) + const exitPromise = waitWorkerNodeEvents(crashPool, 'exit', 1) + let error + try { + await crashPool.execute() + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.message).toMatch(/Worker node crashed with error:/) + expect(error.message).toMatch(/Simulated worker crash/) + expect(poolError).toBeInstanceOf(ErrorEvent) + expect(poolError.message).toMatch(/Simulated worker crash/) + await exitPromise + }) + it('Verify that async function is working properly', async () => { const data = { f: 10 } const startTime = performance.now() diff --git a/tests/worker-files/thread/crashWorker.mjs b/tests/worker-files/thread/crashWorker.mjs new file mode 100644 index 00000000..9191d816 --- /dev/null +++ b/tests/worker-files/thread/crashWorker.mjs @@ -0,0 +1,18 @@ +import { KillBehaviors, ThreadWorker } from '../../../src/mod.ts' + +/** + * Worker that simulates a crash via an unhandled exception during task execution. + * The async function never resolves, keeping the task in-flight while the scheduled + * throw kills the worker thread and triggers the 'error' event on the parent. + */ +async function crash() { + setTimeout(() => { + throw new Error('Simulated worker crash') + }, 10) + await new Promise(() => {}) +} + +export default new ThreadWorker(crash, { + killBehavior: KillBehaviors.HARD, + maxInactiveTime: 500, +}) From d27cda6aeb2a4da35e98b4940082abacd5a8602d Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 17:27:33 +0000 Subject: [PATCH 02/12] [autofix.ci] apply automated fixes --- src/pools/abstract-pool.ts | 5 +++-- tests/pools/thread/fixed.test.mjs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5c9e3e21..1309616d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2344,8 +2344,9 @@ export abstract class AbstractPool< if (promiseResponse != null) { const { resolve, reject } = promiseResponse const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) - const workerNode = - workerNodeKey !== -1 ? this.workerNodes[workerNodeKey] : undefined + const workerNode = workerNodeKey !== -1 + ? this.workerNodes[workerNodeKey] + : undefined if (workerError != null) { this.eventTarget?.dispatchEvent( new ErrorEvent(PoolEvents.taskError, { error: workerError }), diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index cec8072d..ea1a92e3 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -167,7 +167,7 @@ describe({ ) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( numberOfThreads * - (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency) + + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency) + queuePool.opts.tasksQueueOptions.concurrency, ) expect(workerNode.usage.tasks.queued).toBe(0) From e252032b4fa85078120daf389a600b818cfd8483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 14 May 2026 22:49:26 +0200 Subject: [PATCH 03/12] fix: add crashHandled flag, improve rejection logic and harden guards - Add crashHandled flag to WorkerInfo to prevent double crash handling between onerror and exit handlers - Move rejectRemainingQueuedTaskPromises outside started/destroying guard (queued promises must always be settled regardless of pool state) - Handle workerId == null case in rejectInFlightTaskPromises (crash before worker ID assignment) - Add { cause } to crash Error constructors for error chain traceability - Update task statistics (executing/failed) on crash rejection - Add workerNodeKey === -1 guards in updatePromiseResponseWorkerId and handleWorkerReadyResponse - Exit handler: detect unexpected exit via crashHandled flag, condition restart on restartWorkerOnError or normal exit - Tighten test bounds for stolen/sequentiallyStolen task counts Aligns with poolifier/poolifier#3211 latest changes --- .prettierrc.json | 7 ++++ biome.json | 51 +++++++++++++++++++++++++++ src/pools/abstract-pool.ts | 57 ++++++++++++++++++++++++++++--- src/pools/utils.ts | 1 + src/pools/worker.ts | 5 +++ tests/pools/thread/fixed.test.mjs | 9 +++-- 6 files changed, 123 insertions(+), 7 deletions(-) create mode 100644 .prettierrc.json create mode 100644 biome.json diff --git a/.prettierrc.json b/.prettierrc.json new file mode 100644 index 00000000..de624450 --- /dev/null +++ b/.prettierrc.json @@ -0,0 +1,7 @@ +{ + "$schema": "https://json.schemastore.org/prettierrc", + "arrowParens": "avoid", + "singleQuote": true, + "semi": false, + "trailingComma": "es5" +} diff --git a/biome.json b/biome.json new file mode 100644 index 00000000..24c40748 --- /dev/null +++ b/biome.json @@ -0,0 +1,51 @@ +{ + "$schema": "https://biomejs.dev/schemas/2.4.13/schema.json", + "assist": { + "actions": { + "source": { + "organizeImports": "on" + } + } + }, + "linter": { + "enabled": true, + "rules": { + "recommended": true, + "style": { + "noParameterAssign": "error", + "useAsConstAssertion": "error", + "useDefaultParameterLast": "error", + "useEnumInitializers": "error", + "useSelfClosingElements": "error", + "useSingleVarDeclarator": "error", + "noUnusedTemplateLiteral": "error", + "useNumberNamespace": "error", + "noInferrableTypes": "error", + "noUselessElse": "error" + } + } + }, + "formatter": { + "indentStyle": "space", + "indentWidth": 2 + }, + "javascript": { + "formatter": { + "arrowParentheses": "asNeeded", + "quoteStyle": "single", + "semicolons": "asNeeded", + "trailingCommas": "es5" + } + }, + "json": { + "parser": { + "allowComments": true + } + }, + "vcs": { + "enabled": true, + "clientKind": "git", + "useIgnoreFile": true, + "defaultBranch": "master" + } +} diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1309616d..1e0202c4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1687,12 +1687,27 @@ export abstract class AbstractPool< workerNode.addEventListener( 'exit', () => { + const workerNodeKey = this.workerNodes.indexOf(workerNode) + if ( + workerNode.info.ready && + !workerNode.info.crashHandled && + workerNodeKey !== -1 && + !this.destroying + ) { + this.handleWorkerNodeCrash( + workerNode, + new ErrorEvent('error', { + message: 'Worker node exited unexpectedly', + }), + ) + } this.removeWorkerNode(workerNode) if ( this.started && !this.startingMinimumNumberOfWorkers && !this.destroying && - this.opts.restartWorkerOnError === true + (this.opts.restartWorkerOnError === true || + !workerNode.info.crashHandled) ) { this.startMinimumNumberOfWorkers(true) } @@ -1924,6 +1939,18 @@ export abstract class AbstractPool< const workerNode = this.workerNodes[workerNodeKey] const crashedWorkerId = workerNode.info.id if (crashedWorkerId == null) { + const crashError = new Error( + `Worker node crashed with error: '${errorEvent.message}'`, + { cause: errorEvent.error ?? errorEvent }, + ) + for (const [taskId, promiseResponse] of this.promiseResponseMap) { + if (promiseResponse.workerId == null) { + promiseResponse.reject(crashError) + this.promiseResponseMap.delete(taskId) + workerNode.dispatchEvent(new Event('taskFinished')) + } + } + this.checkAndEmitTaskExecutionFinishedEvents() return } const queuedTaskIds = new Set< @@ -1934,6 +1961,7 @@ export abstract class AbstractPool< } const crashError = new Error( `Worker node crashed with error: '${errorEvent.message}'`, + { cause: errorEvent.error ?? errorEvent }, ) for (const [taskId, promiseResponse] of this.promiseResponseMap) { if ( @@ -1942,6 +1970,10 @@ export abstract class AbstractPool< ) { promiseResponse.reject(crashError) this.promiseResponseMap.delete(taskId) + if (workerNode.usage.tasks.executing > 0) { + --workerNode.usage.tasks.executing + } + ++workerNode.usage.tasks.failed workerNode.dispatchEvent(new Event('taskFinished')) } } @@ -1966,6 +1998,7 @@ export abstract class AbstractPool< } const crashError = new Error( `Worker node crashed with error: '${errorEvent.message}'`, + { cause: errorEvent.error ?? errorEvent }, ) while (this.tasksQueueSize(workerNodeKey) > 0) { const task = this.dequeueTask(workerNodeKey) @@ -1974,6 +2007,7 @@ export abstract class AbstractPool< if (promiseResponse != null) { promiseResponse.reject(crashError) this.promiseResponseMap.delete(task.taskId) + ++workerNode.usage.tasks.failed workerNode.dispatchEvent(new Event('taskFinished')) } } @@ -1991,11 +2025,20 @@ export abstract class AbstractPool< taskId: `${string}-${string}-${string}-${string}-${string}` | undefined, workerNodeKey: number, ): void { - if (taskId != null) { + if (taskId == null || workerNodeKey === -1) { + return + } + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode.info.id == null) { const promiseResponse = this.promiseResponseMap.get(taskId) if (promiseResponse != null) { - promiseResponse.workerId = this.workerNodes[workerNodeKey].info.id + promiseResponse.workerId = undefined } + return + } + const promiseResponse = this.promiseResponseMap.get(taskId) + if (promiseResponse != null) { + promiseResponse.workerId = workerNode.info.id } } @@ -2129,6 +2172,7 @@ export abstract class AbstractPool< errorEvent: ErrorEvent, ): void { workerNode.info.ready = false + workerNode.info.crashHandled = true this.eventTarget?.dispatchEvent( new ErrorEvent(PoolEvents.error, errorEvent), ) @@ -2142,9 +2186,11 @@ export abstract class AbstractPool< } if (this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(crashedWorkerNodeKey) - this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, errorEvent) } } + if (this.opts.enableTasksQueue === true) { + this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, errorEvent) + } } private readonly handleWorkerNodeIdleEvent = ( @@ -2330,6 +2376,9 @@ export abstract class AbstractPool< ) } const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + if (workerNodeKey === -1) { + return + } const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 4993f7b8..7daeefe8 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -530,6 +530,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => { backPressure: false, backPressureStealing: false, continuousStealing: false, + crashHandled: false, queuedTaskAbortion: false, dynamic: false, id: getWorkerId(worker), diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 96aa1dc0..6a7461fd 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -164,6 +164,11 @@ export interface WorkerInfo { * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes. */ continuousStealing: boolean + /** + * Crash handled flag. + * This flag is set to `true` when worker node crash has been handled. + */ + crashHandled: boolean /** * Back pressure stealing flag. * This flag is set to `true` when worker node is stealing one task from another back pressured worker node. diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index ea1a92e3..e096b552 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -178,18 +178,21 @@ describe({ workerNode.usage.tasks.sequentiallyStolen, ).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual( - numberOfThreads * maxMultiplier, + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency), ) expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual( - numberOfThreads * maxMultiplier, + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency), ) } expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier) expect(queuePool.info.backPressure).toBe(false) expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0) expect(queuePool.info.stolenTasks).toBeLessThanOrEqual( - numberOfThreads * maxMultiplier, + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency), ) }) From 76d649cc9ca2d8a316f7ac7a652dbd3670a1067c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 14 May 2026 23:00:17 +0200 Subject: [PATCH 04/12] fix: harden crash-handling guards and eliminate DRY violations - Guard getWorkerNodeKeyByWorkerId against undefined workerId to prevent erroneous matching of uninitialized worker nodes - Call updatePromiseResponseWorkerId BEFORE handleTask in redistribute and stealTask for correctness-by-construction (prevents stale workerId if task response arrives synchronously) - Construct crashError once in handleWorkerNodeCrash and pass as param to rejectInFlightTaskPromises/rejectRemainingQueuedTaskPromises (DRY) - Add executing--/failed++ stats in rejectInFlightTaskPromises null-path for consistency with the non-null path - Fix handleWorkerNodeCrash JSDoc to accurately describe behavior --- src/pools/abstract-pool.ts | 42 +++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1e0202c4..e004fb12 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -559,6 +559,9 @@ export abstract class AbstractPool< * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ private getWorkerNodeKeyByWorkerId(workerId: string | undefined): number { + if (workerId == null) { + return -1 + } return this.workerNodes.findIndex( (workerNode) => workerNode.info.id === workerId, ) @@ -1919,19 +1922,19 @@ export abstract class AbstractPool< break } const task = this.dequeueTask(sourceWorkerNodeKey)! - this.handleTask(destinationWorkerNodeKey, task) this.updatePromiseResponseWorkerId(task.taskId, destinationWorkerNodeKey) + this.handleTask(destinationWorkerNodeKey, task) } } /** * Rejects in-flight task promises for the given crashed worker node key. * @param workerNodeKey - The worker node key. - * @param errorEvent - The error event that caused the worker to crash. + * @param crashError - The crash error to reject promises with. */ private rejectInFlightTaskPromises( workerNodeKey: number, - errorEvent: ErrorEvent, + crashError: Error, ): void { if (workerNodeKey === -1) { return @@ -1939,14 +1942,14 @@ export abstract class AbstractPool< const workerNode = this.workerNodes[workerNodeKey] const crashedWorkerId = workerNode.info.id if (crashedWorkerId == null) { - const crashError = new Error( - `Worker node crashed with error: '${errorEvent.message}'`, - { cause: errorEvent.error ?? errorEvent }, - ) for (const [taskId, promiseResponse] of this.promiseResponseMap) { if (promiseResponse.workerId == null) { promiseResponse.reject(crashError) this.promiseResponseMap.delete(taskId) + if (workerNode.usage.tasks.executing > 0) { + --workerNode.usage.tasks.executing + } + ++workerNode.usage.tasks.failed workerNode.dispatchEvent(new Event('taskFinished')) } } @@ -1959,10 +1962,6 @@ export abstract class AbstractPool< for (const task of workerNode.tasksQueue) { queuedTaskIds.add(task.taskId!) } - const crashError = new Error( - `Worker node crashed with error: '${errorEvent.message}'`, - { cause: errorEvent.error ?? errorEvent }, - ) for (const [taskId, promiseResponse] of this.promiseResponseMap) { if ( promiseResponse.workerId === crashedWorkerId && @@ -1983,11 +1982,11 @@ export abstract class AbstractPool< /** * Rejects remaining queued task promises for the given crashed worker node key. * @param workerNodeKey - The worker node key. - * @param errorEvent - The error event that caused the worker to crash. + * @param crashError - The crash error to reject promises with. */ private rejectRemainingQueuedTaskPromises( workerNodeKey: number, - errorEvent: ErrorEvent, + crashError: Error, ): void { if (workerNodeKey === -1) { return @@ -1996,10 +1995,6 @@ export abstract class AbstractPool< if (this.tasksQueueSize(workerNodeKey) === 0) { return } - const crashError = new Error( - `Worker node crashed with error: '${errorEvent.message}'`, - { cause: errorEvent.error ?? errorEvent }, - ) while (this.tasksQueueSize(workerNodeKey) > 0) { const task = this.dequeueTask(workerNodeKey) if (task?.taskId != null) { @@ -2138,11 +2133,11 @@ export abstract class AbstractPool< } sourceWorkerNode.info.stolen = false destinationWorkerNode.info.stealing = false - this.handleTask(destinationWorkerNodeKey, stolenTask) this.updatePromiseResponseWorkerId( stolenTask.taskId, destinationWorkerNodeKey, ) + this.handleTask(destinationWorkerNodeKey, stolenTask) this.updateTaskStolenStatisticsWorkerUsage( destinationWorkerNodeKey, stolenTask.name!, @@ -2163,7 +2158,8 @@ export abstract class AbstractPool< /** * Handles a crashed worker node: emits error, rejects in-flight promises, - * restarts the worker, and redistributes queued tasks. + * restarts dynamic workers if configured, and redistributes queued tasks. + * Static worker restart is handled by the exit event handler. * @param workerNode - The crashed worker node. * @param errorEvent - The error event that caused the crash. */ @@ -2177,7 +2173,11 @@ export abstract class AbstractPool< new ErrorEvent(PoolEvents.error, errorEvent), ) const crashedWorkerNodeKey = this.workerNodes.indexOf(workerNode) - this.rejectInFlightTaskPromises(crashedWorkerNodeKey, errorEvent) + const crashError = new Error( + `Worker node crashed with error: '${errorEvent.message}'`, + { cause: errorEvent.error ?? errorEvent }, + ) + this.rejectInFlightTaskPromises(crashedWorkerNodeKey, crashError) if (this.started && !this.destroying) { if (this.opts.restartWorkerOnError === true) { if (workerNode.info.dynamic) { @@ -2189,7 +2189,7 @@ export abstract class AbstractPool< } } if (this.opts.enableTasksQueue === true) { - this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, errorEvent) + this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, crashError) } } From c10bca0d2e827108adcbbf62bff03b3dd9810714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 14 May 2026 23:27:48 +0200 Subject: [PATCH 05/12] fix: add flushWorkerNodePromises catch-all for unsettled promises on exit Add flushWorkerNodePromises() method as a catch-all to reject any unsettled promises when a worker node exits without crash handling (e.g., unexpected exit without onerror, or termination during pool destroy). The exit handler now has three distinct blocks: 1. Crash detection: handleWorkerNodeCrash if ready && !crashHandled 2. Promise flush: flushWorkerNodePromises for remaining unsettled promises (guarded by ready && !crashHandled to skip intentional exits) 3. Cleanup: removeWorkerNode + conditional restart Aligns with poolifier/poolifier#3211 latest changes (flushWorkerNodePromises catch-all pattern) --- src/pools/abstract-pool.ts | 46 +++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e004fb12..495b5b52 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1691,6 +1691,7 @@ export abstract class AbstractPool< 'exit', () => { const workerNodeKey = this.workerNodes.indexOf(workerNode) + const exitError = new Error('Worker node exited unexpectedly') if ( workerNode.info.ready && !workerNode.info.crashHandled && @@ -1700,10 +1701,23 @@ export abstract class AbstractPool< this.handleWorkerNodeCrash( workerNode, new ErrorEvent('error', { - message: 'Worker node exited unexpectedly', + message: exitError.message, + error: exitError, }), ) } + if ( + workerNodeKey !== -1 && + !workerNode.info.crashHandled && + workerNode.info.ready + ) { + this.flushWorkerNodePromises( + workerNode, + this.destroying + ? new Error('Worker node terminated during pool destroy') + : exitError, + ) + } this.removeWorkerNode(workerNode) if ( this.started && @@ -2010,6 +2024,36 @@ export abstract class AbstractPool< this.checkAndEmitTaskExecutionFinishedEvents() } + /** + * Rejects all unsettled promises targeting the given worker node. + * Used as a catch-all when crash handling was bypassed (e.g., during pool + * destroy or for non-ready worker exits). Idempotent: already-deleted + * entries are simply skipped. + * @param workerNode - The worker node whose promises to flush. + * @param error - The rejection error. + */ + private flushWorkerNodePromises( + workerNode: IWorkerNode, + error: Error, + ): void { + const workerId = workerNode.info.id + for (const [taskId, promiseResponse] of this.promiseResponseMap) { + if ( + promiseResponse.workerId === workerId || + (workerId == null && promiseResponse.workerId == null) + ) { + promiseResponse.reject(error) + this.promiseResponseMap.delete(taskId) + if (workerNode.usage.tasks.executing > 0) { + --workerNode.usage.tasks.executing + } + ++workerNode.usage.tasks.failed + workerNode.dispatchEvent(new Event('taskFinished')) + } + } + this.checkAndEmitTaskExecutionFinishedEvents() + } + /** * Updates the promise response worker id after task steal or redistribute. * Ensures crash-time rejection targets the correct worker. From 7be3dcec99eb642b2c626042908a0b0eac074b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 14 May 2026 23:37:04 +0200 Subject: [PATCH 06/12] fix: add crashHandled to WorkerInfo test expectations --- tests/pools/abstract-pool.test.mjs | 2 ++ tests/pools/utils.test.mjs | 1 + tests/pools/worker-node.test.mjs | 1 + 3 files changed, 4 insertions(+) diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index a9d049c4..7e2b8478 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1000,6 +1000,7 @@ describe({ stealing: false, stolen: false, continuousStealing: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, @@ -1021,6 +1022,7 @@ describe({ stealing: false, stolen: false, continuousStealing: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 34fac57f..c450475d 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -126,6 +126,7 @@ describe('Pool utils test suite', () => { backPressure: false, backPressureStealing: false, continuousStealing: false, + crashHandled: false, queuedTaskAbortion: false, dynamic: false, id: expect.any(String), diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 02c27336..955fd62a 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -236,6 +236,7 @@ describe('Worker node test suite', () => { stealing: false, stolen: false, continuousStealing: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, From 409f125ca6f7972846e45adfa52ffa1bdb0d96ef Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 21:37:29 +0000 Subject: [PATCH 07/12] [autofix.ci] apply automated fixes --- tests/pools/abstract-pool.test.mjs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 7e2b8478..37c7bf70 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1000,7 +1000,7 @@ describe({ stealing: false, stolen: false, continuousStealing: false, - crashHandled: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, @@ -1022,7 +1022,7 @@ describe({ stealing: false, stolen: false, continuousStealing: false, - crashHandled: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, From 7e170c311036668f62c101aa0884db51cac788fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 14 May 2026 23:57:59 +0200 Subject: [PATCH 08/12] =?UTF-8?q?refactor:=20address=20review=20findings?= =?UTF-8?q?=20=E2=80=94=20DRY=20helper,=20fallback=20workerId,=20simplify?= =?UTF-8?q?=20null-path?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract rejectTaskPromise() helper to eliminate duplicated reject + delete + stats + taskFinished pattern across 3 methods - Add fallback to promiseResponse.workerId in handleTaskExecutionResponse when message.workerId lookup fails (worker already removed) - Simplify rejectInFlightTaskPromises null-ID path to early-return - Keep flush guard as workerNode.info.ready (not this.destroying) since synchronous terminate in web workers causes unhandled rejections during pool.destroy() — destroyWorkerNode already handles graceful shutdown --- src/pools/abstract-pool.ts | 60 ++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 495b5b52..97a9b2dc 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1956,18 +1956,6 @@ export abstract class AbstractPool< const workerNode = this.workerNodes[workerNodeKey] const crashedWorkerId = workerNode.info.id if (crashedWorkerId == null) { - for (const [taskId, promiseResponse] of this.promiseResponseMap) { - if (promiseResponse.workerId == null) { - promiseResponse.reject(crashError) - this.promiseResponseMap.delete(taskId) - if (workerNode.usage.tasks.executing > 0) { - --workerNode.usage.tasks.executing - } - ++workerNode.usage.tasks.failed - workerNode.dispatchEvent(new Event('taskFinished')) - } - } - this.checkAndEmitTaskExecutionFinishedEvents() return } const queuedTaskIds = new Set< @@ -1981,13 +1969,7 @@ export abstract class AbstractPool< promiseResponse.workerId === crashedWorkerId && !queuedTaskIds.has(taskId) ) { - promiseResponse.reject(crashError) - this.promiseResponseMap.delete(taskId) - if (workerNode.usage.tasks.executing > 0) { - --workerNode.usage.tasks.executing - } - ++workerNode.usage.tasks.failed - workerNode.dispatchEvent(new Event('taskFinished')) + this.rejectTaskPromise(taskId, promiseResponse, workerNode, crashError) } } this.checkAndEmitTaskExecutionFinishedEvents() @@ -2014,16 +1996,35 @@ export abstract class AbstractPool< if (task?.taskId != null) { const promiseResponse = this.promiseResponseMap.get(task.taskId) if (promiseResponse != null) { - promiseResponse.reject(crashError) - this.promiseResponseMap.delete(task.taskId) - ++workerNode.usage.tasks.failed - workerNode.dispatchEvent(new Event('taskFinished')) + this.rejectTaskPromise( + task.taskId, + promiseResponse, + workerNode, + crashError, + false, + ) } } } this.checkAndEmitTaskExecutionFinishedEvents() } + private rejectTaskPromise( + taskId: `${string}-${string}-${string}-${string}-${string}`, + promiseResponse: PromiseResponseWrapper, + workerNode: IWorkerNode, + error: Error, + decrementExecuting = true, + ): void { + promiseResponse.reject(error) + this.promiseResponseMap.delete(taskId) + if (decrementExecuting && workerNode.usage.tasks.executing > 0) { + --workerNode.usage.tasks.executing + } + ++workerNode.usage.tasks.failed + workerNode.dispatchEvent(new Event('taskFinished')) + } + /** * Rejects all unsettled promises targeting the given worker node. * Used as a catch-all when crash handling was bypassed (e.g., during pool @@ -2042,13 +2043,7 @@ export abstract class AbstractPool< promiseResponse.workerId === workerId || (workerId == null && promiseResponse.workerId == null) ) { - promiseResponse.reject(error) - this.promiseResponseMap.delete(taskId) - if (workerNode.usage.tasks.executing > 0) { - --workerNode.usage.tasks.executing - } - ++workerNode.usage.tasks.failed - workerNode.dispatchEvent(new Event('taskFinished')) + this.rejectTaskPromise(taskId, promiseResponse, workerNode, error) } } this.checkAndEmitTaskExecutionFinishedEvents() @@ -2436,7 +2431,10 @@ export abstract class AbstractPool< const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { const { resolve, reject } = promiseResponse - const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + let workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + if (workerNodeKey === -1) { + workerNodeKey = this.getWorkerNodeKeyByWorkerId(promiseResponse.workerId) + } const workerNode = workerNodeKey !== -1 ? this.workerNodes[workerNodeKey] : undefined From b0176074017d8b81e2b2bfe0e3eb739c5554cc07 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 21:58:35 +0000 Subject: [PATCH 09/12] [autofix.ci] apply automated fixes --- src/pools/abstract-pool.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 97a9b2dc..e4dd0021 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2433,7 +2433,9 @@ export abstract class AbstractPool< const { resolve, reject } = promiseResponse let workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) if (workerNodeKey === -1) { - workerNodeKey = this.getWorkerNodeKeyByWorkerId(promiseResponse.workerId) + workerNodeKey = this.getWorkerNodeKeyByWorkerId( + promiseResponse.workerId, + ) } const workerNode = workerNodeKey !== -1 ? this.workerNodes[workerNodeKey] From e9d3ee859c363c3a04e6306f0f9d3961d7d2f44f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 15 May 2026 00:29:25 +0200 Subject: [PATCH 10/12] chore: remove unrelated config files added by mistake --- .prettierrc.json | 7 ------- biome.json | 51 ------------------------------------------------ 2 files changed, 58 deletions(-) delete mode 100644 .prettierrc.json delete mode 100644 biome.json diff --git a/.prettierrc.json b/.prettierrc.json deleted file mode 100644 index de624450..00000000 --- a/.prettierrc.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/prettierrc", - "arrowParens": "avoid", - "singleQuote": true, - "semi": false, - "trailingComma": "es5" -} diff --git a/biome.json b/biome.json deleted file mode 100644 index 24c40748..00000000 --- a/biome.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - "$schema": "https://biomejs.dev/schemas/2.4.13/schema.json", - "assist": { - "actions": { - "source": { - "organizeImports": "on" - } - } - }, - "linter": { - "enabled": true, - "rules": { - "recommended": true, - "style": { - "noParameterAssign": "error", - "useAsConstAssertion": "error", - "useDefaultParameterLast": "error", - "useEnumInitializers": "error", - "useSelfClosingElements": "error", - "useSingleVarDeclarator": "error", - "noUnusedTemplateLiteral": "error", - "useNumberNamespace": "error", - "noInferrableTypes": "error", - "noUselessElse": "error" - } - } - }, - "formatter": { - "indentStyle": "space", - "indentWidth": 2 - }, - "javascript": { - "formatter": { - "arrowParentheses": "asNeeded", - "quoteStyle": "single", - "semicolons": "asNeeded", - "trailingCommas": "es5" - } - }, - "json": { - "parser": { - "allowComments": true - } - }, - "vcs": { - "enabled": true, - "clientKind": "git", - "useIgnoreFile": true, - "defaultBranch": "master" - } -} From bb1637f818cb14cd1b55e8c5cf22c6d6fcce12b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 15 May 2026 00:36:56 +0200 Subject: [PATCH 11/12] docs: fix flushWorkerNodePromises JSDoc to match actual behavior The JSDoc was copied from upstream where exitCode !== 0 allows flush during pool destroy. In our web worker adaptation, synchronous terminate causes unhandled rejections if flush runs during destroy, so the guard uses workerNode.info.ready instead. Updated doc to reflect this. --- src/pools/abstract-pool.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e4dd0021..3ee75826 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2027,9 +2027,8 @@ export abstract class AbstractPool< /** * Rejects all unsettled promises targeting the given worker node. - * Used as a catch-all when crash handling was bypassed (e.g., during pool - * destroy or for non-ready worker exits). Idempotent: already-deleted - * entries are simply skipped. + * Called from the exit handler for unexpected exits of ready workers + * that were not already handled by the onerror crash path. * @param workerNode - The worker node whose promises to flush. * @param error - The rejection error. */ From da58c8a16ff9a7ccd622abe3f64c442767afc77a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 15 May 2026 00:43:26 +0200 Subject: [PATCH 12/12] fix: resolve abort target dynamically via promiseResponseMap The abort signal handler captured workerNodeKey at task submission time. After steal/redistribute, the stale index dispatches abortTask to the wrong worker node. Resolve the current worker dynamically from the stored workerId (kept up-to-date by updatePromiseResponseWorkerId). --- src/pools/abstract-pool.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 3ee75826..fb718205 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1176,11 +1176,21 @@ export abstract class AbstractPool< abortSignal?.addEventListener( 'abort', () => { - this.workerNodes[workerNodeKey]?.dispatchEvent( + const promiseResponse = this.promiseResponseMap.get(task.taskId!) + if (promiseResponse == null) { + return + } + const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + promiseResponse.workerId, + ) + if (currentWorkerNodeKey === -1) { + return + } + this.workerNodes[currentWorkerNodeKey]?.dispatchEvent( new CustomEvent('abortTask', { detail: { taskId: task.taskId, - workerId: this.getWorkerInfo(workerNodeKey)!.id!, + workerId: promiseResponse.workerId, }, }), )