Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion migrations_lockfile.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

replays: 0007_organizationmember_replay_access

seer: 0023_add_seer_run_pull_request
seer: 0024_add_agent_write_grant

sentry: 1124_weeklyreportprojectexclusion

social_auth: 0003_social_auth_json_field

Check failure on line 37 in migrations_lockfile.txt

View check run for this annotation

@sentry/warden / warden: security-review

Agent token signing key defaults to empty string, enabling forged token authentication

`_signing_key()` returns `settings.SEER_API_SHARED_SECRET`, which defaults to `""`, with no empty-key guard. When the secret is unset (the default, e.g. self-hosted), `decode_agent_token` verifies HS256 JWTs with an empty key, so an attacker can forge a `sntryag_`-prefixed token for any `sub`/`org`/`scopes` and be authenticated by the globally-registered `AgentTokenAuthentication`. Mirror `viewer_context._get_signing_key`, which raises `ValueError` when no key is configured.
tempest: 0001_squashed_0003_use_encrypted_char_field

uptime: 0055_backfill_2xx_status_assertion
Expand Down
37 changes: 36 additions & 1 deletion src/sentry/api/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from django.urls import resolve
from django.utils.crypto import constant_time_compare
from django.utils.encoding import force_str
from jwt import PyJWTError
from rest_framework.authentication import (
BaseAuthentication,
BasicAuthentication,
Expand Down Expand Up @@ -45,6 +46,7 @@
from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation
from sentry.sentry_apps.token_exchange.util import GrantTypes
from sentry.silo.base import SiloLimit, SiloMode
from sentry.types.token import SENTRY_AGENT_TOKEN_PREFIX
from sentry.users.models.user import User
from sentry.users.services.user import RpcUser
from sentry.users.services.user.service import user_service
Expand Down Expand Up @@ -520,7 +522,7 @@
return True

token_str = force_str(auth[1])
return not token_str.startswith(SENTRY_ORG_AUTH_TOKEN_PREFIX)
return not token_str.startswith((SENTRY_ORG_AUTH_TOKEN_PREFIX, SENTRY_AGENT_TOKEN_PREFIX))

def authenticate_token(self, request: Request, token_str: str) -> tuple[Any, Any]:
user: AnonymousUser | User | RpcUser | None = AnonymousUser()
Expand Down Expand Up @@ -605,6 +607,39 @@
)


@AuthenticationSiloLimit(SiloMode.CELL, SiloMode.CONTROL)
class AgentTokenAuthentication(StandardAuthentication):
"""Authenticates the Seer agent's short-lived capability token: a Sentry-signed JWT
(see ``sentry.seer.agent_token``) carrying the ``sntryag_`` prefix, not a stored
``ApiToken``."""

token_name = b"bearer"

def accepts_auth(self, auth: list[bytes]) -> bool:
if not super().accepts_auth(auth) or len(auth) != 2:
return False
return force_str(auth[1]).startswith(SENTRY_AGENT_TOKEN_PREFIX)

def authenticate_token(self, request: Request, token_str: str) -> tuple[Any, Any]:
from sentry.seer import agent_token

try:
claims = agent_token.decode_agent_token(token_str)
user_id = int(claims["sub"])
# Building the token casts org and scopes too, so any missing/mis-typed claim

Check failure on line 629 in src/sentry/api/authentication.py

View check run for this annotation

@sentry/warden / warden: security-review

[MWJ-369] Agent token signing key defaults to empty string, enabling forged token authentication (additional location)

`_signing_key()` returns `settings.SEER_API_SHARED_SECRET`, which defaults to `""`, with no empty-key guard. When the secret is unset (the default, e.g. self-hosted), `decode_agent_token` verifies HS256 JWTs with an empty key, so an attacker can forge a `sntryag_`-prefixed token for any `sub`/`org`/`scopes` and be authenticated by the globally-registered `AgentTokenAuthentication`. Mirror `viewer_context._get_signing_key`, which raises `ValueError` when no key is configured.
# in a signed token is a clean 401 here, not a 500 downstream.
auth_token = agent_token.build_authenticated_token(claims)
except (PyJWTError, KeyError, ValueError, TypeError):
raise AuthenticationFailed("Invalid agent token")

user = user_service.get_user(user_id=user_id)
if user is None or not user.is_active or getattr(user, "is_suspended", False):
raise AuthenticationFailed("Invalid agent token")

agent_token.mark_agent_request(request, claims)
return self.transform_auth(user, auth_token, "api_token", api_token_type=self.token_name)


@AuthenticationSiloLimit(SiloMode.CONTROL, SiloMode.CELL)
class OrgAuthTokenAuthentication(StandardAuthentication):
token_name = b"bearer"
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
get_paginator,
)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
from .authentication import (
AgentTokenAuthentication,

Check failure on line 70 in src/sentry/api/base.py

View check run for this annotation

@sentry/warden / warden: security-review

Agent capability JWT signed with Seer's shared secret, letting the Seer agent forge its own write-scoped tokens

The agent capability token is HMAC-signed and verified with `SEER_API_SHARED_SECRET` (`agent_token._signing_key()`), the same symmetric secret the Seer service already holds to sign X-Viewer-Context JWTs and authenticate Seer→Sentry calls; the Seer agent can therefore forge a valid `sntryag_` token with any `sub`, `org`, and `scopes`, bypassing the user-approval write gate the flow is meant to enforce. Sign these tokens with a key only Sentry holds (asymmetric RS256 with Sentry as sole private-key holder, or a dedicated Sentry-only `AGENT_TOKEN_SIGNING_SECRET`).
ApiKeyAuthentication,
OrgAuthTokenAuthentication,
UserAuthTokenAuthentication,
Expand Down Expand Up @@ -105,6 +106,7 @@
DEFAULT_AUTHENTICATION = (
UserAuthTokenAuthentication,
OrgAuthTokenAuthentication,
AgentTokenAuthentication,
Comment thread
sentry-warden[bot] marked this conversation as resolved.
ApiKeyAuthentication,
ViewerContextAuthentication,
SessionAuthentication,
Expand Down
10 changes: 10 additions & 0 deletions src/sentry/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import SAFE_METHODS, BasePermission, IsAuthenticated # noqa: S012
from rest_framework.request import Request

Expand All @@ -26,6 +27,7 @@
RpcUserOrganizationContext,
organization_service,
)
from sentry.seer import agent_token
Comment thread
sentry-warden[bot] marked this conversation as resolved.
from sentry.utils import auth

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -179,6 +181,14 @@ def determine_access(
organization = org_context.organization
extra = {"organization_id": organization.id, "user_id": user_id}

agent_claims = agent_token.get_agent_claims(request)
if agent_claims is not None and int(agent_claims["org"]) != organization.id:
# An agent token is bound to the org it was minted for: its scopes (including
# any user-granted write) were de-escalated for *that* org only. Never honor it
# against a different org. This is the single access-assembly chokepoint, so the
# binding holds for every permission class that derives access here.
raise PermissionDenied

if request.auth:
if request.user and request.user.is_authenticated:
request.access = access.from_request_org_and_scopes(
Expand Down
12 changes: 12 additions & 0 deletions src/sentry/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@
from sentry.seer.endpoints.group_autofix_repos import GroupAutofixReposEndpoint
from sentry.seer.endpoints.group_autofix_setup_check import GroupAutofixSetupCheck
from sentry.seer.endpoints.issue_view_title_generate import IssueViewTitleGenerateEndpoint
from sentry.seer.endpoints.organization_agent_approve import OrganizationAgentApproveEndpoint
from sentry.seer.endpoints.organization_agent_token import OrganizationAgentTokenEndpoint
from sentry.seer.endpoints.organization_autofix_automation_settings import (
OrganizationAutofixAutomationSettingsEndpoint,
)
Expand Down Expand Up @@ -1693,6 +1695,16 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
OrganizationEventsAnomaliesEndpoint.as_view(),
name="sentry-api-0-organization-events-anomalies",
),
re_path(
r"^(?P<organization_id_or_slug>[^/]+)/agent/token/$",
OrganizationAgentTokenEndpoint.as_view(),
name="sentry-api-0-organization-agent-token",
),
re_path(
r"^(?P<organization_id_or_slug>[^/]+)/agent/approve/$",
OrganizationAgentApproveEndpoint.as_view(),
name="sentry-api-0-organization-agent-approve",
),
re_path(
r"^(?P<organization_id_or_slug>[^/]+)/traces/$",
OrganizationTracesEndpoint.as_view(),
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/features/temporary.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@
# Enable sentry source code search tool
manager.add("organizations:seer-agent-source-code-search", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
# Enable code mode tools (sentry_api_search/execute) in Seer Agent
manager.add("organizations:seer-explorer-code-mode-tools", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)

Check failure on line 261 in src/sentry/features/temporary.py

View check run for this annotation

@sentry/warden / warden: security-review

[L78-WLX] Agent capability JWT signed with Seer's shared secret, letting the Seer agent forge its own write-scoped tokens (additional location)

The agent capability token is HMAC-signed and verified with `SEER_API_SHARED_SECRET` (`agent_token._signing_key()`), the same symmetric secret the Seer service already holds to sign X-Viewer-Context JWTs and authenticate Seer→Sentry calls; the Seer agent can therefore forge a valid `sntryag_` token with any `sub`, `org`, and `scopes`, bypassing the user-approval write gate the flow is meant to enforce. Sign these tokens with a key only Sentry holds (asymmetric RS256 with Sentry as sole private-key holder, or a dedicated Sentry-only `AGENT_TOKEN_SIGNING_SECRET`).
# Gate Seer agent writes behind short-lived, scope-bound capability tokens + user-approved grants
manager.add("organizations:seer-agent-token-flow", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable code mode tools for Slack-initiated Explorer sessions
manager.add("organizations:seer-slack-code-mode", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable the thinking blocks toggle in the Seer Agent top bar
Expand Down
154 changes: 154 additions & 0 deletions src/sentry/seer/agent_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Short-lived, scope-bound capability tokens for the Seer agent.

Tokens are signed JWTs, not stored (verified by signature/claims, re-minted on demand);
only :class:`SeerAgentWriteGrant`, the durable record of user consent, persists.
"""

from __future__ import annotations

from collections.abc import Iterable
from datetime import datetime, timedelta
from typing import Any

from django.conf import settings
from django.db import router, transaction
from django.utils import timezone
from rest_framework.request import Request

from sentry.auth.services.auth import AuthenticatedToken
from sentry.seer.models.agent_write_grant import DEFAULT_EXPIRATION, SeerAgentWriteGrant
from sentry.types.token import SENTRY_AGENT_TOKEN_PREFIX
from sentry.utils import jwt

FEATURE_FLAG = "organizations:seer-agent-token-flow"

# Distinct audience so the token can't be replayed against another audience that shares the
# signing secret (e.g. X-Viewer-Context JWTs).
AGENT_TOKEN_AUDIENCE = "sentry-agent-api"

# TTL is the only bound on a leaked token, so keep it short.
DEFAULT_TOKEN_TTL = timedelta(minutes=5)

# Set when an agent token authenticates; read by the cross-org binding guard.
_REQUEST_CLAIMS_ATTR = "_agent_token_claims"


def _signing_key() -> str:
return settings.SEER_API_SHARED_SECRET

Check failure on line 37 in src/sentry/seer/agent_token.py

View check run for this annotation

@sentry/warden / warden: security-review

[L78-WLX] Agent capability JWT signed with Seer's shared secret, letting the Seer agent forge its own write-scoped tokens (additional location)

The agent capability token is HMAC-signed and verified with `SEER_API_SHARED_SECRET` (`agent_token._signing_key()`), the same symmetric secret the Seer service already holds to sign X-Viewer-Context JWTs and authenticate Seer→Sentry calls; the Seer agent can therefore forge a valid `sntryag_` token with any `sub`, `org`, and `scopes`, bypassing the user-approval write gate the flow is meant to enforce. Sign these tokens with a key only Sentry holds (asymmetric RS256 with Sentry as sole private-key holder, or a dedicated Sentry-only `AGENT_TOKEN_SIGNING_SECRET`).


def readonly_scopes() -> frozenset[str]:
# Not demo_mode.get_readonly_scopes(): that also allows project:releases, a write.
return frozenset(settings.SENTRY_READONLY_SCOPES)


def active_grant_scopes(organization_id: int, user_id: int, session_id: str) -> set[str]:
"""Unexpired scopes the user approved for the agent in this org + session. Keyed on
authenticated identity, never client input."""
scopes: set[str] = set()
grants = SeerAgentWriteGrant.objects.filter(
organization_id=organization_id,
user_id=user_id,
agent_session_id=session_id,
expires_at__gt=timezone.now(),
)
for grant in grants:
scopes.update(grant.get_scopes())
return scopes


def compute_token_scopes(
caller_scopes: Iterable[str],
organization_id: int,
user_id: int,
session_id: str,
requested_scopes: Iterable[str] | None = None,
) -> list[str]:
"""De-escalation rule: ``caller_scopes ∩ (read-only ∪ approved grants)``, optionally
narrowed by ``requested_scopes``. Never exceeds the caller's own authority."""
caller = set(caller_scopes)
allowed = readonly_scopes() | active_grant_scopes(organization_id, user_id, session_id)
effective = caller & allowed
if requested_scopes is not None:
effective &= set(requested_scopes)
return sorted(effective)


def encode_agent_token(
*,
user_id: int,
organization_id: int,
scopes: Iterable[str],
session_id: str,
ttl: timedelta = DEFAULT_TOKEN_TTL,
) -> tuple[str, datetime]:
"""Mint a signed agent token. Returns the JWT and its expiry. No DB write."""
now = timezone.now()
expires_at = now + ttl
payload = {
"aud": AGENT_TOKEN_AUDIENCE,
"sub": str(user_id),
"org": organization_id,
"scopes": sorted(scopes),
"sid": session_id,
"iat": int(now.timestamp()),
"exp": int(expires_at.timestamp()),
}
token = SENTRY_AGENT_TOKEN_PREFIX + jwt.encode(payload, _signing_key(), algorithm="HS256")
return token, expires_at


def decode_agent_token(token_str: str) -> dict[str, Any]:
"""Verify signature, ``exp`` and ``aud``; return the claims. Raises a pyjwt error on any
invalid token."""
if not token_str.startswith(SENTRY_AGENT_TOKEN_PREFIX):
raise jwt.DecodeError("not an agent token")
return jwt.decode(
token_str.removeprefix(SENTRY_AGENT_TOKEN_PREFIX),
_signing_key(),
audience=AGENT_TOKEN_AUDIENCE,
algorithms=["HS256"],
)


def build_authenticated_token(claims: dict[str, Any]) -> AuthenticatedToken:
# kind="api_token" routes the token through the ordinary token-scope path (scope
# intersection with the member's role).
return AuthenticatedToken(
kind="api_token",
scopes=list(claims.get("scopes", [])),
user_id=int(claims["sub"]),
organization_id=int(claims["org"]),
)


def mark_agent_request(request: Request, claims: dict[str, Any]) -> None:
setattr(request, _REQUEST_CLAIMS_ATTR, claims)


def get_agent_claims(request: Request) -> dict[str, Any] | None:
return getattr(request, _REQUEST_CLAIMS_ATTR, None)


def create_write_grant(
*, organization_id: int, user_id: int, session_id: str, scopes: Iterable[str]
) -> SeerAgentWriteGrant:
"""Merge ``scopes`` into the single grant for ``(org, user, session)`` and refresh its
expiry, creating it if absent. The caller MUST have already capped ``scopes`` to the
approving user's own authority. The unique constraint plus row lock keep concurrent
approvals from racing."""
with transaction.atomic(using=router.db_for_write(SeerAgentWriteGrant)):
grant, created = SeerAgentWriteGrant.objects.select_for_update().get_or_create(
organization_id=organization_id,
user_id=user_id,
agent_session_id=session_id,
defaults={
"scope_list": sorted(scopes),
"expires_at": timezone.now() + DEFAULT_EXPIRATION,
},
)
if not created:
grant.scope_list = sorted(set(grant.get_scopes()) | set(scopes))
grant.expires_at = timezone.now() + DEFAULT_EXPIRATION
grant.save(update_fields=["scope_list", "expires_at", "date_updated"])
return grant
Loading
Loading