diff --git a/android/src/androidTest/java/com/comapeo/core/NodeJSIPCTest.kt b/android/src/androidTest/java/com/comapeo/core/NodeJSIPCTest.kt index 65ef888..3e6a209 100644 --- a/android/src/androidTest/java/com/comapeo/core/NodeJSIPCTest.kt +++ b/android/src/androidTest/java/com/comapeo/core/NodeJSIPCTest.kt @@ -288,6 +288,44 @@ class NodeJSIPCTest { Thread.sleep(500) } + /** + * Pins the synchronous-close contract: after [NodeJSIPC.close] returns, + * the peer's blocking read must already have observed EOF. A regression + * to a launched-coroutine teardown would race with the next `OnCreate` + * and break rpc-reflector cleanup. + */ + @Test + fun closeReleasesSocketSynchronously() { + val connected = CountDownLatch(1) + val readResult = java.util.concurrent.atomic.AtomicInteger(Int.MIN_VALUE) + val readReturned = CountDownLatch(1) + + startMockServer { input, _ -> + connected.countDown() + try { + readResult.set(input.read()) + } catch (e: IOException) { + // Some kernels surface peer-close as IOException; treat as EOF. + readResult.set(-1) + } + readReturned.countDown() + } + + val ipc = NodeJSIPC(socketFile) { msg -> receivedMessages.add(msg) } + assertTrue("Should connect within 10s", connected.await(10, TimeUnit.SECONDS)) + // Let the receive loop settle so we test steady-state close, not + // a connect-cancel race. + Thread.sleep(200) + + ipc.close() + + assertTrue( + "Server-side read must unblock with EOF within 1s of close() returning", + readReturned.await(1, TimeUnit.SECONDS) + ) + assertEquals(-1, readResult.get()) + } + @Test fun handlesServerDisconnect() { val connected = CountDownLatch(1) diff --git a/android/src/main/java/com/comapeo/core/ComapeoCoreModule.kt b/android/src/main/java/com/comapeo/core/ComapeoCoreModule.kt index d5eed55..80dfe43 100644 --- a/android/src/main/java/com/comapeo/core/ComapeoCoreModule.kt +++ b/android/src/main/java/com/comapeo/core/ComapeoCoreModule.kt @@ -100,6 +100,10 @@ class ComapeoCoreModule : Module() { } override fun definition() = ModuleDefinition { + // OnCreate / OnDestroy are bound to the Expo AppContext (JS runtime + // lifetime), so they fire on every JS reload — the boundary at + // which rpc-reflector subscriptions on MapeoManager need to be + // torn down before a fresh client connects. OnCreate { val socketFile = File(appContext.persistentFilesDirectory, ComapeoCoreService.COMAPEO_SOCKET_FILENAME) @@ -190,19 +194,16 @@ class ComapeoCoreModule : Module() { } OnDestroy { - ipc.disconnect() - controlIpc.disconnect() + // Synchronous close: backend must see EOF on the previous + // session before the next OnCreate opens a new connection, or + // rpc-reflector cleanup runs against the wrong connection. + ipc.close() + controlIpc.close() } OnActivityEntersForeground { - // `connect()` is idempotent on `NodeJSIPC`: it early-returns - // when the IPC is already in Connected/Connecting/Disconnecting - // and resets a prior `Error` state so the next attempt can - // succeed. Calling it on every foreground transition is - // therefore the cheap way to recover from a transient - // connection failure (e.g. the FGS was killed and respawned - // while the app was backgrounded) without us tracking the - // IPC state ourselves at this layer. + // Idempotent reconnect to recover from a transient failure + // (e.g. FGS killed and respawned while backgrounded). ipc.connect() controlIpc.connect() } diff --git a/android/src/main/java/com/comapeo/core/NodeJSIPC.kt b/android/src/main/java/com/comapeo/core/NodeJSIPC.kt index 11cb650..e5ef5f9 100644 --- a/android/src/main/java/com/comapeo/core/NodeJSIPC.kt +++ b/android/src/main/java/com/comapeo/core/NodeJSIPC.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -232,6 +233,25 @@ class NodeJSIPC( connect() sendChannel.trySend(message) } + + /** + * Synchronous, terminal teardown. Unlike [disconnect], the underlying + * [LocalSocket] is closed on the calling thread, so the Node.js peer + * observes EOF before [close] returns — required when a fresh + * `OnCreate` will open a new connection immediately after. + * + * The instance must not be reused after [close]; construct a new one. + */ + fun close() { + scope.cancel() + sendChannel.close() + try { dataOutputStream?.close() } catch (_: Exception) {} + try { dataInputStream?.close() } catch (_: Exception) {} + if (::socket.isInitialized) { + try { socket.close() } catch (_: Exception) {} + } + state.value = State.Disconnected + } } /** diff --git a/backend/lib/message-port.js b/backend/lib/message-port.js index d77621d..67f9a3c 100644 --- a/backend/lib/message-port.js +++ b/backend/lib/message-port.js @@ -100,5 +100,9 @@ export class SocketMessagePort extends TypedEmitter { this.#state = "closed"; this.#queue.length = 0; this.#framedStream.destroy(); + // ComapeoRpcServer wires `messagePort.on("close", () => server.close())` + // to clear rpc-reflector subscriptions registered against MapeoManager; + // without this emit, listeners leak on every reconnect (e.g. RN reload). + this.emit("close"); } } diff --git a/backend/package.json b/backend/package.json index afe05e5..b706ba7 100644 --- a/backend/package.json +++ b/backend/package.json @@ -5,6 +5,7 @@ "type": "module", "scripts": { "build": "rollup -c", + "test": "node --test \"test/*.test.js\"", "types": "tsc", "postinstall": "patch-package" }, diff --git a/backend/test/message-port.test.js b/backend/test/message-port.test.js new file mode 100644 index 0000000..c82ed00 --- /dev/null +++ b/backend/test/message-port.test.js @@ -0,0 +1,133 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { Duplex } from "node:stream"; +import FramedStream from "framed-stream"; +import { Buffer } from "node:buffer"; + +import { SocketMessagePort } from "../lib/message-port.js"; + +/** + * In-process duplex pair mirroring AF_UNIX semantics: destroying one side + * delivers EOF to the peer. + * + * @returns {[Duplex, Duplex]} + */ +function pair() { + /** @type {Duplex} */ + let a; + /** @type {Duplex} */ + let b; + a = new Duplex({ + read() {}, + write(chunk, _enc, cb) { + b.push(chunk); + cb(); + }, + final(cb) { + b.push(null); + cb(); + }, + destroy(_err, cb) { + if (!b.destroyed) b.destroy(); + cb(null); + }, + }); + b = new Duplex({ + read() {}, + write(chunk, _enc, cb) { + a.push(chunk); + cb(); + }, + final(cb) { + a.push(null); + cb(); + }, + destroy(_err, cb) { + if (!a.destroyed) a.destroy(); + cb(null); + }, + }); + return [a, b]; +} + +test("SocketMessagePort emits 'close' once when the underlying socket closes", async () => { + const [serverSide, clientSide] = pair(); + const port = new SocketMessagePort(serverSide); + port.start(); + + let closeCount = 0; + port.on("close", () => { + closeCount++; + }); + + clientSide.destroy(); + await new Promise((resolve) => setImmediate(resolve)); + + assert.equal(closeCount, 1, "close must fire exactly once"); +}); + +test("SocketMessagePort emits 'close' when close() is called directly", async () => { + const [serverSide] = pair(); + const port = new SocketMessagePort(serverSide); + port.start(); + + let closeCount = 0; + port.on("close", () => { + closeCount++; + }); + + port.close(); + port.close(); + + await new Promise((resolve) => setImmediate(resolve)); + + assert.equal(closeCount, 1, "close must be idempotent and fire at most once"); +}); + +test( + "ComapeoRpcServer-style wiring: handler listeners are removed when the socket closes", + async () => { + const { createServer } = await import("rpc-reflector"); + const { EventEmitter } = await import("node:events"); + + const handler = new EventEmitter(); + const [serverSide, clientSide] = pair(); + + const messagePort = new SocketMessagePort(serverSide); + messagePort.start(); + + const server = createServer(handler, messagePort); + messagePort.on("close", () => server.close()); + + // msgType.ON === 2 in rpc-reflector/lib/constants.js + const ON = 2; + const onFrame = Buffer.from(JSON.stringify([ON, "progress", []])); + const lengthPrefix = Buffer.alloc(4); + lengthPrefix.writeUInt32LE(onFrame.length, 0); + clientSide.write(Buffer.concat([lengthPrefix, onFrame])); + + await new Promise((resolve) => setImmediate(resolve)); + assert.equal(handler.listenerCount("progress"), 1); + + clientSide.destroy(); + await new Promise((resolve) => setImmediate(resolve)); + + assert.equal( + handler.listenerCount("progress"), + 0, + "listener must be removed once the socket closes", + ); + }, +); + +test("smoke: FramedStream wired through pair() round-trips a message", async () => { + const [a, b] = pair(); + const fa = new FramedStream(a); + const fb = new FramedStream(b); + + const received = new Promise((resolve) => fb.once("data", resolve)); + fa.write(Buffer.from("hello")); + + const buf = await received; + assert.equal(buf.toString(), "hello"); +}); diff --git a/ios/ComapeoCoreModule.swift b/ios/ComapeoCoreModule.swift index 030b55d..b4e2e16 100644 --- a/ios/ComapeoCoreModule.swift +++ b/ios/ComapeoCoreModule.swift @@ -65,18 +65,25 @@ public class ComapeoCoreModule: Module { } OnDestroy { + // Relies on expo-modules-core PR #33760 (shipped in SDK 53+, + // verified against expo-modules-core@55.0.23): weak + // `MainValueConverter.appContext` lets AppContext deinit on + // JS reload, which fires .moduleDestroy and runs this block. + // If a future SDK reintroduces the strong-ref cycle, the + // fallback is to drive disconnect() from + // RCTBridgeWillReloadNotification / + // RCTJavaScriptWillStartLoadingNotification. + // + // disconnect() is already synchronous on iOS (shutdown + + // join + close), so the backend observes EOF before this + // block returns. self.ipc?.disconnect() self.ipc = nil } OnAppEntersForeground { - // `connect()` on NodeJSIPC is idempotent: it early-returns - // when the IPC is already .connected/.connecting/.disconnecting - // and resets a prior .error state so a fresh connect attempt - // can succeed. Calling it on every foreground is the cheap - // way to recover from a transient connection failure (e.g. - // an iOS suspension that closed the underlying fd) without - // tracking IPC state at this layer. + // Idempotent reconnect to recover from a transient failure + // (e.g. iOS suspension closed the underlying fd). self.ipc?.connect() }