Skip to content

Commit 7c87e37

Browse files
committed
Add resource subscriptions and live screen feed
resources/subscribe / resources/unsubscribe wire a producer callback into a provider. ResourceProvider gains optional subscribe/unsubscribe hooks; ChainProvider fans out to children; LiveScreenProvider exposes autocontrol://screen/live and pushes notifications/resources/updated every poll_seconds while at least one client is subscribed. resources.subscribe capability flag flipped to true so clients know to attempt subscriptions.
1 parent f8bd1ba commit 7c87e37

4 files changed

Lines changed: 232 additions & 8 deletions

File tree

je_auto_control/utils/mcp_server/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,18 @@
2727
MCPPrompt, MCPPromptArgument, PromptProvider, default_prompt_provider,
2828
)
2929
from je_auto_control.utils.mcp_server.resources import (
30-
MCPResource, ResourceProvider, default_resource_provider,
30+
LiveScreenProvider, MCPResource, ResourceProvider,
31+
default_resource_provider,
3132
)
3233
from je_auto_control.utils.mcp_server.tools import (
3334
MCPContent, MCPTool, MCPToolAnnotations, build_default_tool_registry,
3435
make_plugin_tool, register_plugin_tools,
3536
)
3637

3738
__all__ = [
38-
"AuditLogger", "FakeState", "HttpMCPServer", "MCPContent",
39-
"MCPLogBridge", "MCPPrompt", "MCPPromptArgument", "MCPResource",
40-
"MCPServer", "MCPTool", "MCPToolAnnotations",
39+
"AuditLogger", "FakeState", "HttpMCPServer", "LiveScreenProvider",
40+
"MCPContent", "MCPLogBridge", "MCPPrompt", "MCPPromptArgument",
41+
"MCPResource", "MCPServer", "MCPTool", "MCPToolAnnotations",
4142
"OperationCancelledError", "PluginWatcher", "PromptProvider",
4243
"RateLimiter", "ResourceProvider", "ToolCallContext",
4344
"build_default_tool_registry",

je_auto_control/utils/mcp_server/resources.py

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
abstraction lets callers compose custom sources without touching the
88
JSON-RPC layer.
99
"""
10+
import base64
11+
import io
1012
import json
1113
import os
14+
import threading
1215
from dataclasses import dataclass
13-
from typing import Any, Dict, List, Optional
16+
from typing import Any, Callable, Dict, List, Optional
1417

1518

1619
@dataclass(frozen=True)
@@ -45,6 +48,21 @@ def set_workspace_root(self, root: str) -> None:
4548
"""Hook for MCP roots. Default: no-op. FS-backed providers override."""
4649
del root
4750

51+
def subscribe(self, uri: str,
52+
on_update: Callable[[], None]) -> Optional[Any]:
53+
"""Optional hook: start emitting ``on_update`` calls until unsubscribed.
54+
55+
Return a non-``None`` handle when this provider owns ``uri`` and
56+
accepted the subscription. The default implementation returns
57+
``None`` (not subscribable).
58+
"""
59+
del uri, on_update
60+
return None
61+
62+
def unsubscribe(self, uri: str, handle: Any) -> None:
63+
"""Cancel a previous :meth:`subscribe` handle."""
64+
del uri, handle
65+
4866

4967
class FileSystemProvider(ResourceProvider):
5068
"""Expose ``*.json`` action files in ``root`` under ``<scheme>://files/<name>``."""
@@ -173,18 +191,107 @@ def set_workspace_root(self, root: str) -> None:
173191
for provider in self.providers:
174192
provider.set_workspace_root(root)
175193

194+
def subscribe(self, uri: str,
195+
on_update: Callable[[], None]) -> Optional[Any]:
196+
for provider in self.providers:
197+
handle = provider.subscribe(uri, on_update)
198+
if handle is not None:
199+
return (provider, handle)
200+
return None
201+
202+
def unsubscribe(self, uri: str, handle: Any) -> None:
203+
if not isinstance(handle, tuple) or len(handle) != 2:
204+
return
205+
provider, child_handle = handle
206+
provider.unsubscribe(uri, child_handle)
207+
208+
209+
class LiveScreenProvider(ResourceProvider):
210+
"""Live screen feed at ``autocontrol://screen/live``.
211+
212+
``read`` always grabs a fresh PNG (base64-encoded). Subscribers
213+
receive ``on_update`` calls every ``poll_seconds`` so they can
214+
re-fetch the resource and surface live state to the model.
215+
"""
216+
217+
URI = "autocontrol://screen/live"
218+
219+
def __init__(self, poll_seconds: float = 1.0) -> None:
220+
self._poll_seconds = max(0.1, float(poll_seconds))
221+
self._lock = threading.Lock()
222+
self._subscribers: Dict[int, Callable[[], None]] = {}
223+
self._next_handle = 1
224+
self._thread: Optional[threading.Thread] = None
225+
self._stop = threading.Event()
226+
227+
def list(self) -> List[MCPResource]:
228+
return [MCPResource(
229+
uri=self.URI, name="screen_live",
230+
description=("Current screen as base64 PNG. Subscribe to be "
231+
"notified when it should be re-fetched."),
232+
mime_type="image/png",
233+
)]
234+
235+
def read(self, uri: str) -> Optional[Dict[str, Any]]:
236+
if uri != self.URI:
237+
return None
238+
from je_auto_control.utils.cv2_utils.screenshot import pil_screenshot
239+
image = pil_screenshot()
240+
buffer = io.BytesIO()
241+
image.save(buffer, format="PNG")
242+
encoded = base64.b64encode(buffer.getvalue()).decode("ascii")
243+
return {"uri": uri, "mimeType": "image/png", "blob": encoded}
244+
245+
def subscribe(self, uri: str,
246+
on_update: Callable[[], None]) -> Optional[Any]:
247+
if uri != self.URI:
248+
return None
249+
with self._lock:
250+
handle = self._next_handle
251+
self._next_handle += 1
252+
self._subscribers[handle] = on_update
253+
if self._thread is None or not self._thread.is_alive():
254+
self._stop.clear()
255+
self._thread = threading.Thread(
256+
target=self._broadcast_loop, daemon=True,
257+
name="MCPLiveScreen",
258+
)
259+
self._thread.start()
260+
return handle
261+
262+
def unsubscribe(self, uri: str, handle: Any) -> None:
263+
if uri != self.URI:
264+
return
265+
with self._lock:
266+
self._subscribers.pop(int(handle), None)
267+
if not self._subscribers:
268+
self._stop.set()
269+
self._thread = None
270+
271+
def _broadcast_loop(self) -> None:
272+
while not self._stop.is_set():
273+
with self._lock:
274+
callbacks = list(self._subscribers.values())
275+
for callback in callbacks:
276+
try:
277+
callback()
278+
except (OSError, RuntimeError, ValueError):
279+
pass
280+
self._stop.wait(self._poll_seconds)
281+
176282

177283
def default_resource_provider(root: str = ".") -> ResourceProvider:
178284
"""Return the resource provider exposed by the default MCP server."""
179285
return ChainProvider([
180286
FileSystemProvider(root=root),
181287
HistoryProvider(),
182288
CommandsProvider(),
289+
LiveScreenProvider(),
183290
])
184291

185292

186293
__all__ = [
187294
"ChainProvider", "CommandsProvider", "FileSystemProvider",
188-
"HistoryProvider", "MCPResource", "ResourceProvider",
189-
"default_resource_provider",
295+
"HistoryProvider", "LiveScreenProvider", "MCPResource",
296+
"ResourceProvider", "default_resource_provider",
190297
]

je_auto_control/utils/mcp_server/server.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ def __init__(self, tools: Optional[List[MCPTool]] = None,
8686
self._pending_outbound: Dict[Any, Dict[str, Any]] = {}
8787
self._outbound_lock = threading.Lock()
8888
self._client_capabilities: Dict[str, Any] = {}
89+
self._resource_subscriptions: Dict[str, Any] = {}
90+
self._subscriptions_lock = threading.Lock()
8991

9092
def register_tool(self, tool: MCPTool) -> None:
9193
"""Add or replace a tool in the live registry.
@@ -379,6 +381,10 @@ def _dispatch(self, msg_id: Any, method: Optional[str],
379381
for resource in self._resources.list()]}
380382
if method == "resources/read":
381383
return self._handle_resources_read(params)
384+
if method == "resources/subscribe":
385+
return self._handle_resources_subscribe(params)
386+
if method == "resources/unsubscribe":
387+
return self._handle_resources_unsubscribe(params)
382388
if method == "prompts/list":
383389
return {"prompts": [prompt.to_descriptor()
384390
for prompt in self._prompts.list()]}
@@ -411,7 +417,7 @@ def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
411417
self._client_capabilities = client_caps
412418
capabilities: Dict[str, Any] = {
413419
"tools": {"listChanged": True},
414-
"resources": {"listChanged": False, "subscribe": False},
420+
"resources": {"listChanged": False, "subscribe": True},
415421
"prompts": {"listChanged": False},
416422
"sampling": {},
417423
"logging": {},
@@ -434,6 +440,45 @@ def _handle_resources_read(self,
434440
raise _MCPError(-32602, f"Unknown resource: {uri}")
435441
return {"contents": [content]}
436442

443+
def _handle_resources_subscribe(self,
444+
params: Dict[str, Any]) -> Dict[str, Any]:
445+
uri = params.get("uri")
446+
if not isinstance(uri, str) or not uri:
447+
raise _MCPError(-32602, "resources/subscribe requires 'uri'")
448+
with self._subscriptions_lock:
449+
if uri in self._resource_subscriptions:
450+
return {}
451+
handle = self._resources.subscribe(
452+
uri, lambda u=uri: self._notify_resource_updated(u),
453+
)
454+
if handle is None:
455+
raise _MCPError(-32602, f"Unsubscribable resource: {uri}")
456+
with self._subscriptions_lock:
457+
self._resource_subscriptions[uri] = handle
458+
return {}
459+
460+
def _handle_resources_unsubscribe(self,
461+
params: Dict[str, Any]) -> Dict[str, Any]:
462+
uri = params.get("uri")
463+
if not isinstance(uri, str) or not uri:
464+
raise _MCPError(-32602, "resources/unsubscribe requires 'uri'")
465+
with self._subscriptions_lock:
466+
handle = self._resource_subscriptions.pop(uri, None)
467+
if handle is not None:
468+
self._resources.unsubscribe(uri, handle)
469+
return {}
470+
471+
def _notify_resource_updated(self, uri: str) -> None:
472+
notifier = self._notifier
473+
if notifier is None:
474+
return
475+
try:
476+
notifier("notifications/resources/updated", {"uri": uri})
477+
except (OSError, RuntimeError, ValueError):
478+
autocontrol_logger.exception(
479+
"MCP failed to send resources/updated for %s", uri,
480+
)
481+
437482
def _handle_prompts_get(self, params: Dict[str, Any]) -> Dict[str, Any]:
438483
name = params.get("name")
439484
arguments = params.get("arguments") or {}

test/unit_test/headless/test_mcp_server.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,6 +1504,77 @@ def test_aliases_excluded_from_read_only_registry():
15041504
assert "mouse_pos" in names
15051505

15061506

1507+
def test_initialize_advertises_resources_subscribe_capability():
1508+
server = MCPServer(tools=[])
1509+
response = _decode(server.handle_line(_request("initialize", params={})))
1510+
caps = response["result"]["capabilities"]
1511+
assert caps["resources"]["subscribe"] is True
1512+
1513+
1514+
def test_resources_subscribe_and_notification_round_trip():
1515+
"""Subscribe to a fake resource and verify the server forwards updates."""
1516+
from je_auto_control.utils.mcp_server.resources import (
1517+
ChainProvider, MCPResource, ResourceProvider,
1518+
)
1519+
1520+
class _FakeProvider(ResourceProvider):
1521+
URI = "fake://live"
1522+
1523+
def __init__(self):
1524+
self.callback = None
1525+
1526+
def list(self):
1527+
return [MCPResource(uri=self.URI, name="fake")]
1528+
1529+
def read(self, uri):
1530+
if uri == self.URI:
1531+
return {"uri": uri, "mimeType": "text/plain", "text": "hi"}
1532+
return None
1533+
1534+
def subscribe(self, uri, on_update):
1535+
if uri != self.URI:
1536+
return None
1537+
self.callback = on_update
1538+
return "fake-handle"
1539+
1540+
def unsubscribe(self, uri, handle):
1541+
self.callback = None
1542+
1543+
fake = _FakeProvider()
1544+
chain = ChainProvider([fake])
1545+
captured = []
1546+
server = MCPServer(tools=[], resource_provider=chain)
1547+
server.set_notifier(lambda method, params: captured.append((method, params)))
1548+
1549+
sub_response = _decode(server.handle_line(_request(
1550+
"resources/subscribe", params={"uri": "fake://live"},
1551+
)))
1552+
assert sub_response["result"] == {}
1553+
assert fake.callback is not None
1554+
1555+
# Simulate the provider noticing fresh content.
1556+
fake.callback()
1557+
1558+
methods = [event[0] for event in captured]
1559+
assert "notifications/resources/updated" in methods
1560+
update = next(e for e in captured if e[0] == "notifications/resources/updated")
1561+
assert update[1] == {"uri": "fake://live"}
1562+
1563+
unsub_response = _decode(server.handle_line(_request(
1564+
"resources/unsubscribe", params={"uri": "fake://live"},
1565+
)))
1566+
assert unsub_response["result"] == {}
1567+
assert fake.callback is None
1568+
1569+
1570+
def test_resources_subscribe_rejects_unknown_uri():
1571+
server = MCPServer(tools=[])
1572+
response = _decode(server.handle_line(_request(
1573+
"resources/subscribe", params={"uri": "fake://nowhere"},
1574+
)))
1575+
assert response["error"]["code"] == -32602
1576+
1577+
15071578
def test_default_registry_lists_core_automation_tools():
15081579
names = {tool.name for tool in build_default_tool_registry()}
15091580
expected = {

0 commit comments

Comments
 (0)