Skip to content

Commit 4bfcf5f

Browse files
feat: Send current task as the first event after subscribing to it (#418)
# Description Sending a `Task` object as the first event after subscribing to it. Additionally, throwing an `UnsupportedOperationError` error if a `Task` in terminal state was used for subscribing. This behavior is aligned with spec requirements: https://a2a-protocol.org/latest/specification/#1046-subscribetotask Fixes #323 🦕 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 672f32b commit 4bfcf5f

2 files changed

Lines changed: 126 additions & 29 deletions

File tree

src/server/request_handler/default_request_handler.ts

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -612,32 +612,38 @@ export class DefaultRequestHandler implements A2ARequestHandler {
612612
}
613613

614614
const taskId = params.id;
615-
const task = await this.taskStore.load(taskId, context);
616-
if (!task) {
617-
throw new TaskNotFoundError(`Task not found: ${taskId}`);
618-
}
619615

620-
// Yield the current task state first
621-
yield { payload: { $case: 'task', value: task } };
616+
// Attach to the event bus BEFORE loading the task from the store.
617+
// This eliminates the race condition where events published between the store
618+
// load and the subscription would be missed. The ExecutionEventQueue constructor
619+
// synchronously registers listeners, so all events from this point forward are
620+
// buffered in the queue's internal array.
621+
const eventBus = this.eventBusManager.getByTaskId(taskId);
622+
const eventQueue = eventBus ? new ExecutionEventQueue(eventBus) : undefined;
622623

623-
// If task is already in a final state, no more events will come.
624-
if (TERMINAL_STATE_LIST.includes(task.status!.state)) {
625-
return;
626-
}
624+
try {
625+
const task = await this.taskStore.load(taskId, context);
626+
if (!task) {
627+
throw new TaskNotFoundError(`Task not found: ${taskId}`);
628+
}
629+
if (task.status?.state !== undefined && TERMINAL_STATE_LIST.includes(task.status.state)) {
630+
throw new UnsupportedOperationError(
631+
`Task ${taskId} is in a terminal state (${task.status.state}) and cannot be subscribed to.`
632+
);
633+
}
627634

628-
const eventBus = this.eventBusManager.getByTaskId(taskId);
629-
if (!eventBus) {
630-
// No active execution for this task, so no live events.
631-
console.warn(`Resubscribe: No active event bus for task ${taskId}.`);
632-
return;
633-
}
635+
if (!eventQueue) {
636+
throw new UnsupportedOperationError(`Resubscribe: No active event bus for task ${taskId}.`);
637+
}
634638

635-
// Attach a new queue to the existing bus for this resubscription
636-
const eventQueue = new ExecutionEventQueue(eventBus);
637-
// Note: The ResultManager part is already handled by the original execution flow.
638-
// Resubscribe just listens for new events.
639+
// Per spec 3.1.6: "The operation MUST return a Task object as the first event
640+
// in the stream, representing the current state of the task at the time of
641+
// subscription."
642+
yield { payload: { $case: 'task', value: task } };
639643

640-
try {
644+
// Stream live events, filtering by taskId.
645+
// The ResultManager is already handled by the original execution flow;
646+
// resubscribe only listens for new events.
641647
for await (const event of eventQueue.events()) {
642648
switch (event.kind) {
643649
case 'statusUpdate':
@@ -663,7 +669,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
663669
}
664670
}
665671
} finally {
666-
eventQueue.stop();
672+
eventQueue?.stop();
667673
}
668674
}
669675

test/server/default_request_handler.spec.ts

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
import { MockPushNotificationSender } from './mocks/push_notification_sender.mock.js';
5454
import { ServerCallContext } from '../../src/server/context.js';
5555
import { MockTaskStore } from './mocks/task_store.mock.js';
56+
import { TERMINAL_STATE_LIST } from '../../src/server/utils.js';
5657

5758
describe('DefaultRequestHandler as A2ARequestHandler', () => {
5859
let handler: A2ARequestHandler;
@@ -1154,14 +1155,8 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => {
11541155

11551156
it('sendMessage: should reject if task is in a terminal state', async () => {
11561157
const taskId = 'task-terminal-1';
1157-
const terminalStates: TaskState[] = [
1158-
TaskState.TASK_STATE_COMPLETED,
1159-
TaskState.TASK_STATE_FAILED,
1160-
TaskState.TASK_STATE_CANCELED,
1161-
TaskState.TASK_STATE_REJECTED,
1162-
];
11631158

1164-
for (const state of terminalStates) {
1159+
for (const state of TERMINAL_STATE_LIST) {
11651160
const fakeTask: Task = {
11661161
id: taskId,
11671162
contextId: 'ctx-terminal',
@@ -1389,6 +1384,102 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => {
13891384
assert.equal(lastSaveCall.status.state, TaskState.TASK_STATE_COMPLETED);
13901385
});
13911386

1387+
it('resubscribe: should throw UnsupportedOperationError for terminal-state tasks', async () => {
1388+
const taskId = 'task-terminal-resub';
1389+
1390+
for (const state of TERMINAL_STATE_LIST) {
1391+
const fakeTask: Task = {
1392+
id: taskId,
1393+
contextId: 'ctx-terminal-resub',
1394+
status: { state: state as TaskState, message: undefined, timestamp: undefined },
1395+
artifacts: [],
1396+
history: [],
1397+
metadata: {},
1398+
};
1399+
await mockTaskStore.save(fakeTask, serverCallContext);
1400+
1401+
const generator = handler.resubscribe({ id: taskId, tenant: '' }, serverCallContext);
1402+
try {
1403+
await generator.next();
1404+
assert.fail(`Should have thrown for terminal state: ${state}`);
1405+
} catch (error: unknown) {
1406+
expect(error).to.be.instanceOf(UnsupportedOperationError);
1407+
expect((error as Error).message).to.contain(`Task ${taskId} is in a terminal state`);
1408+
}
1409+
}
1410+
});
1411+
1412+
it('resubscribe: should throw TaskNotFoundError for non-existent task', async () => {
1413+
const generator = handler.resubscribe(
1414+
{ id: 'non-existent-task', tenant: '' },
1415+
serverCallContext
1416+
);
1417+
try {
1418+
await generator.next();
1419+
assert.fail('Should have thrown TaskNotFoundError');
1420+
} catch (error: unknown) {
1421+
expect(error).to.be.instanceOf(TaskNotFoundError);
1422+
}
1423+
});
1424+
1425+
it('resubscribe: should yield Task as the first event with current state', async () => {
1426+
const taskId = 'task-resub-first-event';
1427+
const fakeTask: Task = {
1428+
id: taskId,
1429+
contextId: 'ctx-resub-first',
1430+
status: { state: TaskState.TASK_STATE_WORKING, message: undefined, timestamp: undefined },
1431+
artifacts: [],
1432+
history: [],
1433+
metadata: {},
1434+
};
1435+
await mockTaskStore.save(fakeTask, serverCallContext);
1436+
1437+
// Create an active event bus
1438+
const bus = executionEventBusManager.createOrGetByTaskId(taskId);
1439+
1440+
const generator = handler.resubscribe({ id: taskId, tenant: '' }, serverCallContext);
1441+
1442+
// Advance once to yield the task and create the event queue
1443+
const firstResult = await generator.next();
1444+
assert.isFalse(firstResult.done);
1445+
1446+
// Now finish the bus to unblock the stream
1447+
bus.finished();
1448+
1449+
const results: StreamResponse[] = [firstResult.value];
1450+
for await (const event of generator) {
1451+
results.push(event);
1452+
}
1453+
1454+
assert.lengthOf(results, 1, 'Should yield exactly one event (the initial task snapshot)');
1455+
assert.equal(results[0].payload?.$case, 'task');
1456+
assert.deepEqual((results[0].payload as { $case: 'task'; value: Task }).value, fakeTask);
1457+
});
1458+
1459+
it('resubscribe: should throw UnsupportedOperationError when no active event bus exists', async () => {
1460+
const taskId = 'task-resub-no-bus';
1461+
const fakeTask: Task = {
1462+
id: taskId,
1463+
contextId: 'ctx-resub-no-bus',
1464+
status: { state: TaskState.TASK_STATE_WORKING, message: undefined, timestamp: undefined },
1465+
artifacts: [],
1466+
history: [],
1467+
metadata: {},
1468+
};
1469+
await mockTaskStore.save(fakeTask, serverCallContext);
1470+
1471+
const generator = handler.resubscribe({ id: taskId, tenant: '' }, serverCallContext);
1472+
try {
1473+
await generator.next();
1474+
assert.fail('Should have thrown UnsupportedOperationError');
1475+
} catch (error: unknown) {
1476+
expect(error).to.be.instanceOf(UnsupportedOperationError);
1477+
expect((error as Error).message).to.contain(
1478+
`Resubscribe: No active event bus for task ${taskId}`
1479+
);
1480+
}
1481+
});
1482+
13921483
it('getTask: should return an existing task from the store', async () => {
13931484
const fakeTask: Task = {
13941485
id: 'task-exist',

0 commit comments

Comments
 (0)