Skip to content

Commit 8176709

Browse files
committed
feat: Implement core application structure, dashboard, and settings page with live health metrics and admin actions for data purging and database vacuuming.
1 parent b15ae68 commit 8176709

14 files changed

Lines changed: 615 additions & 163 deletions

File tree

cmd/argus/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,23 @@ func main() {
9999
defer hub.Stop()
100100
slog.Info("🔌 WebSocket hub started")
101101

102+
// 4b. Initialize Event Notification Hub (for live mode — pushes data snapshots)
103+
eventHub := realtime.NewEventHub(
104+
repo,
105+
metrics.IncrementActiveConns,
106+
metrics.DecrementActiveConns,
107+
)
108+
ctxEvents, cancelEvents := context.WithCancel(context.Background())
109+
defer cancelEvents()
110+
go eventHub.Start(ctxEvents, 5*time.Second)
111+
slog.Info("⚡ Event notification hub started (5s flush)")
112+
102113
// 5. Initialize AI Service
103114
aiService := ai.NewService(repo)
104115
defer aiService.Stop()
105116

106117
// 6. Initialize API Server
107-
apiServer := api.NewServer(repo, hub, metrics)
118+
apiServer := api.NewServer(repo, hub, eventHub, metrics)
108119

109120
// 7. Initialize OTLP Ingestion (gRPC)
110121
traceServer := ingest.NewTraceServer(repo, metrics, cfg)
@@ -115,6 +126,7 @@ func main() {
115126
start := time.Now()
116127
apiServer.BroadcastLog(l)
117128
aiService.EnqueueLog(l)
129+
eventHub.NotifyRefresh()
118130
if time.Since(start) > 100*time.Millisecond {
119131
slog.Warn("Slow broadcast/enqueue", "duration", time.Since(start))
120132
}

internal/api/handlers.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ import (
1414

1515
// Server handles HTTP API requests.
1616
type Server struct {
17-
repo *storage.Repository
18-
hub *realtime.Hub
19-
metrics *telemetry.Metrics
17+
repo *storage.Repository
18+
hub *realtime.Hub
19+
eventHub *realtime.EventHub
20+
metrics *telemetry.Metrics
2021
}
2122

2223
// NewServer creates a new API server.
23-
func NewServer(repo *storage.Repository, hub *realtime.Hub, metrics *telemetry.Metrics) *Server {
24+
func NewServer(repo *storage.Repository, hub *realtime.Hub, eventHub *realtime.EventHub, metrics *telemetry.Metrics) *Server {
2425
return &Server{
25-
repo: repo,
26-
hub: hub,
27-
metrics: metrics,
26+
repo: repo,
27+
hub: hub,
28+
eventHub: eventHub,
29+
metrics: metrics,
2830
}
2931
}
3032

@@ -44,6 +46,7 @@ func (s *Server) RegisterRoutes(mux *http.ServeMux) {
4446
mux.Handle("GET /metrics", telemetry.PrometheusHandler())
4547
mux.HandleFunc("/ws", s.hub.HandleWebSocket)
4648
mux.HandleFunc("/ws/health", s.metrics.HealthWSHandler())
49+
mux.HandleFunc("/ws/events", s.eventHub.HandleWebSocket)
4750
mux.HandleFunc("DELETE /api/admin/purge", s.handlePurge)
4851
mux.HandleFunc("POST /api/admin/vacuum", s.handleVacuum)
4952
}

internal/realtime/events_ws.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package realtime
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log/slog"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"github.com/RandomCodeSpace/Project-Argus/internal/storage"
12+
"github.com/coder/websocket"
13+
)
14+
15+
// LiveSnapshot is the data payload pushed to all event WS clients.
16+
type LiveSnapshot struct {
17+
Type string `json:"type"`
18+
Dashboard *storage.DashboardStats `json:"dashboard"`
19+
Traffic []storage.TrafficPoint `json:"traffic"`
20+
Traces *storage.TracesResponse `json:"traces"`
21+
ServiceMap *storage.ServiceMapMetrics `json:"service_map"`
22+
}
23+
24+
// clientFilter holds the service filter currently selected by one client.
type clientFilter struct {
	// service is the service name to filter by; the empty string means
	// "all services" (no filter).
	service string
}
29+
30+
// EventHub manages WebSocket clients and pushes live data snapshots
31+
// filtered per-client's selected service. Debounces rapid ingestion
32+
// bursts and only computes snapshots every flush interval.
33+
type EventHub struct {
34+
repo *storage.Repository
35+
onConn func()
36+
onDisc func()
37+
38+
mu sync.Mutex
39+
clients map[*websocket.Conn]*clientFilter
40+
pending bool
41+
}
42+
43+
// NewEventHub creates a new event notification hub.
44+
func NewEventHub(repo *storage.Repository, onConnect, onDisconnect func()) *EventHub {
45+
return &EventHub{
46+
repo: repo,
47+
onConn: onConnect,
48+
onDisc: onDisconnect,
49+
clients: make(map[*websocket.Conn]*clientFilter),
50+
}
51+
}
52+
53+
// Start begins the periodic flush loop. Call in a goroutine.
54+
func (h *EventHub) Start(ctx context.Context, interval time.Duration) {
55+
ticker := time.NewTicker(interval)
56+
defer ticker.Stop()
57+
for {
58+
select {
59+
case <-ctx.Done():
60+
return
61+
case <-ticker.C:
62+
h.flush()
63+
}
64+
}
65+
}
66+
67+
// NotifyRefresh marks that new data has arrived. The actual broadcast
68+
// happens on the next ticker flush to debounce rapid ingestion bursts.
69+
func (h *EventHub) NotifyRefresh() {
70+
h.mu.Lock()
71+
h.pending = true
72+
h.mu.Unlock()
73+
}
74+
75+
// HandleWebSocket upgrades an HTTP request to a WebSocket connection,
76+
// registers it as an event client, and listens for filter messages.
77+
func (h *EventHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
78+
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
79+
InsecureSkipVerify: true,
80+
})
81+
if err != nil {
82+
slog.Error("Event WS accept failed", "error", err)
83+
return
84+
}
85+
86+
// Check for initial service filter from query params
87+
initialService := r.URL.Query().Get("service")
88+
h.addClient(conn, initialService)
89+
90+
// Send immediate snapshot so the client has data right away
91+
h.sendSnapshotTo(conn, initialService)
92+
93+
// Read loop: client can send {"service":"xxx"} to change filter
94+
for {
95+
_, msg, readErr := conn.Read(r.Context())
96+
if readErr != nil {
97+
break
98+
}
99+
var filterMsg struct {
100+
Service string `json:"service"`
101+
}
102+
if json.Unmarshal(msg, &filterMsg) == nil {
103+
h.updateClientFilter(conn, filterMsg.Service)
104+
}
105+
}
106+
107+
h.removeClient(conn)
108+
conn.Close(websocket.StatusNormalClosure, "bye")
109+
}
110+
111+
func (h *EventHub) addClient(c *websocket.Conn, service string) {
112+
h.mu.Lock()
113+
h.clients[c] = &clientFilter{service: service}
114+
h.mu.Unlock()
115+
if h.onConn != nil {
116+
h.onConn()
117+
}
118+
}
119+
120+
func (h *EventHub) removeClient(c *websocket.Conn) {
121+
h.mu.Lock()
122+
delete(h.clients, c)
123+
h.mu.Unlock()
124+
if h.onDisc != nil {
125+
h.onDisc()
126+
}
127+
}
128+
129+
func (h *EventHub) updateClientFilter(c *websocket.Conn, service string) {
130+
h.mu.Lock()
131+
if cf, ok := h.clients[c]; ok {
132+
cf.service = service
133+
}
134+
h.mu.Unlock()
135+
}
136+
137+
// flush computes per-service snapshots and pushes to matching clients.
138+
func (h *EventHub) flush() {
139+
h.mu.Lock()
140+
if !h.pending {
141+
h.mu.Unlock()
142+
return
143+
}
144+
h.pending = false
145+
146+
if len(h.clients) == 0 {
147+
h.mu.Unlock()
148+
return
149+
}
150+
151+
// Group clients by service filter
152+
groups := make(map[string][]*websocket.Conn)
153+
for c, cf := range h.clients {
154+
groups[cf.service] = append(groups[cf.service], c)
155+
}
156+
h.mu.Unlock()
157+
158+
// Compute one snapshot per unique filter, push to matching clients
159+
for service, clients := range groups {
160+
snapshot := h.computeSnapshot(service)
161+
if snapshot == nil {
162+
continue
163+
}
164+
msg, err := json.Marshal(snapshot)
165+
if err != nil {
166+
slog.Error("Event WS marshal failed", "error", err)
167+
continue
168+
}
169+
170+
for _, conn := range clients {
171+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
172+
if err := conn.Write(ctx, websocket.MessageText, msg); err != nil {
173+
slog.Debug("Event WS send failed, removing client", "error", err)
174+
h.removeClient(conn)
175+
conn.Close(websocket.StatusGoingAway, "write error")
176+
}
177+
cancel()
178+
}
179+
}
180+
}
181+
182+
// sendSnapshotTo sends a snapshot to a single client.
183+
func (h *EventHub) sendSnapshotTo(conn *websocket.Conn, service string) {
184+
snapshot := h.computeSnapshot(service)
185+
if snapshot == nil {
186+
return
187+
}
188+
msg, err := json.Marshal(snapshot)
189+
if err != nil {
190+
return
191+
}
192+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
193+
defer cancel()
194+
conn.Write(ctx, websocket.MessageText, msg)
195+
}
196+
197+
// computeSnapshot queries the DB for the last 15 minutes of data,
198+
// optionally filtered by a single service name.
199+
func (h *EventHub) computeSnapshot(service string) *LiveSnapshot {
200+
now := time.Now()
201+
start := now.Add(-15 * time.Minute)
202+
203+
var serviceNames []string
204+
if service != "" {
205+
serviceNames = []string{service}
206+
}
207+
208+
snapshot := &LiveSnapshot{Type: "live_snapshot"}
209+
210+
if stats, err := h.repo.GetDashboardStats(start, now, serviceNames); err == nil {
211+
snapshot.Dashboard = stats
212+
}
213+
214+
if traffic, err := h.repo.GetTrafficMetrics(start, now, serviceNames); err == nil {
215+
snapshot.Traffic = traffic
216+
}
217+
218+
if traces, err := h.repo.GetTracesFiltered(start, now, serviceNames, "", "", 25, 0, "timestamp", "desc"); err == nil {
219+
snapshot.Traces = traces
220+
}
221+
222+
if smap, err := h.repo.GetServiceMapMetrics(start, now); err == nil {
223+
snapshot.ServiceMap = smap
224+
}
225+
226+
return snapshot
227+
}

internal/telemetry/health_ws.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ func (m *Metrics) HealthWSHandler() http.HandlerFunc {
2424
}
2525
defer conn.Close(websocket.StatusNormalClosure, "closing")
2626

27+
// Track this connection in active_connections metric
28+
m.IncrementActiveConns()
29+
defer m.DecrementActiveConns()
30+
2731
slog.Info("📊 Health WS client connected")
2832

2933
// Send immediate snapshot so client doesn't wait for first tick

internal/telemetry/metrics.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@ func (m *Metrics) SetActiveConnections(n int) {
6060
m.activeConns.Store(int64(n))
6161
}
6262

63+
// IncrementActiveConns atomically adds 1 to the active connection count.
64+
func (m *Metrics) IncrementActiveConns() {
65+
n := m.activeConns.Add(1)
66+
m.ActiveConnections.Set(float64(n))
67+
}
68+
69+
// DecrementActiveConns atomically subtracts 1 from the active connection count.
70+
func (m *Metrics) DecrementActiveConns() {
71+
n := m.activeConns.Add(-1)
72+
if n < 0 {
73+
n = 0
74+
m.activeConns.Store(0)
75+
}
76+
m.ActiveConnections.Set(float64(n))
77+
}
78+
6379
// SetDLQSize updates the DLQ size gauge.
6480
func (m *Metrics) SetDLQSize(n int) {
6581
m.DLQSize.Set(float64(n))

web/src/App.tsx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { AppLayout } from './layouts/AppLayout'
2+
import { LiveModeProvider } from './contexts/LiveModeContext'
23

34
function App() {
4-
return <AppLayout />
5+
return (
6+
<LiveModeProvider>
7+
<AppLayout />
8+
</LiveModeProvider>
9+
)
510
}
611

712
export default App

0 commit comments

Comments
 (0)