Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/opencode_github/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""OpenCode GitHub integration helpers."""

from opencode_github.authorization import (
check_event_authorization,
is_authorized,
permission_rank,
)
from opencode_github.gamified_learning import (
AnalysisResult,
Assumption,
Expand All @@ -16,4 +21,7 @@
"DifficultyLevel",
"LearnerProfile",
"LearningChallenge",
"check_event_authorization",
"is_authorized",
"permission_rank",
]
88 changes: 88 additions & 0 deletions src/opencode_github/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Author-permission gating for slash commands.

Prevents unauthorised users from triggering the OpenCode agent by
checking the commenter's repository permission level before processing
any extracted commands.
"""

from __future__ import annotations

from opencode_github.github_client import GitHubClient
from opencode_github.webhook_handler import WebhookEvent

# GitHub permission levels in descending order of privilege.
_PERMISSION_RANK: dict[str, int] = {
"admin": 4,
"maintain": 3,
"write": 2,
"triage": 1,
"read": 0,
"none": -1,
}

DEFAULT_MIN_PERMISSION = "write"


def permission_rank(level: str) -> int:
"""Return a numeric rank for a GitHub permission level.

Unknown levels are treated as ``"none"`` (rank ``-1``).
"""
return _PERMISSION_RANK.get(level.lower(), -1)


def is_authorized(permission: str, min_level: str = DEFAULT_MIN_PERMISSION) -> bool:
"""Return ``True`` when *permission* meets or exceeds *min_level*."""
return permission_rank(permission) >= permission_rank(min_level)


async def check_event_authorization(
client: GitHubClient,
event: WebhookEvent,
min_level: str = DEFAULT_MIN_PERMISSION,
post_denial: bool = True,
) -> bool:
"""Check whether the sender of *event* is authorized to run commands.

Parameters
----------
client:
Authenticated GitHub API client.
event:
The parsed webhook event whose ``sender_login`` will be checked.
min_level:
Minimum required permission (default ``"write"``).
post_denial:
When ``True`` and the user is *not* authorized, post a polite
comment explaining that the command was ignored.

Returns
-------
bool
``True`` if the user has sufficient permissions.
"""
try:
permission = await client.get_user_permission(
event.repo_owner, event.repo_name, event.sender_login
)
except Exception:
# If we cannot determine permission (API error, network timeout, etc.),
# deny by default.
permission = "none"

authorized = is_authorized(permission, min_level)

if not authorized and post_denial:
body = (
f"@{event.sender_login} Sorry, you need **{min_level}** permission "
f"(or higher) on this repository to use slash commands. "
f"Your current permission level is **{permission}**."
)
try:
await client.create_issue_comment(
event.repo_owner, event.repo_name, event.issue_number, body
)
except Exception:
pass # Best-effort; don't fail the whole flow.

return authorized
11 changes: 11 additions & 0 deletions src/opencode_github/github_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,16 @@ async def add_reaction(
json={"content": reaction},
)

async def get_user_permission(self, owner: str, repo: str, username: str) -> str:
"""Return the permission level of *username* on *owner/repo*.

Returns one of ``"admin"``, ``"maintain"``, ``"write"``,
``"triage"``, ``"read"``, or ``"none"``.
"""
data = await self._request(
"GET", f"/repos/{owner}/{repo}/collaborators/{username}/permission"
)
return data.get("permission", "none")
Comment on lines +137 to +146

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Permission rank map includes levels that may not be returned by the API field being read

The _PERMISSION_RANK dict at src/opencode_github/authorization.py:14-21 defines 6 levels including "maintain" and "triage". The get_user_permission method reads data["permission"] from the GitHub API response (src/opencode_github/github_client.py:146). The GitHub REST API's permission field on the collaborator permission endpoint historically returns only the 4-level model ("admin", "write", "read", "none"), while the more granular "maintain" and "triage" levels are only available via the role_name response field. If this is still the case, then setting min_level="maintain" would incorrectly deny maintain-role users (they'd get "write" from the API, rank 2 < rank 3). The docstring for get_user_permission claims it can return "maintain" and "triage", which would be inaccurate. This needs verification against the current GitHub API behavior for the 2022-11-28 API version.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async def get_repo(self, owner: str, repo: str) -> dict[str, Any]:
return await self._request("GET", f"/repos/{owner}/{repo}")
170 changes: 170 additions & 0 deletions tests/test_authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""Tests for opencode_github.authorization."""

from __future__ import annotations

import httpx
import pytest
import respx

from opencode_github.authorization import (
check_event_authorization,
is_authorized,
permission_rank,
)
from opencode_github.github_client import GitHubClient
from opencode_github.webhook_handler import EventType, WebhookEvent

BASE = "https://api.github.com"


def _make_event(sender: str = "alice") -> WebhookEvent:
return WebhookEvent(
event_type=EventType.ISSUE_COMMENT,
action="created",
comment_body="/oc help",
comment_id=42,
sender_login=sender,
repo_owner="owner",
repo_name="repo",
issue_number=7,
raw_payload={},
)


@pytest.fixture()
def mock_router() -> respx.MockRouter:
with respx.mock(base_url=BASE, assert_all_called=False) as router:
yield router


@pytest.fixture()
def client() -> GitHubClient:
return GitHubClient(token="test-token", base_url=BASE, timeout=5)


class TestPermissionRank:
def test_known_levels(self) -> None:
assert permission_rank("admin") == 4
assert permission_rank("maintain") == 3
assert permission_rank("write") == 2
assert permission_rank("triage") == 1
assert permission_rank("read") == 0
assert permission_rank("none") == -1

def test_case_insensitive(self) -> None:
assert permission_rank("ADMIN") == 4
assert permission_rank("Write") == 2

def test_unknown_treated_as_none(self) -> None:
assert permission_rank("unknown") == -1
assert permission_rank("") == -1


class TestIsAuthorized:
def test_admin_always_passes(self) -> None:
assert is_authorized("admin", "write") is True
assert is_authorized("admin", "admin") is True

def test_write_meets_write(self) -> None:
assert is_authorized("write", "write") is True

def test_read_below_write(self) -> None:
assert is_authorized("read", "write") is False

def test_none_below_everything(self) -> None:
assert is_authorized("none", "read") is False

def test_triage_below_write(self) -> None:
assert is_authorized("triage", "write") is False

def test_maintain_above_write(self) -> None:
assert is_authorized("maintain", "write") is True

def test_custom_min_level_read(self) -> None:
assert is_authorized("read", "read") is True
assert is_authorized("none", "read") is False


class TestCheckEventAuthorization:
async def test_authorized_user(
self, mock_router: respx.MockRouter, client: GitHubClient
) -> None:
mock_router.get("/repos/owner/repo/collaborators/alice/permission").mock(
return_value=httpx.Response(200, json={"permission": "write"})
)
event = _make_event("alice")
result = await check_event_authorization(client, event)
assert result is True

async def test_admin_authorized(
self, mock_router: respx.MockRouter, client: GitHubClient
) -> None:
mock_router.get("/repos/owner/repo/collaborators/bob/permission").mock(
return_value=httpx.Response(200, json={"permission": "admin"})
)
event = _make_event("bob")
result = await check_event_authorization(client, event)
assert result is True

async def test_read_denied_posts_comment(
self, mock_router: respx.MockRouter, client: GitHubClient
) -> None:
mock_router.get("/repos/owner/repo/collaborators/eve/permission").mock(
return_value=httpx.Response(200, json={"permission": "read"})
)
comment_route = mock_router.post("/repos/owner/repo/issues/7/comments").mock(
return_value=httpx.Response(
201,
json={
"id": 99,
"body": "denied",
"user": {"login": "bot"},
"html_url": "https://github.com/owner/repo/issues/7#issuecomment-99",
},
)
)
event = _make_event("eve")
result = await check_event_authorization(client, event)
assert result is False
assert comment_route.called

async def test_denied_no_comment_when_disabled(
self, mock_router: respx.MockRouter, client: GitHubClient
) -> None:
mock_router.get("/repos/owner/repo/collaborators/eve/permission").mock(
return_value=httpx.Response(200, json={"permission": "read"})
)
event = _make_event("eve")
result = await check_event_authorization(client, event, post_denial=False)
assert result is False

async def test_api_error_defaults_to_denied(
self, mock_router: respx.MockRouter, client: GitHubClient
) -> None:
mock_router.get("/repos/owner/repo/collaborators/ghost/permission").mock(
return_value=httpx.Response(403, json={"message": "Forbidden"})
)
mock_router.post("/repos/owner/repo/issues/7/comments").mock(
return_value=httpx.Response(
201,
json={
"id": 100,
"body": "denied",
"user": {"login": "bot"},
"html_url": "https://github.com/owner/repo/issues/7#issuecomment-100",
},
)
)
event = _make_event("ghost")
result = await check_event_authorization(client, event)
assert result is False

async def test_custom_min_level(
self, mock_router: respx.MockRouter, client: GitHubClient
) -> None:
mock_router.get("/repos/owner/repo/collaborators/alice/permission").mock(
return_value=httpx.Response(200, json={"permission": "read"})
)
event = _make_event("alice")
result = await check_event_authorization(client, event, min_level="read")
assert result is True
Loading