-
Notifications
You must be signed in to change notification settings - Fork 0
Volo be validate token logic added #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ShyamSagothia
wants to merge
7
commits into
master
Choose a base branch
from
volo-be-validate-token-logic-added
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+327
−1
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
5ec03aa
feat: Add Volopay backend client with token validation utility.
ShyamSagothia a7b4a58
added httpx library
ShyamSagothia b49acda
minor changes added
ShyamSagothia b6ded3f
Merge branch 'master' into volo-be-validate-token-logic-added
ShyamSagothia 87bc9f7
resolved pr comments
ShyamSagothia a21add0
feat: added HMAC Request Signing to use in server-to-server calls
ShyamSagothia f637aa7
Merge branch 'master' into volo-be-validate-token-logic-added
ShyamSagothia File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| from unittest.mock import patch | ||
|
|
||
| from vp_core.security import sign_request, verify_request | ||
|
|
||
| # Shared cross-language test vector. | ||
| # The same inputs MUST produce the same signature when Ruby signs via | ||
| # OpenSSL::HMAC.hexdigest. volo-be/spec/clients/volo_agents_client_spec.rb | ||
| # asserts the same EXPECTED_SIG below — any drift breaks auth in production. | ||
| SHARED_SECRET = "test_secret_do_not_use_in_prod" | ||
| SHARED_METHOD = "POST" | ||
| SHARED_PATH = "/api/v1/keywords/suggest" | ||
| SHARED_BODY = b'{"seeds":["beer"]}' | ||
| SHARED_TIMESTAMP = 1712419200 | ||
| EXPECTED_SIG = "95975a61fdd5896a2e3ada649ead58c1a96dbf3c2a338a8d862580ec05b44e69" | ||
|
|
||
|
|
||
| def test_shared_vector_matches_ruby(): | ||
| """Cross-language contract — Ruby's OpenSSL::HMAC must produce this hex.""" | ||
| sig, ts = sign_request( | ||
| SHARED_SECRET, SHARED_METHOD, SHARED_PATH, SHARED_BODY, SHARED_TIMESTAMP | ||
| ) | ||
| assert sig == EXPECTED_SIG | ||
| assert ts == SHARED_TIMESTAMP | ||
|
|
||
|
|
||
| def test_sign_and_verify_round_trip(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello") | ||
| with patch("vp_core.security.hmac.time.time", return_value=ts): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is True | ||
|
|
||
|
|
||
| def test_verify_tampered_body(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello") | ||
| with patch("vp_core.security.hmac.time.time", return_value=ts): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"tampered", ts) is False | ||
|
|
||
|
|
||
| def test_verify_tampered_path(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello") | ||
| with patch("vp_core.security.hmac.time.time", return_value=ts): | ||
| assert verify_request("s3cret", sig, "POST", "/y", b"hello", ts) is False | ||
|
|
||
|
|
||
| def test_verify_tampered_method(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello") | ||
| with patch("vp_core.security.hmac.time.time", return_value=ts): | ||
| assert verify_request("s3cret", sig, "DELETE", "/x", b"hello", ts) is False | ||
|
|
||
|
|
||
| def test_verify_wrong_secret(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello") | ||
| with patch("vp_core.security.hmac.time.time", return_value=ts): | ||
| assert verify_request("wrong", sig, "POST", "/x", b"hello", ts) is False | ||
|
|
||
|
|
||
| def test_verify_expired_timestamp(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) | ||
| # now is 61s past the signed timestamp — outside 60s window | ||
| with patch("vp_core.security.hmac.time.time", return_value=1_000_061): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is False | ||
|
|
||
|
|
||
| def test_verify_future_timestamp(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) | ||
| # now is 61s before the signed timestamp — clock skew outside window | ||
| with patch("vp_core.security.hmac.time.time", return_value=999_939): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is False | ||
|
|
||
|
|
||
| def test_verify_within_window_edge(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) | ||
| # exactly 60s off — still valid (boundary is inclusive) | ||
| with patch("vp_core.security.hmac.time.time", return_value=1_000_060): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts) is True | ||
|
|
||
|
|
||
| def test_sign_request_normalizes_method_case(): | ||
| sig_upper, _ = sign_request("s3cret", "POST", "/x", b"", timestamp=1) | ||
| sig_lower, _ = sign_request("s3cret", "post", "/x", b"", timestamp=1) | ||
| assert sig_upper == sig_lower | ||
|
|
||
|
|
||
| def test_sign_request_empty_body(): | ||
| """GET requests have empty body — should still sign cleanly.""" | ||
| sig, ts = sign_request("s3cret", "GET", "/_ping", b"") | ||
| with patch("vp_core.security.hmac.time.time", return_value=ts): | ||
| assert verify_request("s3cret", sig, "GET", "/_ping", b"", ts) is True | ||
|
|
||
|
|
||
| def test_sign_request_uses_current_time_when_no_timestamp(): | ||
| with patch("vp_core.security.hmac.time.time", return_value=1234567890): | ||
| _, ts = sign_request("s3cret", "POST", "/x", b"") | ||
| assert ts == 1234567890 | ||
|
|
||
|
|
||
| def test_verify_custom_window(): | ||
| sig, ts = sign_request("s3cret", "POST", "/x", b"hello", timestamp=1_000_000) | ||
| # 10s old — inside custom 5s window? No. | ||
| with patch("vp_core.security.hmac.time.time", return_value=1_000_010): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts, window_seconds=5) is False | ||
| # 10s old — inside custom 15s window? Yes. | ||
| with patch("vp_core.security.hmac.time.time", return_value=1_000_010): | ||
| assert verify_request("s3cret", sig, "POST", "/x", b"hello", ts, window_seconds=15) is True |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| """ | ||
| HTTP client utilities for interacting with external/internal services. | ||
| """ | ||
|
|
||
| from .volopay_be import validate_token | ||
|
|
||
| __all__ = ["validate_token"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| """ | ||
| Volopay Backend (volo-be) HTTP client utilities. | ||
|
|
||
| This module provides shared utilities for microservices that need to interact | ||
| with the main volo-be backend, primarily for auth token validation. | ||
|
|
||
| Usage: | ||
| from vp_core.clients.volopay_be import validate_token | ||
|
|
||
| valid = await validate_token( | ||
| client="web", | ||
| access_token="abc123", | ||
| uid="user@example.com", | ||
| account="volopay", | ||
| volo_be_url="https://api.volopay.com" | ||
| ) | ||
| """ | ||
|
|
||
| import httpx | ||
|
|
||
|
|
||
| async def validate_token( | ||
| client: str, | ||
| access_token: str, | ||
| uid: str, | ||
| account: str, | ||
| volo_be_url: str, | ||
| x_feature: str | None = None, | ||
| ) -> bool: | ||
| """ | ||
| Validates a user's session token by calling volo-be's token validation endpoint. | ||
|
|
||
| This is used by satellite microservices (volo-agents, ocr-reader, etc.) to verify | ||
| that a frontend user's token is valid without maintaining their own user database. | ||
|
|
||
| Note: Environment-specific logic (e.g., skipping validation in test) should be | ||
| handled by the calling service, not here. This is a pure HTTP client function. | ||
|
|
||
| Args: | ||
| client: Client identifier (e.g., "web", "mobile") | ||
| access_token: User's session access token | ||
| uid: User identifier (typically email) | ||
| account: Account/organization identifier | ||
| volo_be_url: Base URL of volo-be (e.g., "https://api.volopay.com") | ||
| x_feature: Optional feature flag header | ||
|
|
||
| Returns: | ||
| True if the token is valid (volo-be returned 200), False otherwise. | ||
|
|
||
| Example: | ||
| >>> valid = await validate_token( | ||
| ... client="web", | ||
| ... access_token="eyJ0eXAi...", | ||
| ... uid="user@example.com", | ||
| ... account="volopay", | ||
| ... volo_be_url="https://api.volopay.com" | ||
| ... ) | ||
| >>> if valid: | ||
| ... # proceed with authenticated request | ||
| """ | ||
| headers = { | ||
| "client": client, | ||
| "access-token": access_token, | ||
| "uid": uid, | ||
| "account": account, | ||
| } | ||
|
|
||
| # Add optional x_feature header if provided | ||
| if x_feature: | ||
| headers["x_feature"] = x_feature | ||
|
|
||
| async with httpx.AsyncClient(base_url=volo_be_url, headers=headers) as http: | ||
| try: | ||
| response = await http.get("/api/v3/auth/user/validate_token") | ||
| return response.status_code == 200 | ||
| except httpx.HTTPError: | ||
| # Network errors, timeouts, etc. — treat as invalid token | ||
| return False | ||
|
|
||
|
|
||
| # Future: Two-layer auth builder | ||
| # When multiple microservices (volo-agents, ocr-reader, future services) all adopt | ||
| # the same two-layer auth pattern (shared secret + token validation fallback), | ||
| # consider adding a generic FastAPI dependency builder here: | ||
| # | ||
| # def create_two_layer_auth_dependency( | ||
| # service_name: str, # "agents", "ocr", etc. | ||
| # secret_env_var: str, # "VOLO_BE_AGENTS_SECRET" | ||
| # volo_be_url_env_var: str = "VOLO_BE_URL", | ||
| # env_var: str = "ENV" | ||
| # ) -> Callable: | ||
| # """ | ||
| # Creates a FastAPI Depends() function that enforces two-layer auth: | ||
| # 1. Checks volo_be_{service_name}_secret header against env var | ||
| # 2. Falls back to validate_token() if Layer 1 fails | ||
| # | ||
| # Returns: | ||
| # A FastAPI dependency (Depends-compatible callable) | ||
| # | ||
| # Example: | ||
| # from vp_core.clients import create_two_layer_auth_dependency | ||
| # | ||
| # AuthDep = create_two_layer_auth_dependency( | ||
| # service_name="agents", | ||
| # secret_env_var="VOLO_BE_AGENTS_SECRET" | ||
| # ) | ||
| # | ||
| # app.include_router(router, dependencies=[AuthDep]) | ||
| # """ | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,10 @@ | ||
| from .guardrails import PromptSanitizer, OutputGuardrail | ||
| from .hmac import REPLAY_WINDOW_SECONDS, sign_request, verify_request | ||
|
|
||
| __all__ = ["PromptSanitizer", "OutputGuardrail"] | ||
| __all__ = [ | ||
| "PromptSanitizer", | ||
| "OutputGuardrail", | ||
| "sign_request", | ||
| "verify_request", | ||
| "REPLAY_WINDOW_SECONDS", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| """ | ||
| HMAC request signing utilities for service-to-service authentication. | ||
|
|
||
| Pure-function helpers for signing and verifying HTTP requests between | ||
| Volopay microservices. Shared across all Python services that need | ||
| Layer 1 (backend-to-backend) auth. | ||
|
|
||
| Usage (verifier side — e.g., volo-agents FastAPI dependency): | ||
|
|
||
| from vp_core.security import verify_request | ||
|
|
||
| body = await request.body() | ||
| ok = verify_request( | ||
| secret=SHARED_SECRET, | ||
| signature=request.headers["x-signature"], | ||
| method=request.method, | ||
| path=request.url.path, | ||
| body=body, | ||
| timestamp=int(request.headers["x-timestamp"]), | ||
| ) | ||
|
|
||
| Usage (sender side — if the sender is Python): | ||
|
|
||
| from vp_core.security import sign_request | ||
|
|
||
| signature, ts = sign_request(SHARED_SECRET, "POST", "/api/v1/x", body_bytes) | ||
| headers = {"X-Signature": signature, "X-Timestamp": str(ts)} | ||
|
|
||
| Non-Python senders (e.g., the Ruby volo-be client) implement their own | ||
| signing using OpenSSL::HMAC. The shared test vector in tests/test_hmac.py | ||
| enforces byte-identical output across languages. | ||
| """ | ||
|
|
||
| import hashlib | ||
| import hmac | ||
| import time | ||
|
|
||
| REPLAY_WINDOW_SECONDS = 60 | ||
|
|
||
|
|
||
| def sign_request( | ||
| secret: str, | ||
| method: str, | ||
| path: str, | ||
| body: bytes, | ||
| timestamp: int | None = None, | ||
| ) -> tuple[str, int]: | ||
| """ | ||
| Compute HMAC-SHA256 signature over (timestamp, method, path, body). | ||
|
|
||
| Args: | ||
| secret: Shared secret known to both sender and verifier | ||
| method: HTTP method (case-insensitive, normalized to uppercase) | ||
| path: Request path (e.g., "/api/v1/keywords/suggest") | ||
| body: Raw request body bytes (empty bytes for GET requests) | ||
| timestamp: Unix timestamp in seconds. If None, uses current time. | ||
|
|
||
| Returns: | ||
| (signature_hex, timestamp) — both travel as headers. | ||
| """ | ||
| ts = timestamp if timestamp is not None else int(time.time()) | ||
| prefix = f"{ts}\n{method.upper()}\n{path}\n".encode() | ||
| signature = hmac.new(secret.encode(), prefix + body, hashlib.sha256).hexdigest() | ||
| return signature, ts | ||
|
|
||
|
|
||
| def verify_request( | ||
| secret: str, | ||
| signature: str, | ||
| method: str, | ||
| path: str, | ||
| body: bytes, | ||
| timestamp: int, | ||
| window_seconds: int = REPLAY_WINDOW_SECONDS, | ||
| ) -> bool: | ||
| """ | ||
| Verify an incoming HMAC signature with replay protection. | ||
|
|
||
| Performs two checks: | ||
| 1. Timestamp is within ``window_seconds`` of now (replay protection) | ||
| 2. Recomputed signature matches the provided one (timing-safe) | ||
|
|
||
| Args: | ||
| secret: Shared secret known to both sender and verifier | ||
| signature: Hex-encoded signature from X-Signature header | ||
| method: HTTP method from the incoming request | ||
| path: Request path from the incoming request | ||
| body: Raw request body bytes | ||
| timestamp: Unix timestamp from X-Timestamp header | ||
| window_seconds: Max allowed clock drift. Defaults to 60s. | ||
|
|
||
| Returns: | ||
| True if valid, False on any failure (timestamp skew, signature | ||
| mismatch, wrong secret). Never raises. | ||
| """ | ||
| if abs(int(time.time()) - timestamp) > window_seconds: | ||
| return False | ||
|
|
||
| expected, _ = sign_request(secret, method, path, body, timestamp) | ||
| return hmac.compare_digest(signature, expected) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it should change base url as well. based on the request u are getting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this we discussed, will pass this from FE