Skip to content

Commit 14e5451

Browse files
committed
Use async/await with semaphores
1 parent 4e26f97 commit 14e5451

2 files changed

Lines changed: 106 additions & 0 deletions

File tree

JavaScript/8-binary-async.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
3+
const threads = require('worker_threads');
4+
const { Worker, isMainThread } = threads;
5+
6+
const LOCKED = 0;
7+
const UNLOCKED = 1;
8+
9+
class BinarySemaphore {
10+
constructor(shared, offset = 0, init = false) {
11+
this.lock = new Int32Array(shared, offset, 1);
12+
if (init) Atomics.store(this.lock, 0, UNLOCKED);
13+
}
14+
15+
async enter() {
16+
let prev = Atomics.exchange(this.lock, 0, LOCKED);
17+
while (prev !== UNLOCKED) {
18+
Atomics.wait(this.lock, 0, LOCKED);
19+
prev = Atomics.exchange(this.lock, 0, LOCKED);
20+
}
21+
Atomics.store(this.lock, 0, LOCKED);
22+
}
23+
24+
leave() {
25+
if (Atomics.load(this.lock, 0) === UNLOCKED) {
26+
throw new Error('Cannot leave unlocked BinarySemaphore');
27+
}
28+
Atomics.store(this.lock, 0, UNLOCKED);
29+
Atomics.notify(this.lock, 0, 1);
30+
}
31+
}
32+
33+
// Usage
34+
35+
if (isMainThread) {
36+
const buffer = new SharedArrayBuffer(14);
37+
const semaphore = new BinarySemaphore(buffer, 0, true);
38+
console.dir({ semaphore });
39+
new Worker(__filename, { workerData: buffer });
40+
new Worker(__filename, { workerData: buffer });
41+
} else {
42+
const { threadId, workerData } = threads;
43+
const semaphore = new BinarySemaphore(workerData);
44+
const array = new Int8Array(workerData, 4);
45+
const value = threadId === 1 ? 1 : -1;
46+
setInterval(async () => {
47+
await semaphore.enter();
48+
for (let i = 0; i < 10; i++) {
49+
array[i] += value;
50+
}
51+
console.dir([ threadId, semaphore.lock[0], array ]);
52+
semaphore.leave();
53+
}, 100);
54+
}

JavaScript/9-counting-async.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const fs = require('fs');
4+
const threads = require('worker_threads');
5+
const { Worker, isMainThread } = threads;
6+
7+
class CountingSemaphore {
8+
constructor(shared, offset = 0, initial) {
9+
this.counter = new Int32Array(shared, offset, 1);
10+
if (typeof initial === 'number') {
11+
Atomics.store(this.counter, 0, initial);
12+
}
13+
}
14+
15+
async enter() {
16+
Atomics.wait(this.counter, 0, 0);
17+
const prev = Atomics.sub(this.counter, 0, 1);
18+
if (prev > 0) return;
19+
this.leave();
20+
return await this.enter();
21+
}
22+
23+
leave() {
24+
Atomics.add(this.counter, 0, 1);
25+
Atomics.notify(this.counter, 0, 1);
26+
}
27+
}
28+
29+
// Usage
30+
31+
if (isMainThread) {
32+
const buffer = new SharedArrayBuffer(4);
33+
const semaphore = new CountingSemaphore(buffer, 0, 2);
34+
console.dir({ semaphore: semaphore.counter[0] });
35+
for (let i = 0; i < 20; i++) {
36+
new Worker(__filename, { workerData: buffer });
37+
}
38+
} else {
39+
const { threadId, workerData } = threads;
40+
const semaphore = new CountingSemaphore(workerData);
41+
const REPEAT_COUNT = 1000000;
42+
const file = `file-${threadId}.dat`;
43+
console.dir({ threadId, semaphore: semaphore.counter[0] });
44+
45+
(async () => {
46+
await semaphore.enter();
47+
const data = `Data from ${threadId}`.repeat(REPEAT_COUNT);
48+
await fs.promises.writeFile(file, data);
49+
await fs.promises.unlink(file);
50+
semaphore.leave();
51+
})();
52+
}

0 commit comments

Comments
 (0)