Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,60 @@ if __name__ == "__main__":

```

### Health Check

The WebSocket client can monitor connection liveness using an app-level
JSON ping/pong (`{ "event": "ping" }` / `{ "event": "pong" }`). It is disabled
by default. When enabled, on each interval tick the client checks whether
**any** inbound message has arrived since the last ping it sent (freshness
check):

- If nothing arrived since the last ping, that counts as a *miss* and the
consecutive-miss counter is incremented.
- If any inbound message (a `pong`, market data, or anything else) arrived,
the counter is reset to `0`.
- Once the consecutive-miss counter reaches `max_missed_pongs` (values below
  `1` are treated as `1`), the client disconnects and stops the timer.

| Option | Type | Default | Description |
| ------------------ | ------ | ------- | -------------------------------------------------------------- |
| `enabled` | `bool` | `False` | Enables the health-check ping/pong. |
| `ping_interval` | `int` | `30000` | Interval in milliseconds between health-check pings. |
| `max_missed_pongs` | `int` | `2` | Consecutive misses (no inbound messages) before disconnecting. |

```py
from fugle_marketdata import WebSocketClient
from fugle_marketdata.websocket.client import HealthCheckConfig

# Build a client with the app-level health check turned on.
client = WebSocketClient(
    api_key='YOUR_API_KEY',
    health_check=HealthCheckConfig(
        enabled=True,  # the health check is disabled unless this is set
        ping_interval=30000,  # milliseconds between health-check pings
        max_missed_pongs=2,  # consecutive misses before disconnecting
    ),
)
```

#### Disconnect reason

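When the client disconnects because of a health-check timeout, the
`disconnect` event is emitted with an extra third argument
`{"reason": "health-check-timeout"}`. Normal or manual disconnects emit
`disconnect` **without** that extra argument, so listeners that only read the
close code and message keep working unchanged for those disconnects. Because
the reason is passed as an additional positional argument, give the listener
an optional third parameter (as in the example below) so that it accepts both
forms.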

```py
def handle_disconnect(code, message, info=None):
    """Reconnect and resubscribe when the disconnect was a health-check timeout.

    Args:
        code: Close status code passed with the ``disconnect`` event.
        message: Close message passed with the ``disconnect`` event.
        info: Optional extra payload. It is only supplied for health-check
            timeouts, so it defaults to ``None`` for normal disconnects.
    """
    if info and info.get("reason") == "health-check-timeout":
        print("Health check timed out, reconnecting...")
        stock.connect()
        stock.subscribe({"channel": "trades", "symbol": "2330"})


stock.on("disconnect", handle_disconnect)
```

## Error Handling

The library provides a custom `FugleAPIError` exception for API-related errors, which includes detailed debugging information.
Expand Down
71 changes: 47 additions & 24 deletions fugle_marketdata/websocket/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import orjson
import websocket
from typing import Optional
Expand Down Expand Up @@ -52,7 +53,10 @@ def __init__(self, **config):

# Health check properties
self.ping_timer = None
self.missed_pongs = 0
self.consecutive_misses = 0
self.last_message_at = 0.0
self.last_ping_at = 0.0
self.__disconnect_reason = None

def ping(self, message):
message = {
Expand Down Expand Up @@ -121,9 +125,16 @@ def __on_open(self, ws):
self.ee.emit(CONNECT_EVENT)

def __on_close(self, ws, close_status_code, close_msg):
self.ee.emit(DISCONNECT_EVENT, close_status_code, close_msg)
reason = self.__disconnect_reason
self.__disconnect_reason = None
if reason is not None:
self.ee.emit(DISCONNECT_EVENT, close_status_code, close_msg, reason)
else:
self.ee.emit(DISCONNECT_EVENT, close_status_code, close_msg)

def __on_message(self, ws, data):
# Any inbound message counts as freshness.
self.last_message_at = time.monotonic()
message = orjson.loads(data)
self.ee.emit(MESSAGE_EVENT, data)
if message['event'] == AUTHENTICATED_EVENT:
Expand All @@ -138,9 +149,6 @@ def __on_message(self, ws, data):
self.ee.emit(UNAUTHENTICATED_EVENT, message)
self.auth_status = AuthenticationState.UNAUTHENTICATED
self.error = Exception(UNAUTHENTICATED_MESSAGE)
elif message['event'] == 'pong':
# Reset missed pongs counter
self.missed_pongs = 0

def __on_error(self, ws, error):
self.ee.emit(ERROR_EVENT, error)
Expand All @@ -157,36 +165,51 @@ def check_auth_status(self):
self.error = Exception(AUTHENTICATION_TIMEOUT_MESSAGE)

def __start_health_check(self):
"""Start the health check ping/pong mechanism"""
"""Start the freshness-based health check ping/pong mechanism"""
if self.health_check and self.health_check.enabled:
self.missed_pongs = 0
self.__send_ping()
self.consecutive_misses = 0
# Seed timestamps so the first tick treats the connection as fresh.
now = time.monotonic()
self.last_message_at = now
self.last_ping_at = now
self.__schedule_next_ping()

def __schedule_next_ping(self):
    """Arm a one-shot daemon timer that runs the next health-check tick.

    Does nothing when no health-check config is set or it is not enabled.
    The timer is stored on ``self.ping_timer`` before it is started.
    """
    config = self.health_check
    if not (config and config.enabled):
        return
    # ping_interval is configured in milliseconds; Timer expects seconds.
    delay_seconds = config.ping_interval / 1000.0
    timer = Timer(delay_seconds, self.__health_check_tick)
    self.ping_timer = timer
    timer.daemon = True
    timer.start()

def __send_ping(self):
"""Send ping and schedule next ping"""
def __health_check_tick(self):
"""Run one freshness check, then send a ping and reschedule."""
if not self.health_check or not self.health_check.enabled:
return

# Freshness check: did anything arrive since our last ping?
if self.last_message_at < self.last_ping_at:
self.consecutive_misses += 1
else:
self.consecutive_misses = 0

# Clamp to >= 1: max_missed_pongs of 0 would disconnect a healthy
# connection on the first tick (consecutive_misses starts at 0).
max_missed = max(1, self.health_check.max_missed_pongs)
if self.consecutive_misses >= max_missed:
self.__disconnect_reason = {"reason": "health-check-timeout"}
self.disconnect()
return

try:
self.ping("")
self.missed_pongs += 1
self.__check_missed_pongs()

# Schedule next ping
interval_seconds = self.health_check.ping_interval / 1000.0
self.ping_timer = Timer(interval_seconds, self.__send_ping)
self.ping_timer.start()
self.last_ping_at = time.monotonic()
self.__schedule_next_ping()
except Exception as error:
print(f"Failed to send ping: {error}")
self.__disconnect_reason = {"reason": "health-check-timeout"}
self.disconnect()

def __check_missed_pongs(self):
"""Check if too many pongs have been missed"""
if self.health_check and self.health_check.enabled:
if self.missed_pongs > self.health_check.max_missed_pongs:
self.disconnect()
raise Exception(f"Did not receive pong for {self.health_check.max_missed_pongs} consecutive times. Disconnecting...")

def connect(self):
Thread(target=self.__ws.run_forever).start()
while True:
Expand Down
161 changes: 160 additions & 1 deletion tests/test_websocket_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import threading
import time
from fugle_marketdata import WebSocketClient
from fugle_marketdata.websocket.client import HealthCheckConfig, WebSocketClient as CoreWebSocketClient
from fugle_marketdata.websocket.futopt.client import WebSocketFutOptClient
from fugle_marketdata.websocket.stock.client import WebSocketStockClient
import pytest
Expand Down Expand Up @@ -179,4 +181,161 @@ def test_base_url_with_path_and_trailing_slash(self):
stock = client.stock
assert stock.config['base_url'] == 'wss://ws.example.com/api/v2/stock/streaming'
futopt = client.futopt
assert futopt.config['base_url'] == 'wss://ws.example.com/api/v2/futopt/streaming'
assert futopt.config['base_url'] == 'wss://ws.example.com/api/v2/futopt/streaming'


def _build_health_client(max_missed_pongs=2, ping_interval=30000):
    """Return a core client with the health check enabled and its I/O stubbed.

    The name-mangled private ``__send`` and the public ``disconnect`` are
    replaced so the freshness logic can be unit-tested without a live socket.
    Sent messages are collected in ``client._sent`` and the number of
    disconnect calls is tracked in ``client._closed['count']``.
    """
    client = CoreWebSocketClient(
        base_url='wss://ws.example.com/stock/streaming',
        api_key='api-key',
        health_check=HealthCheckConfig(
            enabled=True,
            ping_interval=ping_interval,
            max_missed_pongs=max_missed_pongs,
        ),
    )

    # Capture outbound messages instead of writing them to the socket.
    outbound = []
    client._WebSocketClient__send = lambda message: outbound.append(message)
    client._sent = outbound

    disconnects = {'count': 0}

    def fake_disconnect():
        disconnects['count'] += 1
        timer = client.ping_timer
        if timer is not None:
            timer.cancel()
            client.ping_timer = None
        # Mimic the real on_close: consume the pending reason and emit the
        # disconnect event, appending the reason only when one was set.
        pending = client._WebSocketClient__disconnect_reason
        client._WebSocketClient__disconnect_reason = None
        event_args = (None, None) if pending is None else (None, None, pending)
        client.ee.emit('disconnect', *event_args)

    client.disconnect = fake_disconnect
    client._closed = disconnects
    return client


def _tick(client):
    """Invoke the client's private health-check tick once, synchronously."""
    # The tick is a name-mangled private method of WebSocketClient.
    run_tick = client._WebSocketClient__health_check_tick
    run_tick()


class TestWebSocketHealthCheck:
    """Unit tests for the freshness-based WebSocket health check."""

    @staticmethod
    def _seed_fresh(client):
        """Set both timestamps to one shared instant a second in the past.

        Equal timestamps make the next tick count as fresh. Backdating them
        guarantees that any timestamp the client takes afterwards is strictly
        greater, even where ``time.monotonic()`` advances in coarse steps.

        Returns:
            The seeded timestamp.
        """
        seeded_at = time.monotonic() - 1.0
        client.last_message_at = seeded_at
        client.last_ping_at = seeded_at
        return seeded_at

    @staticmethod
    def _cancel_timer(client):
        """Cancel the timer armed by a tick so it cannot fire after the test."""
        if client.ping_timer:
            client.ping_timer.cancel()
            client.ping_timer = None

    def test_config_defaults(self):
        cfg = HealthCheckConfig()
        assert cfg.enabled is False
        assert cfg.ping_interval == 30000
        assert cfg.max_missed_pongs == 2

    def test_tick_sends_ping_and_updates_last_ping_at(self):
        client = _build_health_client()
        before = self._seed_fresh(client)
        _tick(client)
        self._cancel_timer(client)
        # A ping is sent and last_ping_at advances. The comparison is strict
        # so the test fails if the tick leaves the timestamp untouched.
        assert any(m['event'] == 'ping' for m in client._sent)
        assert client.last_ping_at > before
        assert client._closed['count'] == 0

    def test_inbound_message_resets_freshness_no_disconnect(self):
        client = _build_health_client(max_missed_pongs=2)
        self._seed_fresh(client)

        for _ in range(5):
            _tick(client)
            self._cancel_timer(client)
            # Simulate any inbound message arriving after the ping.
            client._WebSocketClient__on_message(
                None, b'{"event":"data","data":{}}'
            )

        assert client.consecutive_misses == 0
        assert client._closed['count'] == 0

    def test_consecutive_misses_accumulate_then_disconnect(self):
        client = _build_health_client(max_missed_pongs=2)
        self._seed_fresh(client)

        # First tick: fresh (seeded equal), sends ping, advances last_ping_at.
        _tick(client)
        self._cancel_timer(client)
        assert client.consecutive_misses == 0

        # No inbound message arrives -> miss 1
        _tick(client)
        self._cancel_timer(client)
        assert client.consecutive_misses == 1
        assert client._closed['count'] == 0

        # Still nothing -> miss 2 reaches max -> disconnect
        _tick(client)
        assert client._closed['count'] == 1

    def test_disconnect_event_has_timeout_reason_on_miss(self):
        client = _build_health_client(max_missed_pongs=1)
        received = {}

        def on_disconnect(*args):
            received['args'] = args

        client.on('disconnect', on_disconnect)

        # Seed so the last inbound message is already older than the last ping.
        client.last_ping_at = time.monotonic()
        client.last_message_at = client.last_ping_at - 1.0

        _tick(client)
        assert client._closed['count'] == 1
        # disconnect emitted with (code, msg, reason)
        assert received['args'][-1] == {"reason": "health-check-timeout"}

    def test_max_missed_pongs_zero_is_clamped_to_one(self):
        # max_missed_pongs=0 must not disconnect a healthy connection on the
        # first tick; it is clamped to a minimum of 1.
        client = _build_health_client(max_missed_pongs=0)
        self._seed_fresh(client)

        _tick(client)
        self._cancel_timer(client)

        assert client.consecutive_misses == 0
        assert client._closed['count'] == 0

    def test_normal_disconnect_has_no_reason(self):
        # Use a real (un-stubbed) core client to exercise on_close directly.
        health = HealthCheckConfig(enabled=True)
        client = CoreWebSocketClient(
            base_url='wss://ws.example.com/stock/streaming',
            api_key='api-key',
            health_check=health,
        )
        received = {}
        client.on('disconnect', lambda *args: received.update(args=args))

        # No pending reason -> normal close, no third (reason) argument.
        client._WebSocketClient__on_close(None, 1000, 'normal')
        assert received['args'] == (1000, 'normal')
        # Only two args: no reason payload.
        assert len(received['args']) == 2