diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2046f19e..fb718205 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -559,6 +559,9 @@ export abstract class AbstractPool< * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. */ private getWorkerNodeKeyByWorkerId(workerId: string | undefined): number { + if (workerId == null) { + return -1 + } return this.workerNodes.findIndex( (workerNode) => workerNode.info.id === workerId, ) @@ -1173,11 +1176,21 @@ export abstract class AbstractPool< abortSignal?.addEventListener( 'abort', () => { - this.workerNodes[workerNodeKey]?.dispatchEvent( + const promiseResponse = this.promiseResponseMap.get(task.taskId!) + if (promiseResponse == null) { + return + } + const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + promiseResponse.workerId, + ) + if (currentWorkerNodeKey === -1) { + return + } + this.workerNodes[currentWorkerNodeKey]?.dispatchEvent( new CustomEvent('abortTask', { detail: { taskId: task.taskId, - workerId: this.getWorkerInfo(workerNodeKey)!.id!, + workerId: promiseResponse.workerId, }, }), ) @@ -1187,7 +1200,7 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId!, { reject, resolve, - workerNodeKey, + workerId: this.workerNodes[workerNodeKey].info.id, abortSignal, }) if ( @@ -1672,22 +1685,8 @@ export abstract class AbstractPool< ) } workerNode.worker.onerror = (errorEvent) => { - workerNode.info.ready = false - this.eventTarget?.dispatchEvent( - new ErrorEvent(PoolEvents.error, errorEvent), - ) - if (this.started && !this.destroying) { - if (this.opts.restartWorkerOnError === true) { - if (workerNode.info.dynamic) { - this.createAndSetupDynamicWorkerNode() - } else if (!this.startingMinimumNumberOfWorkers) { - this.startMinimumNumberOfWorkers(true) - } - } - if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) - } - } + errorEvent.preventDefault() + this.handleWorkerNodeCrash(workerNode, errorEvent) workerNode?.terminate() } workerNode.worker.addEventListener( @@ -1701,11 +1700,41 @@ export abstract class AbstractPool< workerNode.addEventListener( 'exit', () => { + const workerNodeKey = this.workerNodes.indexOf(workerNode) + const exitError = new Error('Worker node exited unexpectedly') + if ( + workerNode.info.ready && + !workerNode.info.crashHandled && + workerNodeKey !== -1 && + !this.destroying + ) { + this.handleWorkerNodeCrash( + workerNode, + new ErrorEvent('error', { + message: exitError.message, + error: exitError, + }), + ) + } + if ( + workerNodeKey !== -1 && + !workerNode.info.crashHandled && + workerNode.info.ready + ) { + this.flushWorkerNodePromises( + workerNode, + this.destroying + ? new Error('Worker node terminated during pool destroy') + : exitError, + ) + } this.removeWorkerNode(workerNode) if ( this.started && !this.startingMinimumNumberOfWorkers && - !this.destroying + !this.destroying && + (this.opts.restartWorkerOnError === true || + !workerNode.info.crashHandled) ) { this.startMinimumNumberOfWorkers(true) } @@ -1916,10 +1945,143 @@ export abstract class AbstractPool< if (destinationWorkerNodeKey === -1) { break } - this.handleTask( - destinationWorkerNodeKey, - this.dequeueTask(sourceWorkerNodeKey)!, - ) + const task = this.dequeueTask(sourceWorkerNodeKey)! + this.updatePromiseResponseWorkerId(task.taskId, destinationWorkerNodeKey) + this.handleTask(destinationWorkerNodeKey, task) + } + } + + /** + * Rejects in-flight task promises for the given crashed worker node key. + * @param workerNodeKey - The worker node key. + * @param crashError - The crash error to reject promises with. + */ + private rejectInFlightTaskPromises( + workerNodeKey: number, + crashError: Error, + ): void { + if (workerNodeKey === -1) { + return + } + const workerNode = this.workerNodes[workerNodeKey] + const crashedWorkerId = workerNode.info.id + if (crashedWorkerId == null) { + return + } + const queuedTaskIds = new Set< + `${string}-${string}-${string}-${string}-${string}` + >() + for (const task of workerNode.tasksQueue) { + queuedTaskIds.add(task.taskId!) + } + for (const [taskId, promiseResponse] of this.promiseResponseMap) { + if ( + promiseResponse.workerId === crashedWorkerId && + !queuedTaskIds.has(taskId) + ) { + this.rejectTaskPromise(taskId, promiseResponse, workerNode, crashError) + } + } + this.checkAndEmitTaskExecutionFinishedEvents() + } + + /** + * Rejects remaining queued task promises for the given crashed worker node key. + * @param workerNodeKey - The worker node key. + * @param crashError - The crash error to reject promises with. + */ + private rejectRemainingQueuedTaskPromises( + workerNodeKey: number, + crashError: Error, + ): void { + if (workerNodeKey === -1) { + return + } + const workerNode = this.workerNodes[workerNodeKey] + if (this.tasksQueueSize(workerNodeKey) === 0) { + return + } + while (this.tasksQueueSize(workerNodeKey) > 0) { + const task = this.dequeueTask(workerNodeKey) + if (task?.taskId != null) { + const promiseResponse = this.promiseResponseMap.get(task.taskId) + if (promiseResponse != null) { + this.rejectTaskPromise( + task.taskId, + promiseResponse, + workerNode, + crashError, + false, + ) + } + } + } + this.checkAndEmitTaskExecutionFinishedEvents() + } + + private rejectTaskPromise( + taskId: `${string}-${string}-${string}-${string}-${string}`, + promiseResponse: PromiseResponseWrapper, + workerNode: IWorkerNode, + error: Error, + decrementExecuting = true, + ): void { + promiseResponse.reject(error) + this.promiseResponseMap.delete(taskId) + if (decrementExecuting && workerNode.usage.tasks.executing > 0) { + --workerNode.usage.tasks.executing + } + ++workerNode.usage.tasks.failed + workerNode.dispatchEvent(new Event('taskFinished')) + } + + /** + * Rejects all unsettled promises targeting the given worker node. + * Called from the exit handler for unexpected exits of ready workers + * that were not already handled by the onerror crash path. + * @param workerNode - The worker node whose promises to flush. + * @param error - The rejection error. + */ + private flushWorkerNodePromises( + workerNode: IWorkerNode, + error: Error, + ): void { + const workerId = workerNode.info.id + for (const [taskId, promiseResponse] of this.promiseResponseMap) { + if ( + promiseResponse.workerId === workerId || + (workerId == null && promiseResponse.workerId == null) + ) { + this.rejectTaskPromise(taskId, promiseResponse, workerNode, error) + } + } + this.checkAndEmitTaskExecutionFinishedEvents() + } + + /** + * Updates the promise response worker id after task steal or redistribute. + * Ensures crash-time rejection targets the correct worker. + * @param taskId - The task id. + * @param workerNodeKey - The destination worker node key. + */ + private updatePromiseResponseWorkerId( + taskId: `${string}-${string}-${string}-${string}-${string}` | undefined, + workerNodeKey: number, + ): void { + if (taskId == null || workerNodeKey === -1) { + return + } + const workerNode = this.workerNodes[workerNodeKey] + if (workerNode.info.id == null) { + const promiseResponse = this.promiseResponseMap.get(taskId) + if (promiseResponse != null) { + promiseResponse.workerId = undefined + } + return + } + const promiseResponse = this.promiseResponseMap.get(taskId) + if (promiseResponse != null) { + promiseResponse.workerId = workerNode.info.id } } @@ -2019,6 +2181,10 @@ export abstract class AbstractPool< } sourceWorkerNode.info.stolen = false destinationWorkerNode.info.stealing = false + this.updatePromiseResponseWorkerId( + stolenTask.taskId, + destinationWorkerNodeKey, + ) this.handleTask(destinationWorkerNodeKey, stolenTask) this.updateTaskStolenStatisticsWorkerUsage( destinationWorkerNodeKey, @@ -2038,6 +2204,43 @@ export abstract class AbstractPool< ) } + /** + * Handles a crashed worker node: emits error, rejects in-flight promises, + * restarts dynamic workers if configured, and redistributes queued tasks. + * Static worker restart is handled by the exit event handler. + * @param workerNode - The crashed worker node. + * @param errorEvent - The error event that caused the crash. + */ + private handleWorkerNodeCrash( + workerNode: IWorkerNode, + errorEvent: ErrorEvent, + ): void { + workerNode.info.ready = false + workerNode.info.crashHandled = true + this.eventTarget?.dispatchEvent( + new ErrorEvent(PoolEvents.error, errorEvent), + ) + const crashedWorkerNodeKey = this.workerNodes.indexOf(workerNode) + const crashError = new Error( + `Worker node crashed with error: '${errorEvent.message}'`, + { cause: errorEvent.error ?? errorEvent }, + ) + this.rejectInFlightTaskPromises(crashedWorkerNodeKey, crashError) + if (this.started && !this.destroying) { + if (this.opts.restartWorkerOnError === true) { + if (workerNode.info.dynamic) { + this.createAndSetupDynamicWorkerNode() + } + } + if (this.opts.enableTasksQueue === true) { + this.redistributeQueuedTasks(crashedWorkerNodeKey) + } + } + if (this.opts.enableTasksQueue === true) { + this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, crashError) + } + } + private readonly handleWorkerNodeIdleEvent = ( event: CustomEvent, previousStolenTask?: Task, @@ -2221,6 +2424,9 @@ export abstract class AbstractPool< ) } const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + if (workerNodeKey === -1) { + return + } const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties @@ -2230,11 +2436,19 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse(message: MessageValue): void { - const { name, taskId, workerError, data } = message + const { name, taskId, workerError, data, workerId } = message const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse - const workerNode = this.workerNodes[workerNodeKey] + const { resolve, reject } = promiseResponse + let workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + if (workerNodeKey === -1) { + workerNodeKey = this.getWorkerNodeKeyByWorkerId( + promiseResponse.workerId, + ) + } + const workerNode = workerNodeKey !== -1 + ? this.workerNodes[workerNodeKey] + : undefined if (workerError != null) { this.eventTarget?.dispatchEvent( new ErrorEvent(PoolEvents.taskError, { error: workerError }), @@ -2248,12 +2462,18 @@ export abstract class AbstractPool< } else { resolve(data!) } - this.afterTaskExecutionHook(workerNodeKey, message) + if (workerNodeKey !== -1) { + this.afterTaskExecutionHook(workerNodeKey, message) + } queueMicrotask(() => { workerNode?.dispatchEvent(new Event('taskFinished')) this.promiseResponseMap.delete(taskId!) this.checkAndEmitTaskExecutionFinishedEvents() - if (this.opts.enableTasksQueue === true && !this.destroying) { + if ( + workerNodeKey !== -1 && + this.opts.enableTasksQueue === true && + !this.destroying + ) { if ( !this.isWorkerNodeBusy(workerNodeKey) && this.tasksQueueSize(workerNodeKey) > 0 @@ -2264,7 +2484,7 @@ export abstract class AbstractPool< ) } if (this.isWorkerNodeIdle(workerNodeKey)) { - workerNode.dispatchEvent( + workerNode?.dispatchEvent( new CustomEvent('idle', { detail: { workerNodeKey }, }), diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 4993f7b8..7daeefe8 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -530,6 +530,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => { backPressure: false, backPressureStealing: false, continuousStealing: false, + crashHandled: false, queuedTaskAbortion: false, dynamic: false, id: getWorkerId(worker), diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 96aa1dc0..6a7461fd 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -164,6 +164,11 @@ export interface WorkerInfo { * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes. */ continuousStealing: boolean + /** + * Crash handled flag. + * This flag is set to `true` when worker node crash has been handled. + */ + crashHandled: boolean /** * Back pressure stealing flag. * This flag is set to `true` when worker node is stealing one task from another back pressured worker node. diff --git a/src/utility-types.ts b/src/utility-types.ts index 6b1de900..e7431d55 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -233,9 +233,9 @@ export interface PromiseResponseWrapper { */ readonly reject: (reason?: unknown) => void /** - * The worker node key executing the task. + * The worker id executing the task. */ - readonly workerNodeKey: number + workerId: `${string}-${string}-${string}-${string}-${string}` | undefined } /** diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index a9d049c4..37c7bf70 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1000,6 +1000,7 @@ describe({ stealing: false, stolen: false, continuousStealing: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, @@ -1021,6 +1022,7 @@ describe({ stealing: false, stolen: false, continuousStealing: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index 5fa92fb3..e096b552 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -18,7 +18,8 @@ describe({ echoPool, errorPool, asyncErrorPool, - asyncPool + asyncPool, + crashPool before(() => { pool = new FixedThreadPool( @@ -68,6 +69,18 @@ describe({ numberOfThreads, new URL('./../../worker-files/thread/asyncWorker.mjs', import.meta.url), ) + crashPool = new FixedThreadPool( + 1, + new URL( + './../../worker-files/thread/crashWorker.mjs', + import.meta.url, + ), + { + enableTasksQueue: true, + restartWorkerOnError: false, + tasksQueueOptions: { concurrency: 1 }, + }, + ) }) after(async () => { @@ -78,6 +91,7 @@ describe({ await asyncErrorPool.destroy() await emptyPool.destroy() await queuePool.destroy() + await crashPool.destroy() }) it('Verify that the function is executed in a worker thread', async () => { @@ -148,7 +162,14 @@ describe({ expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual( numberOfThreads * maxMultiplier, ) - expect(workerNode.usage.tasks.executed).toBe(maxMultiplier) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual( + queuePool.opts.tasksQueueOptions.concurrency, + ) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency) + + queuePool.opts.tasksQueueOptions.concurrency, + ) expect(workerNode.usage.tasks.queued).toBe(0) expect(workerNode.usage.tasks.maxQueued).toBe( maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency, @@ -157,18 +178,21 @@ describe({ workerNode.usage.tasks.sequentiallyStolen, ).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual( - numberOfThreads * maxMultiplier, + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency), ) expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual( - numberOfThreads * maxMultiplier, + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency), ) } expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier) expect(queuePool.info.backPressure).toBe(false) expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0) expect(queuePool.info.stolenTasks).toBeLessThanOrEqual( - numberOfThreads * maxMultiplier, + numberOfThreads * + (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency), ) }) @@ -292,6 +316,29 @@ describe({ expect(error.stack).toBeDefined() }) + it('Verify that in-flight task promises reject on worker crash', async () => { + let poolError + const errorHandler = (e) => { + poolError = e + } + crashPool.eventTarget.addEventListener(PoolEvents.error, errorHandler, { + once: true, + }) + const exitPromise = waitWorkerNodeEvents(crashPool, 'exit', 1) + let error + try { + await crashPool.execute() + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.message).toMatch(/Worker node crashed with error:/) + expect(error.message).toMatch(/Simulated worker crash/) + expect(poolError).toBeInstanceOf(ErrorEvent) + expect(poolError.message).toMatch(/Simulated worker crash/) + await exitPromise + }) + it('Verify that async function is working properly', async () => { const data = { f: 10 } const startTime = performance.now() diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 34fac57f..c450475d 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -126,6 +126,7 @@ describe('Pool utils test suite', () => { backPressure: false, backPressureStealing: false, continuousStealing: false, + crashHandled: false, queuedTaskAbortion: false, dynamic: false, id: expect.any(String), diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 02c27336..955fd62a 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -236,6 +236,7 @@ describe('Worker node test suite', () => { stealing: false, stolen: false, continuousStealing: false, + crashHandled: false, backPressureStealing: false, backPressure: false, queuedTaskAbortion: false, diff --git a/tests/worker-files/thread/crashWorker.mjs b/tests/worker-files/thread/crashWorker.mjs new file mode 100644 index 00000000..9191d816 --- /dev/null +++ b/tests/worker-files/thread/crashWorker.mjs @@ -0,0 +1,18 @@ +import { KillBehaviors, ThreadWorker } from '../../../src/mod.ts' + +/** + * Worker that simulates a crash via an unhandled exception during task execution. + * The async function never resolves, keeping the task in-flight while the scheduled + * throw kills the worker thread and triggers the 'error' event on the parent. + */ +async function crash() { + setTimeout(() => { + throw new Error('Simulated worker crash') + }, 10) + await new Promise(() => {}) +} + +export default new ThreadWorker(crash, { + killBehavior: KillBehaviors.HARD, + maxInactiveTime: 500, +})