Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public async Task<string> ExecuteAsync(string argumentsJson, CancellationToken c
return action switch
{
"list" => ListWorkflows(),
"catalog" => ListCatalog(),
"detail" => GetDetail(args),
"catalog" => await ListCatalogAsync(ct),
"detail" => await GetDetailAsync(args, ct),
"timeline" => await GetTimelineAsync(args, ct),
_ => await GetStatusAsync(args, ct),
};
Expand All @@ -98,9 +98,9 @@ private string ListWorkflows()
return JsonSerializer.Serialize(new { workflows, count = workflows.Count }, s_json);
}

private string ListCatalog()
private async Task<string> ListCatalogAsync(CancellationToken ct)
{
var catalog = _queryService.ListWorkflowCatalog();
var catalog = await _queryService.ListWorkflowCatalogAsync(ct);
var items = catalog.Select(c => new
{
name = c.Name, description = c.Description, category = c.Category,
Expand All @@ -109,13 +109,13 @@ private string ListCatalog()
return JsonSerializer.Serialize(new { workflows = items, count = items.Length }, s_json);
}

private string GetDetail(ToolArgs args)
private async Task<string> GetDetailAsync(ToolArgs args, CancellationToken ct)
{
var name = args.Str("workflow_name");
if (string.IsNullOrWhiteSpace(name))
return """{"error":"'workflow_name' is required for 'detail' action"}""";

var detail = _queryService.GetWorkflowDetail(name);
var detail = await _queryService.GetWorkflowDetailAsync(name, ct);
if (detail == null)
return JsonSerializer.Serialize(new { error = $"Workflow '{name}' not found" });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ namespace Aevatar.Workflow.Application.Abstractions.Queries;

public interface IWorkflowCapabilitiesPort
{
WorkflowCapabilitiesDocument GetCapabilities();
Task<WorkflowCapabilitiesDocument> GetCapabilitiesAsync(CancellationToken ct = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ namespace Aevatar.Workflow.Application.Abstractions.Queries;

public interface IWorkflowCatalogPort
{
IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog();
// Refactor (iter56/cluster-920-workflow-catalog-async-query): old=sync catalog query, new=async end-to-end
// Catalog and capability query ports expose Task-returning methods so readmodel readers are awaited.
// HTTP, WebSocket, and tool callers pass cancellation tokens through this single query seam.
Task<IReadOnlyList<WorkflowCatalogItem>> ListWorkflowCatalogAsync(CancellationToken ct = default);

WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName);
Task<WorkflowCatalogItemDetail?> GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ public interface IWorkflowExecutionQueryApplicationService

IReadOnlyList<string> ListWorkflows();

IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog();
Task<IReadOnlyList<WorkflowCatalogItem>> ListWorkflowCatalogAsync(CancellationToken ct = default);

WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName);
Task<WorkflowCatalogItemDetail?> GetWorkflowDetailAsync(string workflowName, CancellationToken ct = default);

WorkflowCapabilitiesDocument GetCapabilities();
Task<WorkflowCapabilitiesDocument> GetCapabilitiesAsync(CancellationToken ct = default);

Task<WorkflowActorSnapshot?> GetActorSnapshotAsync(string actorId, CancellationToken ct = default);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ public RegistryBackedWorkflowCatalogPort(IWorkflowDefinitionCatalog workflowRegi
_workflowRegistry = workflowRegistry;
}

public IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog()
public Task<IReadOnlyList<WorkflowCatalogItem>> ListWorkflowCatalogAsync(CancellationToken ct = default)
{
return _workflowRegistry.GetNames()
ct.ThrowIfCancellationRequested();
IReadOnlyList<WorkflowCatalogItem> catalog = _workflowRegistry.GetNames()
.OrderBy(name => name, StringComparer.OrdinalIgnoreCase)
.Select(name => new WorkflowCatalogItem
{
Expand All @@ -26,19 +27,23 @@ public IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog()
ShowInLibrary = true,
})
.ToList();
return Task.FromResult(catalog);
}

public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName)
public Task<WorkflowCatalogItemDetail?> GetWorkflowDetailAsync(
string workflowName,
CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();
if (string.IsNullOrWhiteSpace(workflowName))
return null;
return Task.FromResult<WorkflowCatalogItemDetail?>(null);

var normalizedName = workflowName.Trim();
var yaml = _workflowRegistry.GetYaml(normalizedName);
if (string.IsNullOrWhiteSpace(yaml))
return null;
return Task.FromResult<WorkflowCatalogItemDetail?>(null);

return new WorkflowCatalogItemDetail
return Task.FromResult<WorkflowCatalogItemDetail?>(new WorkflowCatalogItemDetail
{
Catalog = new WorkflowCatalogItem
{
Expand All @@ -50,12 +55,13 @@ public IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog()
ShowInLibrary = true,
},
Yaml = yaml,
};
});
}

public WorkflowCapabilitiesDocument GetCapabilities()
public Task<WorkflowCapabilitiesDocument> GetCapabilitiesAsync(CancellationToken ct = default)
{
return new WorkflowCapabilitiesDocument
ct.ThrowIfCancellationRequested();
return Task.FromResult(new WorkflowCapabilitiesDocument
{
SchemaVersion = "capabilities.v1",
Workflows = _workflowRegistry.GetNames()
Expand All @@ -66,6 +72,6 @@ public WorkflowCapabilitiesDocument GetCapabilities()
Source = "builtin",
})
.ToList(),
};
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,21 @@ public async Task<IReadOnlyList<WorkflowAgentSummary>> ListAgentsAsync(Cancellat

public IReadOnlyList<string> ListWorkflows() => _workflowRegistry.GetNames();

public IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog() =>
_workflowCatalogPort.ListWorkflowCatalog();
public Task<IReadOnlyList<WorkflowCatalogItem>> ListWorkflowCatalogAsync(CancellationToken ct = default) =>
_workflowCatalogPort.ListWorkflowCatalogAsync(ct);

public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName)
public async Task<WorkflowCatalogItemDetail?> GetWorkflowDetailAsync(
string workflowName,
CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(workflowName))
return null;

return _workflowCatalogPort.GetWorkflowDetail(workflowName);
return await _workflowCatalogPort.GetWorkflowDetailAsync(workflowName, ct);
}

public WorkflowCapabilitiesDocument GetCapabilities() =>
_workflowCapabilitiesPort.GetCapabilities();
public Task<WorkflowCapabilitiesDocument> GetCapabilitiesAsync(CancellationToken ct = default) =>
_workflowCapabilitiesPort.GetCapabilitiesAsync(ct);

public async Task<WorkflowActorSnapshot?> GetActorSnapshotAsync(string actorId, CancellationToken ct = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static async Task HandleChat(

try
{
var capabilities = TryResolveCapabilities(serviceProvider, logger);
var capabilities = await TryResolveCapabilitiesAsync(serviceProvider, logger, ct);
var defaultMetadata = TryResolveRuntimeDefaultMetadata(serviceProvider, logger);
var normalizedRequest = ChatRunRequestNormalizer.Normalize(
input,
Expand Down Expand Up @@ -539,7 +539,7 @@ await ChatWebSocketProtocol.SendAsync(
}

responseMessageType = ChatWebSocketProtocol.NormalizeMessageType(command.ResponseMessageType);
var capabilities = TryResolveCapabilities(http.RequestServices, logger);
var capabilities = await TryResolveCapabilitiesAsync(http.RequestServices, logger, ct);
var defaultMetadata = TryResolveRuntimeDefaultMetadata(http.RequestServices, logger);
await ChatWebSocketRunCoordinator.ExecuteAsync(
socket,
Expand Down Expand Up @@ -577,7 +577,10 @@ await ChatWebSocketProtocol.SendAsync(
}
}

private static WorkflowCapabilitiesDocument? TryResolveCapabilities(IServiceProvider? serviceProvider, ILogger? logger)
private static async Task<WorkflowCapabilitiesDocument?> TryResolveCapabilitiesAsync(
IServiceProvider? serviceProvider,
ILogger? logger,
CancellationToken ct)
{
if (serviceProvider == null)
return null;
Expand All @@ -586,7 +589,9 @@ await ChatWebSocketProtocol.SendAsync(
{
var queryService = serviceProvider.GetService(typeof(IWorkflowExecutionQueryApplicationService))
as IWorkflowExecutionQueryApplicationService;
return queryService?.GetCapabilities();
return queryService == null
? null
: await queryService.GetCapabilitiesAsync(ct);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ internal static async Task<IResult> ListAgents(
return Results.Ok(agents);
}

internal static IResult ListPrimitives(IWorkflowExecutionQueryApplicationService queryService)
internal static async Task<IResult> ListPrimitives(
IWorkflowExecutionQueryApplicationService queryService,
CancellationToken ct = default)
{
var capabilities = queryService.GetCapabilities();
var exampleWorkflowsByPrimitive = queryService
.ListWorkflowCatalog()
var capabilities = await queryService.GetCapabilitiesAsync(ct);
var catalog = await queryService.ListWorkflowCatalogAsync(ct);
var exampleWorkflowsByPrimitive = catalog
.Where(static item => item.IsPrimitiveExample)
.SelectMany(item => item.Primitives.Select(primitive => new { Primitive = primitive, Workflow = item.Name }))
.GroupBy(static item => item.Primitive, StringComparer.OrdinalIgnoreCase)
Expand All @@ -89,17 +91,22 @@ internal static IResult ListPrimitives(IWorkflowExecutionQueryApplicationService
internal static IResult ListWorkflows(IWorkflowExecutionQueryApplicationService queryService) =>
Results.Ok(queryService.ListWorkflows());

internal static IResult ListWorkflowCatalog(IWorkflowExecutionQueryApplicationService queryService) =>
Results.Ok(queryService.ListWorkflowCatalog());
internal static async Task<IResult> ListWorkflowCatalog(
IWorkflowExecutionQueryApplicationService queryService,
CancellationToken ct = default) =>
Results.Ok(await queryService.ListWorkflowCatalogAsync(ct));

internal static IResult GetCapabilities(IWorkflowExecutionQueryApplicationService queryService) =>
Results.Ok(queryService.GetCapabilities());
internal static async Task<IResult> GetCapabilities(
IWorkflowExecutionQueryApplicationService queryService,
CancellationToken ct = default) =>
Results.Ok(await queryService.GetCapabilitiesAsync(ct));

internal static IResult GetWorkflowDetail(
internal static async Task<IResult> GetWorkflowDetail(
string workflowName,
IWorkflowExecutionQueryApplicationService queryService)
IWorkflowExecutionQueryApplicationService queryService,
CancellationToken ct = default)
{
var detail = queryService.GetWorkflowDetail(workflowName);
var detail = await queryService.GetWorkflowDetailAsync(workflowName, ct);
return detail == null ? Results.NotFound() : Results.Ok(detail);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public WorkflowCatalogReadModelQueryPort(
// Refactor (iter46/issue-871-workflow-file-catalog-query-port):
// Old pattern: Workflow catalog/capabilities query port discovered files, parsed YAML, loaded connector config, and cached results in singleton process memory during query execution.
// New principle: WorkflowGAgent per-definition authority; query ports only read freshness-bearing readmodels; file discovery/parsing happens at startup/import time, not in query path.
public IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog()
public async Task<IReadOnlyList<WorkflowCatalogItem>> ListWorkflowCatalogAsync(CancellationToken ct = default)
{
var documents = QueryCatalogDocuments();
var documents = await QueryCatalogDocumentsAsync(ct);
return documents
.Select(_mapper.ToCatalogItem)
.OrderBy(item => item.Group, StringComparer.OrdinalIgnoreCase)
Expand All @@ -35,35 +35,37 @@ public IReadOnlyList<WorkflowCatalogItem> ListWorkflowCatalog()
.ToList();
}

public WorkflowCatalogItemDetail? GetWorkflowDetail(string workflowName)
public async Task<WorkflowCatalogItemDetail?> GetWorkflowDetailAsync(
string workflowName,
CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(workflowName))
return null;

var document = _catalogReader.GetAsync(workflowName.Trim()).Result;
var document = await _catalogReader.GetAsync(workflowName.Trim(), ct);
return document == null
? null
: _mapper.ToCatalogItemDetail(document);
}

public WorkflowCapabilitiesDocument GetCapabilities()
public async Task<WorkflowCapabilitiesDocument> GetCapabilitiesAsync(CancellationToken ct = default)
{
var capabilities = _capabilitiesReader.GetAsync(CapabilitiesDocumentId).Result
var capabilities = await _capabilitiesReader.GetAsync(CapabilitiesDocumentId, ct)
?? new WorkflowCapabilitiesCurrentStateDocument
{
Id = CapabilitiesDocumentId,
ActorId = CapabilitiesDocumentId,
SchemaVersion = "capabilities.v1",
};
return _mapper.ToCapabilitiesDocument(capabilities, QueryCatalogDocuments());
return _mapper.ToCapabilitiesDocument(capabilities, await QueryCatalogDocumentsAsync(ct));
}

private IReadOnlyList<WorkflowCatalogCurrentStateDocument> QueryCatalogDocuments()
private async Task<IReadOnlyList<WorkflowCatalogCurrentStateDocument>> QueryCatalogDocumentsAsync(CancellationToken ct)
{
var result = _catalogReader.QueryAsync(new ProjectionDocumentQuery
var result = await _catalogReader.QueryAsync(new ProjectionDocumentQuery
{
Take = 1000,
}).Result;
}, ct);
return result.Items;
}
}
Loading
Loading