Skip to content

Commit dcb8828

Browse files
committed
Add audio streaming (host -> viewer PCM) for Remote Desktop
A new AUDIO message type carries 16-bit signed PCM blocks (16 kHz mono, 50 ms per block by default) alongside JPEG frames on the same channel. The 'sounddevice' dependency stays optional: audio.py imports it lazily so machines without PortAudio can still import the package, and a backend failure during host startup is logged + audio is reported disabled rather than tearing the host down. Host: enable_audio + audio_device / sample_rate / channels / block configure capture; the host's broadcast loop pushes each block into a bounded per-client deque (max ~2.5 s buffered), and a dedicated audio sender thread per client drains the queue. The bounded queue means a slow viewer drops old chunks instead of blocking the audio capture thread feeding everyone else. Viewer: a new on_audio callback fires on each AUDIO message; combined with AudioPlayer (also a thin sounddevice wrapper) callers get playback in two lines. The viewer never opens an audio device on its own — playback is opt-in. Tests fake sounddevice via monkeypatch and cover both unit-level behaviour (callback bytes, lazy backend, lifecycle, validation) and end-to-end host->viewer streaming, queue back-pressure, and graceful degradation when the backend cannot start.
1 parent fcdf352 commit dcb8828

6 files changed

Lines changed: 585 additions & 1 deletion

File tree

je_auto_control/utils/remote_desktop/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
it to untrusted networks should be paired with an SSH tunnel or TLS
1010
front-end.
1111
"""
12+
from je_auto_control.utils.remote_desktop.audio import (
13+
AudioBackendError, AudioCapture, AudioPlayer,
14+
is_audio_backend_available,
15+
)
1216
from je_auto_control.utils.remote_desktop.host import RemoteDesktopHost
1317
from je_auto_control.utils.remote_desktop.host_id import (
1418
HostIdError, format_host_id, generate_host_id, load_or_create_host_id,
@@ -36,4 +40,6 @@
3640
"dispatch_input", "registry",
3741
"HostIdError", "format_host_id", "generate_host_id",
3842
"load_or_create_host_id", "parse_host_id", "validate_host_id",
43+
"AudioBackendError", "AudioCapture", "AudioPlayer",
44+
"is_audio_backend_available",
3945
]
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
"""Audio capture + playback wrappers around the optional ``sounddevice`` lib.
2+
3+
Both classes import ``sounddevice`` lazily so the package stays importable
4+
on systems without PortAudio. ``AudioCapture`` pulls signed-int16 PCM in
5+
fixed-size blocks via the library's callback API and forwards each block
6+
as raw bytes; ``AudioPlayer`` accepts the same byte format and writes it
7+
to the default (or user-chosen) output device.
8+
9+
Defaults are 16 kHz, mono, 50 ms blocks (1600 bytes per block, ~32 KB/s)
10+
— small enough that audio chunks ride alongside JPEG frames over a LAN
11+
without noticeably starving the video pipe.
12+
"""
13+
import threading
14+
from typing import Callable, Optional
15+
16+
DEFAULT_SAMPLE_RATE = 16_000
17+
DEFAULT_CHANNELS = 1
18+
DEFAULT_BLOCK_FRAMES = 800 # 50 ms at 16 kHz
19+
SAMPLE_DTYPE = "int16"
20+
BYTES_PER_SAMPLE = 2
21+
22+
AudioBlockCallback = Callable[[bytes], None]
23+
24+
25+
class AudioBackendError(RuntimeError):
26+
"""Raised when the optional ``sounddevice`` backend cannot be loaded."""
27+
28+
29+
def _load_sounddevice():
30+
"""Import ``sounddevice`` lazily; raise a helpful error if missing."""
31+
try:
32+
import sounddevice # noqa: PLC0415 intentional lazy import
33+
except ImportError as error:
34+
raise AudioBackendError(
35+
"audio support requires 'sounddevice'. Install with: "
36+
"pip install sounddevice"
37+
) from error
38+
return sounddevice
39+
40+
41+
def is_audio_backend_available() -> bool:
42+
"""Return True if ``sounddevice`` can be imported."""
43+
try:
44+
_load_sounddevice()
45+
except AudioBackendError:
46+
return False
47+
return True
48+
49+
50+
class AudioCapture:
51+
"""Capture mono int16 PCM blocks and hand them to ``on_block`` as bytes.
52+
53+
``on_block`` is invoked from the audio library's internal thread, so
54+
callers must keep it cheap (queueing / signalling is fine; CPU-heavy
55+
work blocks the audio pipeline).
56+
"""
57+
58+
def __init__(self, on_block: AudioBlockCallback,
59+
device: Optional[int] = None,
60+
sample_rate: int = DEFAULT_SAMPLE_RATE,
61+
channels: int = DEFAULT_CHANNELS,
62+
block_frames: int = DEFAULT_BLOCK_FRAMES) -> None:
63+
if not callable(on_block):
64+
raise TypeError("on_block must be callable")
65+
if sample_rate <= 0 or channels <= 0 or block_frames <= 0:
66+
raise ValueError(
67+
"sample_rate, channels and block_frames must be positive"
68+
)
69+
self._on_block = on_block
70+
self._device = device
71+
self._sample_rate = int(sample_rate)
72+
self._channels = int(channels)
73+
self._block_frames = int(block_frames)
74+
self._stream = None
75+
self._lock = threading.Lock()
76+
77+
@property
78+
def sample_rate(self) -> int:
79+
return self._sample_rate
80+
81+
@property
82+
def channels(self) -> int:
83+
return self._channels
84+
85+
@property
86+
def is_running(self) -> bool:
87+
return self._stream is not None
88+
89+
def start(self) -> None:
90+
"""Open the input stream; subsequent blocks fire ``on_block`` callbacks."""
91+
with self._lock:
92+
if self._stream is not None:
93+
return
94+
sd = _load_sounddevice()
95+
self._stream = sd.RawInputStream(
96+
samplerate=self._sample_rate,
97+
channels=self._channels,
98+
dtype=SAMPLE_DTYPE,
99+
blocksize=self._block_frames,
100+
device=self._device,
101+
callback=self._raw_callback,
102+
)
103+
self._stream.start()
104+
105+
def stop(self) -> None:
106+
"""Stop and release the input stream."""
107+
with self._lock:
108+
stream = self._stream
109+
self._stream = None
110+
if stream is None:
111+
return
112+
try:
113+
stream.stop()
114+
finally:
115+
try:
116+
stream.close()
117+
except (OSError, RuntimeError):
118+
pass
119+
120+
def _raw_callback(self, indata, frames, time_info, status) -> None:
121+
del frames, time_info # unused — block size is fixed
122+
if status:
123+
# Drops / overflows are surfaced via ``status``; we let the
124+
# audio thread continue rather than tearing down the stream.
125+
return
126+
try:
127+
self._on_block(bytes(indata))
128+
except Exception: # noqa: BLE001 callback isolation
129+
# We must not propagate user callback errors back into PortAudio.
130+
pass
131+
132+
133+
class AudioPlayer:
134+
"""Play int16 PCM bytes through the default (or chosen) output device."""
135+
136+
def __init__(self, device: Optional[int] = None,
137+
sample_rate: int = DEFAULT_SAMPLE_RATE,
138+
channels: int = DEFAULT_CHANNELS) -> None:
139+
if sample_rate <= 0 or channels <= 0:
140+
raise ValueError("sample_rate and channels must be positive")
141+
self._device = device
142+
self._sample_rate = int(sample_rate)
143+
self._channels = int(channels)
144+
self._stream = None
145+
self._lock = threading.Lock()
146+
147+
@property
148+
def is_running(self) -> bool:
149+
return self._stream is not None
150+
151+
def start(self) -> None:
152+
"""Open the output stream so :meth:`play` becomes valid."""
153+
with self._lock:
154+
if self._stream is not None:
155+
return
156+
sd = _load_sounddevice()
157+
self._stream = sd.RawOutputStream(
158+
samplerate=self._sample_rate,
159+
channels=self._channels,
160+
dtype=SAMPLE_DTYPE,
161+
device=self._device,
162+
)
163+
self._stream.start()
164+
165+
def play(self, chunk: bytes) -> None:
166+
"""Write a chunk of int16 PCM bytes to the stream."""
167+
if not isinstance(chunk, (bytes, bytearray, memoryview)):
168+
raise TypeError("chunk must be bytes-like")
169+
if not chunk:
170+
return
171+
stream = self._stream
172+
if stream is None:
173+
raise RuntimeError("AudioPlayer is not running; call start() first")
174+
try:
175+
stream.write(bytes(chunk))
176+
except (OSError, RuntimeError):
177+
# Late writes after stop / device removal — ignore so the
178+
# network thread can keep flowing without crashing.
179+
pass
180+
181+
def stop(self) -> None:
182+
with self._lock:
183+
stream = self._stream
184+
self._stream = None
185+
if stream is None:
186+
return
187+
try:
188+
stream.stop()
189+
finally:
190+
try:
191+
stream.close()
192+
except (OSError, RuntimeError):
193+
pass

0 commit comments

Comments
 (0)