From 201c557cf6f43f6e975b016745c48eaff3772ef5 Mon Sep 17 00:00:00 2001 From: Vasyl Vdovychenko Date: Sun, 14 Jun 2026 19:41:32 -0400 Subject: [PATCH] feat(ai): streaming agent loop (AI-037a) AgentLoop.StreamAsync yields an AgentEvent per step as it happens, then a terminal Done event with the final AgentResult; RunAsync is rebuilt on top (behaviour identical, budget exhaustion still throws with its transcript). StudyBuddyAgent.StreamAsync streams with the same config. The SSE endpoint + persistence + GET run are AI-037b. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 9 ++++ .../src/Ai/TextStack.Ai.Agents/AgentLoop.cs | 49 +++++++++++++++++-- .../src/Application/Agents/StudyBuddyAgent.cs | 17 +++---- tests/TextStack.UnitTests/AgentLoopTests.cs | 40 +++++++++++++++ .../StudyBuddyAgentTests.cs | 20 ++++++++ 5 files changed, 122 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fcf7b9a..7688486d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## [Unreleased] +### Phase 6 — streaming agent loop (2026-06-14) + +Phase 6 **AI-037, slice a** — the loop streams its steps so the reader can watch the agent work. The SSE endpoint + run persistence + `GET` are slice b. + +- **`AgentLoop.StreamAsync`** — yields an `AgentEvent` per step as it happens (`llm_response` / `tool_result`), then a terminal Done event carrying the final `AgentResult`. `RunAsync` is now built on top of it (collects to the Done result), so non-streaming callers (the AI-039 eval) are unchanged — behaviour identical, budget exhaustion still throws with its transcript after the partial steps have streamed. +- **`AgentEvent`** (`Ai.Agents`) — a step-or-result union (`OfStep`/`Done`). +- **`StudyBuddyAgent.StreamAsync`** — same prompt/tools/budget as `RunAsync`, streamed; both share one `BuildAgentInput`. +- Tests: `AgentLoop.StreamAsync` (step events in order → terminal Done; budget exhaustion streams partial steps then throws, no Done); `StudyBuddyAgent.StreamAsync` (config threading → step + Done). Full suite green (270). + ### Phase 6 — agent run persistence (2026-06-14) Phase 6 **AI-036** — agent runs are saved so the reader UI can replay an agent's steps (AI-038) and runs are observable. Persistence mechanics only; the endpoint that calls it is AI-037. diff --git a/backend/src/Ai/TextStack.Ai.Agents/AgentLoop.cs b/backend/src/Ai/TextStack.Ai.Agents/AgentLoop.cs index c18f86de..0a4b60f9 100644 --- a/backend/src/Ai/TextStack.Ai.Agents/AgentLoop.cs +++ b/backend/src/Ai/TextStack.Ai.Agents/AgentLoop.cs @@ -1,10 +1,21 @@ using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Text.Json; using TextStack.Ai.Core; using TextStack.Ai.Tools; namespace TextStack.Ai.Agents; +/// +/// One event from a streamed agent run (AI-037a): either a recorded as it happens, +/// or the terminal . Exactly one is non-null. +/// +public sealed record AgentEvent(AgentStep? Step, AgentResult? Result) +{ + public static AgentEvent OfStep(AgentStep step) => new(step, null); + public static AgentEvent Done(AgentResult result) => new(null, result); +} + /// /// The hand-rolled plan → act → observe loop (Phase 6, AI-034) every concrete agent runs on. Each /// iteration: ask the model (with the agent's allowed tool schemas); if it answers without a tool @@ -18,8 +29,32 @@ namespace TextStack.Ai.Agents; /// public sealed class AgentLoop(ILlmService llm, IToolRegistry tools, ToolDispatcher dispatcher) { + /// + /// Runs the loop to completion and returns the final result. Built on — + /// for non-streaming callers (e.g. the eval, AI-039). Budget exhaustion still throws + /// (with its transcript). + /// public async Task> RunAsync( AgentInput input, AgentContext ctx, AgentLoopOptions options, CancellationToken ct) + { + await foreach (var e in StreamAsync(input, ctx, options, ct)) + { + if (e.Result is { } result) + return result; + } + // Unreachable: the loop either yields a Done event or throws on budget exhaustion. + throw new InvalidOperationException("Agent stream ended without a result."); + } + + /// + /// Streams the run step-by-step (AI-037a): an per recorded step as it + /// happens, then a terminal Done event carrying the final . The + /// endpoint (AI-037b) maps these to SSE so the reader sees the agent's progress. Budget exhaustion + /// throws after the partial steps have already streamed. + /// + public async IAsyncEnumerable StreamAsync( + AgentInput input, AgentContext ctx, AgentLoopOptions options, + [EnumeratorCancellation] CancellationToken ct) { var sw = Stopwatch.StartNew(); var steps = new List(); @@ -42,15 +77,19 @@ public async Task> RunAsync( inputTokens += response.Usage.InputTokens; outputTokens += response.Usage.OutputTokens; cost += response.Usage.CostUsd; - steps.Add(Step(iteration, "llm_response", SerializeResponse(response))); + + var llmStep = Step(iteration, "llm_response", SerializeResponse(response)); + steps.Add(llmStep); + yield return AgentEvent.OfStep(llmStep); // No tool calls → the model has produced its final answer. if (response.ToolCalls.Count == 0) { sw.Stop(); - return new AgentResult( + yield return AgentEvent.Done(new AgentResult( response.Text, steps, - new AgentUsage(iteration + 1, inputTokens, outputTokens, cost, (int)sw.ElapsedMilliseconds)); + new AgentUsage(iteration + 1, inputTokens, outputTokens, cost, (int)sw.ElapsedMilliseconds))); + yield break; } // The assistant's tool-call turn must precede the tool results (OpenAI message ordering). @@ -61,7 +100,9 @@ public async Task> RunAsync( { var call = response.ToolCalls[j]; var result = results[j]; - steps.Add(Step(iteration, "tool_result", SerializeResult(call, result))); + var toolStep = Step(iteration, "tool_result", SerializeResult(call, result)); + steps.Add(toolStep); + yield return AgentEvent.OfStep(toolStep); // Errors come back as data (ToolCallingSession serialization) so the model can recover. messages.Add(new LlmMessage("tool", ToolCallingSession.SerializeResult(result), [call])); } diff --git a/backend/src/Application/Agents/StudyBuddyAgent.cs b/backend/src/Application/Agents/StudyBuddyAgent.cs index ec2e1c8b..3a91b2aa 100644 --- a/backend/src/Application/Agents/StudyBuddyAgent.cs +++ b/backend/src/Application/Agents/StudyBuddyAgent.cs @@ -31,16 +31,15 @@ public sealed class StudyBuddyAgent(AgentLoop loop) : IAgent> RunAsync(StudyBuddyInput input, AgentContext ctx, CancellationToken ct) - { - var agentInput = new AgentInput( - UserGoal: BuildGoal(input), - SystemPrompt: SystemPrompt, - AllowedTools: AllowedTools, - FeatureTag: FeatureTag); + public Task> RunAsync(StudyBuddyInput input, AgentContext ctx, CancellationToken ct) => + loop.RunAsync(BuildAgentInput(input), ctx, Options, ct); - return loop.RunAsync(agentInput, ctx, Options, ct); - } + /// Streams the run step-by-step (AI-037a) for the SSE endpoint — same config as . + public IAsyncEnumerable StreamAsync(StudyBuddyInput input, AgentContext ctx, CancellationToken ct) => + loop.StreamAsync(BuildAgentInput(input), ctx, Options, ct); + + private static AgentInput BuildAgentInput(StudyBuddyInput input) => + new(UserGoal: BuildGoal(input), SystemPrompt: SystemPrompt, AllowedTools: AllowedTools, FeatureTag: FeatureTag); private static string BuildGoal(StudyBuddyInput input) { diff --git a/tests/TextStack.UnitTests/AgentLoopTests.cs b/tests/TextStack.UnitTests/AgentLoopTests.cs index c9a2a956..84410cf0 100644 --- a/tests/TextStack.UnitTests/AgentLoopTests.cs +++ b/tests/TextStack.UnitTests/AgentLoopTests.cs @@ -133,6 +133,46 @@ await Assert.ThrowsAsync(() => TestContext.Current.CancellationToken)); } + [Fact] + public async Task Stream_EmitsStepEventsThenTerminalDone() + { + var llm = new ScriptedLlm( + [Call(args: """{"x":1}""")], // turn 1: tool + ["Final."]); // turn 2: answer + var events = new List(); + await foreach (var e in Loop(llm, new EchoTool()).StreamAsync( + Input(), Ctx(), new AgentLoopOptions(), TestContext.Current.CancellationToken)) + { + events.Add(e); + } + + // step (llm) → step (tool) → step (llm) → done; exactly one terminal result. + Assert.Equal(3, events.Count(e => e.Step is not null)); + var done = Assert.Single(events.Where(e => e.Result is not null)); + Assert.Equal("Final.", done.Result!.Output); + Assert.Equal(2, done.Result.Usage.Iterations); + Assert.Same(events[^1], done); // done is the last event + } + + [Fact] + public async Task Stream_BudgetExhausted_StreamsPartialSteps_ThenThrows() + { + var llm = new ScriptedLlm([Call()], [Call()], [Call()]); // never answers + var streamed = new List(); + + await Assert.ThrowsAsync(async () => + { + await foreach (var e in Loop(llm, new EchoTool()).StreamAsync( + Input(), Ctx(), new AgentLoopOptions(MaxSteps: 3), TestContext.Current.CancellationToken)) + { + streamed.Add(e); + } + }); + + Assert.NotEmpty(streamed); // partial steps reached the consumer + Assert.All(streamed, e => Assert.Null(e.Result)); // no Done event on exhaustion + } + [Fact] public async Task Run_UnknownTool_FedBackAsData_LoopRecovers() { diff --git a/tests/TextStack.UnitTests/StudyBuddyAgentTests.cs b/tests/TextStack.UnitTests/StudyBuddyAgentTests.cs index 5b0003e3..769e289b 100644 --- a/tests/TextStack.UnitTests/StudyBuddyAgentTests.cs +++ b/tests/TextStack.UnitTests/StudyBuddyAgentTests.cs @@ -57,6 +57,26 @@ public async Task Run_FeedsPromptAndPassageGoal_ReturnsAnswer() Assert.Contains("Chapter 3", request.Messages[0].Content); // chapter threaded into the goal } + [Fact] + public async Task Stream_FeedsSameConfig_EmitsStepThenDone() + { + var llm = new FixedLlm("Streamed explanation."); + var agent = new StudyBuddyAgent(Loop(llm)); + + var events = new List(); + await foreach (var e in agent.StreamAsync( + new StudyBuddyInput("A confusing passage.", Guid.NewGuid(), ChapterNumber: 2), + Ctx(), TestContext.Current.CancellationToken)) + { + events.Add(e); + } + + var done = Assert.Single(events.Where(e => e.Result is not null)); + Assert.Equal("Streamed explanation.", done.Result!.Output); + Assert.Equal(StudyBuddyAgent.SystemPrompt, llm.Requests[0].SystemPrompt); + Assert.Contains("Chapter 2", llm.Requests[0].Messages[0].Content); + } + [Fact] public async Task Run_NoChapter_GoalOmitsChapter() {