Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions android/src/androidTest/java/com/comapeo/core/NodeJSIPCTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,44 @@ class NodeJSIPCTest {
Thread.sleep(500)
}

/**
* Pins the synchronous-close contract: after [NodeJSIPC.close] returns,
* the peer's blocking read must already have observed EOF. A regression
* to a launched-coroutine teardown would race with the next `OnCreate`
* and break rpc-reflector cleanup.
*/
@Test
fun closeReleasesSocketSynchronously() {
val connected = CountDownLatch(1)
val readResult = java.util.concurrent.atomic.AtomicInteger(Int.MIN_VALUE)
val readReturned = CountDownLatch(1)

startMockServer { input, _ ->
connected.countDown()
try {
readResult.set(input.read())
} catch (e: IOException) {
// Some kernels surface peer-close as IOException; treat as EOF.
readResult.set(-1)
}
readReturned.countDown()
}

val ipc = NodeJSIPC(socketFile) { msg -> receivedMessages.add(msg) }
assertTrue("Should connect within 10s", connected.await(10, TimeUnit.SECONDS))
// Let the receive loop settle so we test steady-state close, not
// a connect-cancel race.
Thread.sleep(200)

ipc.close()

assertTrue(
"Server-side read must unblock with EOF within 1s of close() returning",
readReturned.await(1, TimeUnit.SECONDS)
)
assertEquals(-1, readResult.get())
}

@Test
fun handlesServerDisconnect() {
val connected = CountDownLatch(1)
Expand Down
21 changes: 11 additions & 10 deletions android/src/main/java/com/comapeo/core/ComapeoCoreModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class ComapeoCoreModule : Module() {
}

override fun definition() = ModuleDefinition {
// OnCreate / OnDestroy are bound to the Expo AppContext (JS runtime
// lifetime), so they fire on every JS reload — the boundary at
// which rpc-reflector subscriptions on MapeoManager need to be
// torn down before a fresh client connects.
OnCreate {
val socketFile =
File(appContext.persistentFilesDirectory, ComapeoCoreService.COMAPEO_SOCKET_FILENAME)
Expand Down Expand Up @@ -190,19 +194,16 @@ class ComapeoCoreModule : Module() {
}

OnDestroy {
ipc.disconnect()
controlIpc.disconnect()
// Synchronous close: backend must see EOF on the previous
// session before the next OnCreate opens a new connection, or
// rpc-reflector cleanup runs against the wrong connection.
ipc.close()
controlIpc.close()
}

OnActivityEntersForeground {
// `connect()` is idempotent on `NodeJSIPC`: it early-returns
// when the IPC is already in Connected/Connecting/Disconnecting
// and resets a prior `Error` state so the next attempt can
// succeed. Calling it on every foreground transition is
// therefore the cheap way to recover from a transient
// connection failure (e.g. the FGS was killed and respawned
// while the app was backgrounded) without us tracking the
// IPC state ourselves at this layer.
// Idempotent reconnect to recover from a transient failure
// (e.g. FGS killed and respawned while backgrounded).
ipc.connect()
controlIpc.connect()
}
Expand Down
20 changes: 20 additions & 0 deletions android/src/main/java/com/comapeo/core/NodeJSIPC.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -232,6 +233,25 @@ class NodeJSIPC(
connect()
sendChannel.trySend(message)
}

/**
* Synchronous, terminal teardown. Unlike [disconnect], the underlying
* [LocalSocket] is closed on the calling thread, so the Node.js peer
* observes EOF before [close] returns — required when a fresh
* `OnCreate` will open a new connection immediately after.
*
* The instance must not be reused after [close]; construct a new one.
*/
fun close() {
scope.cancel()
sendChannel.close()
try { dataOutputStream?.close() } catch (_: Exception) {}
try { dataInputStream?.close() } catch (_: Exception) {}
if (::socket.isInitialized) {
try { socket.close() } catch (_: Exception) {}
}
state.value = State.Disconnected
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions backend/lib/message-port.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,9 @@ export class SocketMessagePort extends TypedEmitter {
this.#state = "closed";
this.#queue.length = 0;
this.#framedStream.destroy();
// ComapeoRpcServer wires `messagePort.on("close", () => server.close())`
// to clear rpc-reflector subscriptions registered against MapeoManager;
// without this emit, listeners leak on every reconnect (e.g. RN reload).
this.emit("close");
}
}
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"type": "module",
"scripts": {
"build": "rollup -c",
"test": "node --test \"test/*.test.js\"",
"types": "tsc",
"postinstall": "patch-package"
},
Expand Down
133 changes: 133 additions & 0 deletions backend/test/message-port.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { test } from "node:test";
import assert from "node:assert/strict";
import { Duplex } from "node:stream";
import FramedStream from "framed-stream";
import { Buffer } from "node:buffer";

import { SocketMessagePort } from "../lib/message-port.js";

/**
* In-process duplex pair mirroring AF_UNIX semantics: destroying one side
* delivers EOF to the peer.
*
* @returns {[Duplex, Duplex]}
*/
function pair() {
/** @type {Duplex} */
let a;
/** @type {Duplex} */
let b;
a = new Duplex({
read() {},
write(chunk, _enc, cb) {
b.push(chunk);
cb();
},
final(cb) {
b.push(null);
cb();
},
destroy(_err, cb) {
if (!b.destroyed) b.destroy();
cb(null);
},
});
b = new Duplex({
read() {},
write(chunk, _enc, cb) {
a.push(chunk);
cb();
},
final(cb) {
a.push(null);
cb();
},
destroy(_err, cb) {
if (!a.destroyed) a.destroy();
cb(null);
},
});
return [a, b];
}

test("SocketMessagePort emits 'close' once when the underlying socket closes", async () => {
const [serverSide, clientSide] = pair();
const port = new SocketMessagePort(serverSide);
port.start();

let closeCount = 0;
port.on("close", () => {
closeCount++;
});

clientSide.destroy();
await new Promise((resolve) => setImmediate(resolve));

assert.equal(closeCount, 1, "close must fire exactly once");
});

test("SocketMessagePort emits 'close' when close() is called directly", async () => {
const [serverSide] = pair();
const port = new SocketMessagePort(serverSide);
port.start();

let closeCount = 0;
port.on("close", () => {
closeCount++;
});

port.close();
port.close();

await new Promise((resolve) => setImmediate(resolve));

assert.equal(closeCount, 1, "close must be idempotent and fire at most once");
});

test(
"ComapeoRpcServer-style wiring: handler listeners are removed when the socket closes",
async () => {
const { createServer } = await import("rpc-reflector");
const { EventEmitter } = await import("node:events");

const handler = new EventEmitter();
const [serverSide, clientSide] = pair();

const messagePort = new SocketMessagePort(serverSide);
messagePort.start();

const server = createServer(handler, messagePort);
messagePort.on("close", () => server.close());

// msgType.ON === 2 in rpc-reflector/lib/constants.js
const ON = 2;
const onFrame = Buffer.from(JSON.stringify([ON, "progress", []]));
const lengthPrefix = Buffer.alloc(4);
lengthPrefix.writeUInt32LE(onFrame.length, 0);
clientSide.write(Buffer.concat([lengthPrefix, onFrame]));

await new Promise((resolve) => setImmediate(resolve));
assert.equal(handler.listenerCount("progress"), 1);

clientSide.destroy();
await new Promise((resolve) => setImmediate(resolve));

assert.equal(
handler.listenerCount("progress"),
0,
"listener must be removed once the socket closes",
);
},
);

test("smoke: FramedStream wired through pair() round-trips a message", async () => {
const [a, b] = pair();
const fa = new FramedStream(a);
const fb = new FramedStream(b);

const received = new Promise((resolve) => fb.once("data", resolve));
fa.write(Buffer.from("hello"));

const buf = await received;
assert.equal(buf.toString(), "hello");
});
21 changes: 14 additions & 7 deletions ios/ComapeoCoreModule.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,25 @@ public class ComapeoCoreModule: Module {
}

OnDestroy {
// Relies on expo-modules-core PR #33760 (shipped in SDK 53+,
// verified against expo-modules-core@55.0.23): weak
// `MainValueConverter.appContext` lets AppContext deinit on
// JS reload, which fires .moduleDestroy and runs this block.
// If a future SDK reintroduces the strong-ref cycle, the
// fallback is to drive disconnect() from
// RCTBridgeWillReloadNotification /
// RCTJavaScriptWillStartLoadingNotification.
//
// disconnect() is already synchronous on iOS (shutdown +
// join + close), so the backend observes EOF before this
// block returns.
self.ipc?.disconnect()
self.ipc = nil
}

OnAppEntersForeground {
// `connect()` on NodeJSIPC is idempotent: it early-returns
// when the IPC is already .connected/.connecting/.disconnecting
// and resets a prior .error state so a fresh connect attempt
// can succeed. Calling it on every foreground is the cheap
// way to recover from a transient connection failure (e.g.
// an iOS suspension that closed the underlying fd) without
// tracking IPC state at this layer.
// Idempotent reconnect to recover from a transient failure
// (e.g. iOS suspension closed the underlying fd).
self.ipc?.connect()
}

Expand Down
Loading