Intelligent-Traffic-Ecosystem/its-data-intelligence

B2 — Data & Intelligence

its-data-intelligence

Stream processing and analytics API for the Intelligent Traffic System.

Architecture

B1 → Kafka → Stream Processor → PostgreSQL → FastAPI API → B3

Two services:

  • b2-stream-processor — Consumes events from Kafka, aggregates in 5-second windows, computes metrics, classifies congestion, writes to Postgres
  • b2-api — REST + WebSocket API serving traffic metrics to B3 dashboard
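
As a rough illustration of what "aggregates in 5-second windows" means, here is a minimal sketch of tumbling-window aggregation. It is not the processor's actual code: the function names, the distinct-vehicle counting rule, and the congestion thresholds are assumptions made up for this example.

```python
from collections import defaultdict
from statistics import mean

WINDOW_SECONDS = 5  # window size stated above


def window_start(ts: float) -> int:
    """Floor an epoch timestamp to the start of its 5-second window."""
    return int(ts // WINDOW_SECONDS) * WINDOW_SECONDS


def classify(vehicle_count: int, avg_speed_kmh: float) -> str:
    """Hypothetical thresholds, for illustration only."""
    if vehicle_count >= 20 and avg_speed_kmh < 15:
        return "HIGH"
    if vehicle_count >= 10 and avg_speed_kmh < 30:
        return "MEDIUM"
    return "LOW"


def aggregate(events: list[dict]) -> list[dict]:
    """Group events by (camera_id, window) and build one metrics row per group."""
    buckets = defaultdict(list)
    for ev in events:
        buckets[(ev["camera_id"], window_start(ev["ts"]))].append(ev)

    rows = []
    for (camera_id, start), evs in sorted(buckets.items()):
        # Assumption: a vehicle seen in several frames counts once per window.
        count = len({e["vehicle_id"] for e in evs})
        avg_speed = mean(e["speed_kmh"] for e in evs)
        rows.append({
            "camera_id": camera_id,
            "window_start": start,
            "vehicle_count": count,
            "avg_speed_kmh": avg_speed,
            "congestion_level": classify(count, avg_speed),
        })
    return rows
```

Each event lands in exactly one window, so a row can be written as soon as its window is known to be complete.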

Quick Start

# Start everything (Kafka, Postgres, both B2 services)
cp .env.example .env
docker compose up -d

# Forcing a clean rebuild (skips BuildKit layer cache)
# `--build --force-recreate` still uses the layer cache — use this instead:
docker compose build --no-cache --pull
docker compose up -d --force-recreate

# Send mock traffic events (for development without B1)
pip install kafka-python
python tools/mock_producer.py --cameras 4 --rate 10 --brokers localhost:29094

# Check the API
curl http://localhost:18001/health
curl http://localhost:18001/cameras
curl http://localhost:18001/congestion/current

Demo Runbook

A step-by-step walkthrough that exercises every capability the team built. Run from the repo root unless noted.

Prerequisites

  • Docker Desktop running
  • Python 3.11+ on the host (for tools/mock_producer.py)
  • Optional: jq, websocat for nicer output

1 — Bring up the stack

cp .env.example .env
docker compose up -d
docker compose ps
docker compose exec b2-stream-processor alembic upgrade head

Need a clean rebuild? docker compose up -d --build --force-recreate still hits the BuildKit layer cache. Run docker compose build --no-cache --pull first, then docker compose up -d --force-recreate.

2 — Health probe (Kafka + Postgres aware)

curl -s http://localhost:18001/health | jq
# {"status":"ok","kafka":"ok","postgres":"ok"}

Stop Kafka briefly to see the degraded path:

docker compose stop kafka
curl -s http://localhost:18001/health | jq
# {"status":"degraded","kafka":"unreachable","postgres":"ok"}   ← still HTTP 200
docker compose start kafka
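
The degraded-but-still-200 behaviour shown above could come from logic along these lines. This is a sketch only: the function name is hypothetical, and the "unreachable" value for Postgres is an assumption (the output above only shows it for Kafka).

```python
def health_payload(kafka_ok: bool, postgres_ok: bool) -> tuple[int, dict]:
    """Return (http_status, body) for a dependency-aware health probe."""
    body = {
        "status": "ok" if kafka_ok and postgres_ok else "degraded",
        "kafka": "ok" if kafka_ok else "unreachable",
        # Assumption: Postgres reports the same "unreachable" string as Kafka.
        "postgres": "ok" if postgres_ok else "unreachable",
    }
    # The probe always answers 200; callers read "status" to detect degradation.
    return 200, body
```

Returning 200 in the degraded case keeps the container from being restarted just because a dependency is briefly down.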

3 — Structured JSON logs

docker compose logs --tail=20 b2-stream-processor | jq -c .

Every line is one JSON object with time, level, logger, message. Library logs (kafka-python, sqlalchemy) come through the same formatter.
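
A formatter of roughly this shape would produce the four fields listed above. It is a sketch using only the standard library; the class and function names are assumptions, not the names used in this repo.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object on one line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


def configure_logging(level: int = logging.INFO) -> None:
    """Install the JSON formatter on the root logger."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers = [handler]
    root.setLevel(level)
```

Attaching the handler to the root logger is what lets library loggers share the same output format, since their records propagate up to it by default.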

4 — Send B1-shaped events

pip install kafka-python
python tools/mock_producer.py --cameras 4 --rate 10 --brokers localhost:29094

In another terminal, watch windows close:

docker compose logs -f b2-stream-processor | jq -c 'select(.message | startswith("window_flushed"))'

5 — Verify all B1 fields are persisted

docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, ts, vehicle_id, class, frame_id, confidence,
       bbox_x, bbox_y, bbox_w, bbox_h, lane_id, speed_kmh
FROM traffic_events ORDER BY ts DESC LIMIT 5;"

Earlier versions dropped frame_id, confidence, the full bbox, and lane_id. Every row now stores them.

6 — Inspect aggregated camera-wide metrics

docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, window_start, vehicle_count,
       round(avg_speed_kmh::numeric, 1) AS avg_speed,
       congestion_level, round(congestion_score::numeric, 2) AS score
FROM traffic_metrics
WHERE lane_id IS NULL
ORDER BY window_start DESC LIMIT 5;"

7 — Per-lane breakdown

Restart the producer with lanes:

python tools/mock_producer.py --cameras 4 --rate 10 --with-lanes --brokers localhost:29094

After ~15s:

docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, lane_id, window_start, vehicle_count, congestion_level
FROM traffic_metrics
WHERE lane_id IS NOT NULL
ORDER BY window_start DESC, camera_id, lane_id LIMIT 12;"

One camera now produces both a camera-wide row and per-lane rows for the same 5s window.
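
One way to get both row types from a single pass is to file each event under two keys. The sketch below assumes that approach; the function names and the sample lane values are hypothetical.

```python
from collections import Counter


def bucket_keys(event: dict) -> list[tuple]:
    """Every event feeds the camera-wide bucket; lane events feed a lane bucket too."""
    keys = [(event["camera_id"], None)]  # lane_id None marks the camera-wide row
    if event.get("lane_id") is not None:
        keys.append((event["camera_id"], event["lane_id"]))
    return keys


def count_by_bucket(events: list[dict]) -> dict:
    """Count events per (camera_id, lane_id) bucket within one window."""
    counts = Counter()
    for ev in events:
        for key in bucket_keys(ev):
            counts[key] += 1
    return dict(counts)
```

This matches the queries above: camera-wide rows are the ones with lane_id IS NULL, and per-lane rows carry a lane_id.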

8 — Speed fallback when B1 omits speed_estimate

python tools/mock_producer.py --cameras 4 --rate 15 --no-speed --brokers localhost:29094

After ~3 windows:

docker compose exec postgres psql -U user -d traffic -c "
SELECT camera_id, window_start, round(avg_speed_kmh::numeric, 1) AS avg_speed
FROM traffic_metrics WHERE lane_id IS NULL
ORDER BY window_start DESC LIMIT 5;"

avg_speed is non-zero — SpeedTracker computed it from inter-frame centroid displacement.
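
To make "inter-frame centroid displacement" concrete, here is a minimal sketch of the arithmetic. It is not SpeedTracker itself: the function names are hypothetical, and it assumes bbox_x / bbox_y mark the top-left corner of the box and that the time between the two frames is known.

```python
import math

# Placeholder calibration named in the caveats (SPEED_TRACKER_PIXEL_TO_METER).
PIXEL_TO_METER = 0.05


def centroid(bbox_x: float, bbox_y: float, bbox_w: float, bbox_h: float) -> tuple[float, float]:
    """Centre of a bounding box, assuming (bbox_x, bbox_y) is its top-left corner."""
    return bbox_x + bbox_w / 2, bbox_y + bbox_h / 2


def speed_kmh(prev_bbox: tuple, curr_bbox: tuple, dt_seconds: float,
              pixel_to_meter: float = PIXEL_TO_METER) -> float:
    """Estimate speed from how far the centroid moved between two frames."""
    if dt_seconds <= 0:
        return 0.0
    x0, y0 = centroid(*prev_bbox)
    x1, y1 = centroid(*curr_bbox)
    pixels = math.hypot(x1 - x0, y1 - y0)
    meters_per_second = pixels * pixel_to_meter / dt_seconds
    return meters_per_second * 3.6  # m/s to km/h
```

For example, a centroid that moves 50 pixels in one second corresponds to 2.5 m/s at this calibration, or about 9 km/h.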

9 — REST endpoints (the contract for B3)

Use a camera_id that actually exists (after the mock producer has run with --cameras 4, /cameras typically lists cam_01 through cam_04). For a full ordered curl checklist, see Verify the HTTP API locally (step-by-step) below.

export BASE=http://localhost:18001
curl -s "$BASE/cameras" | jq
curl -s "$BASE/metrics/current?camera_id=cam_01" | jq
curl -s "$BASE/metrics/history?camera_id=cam_01&from=2026-05-01T00:00:00Z&to=2026-05-02T00:00:00Z" | jq
curl -s "$BASE/congestion/current" | jq
curl -s "$BASE/api/dashboard/summary" | jq
curl -s "$BASE/api/dashboard/events?limit=5" | jq
curl -s "$BASE/api/map/heatmap?minutes=5" | jq
curl -s "$BASE/api/map/incidents" | jq
curl -s "$BASE/api/alerts/current" | jq
curl -s "$BASE/api/analytics/metrics" | jq
# ST-GCN forecast (defaults: horizon 10 min, lookback 15 min). Requires checkpoint + topology; may 404/503 until configured.
curl -s "$BASE/api/predict/congestion?camera_id=cam_01" | jq

10 — WebSocket live stream

# all cameras (default — camera-wide rows only)
websocat ws://localhost:18001/ws/metrics

# single camera
websocat 'ws://localhost:18001/ws/metrics?camera_id=cam_01'

# per-lane breakdowns for one camera
websocat 'ws://localhost:18001/ws/metrics/lanes?camera_id=cam_01'

# unified real-time event channel (#35) — typed envelopes:
#   traffic_metrics_update (5s), heatmap_update (10s),
#   new_alert (instant on HIGH/SEVERE), admin_broadcast.
websocat ws://localhost:18001/ws/events
websocat 'ws://localhost:18001/ws/events?types=new_alert,admin_broadcast'

11 — Prometheus metrics (two endpoints)

# API process (container port 8000, published on host port 18001)
curl -s localhost:18001/metrics | grep -E '^b2_api_requests_total|^b2_api_request_latency'

# Processor process: 9100
curl -s localhost:9100/metrics | grep -E '^b2_events_processed_total|^b2_window_flushes_total|^b2_kafka_consumer_lag'

12 — Manual retention sweep

docker compose exec b2-stream-processor python -c "
from shared.db import SessionLocal
from processor.retention import sweep
events_deleted, metrics_deleted = sweep(SessionLocal)
print(f'events_deleted={events_deleted} metrics_deleted={metrics_deleted}')"

13 — Integration tests against real Kafka + Postgres

pip install -r requirements.txt
make test-integration                          # ~60–90s on first run

Coverage:

  • test_kafka_schema_compat — 100 mock events round-trip through validator
  • test_end_to_end_flow — produce → consume → DB row + per-lane row
  • test_health_with_real_kafka — /health returns kafka=ok
  • test_retention — sweeper deletes old rows

Capability Matrix (what to highlight)

| Capability | Where to look | Talking point |
|---|---|---|
| Wire-compatible with B1 verbatim | traffic_events table | every B1 field persisted |
| Per-lane analytics | traffic_metrics WHERE lane_id IS NOT NULL | single table, one query path, one writer |
| Speed fallback | --no-speed run | tracking-based estimate when B1 omits it |
| Kafka-aware health | /health while Kafka is stopped | reports degraded, still HTTP 200 |
| Structured logs | docker compose logs ... \| jq | one JSON object per line, lib logs included |
| Two Prometheus endpoints | :18001/metrics, :9100/metrics | API + processor instrumented |
| WebSocket camera filter | ?camera_id=X | B3 can subscribe to one camera |
| Retention enforced | manual sweep() invocation | events 24h, metrics 30d (SRS §7) |
| Integration tests | make test-integration | testcontainers Kafka + Postgres in CI |

Development

# Install dependencies (full dev stack incl. testcontainers)
pip install -r requirements.txt

# Run unit tests (skips integration)
make test

# Run integration tests (requires Docker — spins up Kafka + Postgres)
make test-integration

# Run everything
make test-all

# Lint
make lint

# Run mock producer
make mock                                      # plain B1-shaped events
python tools/mock_producer.py --with-lanes --brokers localhost:29094     # include lane_id ~80% of events
python tools/mock_producer.py --no-speed --brokers localhost:29094       # exercise B2 speed fallback

# View logs
make logs

# Tear down
make down

Retention

Per SRS §7, the processor sweeps old rows on a configurable interval (default 1 hour): traffic_events are kept for 24h, traffic_metrics for 30 days. Override via RETENTION_EVENTS_HOURS / RETENTION_METRICS_DAYS.

API Endpoints

All HTTP paths below are served by b2-api (default host mapping: http://localhost:18001 → container port 8000). Routers are defined in src/api/main.py and under src/api/routes/.

Traffic and health

| Method | Path | Description |
|---|---|---|
| GET | /health | Liveness probe (Kafka + Postgres reachability) |
| GET | /cameras | List active cameras (from latest metrics) |
| GET | /metrics/current?camera_id=X | Latest aggregated metrics for one camera |
| GET | /metrics/history?camera_id=X&from=T1&to=T2 | Historical metrics (from / to are ISO 8601) |
| GET | /congestion/current | Latest camera-wide congestion for all cameras |
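
Because from and to must be ISO 8601 timestamps, it can help to build the history URL programmatically. The helper below is a sketch with a hypothetical name; it expects timezone-aware datetimes and formats them in UTC with a trailing Z, matching the curl examples in this README.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def history_url(base: str, camera_id: str, start: datetime, end: datetime) -> str:
    """Build a /metrics/history URL from two timezone-aware datetimes."""
    def iso(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # urlencode percent-escapes the colons in the timestamps.
    query = urlencode({"camera_id": camera_id, "from": iso(start), "to": iso(end)})
    return f"{base}/metrics/history?{query}"
```

The colons come out percent-encoded (%3A), which is an equivalent encoding of the same query string.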

Dashboard and map (B3 helpers)

| Method | Path | Description |
|---|---|---|
| GET | /api/dashboard/summary | KPI summary across cameras |
| GET | /api/dashboard/events?limit=N | Recent raw traffic events (default limit=10) |
| GET | /api/map/heatmap?minutes=N | Per-camera intensity for heatmap (default minutes=5) |
| GET | /api/map/incidents?severity=… | Cameras with elevated congestion; optional severity filter |

Alerts

| Method | Path | Description |
|---|---|---|
| GET | /api/alerts/current | Active congestion-style alerts from latest metrics |
| GET | /api/alerts/history | Optional filters: from, to, severity, road_segment, type |
| POST | /api/alerts/{alert_id}/acknowledge | JSON body: admin_id (see OpenAPI /docs) |
| GET | /api/alerts/export | CSV export; same query params as history |

Analytics

| Method | Path | Description |
|---|---|---|
| GET | /api/analytics/metrics | Optional start / end (ISO); defaults to last 24h |
| GET | /api/analytics/compare | Required start_a, end_a, start_b, end_b |
| GET | /api/analytics/report/pdf | Optional start / end; returns a PDF attachment |

Forecast (ST-GCN)

| Method | Path | Description |
|---|---|---|
| GET | /api/predict/congestion?camera_id=X | ST-GCN lane forecast aggregated to camera level. Query params: horizon_minutes (default 10), lookback_minutes (default 15). Needs traffic-predictor/ config + trained checkpoint on the API host; see TRAFFIC_PREDICTOR_HOME, STGCN_CONFIG_PATH, STGCN_CHECKPOINT_PATH in .env.example. |
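
A client calling the forecast endpoint has to tell "not configured yet" apart from "no data". The sketch below builds the request URL with the documented defaults and maps the status codes to the meanings given in the verification steps of this README. Both function names are hypothetical.

```python
from urllib.parse import urlencode


def forecast_url(base: str, camera_id: str,
                 horizon_minutes: int = 10, lookback_minutes: int = 15) -> str:
    """Build the forecast URL; the defaults mirror the documented ones."""
    query = urlencode({
        "camera_id": camera_id,
        "horizon_minutes": horizon_minutes,
        "lookback_minutes": lookback_minutes,
    })
    return f"{base}/api/predict/congestion?{query}"


def describe_status(status_code: int) -> str:
    """Translate the documented status codes into a human-readable reason."""
    if status_code == 200:
        return "forecast available"
    if status_code == 404:
        return "no recent per-lane data, or camera outside the ST-GCN topology"
    if status_code == 503:
        return "model bundle or checkpoint missing, or inference failed"
    return f"unexpected status {status_code}"
```

A dashboard could treat 404 as "nothing to show for this camera" and 503 as a deployment problem worth surfacing to an operator.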

Admin (authenticated)

All under /api/admin/…. Send X-Admin-Token: <ADMIN_API_KEY> (or Bearer token). In docker-compose.yml, b2-api sets ADMIN_API_KEY for local dev.

| Method | Path | Description |
|---|---|---|
| GET/PUT | /api/admin/thresholds | Read/update congestion thresholds |
| GET | /api/admin/zones | List monitoring zones |
| POST | /api/admin/zones | Create zone (JSON body) |
| PUT | /api/admin/zones/{zone_id} | Update zone |
| DELETE | /api/admin/zones/{zone_id} | Delete zone |
| POST | /api/admin/notifications/broadcast | Push message to WebSocket admin_broadcast |
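
For scripted access to the admin routes, the token header can be attached with the standard library alone. This is a sketch: the helper name is hypothetical, and it only constructs the request object without sending it.

```python
import urllib.request


def admin_request(base: str, path: str, token: str, method: str = "GET") -> urllib.request.Request:
    """Build a request for an /api/admin/... route with the X-Admin-Token header set."""
    req = urllib.request.Request(f"{base}{path}", method=method)
    req.add_header("X-Admin-Token", token)
    return req
```

Passing the result to urllib.request.urlopen would perform the call, for example admin_request(BASE, "/api/admin/thresholds", token) to read the current thresholds.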

WebSockets

| Protocol | Path | Description |
|---|---|---|
| WS | /ws/metrics | Optional query params: camera_id, role (viewer or operator) |
| WS | /ws/metrics/lanes | Same optional params as /ws/metrics |
| WS | /ws/events[?types=…] | Typed envelopes: traffic_metrics_update, heatmap_update, new_alert, admin_broadcast |

Prometheus (API process)

| Method | Path | Description |
|---|---|---|
| GET | /metrics | Prometheus scrape (request counts, latency, etc.) |

The stream processor exposes its own Prometheus endpoint on host port 9100 (see Demo §11): b2_events_processed_total, b2_events_dropped_total{reason}, b2_window_flushes_total, b2_kafka_consumer_lag.

Interactive OpenAPI: http://localhost:18001/docs (Swagger UI) lists every route, parameters, and response schemas.

Verify the HTTP API locally (step-by-step)

Follow these in order after the stack is up (docker compose up -d, migrations applied as in Demo §1). Set a base URL (host port from docker-compose.yml for b2-api):

export BASE=http://localhost:18001
  1. Health — expect status ok or degraded if Kafka is down.

    curl -s "$BASE/health" | jq
  2. Cameras — confirms the API can read Postgres; IDs should match your producer (e.g. cam_01).

    curl -s "$BASE/cameras" | jq
  3. Latest metrics — pick any camera_id from step 2.

    curl -s "$BASE/metrics/current?camera_id=cam_01" | jq
  4. Metrics history — adjust from / to to bracket existing window_start values (UTC ISO 8601).

    curl -s "$BASE/metrics/history?camera_id=cam_01&from=2026-05-01T00:00:00Z&to=2026-05-11T23:59:59Z" | jq
  5. Congestion snapshot (all cameras).

    curl -s "$BASE/congestion/current" | jq
  6. Dashboard summary.

    curl -s "$BASE/api/dashboard/summary" | jq
  7. Recent events.

    curl -s "$BASE/api/dashboard/events?limit=5" | jq
  8. Heatmap points.

    curl -s "$BASE/api/map/heatmap?minutes=5" | jq
  9. Map incidents (optional ?severity=HIGH).

    curl -s "$BASE/api/map/incidents" | jq
  10. Alerts — may be an empty array until metrics show HIGH/CRITICAL-style congestion.

    curl -s "$BASE/api/alerts/current" | jq
  11. Analytics (24h default window).

    curl -s "$BASE/api/analytics/metrics" | jq

    Range compare (optional): GET /api/analytics/compare requires all of start_a, end_a, start_b, and end_b (ISO 8601). Use /docs to try it interactively.

  12. Analytics PDF — writes a file; add -D - if you also want the response headers printed.

    curl -sS -o /tmp/b2-report.pdf "$BASE/api/analytics/report/pdf"
  13. ST-GCN forecast — defaults 10 min horizon, 15 min lookback. Returns 404 if there is no recent per-lane data for the camera, or the camera is outside the ST-GCN topology; 503 if the model bundle or checkpoint is missing or inference fails.

    curl -s "$BASE/api/predict/congestion?camera_id=cam_01" | jq
    curl -s "$BASE/api/predict/congestion?camera_id=cam_01&horizon_minutes=10&lookback_minutes=15" | jq
  14. Admin thresholds (optional) — uses ADMIN_API_KEY from Compose (example: my-secret-123).

    curl -s -H "X-Admin-Token: my-secret-123" "$BASE/api/admin/thresholds" | jq
  15. WebSockets (optional) — install websocat. Use ws:// (not http://).

    websocat "ws://localhost:18001/ws/metrics"
    websocat "ws://localhost:18001/ws/events"
    websocat "ws://localhost:18001/ws/metrics?camera_id=cam_01"
  16. Prometheus scrape (optional).

    curl -s "$BASE/metrics" | head
    curl -s "http://localhost:9100/metrics" | head   # processor; only if port 9100 is published

Configuration

All configuration is done via environment variables. See .env.example for the full list.

Known caveats / Backlog

These are tracked on the project board so we don't lose them:

  • Pixel-to-meter calibration is a placeholder (SPEED_TRACKER_PIXEL_TO_METER=0.05). Absolute km/h numbers will be off until B1 hands over per-camera survey data. Tracked in #19.
  • In-memory windowing, not PyFlink. Fine for the demo and the load we expect; PyFlink migration is on the backlog (#20) for after the demo.
  • Auth is delegated to Kong / Keycloak (B4). Apart from the token-protected /api/admin/… routes, B2 endpoints are unauthenticated when hit directly — fine inside the cluster, defense-in-depth tracked in #23.
  • Event-time window flushing now uses a per-camera watermark + allowed lateness (tracked in #21). Keep WINDOW_ALLOWED_LATENESS_SECONDS tuned to your worst expected camera skew.
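
The watermark rule in the last caveat can be sketched as follows. This is an illustration of the general technique, not the processor's code: the class name, method names, and the 2-second default lateness are assumptions.

```python
class WatermarkTracker:
    """Per-camera watermark: highest event time seen minus the allowed lateness."""

    def __init__(self, window_seconds: float = 5.0, allowed_lateness: float = 2.0):
        self.window_seconds = window_seconds
        self.allowed_lateness = allowed_lateness  # cf. WINDOW_ALLOWED_LATENESS_SECONDS
        self.max_event_time: dict[str, float] = {}

    def observe(self, camera_id: str, event_ts: float) -> None:
        """Record an event time; late events never move the watermark backwards."""
        current = self.max_event_time.get(camera_id, event_ts)
        self.max_event_time[camera_id] = max(current, event_ts)

    def watermark(self, camera_id: str) -> float:
        return self.max_event_time[camera_id] - self.allowed_lateness

    def can_flush(self, camera_id: str, window_start: float) -> bool:
        """A window may close once the camera's watermark has passed its end."""
        if camera_id not in self.max_event_time:
            return False
        return self.watermark(camera_id) >= window_start + self.window_seconds
```

A larger allowed lateness tolerates more camera skew at the cost of delaying every flush by the same amount.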

Project Structure

src/
  shared/     — Config, DB models, Pydantic schemas (shared by both services)
  processor/  — Stream processor (Kafka consumer, windowed aggregation, Postgres writer)
  api/        — FastAPI REST + WebSocket service
tools/        — Mock event producer for development
migrations/   — Alembic database migrations
tests/        — Unit tests + tests/integration/ (testcontainers)
docker/       — Dockerfiles for each service
