Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions tests/test_hmac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from unittest.mock import patch

from vp_core.security import sign_request, verify_request

# Shared cross-language test vector.
# The same inputs MUST produce the same signature when Ruby signs via
# OpenSSL::HMAC.hexdigest. volo-be/spec/clients/volo_agents_client_spec.rb
# asserts the same EXPECTED_SIG below — any drift breaks auth in production.
SHARED_SECRET = "test_secret_do_not_use_in_prod"
SHARED_METHOD = "POST"
SHARED_PATH = "/api/v1/keywords/suggest"
SHARED_BODY = b'{"seeds":["beer"]}'
SHARED_TIMESTAMP = 1712419200
EXPECTED_SIG = "95975a61fdd5896a2e3ada649ead58c1a96dbf3c2a338a8d862580ec05b44e69"


def test_shared_vector_matches_ruby():
"""Cross-language contract — Ruby's OpenSSL::HMAC must produce this hex."""
sig, ts = sign_request(
SHARED_SECRET, SHARED_METHOD, SHARED_PATH, SHARED_BODY, SHARED_TIMESTAMP
)
assert sig == EXPECTED_SIG
assert ts == SHARED_TIMESTAMP


def test_sign_and_verify_round_trip():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello")
with patch("vp_core.security.hmac.time.time", return_value=ts):
assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is True


def test_verify_tampered_body():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello")
with patch("vp_core.security.hmac.time.time", return_value=ts):
assert verify_request("s3cret", sig, "POST", "/x", b"tampered", ts) is False


def test_verify_tampered_path():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello")
with patch("vp_core.security.hmac.time.time", return_value=ts):
assert verify_request("s3cret", sig, "POST", "/y", b"hello", ts) is False


def test_verify_tampered_method():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello")
with patch("vp_core.security.hmac.time.time", return_value=ts):
assert verify_request("s3cret", sig, "DELETE", "/x", b"hello", ts) is False


def test_verify_wrong_secret():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello")
with patch("vp_core.security.hmac.time.time", return_value=ts):
assert verify_request("wrong", sig, "POST", "/x", b"hello", ts) is False


def test_verify_expired_timestamp():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000)
# now is 61s past the signed timestamp — outside 60s window
with patch("vp_core.security.hmac.time.time", return_value=1_000_061):
assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is False


def test_verify_future_timestamp():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000)
# now is 61s before the signed timestamp — clock skew outside window
with patch("vp_core.security.hmac.time.time", return_value=999_939):
assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is False


def test_verify_within_window_edge():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000)
# exactly 60s off — still valid (boundary is inclusive)
with patch("vp_core.security.hmac.time.time", return_value=1_000_060):
assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is True


def test_sign_request_normalizes_method_case():
sig_upper, _ = sign_request("s3cret", "POST", "/x", b"", timestamp=1)
sig_lower, _ = sign_request("s3cret", "post", "/x", b"", timestamp=1)
assert sig_upper == sig_lower


def test_sign_request_empty_body():
"""GET requests have empty body — should still sign cleanly."""
sig, ts = sign_request("s3cret", "GET", "/_ping", b"")
with patch("vp_core.security.hmac.time.time", return_value=ts):
assert verify_request("s3cret", sig, "GET", "/_ping", b"", ts) is True


def test_sign_request_uses_current_time_when_no_timestamp():
with patch("vp_core.security.hmac.time.time", return_value=1234567890):
_, ts = sign_request("s3cret", "POST", "/x", b"")
assert ts == 1234567890


def test_verify_custom_window():
sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000)
# 10s old — inside custom 5s window? No.
with patch("vp_core.security.hmac.time.time", return_value=1_000_010):
assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts, window_seconds=5) is False
# 10s old — inside custom 15s window? Yes.
with patch("vp_core.security.hmac.time.time", return_value=1_000_010):
assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts, window_seconds=15) is True
7 changes: 7 additions & 0 deletions vp_core/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
HTTP client utilities for interacting with external/internal services.
"""

from .volopay_be import validate_token

__all__ = ["validate_token"]
109 changes: 109 additions & 0 deletions vp_core/clients/volopay_be.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
Volopay Backend (volo-be) HTTP client utilities.

This module provides shared utilities for microservices that need to interact
with the main volo-be backend, primarily for auth token validation.

Usage:
from vp_core.clients.volopay_be import validate_token

valid = await validate_token(
client="web",
access_token="abc123",
uid="user@example.com",
account="volopay",
volo_be_url="https://api.volopay.com"
)
"""

import httpx


async def validate_token(
client: str,
access_token: str,
uid: str,
account: str,
volo_be_url: str,
x_feature: str | None = None,
) -> bool:
"""
Validates a user's session token by calling volo-be's token validation endpoint.

This is used by satellite microservices (volo-agents, ocr-reader, etc.) to verify
that a frontend user's token is valid without maintaining their own user database.

Note: Environment-specific logic (e.g., skipping validation in test) should be
handled by the calling service, not here. This is a pure HTTP client function.

Args:
client: Client identifier (e.g., "web", "mobile")
access_token: User's session access token
uid: User identifier (typically email)
account: Account/organization identifier
volo_be_url: Base URL of volo-be (e.g., "https://api.volopay.com")
x_feature: Optional feature flag header

Returns:
True if the token is valid (volo-be returned 200), False otherwise.

Example:
>>> valid = await validate_token(
... client="web",
... access_token="eyJ0eXAi...",
... uid="user@example.com",
... account="volopay",
... volo_be_url="https://api.volopay.com"
... )
>>> if valid:
... # proceed with authenticated request
"""
headers = {
"client": client,
"access-token": access_token,
"uid": uid,
"account": account,
}

# Add optional x_feature header if provided
if x_feature:
headers["x_feature"] = x_feature

async with httpx.AsyncClient(base_url=volo_be_url, headers=headers) as http:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it should change base url as well. based on the request u are getting

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this we discussed, will pass this from FE

try:
response = await http.get("/api/v3/auth/user/validate_token")
return response.status_code == 200
except httpx.HTTPError:
# Network errors, timeouts, etc. — treat as invalid token
return False


# Future: Two-layer auth builder
# When multiple microservices (volo-agents, ocr-reader, future services) all adopt
# the same two-layer auth pattern (shared secret + token validation fallback),
# consider adding a generic FastAPI dependency builder here:
#
# def create_two_layer_auth_dependency(
# service_name: str, # "agents", "ocr", etc.
# secret_env_var: str, # "VOLO_BE_AGENTS_SECRET"
# volo_be_url_env_var: str = "VOLO_BE_URL",
# env_var: str = "ENV"
# ) -> Callable:
# """
# Creates a FastAPI Depends() function that enforces two-layer auth:
# 1. Checks volo_be_{service_name}_secret header against env var
# 2. Falls back to validate_token() if Layer 1 fails
#
# Returns:
# A FastAPI dependency (Depends-compatible callable)
#
# Example:
# from vp_core.clients import create_two_layer_auth_dependency
#
# AuthDep = create_two_layer_auth_dependency(
# service_name="agents",
# secret_env_var="VOLO_BE_AGENTS_SECRET"
# )
#
# app.include_router(router, dependencies=[AuthDep])
# """
9 changes: 8 additions & 1 deletion vp_core/security/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from .guardrails import PromptSanitizer, OutputGuardrail
from .hmac import REPLAY_WINDOW_SECONDS, sign_request, verify_request

__all__ = ["PromptSanitizer", "OutputGuardrail"]
__all__ = [
"PromptSanitizer",
"OutputGuardrail",
"sign_request",
"verify_request",
"REPLAY_WINDOW_SECONDS",
]
100 changes: 100 additions & 0 deletions vp_core/security/hmac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
HMAC request signing utilities for service-to-service authentication.

Pure-function helpers for signing and verifying HTTP requests between
Volopay microservices. Shared across all Python services that need
Layer 1 (backend-to-backend) auth.

Usage (verifier side — e.g., volo-agents FastAPI dependency):

from vp_core.security import verify_request

body = await request.body()
ok = verify_request(
secret=SHARED_SECRET,
signature=request.headers["x-signature"],
method=request.method,
path=request.url.path,
body=body,
timestamp=int(request.headers["x-timestamp"]),
)

Usage (sender side — if the sender is Python):

from vp_core.security import sign_request

signature, ts = sign_request(SHARED_SECRET, "POST", "/api/v1/x", body_bytes)
headers = {"X-Signature": signature, "X-Timestamp": str(ts)}

Non-Python senders (e.g., the Ruby volo-be client) implement their own
signing using OpenSSL::HMAC. The shared test vector in tests/test_hmac.py
enforces byte-identical output across languages.
"""

import hashlib
import hmac
import time

REPLAY_WINDOW_SECONDS = 60


def sign_request(
secret: str,
method: str,
path: str,
body: bytes,
timestamp: int | None = None,
) -> tuple[str, int]:
"""
Compute HMAC-SHA256 signature over (timestamp, method, path, body).

Args:
secret: Shared secret known to both sender and verifier
method: HTTP method (case-insensitive, normalized to uppercase)
path: Request path (e.g., "/api/v1/keywords/suggest")
body: Raw request body bytes (empty bytes for GET requests)
timestamp: Unix timestamp in seconds. If None, uses current time.

Returns:
(signature_hex, timestamp) — both travel as headers.
"""
ts = timestamp if timestamp is not None else int(time.time())
prefix = f"{ts}\n{method.upper()}\n{path}\n".encode()
signature = hmac.new(secret.encode(), prefix + body, hashlib.sha256).hexdigest()
return signature, ts


def verify_request(
secret: str,
signature: str,
method: str,
path: str,
body: bytes,
timestamp: int,
window_seconds: int = REPLAY_WINDOW_SECONDS,
) -> bool:
"""
Verify an incoming HMAC signature with replay protection.

Performs two checks:
1. Timestamp is within ``window_seconds`` of now (replay protection)
2. Recomputed signature matches the provided one (timing-safe)

Args:
secret: Shared secret known to both sender and verifier
signature: Hex-encoded signature from X-Signature header
method: HTTP method from the incoming request
path: Request path from the incoming request
body: Raw request body bytes
timestamp: Unix timestamp from X-Timestamp header
window_seconds: Max allowed clock drift. Defaults to 60s.

Returns:
True if valid, False on any failure (timestamp skew, signature
mismatch, wrong secret). Never raises.
"""
if abs(int(time.time()) - timestamp) > window_seconds:
return False

expected, _ = sign_request(secret, method, path, body, timestamp)
return hmac.compare_digest(signature, expected)