# its-data-intelligence
Stream processing and analytics API for the Intelligent Traffic System.
B1 → Kafka → Stream Processor → PostgreSQL → FastAPI API → B3
Two services:
- b2-stream-processor — Consumes events from Kafka, aggregates in 5-second windows, computes metrics, classifies congestion, writes to Postgres
- b2-api — REST + WebSocket API serving traffic metrics to B3 dashboard
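The windowing step in b2-stream-processor can be pictured with a minimal stdlib sketch. It assumes events arrive as dicts with an epoch-seconds `ts`, a `camera_id`, a `vehicle_id`, and an optional `speed_kmh`. The names `window_start` and `aggregate` are hypothetical, and the real processor also classifies congestion and writes rows to Postgres, which this sketch leaves out.

```python
from collections import defaultdict
from typing import Iterable, Optional

WINDOW_SECONDS = 5  # tumbling window size described above


def window_start(ts: float, size: int = WINDOW_SECONDS) -> int:
    """Align an epoch-seconds timestamp to the start of its tumbling window."""
    return int(ts // size) * size


def aggregate(events: Iterable[dict]) -> dict:
    """Group events by (camera_id, window_start) and compute per-window metrics.

    Hypothetical event shape: {"camera_id": str, "ts": float (epoch seconds),
    "vehicle_id": str, "speed_kmh": float or None}.
    """
    buckets = defaultdict(list)
    for ev in events:
        buckets[(ev["camera_id"], window_start(ev["ts"]))].append(ev)

    metrics = {}
    for key, evs in buckets.items():
        speeds = [e["speed_kmh"] for e in evs if e.get("speed_kmh") is not None]
        avg: Optional[float] = sum(speeds) / len(speeds) if speeds else None
        metrics[key] = {
            # A vehicle seen in several frames of the same window is counted once.
            "vehicle_count": len({e["vehicle_id"] for e in evs}),
            "avg_speed_kmh": avg,
        }
    return metrics
```

Counting distinct `vehicle_id` values rather than raw events is an assumption here; it avoids counting one car once per frame.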
# Start everything (Kafka, Postgres, both B2 services)
cp .env.example .env
docker compose up -d
# Forcing a clean rebuild (skips BuildKit layer cache)
# `--build --force-recreate` still uses the layer cache — use this instead:
docker compose build --no-cache --pull
docker compose up -d --force-recreate
# Send mock traffic events (for development without B1)
pip install kafka-python
python tools/mock_producer.py --cameras 4 --rate 10 --brokers localhost:29094
# Check the API
curl http://localhost:18001/health
curl http://localhost:18001/cameras
curl http://localhost:18001/congestion/current
A step-by-step walkthrough that exercises every capability the team built. Run from the repo root unless noted.
- Docker Desktop running
- Python 3.11+ on the host (for tools/mock_producer.py)
- Optional: jq, websocat for nicer output
cp .env.example .env
docker compose up -d
docker compose ps
docker compose exec b2-stream-processor alembic upgrade head
Need a clean rebuild? `docker compose up -d --build --force-recreate` still hits the BuildKit layer cache. Run `docker compose build --no-cache --pull` first, then `docker compose up -d --force-recreate`.
curl -s http://localhost:18001/health | jq
# {"status":"ok","kafka":"ok","postgres":"ok"}
Stop Kafka briefly to see the degraded path:
docker compose stop kafka
curl -s http://localhost:18001/health | jq
# {"status":"degraded","kafka":"unreachable","postgres":"ok"} ← still HTTP 200
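docker compose start kafka
docker compose logs --tail=20 --no-log-prefix b2-stream-processor | jq -c .
Every line is one JSON object with `time`, `level`, `logger`, and `message` fields. Library logs (kafka-python, sqlalchemy) go through the same formatter. `--no-log-prefix` drops Compose's `service |` prefix so jq receives bare JSON.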
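The processor's formatter is not shown in this README. A stdlib-only sketch of the same idea, assuming the four fields above are the whole payload (`JsonFormatter` and `configure_logging` are hypothetical names), attaches one handler to the root logger so that library loggers such as kafka-python and sqlalchemy propagate through it:

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object: time, level, logger, message."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),  # applies %-style args
        }
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def configure_logging(level: int = logging.INFO) -> None:
    """Install the formatter on the root logger; child and library loggers propagate to it."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers[:] = [handler]
    root.setLevel(level)
```

Calling `configure_logging()` once at startup and then `logging.getLogger("processor").info("window_flushed camera_id=%s", "cam_01")` would emit a single JSON line whose `message` starts with `window_flushed`, which is what the jq filter in the next step selects on.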
pip install kafka-python
python tools/mock_producer.py --cameras 4 --rate 10 --brokers localhost:29094
In another terminal, watch windows close:
docker compose logs -f --no-log-prefix b2-stream-processor | jq -c 'select(.message | startswith("window_flushed"))'
docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, ts, vehicle_id, class, frame_id, confidence,
bbox_x, bbox_y, bbox_w, bbox_h, lane_id, speed_kmh
FROM traffic_events ORDER BY ts DESC LIMIT 5;"
frame_id, confidence, the full bbox, and lane_id used to be dropped; they are now persisted in every row.
docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, window_start, vehicle_count,
round(avg_speed_kmh::numeric, 1) AS avg_speed,
congestion_level, round(congestion_score::numeric, 2) AS score
FROM traffic_metrics
WHERE lane_id IS NULL
ORDER BY window_start DESC LIMIT 5;"
Restart the producer with lanes:
python tools/mock_producer.py --cameras 4 --rate 10 --with-lanes --brokers localhost:29094
After ~15s:
docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, lane_id, window_start, vehicle_count, congestion_level
FROM traffic_metrics
WHERE lane_id IS NOT NULL
ORDER BY window_start DESC, camera_id, lane_id LIMIT 12;"
Each camera now produces both a camera-wide row (lane_id IS NULL) and per-lane rows for the same 5s window.
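The camera-wide plus per-lane split can be sketched as below, assuming the events for one camera and one 5s window are already grouped. `window_rows` is a hypothetical helper, and counting distinct `vehicle_id` values is an assumption. Events without a lane_id contribute only to the camera-wide row, which fits the `--with-lanes` producer tagging only part of its events.

```python
from collections import defaultdict
from typing import List


def window_rows(camera_id: str, window_start: int, events: List[dict]) -> List[dict]:
    """Build one camera-wide row (lane_id=None) plus one row per observed lane."""
    by_lane = defaultdict(list)
    for ev in events:
        if ev.get("lane_id") is not None:
            by_lane[ev["lane_id"]].append(ev)

    def row(lane_id, evs):
        return {
            "camera_id": camera_id,
            "lane_id": lane_id,  # None marks the camera-wide aggregate
            "window_start": window_start,
            "vehicle_count": len({e["vehicle_id"] for e in evs}),
        }

    rows = [row(None, events)]  # the camera-wide row always covers every event
    rows += [row(lane, evs) for lane, evs in sorted(by_lane.items())]
    return rows
```

Writing both shapes to traffic_metrics, distinguished only by whether lane_id is NULL, is what keeps this to one table, one query path, and one writer.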
python tools/mock_producer.py --cameras 4 --rate 15 --no-speed --brokers localhost:29094
After ~3 windows:
docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, window_start, round(avg_speed_kmh::numeric, 1) AS avg_speed
FROM traffic_metrics WHERE lane_id IS NULL
ORDER BY window_start DESC LIMIT 5;"
avg_speed is still non-zero even though the events carry no speed: SpeedTracker estimated it from inter-frame centroid displacement.
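A rough sketch of a displacement-based estimate, assuming `(bbox_x, bbox_y)` is the top-left corner of the box, `ts` is in seconds, and both detections belong to the same tracked vehicle. `estimate_speed_kmh` is a hypothetical helper, not the SpeedTracker API; 0.05 is the placeholder `SPEED_TRACKER_PIXEL_TO_METER` value listed among the known limitations.

```python
import math
from typing import Optional, Tuple

PIXEL_TO_METER = 0.05  # placeholder calibration, as in SPEED_TRACKER_PIXEL_TO_METER


def centroid(bbox_x: float, bbox_y: float, bbox_w: float, bbox_h: float) -> Tuple[float, float]:
    # Assumes (bbox_x, bbox_y) is the top-left corner of the bounding box.
    return bbox_x + bbox_w / 2, bbox_y + bbox_h / 2


def estimate_speed_kmh(prev: dict, curr: dict,
                       pixel_to_meter: float = PIXEL_TO_METER) -> Optional[float]:
    """Speed of one vehicle between two detections, or None if time did not advance."""
    dt = curr["ts"] - prev["ts"]
    if dt <= 0:
        return None  # duplicate or out-of-order frame: no estimate
    x0, y0 = centroid(prev["bbox_x"], prev["bbox_y"], prev["bbox_w"], prev["bbox_h"])
    x1, y1 = centroid(curr["bbox_x"], curr["bbox_y"], curr["bbox_w"], curr["bbox_h"])
    meters = math.hypot(x1 - x0, y1 - y0) * pixel_to_meter
    return meters / dt * 3.6  # m/s -> km/h
```

A 100 px shift in 0.5 s becomes 5 m / 0.5 s = 10 m/s, i.e. 36 km/h. The result scales linearly with the pixel-to-meter factor, which is why absolute km/h values stay unreliable until per-camera calibration exists.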
Use a camera_id that actually exists (with the mock producer run as `--cameras 4`, /cameras typically lists cam_01 … cam_04). For a full ordered curl checklist, see Verify the HTTP API locally (step-by-step) below.
export BASE=http://localhost:18001
curl -s "$BASE/cameras" | jq
curl -s "$BASE/metrics/current?camera_id=cam_01" | jq
curl -s "$BASE/metrics/history?camera_id=cam_01&from=2026-05-01T00:00:00Z&to=2026-05-02T00:00:00Z" | jq
curl -s "$BASE/congestion/current" | jq
curl -s "$BASE/api/dashboard/summary" | jq
curl -s "$BASE/api/dashboard/events?limit=5" | jq
curl -s "$BASE/api/map/heatmap?minutes=5" | jq
curl -s "$BASE/api/map/incidents" | jq
curl -s "$BASE/api/alerts/current" | jq
curl -s "$BASE/api/analytics/metrics" | jq
# ST-GCN forecast (defaults: horizon 10 min, lookback 15 min). Requires checkpoint + topology; may 404/503 until configured.
curl -s "$BASE/api/predict/congestion?camera_id=cam_01" | jq
# all cameras (default: camera-wide rows only)
websocat ws://localhost:18001/ws/metrics
# single camera
websocat 'ws://localhost:18001/ws/metrics?camera_id=cam_01'
# per-lane breakdowns for one camera
websocat 'ws://localhost:18001/ws/metrics/lanes?camera_id=cam_01'
# unified real-time event channel (#35) — typed envelopes:
# traffic_metrics_update (5s), heatmap_update (10s),
# new_alert (instant on HIGH/SEVERE), admin_broadcast.
websocat ws://localhost:18001/ws/events
websocat 'ws://localhost:18001/ws/events?types=new_alert,admin_broadcast'
# API process (container port 8000, published on host port 18001)
curl -s localhost:18001/metrics | grep -E '^b2_api_requests_total|^b2_api_request_latency'
# Processor process: 9100
curl -s localhost:9100/metrics | grep -E '^b2_events_processed_total|^b2_window_flushes_total|^b2_kafka_consumer_lag'
docker compose exec b2-stream-processor python -c "
from shared.db import SessionLocal
from processor.retention import sweep
events_deleted, metrics_deleted = sweep(SessionLocal)
print(f'events_deleted={events_deleted} metrics_deleted={metrics_deleted}')"
pip install -r requirements.txt
make test-integration # ~60–90s on first run
Coverage:
- test_kafka_schema_compat: 100 mock events round-trip through the validator
- test_end_to_end_flow: produce → consume → DB row + per-lane row
- test_health_with_real_kafka: /health returns kafka=ok
- test_retention: the sweeper deletes old rows
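The schema round-trip idea can be approximated with the sketch below. The flat event shape simply mirrors the traffic_events columns queried in the demo; the real wire format and validator live in the Pydantic schemas under src/shared/, so `mock_event`, `validate`, and `round_trip_ok` are hypothetical stand-ins, not the test's code.

```python
import json
import random
from datetime import datetime, timedelta
from typing import List

# Hypothetical flat event shape mirroring the traffic_events columns.
REQUIRED = {
    "camera_id": str, "ts": str, "vehicle_id": str, "class": str,
    "frame_id": int, "confidence": float,
    "bbox_x": float, "bbox_y": float, "bbox_w": float, "bbox_h": float,
}
OPTIONAL = {"lane_id": int, "speed_kmh": float}


def mock_event(rng: random.Random, camera: int, frame: int, t0: datetime) -> dict:
    """One B1-like detection; all values are synthetic."""
    return {
        "camera_id": f"cam_{camera:02d}",
        "ts": (t0 + timedelta(milliseconds=100 * frame)).isoformat(),
        "vehicle_id": f"v{rng.randrange(50)}",
        "class": rng.choice(["car", "truck", "bus", "motorcycle"]),
        "frame_id": frame,
        "confidence": round(rng.uniform(0.5, 1.0), 3),
        "bbox_x": rng.uniform(0, 1800), "bbox_y": rng.uniform(0, 1000),
        "bbox_w": rng.uniform(20, 120), "bbox_h": rng.uniform(20, 120),
        "lane_id": rng.randint(1, 3),
    }


def validate(event: dict) -> List[str]:
    """Return a list of problems; an empty list means the event is valid."""
    problems = [f"missing {k}" for k in REQUIRED if k not in event]
    for key, typ in {**REQUIRED, **OPTIONAL}.items():
        if key in event and event[key] is not None and not isinstance(event[key], typ):
            problems.append(f"{key}: expected {typ.__name__}")
    return problems


def round_trip_ok(events: List[dict]) -> bool:
    """Serialize each event to JSON and back (like a Kafka hop), then validate it."""
    return all(not validate(json.loads(json.dumps(e))) for e in events)
```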
| Capability | Where to look | Talking point |
|---|---|---|
| Wire-compatible with B1 verbatim | traffic_events table | every B1 field persisted |
| Per-lane analytics | traffic_metrics WHERE lane_id IS NOT NULL | single table, one query path, one writer |
| Speed fallback | --no-speed run | tracking-based estimate when B1 omits it |
| Kafka-aware health | /health while Kafka is stopped | reports degraded, still HTTP 200 |
| Structured logs | docker compose logs ... \| jq | one JSON object per line, lib logs included |
| Two Prometheus endpoints | :18001/metrics, :9100/metrics | API + processor instrumented |
| WebSocket camera filter | ?camera_id=X | B3 can subscribe to one camera |
| Retention enforced | manual sweep() invocation | events 24h, metrics 30d (SRS §7) |
| Integration tests | make test-integration | testcontainers Kafka + Postgres in CI |
# Install dependencies (full dev stack incl. testcontainers)
pip install -r requirements.txt
# Run unit tests (skips integration)
make test
# Run integration tests (requires Docker — spins up Kafka + Postgres)
make test-integration
# Run everything
make test-all
# Lint
make lint
# Run mock producer
make mock # plain B1-shaped events
python tools/mock_producer.py --with-lanes --brokers localhost:29094 # include lane_id on ~80% of events
python tools/mock_producer.py --no-speed --brokers localhost:29094 # exercise B2 speed fallback
# View logs
make logs
# Tear down
make down
Per SRS §7, the processor sweeps old rows on a configurable interval
(default 1 hour): traffic_events are kept for 24h, traffic_metrics for
30 days. Override via RETENTION_EVENTS_HOURS / RETENTION_METRICS_DAYS.
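A self-contained sketch of one sweep pass, using an in-memory sqlite3 database as a stand-in for Postgres. The column names `ts` and `window_start` come from the demo queries; the function signatures differ from the real `processor.retention.sweep(SessionLocal)` and are assumptions, as is storing timestamps as ISO 8601 text.

```python
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Tuple


def retention_from_env() -> Tuple[int, int]:
    """Read retention windows; defaults are 24 h for events and 30 days for metrics."""
    return (
        int(os.environ.get("RETENTION_EVENTS_HOURS", "24")),
        int(os.environ.get("RETENTION_METRICS_DAYS", "30")),
    )


def sweep(conn: sqlite3.Connection, now: datetime,
          events_hours: int = 24, metrics_days: int = 30) -> Tuple[int, int]:
    """Delete expired rows and return (events_deleted, metrics_deleted).

    `now` must be timezone-aware. Stored timestamps are assumed to be ISO 8601
    UTC strings in one consistent format, so text order equals time order.
    """
    now = now.astimezone(timezone.utc)
    events_cutoff = (now - timedelta(hours=events_hours)).isoformat()
    metrics_cutoff = (now - timedelta(days=metrics_days)).isoformat()
    with conn:  # one transaction: commit both deletes, or roll back on error
        events_deleted = conn.execute(
            "DELETE FROM traffic_events WHERE ts < ?", (events_cutoff,)).rowcount
        metrics_deleted = conn.execute(
            "DELETE FROM traffic_metrics WHERE window_start < ?", (metrics_cutoff,)).rowcount
    return events_deleted, metrics_deleted
```

Running both deletes inside one transaction keeps a sweep all-or-nothing; against Postgres the same idea would compare real timestamp columns rather than strings.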
All HTTP paths below are served by b2-api (default host mapping: http://localhost:18001 → container port 8000). Routers are defined under src/api/main.py and src/api/routes/.
| Method | Path | Description |
|---|---|---|
| GET | /health | Liveness probe (Kafka + Postgres reachability) |
| GET | /cameras | List active cameras (from latest metrics) |
| GET | /metrics/current?camera_id=X | Latest aggregated metrics for one camera |
| GET | /metrics/history?camera_id=X&from=T1&to=T2 | Historical metrics (from / to are ISO 8601) |
| GET | /congestion/current | Latest camera-wide congestion for all cameras |
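The degraded-but-200 behavior of /health can be summarized as: probe each dependency, report its status in the body, and never fail the HTTP status. A sketch assuming each probe is a zero-argument callable; `health` is a hypothetical function, and how b2-api actually probes Kafka and Postgres is not shown in this README.

```python
from typing import Callable, Dict, Tuple


def health(checks: Dict[str, Callable[[], bool]]) -> Tuple[int, dict]:
    """Run each reachability probe; return (http_status, body)."""
    body = {}
    for name, check in checks.items():
        try:
            reachable = bool(check())
        except Exception:  # a probe that raises counts as unreachable
            reachable = False
        body[name] = "ok" if reachable else "unreachable"
    status = "ok" if all(v == "ok" for v in body.values()) else "degraded"
    # Always 200: a dependency outage is reported in the body, not the status code.
    return 200, {"status": status, **body}
```

Returning 200 while degraded keeps a liveness check from restarting the API just because Kafka blipped; callers read `status` to tell the two cases apart.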
| Method | Path | Description |
|---|---|---|
| GET | /api/dashboard/summary | KPI summary across cameras |
| GET | /api/dashboard/events?limit=N | Recent raw traffic events (default limit=10) |
| GET | /api/map/heatmap?minutes=N | Per-camera intensity for heatmap (default minutes=5) |
| GET | /api/map/incidents?severity=… | Cameras with elevated congestion; optional severity filter |
| Method | Path | Description |
|---|---|---|
| GET | /api/alerts/current | Active congestion-style alerts from latest metrics |
| GET | /api/alerts/history | Optional filters: from, to, severity, road_segment, type |
| POST | /api/alerts/{alert_id}/acknowledge | JSON body: admin_id (see OpenAPI /docs) |
| GET | /api/alerts/export | CSV export; same query params as history |
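To call the acknowledge route from a script instead of Swagger UI, the request can be built with the standard library. This sketch assumes `admin_id` is the only required body field (verify against /docs) and that alert IDs are strings; `acknowledge_request` is a hypothetical helper.

```python
import json
import urllib.parse
import urllib.request

BASE = "http://localhost:18001"


def acknowledge_request(alert_id: str, admin_id: str, base: str = BASE) -> urllib.request.Request:
    """Build (but do not send) POST /api/alerts/{alert_id}/acknowledge."""
    body = json.dumps({"admin_id": admin_id}).encode("utf-8")
    return urllib.request.Request(
        f"{base}/api/alerts/{urllib.parse.quote(alert_id, safe='')}/acknowledge",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending it is `urllib.request.urlopen(acknowledge_request("42", "ops-1"), timeout=5)`; both IDs there are made up.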
| Method | Path | Description |
|---|---|---|
| GET | /api/analytics/metrics | Optional start / end (ISO); defaults to last 24h |
| GET | /api/analytics/compare | Required start_a, end_a, start_b, end_b |
| GET | /api/analytics/report/pdf | Optional start / end; returns a PDF attachment |
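Because /api/analytics/compare needs all four bounds, a small helper can build them. This sketch treats the last N hours as period A and the N hours before it as period B; that split and the name `compare_query` are illustrative choices, not endpoint defaults.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urlencode


def _iso(d: datetime) -> str:
    """Format an aware datetime as UTC ISO 8601 with a trailing Z."""
    return d.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def compare_query(hours: int = 24, now: Optional[datetime] = None) -> str:
    """Query string: A = the last `hours`, B = the `hours` immediately before A."""
    end_a = now or datetime.now(timezone.utc)
    start_a = end_a - timedelta(hours=hours)
    end_b, start_b = start_a, start_a - timedelta(hours=hours)
    return urlencode({"start_a": _iso(start_a), "end_a": _iso(end_a),
                      "start_b": _iso(start_b), "end_b": _iso(end_b)})
```

Usage: `urllib.request.urlopen(f"http://localhost:18001/api/analytics/compare?{compare_query()}")`.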
| Method | Path | Description |
|---|---|---|
| GET | /api/predict/congestion?camera_id=X | ST-GCN lane forecast aggregated to camera level. Query params: horizon_minutes (default 10), lookback_minutes (default 15). Needs traffic-predictor/ config + trained checkpoint on the API host; see TRAFFIC_PREDICTOR_HOME, STGCN_CONFIG_PATH, STGCN_CHECKPOINT_PATH in .env.example. |
All under /api/admin/…. Send X-Admin-Token: <ADMIN_API_KEY> (or Bearer token). In docker-compose.yml, b2-api sets ADMIN_API_KEY for local dev.
| Method | Path | Description |
|---|---|---|
| GET/PUT | /api/admin/thresholds | Read/update congestion thresholds |
| GET | /api/admin/zones | List monitoring zones |
| POST | /api/admin/zones | Create zone (JSON body) |
| PUT | /api/admin/zones/{zone_id} | Update zone |
| DELETE | /api/admin/zones/{zone_id} | Delete zone |
| POST | /api/admin/notifications/broadcast | Push message to WebSocket admin_broadcast |
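Every admin call needs the token header. A stdlib sketch that builds (but does not send) such requests, reading the key from `ADMIN_API_KEY` by default and sending it as `X-Admin-Token`; `admin_request` is hypothetical, and request bodies are passed through as JSON unchanged because their schemas are documented in /docs, not here.

```python
import json
import os
import urllib.request
from typing import Optional

BASE = "http://localhost:18001"


def admin_request(path: str, method: str = "GET", body: Optional[dict] = None,
                  token: Optional[str] = None, base: str = BASE) -> urllib.request.Request:
    """Build an authenticated request to an /api/admin/... route."""
    if not path.startswith("/api/admin/"):
        raise ValueError("admin routes live under /api/admin/")
    # Fall back to the same key Compose passes to b2-api.
    headers = {"X-Admin-Token": token if token is not None else os.environ["ADMIN_API_KEY"]}
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(base + path, data=data, headers=headers, method=method)
```

Usage: `urllib.request.urlopen(admin_request("/api/admin/thresholds", token="my-secret-123"), timeout=5)`.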
| Protocol | Path | Description |
|---|---|---|
| WS | /ws/metrics | Optional query params: camera_id, role (viewer or operator) |
| WS | /ws/metrics/lanes | Same optional params as /ws/metrics |
| WS | /ws/events[?types=…] | Typed envelopes: traffic_metrics_update, heatmap_update, new_alert, admin_broadcast |
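The `types` filter on /ws/events can be read as a set intersection applied before each send. The sketch below assumes envelopes are dicts with a `type` key and that unknown type names are silently ignored; both assumptions and the helper names are illustrative, not b2-api's code.

```python
from urllib.parse import parse_qs, urlsplit

KNOWN_TYPES = {"traffic_metrics_update", "heatmap_update", "new_alert", "admin_broadcast"}


def subscribed_types(ws_url: str) -> set:
    """Parse ?types=a,b from a /ws/events URL; no parameter means every type."""
    raw = parse_qs(urlsplit(ws_url).query).get("types")
    if not raw:
        return set(KNOWN_TYPES)
    requested = {t.strip() for t in raw[0].split(",") if t.strip()}
    return requested & KNOWN_TYPES  # unknown names are dropped


def should_deliver(envelope: dict, types: set) -> bool:
    # Hypothetical envelope shape: {"type": "...", "data": {...}}
    return envelope.get("type") in types
```

Dropping unknown names rather than rejecting the connection is a design choice; a stricter server could close the socket with an error instead.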
| Method | Path | Description |
|---|---|---|
| GET | /metrics | Prometheus scrape (request counts, latency, etc.) |
The stream processor exposes its own Prometheus endpoint on host port 9100 (see Demo §11): b2_events_processed_total, b2_events_dropped_total{reason}, b2_window_flushes_total, b2_kafka_consumer_lag.
Interactive OpenAPI: http://localhost:18001/docs (Swagger UI) lists every route, parameters, and response schemas.
Follow these in order after the stack is up (docker compose up -d, migrations applied as in Demo §1). Set a base URL (host port from docker-compose.yml for b2-api):
export BASE=http://localhost:18001

1. Health: expect `status` to be `ok`, or `degraded` if Kafka is down.
   curl -s "$BASE/health" | jq
2. Cameras: confirms the API can read Postgres; IDs should match your producer (e.g. cam_01).
   curl -s "$BASE/cameras" | jq
3. Latest metrics: pick any camera_id from step 2.
   curl -s "$BASE/metrics/current?camera_id=cam_01" | jq
4. Metrics history: adjust from/to to bracket existing window_start values (UTC ISO 8601).
   curl -s "$BASE/metrics/history?camera_id=cam_01&from=2026-05-01T00:00:00Z&to=2026-05-11T23:59:59Z" | jq
5. Congestion snapshot (all cameras).
   curl -s "$BASE/congestion/current" | jq
6. Dashboard summary.
   curl -s "$BASE/api/dashboard/summary" | jq
7. Recent events.
   curl -s "$BASE/api/dashboard/events?limit=5" | jq
8. Heatmap points.
   curl -s "$BASE/api/map/heatmap?minutes=5" | jq
9. Map incidents (optionally add ?severity=HIGH).
   curl -s "$BASE/api/map/incidents" | jq
10. Alerts: may be an empty array until metrics show HIGH/CRITICAL-style congestion.
    curl -s "$BASE/api/alerts/current" | jq
11. Analytics (24h default window).
    curl -s "$BASE/api/analytics/metrics" | jq
    Range compare (optional): GET /api/analytics/compare requires all of start_a, end_a, start_b, and end_b (ISO 8601). Use /docs to try it interactively.
12. Analytics PDF: writes a file. Add `-D -` to also print the HTTP response headers.
    curl -sS -o /tmp/b2-report.pdf "$BASE/api/analytics/report/pdf"
13. ST-GCN forecast: defaults to a 10 min horizon and 15 min lookback. Returns 404 if there is no recent per-lane data for the camera or the camera is outside the ST-GCN topology; 503 if the model bundle or checkpoint is missing or inference fails.
    curl -s "$BASE/api/predict/congestion?camera_id=cam_01" | jq
    curl -s "$BASE/api/predict/congestion?camera_id=cam_01&horizon_minutes=10&lookback_minutes=15" | jq
14. Admin thresholds (optional): uses ADMIN_API_KEY from Compose (example: my-secret-123).
    curl -s -H "X-Admin-Token: my-secret-123" "$BASE/api/admin/thresholds" | jq
15. WebSockets (optional): install websocat and use ws:// (not http://).
    websocat "ws://localhost:18001/ws/metrics"
    websocat "ws://localhost:18001/ws/events"
    websocat "ws://localhost:18001/ws/metrics?camera_id=cam_01"
16. Prometheus scrape (optional).
    curl -s "$BASE/metrics" | head
    curl -s "http://localhost:9100/metrics" | head # processor; only if port 9100 is published
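The plain GET steps of this checklist can be scripted as a quick smoke test. This sketch covers only the steps that need no extra parameters (it skips history, the PDF, the forecast, admin routes, and WebSockets), assumes cam_01 exists, and reports 0 when no HTTP response arrives at all; `run_checklist` and `http_status` are hypothetical names.

```python
import urllib.error
import urllib.request
from typing import Callable, Dict

BASE = "http://localhost:18001"

# Parameter-free GET steps from the checklist; cam_01 assumes the mock producer's IDs.
CHECKS = [
    "/health",
    "/cameras",
    "/metrics/current?camera_id=cam_01",
    "/congestion/current",
    "/api/dashboard/summary",
    "/api/dashboard/events?limit=5",
    "/api/map/heatmap?minutes=5",
    "/api/map/incidents",
    "/api/alerts/current",
    "/api/analytics/metrics",
]


def http_status(url: str, timeout: float = 5.0) -> int:
    """GET `url` and return its status; 4xx/5xx are returned, not raised."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code
    except OSError:  # URLError (refused, DNS) and socket timeouts: no HTTP response
        return 0


def run_checklist(base: str = BASE, fetch: Callable[[str], int] = http_status) -> Dict[str, int]:
    """GET every path and return {path: status}; `fetch` is injectable for testing."""
    return {path: fetch(base + path) for path in CHECKS}
```

Against a live stack, `run_checklist()` returns a `{path: status}` dict; any value other than 200 points at the step to investigate by hand.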
All configuration via environment variables. See .env.example for the full list.
These are tracked on the project board so we don't lose them:
- Pixel-to-meter calibration is a placeholder (SPEED_TRACKER_PIXEL_TO_METER=0.05). Absolute km/h numbers will be off until B1 hands over per-camera survey data. Tracked in #19.
- In-memory windowing, not PyFlink. Fine for the demo and the load we expect; PyFlink migration is on the backlog (#20) for after the demo.
- Auth is delegated to Kong / Keycloak (B4). B2 endpoints are unauthenticated when hit directly — fine inside the cluster, defense-in-depth tracked in #23.
- Event-time window flushing now uses a per-camera watermark + allowed lateness (tracked in #21). Keep WINDOW_ALLOWED_LATENESS_SECONDS tuned to your worst expected camera skew.
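The watermark rule can be illustrated with a small per-camera buffer. It assumes the watermark is simply the largest event time seen from that camera, which is one common choice. `CameraWatermarks` is a hypothetical class, and it has no timer, so a camera that goes quiet leaves its last window open.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

WINDOW_SECONDS = 5


class CameraWatermarks:
    """Per-camera event-time watermark with allowed lateness (illustrative only).

    Window [start, start + 5 s) closes once the camera's watermark reaches
    start + 5 s + allowed_lateness; events for a closed window are dropped.
    """

    def __init__(self, allowed_lateness: float) -> None:
        self.allowed_lateness = allowed_lateness
        self.watermark: Dict[str, float] = {}
        self.open: Dict[str, Dict[int, List[Any]]] = defaultdict(dict)
        self.dropped = 0

    def _deadline(self, start: int) -> float:
        return start + WINDOW_SECONDS + self.allowed_lateness

    def add(self, camera_id: str, ts: float, event: Any) -> List[Tuple[int, List[Any]]]:
        """Buffer one event; return (window_start, events) pairs that are ready to flush."""
        start = int(ts // WINDOW_SECONDS) * WINDOW_SECONDS
        prev = self.watermark.get(camera_id)
        if prev is not None and self._deadline(start) <= prev:
            self.dropped += 1  # later than allowed: its window has already closed
            return []
        windows = self.open[camera_id]
        windows.setdefault(start, []).append(event)
        wm = ts if prev is None else max(prev, ts)  # one camera never advances another
        self.watermark[camera_id] = wm
        ready = sorted(s for s in windows if self._deadline(s) <= wm)
        return [(s, windows.pop(s)) for s in ready]
```

With `allowed_lateness=2`, an event stamped 103 s still lands in the [100, 105) window while that camera's watermark is below 107 s; once an event at 107 s or later arrives, the window flushes and later stragglers are counted as dropped. Raising WINDOW_ALLOWED_LATENESS_SECONDS trades flush latency for completeness.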
src/
shared/ — Config, DB models, Pydantic schemas (shared by both services)
processor/ — Stream processor (Kafka consumer, windowed aggregation, Postgres writer)
api/ — FastAPI REST + WebSocket service
tools/ — Mock event producer for development
migrations/ — Alembic database migrations
tests/ — Unit tests + tests/integration/ (testcontainers)
docker/ — Dockerfiles for each service