Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 250 additions & 30 deletions src/pools/abstract-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,9 @@
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
private getWorkerNodeKeyByWorkerId(workerId: string | undefined): number {
if (workerId == null) {
return -1
}
return this.workerNodes.findIndex(
(workerNode) => workerNode.info.id === workerId,
)
Expand Down Expand Up @@ -1173,11 +1176,21 @@
abortSignal?.addEventListener(
'abort',
() => {
this.workerNodes[workerNodeKey]?.dispatchEvent(
const promiseResponse = this.promiseResponseMap.get(task.taskId!)
if (promiseResponse == null) {
return
}
const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
promiseResponse.workerId,
)
if (currentWorkerNodeKey === -1) {
return
}
this.workerNodes[currentWorkerNodeKey]?.dispatchEvent(
new CustomEvent<WorkerNodeEventDetail>('abortTask', {
detail: {
taskId: task.taskId,
workerId: this.getWorkerInfo(workerNodeKey)!.id!,
workerId: promiseResponse.workerId,
},
}),
)
Expand All @@ -1187,7 +1200,7 @@
this.promiseResponseMap.set(task.taskId!, {
reject,
resolve,
workerNodeKey,
workerId: this.workerNodes[workerNodeKey].info.id,
abortSignal,
})
if (
Expand Down Expand Up @@ -1672,22 +1685,8 @@
)
}
workerNode.worker.onerror = (errorEvent) => {
workerNode.info.ready = false
this.eventTarget?.dispatchEvent(
new ErrorEvent(PoolEvents.error, errorEvent),
)
if (this.started && !this.destroying) {
if (this.opts.restartWorkerOnError === true) {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else if (!this.startingMinimumNumberOfWorkers) {
this.startMinimumNumberOfWorkers(true)
}
}
if (this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
}
errorEvent.preventDefault()
this.handleWorkerNodeCrash(workerNode, errorEvent)
workerNode?.terminate()
}
workerNode.worker.addEventListener(
Expand All @@ -1701,11 +1700,41 @@
workerNode.addEventListener(
'exit',
() => {
const workerNodeKey = this.workerNodes.indexOf(workerNode)
const exitError = new Error('Worker node exited unexpectedly')
if (
workerNode.info.ready &&
!workerNode.info.crashHandled &&
workerNodeKey !== -1 &&
!this.destroying
) {
this.handleWorkerNodeCrash(
workerNode,
new ErrorEvent('error', {
message: exitError.message,
error: exitError,
}),
)
}
if (
workerNodeKey !== -1 &&
!workerNode.info.crashHandled &&
workerNode.info.ready
) {
this.flushWorkerNodePromises(
workerNode,
this.destroying
? new Error('Worker node terminated during pool destroy')
: exitError,
)
}
Comment on lines +1719 to +1730
this.removeWorkerNode(workerNode)
if (
this.started &&
!this.startingMinimumNumberOfWorkers &&
!this.destroying
!this.destroying &&
(this.opts.restartWorkerOnError === true ||
!workerNode.info.crashHandled)
Comment on lines +1736 to +1737
) {
this.startMinimumNumberOfWorkers(true)
}
Expand Down Expand Up @@ -1916,10 +1945,143 @@
if (destinationWorkerNodeKey === -1) {
break
}
this.handleTask(
destinationWorkerNodeKey,
this.dequeueTask(sourceWorkerNodeKey)!,
)
const task = this.dequeueTask(sourceWorkerNodeKey)!
this.updatePromiseResponseWorkerId(task.taskId, destinationWorkerNodeKey)
this.handleTask(destinationWorkerNodeKey, task)
}
}

/**
* Rejects in-flight task promises for the given crashed worker node key.
* @param workerNodeKey - The worker node key.
* @param crashError - The crash error to reject promises with.
*/
private rejectInFlightTaskPromises(
workerNodeKey: number,
crashError: Error,
): void {
if (workerNodeKey === -1) {
return
}
const workerNode = this.workerNodes[workerNodeKey]
const crashedWorkerId = workerNode.info.id
if (crashedWorkerId == null) {
return
}
const queuedTaskIds = new Set<
`${string}-${string}-${string}-${string}-${string}`
>()
for (const task of workerNode.tasksQueue) {
queuedTaskIds.add(task.taskId!)
}
for (const [taskId, promiseResponse] of this.promiseResponseMap) {
if (
promiseResponse.workerId === crashedWorkerId &&
!queuedTaskIds.has(taskId)
) {
this.rejectTaskPromise(taskId, promiseResponse, workerNode, crashError)
}
}
this.checkAndEmitTaskExecutionFinishedEvents()
}

/**
* Rejects remaining queued task promises for the given crashed worker node key.
* @param workerNodeKey - The worker node key.
* @param crashError - The crash error to reject promises with.
*/
private rejectRemainingQueuedTaskPromises(
workerNodeKey: number,
crashError: Error,
): void {
if (workerNodeKey === -1) {
return
}
const workerNode = this.workerNodes[workerNodeKey]
if (this.tasksQueueSize(workerNodeKey) === 0) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
const task = this.dequeueTask(workerNodeKey)
if (task?.taskId != null) {
const promiseResponse = this.promiseResponseMap.get(task.taskId)
if (promiseResponse != null) {
this.rejectTaskPromise(
task.taskId,
promiseResponse,
workerNode,
crashError,
false,
)
}
}
}
this.checkAndEmitTaskExecutionFinishedEvents()
}

private rejectTaskPromise(
taskId: `${string}-${string}-${string}-${string}-${string}`,
promiseResponse: PromiseResponseWrapper<Response>,
workerNode: IWorkerNode<Worker, Data>,
error: Error,
decrementExecuting = true,
): void {
promiseResponse.reject(error)
this.promiseResponseMap.delete(taskId)
if (decrementExecuting && workerNode.usage.tasks.executing > 0) {
--workerNode.usage.tasks.executing
}
++workerNode.usage.tasks.failed
workerNode.dispatchEvent(new Event('taskFinished'))
}

/**
* Rejects all unsettled promises targeting the given worker node.
* Called from the exit handler for unexpected exits of ready workers
* that were not already handled by the onerror crash path.
* @param workerNode - The worker node whose promises to flush.
* @param error - The rejection error.
*/
private flushWorkerNodePromises(
workerNode: IWorkerNode<Worker, Data>,
error: Error,
): void {
const workerId = workerNode.info.id
for (const [taskId, promiseResponse] of this.promiseResponseMap) {
if (
promiseResponse.workerId === workerId ||
(workerId == null && promiseResponse.workerId == null)
) {
this.rejectTaskPromise(taskId, promiseResponse, workerNode, error)
}
}
this.checkAndEmitTaskExecutionFinishedEvents()
}

/**
* Updates the promise response worker id after task steal or redistribute.
* Ensures crash-time rejection targets the correct worker.
* @param taskId - The task id.
* @param workerNodeKey - The destination worker node key.
*/
private updatePromiseResponseWorkerId(
taskId: `${string}-${string}-${string}-${string}-${string}` | undefined,
workerNodeKey: number,
): void {
if (taskId == null || workerNodeKey === -1) {
return
}
const workerNode = this.workerNodes[workerNodeKey]
if (workerNode.info.id == null) {
const promiseResponse = this.promiseResponseMap.get(taskId)
if (promiseResponse != null) {
promiseResponse.workerId = undefined
}
return
}
const promiseResponse = this.promiseResponseMap.get(taskId)
if (promiseResponse != null) {
promiseResponse.workerId = workerNode.info.id
}
}

Expand Down Expand Up @@ -2019,6 +2181,10 @@
}
sourceWorkerNode.info.stolen = false
destinationWorkerNode.info.stealing = false
this.updatePromiseResponseWorkerId(
stolenTask.taskId,
destinationWorkerNodeKey,
)
this.handleTask(destinationWorkerNodeKey, stolenTask)
this.updateTaskStolenStatisticsWorkerUsage(
destinationWorkerNodeKey,
Expand All @@ -2038,6 +2204,43 @@
)
}

/**
* Handles a crashed worker node: emits error, rejects in-flight promises,
* restarts dynamic workers if configured, and redistributes queued tasks.
* Static worker restart is handled by the exit event handler.
* @param workerNode - The crashed worker node.
* @param errorEvent - The error event that caused the crash.
*/
private handleWorkerNodeCrash(
workerNode: IWorkerNode<Worker, Data>,
errorEvent: ErrorEvent,
): void {
workerNode.info.ready = false
workerNode.info.crashHandled = true
this.eventTarget?.dispatchEvent(
new ErrorEvent(PoolEvents.error, errorEvent),
)
const crashedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
const crashError = new Error(
`Worker node crashed with error: '${errorEvent.message}'`,
{ cause: errorEvent.error ?? errorEvent },
)
this.rejectInFlightTaskPromises(crashedWorkerNodeKey, crashError)
if (this.started && !this.destroying) {
if (this.opts.restartWorkerOnError === true) {
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
}
}
if (this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(crashedWorkerNodeKey)
}
}
if (this.opts.enableTasksQueue === true) {
this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, crashError)
}
}

private readonly handleWorkerNodeIdleEvent = (
event: CustomEvent<WorkerNodeEventDetail>,
previousStolenTask?: Task<Data>,
Expand Down Expand Up @@ -2221,6 +2424,9 @@
)
}
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
if (workerNodeKey === -1) {
return
}
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
Expand All @@ -2230,11 +2436,19 @@
}

private handleTaskExecutionResponse(message: MessageValue<Response>): void {
const { name, taskId, workerError, data } = message
const { name, taskId, workerError, data, workerId } = message
const promiseResponse = this.promiseResponseMap.get(taskId!)
if (promiseResponse != null) {
const { resolve, reject, workerNodeKey } = promiseResponse
const workerNode = this.workerNodes[workerNodeKey]
const { resolve, reject } = promiseResponse
let workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
if (workerNodeKey === -1) {
workerNodeKey = this.getWorkerNodeKeyByWorkerId(
promiseResponse.workerId,
)
}
const workerNode = workerNodeKey !== -1

Check warning on line 2449 in src/pools/abstract-pool.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Unexpected negated condition.

See more on https://sonarcloud.io/project/issues?id=poolifier_poolifier-web-worker&issues=AZ4niXZf6txHQD_w_Wzf&open=AZ4niXZf6txHQD_w_Wzf&pullRequest=146
? this.workerNodes[workerNodeKey]
: undefined
if (workerError != null) {
this.eventTarget?.dispatchEvent(
new ErrorEvent(PoolEvents.taskError, { error: workerError }),
Expand All @@ -2248,12 +2462,18 @@
} else {
resolve(data!)
}
this.afterTaskExecutionHook(workerNodeKey, message)
if (workerNodeKey !== -1) {
this.afterTaskExecutionHook(workerNodeKey, message)
}
Comment on lines +2465 to +2467
queueMicrotask(() => {
workerNode?.dispatchEvent(new Event('taskFinished'))
this.promiseResponseMap.delete(taskId!)
this.checkAndEmitTaskExecutionFinishedEvents()
if (this.opts.enableTasksQueue === true && !this.destroying) {
if (
workerNodeKey !== -1 &&
this.opts.enableTasksQueue === true &&
!this.destroying
) {
if (
!this.isWorkerNodeBusy(workerNodeKey) &&
this.tasksQueueSize(workerNodeKey) > 0
Expand All @@ -2264,7 +2484,7 @@
)
}
if (this.isWorkerNodeIdle(workerNodeKey)) {
workerNode.dispatchEvent(
workerNode?.dispatchEvent(
new CustomEvent<WorkerNodeEventDetail>('idle', {
detail: { workerNodeKey },
}),
Expand Down
1 change: 1 addition & 0 deletions src/pools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => {
backPressure: false,
backPressureStealing: false,
continuousStealing: false,
crashHandled: false,
queuedTaskAbortion: false,
dynamic: false,
id: getWorkerId(worker),
Expand Down
5 changes: 5 additions & 0 deletions src/pools/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ export interface WorkerInfo {
* This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes.
*/
continuousStealing: boolean
/**
* Crash handled flag.
* This flag is set to `true` when worker node crash has been handled.
*/
crashHandled: boolean
/**
* Back pressure stealing flag.
* This flag is set to `true` when worker node is stealing one task from another back pressured worker node.
Expand Down
Loading
Loading