Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## [Unreleased]

### Phase 6 — streaming agent loop (2026-06-14)

Phase 6 **AI-037, slice a** — the loop streams its steps so the reader can watch the agent work. The SSE endpoint + run persistence + `GET` are slice b.

- **`AgentLoop.StreamAsync`** — yields an `AgentEvent` per step as it happens (`llm_response` / `tool_result`), then a terminal Done event carrying the final `AgentResult`. `RunAsync` is now built on top of it (collects to the Done result), so non-streaming callers (the AI-039 eval) are unchanged — behaviour identical, budget exhaustion still throws with its transcript after the partial steps have streamed.
- **`AgentEvent`** (`Ai.Agents`) — a step-or-result union (`OfStep`/`Done`).
- **`StudyBuddyAgent.StreamAsync`** — same prompt/tools/budget as `RunAsync`, streamed; both share one `BuildAgentInput`.
- Tests: `AgentLoop.StreamAsync` (step events in order → terminal Done; budget exhaustion streams partial steps then throws, no Done); `StudyBuddyAgent.StreamAsync` (config threading → step + Done). Full suite green (270).

### Phase 6 — agent run persistence (2026-06-14)

Phase 6 **AI-036** — agent runs are saved so the reader UI can replay an agent's steps (AI-038) and runs are observable. Persistence mechanics only; the endpoint that calls it is AI-037.
Expand Down
49 changes: 45 additions & 4 deletions backend/src/Ai/TextStack.Ai.Agents/AgentLoop.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Text.Json;
using TextStack.Ai.Core;
using TextStack.Ai.Tools;

namespace TextStack.Ai.Agents;

/// <summary>
/// One event from a streamed agent run (AI-037a): either a recorded <see cref="Step"/> as it happens,
/// or the terminal <see cref="Result"/>. Exactly one is non-null.
/// </summary>
public sealed record AgentEvent(AgentStep? Step, AgentResult<string>? Result)
{
public static AgentEvent OfStep(AgentStep step) => new(step, null);
public static AgentEvent Done(AgentResult<string> result) => new(null, result);
}

/// <summary>
/// The hand-rolled plan → act → observe loop (Phase 6, AI-034) every concrete agent runs on. Each
/// iteration: ask the model (with the agent's allowed tool schemas); if it answers without a tool
Expand All @@ -18,8 +29,32 @@ namespace TextStack.Ai.Agents;
/// </summary>
public sealed class AgentLoop(ILlmService llm, IToolRegistry tools, ToolDispatcher dispatcher)
{
/// <summary>
/// Runs the loop to completion and returns the final result. Built on <see cref="StreamAsync"/> —
/// for non-streaming callers (e.g. the eval, AI-039). Budget exhaustion still throws
/// <see cref="AgentBudgetExhaustedException"/> (with its transcript).
/// </summary>
public async Task<AgentResult<string>> RunAsync(
AgentInput input, AgentContext ctx, AgentLoopOptions options, CancellationToken ct)
{
await foreach (var e in StreamAsync(input, ctx, options, ct))
{
if (e.Result is { } result)
return result;
}
// Unreachable: the loop either yields a Done event or throws on budget exhaustion.
throw new InvalidOperationException("Agent stream ended without a result.");
}

/// <summary>
/// Streams the run step-by-step (AI-037a): an <see cref="AgentEvent"/> per recorded step as it
/// happens, then a terminal Done event carrying the final <see cref="AgentResult{T}"/>. The
/// endpoint (AI-037b) maps these to SSE so the reader sees the agent's progress. Budget exhaustion
/// throws after the partial steps have already streamed.
/// </summary>
public async IAsyncEnumerable<AgentEvent> StreamAsync(
AgentInput input, AgentContext ctx, AgentLoopOptions options,
[EnumeratorCancellation] CancellationToken ct)
{
var sw = Stopwatch.StartNew();
var steps = new List<AgentStep>();
Expand All @@ -42,15 +77,19 @@ public async Task<AgentResult<string>> RunAsync(
inputTokens += response.Usage.InputTokens;
outputTokens += response.Usage.OutputTokens;
cost += response.Usage.CostUsd;
steps.Add(Step(iteration, "llm_response", SerializeResponse(response)));

var llmStep = Step(iteration, "llm_response", SerializeResponse(response));
steps.Add(llmStep);
yield return AgentEvent.OfStep(llmStep);

// No tool calls → the model has produced its final answer.
if (response.ToolCalls.Count == 0)
{
sw.Stop();
return new AgentResult<string>(
yield return AgentEvent.Done(new AgentResult<string>(
response.Text, steps,
new AgentUsage(iteration + 1, inputTokens, outputTokens, cost, (int)sw.ElapsedMilliseconds));
new AgentUsage(iteration + 1, inputTokens, outputTokens, cost, (int)sw.ElapsedMilliseconds)));
yield break;
}

// The assistant's tool-call turn must precede the tool results (OpenAI message ordering).
Expand All @@ -61,7 +100,9 @@ public async Task<AgentResult<string>> RunAsync(
{
var call = response.ToolCalls[j];
var result = results[j];
steps.Add(Step(iteration, "tool_result", SerializeResult(call, result)));
var toolStep = Step(iteration, "tool_result", SerializeResult(call, result));
steps.Add(toolStep);
yield return AgentEvent.OfStep(toolStep);
// Errors come back as data (ToolCallingSession serialization) so the model can recover.
messages.Add(new LlmMessage("tool", ToolCallingSession.SerializeResult(result), [call]));
}
Expand Down
17 changes: 8 additions & 9 deletions backend/src/Application/Agents/StudyBuddyAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ public sealed class StudyBuddyAgent(AgentLoop loop) : IAgent<StudyBuddyInput, st
"get_user_highlights",
];

public Task<AgentResult<string>> RunAsync(StudyBuddyInput input, AgentContext ctx, CancellationToken ct)
{
var agentInput = new AgentInput(
UserGoal: BuildGoal(input),
SystemPrompt: SystemPrompt,
AllowedTools: AllowedTools,
FeatureTag: FeatureTag);
public Task<AgentResult<string>> RunAsync(StudyBuddyInput input, AgentContext ctx, CancellationToken ct) =>
loop.RunAsync(BuildAgentInput(input), ctx, Options, ct);

return loop.RunAsync(agentInput, ctx, Options, ct);
}
/// <summary>Streams the run step-by-step (AI-037a) for the SSE endpoint — same config as <see cref="RunAsync"/>.</summary>
public IAsyncEnumerable<AgentEvent> StreamAsync(StudyBuddyInput input, AgentContext ctx, CancellationToken ct) =>
loop.StreamAsync(BuildAgentInput(input), ctx, Options, ct);

private static AgentInput BuildAgentInput(StudyBuddyInput input) =>
new(UserGoal: BuildGoal(input), SystemPrompt: SystemPrompt, AllowedTools: AllowedTools, FeatureTag: FeatureTag);

private static string BuildGoal(StudyBuddyInput input)
{
Expand Down
40 changes: 40 additions & 0 deletions tests/TextStack.UnitTests/AgentLoopTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,46 @@ await Assert.ThrowsAsync<AgentBudgetExhaustedException>(() =>
TestContext.Current.CancellationToken));
}

[Fact]
public async Task Stream_EmitsStepEventsThenTerminalDone()
{
var llm = new ScriptedLlm(
[Call(args: """{"x":1}""")], // turn 1: tool
["Final."]); // turn 2: answer
var events = new List<AgentEvent>();
await foreach (var e in Loop(llm, new EchoTool()).StreamAsync(
Input(), Ctx(), new AgentLoopOptions(), TestContext.Current.CancellationToken))
{
events.Add(e);
}

// step (llm) → step (tool) → step (llm) → done; exactly one terminal result.
Assert.Equal(3, events.Count(e => e.Step is not null));
var done = Assert.Single(events.Where(e => e.Result is not null));
Assert.Equal("Final.", done.Result!.Output);
Assert.Equal(2, done.Result.Usage.Iterations);
Assert.Same(events[^1], done); // done is the last event
}

[Fact]
public async Task Stream_BudgetExhausted_StreamsPartialSteps_ThenThrows()
{
var llm = new ScriptedLlm([Call()], [Call()], [Call()]); // never answers
var streamed = new List<AgentEvent>();

await Assert.ThrowsAsync<AgentBudgetExhaustedException>(async () =>
{
await foreach (var e in Loop(llm, new EchoTool()).StreamAsync(
Input(), Ctx(), new AgentLoopOptions(MaxSteps: 3), TestContext.Current.CancellationToken))
{
streamed.Add(e);
}
});

Assert.NotEmpty(streamed); // partial steps reached the consumer
Assert.All(streamed, e => Assert.Null(e.Result)); // no Done event on exhaustion
}

[Fact]
public async Task Run_UnknownTool_FedBackAsData_LoopRecovers()
{
Expand Down
20 changes: 20 additions & 0 deletions tests/TextStack.UnitTests/StudyBuddyAgentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ public async Task Run_FeedsPromptAndPassageGoal_ReturnsAnswer()
Assert.Contains("Chapter 3", request.Messages[0].Content); // chapter threaded into the goal
}

[Fact]
public async Task Stream_FeedsSameConfig_EmitsStepThenDone()
{
var llm = new FixedLlm("Streamed explanation.");
var agent = new StudyBuddyAgent(Loop(llm));

var events = new List<AgentEvent>();
await foreach (var e in agent.StreamAsync(
new StudyBuddyInput("A confusing passage.", Guid.NewGuid(), ChapterNumber: 2),
Ctx(), TestContext.Current.CancellationToken))
{
events.Add(e);
}

var done = Assert.Single(events.Where(e => e.Result is not null));
Assert.Equal("Streamed explanation.", done.Result!.Output);
Assert.Equal(StudyBuddyAgent.SystemPrompt, llm.Requests[0].SystemPrompt);
Assert.Contains("Chapter 2", llm.Requests[0].Messages[0].Content);
}

[Fact]
public async Task Run_NoChapter_GoalOmitsChapter()
{
Expand Down
Loading