diff --git a/AGENTS.md b/AGENTS.md index 486c1c5..78e1ad1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,4 +65,5 @@ tools = ["claude", "cursor"] ### Always Do - agr and agrx should always be unified and synced. - include in the plan to write tests for what is implemented -- Save all skills in `skills/` directory (not `.claude/skills/` which is gitignored) \ No newline at end of file +- Save all skills in `skills/` directory (not `.claude/skills/` which is gitignored) +- Run all checks and tests before creating PR or commiting any code. \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index c801768..e11e00d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -66,4 +66,5 @@ tools = ["claude", "cursor"] ### Always Do - agr and agrx should always be unified and synced. - include in the plan to write tests for what is implemented -- Save all skills in `skills/` directory (not `.claude/skills/` which is gitignored) \ No newline at end of file +- Save all skills in `skills/` directory (not `.claude/skills/` which is gitignored) +- Run all checks and tests before creating PR or commiting any code. \ No newline at end of file diff --git a/agr.lock b/agr.lock index 4c2c3a1..cd85eb4 100644 --- a/agr.lock +++ b/agr.lock @@ -30,27 +30,27 @@ installed-name = "github-issue-triage" [[ralph]] handle = "computerlovetech/research/conduct-research" source = "github" -commit = "637025a5e221264845bff7e6a89aaba7a7bdb919" +commit = "36b92ec43ded18b9609e69c87fe5696346b6af27" content-hash = "sha256:3a9c1ef64234ccb61d9b681afc4305ffadad5735f0536f77759a41366e75c3a2" installed-name = "conduct-research" [[ralph]] handle = "computerlovetech/ralphs/improve-codebase" source = "github" -commit = "70a0dabf5a59ae1a3acaf50a9e619c83e174a35f" -content-hash = "sha256:0afc55d5b2d4061fa82c270a147c66c8bc21589ab62c47d9747ddb5f382c708d" +commit = "82c5c4dd1e2e244e86e2ddad3f987b4d9a6d27f2" +content-hash = "sha256:637929bdfe382116efaac98e3a46cfac1de4978978b9f1d984c51d65b49d408a" installed-name = "improve-codebase" [[ralph]] handle = "computerlovetech/ralphs/bug-hunter" source = "github" -commit = "70a0dabf5a59ae1a3acaf50a9e619c83e174a35f" +commit = "82c5c4dd1e2e244e86e2ddad3f987b4d9a6d27f2" content-hash = "sha256:ab2e4924cbb999f677523211af9cfda48ce8433b17545854bd770f29d7664c64" installed-name = "bug-hunter" [[ralph]] handle = "computerlovetech/ralphs/security" source = "github" -commit = "70a0dabf5a59ae1a3acaf50a9e619c83e174a35f" +commit = "82c5c4dd1e2e244e86e2ddad3f987b4d9a6d27f2" content-hash = "sha256:2b025b4e5546f2a04367a79e5f4201892bc155d22f53a55cfc0cd21071922de1" installed-name = "security" diff --git a/agr/auth.py b/agr/auth.py new file mode 100644 index 0000000..653a005 --- /dev/null +++ b/agr/auth.py @@ -0,0 +1,258 @@ +import json +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Protocol + +from agr.exceptions import AgrError + +AUTH_DIR_NAME = ".agr" +AUTH_FILE_NAME = "auth.json" +AUTH_FILE_MODE = 0o600 + + +@dataclass(frozen=True) +class StoredGitHubCredential: + method: str + token: str + username: str | None = None + + +@dataclass(frozen=True) +class StoredToken: + token: str + + +@dataclass(frozen=True) +class AuthStatus: + authenticated: bool + source: str | None + method: str | None = None + + +@dataclass(frozen=True) +class DeviceAuthorization: + device_code: str + user_code: str + verification_uri: str + expires_in: int + interval: int + + +class CredentialStore(Protocol): + def read_credential(self) -> StoredGitHubCredential | None: ... + + def write_credential(self, credential: StoredGitHubCredential) -> None: ... + + def delete_token(self) -> bool: ... + + +class TokenStore(CredentialStore, Protocol): + def read_token(self) -> str | None: ... + + def write_token(self, token: str) -> None: ... + + +class GitHubLoginStrategy(Protocol): + def login(self) -> StoredGitHubCredential: ... + + +class DeviceOAuthClient(Protocol): + def request_device_authorization(self) -> DeviceAuthorization: ... + + def poll_for_token(self, authorization: DeviceAuthorization) -> str: ... + + +class AuthStatusChecker(Protocol): + def get_status(self) -> AuthStatus: ... + + +DevicePromptHandler = Callable[[DeviceAuthorization], None] +StringPrompt = Callable[[], str] + + +class FileTokenStore: + def __init__(self, auth_file: Path | None = None) -> None: + self.auth_file = auth_file or default_auth_file() + + def read_credential(self) -> StoredGitHubCredential | None: + if not self.auth_file.exists(): + return None + try: + data = json.loads(self.auth_file.read_text()) + except (OSError, json.JSONDecodeError): + return None + token = data.get("github_token") + if not isinstance(token, str) or not token.strip(): + return None + method = data.get("method") + username = data.get("username") + normalized_method = ( + method if isinstance(method, str) and method.strip() else "oauth" + ) + normalized_username = ( + username.strip() if isinstance(username, str) and username.strip() else None + ) + return StoredGitHubCredential( + method=normalized_method.strip(), + token=token.strip(), + username=normalized_username, + ) + + def write_credential(self, credential: StoredGitHubCredential) -> None: + token = credential.token.strip() + method = credential.method.strip() + username = credential.username.strip() if credential.username else None + if not token: + raise AgrError("Cannot store an empty GitHub token.") + if not method: + raise AgrError("Cannot store a GitHub credential without a method.") + if method == "username_password" and not username: + raise AgrError( + "Cannot store a username/password credential without a username." + ) + data: dict[str, str] = {"github_token": token, "method": method} + if username: + data["username"] = username + self.auth_file.parent.mkdir(parents=True, exist_ok=True) + fd = os.open( + self.auth_file, + os.O_WRONLY | os.O_CREAT | os.O_TRUNC, + AUTH_FILE_MODE, + ) + try: + with os.fdopen(fd, "w") as file: + json.dump(data, file) + finally: + os.chmod(self.auth_file, AUTH_FILE_MODE) + + def read_token(self) -> str | None: + credential = self.read_credential() + return credential.token if credential else None + + def write_token(self, token: str) -> None: + self.write_credential(StoredGitHubCredential(method="oauth", token=token)) + + def delete_token(self) -> bool: + try: + self.auth_file.unlink() + except FileNotFoundError: + return False + return True + + +class GitHubAuthStatusChecker: + def __init__( + self, + store: CredentialStore | None = None, + env: dict[str, str] | None = None, + ) -> None: + self.store = store or FileTokenStore() + self.env = env + + def get_status(self) -> AuthStatus: + environ = self.env if self.env is not None else os.environ + for env_var in ("GITHUB_TOKEN", "GH_TOKEN"): + token = environ.get(env_var, "") + if token.strip(): + return AuthStatus(authenticated=True, source=env_var, method="env") + credential = self.store.read_credential() + if credential: + return AuthStatus( + authenticated=True, source="stored", method=credential.method + ) + return AuthStatus(authenticated=False, source=None, method=None) + + +class UsernamePasswordGitHubLoginStrategy: + def __init__( + self, + username: str | None = None, + password: str | None = None, + username_prompt: StringPrompt | None = None, + password_prompt: StringPrompt | None = None, + ) -> None: + self.username = username + self.password = password + self.username_prompt = username_prompt + self.password_prompt = password_prompt + + def login(self) -> StoredGitHubCredential: + username = ( + self.username if self.username is not None else self._prompt_username() + ) + password = ( + self.password if self.password is not None else self._prompt_password() + ) + normalized_username = username.strip() + normalized_password = password.strip() + if not normalized_username: + raise AgrError("GitHub username cannot be empty.") + if not normalized_password: + raise AgrError("GitHub password or token cannot be empty.") + return StoredGitHubCredential( + method="username_password", + token=normalized_password, + username=normalized_username, + ) + + def _prompt_username(self) -> str: + if self.username_prompt is None: + raise AgrError("GitHub username prompt is not configured.") + return self.username_prompt() + + def _prompt_password(self) -> str: + if self.password_prompt is None: + raise AgrError("GitHub password prompt is not configured.") + return self.password_prompt() + + +class OAuthGitHubLoginStrategy: + def __init__( + self, + oauth_client: DeviceOAuthClient, + prompt_handler: DevicePromptHandler | None = None, + ) -> None: + self.oauth_client = oauth_client + self.prompt_handler = prompt_handler + + def login(self) -> StoredGitHubCredential: + authorization = self.oauth_client.request_device_authorization() + if self.prompt_handler: + self.prompt_handler(authorization) + token = self.oauth_client.poll_for_token(authorization) + return StoredGitHubCredential(method="oauth", token=token) + + +def default_auth_file() -> Path: + return Path.home() / AUTH_DIR_NAME / AUTH_FILE_NAME + + +def read_stored_github_credential( + store: CredentialStore | None = None, +) -> StoredGitHubCredential | None: + return (store or FileTokenStore()).read_credential() + + +def read_stored_github_token(store: TokenStore | None = None) -> str | None: + return (store or FileTokenStore()).read_token() + + +def login( + strategy: GitHubLoginStrategy, + store: CredentialStore | None = None, +) -> StoredGitHubCredential: + token_store = store or FileTokenStore() + credential = strategy.login() + token_store.write_credential(credential) + return credential + + +def status( + store: CredentialStore | None = None, env: dict[str, str] | None = None +) -> AuthStatus: + return GitHubAuthStatusChecker(store, env).get_status() + + +def logout(store: CredentialStore | None = None) -> bool: + return (store or FileTokenStore()).delete_token() diff --git a/agr/commands/remove.py b/agr/commands/remove.py index dbbb0a8..baab8d8 100644 --- a/agr/commands/remove.py +++ b/agr/commands/remove.py @@ -58,7 +58,9 @@ def _uninstall_from_filesystem( Returns True if anything was removed. """ if is_ralph: - return uninstall_ralph(handle, repo_root, source_name, default_repo=default_repo) + return uninstall_ralph( + handle, repo_root, source_name, default_repo=default_repo + ) removed = False for tool in tools: if uninstall_skill( diff --git a/agr/git.py b/agr/git.py index 5032043..a548fc4 100644 --- a/agr/git.py +++ b/agr/git.py @@ -1,5 +1,6 @@ """Git operations for downloading and preparing repositories.""" +import base64 import hashlib import os import re @@ -10,6 +11,7 @@ from contextlib import contextmanager from pathlib import Path from collections.abc import Generator +from agr.auth import StoredGitHubCredential, read_stored_github_credential from agr.exceptions import ( AgrError, AuthenticationError, @@ -52,18 +54,19 @@ def _build_github_auth_env() -> dict[str, str]: Returns: Dict of env var overrides. Empty when no token is available. """ - token = get_github_token() - if not token: + credential = get_github_credential() + if not credential: return {} try: existing_count = int(os.environ.get("GIT_CONFIG_COUNT", "0")) except ValueError: existing_count = 0 + basic_token = _build_basic_auth_token(credential) return { "GIT_CONFIG_COUNT": str(existing_count + 1), f"GIT_CONFIG_KEY_{existing_count}": ("http.https://github.com/.extraheader"), - f"GIT_CONFIG_VALUE_{existing_count}": (f"AUTHORIZATION: bearer {token}"), + f"GIT_CONFIG_VALUE_{existing_count}": (f"AUTHORIZATION: basic {basic_token}"), } @@ -87,7 +90,7 @@ def _run_git(cmd: list[str]) -> subprocess.CompletedProcess[str]: AgrError: If git cannot be executed (e.g., not installed). """ auth_env = _build_github_auth_env() - env = {**os.environ, **auth_env} if auth_env else None + env = {**os.environ, "GIT_TERMINAL_PROMPT": "0", **auth_env} try: return subprocess.run( cmd, @@ -125,19 +128,32 @@ def _run_git_checked( return result -def get_github_token() -> str | None: - """Get GitHub token from environment. - - Checks GITHUB_TOKEN first, then falls back to GH_TOKEN (used by gh CLI). - - Returns: - Token string if set and non-empty, None otherwise. - """ +def get_github_credential() -> StoredGitHubCredential | None: for env_var in ("GITHUB_TOKEN", "GH_TOKEN"): token = os.environ.get(env_var, "") if token.strip(): - return token.strip() - return None + return StoredGitHubCredential( + method="env", + token=token.strip(), + username="x-access-token", + ) + return read_stored_github_credential() + + +def get_github_token() -> str | None: + credential = get_github_credential() + return credential.token if credential else None + + +def _build_basic_auth_token(credential: StoredGitHubCredential) -> str: + username = ( + credential.username + if credential.method == "username_password" + else "x-access-token" + ) + if credential.method == "username_password" and not username: + return "" + return base64.b64encode(f"{username}:{credential.token}".encode()).decode() def short_commit(commit: str) -> str: @@ -366,16 +382,19 @@ def _raise_clone_error( if token_missing: raise AuthenticationError( f"Authentication failed for source '{source.name}'. " - "Repository not found or requires authentication." + "Repository not found or requires authentication. " + "Run 'agr auth login' or set GITHUB_TOKEN/GH_TOKEN." ) from None raise AuthenticationError( - f"Authentication failed for source '{source.name}'." + f"Authentication failed for source '{source.name}'. " + "Run 'agr auth login' or set GITHUB_TOKEN/GH_TOKEN." ) from None # 2. Explicit "not found" responses from the server if _is_repo_not_found(lowered): raise RepoNotFoundError( - f"Repository '{owner}/{repo_name}' not found in source '{source.name}'." + f"Repository '{owner}/{repo_name}' not found in source '{source.name}'. " + "If this is a private repository, run 'agr auth login' or set GITHUB_TOKEN/GH_TOKEN." ) from None # 3. DNS / network failures @@ -389,7 +408,8 @@ def _raise_clone_error( # user toward setting GITHUB_TOKEN. if token_missing and _is_ambiguous_auth_hint(lowered): raise RepoNotFoundError( - f"Repository '{owner}/{repo_name}' not found in source '{source.name}'." + f"Repository '{owner}/{repo_name}' not found in source '{source.name}'. " + "Run 'agr auth login' or set GITHUB_TOKEN/GH_TOKEN for private repositories." ) from None # 5. Catch-all for unrecognized errors diff --git a/agr/github_oauth.py b/agr/github_oauth.py new file mode 100644 index 0000000..088f913 --- /dev/null +++ b/agr/github_oauth.py @@ -0,0 +1,93 @@ +import os +import time +from typing import Callable + +import httpx + +from agr.auth import DeviceAuthorization +from agr.exceptions import AuthenticationError + +GITHUB_DEVICE_CODE_URL = "https://github.com/login/device/code" +GITHUB_ACCESS_TOKEN_URL = "https://github.com/login/oauth/access_token" +AGR_GITHUB_OAUTH_CLIENT_ID = "Ov23li9UKn7X2CZq7VIi" +UNCONFIGURED_GITHUB_OAUTH_CLIENT_ID = "replace-with-agr-github-oauth-client-id" +GITHUB_OAUTH_SCOPE = "repo" +MISSING_CLIENT_ID_MESSAGE = ( + "GitHub OAuth login is not configured for this build of agr. " + "Set AGR_GITHUB_OAUTH_CLIENT_ID to a GitHub OAuth app client ID with device flow enabled, " + "or set GITHUB_TOKEN/GH_TOKEN." +) + + +class GitHubOAuthDeviceFlow: + def __init__( + self, + client_id: str = AGR_GITHUB_OAUTH_CLIENT_ID, + client: httpx.Client | None = None, + sleep: Callable[[float], None] = time.sleep, + ) -> None: + self.client_id = os.environ.get("AGR_GITHUB_OAUTH_CLIENT_ID", client_id) + self.client = client or httpx.Client(timeout=30) + self.sleep = sleep + + def request_device_authorization(self) -> DeviceAuthorization: + if self.client_id == UNCONFIGURED_GITHUB_OAUTH_CLIENT_ID: + raise AuthenticationError(MISSING_CLIENT_ID_MESSAGE) + response = self.client.post( + GITHUB_DEVICE_CODE_URL, + data={"client_id": self.client_id, "scope": GITHUB_OAUTH_SCOPE}, + headers={"Accept": "application/json"}, + ) + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + raise AuthenticationError( + f"GitHub device authorization failed: HTTP {exc.response.status_code}." + ) from None + data = response.json() + return DeviceAuthorization( + device_code=str(data["device_code"]), + user_code=str(data["user_code"]), + verification_uri=str(data["verification_uri"]), + expires_in=int(data["expires_in"]), + interval=int(data.get("interval", 5)), + ) + + def poll_for_token(self, authorization: DeviceAuthorization) -> str: + interval = authorization.interval + deadline = time.monotonic() + authorization.expires_in + while time.monotonic() < deadline: + self.sleep(interval) + response = self.client.post( + GITHUB_ACCESS_TOKEN_URL, + data={ + "client_id": self.client_id, + "device_code": authorization.device_code, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + }, + headers={"Accept": "application/json"}, + ) + response.raise_for_status() + data = response.json() + token = data.get("access_token") + if isinstance(token, str) and token.strip(): + return token.strip() + error = data.get("error") + if error == "authorization_pending": + continue + if error == "slow_down": + interval += 5 + continue + if error == "expired_token": + raise AuthenticationError( + "GitHub login expired. Run 'agr auth login' again." + ) + if error == "access_denied": + raise AuthenticationError("GitHub login was cancelled or denied.") + if isinstance(error, str): + description = data.get("error_description") + if isinstance(description, str) and description.strip(): + raise AuthenticationError(description.strip()) + raise AuthenticationError(f"GitHub login failed: {error}") + raise AuthenticationError("GitHub login failed: no access token returned.") + raise AuthenticationError("GitHub login expired. Run 'agr auth login' again.") diff --git a/agr/main.py b/agr/main.py index 8c56a73..c49a088 100644 --- a/agr/main.py +++ b/agr/main.py @@ -1,10 +1,12 @@ """CLI entry point for agr.""" from typing import Annotated +import webbrowser import typer from agr import __version__ +from agr import auth as agr_auth from agr.commands.add import run_add from agr.commands.init import run_init from agr.commands.list import run_list @@ -22,7 +24,8 @@ run_config_show, run_config_unset, ) -from agr.console import set_quiet +from agr.console import get_console, set_quiet +from agr.github_oauth import GitHubOAuthDeviceFlow from agr.tool import available_tools_string GlobalScope = Annotated[ @@ -45,6 +48,13 @@ ) app.add_typer(config_app, name="config") +auth_app = typer.Typer( + name="auth", + help="Manage GitHub authentication.", + no_args_is_help=True, +) +app.add_typer(auth_app, name="auth") + # --- New unified config commands --- @@ -128,6 +138,83 @@ def config_remove( run_config_remove(key, values, global_scope) +def print_auth_status(result: agr_auth.AuthStatus) -> None: + console = get_console() + if result.source == "stored": + method = f" ({result.method})" if result.method else "" + console.print(f"Authenticated with stored agr GitHub token{method}.") + return + console.print(f"Authenticated with {result.source} environment token.") + + +@auth_app.command("login") +def auth_login( + oauth: Annotated[ + bool, + typer.Option("--oauth", help="Authenticate using GitHub OAuth device flow."), + ] = False, +) -> None: + """Authenticate with GitHub.""" + console = get_console() + result = agr_auth.GitHubAuthStatusChecker().get_status() + if result.authenticated: + print_auth_status(result) + console.print("Already logged in.") + return + + def show_device_prompt(authorization: agr_auth.DeviceAuthorization) -> None: + console.print("Open this URL to authenticate with GitHub:") + console.print(authorization.verification_uri) + console.print(f"Enter code: [bold]{authorization.user_code}[/bold]") + webbrowser.open(authorization.verification_uri) + console.print("Waiting for GitHub authorization...") + + if oauth: + strategy: agr_auth.GitHubLoginStrategy = agr_auth.OAuthGitHubLoginStrategy( + GitHubOAuthDeviceFlow(), + show_device_prompt, + ) + else: + strategy = agr_auth.UsernamePasswordGitHubLoginStrategy( + username_prompt=lambda: typer.prompt("GitHub username"), + password_prompt=lambda: typer.prompt( + "GitHub password or token", + hide_input=True, + ), + ) + + try: + agr_auth.login(strategy) + except Exception as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) from None + console.print("Authenticated with GitHub.") + + +@auth_app.command("status") +def auth_status() -> None: + """Show GitHub authentication status.""" + result = agr_auth.status() + console = get_console() + if not result.authenticated: + console.print( + "Not authenticated. Run 'agr auth login' or set GITHUB_TOKEN/GH_TOKEN." + ) + raise typer.Exit(1) + print_auth_status(result) + + +@auth_app.command("logout") +def auth_logout() -> None: + """Remove stored GitHub authentication.""" + removed = agr_auth.logout() + console = get_console() + if removed: + console.print("Removed stored GitHub token.") + return + console.print("No stored GitHub token found.") + + def version_callback(value: bool) -> None: """Print version and exit.""" if value: diff --git a/agr/tool.py b/agr/tool.py index 6e01a82..d60898b 100644 --- a/agr/tool.py +++ b/agr/tool.py @@ -39,7 +39,9 @@ class ToolConfig: skill_prompt_prefix: str = "/" # Prefix for invoking a skill install_hint: str | None = None # Help text for installation detection_signals: tuple[str, ...] = () # Paths that indicate tool presence - instruction_file: str = DEFAULT_INSTRUCTION_FILE # Canonical instruction file for this tool + instruction_file: str = ( + DEFAULT_INSTRUCTION_FILE # Canonical instruction file for this tool + ) def get_skills_dir(self, repo_root: Path) -> Path: """Get the skills directory for this tool in a repo.""" diff --git a/tests/cli/agr/test_auth.py b/tests/cli/agr/test_auth.py new file mode 100644 index 0000000..03df5ec --- /dev/null +++ b/tests/cli/agr/test_auth.py @@ -0,0 +1,112 @@ +import json +from pathlib import Path + +from tests.cli.assertions import assert_cli +from tests.cli.runner import run_cli + + +def test_auth_login_default_stores_username_password_credential(tmp_path: Path) -> None: + result = run_cli( + ["agr", "auth", "login"], + env={"HOME": str(tmp_path)}, + input="octocat\nsecret-token\n", + ) + + auth_file = tmp_path / ".agr" / "auth.json" + assert_cli(result).succeeded().stdout_contains("Authenticated with GitHub") + assert json.loads(auth_file.read_text()) == { + "github_token": "secret-token", + "method": "username_password", + "username": "octocat", + } + assert "secret-token" not in result.stdout + assert "secret-token" not in result.stderr + + +def test_auth_login_oauth_uses_device_flow_flag(tmp_path: Path) -> None: + result = run_cli( + ["agr", "auth", "login", "--oauth"], + env={"HOME": str(tmp_path), "AGR_GITHUB_OAUTH_CLIENT_ID": ""}, + ) + + assert_cli(result).failed().stdout_contains("GitHub device authorization failed") + assert "GitHub username" not in result.stdout + + +def test_auth_login_skips_prompt_when_stored_oauth_token_exists(tmp_path: Path) -> None: + auth_dir = tmp_path / ".agr" + auth_dir.mkdir() + (auth_dir / "auth.json").write_text( + json.dumps({"github_token": "stored-token", "method": "oauth"}) + ) + + result = run_cli( + ["agr", "auth", "login"], + env={"HOME": str(tmp_path), "GITHUB_TOKEN": "", "GH_TOKEN": ""}, + ) + + assert_cli(result).succeeded().stdout_contains("Already logged in") + assert "GitHub username" not in result.stdout + + +def test_auth_login_skips_prompt_when_environment_token_exists(tmp_path: Path) -> None: + result = run_cli( + ["agr", "auth", "login"], + env={"HOME": str(tmp_path), "GITHUB_TOKEN": "github-token"}, + ) + + assert_cli(result).succeeded().stdout_contains("Already logged in") + assert "GitHub username" not in result.stdout + + +def test_auth_status_reports_environment_token(tmp_path: Path) -> None: + result = run_cli( + ["agr", "auth", "status"], + env={"HOME": str(tmp_path), "GITHUB_TOKEN": "github-token"}, + ) + + assert_cli(result).succeeded().stdout_contains("GITHUB_TOKEN environment token") + assert "github-token" not in result.stdout + + +def test_auth_status_reports_stored_token(tmp_path: Path) -> None: + auth_dir = tmp_path / ".agr" + auth_dir.mkdir() + (auth_dir / "auth.json").write_text(json.dumps({"github_token": "stored-token"})) + + result = run_cli( + ["agr", "auth", "status"], + env={"HOME": str(tmp_path), "GITHUB_TOKEN": "", "GH_TOKEN": ""}, + ) + + assert_cli(result).succeeded().stdout_contains("stored agr GitHub token") + assert "stored-token" not in result.stdout + + +def test_auth_status_reports_not_authenticated(tmp_path: Path) -> None: + result = run_cli( + ["agr", "auth", "status"], + env={"HOME": str(tmp_path), "GITHUB_TOKEN": "", "GH_TOKEN": ""}, + ) + + assert result.returncode == 1 + assert "Not authenticated" in result.stdout + assert "agr auth login" in result.stdout + + +def test_auth_logout_removes_stored_token(tmp_path: Path) -> None: + auth_dir = tmp_path / ".agr" + auth_dir.mkdir() + auth_file = auth_dir / "auth.json" + auth_file.write_text(json.dumps({"github_token": "stored-token"})) + + result = run_cli(["agr", "auth", "logout"], env={"HOME": str(tmp_path)}) + + assert_cli(result).succeeded().stdout_contains("Removed stored GitHub token") + assert not auth_file.exists() + + +def test_auth_logout_when_no_token_is_successful(tmp_path: Path) -> None: + result = run_cli(["agr", "auth", "logout"], env={"HOME": str(tmp_path)}) + + assert_cli(result).succeeded().stdout_contains("No stored GitHub token found") diff --git a/tests/cli/agr/test_sync.py b/tests/cli/agr/test_sync.py index dbf98ea..26a4b75 100644 --- a/tests/cli/agr/test_sync.py +++ b/tests/cli/agr/test_sync.py @@ -173,6 +173,7 @@ def test_sync_rewrites_shorthand_handle_in_toml( dep = config.dependencies[0] assert dep.handle == "acme/skills/test-skill" + class TestAgrSyncRalph: """Tests for agr sync with ralph dependencies.""" diff --git a/tests/test_docs.py b/tests/test_docs.py index 6d44e1c..daf0a33 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -44,18 +44,14 @@ def test_index_exists(self): """Home page exists.""" assert (DOCS_DIR / "index.md").exists() - def test_creating_exists(self): - """Creating skills page exists.""" - assert (DOCS_DIR / "creating.md").exists() + def test_managing_exists(self): + """Managing skills page exists.""" + assert (DOCS_DIR / "managing.md").exists() def test_reference_exists(self): """Reference page exists.""" assert (DOCS_DIR / "reference.md").exists() - def test_llms_txt_exists(self): - """llms.txt exists.""" - assert (DOCS_DIR / "llms.txt").exists() - class TestInternalLinks: """Test that internal links resolve to existing files.""" @@ -189,33 +185,15 @@ class TestToolDocumentation: ALL_TOOL_NAMES = set(TOOLS.keys()) - def test_configuration_mentions_all_tools(self): - """configuration.md tools table lists all supported tools.""" - content = (DOCS_DIR / "configuration.md").read_text() - # Check the Multi-Tool Setup section which has the tools table. - section_marker = "## Multi-Tool Setup" - assert section_marker in content, ( - "configuration.md missing 'Multi-Tool Setup' section" - ) + def test_managing_mentions_all_tools(self): + """managing.md Multi-tool section lists all supported tools.""" + content = (DOCS_DIR / "managing.md").read_text() + section_marker = "## Multi-tool" + assert section_marker in content, "managing.md missing 'Multi-tool' section" section = content[content.index(section_marker) :] for tool in self.ALL_TOOL_NAMES: assert tool in section.lower(), ( - f"Tool '{tool}' not in configuration.md Multi-Tool Setup section" - ) - - def test_agrx_cli_table_mentions_all_tools(self): - """agrx.md CLI Requirements table lists all supported tools.""" - content = (DOCS_DIR / "agrx.md").read_text() - # Extract just the CLI Requirements section to ensure tools appear - # in the table, not just in passing mentions elsewhere on the page. - cli_section_marker = "## Tool CLI Requirements" - assert cli_section_marker in content, ( - "agrx.md missing 'Tool CLI Requirements' section" - ) - cli_section = content[content.index(cli_section_marker) :] - for tool in self.ALL_TOOL_NAMES: - assert tool in cli_section.lower(), ( - f"Tool '{tool}' not in agrx.md CLI Requirements table" + f"Tool '{tool}' not in managing.md Multi-tool section" ) def test_reference_agrx_section_mentions_all_tools(self): @@ -226,24 +204,6 @@ def test_reference_agrx_section_mentions_all_tools(self): f"Tool '{tool}' not mentioned in reference.md" ) - def test_tools_page_detection_signals_match_code(self): - """tools.md Detection Signals table matches ToolConfig.detection_signals.""" - content = (DOCS_DIR / "tools.md").read_text() - section_marker = "## Detection Signals" - assert section_marker in content, "tools.md missing 'Detection Signals' section" - section = content[content.index(section_marker) :] - # Stop at the next h2 section - next_h2 = section.find("\n## ", 1) - if next_h2 != -1: - section = section[:next_h2] - - for name, tool in self.TOOLS.items(): - for signal in tool.detection_signals: - assert signal in section, ( - f"Detection signal '{signal}' for tool '{name}' " - f"missing from tools.md Detection Signals table" - ) - class TestContentQuality: """Test documentation content quality.""" @@ -253,12 +213,12 @@ def test_index_has_quick_start(self): content = (DOCS_DIR / "index.md").read_text() assert "uvx agr add" in content or "uv tool install agr" in content - def test_creating_has_skill_example(self): - """Creating page has a complete skill example.""" - content = (DOCS_DIR / "creating.md").read_text() - assert "SKILL.md" in content - assert "name:" in content - assert "description:" in content + def test_managing_has_config_example(self): + """Managing page has a complete agr.toml example.""" + content = (DOCS_DIR / "managing.md").read_text() + assert "agr.toml" in content + assert "tools" in content + assert "dependencies" in content def test_reference_has_all_commands(self): """Reference page documents all main commands.""" diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index 054068a..3bb1f7e 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -42,6 +42,7 @@ def test_get_github_token_returns_none_when_unset(self, monkeypatch): """None returned when no token env vars are set.""" monkeypatch.delenv("GITHUB_TOKEN", raising=False) monkeypatch.delenv("GH_TOKEN", raising=False) + monkeypatch.setattr("agr.git.read_stored_github_credential", lambda: None) assert get_github_token() is None def test_get_github_token_prefers_github_token(self, monkeypatch): diff --git a/tests/test_sdk_hub.py b/tests/test_sdk_hub.py index c60026c..8d52e93 100644 --- a/tests/test_sdk_hub.py +++ b/tests/test_sdk_hub.py @@ -13,6 +13,7 @@ def _raises_invalid_handle(repo_handle: str) -> None: # --- Control-character / whitespace rejection (SF-008) --- + def test_list_skills_rejects_newline_in_owner(): _raises_invalid_handle("owner\nevil/repo") @@ -27,6 +28,7 @@ def test_list_skills_rejects_space_in_owner_single_part(): # --- YAML character rejection (SF-010 / SF-011 / SF-012) --- + def test_list_skills_rejects_bracket_in_owner(): _raises_invalid_handle("[owner]/repo") @@ -41,6 +43,7 @@ def test_list_skills_rejects_pipe_in_owner(): # --- Path traversal rejection (SF-003) --- + def test_list_skills_rejects_dotdot_owner(): _raises_invalid_handle("../repo") @@ -51,6 +54,7 @@ def test_list_skills_rejects_dotdot_repo(): # --- Valid handles still work (no regression) --- + def test_list_skills_valid_single_part_raises_network_not_handle(monkeypatch): """Valid owner should pass handle validation; network call is mocked to raise.""" import agr.sdk.hub as hub diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py new file mode 100644 index 0000000..9cfc321 --- /dev/null +++ b/tests/unit/test_auth.py @@ -0,0 +1,278 @@ +import json +import stat +from pathlib import Path + +import pytest + +from agr.auth import ( + AuthStatus, + DeviceAuthorization, + FileTokenStore, + GitHubAuthStatusChecker, + OAuthGitHubLoginStrategy, + StoredGitHubCredential, + UsernamePasswordGitHubLoginStrategy, + login, + logout, + status, +) +from agr.exceptions import AgrError + + +class FakeOAuthClient: + def __init__(self) -> None: + self.authorization = DeviceAuthorization( + "device", + "USER-CODE", + "https://github.com/login/device", + 900, + 1, + ) + self.token = "stored-token" + + def request_device_authorization(self) -> DeviceAuthorization: + return self.authorization + + def poll_for_token(self, authorization: DeviceAuthorization) -> str: + assert authorization == self.authorization + return self.token + + +class MemoryTokenStore: + def __init__(self, credential: StoredGitHubCredential | None = None) -> None: + self.credential = credential + self.deleted = False + + def read_credential(self) -> StoredGitHubCredential | None: + return self.credential + + def write_credential(self, credential: StoredGitHubCredential) -> None: + self.credential = credential + + def read_token(self) -> str | None: + return self.credential.token if self.credential else None + + def write_token(self, token: str) -> None: + self.credential = StoredGitHubCredential(method="oauth", token=token) + + def delete_token(self) -> bool: + had_token = self.credential is not None + self.credential = None + self.deleted = True + return had_token + + +def test_file_token_store_writes_reads_and_sets_permissions(tmp_path: Path) -> None: + auth_file = tmp_path / ".agr" / "auth.json" + store = FileTokenStore(auth_file) + credential = StoredGitHubCredential( + method="username_password", + token=" token-value ", + username=" octocat ", + ) + + store.write_credential(credential) + + assert store.read_credential() == StoredGitHubCredential( + method="username_password", + token="token-value", + username="octocat", + ) + assert json.loads(auth_file.read_text()) == { + "github_token": "token-value", + "method": "username_password", + "username": "octocat", + } + assert stat.S_IMODE(auth_file.stat().st_mode) == 0o600 + + +def test_file_token_store_reads_backcompat_token_format(tmp_path: Path) -> None: + auth_file = tmp_path / ".agr" / "auth.json" + auth_file.parent.mkdir() + auth_file.write_text(json.dumps({"github_token": " stored-token "})) + store = FileTokenStore(auth_file) + + assert store.read_credential() == StoredGitHubCredential( + method="oauth", + token="stored-token", + ) + assert store.read_token() == "stored-token" + + +def test_file_token_store_write_token_uses_oauth_method(tmp_path: Path) -> None: + auth_file = tmp_path / ".agr" / "auth.json" + store = FileTokenStore(auth_file) + + store.write_token(" token-value ") + + assert store.read_credential() == StoredGitHubCredential( + method="oauth", + token="token-value", + ) + + +def test_file_token_store_delete_removes_file(tmp_path: Path) -> None: + auth_file = tmp_path / ".agr" / "auth.json" + store = FileTokenStore(auth_file) + store.write_token("token-value") + + assert store.delete_token() is True + assert store.read_token() is None + assert store.delete_token() is False + + +def test_file_token_store_ignores_missing_invalid_and_empty_files( + tmp_path: Path, +) -> None: + auth_file = tmp_path / ".agr" / "auth.json" + store = FileTokenStore(auth_file) + + assert store.read_token() is None + auth_file.parent.mkdir() + auth_file.write_text("not-json") + assert store.read_token() is None + auth_file.write_text(json.dumps({"github_token": " "})) + assert store.read_token() is None + + +def test_file_token_store_rejects_empty_token(tmp_path: Path) -> None: + store = FileTokenStore(tmp_path / ".agr" / "auth.json") + + with pytest.raises(AgrError, match="empty"): + store.write_token(" ") + + +def test_file_token_store_rejects_username_password_without_username( + tmp_path: Path, +) -> None: + store = FileTokenStore(tmp_path / ".agr" / "auth.json") + + with pytest.raises(AgrError, match="username"): + store.write_credential( + StoredGitHubCredential(method="username_password", token="token") + ) + + +def test_auth_status_checker_prefers_environment_tokens() -> None: + store = MemoryTokenStore( + StoredGitHubCredential(method="oauth", token="stored-token") + ) + checker = GitHubAuthStatusChecker( + store, {"GITHUB_TOKEN": " github-token ", "GH_TOKEN": "gh-token"} + ) + + result = checker.get_status() + + assert result == AuthStatus(authenticated=True, source="GITHUB_TOKEN", method="env") + + +def test_status_prefers_environment_tokens() -> None: + store = MemoryTokenStore( + StoredGitHubCredential(method="oauth", token="stored-token") + ) + + result = status(store, {"GITHUB_TOKEN": " github-token ", "GH_TOKEN": "gh-token"}) + + assert result == AuthStatus(authenticated=True, source="GITHUB_TOKEN", method="env") + + +def test_status_uses_gh_token_before_stored_token() -> None: + store = MemoryTokenStore( + StoredGitHubCredential(method="oauth", token="stored-token") + ) + + result = status(store, {"GH_TOKEN": "gh-token"}) + + assert result == AuthStatus(authenticated=True, source="GH_TOKEN", method="env") + + +def test_status_uses_stored_token() -> None: + store = MemoryTokenStore( + StoredGitHubCredential(method="username_password", token="stored-token") + ) + + result = status(store, {}) + + assert result == AuthStatus( + authenticated=True, + source="stored", + method="username_password", + ) + + +def test_status_reports_not_authenticated() -> None: + assert status(MemoryTokenStore(), {}) == AuthStatus( + authenticated=False, + source=None, + method=None, + ) + + +def test_username_password_strategy_uses_values() -> None: + strategy = UsernamePasswordGitHubLoginStrategy( + username=" octocat ", + password=" secret-token ", + ) + + assert strategy.login() == StoredGitHubCredential( + method="username_password", + token="secret-token", + username="octocat", + ) + + +def test_username_password_strategy_uses_prompts() -> None: + strategy = UsernamePasswordGitHubLoginStrategy( + username_prompt=lambda: "octocat", + password_prompt=lambda: "secret-token", + ) + + assert strategy.login() == StoredGitHubCredential( + method="username_password", + token="secret-token", + username="octocat", + ) + + +def test_username_password_strategy_rejects_empty_values() -> None: + strategy = UsernamePasswordGitHubLoginStrategy(username="octocat", password=" ") + + with pytest.raises(AgrError, match="password"): + strategy.login() + + +def test_oauth_strategy_calls_prompt_and_polls() -> None: + oauth = FakeOAuthClient() + prompts: list[DeviceAuthorization] = [] + strategy = OAuthGitHubLoginStrategy(oauth, prompts.append) + + credential = strategy.login() + + assert credential == StoredGitHubCredential(method="oauth", token="stored-token") + assert prompts == [oauth.authorization] + + +def test_login_calls_strategy_and_saves_credential() -> None: + store = MemoryTokenStore() + strategy = UsernamePasswordGitHubLoginStrategy( + username="octocat", + password="secret-token", + ) + + credential = login(strategy, store) + + assert credential == StoredGitHubCredential( + method="username_password", + token="secret-token", + username="octocat", + ) + assert store.read_credential() == credential + + +def test_logout_deletes_token() -> None: + store = MemoryTokenStore( + StoredGitHubCredential(method="oauth", token="stored-token") + ) + + assert logout(store) is True + assert store.read_token() is None diff --git a/tests/unit/test_git.py b/tests/unit/test_git.py index 795288f..d17400f 100644 --- a/tests/unit/test_git.py +++ b/tests/unit/test_git.py @@ -1,5 +1,6 @@ """Unit tests for the git module's pure helper functions.""" +import base64 import os import subprocess from pathlib import Path @@ -7,8 +8,10 @@ import pytest +from agr.auth import StoredGitHubCredential from agr.exceptions import AgrError, AuthenticationError, RepoNotFoundError from agr.git import fetch_and_checkout_commit, get_github_token, validate_commit_sha +from agr.git import _run_git as run_git from agr.git import _is_github_source as is_github_source from agr.git import _partial_clone_unsupported as partial_clone_unsupported from agr.git import _build_github_auth_env as build_github_auth_env @@ -20,7 +23,10 @@ class TestGetGithubToken: """Tests for get_github_token().""" def test_returns_none_when_no_env_vars(self): - with patch.dict(os.environ, {}, clear=True): + with ( + patch.dict(os.environ, {}, clear=True), + patch("agr.git.read_stored_github_credential", return_value=None), + ): assert get_github_token() is None def test_prefers_github_token_over_gh_token(self): @@ -49,9 +55,24 @@ def test_strips_whitespace_from_token(self): assert get_github_token() == "my-token" def test_returns_none_when_both_empty(self): - with patch.dict(os.environ, {"GITHUB_TOKEN": "", "GH_TOKEN": ""}, clear=True): + with ( + patch.dict(os.environ, {"GITHUB_TOKEN": "", "GH_TOKEN": ""}, clear=True), + patch("agr.git.read_stored_github_credential", return_value=None), + ): assert get_github_token() is None + def test_falls_back_to_stored_agr_token(self): + with ( + patch.dict(os.environ, {}, clear=True), + patch( + "agr.git.read_stored_github_credential", + return_value=StoredGitHubCredential( + method="oauth", token="stored-token" + ), + ), + ): + assert get_github_token() == "stored-token" + class TestIsGithubSource: """Tests for _is_github_source().""" @@ -110,6 +131,11 @@ class TestRaiseCloneError: types. The classification order matters and is tested here. """ + @pytest.fixture(autouse=True) + def no_stored_token(self): + with patch("agr.git.read_stored_github_credential", return_value=None): + yield + GITHUB_SOURCE = SourceConfig( name="github", type="git", @@ -493,6 +519,44 @@ def test_rejects_refs_heads_prefix(self): validate_commit_sha("refs/heads/main") +class TestRunGit: + def test_disables_terminal_prompts(self): + captured = {} + + def fake_run(*args, **kwargs): + captured.update(kwargs) + return subprocess.CompletedProcess(args[0], 0, "", "") + + with ( + patch("agr.git.subprocess.run", side_effect=fake_run), + patch("agr.git._build_github_auth_env", return_value={}), + patch.dict(os.environ, {}, clear=True), + ): + result = run_git(["git", "status"]) + + assert result.returncode == 0 + assert captured["env"]["GIT_TERMINAL_PROMPT"] == "0" + + def test_preserves_auth_env_when_disabling_prompts(self): + captured = {} + + def fake_run(*args, **kwargs): + captured.update(kwargs) + return subprocess.CompletedProcess(args[0], 0, "", "") + + with ( + patch("agr.git.subprocess.run", side_effect=fake_run), + patch( + "agr.git._build_github_auth_env", return_value={"GIT_CONFIG_COUNT": "1"} + ), + patch.dict(os.environ, {}, clear=True), + ): + run_git(["git", "status"]) + + assert captured["env"]["GIT_TERMINAL_PROMPT"] == "0" + assert captured["env"]["GIT_CONFIG_COUNT"] == "1" + + class TestBuildGithubAuthEnv: """Tests for _build_github_auth_env(). @@ -501,7 +565,10 @@ class TestBuildGithubAuthEnv: """ def test_returns_empty_when_no_token(self): - with patch.dict(os.environ, {}, clear=True): + with ( + patch.dict(os.environ, {}, clear=True), + patch("agr.git.read_stored_github_credential", return_value=None), + ): assert build_github_auth_env() == {} def test_returns_auth_env_when_token_set(self): @@ -509,7 +576,8 @@ def test_returns_auth_env_when_token_set(self): env = build_github_auth_env() assert env["GIT_CONFIG_COUNT"] == "1" assert env["GIT_CONFIG_KEY_0"] == "http.https://github.com/.extraheader" - assert env["GIT_CONFIG_VALUE_0"] == "AUTHORIZATION: bearer ghp_test123" + expected = base64.b64encode(b"x-access-token:ghp_test123").decode() + assert env["GIT_CONFIG_VALUE_0"] == f"AUTHORIZATION: basic {expected}" def test_token_value_not_in_config_keys(self): """Token must only appear in GIT_CONFIG_VALUE, never in keys.""" @@ -540,10 +608,38 @@ def test_appends_to_existing_git_config_count(self): def test_uses_gh_token_fallback(self): with patch.dict(os.environ, {"GH_TOKEN": "gh_fallback"}, clear=True): env = build_github_auth_env() - assert env["GIT_CONFIG_VALUE_0"] == "AUTHORIZATION: bearer gh_fallback" + expected = base64.b64encode(b"x-access-token:gh_fallback").decode() + assert env["GIT_CONFIG_VALUE_0"] == f"AUTHORIZATION: basic {expected}" + + def test_uses_stored_oauth_credential(self): + credential = StoredGitHubCredential(method="oauth", token="stored-token") + with ( + patch.dict(os.environ, {}, clear=True), + patch("agr.git.read_stored_github_credential", return_value=credential), + ): + env = build_github_auth_env() + expected = base64.b64encode(b"x-access-token:stored-token").decode() + assert env["GIT_CONFIG_VALUE_0"] == f"AUTHORIZATION: basic {expected}" + + def test_uses_stored_username_password_credential(self): + credential = StoredGitHubCredential( + method="username_password", + token="secret-token", + username="octocat", + ) + with ( + patch.dict(os.environ, {}, clear=True), + patch("agr.git.read_stored_github_credential", return_value=credential), + ): + env = build_github_auth_env() + expected = base64.b64encode(b"octocat:secret-token").decode() + assert env["GIT_CONFIG_VALUE_0"] == f"AUTHORIZATION: basic {expected}" def test_whitespace_only_token_returns_empty(self): - with patch.dict(os.environ, {"GITHUB_TOKEN": " "}, clear=True): + with ( + patch.dict(os.environ, {"GITHUB_TOKEN": " "}, clear=True), + patch("agr.git.read_stored_github_credential", return_value=None), + ): assert build_github_auth_env() == {} def test_scoped_to_github_com(self): @@ -564,4 +660,5 @@ def test_ignores_malformed_git_config_count(self): env = build_github_auth_env() assert env["GIT_CONFIG_COUNT"] == "1" assert "GIT_CONFIG_KEY_0" in env - assert env["GIT_CONFIG_VALUE_0"] == "AUTHORIZATION: bearer tok" + expected = base64.b64encode(b"x-access-token:tok").decode() + assert env["GIT_CONFIG_VALUE_0"] == f"AUTHORIZATION: basic {expected}" diff --git a/tests/unit/test_github_oauth.py b/tests/unit/test_github_oauth.py new file mode 100644 index 0000000..93152c6 --- /dev/null +++ b/tests/unit/test_github_oauth.py @@ -0,0 +1,161 @@ +import httpx +import pytest + +from agr.auth import DeviceAuthorization +from agr.exceptions import AuthenticationError +from agr.github_oauth import ( + GITHUB_ACCESS_TOKEN_URL, + GITHUB_DEVICE_CODE_URL, + GitHubOAuthDeviceFlow, + MISSING_CLIENT_ID_MESSAGE, + UNCONFIGURED_GITHUB_OAUTH_CLIENT_ID, +) + + +def make_client( + responses: list[dict[str, object]], +) -> tuple[httpx.Client, list[httpx.Request]]: + requests: list[httpx.Request] = [] + + def handler(request: httpx.Request) -> httpx.Response: + requests.append(request) + return httpx.Response(200, json=responses.pop(0)) + + return httpx.Client(transport=httpx.MockTransport(handler)), requests + + +def test_request_device_authorization_requires_configured_client_id() -> None: + flow = GitHubOAuthDeviceFlow(client_id=UNCONFIGURED_GITHUB_OAUTH_CLIENT_ID) + + with pytest.raises(AuthenticationError, match="not configured"): + flow.request_device_authorization() + + assert "AGR_GITHUB_OAUTH_CLIENT_ID" in MISSING_CLIENT_ID_MESSAGE + + +def test_uses_client_id_environment_override(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AGR_GITHUB_OAUTH_CLIENT_ID", "env-client-id") + client, requests = make_client( + [ + { + "device_code": "device-code", + "user_code": "USER-CODE", + "verification_uri": "https://github.com/login/device", + "expires_in": 900, + "interval": 5, + } + ] + ) + flow = GitHubOAuthDeviceFlow(client_id="client-id", client=client) + + flow.request_device_authorization() + + assert "client_id=env-client-id" in requests[0].content.decode() + + +def test_request_device_authorization_raises_friendly_http_error() -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(404, request=request) + + client = httpx.Client(transport=httpx.MockTransport(handler)) + flow = GitHubOAuthDeviceFlow(client_id="client-id", client=client) + + with pytest.raises(AuthenticationError, match="HTTP 404"): + flow.request_device_authorization() + + +def test_request_device_authorization_returns_caller_facing_data() -> None: + client, requests = make_client( + [ + { + "device_code": "device-code", + "user_code": "USER-CODE", + "verification_uri": "https://github.com/login/device", + "expires_in": 900, + "interval": 5, + } + ] + ) + flow = GitHubOAuthDeviceFlow(client_id="client-id", client=client) + + authorization = flow.request_device_authorization() + + assert authorization == DeviceAuthorization( + device_code="device-code", + user_code="USER-CODE", + verification_uri="https://github.com/login/device", + expires_in=900, + interval=5, + ) + assert str(requests[0].url) == GITHUB_DEVICE_CODE_URL + assert "client_id=client-id" in requests[0].content.decode() + + +def test_poll_for_token_handles_pending_then_success() -> None: + client, requests = make_client( + [ + {"error": "authorization_pending"}, + {"access_token": " github-token "}, + ] + ) + sleeps: list[float] = [] + flow = GitHubOAuthDeviceFlow( + client_id="client-id", client=client, sleep=sleeps.append + ) + + token = flow.poll_for_token(DeviceAuthorization("device", "USER", "url", 900, 1)) + + assert token == "github-token" + assert sleeps == [1, 1] + assert [str(request.url) for request in requests] == [ + GITHUB_ACCESS_TOKEN_URL, + GITHUB_ACCESS_TOKEN_URL, + ] + + +def test_poll_for_token_handles_slow_down() -> None: + client, _ = make_client( + [ + {"error": "slow_down"}, + {"access_token": "github-token"}, + ] + ) + sleeps: list[float] = [] + flow = GitHubOAuthDeviceFlow( + client_id="client-id", client=client, sleep=sleeps.append + ) + + assert ( + flow.poll_for_token(DeviceAuthorization("device", "USER", "url", 900, 2)) + == "github-token" + ) + assert sleeps == [2, 7] + + +@pytest.mark.parametrize( + ("error", "message"), + [ + ("expired_token", "expired"), + ("access_denied", "cancelled|denied"), + ], +) +def test_poll_for_token_raises_for_terminal_errors(error: str, message: str) -> None: + client, _ = make_client([{"error": error}]) + flow = GitHubOAuthDeviceFlow( + client_id="client-id", client=client, sleep=lambda _: None + ) + + with pytest.raises(AuthenticationError, match=message): + flow.poll_for_token(DeviceAuthorization("device", "USER", "url", 900, 1)) + + +def test_poll_for_token_raises_for_unknown_error_description() -> None: + client, _ = make_client( + [{"error": "bad_verification_code", "error_description": "Bad code"}] + ) + flow = GitHubOAuthDeviceFlow( + client_id="client-id", client=client, sleep=lambda _: None + ) + + with pytest.raises(AuthenticationError, match="Bad code"): + flow.poll_for_token(DeviceAuthorization("device", "USER", "url", 900, 1))