diff --git a/README.md b/README.md index 44fb6ee..39f3899 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,60 @@ if __name__ == "__main__": ``` +### Health Check + +The WebSocket client can monitor connection liveness using an app-level +JSON ping/pong (`{ "event": "ping" }` / `{ "event": "pong" }`). It is disabled +by default. When enabled, on each interval tick the client checks whether +**any** inbound message has arrived since the last ping it sent (freshness +check): + +- If nothing arrived since the last ping, that counts as a *miss* and the + consecutive-miss counter is incremented. +- If any inbound message (a `pong`, market data, or anything else) arrived, + the counter is reset to `0`. +- Once the consecutive-miss counter reaches `max_missed_pongs`, the client + disconnects and stops the timer. + +| Option | Type | Default | Description | +| ------------------ | ------ | ------- | -------------------------------------------------------------- | +| `enabled` | `bool` | `False` | Enables the health-check ping/pong. | +| `ping_interval` | `int` | `30000` | Interval in milliseconds between health-check pings. | +| `max_missed_pongs` | `int` | `2` | Consecutive misses (no inbound messages) before disconnecting. | + +```py +from fugle_marketdata import WebSocketClient +from fugle_marketdata.websocket.client import HealthCheckConfig + +client = WebSocketClient( + api_key='YOUR_API_KEY', + health_check=HealthCheckConfig( + enabled=True, + ping_interval=30000, + max_missed_pongs=2, + ), +) +``` + +#### Disconnect reason + +When the client disconnects because of a health-check timeout, the +`disconnect` event is emitted with an extra argument +`{"reason": "health-check-timeout"}`. Normal or manual disconnects emit +`disconnect` **without** that extra argument. Listeners that only read the +close code and message keep working unchanged. + +```py +def handle_disconnect(code, message, info=None): + if info and info.get("reason") == "health-check-timeout": + print("Health check timed out, reconnecting...") + stock.connect() + stock.subscribe({"channel": "trades", "symbol": "2330"}) + + +stock.on("disconnect", handle_disconnect) +``` + ## Error Handling The library provides a custom `FugleAPIError` exception for API-related errors, which includes detailed debugging information. diff --git a/fugle_marketdata/websocket/client.py b/fugle_marketdata/websocket/client.py index 2fcdcbd..0e84b9f 100644 --- a/fugle_marketdata/websocket/client.py +++ b/fugle_marketdata/websocket/client.py @@ -1,3 +1,4 @@ +import time import orjson import websocket from typing import Optional @@ -52,7 +53,10 @@ def __init__(self, **config): # Health check properties self.ping_timer = None - self.missed_pongs = 0 + self.consecutive_misses = 0 + self.last_message_at = 0.0 + self.last_ping_at = 0.0 + self.__disconnect_reason = None def ping(self, message): message = { @@ -121,9 +125,16 @@ def __on_open(self, ws): self.ee.emit(CONNECT_EVENT) def __on_close(self, ws, close_status_code, close_msg): - self.ee.emit(DISCONNECT_EVENT, close_status_code, close_msg) + reason = self.__disconnect_reason + self.__disconnect_reason = None + if reason is not None: + self.ee.emit(DISCONNECT_EVENT, close_status_code, close_msg, reason) + else: + self.ee.emit(DISCONNECT_EVENT, close_status_code, close_msg) def __on_message(self, ws, data): + # Any inbound message counts as freshness. + self.last_message_at = time.monotonic() message = orjson.loads(data) self.ee.emit(MESSAGE_EVENT, data) if message['event'] == AUTHENTICATED_EVENT: @@ -138,9 +149,6 @@ def __on_message(self, ws, data): self.ee.emit(UNAUTHENTICATED_EVENT, message) self.auth_status = AuthenticationState.UNAUTHENTICATED self.error = Exception(UNAUTHENTICATED_MESSAGE) - elif message['event'] == 'pong': - # Reset missed pongs counter - self.missed_pongs = 0 def __on_error(self, ws, error): self.ee.emit(ERROR_EVENT, error) @@ -157,36 +165,51 @@ def check_auth_status(self): self.error = Exception(AUTHENTICATION_TIMEOUT_MESSAGE) def __start_health_check(self): - """Start the health check ping/pong mechanism""" + """Start the freshness-based health check ping/pong mechanism""" if self.health_check and self.health_check.enabled: - self.missed_pongs = 0 - self.__send_ping() + self.consecutive_misses = 0 + # Seed timestamps so the first tick treats the connection as fresh. + now = time.monotonic() + self.last_message_at = now + self.last_ping_at = now + self.__schedule_next_ping() + + def __schedule_next_ping(self): + if not self.health_check or not self.health_check.enabled: + return + interval_seconds = self.health_check.ping_interval / 1000.0 + self.ping_timer = Timer(interval_seconds, self.__health_check_tick) + self.ping_timer.daemon = True + self.ping_timer.start() - def __send_ping(self): - """Send ping and schedule next ping""" + def __health_check_tick(self): + """Run one freshness check, then send a ping and reschedule.""" if not self.health_check or not self.health_check.enabled: return + # Freshness check: did anything arrive since our last ping? + if self.last_message_at < self.last_ping_at: + self.consecutive_misses += 1 + else: + self.consecutive_misses = 0 + + # Clamp to >= 1: max_missed_pongs of 0 would disconnect a healthy + # connection on the first tick (consecutive_misses starts at 0). + max_missed = max(1, self.health_check.max_missed_pongs) + if self.consecutive_misses >= max_missed: + self.__disconnect_reason = {"reason": "health-check-timeout"} + self.disconnect() + return + try: self.ping("") - self.missed_pongs += 1 - self.__check_missed_pongs() - - # Schedule next ping - interval_seconds = self.health_check.ping_interval / 1000.0 - self.ping_timer = Timer(interval_seconds, self.__send_ping) - self.ping_timer.start() + self.last_ping_at = time.monotonic() + self.__schedule_next_ping() except Exception as error: print(f"Failed to send ping: {error}") + self.__disconnect_reason = {"reason": "health-check-timeout"} self.disconnect() - def __check_missed_pongs(self): - """Check if too many pongs have been missed""" - if self.health_check and self.health_check.enabled: - if self.missed_pongs > self.health_check.max_missed_pongs: - self.disconnect() - raise Exception(f"Did not receive pong for {self.health_check.max_missed_pongs} consecutive times. Disconnecting...") - def connect(self): Thread(target=self.__ws.run_forever).start() while True: diff --git a/tests/test_websocket_client.py b/tests/test_websocket_client.py index ab0556b..87f22b1 100644 --- a/tests/test_websocket_client.py +++ b/tests/test_websocket_client.py @@ -1,5 +1,7 @@ import threading +import time from fugle_marketdata import WebSocketClient +from fugle_marketdata.websocket.client import HealthCheckConfig, WebSocketClient as CoreWebSocketClient from fugle_marketdata.websocket.futopt.client import WebSocketFutOptClient from fugle_marketdata.websocket.stock.client import WebSocketStockClient import pytest @@ -179,4 +181,161 @@ def test_base_url_with_path_and_trailing_slash(self): stock = client.stock assert stock.config['base_url'] == 'wss://ws.example.com/api/v2/stock/streaming' futopt = client.futopt - assert futopt.config['base_url'] == 'wss://ws.example.com/api/v2/futopt/streaming' \ No newline at end of file + assert futopt.config['base_url'] == 'wss://ws.example.com/api/v2/futopt/streaming' + + +def _build_health_client(max_missed_pongs=2, ping_interval=30000): + """Build a core client with health check enabled, with the send and the + disconnect side effects stubbed so the freshness logic can be unit-tested + without a live socket.""" + health = HealthCheckConfig( + enabled=True, + ping_interval=ping_interval, + max_missed_pongs=max_missed_pongs, + ) + client = CoreWebSocketClient( + base_url='wss://ws.example.com/stock/streaming', + api_key='api-key', + health_check=health, + ) + + sent = [] + # Stub the name-mangled private __send so ping() does not touch the socket. + client._WebSocketClient__send = lambda message: sent.append(message) + client._sent = sent + + closed = {'count': 0} + + def fake_disconnect(): + closed['count'] += 1 + if client.ping_timer is not None: + client.ping_timer.cancel() + client.ping_timer = None + # Mimic real on_close emitting the disconnect event with the reason. + reason = client._WebSocketClient__disconnect_reason + client._WebSocketClient__disconnect_reason = None + if reason is not None: + client.ee.emit('disconnect', None, None, reason) + else: + client.ee.emit('disconnect', None, None) + + client.disconnect = fake_disconnect + client._closed = closed + return client + + +def _tick(client): + """Run a single health-check tick synchronously.""" + client._WebSocketClient__health_check_tick() + + +class TestWebSocketHealthCheck: + def test_config_defaults(self): + cfg = HealthCheckConfig() + assert cfg.enabled is False + assert cfg.ping_interval == 30000 + assert cfg.max_missed_pongs == 2 + + def test_tick_sends_ping_and_updates_last_ping_at(self): + client = _build_health_client() + client.last_message_at = time.monotonic() + client.last_ping_at = client.last_message_at + before = client.last_ping_at + _tick(client) + # A ping is sent and last_ping_at advances. + assert any(m['event'] == 'ping' for m in client._sent) + assert client.last_ping_at >= before + assert client._closed['count'] == 0 + if client.ping_timer: + client.ping_timer.cancel() + + def test_inbound_message_resets_freshness_no_disconnect(self): + client = _build_health_client(max_missed_pongs=2) + client.last_message_at = time.monotonic() + client.last_ping_at = client.last_message_at + + for _ in range(5): + _tick(client) + if client.ping_timer: + client.ping_timer.cancel() + client.ping_timer = None + # Simulate any inbound message arriving after the ping. + client._WebSocketClient__on_message( + None, b'{"event":"data","data":{}}' + ) + + assert client.consecutive_misses == 0 + assert client._closed['count'] == 0 + + def test_consecutive_misses_accumulate_then_disconnect(self): + client = _build_health_client(max_missed_pongs=2) + client.last_message_at = time.monotonic() + client.last_ping_at = client.last_message_at + + # First tick: fresh (seeded equal), sends ping, advances last_ping_at. + _tick(client) + if client.ping_timer: + client.ping_timer.cancel() + client.ping_timer = None + assert client.consecutive_misses == 0 + + # No inbound message arrives -> miss 1 + _tick(client) + if client.ping_timer: + client.ping_timer.cancel() + client.ping_timer = None + assert client.consecutive_misses == 1 + assert client._closed['count'] == 0 + + # Still nothing -> miss 2 reaches max -> disconnect + _tick(client) + assert client._closed['count'] == 1 + + def test_disconnect_event_has_timeout_reason_on_miss(self): + client = _build_health_client(max_missed_pongs=1) + received = {} + + def on_disconnect(*args): + received['args'] = args + + client.on('disconnect', on_disconnect) + + # Seed so the connection is already stale relative to a future ping. + client.last_ping_at = time.monotonic() + client.last_message_at = client.last_ping_at - 1.0 + + _tick(client) + assert client._closed['count'] == 1 + # disconnect emitted with (code, msg, reason) + assert received['args'][-1] == {"reason": "health-check-timeout"} + + def test_max_missed_pongs_zero_is_clamped_to_one(self): + # max_missed_pongs=0 must not disconnect a healthy connection on the + # first tick; it is clamped to a minimum of 1. + client = _build_health_client(max_missed_pongs=0) + client.last_message_at = time.monotonic() + client.last_ping_at = client.last_message_at + + _tick(client) + if client.ping_timer: + client.ping_timer.cancel() + client.ping_timer = None + + assert client._closed['count'] == 0 + + def test_normal_disconnect_has_no_reason(self): + # Use a real (un-stubbed) core client to check on_close threading. + health = HealthCheckConfig(enabled=True) + client = CoreWebSocketClient( + base_url='wss://ws.example.com/stock/streaming', + api_key='api-key', + health_check=health, + ) + received = {} + client.on('disconnect', lambda *args: received.update(args=args)) + + # No pending reason -> normal close, second-style arg absent. + client._WebSocketClient__on_close(None, 1000, 'normal') + assert received['args'] == (1000, 'normal') + # Only two args: no reason payload. + assert len(received['args']) == 2 \ No newline at end of file