Skip to content

Commit b753534

Browse files
committed
feat: SSE events + Web→Discord reverse push
Phase 3 of review API integration: SSE events (events.py): - emit_chat_message: chat messages with source tracking - emit_approved: node approved with source - emit_rejected: node rejected with reason + source - emit_rolled_back: rollback from→to with source JSON SSE endpoint (sse.py): - GET /api/runs/{run_id}/events — JSON SSE for external clients Web→Discord reverse push (api.py): - approve via web → Discord: '✅ {node} 已在网页端通过' - revise via web → Discord: '🔄 {node} 被驳回(网页端)' - reject via API → Discord: '🔄 {node} 被驳回' - rollback via API → Discord: '⏪ 已回退: {from} → {to}'
1 parent 3a7f7fc commit b753534

3 files changed

Lines changed: 95 additions & 5 deletions

File tree

scripts/web/events.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,23 @@ def emit_review_needed(run_id: str, node: str, review_type: str) -> None:
111111

112112
def emit_run_complete(run_id: str, total_time_ms: int = 0) -> None:
113113
event_bus.publish_sync(PipelineEvent(type="run_complete", run_id=run_id, data={"status": "completed", "total_time_ms": total_time_ms}))
114+
115+
def emit_chat_message(run_id: str, node: str, role: str, message: str, source: str = "") -> None:
116+
event_bus.publish_sync(PipelineEvent(type="chat_message", run_id=run_id, data={
117+
"node": node, "role": role, "message": message[:500], "source": source,
118+
}))
119+
120+
def emit_approved(run_id: str, node: str, next_node: str = "", source: str = "") -> None:
121+
event_bus.publish_sync(PipelineEvent(type="approved", run_id=run_id, data={
122+
"node": node, "next": next_node, "source": source,
123+
}))
124+
125+
def emit_rejected(run_id: str, node: str, reason: str = "", source: str = "") -> None:
126+
event_bus.publish_sync(PipelineEvent(type="rejected", run_id=run_id, data={
127+
"node": node, "reason": reason[:300], "source": source,
128+
}))
129+
130+
def emit_rolled_back(run_id: str, from_node: str, to_node: str, source: str = "") -> None:
131+
event_bus.publish_sync(PipelineEvent(type="rolled_back", run_id=run_id, data={
132+
"from_node": from_node, "to_node": to_node, "source": source,
133+
}))

scripts/web/routes/api.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626
get_run_artifact, load_settings, save_settings,
2727
PIPELINE_NODES, _load_raw_state, _save_state,
2828
)
29-
from web.events import event_bus, emit_node_start, emit_node_complete, emit_run_complete
29+
from web.events import (
30+
event_bus, emit_node_start, emit_node_complete, emit_run_complete,
31+
emit_chat_message, emit_approved, emit_rejected, emit_rolled_back,
32+
emit_review_needed,
33+
)
3034

3135

3236
def _node_session_key(state: dict, run_id: str, node_id: str, lane: str = "main") -> str:
@@ -614,6 +618,29 @@ async def api_submit_review(request: Request, run_id: str, background_tasks: Bac
614618

615619
_save_state(raw)
616620

621+
# SSE 事件 + Discord 反向推送
622+
current_node = raw.get("current_stage", "")
623+
if action in ("approve", "select"):
624+
emit_approved(run_id, current_node, source="web")
625+
try:
626+
from web.notify import notify_discord
627+
asyncio.ensure_future(notify_discord(
628+
f"✅ **{current_node}** 已在网页端通过",
629+
run_id=run_id, node=current_node,
630+
))
631+
except Exception:
632+
pass
633+
elif action == "revise":
634+
emit_rejected(run_id, current_node, reason=feedback if action == "revise" else "", source="web")
635+
try:
636+
from web.notify import notify_discord
637+
asyncio.ensure_future(notify_discord(
638+
f"🔄 **{current_node}** 被驳回(网页端)\n> {feedback[:200]}",
639+
run_id=run_id, node=current_node,
640+
))
641+
except Exception:
642+
pass
643+
617644
# 恢复 Pipeline 执行
618645
background_tasks.add_task(_execute_pipeline, run_id)
619646

@@ -668,6 +695,7 @@ async def api_chat(request: Request, run_id: str):
668695
history = get_chat_history(run_id, node_id)
669696
display_msg = user_msg or "(附图片)"
670697
save_chat_message(run_id, node_id, "user", display_msg, attachments=attachments, source=source)
698+
emit_chat_message(run_id, node_id, "user", display_msg, source=source)
671699

672700
# ── skill-driven 提示:检测 URL / 搜索意图,但不在 Python 里预抓取内容 ──
673701
import re as _re
@@ -941,9 +969,22 @@ async def api_reject_node(request: Request, run_id: str, background_tasks: Backg
941969
raw["user_feedback"] = {"action": "revise", "global_note": reason, "source": source}
942970
_save_state(raw)
943971

972+
emit_rejected(run_id, node_id, reason=reason, source=source)
973+
944974
# 恢复 Pipeline 执行(带反馈重新执行当前节点)
945975
background_tasks.add_task(_execute_pipeline, run_id)
946976

977+
# Discord 反向推送
978+
try:
979+
from web.notify import notify_discord
980+
import asyncio
981+
asyncio.ensure_future(notify_discord(
982+
f"🔄 **{node_id}** 被驳回({source}\n> {reason[:200]}",
983+
run_id=run_id, node=node_id,
984+
))
985+
except Exception:
986+
pass
987+
947988
return {
948989
"ok": True,
949990
"node": node_id,
@@ -1040,6 +1081,20 @@ async def api_rollback_node(request: Request, run_id: str):
10401081
raw["_rollback_reason"] = reason
10411082
_save_state(raw)
10421083

1084+
emit_rolled_back(run_id, current_node, target_node, source=body.get("source", "api"))
1085+
1086+
# Discord 反向推送
1087+
try:
1088+
from web.notify import notify_discord
1089+
import asyncio
1090+
reason_text = f"\n> {reason[:200]}" if reason else ""
1091+
asyncio.ensure_future(notify_discord(
1092+
f"⏪ 已回退: **{current_node}** → **{target_node}**{reason_text}",
1093+
run_id=run_id, node=target_node,
1094+
))
1095+
except Exception:
1096+
pass
1097+
10431098
return {
10441099
"ok": True,
10451100
"rolled_back_to": target_node,
@@ -1836,10 +1891,7 @@ async def _execute_pipeline(run_id: str):
18361891
if str(scripts_dir) not in sys.path:
18371892
sys.path.insert(0, str(scripts_dir))
18381893

1839-
from web.events import (
1840-
emit_node_start, emit_node_complete, emit_node_error,
1841-
emit_run_complete, emit_review_needed,
1842-
)
1894+
from web.events import emit_node_start, emit_node_complete, emit_node_error
18431895
from web.run_manager import _load_raw_state, _save_state, PIPELINE_NODES
18441896

18451897
# 真实节点函数映射

scripts/web/routes/sse.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,24 @@
2525
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
2626

2727

28+
@router.get("/api/runs/{run_id}/events")
29+
async def sse_json_endpoint(request: Request, run_id: str):
30+
"""JSON SSE 端点:推送 Pipeline 事件(供 OpenClaw Agent / 外部客户端订阅)
31+
32+
每个事件的 data 是 JSON 对象:
33+
event: node_complete
34+
data: {"run_id":"...","node":"scout","duration_ms":12000}
35+
"""
36+
async def event_generator():
37+
try:
38+
async for event in event_bus.subscribe(run_id):
39+
yield event.to_sse()
40+
except asyncio.CancelledError:
41+
pass
42+
43+
return EventSourceResponse(event_generator())
44+
45+
2846
@router.get("/sse/{run_id}")
2947
async def sse_endpoint(request: Request, run_id: str):
3048
"""

0 commit comments

Comments
 (0)