-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsole_buffer.py
More file actions
235 lines (189 loc) · 7.56 KB
/
console_buffer.py
File metadata and controls
235 lines (189 loc) · 7.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Console buffer for Objectif.AI.
Maintains a circular in-memory log buffer and broadcasts to WebSocket clients.
Thread-safe. Designed to be used from async FastAPI context.
"""
import asyncio
import json
import logging
import time
from collections import deque
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional
import itertools
logger = logging.getLogger(__name__)
class LogLevel(str, Enum):
    """Category tag carried by every console log entry.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value.
    """

    # General-purpose severities.
    INFO = "info"
    SUCCESS = "success"
    WARNING = "warning"
    ERROR = "error"

    # Request/detection categories.
    REQUEST = "request"      # Incoming image from BlueIris
    DETECTION = "detection"  # Detection result

    SYSTEM = "system"
# Symbols cycled through to pair a request with its detection result.
# These are plain Unicode glyphs; fixed-width alignment is left to CSS.
SHAPES = [
    "●",
    "■",
    "▲",
    "◆",
    "★",
    "⬟",
    "◉",
    "⬡",
    "▼",
    "◈",
]

# NOTE(review): the two names below are not referenced anywhere else in the
# visible part of this file — confirm whether other modules rely on them.
_shape_counter = itertools.cycle(range(len(SHAPES)))
_shape_lock = asyncio.Lock()
@dataclass
class LogEntry:
    """One record held in the console buffer.

    The first four fields are required; the remaining three default to
    ``None`` and are only filled in for request/detection entries.
    """

    # --- required ---
    id: int
    timestamp: float
    level: str  # a LogLevel value
    message: str

    # --- optional ---
    # Index into SHAPES, shared by a request and its detection result.
    shape_index: Optional[int] = None
    inference_ms: Optional[float] = None
    # List of {label, confidence} dicts.
    detections: Optional[list] = None
# Module-wide source of LogEntry ids; the first id handed out is 1.
_entry_counter = itertools.count(start=1)


def _make_entry(level: LogLevel, message: str, **kwargs) -> LogEntry:
    """Build a LogEntry stamped with the next id and the current time.

    Args:
        level: Log category; its ``.value`` string is what gets stored.
        message: Text of the entry.
        **kwargs: Forwarded unchanged to the ``LogEntry`` constructor
            (e.g. ``shape_index``, ``inference_ms``, ``detections``).

    Returns:
        The newly created ``LogEntry``.
    """
    entry_id = next(_entry_counter)
    created_at = time.time()
    return LogEntry(
        id=entry_id,
        timestamp=created_at,
        level=level.value,
        message=message,
        **kwargs,
    )
def entry_to_dict(entry: LogEntry) -> dict:
    """Convert a LogEntry into a plain dict with an extra ``"shape"`` key.

    Args:
        entry: The entry to convert.

    Returns:
        ``asdict(entry)`` plus ``"shape"``: the symbol at
        ``SHAPES[entry.shape_index]``, or ``None`` when the entry has no
        shape index.
    """
    result = asdict(entry)
    index = entry.shape_index
    if index is None:
        result["shape"] = None
    else:
        result["shape"] = SHAPES[index]
    return result
class ConsoleBuffer:
    """
    Bounded in-memory log buffer that fans new entries out to WebSocket clients.

    Entries live in a ``deque`` capped at ``max_size``; once full, the oldest
    entry is dropped for each new one. Every appended entry is also serialized
    to JSON and scheduled for delivery to each registered client on the event
    loop supplied via ``set_loop``. Until a running loop has been supplied,
    entries are buffered but not pushed to clients.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty buffer that keeps at most ``max_size`` entries."""
        self._max_size = max_size
        self._buffer: deque[LogEntry] = deque(maxlen=max_size)
        self._clients: set = set()
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._next_shape_idx: int = 0

    def resize(self, new_size: int):
        """
        Change the capacity to ``new_size``, keeping the most recent entries.

        Raises:
            ValueError: If ``new_size`` is negative. The check runs before any
                state is modified, so a rejected call leaves the buffer and
                its recorded size untouched.
        """
        if new_size < 0:
            raise ValueError("new_size must be non-negative")
        # deque(iterable, maxlen=n) retains only the last n items.
        self._buffer = deque(self._buffer, maxlen=new_size)
        self._max_size = new_size

    def set_loop(self, loop: asyncio.AbstractEventLoop):
        """Remember the event loop on which client sends are scheduled."""
        self._loop = loop

    # ------------------------------------------------------------------
    # Client management
    # ------------------------------------------------------------------
    def add_client(self, ws):
        """Register ``ws`` to receive every subsequent log entry."""
        self._clients.add(ws)
        logger.debug("Console client connected. Total: %d", len(self._clients))

    def remove_client(self, ws):
        """Unregister ``ws``; a no-op if it was not registered."""
        self._clients.discard(ws)
        logger.debug("Console client disconnected. Total: %d", len(self._clients))

    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------
    def _next_shape(self) -> int:
        """Return the current shape index and advance it, wrapping at len(SHAPES)."""
        idx = self._next_shape_idx
        self._next_shape_idx = (idx + 1) % len(SHAPES)
        return idx

    def _append_and_broadcast(self, entry: "LogEntry") -> None:
        """Store ``entry`` in the buffer, then push it to connected clients."""
        self._buffer.append(entry)
        self._broadcast(entry)

    async def _send_or_drop(self, ws, payload: str) -> None:
        """Send ``payload`` to ``ws``; forget the client if the send fails."""
        try:
            await ws.send_text(payload)
        except Exception:
            # A client whose send raised is treated as gone, so later
            # broadcasts stop targeting it. discard() is idempotent, so a
            # concurrent remove_client() for the same socket is harmless.
            self._clients.discard(ws)

    def _broadcast(self, entry: "LogEntry") -> None:
        """
        Schedule delivery of ``entry`` to every client on the stored loop.

        Usable from synchronous code: sends are handed to the loop with
        ``asyncio.run_coroutine_threadsafe`` and not awaited here. If there
        are no clients, or no running loop has been set, nothing is sent.
        """
        if not self._clients:
            return
        loop = self._loop
        if loop is None or not loop.is_running():
            # Nothing to deliver on; the entry remains available via history.
            return
        payload = json.dumps({"type": "log", "entry": entry_to_dict(entry)})
        # Iterate over a snapshot: the set may shrink while sends complete.
        for ws in list(self._clients):
            coro = self._send_or_drop(ws, payload)
            try:
                asyncio.run_coroutine_threadsafe(coro, loop)
            except Exception:
                # Scheduling failed: close the coroutine so it is not
                # reported as "never awaited", and drop the client.
                coro.close()
                self._clients.discard(ws)

    async def _broadcast_async(self, entry: "LogEntry") -> None:
        """Await delivery of ``entry`` to every client, pruning failed ones."""
        if not self._clients:
            return
        payload = json.dumps({"type": "log", "entry": entry_to_dict(entry)})
        for ws in list(self._clients):
            await self._send_or_drop(ws, payload)

    # ------------------------------------------------------------------
    # Public log methods
    # ------------------------------------------------------------------
    def info(self, message: str):
        """Log ``message`` at INFO level."""
        self._append_and_broadcast(_make_entry(LogLevel.INFO, message))

    def success(self, message: str):
        """Log ``message`` at SUCCESS level."""
        self._append_and_broadcast(_make_entry(LogLevel.SUCCESS, message))

    def warning(self, message: str):
        """Log ``message`` at WARNING level."""
        self._append_and_broadcast(_make_entry(LogLevel.WARNING, message))

    def error(self, message: str):
        """Log ``message`` at ERROR level."""
        self._append_and_broadcast(_make_entry(LogLevel.ERROR, message))

    def system(self, message: str):
        """Log ``message`` at SYSTEM level."""
        self._append_and_broadcast(_make_entry(LogLevel.SYSTEM, message))

    def request_received(self, size_kb: float) -> int:
        """Log an incoming image request. Returns the shape_index for pairing."""
        shape_idx = self._next_shape()
        entry = _make_entry(
            LogLevel.REQUEST,
            f"Image received — {size_kb:.1f} KB",
            shape_index=shape_idx,
        )
        self._append_and_broadcast(entry)
        return shape_idx

    def detection_result(
        self,
        shape_idx: int,
        detections: list,
        inference_ms: float,
    ):
        """
        Log a detection result, paired with a prior request via shape_idx.

        Args:
            shape_idx: Value previously returned by ``request_received``.
            detections: Dicts that each carry a ``"confidence"`` key; ``None``
                is treated the same as an empty list.
            inference_ms: Inference time, shown in the message and stored on
                the entry.
        """
        # Sort by confidence descending — frontend uses this order for display
        sorted_dets = sorted(
            detections or [], key=lambda d: d["confidence"], reverse=True
        )
        count = len(sorted_dets)
        if count:
            # Summary line always shows count + time
            plural = "s" if count != 1 else ""
            msg = f"Detected {count} item{plural} — {inference_ms:.1f} ms"
        else:
            msg = f"No detections — {inference_ms:.1f} ms"
        entry = _make_entry(
            LogLevel.DETECTION,
            msg,
            shape_index=shape_idx,
            inference_ms=inference_ms,
            detections=sorted_dets,  # always full sorted list
        )
        self._append_and_broadcast(entry)

    def no_model(self):
        """Log a warning that a request was ignored because no model is loaded."""
        self._append_and_broadcast(
            _make_entry(LogLevel.WARNING, "No model loaded — request ignored")
        )

    # ------------------------------------------------------------------
    # History retrieval
    # ------------------------------------------------------------------
    def get_history(self, last_n: Optional[int] = None) -> list[dict]:
        """
        Return buffered entries as dicts, oldest first.

        Args:
            last_n: Maximum number of most-recent entries to return. ``None``
                returns everything; zero or a negative value returns an empty
                list.
        """
        if last_n is not None and last_n <= 0:
            # A plain entries[-0:] slice would return the whole buffer.
            return []
        entries = list(self._buffer)
        if last_n is not None:
            entries = entries[-last_n:]
        return [entry_to_dict(e) for e in entries]

    async def send_history(self, ws, last_n: int = 200):
        """Send recent history to a newly connected WebSocket client."""
        history = self.get_history(last_n)
        payload = json.dumps({"type": "history", "entries": history})
        try:
            await ws.send_text(payload)
        except Exception as e:
            logger.debug("Failed to send history: %s", e)
async def _safe_send(ws, payload: str):
    """
    Best-effort send of ``payload`` to a single WebSocket client.

    Args:
        ws: Client object exposing an awaitable ``send_text(str)``.
        payload: Already-serialized text to deliver.

    Returns:
        None. Send failures are never propagated to the caller.
    """
    try:
        await ws.send_text(payload)
    except Exception as exc:
        # Still best-effort, but leave a debug trace instead of discarding
        # the error silently.
        logger.debug("Failed to send console message: %s", exc)
# ---------------------------------------------------------------------------
# Module-level singleton: one shared ConsoleBuffer, built at import time with
# the constructor's default capacity. The original note says it is imported
# everywhere; the importing modules are not visible from this file.
# ---------------------------------------------------------------------------
console = ConsoleBuffer()