diff --git a/tests/test_hmac.py b/tests/test_hmac.py new file mode 100644 index 0000000..3a4cfc5 --- /dev/null +++ b/tests/test_hmac.py @@ -0,0 +1,103 @@ +from unittest.mock import patch + +from vp_core.security import sign_request, verify_request + +# Shared cross-language test vector. +# The same inputs MUST produce the same signature when Ruby signs via +# OpenSSL::HMAC.hexdigest. volo-be/spec/clients/volo_agents_client_spec.rb +# asserts the same EXPECTED_SIG below — any drift breaks auth in production. +SHARED_SECRET = "test_secret_do_not_use_in_prod" +SHARED_METHOD = "POST" +SHARED_PATH = "/api/v1/keywords/suggest" +SHARED_BODY = b'{"seeds":["beer"]}' +SHARED_TIMESTAMP = 1712419200 +EXPECTED_SIG = "95975a61fdd5896a2e3ada649ead58c1a96dbf3c2a338a8d862580ec05b44e69" + + +def test_shared_vector_matches_ruby(): + """Cross-language contract — Ruby's OpenSSL::HMAC must produce this hex.""" + sig, ts = sign_request( + SHARED_SECRET, SHARED_METHOD, SHARED_PATH, SHARED_BODY, SHARED_TIMESTAMP + ) + assert sig == EXPECTED_SIG + assert ts == SHARED_TIMESTAMP + + +def test_sign_and_verify_round_trip(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello") + with patch("vp_core.security.hmac.time.time", return_value=ts): + assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is True + + +def test_verify_tampered_body(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello") + with patch("vp_core.security.hmac.time.time", return_value=ts): + assert verify_request("s3cret", sig, "POST", "/x", b"tampered", ts) is False + + +def test_verify_tampered_path(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello") + with patch("vp_core.security.hmac.time.time", return_value=ts): + assert verify_request("s3cret", sig, "POST", "/y", b"hello", ts) is False + + +def test_verify_tampered_method(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello") + with patch("vp_core.security.hmac.time.time", return_value=ts): + assert verify_request("s3cret", sig, "DELETE", "/x", b"hello", ts) is False + + +def test_verify_wrong_secret(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello") + with patch("vp_core.security.hmac.time.time", return_value=ts): + assert verify_request("wrong", sig, "POST", "/x", b"hello", ts) is False + + +def test_verify_expired_timestamp(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) + # now is 61s past the signed timestamp — outside 60s window + with patch("vp_core.security.hmac.time.time", return_value=1_000_061): + assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is False + + +def test_verify_future_timestamp(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) + # now is 61s before the signed timestamp — clock skew outside window + with patch("vp_core.security.hmac.time.time", return_value=999_939): + assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is False + + +def test_verify_within_window_edge(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) + # exactly 60s off — still valid (boundary is inclusive) + with patch("vp_core.security.hmac.time.time", return_value=1_000_060): + assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is True + + +def test_sign_request_normalizes_method_case(): + sig_upper, _ = sign_request("s3cret", "POST", "/x", b"", timestamp=1) + sig_lower, _ = sign_request("s3cret", "post", "/x", b"", timestamp=1) + assert sig_upper == sig_lower + + +def test_sign_request_empty_body(): + """GET requests have empty body — should still sign cleanly.""" + sig, ts = sign_request("s3cret", "GET", "/_ping", b"") + with patch("vp_core.security.hmac.time.time", return_value=ts): + assert verify_request("s3cret", sig, "GET", "/_ping", b"", ts) is True + + +def test_sign_request_uses_current_time_when_no_timestamp(): + with patch("vp_core.security.hmac.time.time", return_value=1234567890): + _, ts = sign_request("s3cret", "POST", "/x", b"") + assert ts == 1234567890 + + +def test_verify_custom_window(): + sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) + # 10s old — inside custom 5s window? No. + with patch("vp_core.security.hmac.time.time", return_value=1_000_010): + assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts, window_seconds=5) is False + # 10s old — inside custom 15s window? Yes. + with patch("vp_core.security.hmac.time.time", return_value=1_000_010): + assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts, window_seconds=15) is True diff --git a/vp_core/clients/__init__.py b/vp_core/clients/__init__.py new file mode 100644 index 0000000..0ec78e9 --- /dev/null +++ b/vp_core/clients/__init__.py @@ -0,0 +1,7 @@ +""" +HTTP client utilities for interacting with external/internal services. +""" + +from .volopay_be import validate_token + +__all__ = ["validate_token"] diff --git a/vp_core/clients/volopay_be.py b/vp_core/clients/volopay_be.py new file mode 100644 index 0000000..4b071da --- /dev/null +++ b/vp_core/clients/volopay_be.py @@ -0,0 +1,109 @@ +""" +Volopay Backend (volo-be) HTTP client utilities. + +This module provides shared utilities for microservices that need to interact +with the main volo-be backend, primarily for auth token validation. + +Usage: + from vp_core.clients.volopay_be import validate_token + + valid = await validate_token( + client="web", + access_token="abc123", + uid="user@example.com", + account="volopay", + volo_be_url="https://api.volopay.com" + ) +""" + +import httpx + + +async def validate_token( + client: str, + access_token: str, + uid: str, + account: str, + volo_be_url: str, + x_feature: str | None = None, +) -> bool: + """ + Validates a user's session token by calling volo-be's token validation endpoint. + + This is used by satellite microservices (volo-agents, ocr-reader, etc.) to verify + that a frontend user's token is valid without maintaining their own user database. + + Note: Environment-specific logic (e.g., skipping validation in test) should be + handled by the calling service, not here. This is a pure HTTP client function. + + Args: + client: Client identifier (e.g., "web", "mobile") + access_token: User's session access token + uid: User identifier (typically email) + account: Account/organization identifier + volo_be_url: Base URL of volo-be (e.g., "https://api.volopay.com") + x_feature: Optional feature flag header + + Returns: + True if the token is valid (volo-be returned 200), False otherwise. + + Example: + >>> valid = await validate_token( + ... client="web", + ... access_token="eyJ0eXAi...", + ... uid="user@example.com", + ... account="volopay", + ... volo_be_url="https://api.volopay.com" + ... ) + >>> if valid: + ... # proceed with authenticated request + """ + headers = { + "client": client, + "access-token": access_token, + "uid": uid, + "account": account, + } + + # Add optional x_feature header if provided + if x_feature: + headers["x_feature"] = x_feature + + async with httpx.AsyncClient(base_url=volo_be_url, headers=headers) as http: + try: + response = await http.get("/api/v3/auth/user/validate_token") + return response.status_code == 200 + except httpx.HTTPError: + # Network errors, timeouts, etc. — treat as invalid token + return False + + +# Future: Two-layer auth builder +# When multiple microservices (volo-agents, ocr-reader, future services) all adopt +# the same two-layer auth pattern (shared secret + token validation fallback), +# consider adding a generic FastAPI dependency builder here: +# +# def create_two_layer_auth_dependency( +# service_name: str, # "agents", "ocr", etc. +# secret_env_var: str, # "VOLO_BE_AGENTS_SECRET" +# volo_be_url_env_var: str = "VOLO_BE_URL", +# env_var: str = "ENV" +# ) -> Callable: +# """ +# Creates a FastAPI Depends() function that enforces two-layer auth: +# 1. Checks volo_be_{service_name}_secret header against env var +# 2. Falls back to validate_token() if Layer 1 fails +# +# Returns: +# A FastAPI dependency (Depends-compatible callable) +# +# Example: +# from vp_core.clients import create_two_layer_auth_dependency +# +# AuthDep = create_two_layer_auth_dependency( +# service_name="agents", +# secret_env_var="VOLO_BE_AGENTS_SECRET" +# ) +# +# app.include_router(router, dependencies=[AuthDep]) +# """ diff --git a/vp_core/security/__init__.py b/vp_core/security/__init__.py index 8fdfd34..2d6ba8d 100644 --- a/vp_core/security/__init__.py +++ b/vp_core/security/__init__.py @@ -1,3 +1,10 @@ from .guardrails import PromptSanitizer, OutputGuardrail +from .hmac import REPLAY_WINDOW_SECONDS, sign_request, verify_request -__all__ = ["PromptSanitizer", "OutputGuardrail"] +__all__ = [ + "PromptSanitizer", + "OutputGuardrail", + "sign_request", + "verify_request", + "REPLAY_WINDOW_SECONDS", +] diff --git a/vp_core/security/hmac.py b/vp_core/security/hmac.py new file mode 100644 index 0000000..66e6248 --- /dev/null +++ b/vp_core/security/hmac.py @@ -0,0 +1,100 @@ +""" +HMAC request signing utilities for service-to-service authentication. + +Pure-function helpers for signing and verifying HTTP requests between +Volopay microservices. Shared across all Python services that need +Layer 1 (backend-to-backend) auth. + +Usage (verifier side — e.g., volo-agents FastAPI dependency): + + from vp_core.security import verify_request + + body = await request.body() + ok = verify_request( + secret=SHARED_SECRET, + signature=request.headers["x-signature"], + method=request.method, + path=request.url.path, + body=body, + timestamp=int(request.headers["x-timestamp"]), + ) + +Usage (sender side — if the sender is Python): + + from vp_core.security import sign_request + + signature, ts = sign_request(SHARED_SECRET, "POST", "/api/v1/x", body_bytes) + headers = {"X-Signature": signature, "X-Timestamp": str(ts)} + +Non-Python senders (e.g., the Ruby volo-be client) implement their own +signing using OpenSSL::HMAC. The shared test vector in tests/test_hmac.py +enforces byte-identical output across languages. +""" + +import hashlib +import hmac +import time + +REPLAY_WINDOW_SECONDS = 60 + + +def sign_request( + secret: str, + method: str, + path: str, + body: bytes, + timestamp: int | None = None, +) -> tuple[str, int]: + """ + Compute HMAC-SHA256 signature over (timestamp, method, path, body). + + Args: + secret: Shared secret known to both sender and verifier + method: HTTP method (case-insensitive, normalized to uppercase) + path: Request path (e.g., "/api/v1/keywords/suggest") + body: Raw request body bytes (empty bytes for GET requests) + timestamp: Unix timestamp in seconds. If None, uses current time. + + Returns: + (signature_hex, timestamp) — both travel as headers. + """ + ts = timestamp if timestamp is not None else int(time.time()) + prefix = f"{ts}\n{method.upper()}\n{path}\n".encode() + signature = hmac.new(secret.encode(), prefix + body, hashlib.sha256).hexdigest() + return signature, ts + + +def verify_request( + secret: str, + signature: str, + method: str, + path: str, + body: bytes, + timestamp: int, + window_seconds: int = REPLAY_WINDOW_SECONDS, +) -> bool: + """ + Verify an incoming HMAC signature with replay protection. + + Performs two checks: + 1. Timestamp is within ``window_seconds`` of now (replay protection) + 2. Recomputed signature matches the provided one (timing-safe) + + Args: + secret: Shared secret known to both sender and verifier + signature: Hex-encoded signature from X-Signature header + method: HTTP method from the incoming request + path: Request path from the incoming request + body: Raw request body bytes + timestamp: Unix timestamp from X-Timestamp header + window_seconds: Max allowed clock drift. Defaults to 60s. + + Returns: + True if valid, False on any failure (timestamp skew, signature + mismatch, wrong secret). Never raises. + """ + if abs(int(time.time()) - timestamp) > window_seconds: + return False + + expected, _ = sign_request(secret, method, path, body, timestamp) + return hmac.compare_digest(signature, expected)