diff --git a/src/Aevatar.AI.ToolProviders.Workflow/Tools/WorkflowStatusTool.cs b/src/Aevatar.AI.ToolProviders.Workflow/Tools/WorkflowStatusTool.cs index d1f613456..381380d5a 100644 --- a/src/Aevatar.AI.ToolProviders.Workflow/Tools/WorkflowStatusTool.cs +++ b/src/Aevatar.AI.ToolProviders.Workflow/Tools/WorkflowStatusTool.cs @@ -79,8 +79,8 @@ public async Task ExecuteAsync(string argumentsJson, CancellationToken c return action switch { "list" => ListWorkflows(), - "catalog" => ListCatalog(), - "detail" => GetDetail(args), + "catalog" => await ListCatalogAsync(ct), + "detail" => await GetDetailAsync(args, ct), "timeline" => await GetTimelineAsync(args, ct), _ => await GetStatusAsync(args, ct), }; @@ -98,9 +98,9 @@ private string ListWorkflows() return JsonSerializer.Serialize(new { workflows, count = workflows.Count }, s_json); } - private string ListCatalog() + private async Task ListCatalogAsync(CancellationToken ct) { - var catalog = _queryService.ListWorkflowCatalog(); + var catalog = await _queryService.ListWorkflowCatalogAsync(ct); var items = catalog.Select(c => new { name = c.Name, description = c.Description, category = c.Category, @@ -109,13 +109,13 @@ private string ListCatalog() return JsonSerializer.Serialize(new { workflows = items, count = items.Length }, s_json); } - private string GetDetail(ToolArgs args) + private async Task GetDetailAsync(ToolArgs args, CancellationToken ct) { var name = args.Str("workflow_name"); if (string.IsNullOrWhiteSpace(name)) return """{"error":"'workflow_name' is required for 'detail' action"}"""; - var detail = _queryService.GetWorkflowDetail(name); + var detail = await _queryService.GetWorkflowDetailAsync(name, ct); if (detail == null) return JsonSerializer.Serialize(new { error = $"Workflow '{name}' not found" }); diff --git a/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCapabilitiesPort.cs b/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCapabilitiesPort.cs index a8ce1519a..57fce0ae4 100644 --- a/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCapabilitiesPort.cs +++ b/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCapabilitiesPort.cs @@ -2,5 +2,5 @@ namespace Aevatar.Workflow.Application.Abstractions.Queries; public interface IWorkflowCapabilitiesPort { - WorkflowCapabilitiesDocument GetCapabilities(); + Task GetCapabilitiesAsync(CancellationToken ct = default); } diff --git a/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCatalogPort.cs b/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCatalogPort.cs index 5a1975f31..250517963 100644 --- a/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCatalogPort.cs +++ b/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowCatalogPort.cs @@ -2,7 +2,10 @@ namespace Aevatar.Workflow.Application.Abstractions.Queries; public interface IWorkflowCatalogPort { - IReadOnlyList ListWorkflowCatalog(); + // Refactor (iter56/cluster-920-workflow-catalog-async-query): old=sync catalog query, new=async end-to-end + // Catalog and capability query ports expose Task-returning methods so readmodel readers are awaited. + // HTTP, WebSocket, and tool callers pass cancellation tokens through this single query seam. + Task> ListWorkflowCatalogAsync(CancellationToken ct = default); - WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName); + Task GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default); } diff --git a/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowExecutionQueryApplicationService.cs b/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowExecutionQueryApplicationService.cs index 5a1a9affd..2e3fb0133 100644 --- a/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowExecutionQueryApplicationService.cs +++ b/src/workflow/Aevatar.Workflow.Application.Abstractions/Queries/IWorkflowExecutionQueryApplicationService.cs @@ -8,11 +8,11 @@ public interface IWorkflowExecutionQueryApplicationService IReadOnlyList ListWorkflows(); - IReadOnlyList ListWorkflowCatalog(); + Task> ListWorkflowCatalogAsync(CancellationToken ct = default); - WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName); + Task GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default); - WorkflowCapabilitiesDocument GetCapabilities(); + Task GetCapabilitiesAsync(CancellationToken ct = default); Task GetActorSnapshotAsync(string actorId, CancellationToken ct = default); diff --git a/src/workflow/Aevatar.Workflow.Application/Queries/RegistryBackedWorkflowCatalogPort.cs b/src/workflow/Aevatar.Workflow.Application/Queries/RegistryBackedWorkflowCatalogPort.cs index 14185c661..8ce02e260 100644 --- a/src/workflow/Aevatar.Workflow.Application/Queries/RegistryBackedWorkflowCatalogPort.cs +++ b/src/workflow/Aevatar.Workflow.Application/Queries/RegistryBackedWorkflowCatalogPort.cs @@ -12,9 +12,10 @@ public RegistryBackedWorkflowCatalogPort(IWorkflowDefinitionCatalog workflowRegi _workflowRegistry = workflowRegistry; } - public IReadOnlyList ListWorkflowCatalog() + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) { - return _workflowRegistry.GetNames() + ct.ThrowIfCancellationRequested(); + IReadOnlyList catalog = _workflowRegistry.GetNames() .OrderBy(name => name, StringComparer.OrdinalIgnoreCase) .Select(name => new WorkflowCatalogItem { @@ -26,19 +27,23 @@ public IReadOnlyList ListWorkflowCatalog() ShowInLibrary = true, }) .ToList(); + return Task.FromResult(catalog); } - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) + public Task GetWorkflowDetailAsync( + string workflowName, + CancellationToken ct = default) { + ct.ThrowIfCancellationRequested(); if (string.IsNullOrWhiteSpace(workflowName)) - return null; + return Task.FromResult(null); var normalizedName = workflowName.Trim(); var yaml = _workflowRegistry.GetYaml(normalizedName); if (string.IsNullOrWhiteSpace(yaml)) - return null; + return Task.FromResult(null); - return new WorkflowCatalogItemDetail + return Task.FromResult(new WorkflowCatalogItemDetail { Catalog = new WorkflowCatalogItem { @@ -50,12 +55,13 @@ public IReadOnlyList ListWorkflowCatalog() ShowInLibrary = true, }, Yaml = yaml, - }; + }); } - public WorkflowCapabilitiesDocument GetCapabilities() + public Task GetCapabilitiesAsync(CancellationToken ct = default) { - return new WorkflowCapabilitiesDocument + ct.ThrowIfCancellationRequested(); + return Task.FromResult(new WorkflowCapabilitiesDocument { SchemaVersion = "capabilities.v1", Workflows = _workflowRegistry.GetNames() @@ -66,6 +72,6 @@ public WorkflowCapabilitiesDocument GetCapabilities() Source = "builtin", }) .ToList(), - }; + }); } } diff --git a/src/workflow/Aevatar.Workflow.Application/Queries/WorkflowExecutionQueryApplicationService.cs b/src/workflow/Aevatar.Workflow.Application/Queries/WorkflowExecutionQueryApplicationService.cs index 2ad89fab8..7619ce502 100644 --- a/src/workflow/Aevatar.Workflow.Application/Queries/WorkflowExecutionQueryApplicationService.cs +++ b/src/workflow/Aevatar.Workflow.Application/Queries/WorkflowExecutionQueryApplicationService.cs @@ -50,19 +50,21 @@ public async Task> ListAgentsAsync(Cancellat public IReadOnlyList ListWorkflows() => _workflowRegistry.GetNames(); - public IReadOnlyList ListWorkflowCatalog() => - _workflowCatalogPort.ListWorkflowCatalog(); + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) => + _workflowCatalogPort.ListWorkflowCatalogAsync(ct); - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) + public async Task GetWorkflowDetailAsync( + string workflowName, + CancellationToken ct = default) { if (string.IsNullOrWhiteSpace(workflowName)) return null; - return _workflowCatalogPort.GetWorkflowDetail(workflowName); + return await _workflowCatalogPort.GetWorkflowDetailAsync(workflowName, ct); } - public WorkflowCapabilitiesDocument GetCapabilities() => - _workflowCapabilitiesPort.GetCapabilities(); + public Task GetCapabilitiesAsync(CancellationToken ct = default) => + _workflowCapabilitiesPort.GetCapabilitiesAsync(ct); public async Task GetActorSnapshotAsync(string actorId, CancellationToken ct = default) { diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs index 42f0c365a..9f465f71c 100644 --- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs @@ -49,7 +49,7 @@ public static async Task HandleChat( try { - var capabilities = TryResolveCapabilities(serviceProvider, logger); + var capabilities = await TryResolveCapabilitiesAsync(serviceProvider, logger, ct); var defaultMetadata = TryResolveRuntimeDefaultMetadata(serviceProvider, logger); var normalizedRequest = ChatRunRequestNormalizer.Normalize( input, @@ -539,7 +539,7 @@ await ChatWebSocketProtocol.SendAsync( } responseMessageType = ChatWebSocketProtocol.NormalizeMessageType(command.ResponseMessageType); - var capabilities = TryResolveCapabilities(http.RequestServices, logger); + var capabilities = await TryResolveCapabilitiesAsync(http.RequestServices, logger, ct); var defaultMetadata = TryResolveRuntimeDefaultMetadata(http.RequestServices, logger); await ChatWebSocketRunCoordinator.ExecuteAsync( socket, @@ -577,7 +577,10 @@ await ChatWebSocketProtocol.SendAsync( } } - private static WorkflowCapabilitiesDocument? TryResolveCapabilities(IServiceProvider? serviceProvider, ILogger? logger) + private static async Task TryResolveCapabilitiesAsync( + IServiceProvider? serviceProvider, + ILogger? logger, + CancellationToken ct) { if (serviceProvider == null) return null; @@ -586,7 +589,9 @@ await ChatWebSocketProtocol.SendAsync( { var queryService = serviceProvider.GetService(typeof(IWorkflowExecutionQueryApplicationService)) as IWorkflowExecutionQueryApplicationService; - return queryService?.GetCapabilities(); + return queryService == null + ? null + : await queryService.GetCapabilitiesAsync(ct); } catch (Exception ex) { diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatQueryEndpoints.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatQueryEndpoints.cs index c14a3e6a9..623d88501 100644 --- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatQueryEndpoints.cs +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatQueryEndpoints.cs @@ -61,11 +61,13 @@ internal static async Task ListAgents( return Results.Ok(agents); } - internal static IResult ListPrimitives(IWorkflowExecutionQueryApplicationService queryService) + internal static async Task ListPrimitives( + IWorkflowExecutionQueryApplicationService queryService, + CancellationToken ct = default) { - var capabilities = queryService.GetCapabilities(); - var exampleWorkflowsByPrimitive = queryService - .ListWorkflowCatalog() + var capabilities = await queryService.GetCapabilitiesAsync(ct); + var catalog = await queryService.ListWorkflowCatalogAsync(ct); + var exampleWorkflowsByPrimitive = catalog .Where(static item => item.IsPrimitiveExample) .SelectMany(item => item.Primitives.Select(primitive => new { Primitive = primitive, Workflow = item.Name })) .GroupBy(static item => item.Primitive, StringComparer.OrdinalIgnoreCase) @@ -89,17 +91,22 @@ internal static IResult ListPrimitives(IWorkflowExecutionQueryApplicationService internal static IResult ListWorkflows(IWorkflowExecutionQueryApplicationService queryService) => Results.Ok(queryService.ListWorkflows()); - internal static IResult ListWorkflowCatalog(IWorkflowExecutionQueryApplicationService queryService) => - Results.Ok(queryService.ListWorkflowCatalog()); + internal static async Task ListWorkflowCatalog( + IWorkflowExecutionQueryApplicationService queryService, + CancellationToken ct = default) => + Results.Ok(await queryService.ListWorkflowCatalogAsync(ct)); - internal static IResult GetCapabilities(IWorkflowExecutionQueryApplicationService queryService) => - Results.Ok(queryService.GetCapabilities()); + internal static async Task GetCapabilities( + IWorkflowExecutionQueryApplicationService queryService, + CancellationToken ct = default) => + Results.Ok(await queryService.GetCapabilitiesAsync(ct)); - internal static IResult GetWorkflowDetail( + internal static async Task GetWorkflowDetail( string workflowName, - IWorkflowExecutionQueryApplicationService queryService) + IWorkflowExecutionQueryApplicationService queryService, + CancellationToken ct = default) { - var detail = queryService.GetWorkflowDetail(workflowName); + var detail = await queryService.GetWorkflowDetailAsync(workflowName, ct); return detail == null ? Results.NotFound() : Results.Ok(detail); } diff --git a/src/workflow/Aevatar.Workflow.Projection/Workflows/WorkflowCatalogReadModelQueryPort.cs b/src/workflow/Aevatar.Workflow.Projection/Workflows/WorkflowCatalogReadModelQueryPort.cs index a9de749a4..173d3b6e2 100644 --- a/src/workflow/Aevatar.Workflow.Projection/Workflows/WorkflowCatalogReadModelQueryPort.cs +++ b/src/workflow/Aevatar.Workflow.Projection/Workflows/WorkflowCatalogReadModelQueryPort.cs @@ -24,9 +24,9 @@ public WorkflowCatalogReadModelQueryPort( // Refactor (iter46/issue-871-workflow-file-catalog-query-port): // Old pattern: Workflow catalog/capabilities query port discovered files, parsed YAML, loaded connector config, and cached results in singleton process memory during query execution. // New principle: WorkflowGAgent per-definition authority; query ports only read freshness-bearing readmodels; file discovery/parsing happens at startup/import time, not in query path. - public IReadOnlyList ListWorkflowCatalog() + public async Task> ListWorkflowCatalogAsync(CancellationToken ct = default) { - var documents = QueryCatalogDocuments(); + var documents = await QueryCatalogDocumentsAsync(ct); return documents .Select(_mapper.ToCatalogItem) .OrderBy(item => item.Group, StringComparer.OrdinalIgnoreCase) @@ -35,35 +35,37 @@ public IReadOnlyList ListWorkflowCatalog() .ToList(); } - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) + public async Task GetWorkflowDetailAsync( + string workflowName, + CancellationToken ct = default) { if (string.IsNullOrWhiteSpace(workflowName)) return null; - var document = _catalogReader.GetAsync(workflowName.Trim()).Result; + var document = await _catalogReader.GetAsync(workflowName.Trim(), ct); return document == null ? null : _mapper.ToCatalogItemDetail(document); } - public WorkflowCapabilitiesDocument GetCapabilities() + public async Task GetCapabilitiesAsync(CancellationToken ct = default) { - var capabilities = _capabilitiesReader.GetAsync(CapabilitiesDocumentId).Result + var capabilities = await _capabilitiesReader.GetAsync(CapabilitiesDocumentId, ct) ?? new WorkflowCapabilitiesCurrentStateDocument { Id = CapabilitiesDocumentId, ActorId = CapabilitiesDocumentId, SchemaVersion = "capabilities.v1", }; - return _mapper.ToCapabilitiesDocument(capabilities, QueryCatalogDocuments()); + return _mapper.ToCapabilitiesDocument(capabilities, await QueryCatalogDocumentsAsync(ct)); } - private IReadOnlyList QueryCatalogDocuments() + private async Task> QueryCatalogDocumentsAsync(CancellationToken ct) { - var result = _catalogReader.QueryAsync(new ProjectionDocumentQuery + var result = await _catalogReader.QueryAsync(new ProjectionDocumentQuery { Take = 1000, - }).Result; + }, ct); return result.Items; } } diff --git a/test/Aevatar.AI.ToolProviders.Workflow.Tests/WorkflowRunToolContractTests.cs b/test/Aevatar.AI.ToolProviders.Workflow.Tests/WorkflowRunToolContractTests.cs index b2c1c062e..d9d93004a 100644 --- a/test/Aevatar.AI.ToolProviders.Workflow.Tests/WorkflowRunToolContractTests.cs +++ b/test/Aevatar.AI.ToolProviders.Workflow.Tests/WorkflowRunToolContractTests.cs @@ -174,6 +174,63 @@ public async Task WorkflowStatusTool_Timeline_ShouldAcceptWorkflowRunIdAndAlias( "ListWorkflowRunTimelineExport:legacy-run:6"); } + [Fact] + public async Task WorkflowStatusTool_CatalogAndDetail_ShouldAwaitAsyncQueryMethods() + { + var query = new RecordingWorkflowExecutionQueryService + { + Catalog = + [ + new WorkflowCatalogItem + { + Name = "direct", + Description = "Direct workflow.", + Category = "deterministic", + Group = "starter-workflows", + Source = "builtin", + }, + ], + Detail = new WorkflowCatalogItemDetail + { + Catalog = new WorkflowCatalogItem + { + Name = "direct", + Description = "Direct workflow.", + }, + Definition = new WorkflowCatalogDefinition + { + Roles = + [ + new WorkflowCatalogRole + { + Id = "assistant", + Name = "Assistant", + }, + ], + Steps = + [ + new WorkflowCatalogStep + { + Id = "start", + Type = "llm", + TargetRole = "assistant", + }, + ], + }, + }, + }; + var tool = new WorkflowStatusTool(query, new WorkflowToolOptions()); + + var catalogResult = await tool.ExecuteAsync("""{"action":"catalog"}"""); + var detailResult = await tool.ExecuteAsync("""{"action":"detail","workflow_name":"direct"}"""); + + using var catalogDocument = JsonDocument.Parse(catalogResult); + catalogDocument.RootElement.GetProperty("workflows")[0].GetProperty("name").GetString().Should().Be("direct"); + using var detailDocument = JsonDocument.Parse(detailResult); + detailDocument.RootElement.GetProperty("name").GetString().Should().Be("direct"); + query.Calls.Should().Equal("ListWorkflowCatalog", "GetWorkflowDetail:direct"); + } + [Fact] public async Task WorkflowStatusTool_WhenWorkflowRunIdMissing_ShouldReturnNewErrors() { @@ -250,6 +307,8 @@ private sealed class RecordingWorkflowExecutionQueryService : IWorkflowExecution public bool ActorQueryEnabled { get; init; } = true; public List Calls { get; } = []; public WorkflowRunReport? Report { get; init; } + public IReadOnlyList Catalog { get; init; } = []; + public WorkflowCatalogItemDetail? Detail { get; init; } public IReadOnlyList Timeline { get; init; } = []; public IReadOnlyList GraphEdges { get; init; } = []; public WorkflowRunGraphExportSubgraph GraphSubgraph { get; init; } = new(); @@ -259,11 +318,22 @@ public Task> ListAgentsAsync(CancellationTok public IReadOnlyList ListWorkflows() => []; - public IReadOnlyList ListWorkflowCatalog() => []; + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) + { + Calls.Add("ListWorkflowCatalog"); + return Task.FromResult(Catalog); + } - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) => null; + public Task GetWorkflowDetailAsync( + string workflowName, + CancellationToken ct = default) + { + Calls.Add($"GetWorkflowDetail:{workflowName}"); + return Task.FromResult(Detail); + } - public WorkflowCapabilitiesDocument GetCapabilities() => new(); + public Task GetCapabilitiesAsync(CancellationToken ct = default) => + Task.FromResult(new WorkflowCapabilitiesDocument()); public Task GetActorSnapshotAsync(string actorId, CancellationToken ct = default) => Task.FromResult(null); diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs index 9af4e3a40..5a97aaf9f 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs @@ -5379,11 +5379,14 @@ public Task> ListAgentsAsync(CancellationTok public IReadOnlyList ListWorkflows() => []; - public IReadOnlyList ListWorkflowCatalog() => []; + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) => + Task.FromResult>([]); - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) => null; + public Task GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default) => + Task.FromResult(null); - public WorkflowCapabilitiesDocument GetCapabilities() => new(); + public Task GetCapabilitiesAsync(CancellationToken ct = default) => + Task.FromResult(new WorkflowCapabilitiesDocument()); public Task GetActorSnapshotAsync(string actorId, CancellationToken ct = default) { diff --git a/test/Aevatar.Workflow.Application.Tests/WorkflowExecutionQueryApplicationServiceTests.cs b/test/Aevatar.Workflow.Application.Tests/WorkflowExecutionQueryApplicationServiceTests.cs index b63324143..0cfb32e36 100644 --- a/test/Aevatar.Workflow.Application.Tests/WorkflowExecutionQueryApplicationServiceTests.cs +++ b/test/Aevatar.Workflow.Application.Tests/WorkflowExecutionQueryApplicationServiceTests.cs @@ -2,6 +2,7 @@ using Aevatar.Workflow.Application.Abstractions.Queries; using Aevatar.Workflow.Application.Abstractions.Workflows; using Aevatar.Workflow.Application.Queries; +using Aevatar.Workflow.Application.Workflows; using FluentAssertions; namespace Aevatar.Workflow.Application.Tests; @@ -143,6 +144,99 @@ public async Task ListAgentsAsync_ShouldHonorCancellation() await act.Should().ThrowAsync(); } + [Fact] + public async Task CatalogAndCapabilitiesQueries_ShouldDelegateAsyncAndPassCancellationToken() + { + var catalogPort = new RecordingWorkflowCatalogPort + { + Catalog = + [ + new WorkflowCatalogItem + { + Name = "direct", + }, + ], + Detail = new WorkflowCatalogItemDetail + { + Catalog = new WorkflowCatalogItem + { + Name = "direct", + }, + }, + }; + var capabilitiesPort = new RecordingWorkflowCapabilitiesPort + { + Capabilities = new WorkflowCapabilitiesDocument + { + SchemaVersion = "capabilities.v1", + }, + }; + var service = new WorkflowExecutionQueryApplicationService( + new StaticWorkflowDefinitionCatalog([]), + new FakeCurrentStateQueryPort([]), + new FakeArtifactQueryPort([]), + catalogPort, + capabilitiesPort); + using var cts = new CancellationTokenSource(); + + var catalog = await service.ListWorkflowCatalogAsync(cts.Token); + var detail = await service.GetWorkflowDetailAsync("direct", cts.Token); + var blankDetail = await service.GetWorkflowDetailAsync(" ", cts.Token); + var capabilities = await service.GetCapabilitiesAsync(cts.Token); + + catalog.Should().ContainSingle(item => item.Name == "direct"); + detail.Should().NotBeNull(); + blankDetail.Should().BeNull(); + capabilities.SchemaVersion.Should().Be("capabilities.v1"); + catalogPort.Calls.Should().Equal("ListWorkflowCatalog", "GetWorkflowDetail:direct"); + capabilitiesPort.Calls.Should().Equal("GetCapabilities"); + catalogPort.CancellationTokens.Should().OnlyContain(token => token == cts.Token); + capabilitiesPort.CancellationTokens.Should().OnlyContain(token => token == cts.Token); + } + + [Fact] + public async Task RegistryBackedWorkflowCatalogPort_ShouldExposeStartupCatalogThroughAsyncQueryMethods() + { + var registry = new WorkflowDefinitionCatalog(); + registry.Register("beta", """ + name: beta + description: Beta workflow. + steps: + - id: reply + type: llm_call + """); + registry.Register("alpha", """ + name: alpha + description: Alpha workflow. + steps: + - id: reply + type: llm_call + """); + var port = new RegistryBackedWorkflowCatalogPort(registry); + + var catalog = await port.ListWorkflowCatalogAsync(); + var detail = await port.GetWorkflowDetailAsync(" alpha "); + var blankDetail = await port.GetWorkflowDetailAsync(" "); + var missingDetail = await port.GetWorkflowDetailAsync("missing"); + var capabilities = await port.GetCapabilitiesAsync(); + + catalog.Select(item => item.Name).Should().Equal("alpha", "beta"); + catalog.Should().OnlyContain(item => + item.Source == "builtin" && + item.SourceLabel == "Built-in" && + item.Group == "starter-workflows" && + item.GroupLabel == "Starter Workflows" && + item.ShowInLibrary); + detail.Should().NotBeNull(); + detail!.Catalog.Name.Should().Be("alpha"); + detail.Yaml.Should().Contain("name: alpha"); + blankDetail.Should().BeNull(); + missingDetail.Should().BeNull(); + capabilities.SchemaVersion.Should().Be("capabilities.v1"); + capabilities.Workflows.Select(workflow => workflow.Name).Should().Equal("alpha", "beta"); + capabilities.Workflows.Should().OnlyContain(workflow => workflow.Source == "builtin"); + } + private sealed class StaticWorkflowDefinitionCatalog(IReadOnlyList names) : IWorkflowDefinitionCatalog { public void Register(string name, string yaml) => throw new NotSupportedException(); @@ -156,14 +250,53 @@ private sealed class StaticWorkflowDefinitionCatalog(IReadOnlyList names private sealed class StaticWorkflowCatalogPort : IWorkflowCatalogPort { - public IReadOnlyList ListWorkflowCatalog() => []; + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) => + Task.FromResult>([]); - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) => null; + public Task GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default) => + Task.FromResult(null); } private sealed class StaticWorkflowCapabilitiesPort : IWorkflowCapabilitiesPort { - public WorkflowCapabilitiesDocument GetCapabilities() => new(); + public Task GetCapabilitiesAsync(CancellationToken ct = default) => + Task.FromResult(new WorkflowCapabilitiesDocument()); + } + + private sealed class RecordingWorkflowCatalogPort : IWorkflowCatalogPort + { + public IReadOnlyList Catalog { get; init; } = []; + public WorkflowCatalogItemDetail? Detail { get; init; } + public List Calls { get; } = []; + public List CancellationTokens { get; } = []; + + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) + { + Calls.Add("ListWorkflowCatalog"); + CancellationTokens.Add(ct); + return Task.FromResult(Catalog); + } + + public Task GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default) + { + Calls.Add($"GetWorkflowDetail:{workflowName}"); + CancellationTokens.Add(ct); + return Task.FromResult(Detail); + } + } + + private sealed class RecordingWorkflowCapabilitiesPort : IWorkflowCapabilitiesPort + { + public WorkflowCapabilitiesDocument Capabilities { get; init; } = new(); + public List Calls { get; } = []; + public List CancellationTokens { get; } = []; + + public Task GetCapabilitiesAsync(CancellationToken ct = default) + { + Calls.Add("GetCapabilities"); + CancellationTokens.Add(ct); + return Task.FromResult(Capabilities); + } } private sealed class FakeCurrentStateQueryPort(List calls) : IWorkflowExecutionCurrentStateQueryPort diff --git a/test/Aevatar.Workflow.Host.Api.Tests/ChatQueryEndpointsTests.cs b/test/Aevatar.Workflow.Host.Api.Tests/ChatQueryEndpointsTests.cs index aa92b89d0..df8981266 100644 --- a/test/Aevatar.Workflow.Host.Api.Tests/ChatQueryEndpointsTests.cs +++ b/test/Aevatar.Workflow.Host.Api.Tests/ChatQueryEndpointsTests.cs @@ -91,7 +91,8 @@ public async Task ListPrimitives_ShouldComposePrimitiveDescriptorsFromCapabiliti ], }; - var result = ChatQueryEndpoints.ListPrimitives(service); + using var cts = new CancellationTokenSource(); + var result = await ChatQueryEndpoints.ListPrimitives(service, cts.Token); var body = await ExecuteAsync(result); body.Should().Contain("workflow_call"); @@ -99,6 +100,47 @@ public async Task ListPrimitives_ShouldComposePrimitiveDescriptorsFromCapabiliti body.Should().Contain("child_example"); body.Should().NotContain("ignored_non_example"); service.Calls.Should().ContainInOrder("GetCapabilities", "ListWorkflowCatalog"); + service.CancellationTokens.Should().OnlyContain(token => token == cts.Token); + } + + [Fact] + public async Task CatalogEndpoints_ShouldAwaitAsyncQueryServiceAndPassCancellationToken() + { + var service = new FakeWorkflowExecutionQueryApplicationService + { + WorkflowCatalog = + [ + new WorkflowCatalogItem + { + Name = "direct", + }, + ], + WorkflowDetail = new WorkflowCatalogItemDetail + { + Catalog = new WorkflowCatalogItem + { + Name = "direct", + }, + }, + Capabilities = new WorkflowCapabilitiesDocument + { + SchemaVersion = "capabilities.v1", + }, + }; + using var cts = new CancellationTokenSource(); + + var catalog = await ChatQueryEndpoints.ListWorkflowCatalog(service, cts.Token); + var capabilities = await ChatQueryEndpoints.GetCapabilities(service, cts.Token); + var detail = await ChatQueryEndpoints.GetWorkflowDetail("direct", service, cts.Token); + + (await ExecuteAsync(catalog)).Should().Contain("direct"); + (await ExecuteAsync(capabilities)).Should().Contain("capabilities.v1"); + (await ExecuteAsync(detail)).Should().Contain("direct"); + service.Calls.Should().ContainInOrder( + "ListWorkflowCatalog", + "GetCapabilities", + "GetWorkflowDetail:direct"); + service.CancellationTokens.Should().OnlyContain(token => token == cts.Token); } [Fact] @@ -370,22 +412,29 @@ public IReadOnlyList ListWorkflows() return Workflows; } - public IReadOnlyList ListWorkflowCatalog() + public List CancellationTokens { get; } = []; + + public Task> ListWorkflowCatalogAsync(CancellationToken ct = default) { Calls.Add("ListWorkflowCatalog"); - return WorkflowCatalog; + CancellationTokens.Add(ct); + return Task.FromResult(WorkflowCatalog); } - public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName) + public Task GetWorkflowDetailAsync( + string workflowName, + CancellationToken ct = default) { Calls.Add($"GetWorkflowDetail:{workflowName}"); - return WorkflowDetail; + CancellationTokens.Add(ct); + return Task.FromResult(WorkflowDetail); } - public WorkflowCapabilitiesDocument GetCapabilities() + public Task GetCapabilitiesAsync(CancellationToken ct = default) { Calls.Add("GetCapabilities"); - return Capabilities; + CancellationTokens.Add(ct); + return Task.FromResult(Capabilities); } public Task GetActorSnapshotAsync(string actorId, CancellationToken ct = default) diff --git a/test/Aevatar.Workflow.Host.Api.Tests/WorkflowExecutionQueryPortsCoverageTests.cs b/test/Aevatar.Workflow.Host.Api.Tests/WorkflowExecutionQueryPortsCoverageTests.cs index d683c4d2b..465f70e84 100644 --- a/test/Aevatar.Workflow.Host.Api.Tests/WorkflowExecutionQueryPortsCoverageTests.cs +++ b/test/Aevatar.Workflow.Host.Api.Tests/WorkflowExecutionQueryPortsCoverageTests.cs @@ -41,7 +41,7 @@ public void WorkflowExecutionReadModelMapper_ShouldMapCurrentStateStatuses( } [Fact] - public void WorkflowCatalogReadModelQueryPort_ShouldOnlyReadAndMapReadModelDocuments() + public async Task WorkflowCatalogReadModelQueryPort_ShouldOnlyReadAndMapReadModelDocuments() { var updatedAt = DateTimeOffset.Parse("2026-03-17T12:00:00+00:00"); var catalogReader = new RecordingDocumentReader @@ -107,9 +107,9 @@ public void WorkflowCatalogReadModelQueryPort_ShouldOnlyReadAndMapReadModelDocum capabilitiesReader, new WorkflowCatalogReadModelMapper()); - var catalog = port.ListWorkflowCatalog(); - var detail = port.GetWorkflowDetail("alpha"); - var capabilities = port.GetCapabilities(); + var catalog = await port.ListWorkflowCatalogAsync(); + var detail = await port.GetWorkflowDetailAsync("alpha"); + var capabilities = await port.GetCapabilitiesAsync(); catalog.Select(x => x.Name).Should().Equal("alpha", "beta"); catalog[0].AuthorityStateVersion.Should().Be(11); @@ -136,7 +136,7 @@ public void WorkflowCatalogReadModelQueryPort_ShouldOnlyReadAndMapReadModelDocum } [Fact] - public void WorkflowCatalogReadModelQueryPort_WhenReadModelsAreMissing_ShouldReturnHonestDefaults() + public async Task WorkflowCatalogReadModelQueryPort_WhenReadModelsAreMissing_ShouldReturnHonestDefaults() { var catalogReader = new RecordingDocumentReader(); var capabilitiesReader = new RecordingDocumentReader(); @@ -145,9 +145,9 @@ public void WorkflowCatalogReadModelQueryPort_WhenReadModelsAreMissing_ShouldRet capabilitiesReader, new WorkflowCatalogReadModelMapper()); - port.GetWorkflowDetail(" ").Should().BeNull(); - port.GetWorkflowDetail("missing").Should().BeNull(); - var capabilities = port.GetCapabilities(); + (await port.GetWorkflowDetailAsync(" ")).Should().BeNull(); + (await port.GetWorkflowDetailAsync("missing")).Should().BeNull(); + var capabilities = await port.GetCapabilitiesAsync(); capabilities.SchemaVersion.Should().Be("capabilities.v1"); capabilities.AuthorityStateVersion.Should().Be(0); @@ -158,6 +158,25 @@ public void WorkflowCatalogReadModelQueryPort_WhenReadModelsAreMissing_ShouldRet capabilitiesReader.GetCalls.Should().Be(1); } + [Fact] + public async Task WorkflowCatalogReadModelQueryPort_ShouldAwaitReadModelReadersWithoutBlocking() + { + var updatedAt = DateTimeOffset.Parse("2026-03-17T12:00:00+00:00"); + var catalogReader = new DeferredQueryDocumentReader( + [BuildCatalogDocument("alpha", updatedAt)]); + var port = new WorkflowCatalogReadModelQueryPort( + catalogReader, + new RecordingDocumentReader(), + new WorkflowCatalogReadModelMapper()); + + var catalogTask = port.ListWorkflowCatalogAsync(); + + catalogTask.IsCompleted.Should().BeFalse(); + catalogReader.CompleteQuery(); + var catalog = await catalogTask; + catalog.Should().ContainSingle(item => item.Name == "alpha"); + } + [Theory] [InlineData("custom", "home", "deterministic", "your-workflows", 0, "Saved")] [InlineData("custom", "cwd", "deterministic", "your-workflows", 0, "Workspace")] @@ -630,6 +649,43 @@ public Task> QueryAsync( } } + private sealed class DeferredQueryDocumentReader : IProjectionDocumentReader + where TReadModel : class, IProjectionReadModel + { + private readonly TaskCompletionSource> _query = + new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly IReadOnlyList _items; + + public DeferredQueryDocumentReader(IReadOnlyList items) + { + _items = items; + } + + public Task GetAsync(string key, CancellationToken ct = default) + { + _ = key; + ct.ThrowIfCancellationRequested(); + return Task.FromResult(null); + } + + public Task> QueryAsync( + ProjectionDocumentQuery query, + CancellationToken ct = default) + { + _ = query; + ct.ThrowIfCancellationRequested(); + return _query.Task; + } + + public void CompleteQuery() + { + _query.SetResult(new ProjectionDocumentQueryResult + { + Items = _items, + }); + } + } + private sealed class RecordingProjectionGraphStore : IProjectionGraphStore { public int GetNeighborsCalls { get; private set; } diff --git a/tools/ci/architecture_guards.sh b/tools/ci/architecture_guards.sh index c3b91288d..4a8b6f374 100755 --- a/tools/ci/architecture_guards.sh +++ b/tools/ci/architecture_guards.sh @@ -54,6 +54,54 @@ if rg -n "GetAwaiter\(\)\.GetResult\(\)" src; then exit 1 fi +# Refactor (iter56/cluster-920-workflow-catalog-async-query): +# Old pattern: production workflow query ports could hide async readmodel I/O behind sync .Result/.Wait calls. +# New principle: catalog/capabilities query seams are async end-to-end, and query ports await readmodel readers. +workflow_query_port_files=() +while IFS= read -r query_port_file; do + workflow_query_port_files+=("${query_port_file}") +done < <( + find src/workflow \ + -type f \ + \( -name '*QueryPort.cs' -o -path '*/Queries/*.cs' -o -path '*/Workflows/*ReadModelQueryPort.cs' \) \ + -not -path '*/bin/*' \ + -not -path '*/obj/*' \ + -not -name '*.g.cs' \ + -not -name '*.Designer.cs' \ + | sort +) + +if (( ${#workflow_query_port_files[@]} > 0 )); then + set +e + workflow_query_port_sync_blocking_report="$( + rg -n "\.Result|\.Wait[[:space:]]*\(|GetAwaiter\(\)\.GetResult\(\)" "${workflow_query_port_files[@]}" \ + | awk -F: ' +{ + file = $1; + line_no = $2; + text = substr($0, length(file) + length(line_no) + 3); + + if (text ~ /^[[:space:]]*\/\/\/?/) + next; + + print $0; +}' + )" + workflow_query_port_sync_blocking_status=$? + set -e + + if [[ ${workflow_query_port_sync_blocking_status} -ne 0 && ${workflow_query_port_sync_blocking_status} -ne 1 ]]; then + echo "Workflow query-port sync-blocking guard execution failed." + exit "${workflow_query_port_sync_blocking_status}" + fi + + if [ -n "${workflow_query_port_sync_blocking_report}" ]; then + echo "${workflow_query_port_sync_blocking_report}" + echo "Workflow production query ports must not sync-block async reads. Use async query seams and await readmodel readers." + exit 1 + fi +fi + # Refactor (iter18/cluster-001): # Old pattern: ILLMProvider 仍暴露 ChatAsync 非流式入口,provider/failover 可绕过流式链路 # New principle: Provider contract 只暴露 ChatStreamAsync;非流式聚合用现有 ChatStreamContentAggregator;无新 offline adapter