Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
* We need this timeout in either case of a Timeout or Cancellation Error as if
* the worker does not send a message we still need to give a window of time for a response.
*
* The workerTermniateTimeout is used here if this promise is rejected the worker cleanup
* operations will occure.
* The workerTerminateTimeout is used here if this promise is rejected the worker cleanup
* operations will occur.
*/
me.tracking[id].timeoutId = setTimeout(function() {
me.tracking[id].resolver.reject(error);
Expand All @@ -496,7 +496,14 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
* @return {boolean} Returns true if the worker is busy
*/
WorkerHandler.prototype.busy = function () {
return this.cleaning || Object.keys(this.processing).length > 0;
// A worker with entries in `tracking` has a cancelled or timed-out task whose
// cleanup may force-terminate the worker after `workerTerminateTimeout`. New
// tasks must not be assigned to it during that window — if they were, the
// forced terminate would reject them with "Worker terminated" even though
// they were unrelated to the original cancellation.
return this.cleaning
|| Object.keys(this.processing).length > 0
|| Object.keys(this.tracking).length > 0;
};

/**
Expand Down Expand Up @@ -625,7 +632,7 @@ WorkerHandler.prototype.terminateAndNotify = function (force, timeout) {
};

/**
* Wrapper error type to denote that a TimeoutError has already been proceesed
* Wrapper error type to denote that a TimeoutError has already been processed
* and we should skip cleanup operations
* @param {Promise.TimeoutError} timeoutError
*/
Expand Down
51 changes: 43 additions & 8 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,41 @@ describe('Pool', function () {

// TODO: test whether a task in the queue can be neatly cancelled

it('should not assign new tasks to a worker whose previous task was cancelled', function (done) {
// Regression test: when a task is cancelled, WorkerHandler moves it to
// `tracking` and schedules a force-terminate after `workerTerminateTimeout`.
// During that window the worker must not pick up new tasks — otherwise
// the forced termination would reject them with "Worker terminated"
// even though they had nothing to do with the original cancellation.
var pool = createPool({maxWorkers: 1, workerTerminateTimeout: 50});

function forever() {
while (1 > 0) {} // runs forever (so cancel must force-terminate)
}

function add(a, b) {
return a + b;
}

var p1 = pool.exec(forever).catch(function (err) {
assert(err instanceof Promise.CancellationError);
});

// Cancel after the worker has actually started running `forever`.
setTimeout(function () {
p1.cancel();

// Queue a follow-up task immediately to ensure that it queues until
// the worker finishes cleaning up the cancelled task.
pool.exec(add, [3, 4])
.then(function (result) {
assert.strictEqual(result, 7);
done();
})
.catch(done);
Comment on lines +714 to +719

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: are we confident we are not just hitting the catch block and cleaning up? I think there's potential to swallow errors.

}, 25);
});

it('should timeout a task', function () {
var pool = createPool({maxWorkers: 10});

Expand Down Expand Up @@ -1576,10 +1611,10 @@ describe('Pool', function () {
let stats = pool.stats();
assert.strictEqual(workerCount, 1);
assert.strictEqual(stats.totalWorkers, 1);
assert.strictEqual(stats.idleWorkers, 1);
assert.strictEqual(stats.busyWorkers, 0);
}).then(function() {
return pool.exec(add, [1, 2])
// Still busy - cleanup is in progress.
assert.strictEqual(stats.busyWorkers, 1);
}).then(function() {
return pool.exec(add, [1, 2])
}).then(function() {
var stats = pool.stats();
assert.strictEqual(workerCount, 1);
Expand Down Expand Up @@ -1616,10 +1651,10 @@ describe('Pool', function () {
let stats = pool.stats();
assert.strictEqual(workerCount, 1);
assert.strictEqual(stats.totalWorkers, 1);
assert.strictEqual(stats.idleWorkers, 1);
assert.strictEqual(stats.busyWorkers, 0);
}).then(function() {
return pool.exec(add, [1, 2])
// Still busy - cleanup is in progress on the worker.
assert.strictEqual(stats.busyWorkers, 1);
}).then(function() {
return pool.exec(add, [1, 2])
}).then(function() {
var stats = pool.stats();
assert.strictEqual(workerCount, 1);
Expand Down
Loading