diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 688c3f4..ffd15fa 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -20,13 +20,35 @@ jobs: - name: Configure Git for private modules run: git config --global url."https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/".insteadOf "https://github.com/" - - name: Provision cluster - uses: agynio/bootstrap/.github/actions/provision@main + - name: Checkout agn-cli + uses: actions/checkout@v4 + with: + repository: agynio/agn-cli + token: ${{ secrets.GITHUB_TOKEN }} + path: .agn-cli - - name: Setup DevSpace - uses: agynio/e2e/.github/actions/setup-devspace@main + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.26.x' - - name: Run E2E tests - uses: agynio/e2e/.github/actions/run-tests@main + - name: Install buf + uses: bufbuild/buf-setup-action@v1 with: - service: agynd_cli + github_token: ${{ secrets.GITHUB_TOKEN }} + version: 1.66.0 + + - name: Generate protobuf bindings + run: | + buf generate buf.build/agynio/api \ + --path agynio/api/gateway/v1 \ + --include-imports + + - name: Install Codex CLI + run: npm install -g @openai/codex + + - name: Run repo-local E2E tests + run: go test -v -count=1 -tags e2e ./test/e2e/ + env: + OPENAI_API_KEY: test-key + AGN_REPO_PATH: ${{ github.workspace }}/.agn-cli diff --git a/README.md b/README.md index 11fd569..aa3a420 100644 --- a/README.md +++ b/README.md @@ -16,3 +16,23 @@ Full setup: https://github.com/agynio/architecture/blob/main/architecture/operat devspace dev devspace dev -w ``` + +## E2E validation + +The GitHub E2E workflow runs this repository's local E2E tests with: + +```bash +go test -v -count=1 -tags e2e ./test/e2e/ +``` + +Those tests validate the local agent CLI bridge behavior against deterministic +TestLLM endpoints through the Codex and AGN SDK flows. The workflow checks out +`agynio/agn-cli` so the AGN coverage builds and runs the current AGN CLI during +the test. They also build and execute this repository's `cmd/agynd` binary with +`/agyn-bin/config.json` installed by the test harness. That test uses a stub +Gateway and fake AGN agent to verify daemon startup, platform initialization, +and subscriber startup without requiring a full cluster. + +This repository does not run the centralized `agynio/e2e` smoke suite. Broader +platform and service smoke coverage remains owned by the centralized E2E +repository and service-specific workflows. diff --git a/internal/daemon/agn.go b/internal/daemon/agn.go index 5fc498f..47739c0 100644 --- a/internal/daemon/agn.go +++ b/internal/daemon/agn.go @@ -59,6 +59,7 @@ func newAgnDaemon(ctx context.Context, cfg config.Config, version string) (*Daem tracingProxy, err := tracingproxy.Start(ctx, tracingproxy.Config{ TracingAddress: cfg.TracingAddress, + ListenAddress: tracingProxyListenAddress, ThreadID: cfg.ThreadID, WorkloadID: cfg.WorkloadID, }) @@ -67,7 +68,7 @@ func newAgnDaemon(ctx context.Context, cfg config.Config, version string) (*Daem return nil, err } - otlpEndpoint := "http://" + tracingproxy.ListenAddress + otlpEndpoint := "http://" + tracingProxy.Address() agnClient, err := agnsdk.Start(ctx, agnsdk.Options{ BinaryPath: cfg.AgentBinary, Env: []string{ diff --git a/internal/daemon/claude.go b/internal/daemon/claude.go index 0907da2..8d871a1 100644 --- a/internal/daemon/claude.go +++ b/internal/daemon/claude.go @@ -39,6 +39,7 @@ func newClaudeDaemon(ctx context.Context, cfg config.Config, version string) (*D tracingProxy, err := tracingproxy.Start(ctx, tracingproxy.Config{ TracingAddress: cfg.TracingAddress, + ListenAddress: tracingProxyListenAddress, ThreadID: cfg.ThreadID, WorkloadID: cfg.WorkloadID, }) @@ -47,7 +48,7 @@ func newClaudeDaemon(ctx context.Context, cfg config.Config, version string) (*D return nil, err } - otlpEndpoint := "http://" + tracingproxy.ListenAddress + otlpEndpoint := "http://" + tracingProxy.Address() options := claude.Options{ BinaryPath: cfg.AgentBinary, WorkDir: cfg.WorkDir, diff --git a/internal/daemon/codexconfig.go b/internal/daemon/codexconfig.go index 51a0cdf..f9a0ebf 100644 --- a/internal/daemon/codexconfig.go +++ b/internal/daemon/codexconfig.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/agynio/agynd-cli/internal/config" - "github.com/agynio/agynd-cli/internal/tracingproxy" codex "github.com/agynio/codex-sdk-go" ) @@ -52,21 +51,20 @@ var codexAuthEnvVars = []string{ codexEnvCodexAccessToken, } -func writeCodexConfig(llmBaseURL string, mcpServers []config.MCPServer) (string, error) { +func writeCodexConfig(llmBaseURL string, mcpServers []config.MCPServer, otlpEndpoint string) (string, error) { codexHome := filepath.Join(codexHomeEnv(), ".codex") if err := os.MkdirAll(codexHome, 0o700); err != nil { return "", fmt.Errorf("create codex home dir: %w", err) } configPath := filepath.Join(codexHome, "config.toml") - payload := codexConfig(llmBaseURL, mcpServers) + payload := codexConfig(llmBaseURL, mcpServers, otlpEndpoint) if err := os.WriteFile(configPath, []byte(payload), 0o600); err != nil { return "", fmt.Errorf("write codex config: %w", err) } return codexHome, nil } -func codexConfig(llmBaseURL string, mcpServers []config.MCPServer) string { - otlpEndpoint := "http://" + tracingproxy.ListenAddress +func codexConfig(llmBaseURL string, mcpServers []config.MCPServer, otlpEndpoint string) string { apiKeyEnv := codexAPIKeyEnv if isZitiLLMBaseURL(llmBaseURL) { apiKeyEnv = "" diff --git a/internal/daemon/codexconfig_test.go b/internal/daemon/codexconfig_test.go index dede3b2..f633f5f 100644 --- a/internal/daemon/codexconfig_test.go +++ b/internal/daemon/codexconfig_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/agynio/agynd-cli/internal/config" - "github.com/agynio/agynd-cli/internal/tracingproxy" codex "github.com/agynio/codex-sdk-go" ) @@ -31,12 +30,14 @@ func (noopCodexClient) Close() error { return nil } +const testCodexOTLPEndpoint = "http://127.0.0.1:54321" + func TestWriteCodexConfig(t *testing.T) { tmpHome := t.TempDir() t.Setenv("HOME", tmpHome) baseURL := "https://example.com" - codexHome, err := writeCodexConfig(baseURL, nil) + codexHome, err := writeCodexConfig(baseURL, nil, testCodexOTLPEndpoint) if err != nil { t.Fatalf("expected config to be written, got %v", err) } @@ -62,7 +63,7 @@ func TestWriteCodexConfigHomeFallback(t *testing.T) { t.Setenv("HOME", "") baseURL := "https://example.com" - codexHome, err := writeCodexConfig(baseURL, nil) + codexHome, err := writeCodexConfig(baseURL, nil, testCodexOTLPEndpoint) if err != nil { t.Fatalf("expected config to be written, got %v", err) } @@ -89,7 +90,7 @@ func TestWriteCodexConfigForZitiOmitsAPIKeyEnv(t *testing.T) { t.Setenv("HOME", tmpHome) baseURL := "http://llm-proxy.ziti:443/v1" - codexHome, err := writeCodexConfig(baseURL, nil) + codexHome, err := writeCodexConfig(baseURL, nil, testCodexOTLPEndpoint) if err != nil { t.Fatalf("expected config to be written, got %v", err) } @@ -115,7 +116,7 @@ func TestWriteCodexConfigWithMCPServers(t *testing.T) { {Name: "memory", Port: 8100}, {Name: "cache", Port: 8200}, } - codexHome, err := writeCodexConfig(baseURL, mcpServers) + codexHome, err := writeCodexConfig(baseURL, mcpServers, testCodexOTLPEndpoint) if err != nil { t.Fatalf("expected config to be written, got %v", err) } @@ -152,7 +153,7 @@ func TestCodexEnvIncludesAPIKeyForPublicLLM(t *testing.T) { LLMAPIToken: "token-123", } - env := codexEnv(cfg, "/tmp/.codex", "/tmp", "http://127.0.0.1:4317") + env := codexEnv(cfg, "/tmp/.codex", "/tmp", testCodexOTLPEndpoint) if env[codexEnvOpenAIAPIKey] != "token-123" { t.Fatalf("expected API key in codex env, got %q", env[codexEnvOpenAIAPIKey]) @@ -167,7 +168,7 @@ func TestCodexEnvOmitsAPIKeyForZitiLLM(t *testing.T) { LLMAPIToken: "user-token", } - env := codexEnv(cfg, "/tmp/.codex", "/tmp", "http://127.0.0.1:4317") + env := codexEnv(cfg, "/tmp/.codex", "/tmp", testCodexOTLPEndpoint) if _, ok := env[codexEnvOpenAIAPIKey]; ok { t.Fatalf("expected ziti codex env to omit %s", codexEnvOpenAIAPIKey) @@ -217,8 +218,8 @@ func TestZitiCodexProcessReceivesNoAuthEnvConfig(t *testing.T) { LLMAPIToken: "agent-env-token", } - env := codexEnv(cfg, "/tmp/.codex", "/tmp", "http://127.0.0.1:4317") - configPayload := codexConfig(cfg.LLMBaseURL, nil) + env := codexEnv(cfg, "/tmp/.codex", "/tmp", testCodexOTLPEndpoint) + configPayload := codexConfig(cfg.LLMBaseURL, nil, testCodexOTLPEndpoint) seenEnv := map[string]bool{} _, err := withoutCodexAuthEnv(func() (codexClient, error) { for _, key := range codexAuthEnvVars { @@ -277,22 +278,21 @@ func assertCodexBaseEnv(t *testing.T, env map[string]string) { if env[codexEnvHome] != "/tmp" { t.Fatalf("expected HOME, got %q", env[codexEnvHome]) } - if env[codexEnvOTELExporterOTLPEndpoint] != "http://127.0.0.1:4317" { + if env[codexEnvOTELExporterOTLPEndpoint] != testCodexOTLPEndpoint { t.Fatalf("expected OTLP endpoint, got %q", env[codexEnvOTELExporterOTLPEndpoint]) } } func codexConfigWithAPIKey(baseURL string) string { - return codexConfigPayload(baseURL, `env_key = "OPENAI_API_KEY" + return codexConfigPayload(baseURL, testCodexOTLPEndpoint, `env_key = "OPENAI_API_KEY" `) } func codexConfigWithoutAPIKey(baseURL string) string { - return codexConfigPayload(baseURL, "") + return codexConfigPayload(baseURL, testCodexOTLPEndpoint, "") } -func codexConfigPayload(baseURL, apiKeyEnv string) string { - otlpEndpoint := "http://" + tracingproxy.ListenAddress +func codexConfigPayload(baseURL, otlpEndpoint, apiKeyEnv string) string { return fmt.Sprintf(`model_provider = "platform" approval_policy = "never" sandbox_mode = "danger-full-access" diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 5d408f0..8726f02 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -42,6 +42,7 @@ const ( opAgnTurn = "agn_turn" opClaudeTurn = "claude_turn" opProcessSignalShutdown = "process_signal/shutdown" + tracingProxyListenAddress = "127.0.0.1:0" ) const ( @@ -253,29 +254,32 @@ func newCodexDaemon(ctx context.Context, cfg config.Config, version string) (*Da tracker := codexbridge.NewTurnTracker() bridge := codexbridge.New(tracker) threadsMapping := codexbridge.NewThreadMapping() - codexHome, err := writeCodexConfig(cfg.LLMBaseURL, cfg.MCPServers) + + tracingProxy, err := tracingproxy.Start(ctx, tracingproxy.Config{ + TracingAddress: cfg.TracingAddress, + ListenAddress: tracingProxyListenAddress, + ThreadID: cfg.ThreadID, + WorkloadID: cfg.WorkloadID, + }) if err != nil { _ = setup.gatewayConn.Close() return nil, err } - - if err := runInitScripts(ctx, setup.agents, cfg.AgentID.String(), cfg.WorkDir); err != nil { + otlpEndpoint := "http://" + tracingProxy.Address() + codexHome, err := writeCodexConfig(cfg.LLMBaseURL, cfg.MCPServers, otlpEndpoint) + if err != nil { + tracingProxy.Close() _ = setup.gatewayConn.Close() return nil, err } - codexHomeValue := codexHomeEnv() - mappingStore := codexbridge.NewThreadMappingStore(codexHomeValue) - tracingProxy, err := tracingproxy.Start(ctx, tracingproxy.Config{ - TracingAddress: cfg.TracingAddress, - ThreadID: cfg.ThreadID, - WorkloadID: cfg.WorkloadID, - }) - if err != nil { + if err := runInitScripts(ctx, setup.agents, cfg.AgentID.String(), cfg.WorkDir); err != nil { + tracingProxy.Close() _ = setup.gatewayConn.Close() return nil, err } - otlpEndpoint := "http://" + tracingproxy.ListenAddress + codexHomeValue := codexHomeEnv() + mappingStore := codexbridge.NewThreadMappingStore(codexHomeValue) options := []codex.Option{ codex.WithBinary(cfg.AgentBinary), codex.WithWorkDir(cfg.WorkDir), diff --git a/test/e2e/agynd_daemon_test.go b/test/e2e/agynd_daemon_test.go new file mode 100644 index 0000000..e11bc77 --- /dev/null +++ b/test/e2e/agynd_daemon_test.go @@ -0,0 +1,403 @@ +//go:build e2e + +package e2e + +import ( + "context" + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + "testing" + "time" + + agentsv1 "github.com/agynio/agynd-cli/.gen/go/agynio/api/agents/v1" + gatewayv1 "github.com/agynio/agynd-cli/.gen/go/agynio/api/gateway/v1" + notificationsv1 "github.com/agynio/agynd-cli/.gen/go/agynio/api/notifications/v1" + runnersv1 "github.com/agynio/agynd-cli/.gen/go/agynio/api/runners/v1" + threadsv1 "github.com/agynio/agynd-cli/.gen/go/agynio/api/threads/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + agyndE2EAgentID = "550e8400-e29b-41d4-a716-446655440000" + agyndE2EThreadID = "550e8400-e29b-41d4-a716-446655440001" + agyndE2EWorkloadID = "550e8400-e29b-41d4-a716-446655440002" +) + +func TestAgyndBinaryInitializesWithStubGateway(t *testing.T) { + binary := buildAgynd(t) + agentBinary := buildFakeAgnAgent(t) + server := startAgyndGatewayStub(t) + workDir := t.TempDir() + installAgentRuntimeConfig(t, agentBinary) + + cmd := exec.Command(binary) + cmd.Dir = workDir + cmd.Env = append(cleanAgyndEnv(), + "AGENT_ID="+agyndE2EAgentID, + "THREAD_ID="+agyndE2EThreadID, + "WORKLOAD_ID="+agyndE2EWorkloadID, + "GATEWAY_ADDRESS="+server.address, + "TRACING_ADDRESS=127.0.0.1:1", + "LLM_BASE_URL=https://testllm.dev/v1/org/agynio/suite/agn", + "LLM_API_TOKEN=test-token", + "AGN_TOKEN_COUNTING_ADDRESS=127.0.0.1:1", + "HOME="+t.TempDir(), + "WORKSPACE_DIR="+workDir, + ) + + var output strings.Builder + cmd.Stdout = &output + cmd.Stderr = &output + if err := cmd.Start(); err != nil { + t.Fatalf("start agynd: %v", err) + } + + waitCh := make(chan error, 1) + go func() { + waitCh <- cmd.Wait() + }() + + select { + case <-server.runStarted: + case err := <-waitCh: + t.Fatalf("agynd exited before daemon.Run/subscriber startup: %v\noutput:\n%s", err, output.String()) + case <-time.After(10 * time.Second): + terminateProcess(cmd.Process) + <-waitCh + t.Fatalf("agynd did not reach daemon.Run/subscriber startup; calls: %s\noutput:\n%s", server.callsSummary(), output.String()) + } + + if err := cmd.Process.Signal(os.Interrupt); err != nil { + terminateProcess(cmd.Process) + <-waitCh + t.Fatalf("interrupt agynd: %v\noutput:\n%s", err, output.String()) + } + select { + case <-waitCh: + case <-time.After(5 * time.Second): + terminateProcess(cmd.Process) + <-waitCh + t.Fatalf("agynd did not stop after interrupt; output:\n%s", output.String()) + } + + server.assertInitialized(t) + server.assertRunStarted(t) +} + +func terminateProcess(process *os.Process) { + if process == nil { + return + } + _ = process.Signal(os.Interrupt) + time.Sleep(100 * time.Millisecond) + _ = process.Signal(syscall.SIGKILL) +} + +func cleanAgyndEnv() []string { + blocked := map[string]struct{}{ + "AGENT_MCP_SERVERS": {}, + "MCP_PORT": {}, + } + env := os.Environ() + cleaned := make([]string, 0, len(env)) + for _, entry := range env { + key, _, ok := strings.Cut(entry, "=") + if !ok { + continue + } + if _, skip := blocked[key]; skip { + continue + } + cleaned = append(cleaned, entry) + } + return cleaned +} + +func buildAgynd(t *testing.T) string { + t.Helper() + dir := filepath.Join(repoRoot(t), ".tmp-e2e", "agynd-"+strings.ReplaceAll(t.Name(), "/", "-")) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + if err := os.MkdirAll(dir, 0o700); err != nil { + t.Fatalf("create agynd build dir: %v", err) + } + path := filepath.Join(dir, "agynd") + cmd := exec.Command("go", "build", "-trimpath", "-o", path, "./cmd/agynd") + cmd.Dir = repoRoot(t) + cmd.Env = append(os.Environ(), "CGO_ENABLED=0") + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("build agynd: %v\n%s", err, output) + } + return path +} + +func installAgentRuntimeConfig(t *testing.T, agentBinary string) { + t.Helper() + runner := newPrivilegedRunner(t) + runner.run(t, "mkdir", "-p", "/agyn-bin") + + backupPath := filepath.Join(t.TempDir(), "config.json.backup") + hadExisting := fileExists("/agyn-bin/config.json") + if hadExisting { + runner.run(t, "cp", "/agyn-bin/config.json", backupPath) + } + t.Cleanup(func() { + if hadExisting { + runner.run(t, "cp", backupPath, "/agyn-bin/config.json") + return + } + runner.run(t, "rm", "-f", "/agyn-bin/config.json") + }) + + configPath := filepath.Join(t.TempDir(), "config.json") + payload := fmt.Sprintf(`{"sdk":"agn","bin":%q}`, agentBinary) + if err := os.WriteFile(configPath, []byte(payload), 0o600); err != nil { + t.Fatalf("write test config: %v", err) + } + runner.run(t, "cp", configPath, "/agyn-bin/config.json") + runner.run(t, "chmod", "0644", "/agyn-bin/config.json") +} + +type privilegedRunner struct { + sudo string +} + +func newPrivilegedRunner(t *testing.T) privilegedRunner { + t.Helper() + if sudo, err := exec.LookPath("sudo"); err == nil { + return privilegedRunner{sudo: sudo} + } + if os.Geteuid() == 0 { + return privilegedRunner{} + } + t.Fatal("sudo is required to install /agyn-bin/config.json for agynd e2e") + return privilegedRunner{} +} + +func (r privilegedRunner) run(t *testing.T, name string, args ...string) { + t.Helper() + cmdArgs := args + if r.sudo != "" { + cmdArgs = append([]string{name}, args...) + name = r.sudo + } + if output, err := exec.Command(name, cmdArgs...).CombinedOutput(); err != nil { + t.Fatalf("run %s %s: %v\n%s", name, strings.Join(cmdArgs, " "), err, output) + } +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + return err == nil +} + +func buildFakeAgnAgent(t *testing.T) string { + t.Helper() + path := filepath.Join(t.TempDir(), "fake-agn") + source := filepath.Join(t.TempDir(), "main.go") + if err := os.WriteFile(source, []byte(fakeAgnAgentSource), 0o600); err != nil { + t.Fatalf("write fake agn source: %v", err) + } + cmd := exec.Command("go", "build", "-trimpath", "-o", path, source) + cmd.Env = append(os.Environ(), "CGO_ENABLED=0") + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("build fake agn agent: %v\n%s", err, output) + } + return path +} + +const fakeAgnAgentSource = "package main\n\nimport (\n\t\"bufio\"\n\t\"encoding/json\"\n\t\"fmt\"\n\t\"os\"\n)\n\ntype request struct {\n\tJSONRPC string `json:\"jsonrpc\"`\n\tID json.RawMessage `json:\"id\"`\n\tMethod string `json:\"method\"`\n\tParams json.RawMessage `json:\"params\"`\n}\n\ntype response struct {\n\tJSONRPC string `json:\"jsonrpc\"`\n\tID json.RawMessage `json:\"id\"`\n\tResult json.RawMessage `json:\"result\"`\n}\n\nfunc main() {\n\tif len(os.Args) < 2 || os.Args[1] != \"serve\" {\n\t\tos.Exit(2)\n\t}\n\tscanner := bufio.NewScanner(os.Stdin)\n\tfor scanner.Scan() {\n\t\tvar req request\n\t\tif err := json.Unmarshal(scanner.Bytes(), &req); err != nil {\n\t\t\tcontinue\n\t\t}\n\t\tresult := json.RawMessage(`{\"thread_id\":\"thread-from-fake-agent\",\"response\":\"ok\"}`)\n\t\tif req.Method == \"thread/list\" {\n\t\t\tresult = json.RawMessage(`{\"data\":[],\"next_cursor\":null}`)\n\t\t}\n\t\tpayload, _ := json.Marshal(response{JSONRPC: \"2.0\", ID: req.ID, Result: result})\n\t\tfmt.Println(string(payload))\n\t}\n}\n" + +func repoRoot(t *testing.T) string { + t.Helper() + wd, err := os.Getwd() + if err != nil { + t.Fatalf("get working directory: %v", err) + } + return filepath.Clean(filepath.Join(wd, "..", "..")) +} + +type agyndGatewayStub struct { + address string + server *grpc.Server + runStarted chan struct{} + runOnce sync.Once + + mu sync.Mutex + getAgentCalls int + listSkillsCalls int + listMCPsCalls int + listInitScriptsCalls int +} + +type agyndAgentsGatewayStub struct { + gatewayv1.UnimplementedAgentsGatewayServer + *agyndGatewayStub +} + +type agyndThreadsGatewayStub struct { + gatewayv1.UnimplementedThreadsGatewayServer + *agyndGatewayStub +} + +type agyndNotificationsGatewayStub struct { + gatewayv1.UnimplementedNotificationsGatewayServer + *agyndGatewayStub +} + +type agyndRunnersGatewayStub struct { + gatewayv1.UnimplementedRunnersGatewayServer + *agyndGatewayStub +} + +func startAgyndGatewayStub(t *testing.T) *agyndGatewayStub { + t.Helper() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen gateway stub: %v", err) + } + stub := &agyndGatewayStub{address: listener.Addr().String(), runStarted: make(chan struct{})} + stub.server = grpc.NewServer() + gatewayv1.RegisterAgentsGatewayServer(stub.server, agyndAgentsGatewayStub{agyndGatewayStub: stub}) + gatewayv1.RegisterThreadsGatewayServer(stub.server, agyndThreadsGatewayStub{agyndGatewayStub: stub}) + gatewayv1.RegisterNotificationsGatewayServer(stub.server, agyndNotificationsGatewayStub{agyndGatewayStub: stub}) + gatewayv1.RegisterRunnersGatewayServer(stub.server, agyndRunnersGatewayStub{agyndGatewayStub: stub}) + + serveErr := make(chan error, 1) + go func() { + serveErr <- stub.server.Serve(listener) + }() + t.Cleanup(func() { + stub.server.Stop() + _ = listener.Close() + select { + case err := <-serveErr: + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + t.Errorf("gateway stub stopped with error: %v", err) + } + default: + } + }) + return stub +} + +func (s agyndAgentsGatewayStub) GetAgent(_ context.Context, req *agentsv1.GetAgentRequest) (*agentsv1.GetAgentResponse, error) { + if req.GetId() != agyndE2EAgentID { + return nil, status.Errorf(codes.InvalidArgument, "unexpected agent id %q", req.GetId()) + } + s.mu.Lock() + s.getAgentCalls++ + s.mu.Unlock() + return &agentsv1.GetAgentResponse{Agent: &agentsv1.Agent{ + Meta: &agentsv1.EntityMeta{Id: agyndE2EAgentID}, + Name: "agynd-e2e-agent", + Model: "simple-hello", + }}, nil +} + +func (s agyndAgentsGatewayStub) ListSkills(_ context.Context, req *agentsv1.ListSkillsRequest) (*agentsv1.ListSkillsResponse, error) { + if req.GetAgentId() != agyndE2EAgentID { + return nil, status.Errorf(codes.InvalidArgument, "unexpected skills agent id %q", req.GetAgentId()) + } + s.mu.Lock() + s.listSkillsCalls++ + s.mu.Unlock() + return &agentsv1.ListSkillsResponse{}, nil +} + +func (s agyndAgentsGatewayStub) ListMcps(_ context.Context, req *agentsv1.ListMcpsRequest) (*agentsv1.ListMcpsResponse, error) { + if req.GetAgentId() != agyndE2EAgentID { + return nil, status.Errorf(codes.InvalidArgument, "unexpected mcps agent id %q", req.GetAgentId()) + } + s.mu.Lock() + s.listMCPsCalls++ + s.mu.Unlock() + return &agentsv1.ListMcpsResponse{}, nil +} + +func (s agyndAgentsGatewayStub) ListInitScripts(_ context.Context, req *agentsv1.ListInitScriptsRequest) (*agentsv1.ListInitScriptsResponse, error) { + if req.GetAgentId() != agyndE2EAgentID { + return nil, status.Errorf(codes.InvalidArgument, "unexpected init scripts agent id %q", req.GetAgentId()) + } + s.mu.Lock() + s.listInitScriptsCalls++ + s.mu.Unlock() + return &agentsv1.ListInitScriptsResponse{}, nil +} + +func (s *agyndGatewayStub) markRunStarted() { + s.runOnce.Do(func() { close(s.runStarted) }) +} + +func (s *agyndGatewayStub) waitRunStarted(timeout time.Duration) error { + select { + case <-s.runStarted: + return nil + case <-time.After(timeout): + return fmt.Errorf("timed out waiting for Subscribe; calls: %s", s.callsSummary()) + } +} + +func (s agyndThreadsGatewayStub) GetUnackedMessages(context.Context, *threadsv1.GetUnackedMessagesRequest) (*threadsv1.GetUnackedMessagesResponse, error) { + return &threadsv1.GetUnackedMessagesResponse{}, nil +} + +func (s agyndNotificationsGatewayStub) Subscribe(_ *notificationsv1.SubscribeRequest, stream grpc.ServerStreamingServer[notificationsv1.SubscribeResponse]) error { + s.markRunStarted() + <-stream.Context().Done() + return stream.Context().Err() +} + +func (s agyndRunnersGatewayStub) TouchWorkload(context.Context, *runnersv1.TouchWorkloadRequest) (*runnersv1.TouchWorkloadResponse, error) { + return &runnersv1.TouchWorkloadResponse{}, nil +} + +func (s *agyndGatewayStub) assertInitialized(t *testing.T) { + t.Helper() + s.mu.Lock() + defer s.mu.Unlock() + checks := map[string]int{ + "GetAgent": s.getAgentCalls, + "ListSkills": s.listSkillsCalls, + "ListMcps": s.listMCPsCalls, + "ListInitScripts": s.listInitScriptsCalls, + } + for name, calls := range checks { + if calls == 0 { + t.Fatalf("expected agynd to call %s during initialization; calls: %s", name, formatCalls(checks)) + } + } +} + +func (s *agyndGatewayStub) assertRunStarted(t *testing.T) { + t.Helper() + select { + case <-s.runStarted: + default: + t.Fatal("expected agynd to enter daemon.Run and subscribe to notifications") + } +} + +func (s *agyndGatewayStub) callsSummary() string { + s.mu.Lock() + defer s.mu.Unlock() + return formatCalls(map[string]int{ + "GetAgent": s.getAgentCalls, + "ListSkills": s.listSkillsCalls, + "ListMcps": s.listMCPsCalls, + "ListInitScripts": s.listInitScriptsCalls, + }) +} + +func formatCalls(calls map[string]int) string { + return fmt.Sprintf("GetAgent=%d ListSkills=%d ListMcps=%d ListInitScripts=%d", calls["GetAgent"], calls["ListSkills"], calls["ListMcps"], calls["ListInitScripts"]) +}