From 1a167b6f6112883f12eda3ee664faba6e7d34972 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Mon, 20 Apr 2026 14:42:53 +0000 Subject: [PATCH 01/16] Add multi tenant support for REST transport. --- .betterer.results | 2 +- src/client/transports/rest_transport.ts | 95 ++++--- src/server/express/rest_handler.ts | 268 +++++++++--------- .../transports/rest/rest_transport_handler.ts | 29 +- test/client/transports/rest_transport.spec.ts | 63 ++++ test/server/express/rest_handler.spec.ts | 24 ++ 6 files changed, 291 insertions(+), 190 deletions(-) diff --git a/.betterer.results b/.betterer.results index cfb80c97..9f504223 100644 --- a/.betterer.results +++ b/.betterer.results @@ -9,7 +9,7 @@ exports[`TypeScript Strict Mode`] = { [248, 15, 4, "tsc: Expected 2 arguments, but got 1.", "2087764327"], [271, 15, 4, "tsc: Expected 2 arguments, but got 1.", "2087764327"] ], - "src/server/express/rest_handler.ts:3969358401": [ + "src/server/express/rest_handler.ts:3943239369": [ [208, 50, 17, "tsc: Argument of type \'unknown\' is not assignable to parameter of type \'Message | Task | TaskArtifactUpdateEvent | TaskStatusUpdateEvent\'.", "3749434707"] ], "src/server/transports/jsonrpc/jsonrpc_transport_handler.ts:39466399": [ diff --git a/src/client/transports/rest_transport.ts b/src/client/transports/rest_transport.ts index 7f9aaaad..61334402 100644 --- a/src/client/transports/rest_transport.ts +++ b/src/client/transports/rest_transport.ts @@ -33,8 +33,6 @@ import { SubscribeToTaskRequest, ListTasksRequest, ListTasksResponse, - TaskState, - taskStateToJSON, } from '../../types/pb/a2a.js'; const PROTOCOL_NAME: TransportProtocolName = 'HTTP+JSON'; @@ -60,6 +58,10 @@ export class RestTransport implements Transport { this.customFetchImpl = options.fetchImpl; } + private _buildPath(path: string, tenant?: string): string { + return tenant ? `/${tenant}${path}` : path; + } + get protocolName(): string { return PROTOCOL_NAME; } @@ -81,9 +83,10 @@ export class RestTransport implements Transport { options?: RequestOptions ): Promise { const requestBody = params; + const path = this._buildPath('/message:send', params.tenant); const response = await this._sendRequest( 'POST', - '/message:send', + path, requestBody, options, SendMessageRequest, @@ -97,24 +100,23 @@ export class RestTransport implements Transport { options?: RequestOptions ): AsyncGenerator { const requestBody = SendMessageRequest.toJSON(params); - yield* this._sendStreamingRequest('/message:stream', requestBody, options); + const path = this._buildPath('/message:stream', params.tenant); + yield* this._sendStreamingRequest(path, requestBody, options); } async createTaskPushNotificationConfig( params: TaskPushNotificationConfig, options?: RequestOptions ): Promise { - const response = await this._sendRequest< - TaskPushNotificationConfig, - TaskPushNotificationConfig - >( - 'POST', + const requestBody = params; + const path = this._buildPath( `/tasks/${encodeURIComponent(params.taskId)}/pushNotificationConfigs`, - params, - options, + params.tenant + ); + const response = await this._sendRequest< TaskPushNotificationConfig, TaskPushNotificationConfig - ); + >('POST', path, requestBody, options, TaskPushNotificationConfig, TaskPushNotificationConfig); return response; } @@ -122,9 +124,13 @@ export class RestTransport implements Transport { params: GetTaskPushNotificationConfigRequest, options?: RequestOptions ): Promise { - const response = await this._sendRequest( - 'GET', + const path = this._buildPath( `/tasks/${params.taskId}/pushNotificationConfigs/${params.id}`, + params.tenant + ); + const response = await this._sendRequest( + 'GET', + path, undefined, options, undefined, @@ -137,9 +143,10 @@ export class RestTransport implements Transport { params: ListTaskPushNotificationConfigsRequest, options?: RequestOptions ): Promise { - const response = await this._sendRequest( + const path = this._buildPath(`/tasks/${params.taskId}/pushNotificationConfigs`, params.tenant); + const response = await this._sendRequest( 'GET', - `/tasks/${params.taskId}/pushNotificationConfigs`, + path, undefined, options, undefined, @@ -152,24 +159,24 @@ export class RestTransport implements Transport { params: DeleteTaskPushNotificationConfigRequest, options?: RequestOptions ): Promise { - await this._sendRequest( - 'DELETE', + const path = this._buildPath( `/tasks/${params.taskId}/pushNotificationConfigs/${params.id}`, - undefined, - options, - undefined, - undefined + params.tenant ); + await this._sendRequest('DELETE', path, undefined, options, undefined, undefined); } async getTask(params: GetTaskRequest, options?: RequestOptions): Promise { - const queryParams = new URLSearchParams(); + const queryParams: Record = {}; if (params.historyLength !== undefined) { - queryParams.set('historyLength', String(params.historyLength)); + queryParams.historyLength = params.historyLength.toString(); } - const queryString = queryParams.toString(); - const path = `/tasks/${params.id}${queryString ? `?${queryString}` : ''}`; - const response = await this._sendRequest( + const queryString = new URLSearchParams(queryParams).toString(); + const path = this._buildPath( + `/tasks/${params.id}${queryString ? `?${queryString}` : ''}`, + params.tenant + ); + const response = await this._sendRequest( 'GET', path, undefined, @@ -181,9 +188,10 @@ export class RestTransport implements Transport { } async cancelTask(params: CancelTaskRequest, options?: RequestOptions): Promise { - const response = await this._sendRequest( + const path = this._buildPath(`/tasks/${params.id}:cancel`, params.tenant); + const response = await this._sendRequest( 'POST', - `/tasks/${params.id}:cancel`, + path, undefined, options, undefined, @@ -193,25 +201,21 @@ export class RestTransport implements Transport { } async listTasks(params: ListTasksRequest, options?: RequestOptions): Promise { - const queryParams = new URLSearchParams(); - if (params.tenant) queryParams.set('tenant', params.tenant); - if (params.contextId) queryParams.set('contextId', params.contextId); - if (params.status !== undefined && params.status !== TaskState.TASK_STATE_UNSPECIFIED) { - queryParams.set('status', taskStateToJSON(params.status)); - } - if (params.pageSize !== undefined) queryParams.set('pageSize', String(params.pageSize)); - if (params.pageToken) queryParams.set('pageToken', params.pageToken); + const queryParams: Record = {}; + if (params.contextId) queryParams.contextId = params.contextId; + if (params.status !== undefined) queryParams.status = params.status.toString(); + if (params.pageSize !== undefined) queryParams.pageSize = params.pageSize.toString(); + if (params.pageToken) queryParams.pageToken = params.pageToken; if (params.historyLength !== undefined) - queryParams.set('historyLength', String(params.historyLength)); - if (params.statusTimestampAfter) - queryParams.set('statusTimestampAfter', params.statusTimestampAfter); + queryParams.historyLength = params.historyLength.toString(); + if (params.statusTimestampAfter) queryParams.statusTimestampAfter = params.statusTimestampAfter; if (params.includeArtifacts !== undefined) - queryParams.set('includeArtifacts', String(params.includeArtifacts)); + queryParams.includeArtifacts = params.includeArtifacts.toString(); - const queryString = queryParams.toString(); - const path = `/tasks${queryString ? `?${queryString}` : ''}`; + const queryString = new URLSearchParams(queryParams).toString(); + const path = this._buildPath(`/tasks${queryString ? `?${queryString}` : ''}`, params.tenant); - const response = await this._sendRequest( + const response = await this._sendRequest( 'GET', path, undefined, @@ -226,7 +230,8 @@ export class RestTransport implements Transport { params: SubscribeToTaskRequest, options?: RequestOptions ): AsyncGenerator { - yield* this._sendStreamingRequest(`/tasks/${params.id}:subscribe`, undefined, options); + const path = this._buildPath(`/tasks/${params.id}:subscribe`, params.tenant); + yield* this._sendStreamingRequest(path, undefined, options); } private _fetch(...args: Parameters): ReturnType { diff --git a/src/server/express/rest_handler.ts b/src/server/express/rest_handler.ts index 3b28d8b8..3d648d2e 100644 --- a/src/server/express/rest_handler.ts +++ b/src/server/express/rest_handler.ts @@ -267,6 +267,18 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { // Route Handlers // ============================================================================ + /** + * Helper to register routes with and without optional tenant prefix. + */ + const registerRoute = ( + method: 'get' | 'post' | 'delete' | 'put', + path: string, + handler: AsyncRouteHandler + ) => { + router[method](path, asyncHandler(handler)); + router[method](`/:tenant${path}`, asyncHandler(handler)); + }; + /** * GET /extendedAgentCard * @@ -275,14 +287,11 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 200 OK with agent card * @returns 500 Internal Server Error on failure */ - router.get( - '/extendedAgentCard', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const result = await restTransportHandler.getAuthenticatedExtendedAgentCard(context); - sendResponse(res, HTTP_STATUS.OK, context, result, AgentCard); - }) - ); + registerRoute('get', '/extendedAgentCard', async (req, res) => { + const context = await buildContext(req); + const result = await restTransportHandler.getAuthenticatedExtendedAgentCard(context); + sendResponse(res, HTTP_STATUS.OK, context, result, AgentCard); + }); /** * POST /message:send @@ -295,22 +304,22 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 201 Created with RestMessage or RestTask * @returns 400 Bad Request if message is invalid */ - router.post( - '/message\\:send', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const params = SendMessageRequest.fromJSON(req.body); - const result = await restTransportHandler.sendMessage(params, context); - const protoResult = ToProto.messageSendResult(result); - sendResponse( - res, - HTTP_STATUS.CREATED, - context, - protoResult, - SendMessageResponse - ); - }) - ); + registerRoute('post', '/message\\:send', async (req, res) => { + const context = await buildContext(req); + const params = SendMessageRequest.fromJSON(req.body); + if (req.params.tenant) { + params.tenant = req.params.tenant; + } + const result = await restTransportHandler.sendMessage(params, context); + const protoResult = ToProto.messageSendResult(result); + sendResponse( + res, + HTTP_STATUS.CREATED, + context, + protoResult, + SendMessageResponse + ); + }); /** * POST /message:stream @@ -324,15 +333,15 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 400 Bad Request if message is invalid * @returns 501 Not Implemented if streaming not supported */ - router.post( - '/message\\:stream', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const params = SendMessageRequest.fromJSON(req.body); - const stream = await restTransportHandler.sendMessageStream(params, context); - await sendStreamResponse(res, stream, context); - }) - ); + registerRoute('post', '/message\\:stream', async (req, res) => { + const context = await buildContext(req); + const params = SendMessageRequest.fromJSON(req.body); + if (req.params.tenant) { + params.tenant = req.params.tenant; + } + const stream = await restTransportHandler.sendMessageStream(params, context); + await sendStreamResponse(res, stream, context); + }); /** * GET /tasks/:taskId @@ -345,19 +354,17 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 400 Bad Request if historyLength is invalid * @returns 404 Not Found if task doesn't exist */ - router.get( - '/tasks/:taskId', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const result = await restTransportHandler.getTask( - req.params.taskId, - context, - //TODO: clarify for version 1.0.0 the format of the historyLength query parameter, and if history should always be added to the returned object - req.query.historyLength ?? req.query.history_length - ); - sendResponse(res, HTTP_STATUS.OK, context, result, Task); - }) - ); + registerRoute('get', '/tasks/:taskId', async (req, res) => { + const context = await buildContext(req); + const result = await restTransportHandler.getTask( + req.params.taskId, + context, + //TODO: clarify for version 1.0.0 the format of the historyLength query parameter, and if history should always be added to the returned object + req.query.historyLength ?? req.query.history_length, + req.params.tenant + ); + sendResponse(res, HTTP_STATUS.OK, context, result, Task); + }); /** * POST /tasks/:taskId:cancel @@ -370,14 +377,15 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 404 Not Found if task doesn't exist * @returns 409 Conflict if task cannot be canceled */ - router.post( - '/tasks/:taskId\\:cancel', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const result = await restTransportHandler.cancelTask(req.params.taskId, context); - sendResponse(res, HTTP_STATUS.ACCEPTED, context, result, Task); - }) - ); + registerRoute('post', '/tasks/:taskId\\:cancel', async (req, res) => { + const context = await buildContext(req); + const result = await restTransportHandler.cancelTask( + req.params.taskId, + context, + req.params.tenant + ); + sendResponse(res, HTTP_STATUS.ACCEPTED, context, result, Task); + }); /** * GET /tasks @@ -387,14 +395,15 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 200 OK with ListTasksResponse * @returns 400 Bad Request if filter or pageSize is invalid */ - router.get( - '/tasks', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const result = await restTransportHandler.listTasks(req.query, context); - sendResponse(res, HTTP_STATUS.OK, context, result, ListTasksResponse); - }) - ); + registerRoute('get', '/tasks', async (req, res) => { + const context = await buildContext(req); + const query = { ...req.query }; + if (req.params.tenant) { + query.tenant = req.params.tenant; + } + const result = await restTransportHandler.listTasks(query, context); + sendResponse(res, HTTP_STATUS.OK, context, result, ListTasksResponse); + }); /** * POST /tasks/:taskId:subscribe @@ -407,14 +416,15 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 404 Not Found if task doesn't exist * @returns 501 Not Implemented if streaming not supported */ - router.post( - '/tasks/:taskId\\:subscribe', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const stream = await restTransportHandler.resubscribe(req.params.taskId, context); - await sendStreamResponse(res, stream, context); - }) - ); + registerRoute('post', '/tasks/:taskId\\:subscribe', async (req, res) => { + const context = await buildContext(req); + const stream = await restTransportHandler.resubscribe( + req.params.taskId, + context, + req.params.tenant + ); + await sendStreamResponse(res, stream, context); + }); /** * POST /tasks/:taskId/pushNotificationConfigs @@ -427,21 +437,21 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 201 Created with TaskPushNotificationConfig * @returns 501 Not Implemented if push notifications not supported */ - router.post( - '/tasks/:taskId/pushNotificationConfigs', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const params = TaskPushNotificationConfig.fromJSON(req.body); - const result = await restTransportHandler.createTaskPushNotificationConfig(params, context); - sendResponse( - res, - HTTP_STATUS.CREATED, - context, - result, - TaskPushNotificationConfig - ); - }) - ); + registerRoute('post', '/tasks/:taskId/pushNotificationConfigs', async (req, res) => { + const context = await buildContext(req); + const params = TaskPushNotificationConfig.fromJSON(req.body); + if (req.params.tenant) { + params.tenant = req.params.tenant; + } + const result = await restTransportHandler.createTaskPushNotificationConfig(params, context); + sendResponse( + res, + HTTP_STATUS.CREATED, + context, + result, + TaskPushNotificationConfig + ); + }); /** * GET /tasks/:taskId/pushNotificationConfigs @@ -452,23 +462,21 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 200 OK with array of TaskPushNotificationConfig * @returns 404 Not Found if task doesn't exist */ - router.get( - '/tasks/:taskId/pushNotificationConfigs', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const result = await restTransportHandler.listTaskPushNotificationConfigs( - req.params.taskId, - context - ); - sendResponse( - res, - HTTP_STATUS.OK, - context, - result, - ListTaskPushNotificationConfigsResponse - ); - }) - ); + registerRoute('get', '/tasks/:taskId/pushNotificationConfigs', async (req, res) => { + const context = await buildContext(req); + const result = await restTransportHandler.listTaskPushNotificationConfigs( + req.params.taskId, + context, + req.params.tenant + ); + sendResponse( + res, + HTTP_STATUS.OK, + context, + result, + ListTaskPushNotificationConfigsResponse + ); + }); /** * GET /tasks/:taskId/pushNotificationConfigs/:configId @@ -480,24 +488,22 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 200 OK with TaskPushNotificationConfig * @returns 404 Not Found if task or config doesn't exist */ - router.get( - '/tasks/:taskId/pushNotificationConfigs/:configId', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - const result = await restTransportHandler.getTaskPushNotificationConfig( - req.params.taskId, - req.params.configId, - context - ); - sendResponse( - res, - HTTP_STATUS.OK, - context, - result, - TaskPushNotificationConfig - ); - }) - ); + registerRoute('get', '/tasks/:taskId/pushNotificationConfigs/:configId', async (req, res) => { + const context = await buildContext(req); + const result = await restTransportHandler.getTaskPushNotificationConfig( + req.params.taskId, + req.params.configId, + context, + req.params.tenant + ); + sendResponse( + res, + HTTP_STATUS.OK, + context, + result, + TaskPushNotificationConfig + ); + }); /** * DELETE /tasks/:taskId/pushNotificationConfigs/:configId @@ -509,18 +515,16 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { * @returns 204 No Content on success * @returns 404 Not Found if task or config doesn't exist */ - router.delete( - '/tasks/:taskId/pushNotificationConfigs/:configId', - asyncHandler(async (req, res) => { - const context = await buildContext(req); - await restTransportHandler.deleteTaskPushNotificationConfig( - req.params.taskId, - req.params.configId, - context - ); - sendResponse(res, HTTP_STATUS.NO_CONTENT, context); - }) - ); + registerRoute('delete', '/tasks/:taskId/pushNotificationConfigs/:configId', async (req, res) => { + const context = await buildContext(req); + await restTransportHandler.deleteTaskPushNotificationConfig( + req.params.taskId, + req.params.configId, + context, + req.params.tenant + ); + sendResponse(res, HTTP_STATUS.NO_CONTENT, context); + }); return router; } diff --git a/src/server/transports/rest/rest_transport_handler.ts b/src/server/transports/rest/rest_transport_handler.ts index d9fec271..cea6e63a 100644 --- a/src/server/transports/rest/rest_transport_handler.ts +++ b/src/server/transports/rest/rest_transport_handler.ts @@ -173,9 +173,10 @@ export class RestTransportHandler { async getTask( taskId: string, context: ServerCallContext, - historyLength?: unknown + historyLength?: unknown, + tenant?: string ): Promise { - const params: GetTaskRequest = { id: taskId, historyLength: 0, tenant: '' }; + const params: GetTaskRequest = { id: taskId, historyLength: 0, tenant: tenant || '' }; if (historyLength !== undefined) { params.historyLength = this.parseHistoryLength(historyLength); } @@ -185,8 +186,8 @@ export class RestTransportHandler { /** * Cancels a task. */ - async cancelTask(taskId: string, context: ServerCallContext): Promise { - const params: CancelTaskRequest = { id: taskId, tenant: '', metadata: {} }; + async cancelTask(taskId: string, context: ServerCallContext, tenant?: string): Promise { + const params: CancelTaskRequest = { id: taskId, tenant: tenant || '', metadata: {} }; return this.requestHandler.cancelTask(params, context); } @@ -219,12 +220,13 @@ export class RestTransportHandler { */ async resubscribe( taskId: string, - context: ServerCallContext + context: ServerCallContext, + tenant?: string ): Promise< AsyncGenerator > { await this.requireCapability('streaming'); - return this.requestHandler.resubscribe({ id: taskId, tenant: '' }, context); + return this.requestHandler.resubscribe({ id: taskId, tenant: tenant || '' }, context); } /** @@ -247,10 +249,11 @@ export class RestTransportHandler { */ async listTaskPushNotificationConfigs( taskId: string, - context: ServerCallContext + context: ServerCallContext, + tenant?: string ): Promise { const result = await this.requestHandler.listTaskPushNotificationConfigs( - { taskId, pageSize: 0, pageToken: '', tenant: '' }, + { taskId, pageSize: 0, pageToken: '', tenant: tenant || '' }, context ); return result; @@ -262,10 +265,11 @@ export class RestTransportHandler { async getTaskPushNotificationConfig( taskId: string, configId: string, - context: ServerCallContext + context: ServerCallContext, + tenant?: string ): Promise { const config = await this.requestHandler.getTaskPushNotificationConfig( - { taskId, id: configId, tenant: '' }, + { taskId, id: configId, tenant: tenant || '' }, context ); return config; @@ -277,10 +281,11 @@ export class RestTransportHandler { async deleteTaskPushNotificationConfig( taskId: string, configId: string, - context: ServerCallContext + context: ServerCallContext, + tenant?: string ): Promise { await this.requestHandler.deleteTaskPushNotificationConfig( - { taskId, id: configId, tenant: '' }, + { taskId, id: configId, tenant: tenant || '' }, context ); } diff --git a/test/client/transports/rest_transport.spec.ts b/test/client/transports/rest_transport.spec.ts index 046df52b..ee390bdb 100644 --- a/test/client/transports/rest_transport.spec.ts +++ b/test/client/transports/rest_transport.spec.ts @@ -101,6 +101,19 @@ describe('RestTransport', () => { ); }); + it('should send message with tenant prefix successfully', async () => { + const messageParams = createMessageParams(); + messageParams.tenant = 'tenant1'; + const mockResponse = createMockProtoMessage(); + + mockFetch.mockResolvedValue(createRestResponse(mockResponse)); + + await transport.sendMessage(messageParams); + + const [url] = mockFetch.mock.calls[0]; + expect(url).to.equal(`${endpoint}/tenant1/message:send`); + }); + it('should correctly add the extension headers', async () => { const messageParams = createMessageParams(); const expectedExtensions = 'extension1,extension2'; @@ -141,6 +154,18 @@ describe('RestTransport', () => { expect(options?.method).to.equal('GET'); }); + it('should get task with tenant prefix successfully', async () => { + const taskId = 'task-123'; + const mockTask = createMockProtoTask(taskId); + + mockFetch.mockResolvedValue(createRestResponse(mockTask)); + + await transport.getTask({ id: taskId, tenant: 'tenant1', historyLength: 0 }); + + const [url] = mockFetch.mock.calls[0]; + expect(url).to.equal(`${endpoint}/tenant1/tasks/${taskId}?historyLength=0`); + }); + it('should pass historyLength as query parameter', async () => { const taskId = 'task-123'; const historyLength = 10; @@ -193,6 +218,44 @@ describe('RestTransport', () => { }); }); + describe('listTasks', () => { + it('should list tasks successfully', async () => { + const mockResponse = { tasks: [] as any[], nextPageToken: '', pageSize: 0, totalSize: 0 }; + mockFetch.mockResolvedValue(createRestResponse(mockResponse)); + + const result = await transport.listTasks({ + tenant: '', + contextId: '', + status: TaskState.TASK_STATE_UNSPECIFIED, + pageToken: '', + statusTimestampAfter: '', + }); + + expect(result).to.deep.equal(mockResponse); + expect(mockFetch).toHaveBeenCalledTimes(1); + + const [url, options] = mockFetch.mock.calls[0]; + expect(url).to.equal(`${endpoint}/tasks?status=0`); + expect(options?.method).to.equal('GET'); + }); + + it('should list tasks with tenant prefix successfully', async () => { + const mockResponse = { tasks: [] as any[], nextPageToken: '', pageSize: 0, totalSize: 0 }; + mockFetch.mockResolvedValue(createRestResponse(mockResponse)); + + await transport.listTasks({ + tenant: 'tenant1', + contextId: '', + status: TaskState.TASK_STATE_UNSPECIFIED, + pageToken: '', + statusTimestampAfter: '', + }); + + const [url] = mockFetch.mock.calls[0]; + expect(url).to.equal(`${endpoint}/tenant1/tasks?status=0`); + }); + }); + describe('getExtendedAgentCard', () => { it('should get extended agent card successfully', async () => { const mockCard: AgentCard = { diff --git a/test/server/express/rest_handler.spec.ts b/test/server/express/rest_handler.spec.ts index 82ed8fb0..4d3e4126 100644 --- a/test/server/express/rest_handler.spec.ts +++ b/test/server/express/rest_handler.spec.ts @@ -128,6 +128,13 @@ describe('restHandler', () => { assert.deepEqual(response.body.name, testAgentCard.name); }); + it('should return the agent card with 200 OK when tenant is provided', async () => { + const response = await request(app).get('/tenant1/extendedAgentCard').expect(200); + + expect(mockRequestHandler.getAuthenticatedExtendedAgentCard as Mock).toHaveBeenCalledTimes(1); + assert.deepEqual(response.body.name, testAgentCard.name); + }); + it('should return 400 if getAuthenticatedExtendedAgentCard fails', async () => { (mockRequestHandler.getAuthenticatedExtendedAgentCard as Mock).mockRejectedValue( new RequestMalformedError('Card fetch failed') @@ -164,6 +171,23 @@ describe('restHandler', () => { assert.isUndefined(response.body.kind); }); + it('should accept message with tenant prefix and pass tenant to handler', async () => { + const message = ProtoMessage.toJSON(testMessage); + (mockRequestHandler.sendMessage as Mock).mockResolvedValue(testTask); + + await request(app).post('/tenant1/message:send').send({ message }).expect(201); + + expect(mockRequestHandler.sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + tenant: 'tenant1', + message: expect.objectContaining({ + messageId: 'msg-1', + }), + }), + expect.anything() + ); + }); + it('should return 400 when message is invalid', async () => { (mockRequestHandler.sendMessage as Mock).mockRejectedValue( new RequestMalformedError('Message is required') From 939714e60ce526670acd219361932b287d451263 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Tue, 21 Apr 2026 12:08:02 +0000 Subject: [PATCH 02/16] Minor code simplifications after self code review. --- src/client/transports/rest_transport.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/client/transports/rest_transport.ts b/src/client/transports/rest_transport.ts index d20529ce..9ef75b28 100644 --- a/src/client/transports/rest_transport.ts +++ b/src/client/transports/rest_transport.ts @@ -108,7 +108,6 @@ export class RestTransport implements Transport { params: TaskPushNotificationConfig, options?: RequestOptions ): Promise { - const requestBody = params; const path = this._buildPath( `/tasks/${encodeURIComponent(params.taskId)}/pushNotificationConfigs`, params.tenant @@ -116,7 +115,7 @@ export class RestTransport implements Transport { const response = await this._sendRequest< TaskPushNotificationConfig, TaskPushNotificationConfig - >('POST', path, requestBody, options, TaskPushNotificationConfig, TaskPushNotificationConfig); + >('POST', path, params, options, TaskPushNotificationConfig, TaskPushNotificationConfig); return response; } @@ -201,18 +200,19 @@ export class RestTransport implements Transport { } async listTasks(params: ListTasksRequest, options?: RequestOptions): Promise { - const queryParams: Record = {}; - if (params.contextId) queryParams.contextId = params.contextId; - if (params.status !== undefined) queryParams.status = params.status.toString(); - if (params.pageSize !== undefined) queryParams.pageSize = params.pageSize.toString(); - if (params.pageToken) queryParams.pageToken = params.pageToken; + const queryParams = new URLSearchParams(); + if (params.contextId) queryParams.set('contextId', params.contextId); + if (params.status !== undefined) queryParams.set('status', params.status.toString()); + if (params.pageSize !== undefined) queryParams.set('pageSize', params.pageSize.toString()); + if (params.pageToken) queryParams.set('pageToken', params.pageToken); if (params.historyLength !== undefined) - queryParams.historyLength = params.historyLength.toString(); - if (params.statusTimestampAfter) queryParams.statusTimestampAfter = params.statusTimestampAfter; + queryParams.set('historyLength', params.historyLength.toString()); + if (params.statusTimestampAfter) + queryParams.set('statusTimestampAfter', params.statusTimestampAfter); if (params.includeArtifacts !== undefined) - queryParams.includeArtifacts = params.includeArtifacts.toString(); + queryParams.set('includeArtifacts', params.includeArtifacts.toString()); - const queryString = new URLSearchParams(queryParams).toString(); + const queryString = queryParams.toString(); const path = this._buildPath(`/tasks${queryString ? `?${queryString}` : ''}`, params.tenant); const response = await this._sendRequest( From 6ffe5944acdc32905da061dbfed46ee1793b0814 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Tue, 21 Apr 2026 12:33:28 +0000 Subject: [PATCH 03/16] Restore original listTasks params setting. --- src/client/transports/rest_transport.ts | 12 ++++++++---- test/client/transports/rest_transport.spec.ts | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/client/transports/rest_transport.ts b/src/client/transports/rest_transport.ts index 9ef75b28..aeceb33f 100644 --- a/src/client/transports/rest_transport.ts +++ b/src/client/transports/rest_transport.ts @@ -33,6 +33,8 @@ import { SubscribeToTaskRequest, ListTasksRequest, ListTasksResponse, + TaskState, + taskStateToJSON, } from '../../types/pb/a2a.js'; const PROTOCOL_NAME: TransportProtocolName = 'HTTP+JSON'; @@ -202,15 +204,17 @@ export class RestTransport implements Transport { async listTasks(params: ListTasksRequest, options?: RequestOptions): Promise { const queryParams = new URLSearchParams(); if (params.contextId) queryParams.set('contextId', params.contextId); - if (params.status !== undefined) queryParams.set('status', params.status.toString()); - if (params.pageSize !== undefined) queryParams.set('pageSize', params.pageSize.toString()); + if (params.status !== undefined && params.status !== TaskState.TASK_STATE_UNSPECIFIED) { + queryParams.set('status', taskStateToJSON(params.status)); + } + if (params.pageSize !== undefined) queryParams.set('pageSize', String(params.pageSize)); if (params.pageToken) queryParams.set('pageToken', params.pageToken); if (params.historyLength !== undefined) - queryParams.set('historyLength', params.historyLength.toString()); + queryParams.set('historyLength', String(params.historyLength)); if (params.statusTimestampAfter) queryParams.set('statusTimestampAfter', params.statusTimestampAfter); if (params.includeArtifacts !== undefined) - queryParams.set('includeArtifacts', params.includeArtifacts.toString()); + queryParams.set('includeArtifacts', String(params.includeArtifacts)); const queryString = queryParams.toString(); const path = this._buildPath(`/tasks${queryString ? `?${queryString}` : ''}`, params.tenant); diff --git a/test/client/transports/rest_transport.spec.ts b/test/client/transports/rest_transport.spec.ts index ee390bdb..084b249a 100644 --- a/test/client/transports/rest_transport.spec.ts +++ b/test/client/transports/rest_transport.spec.ts @@ -235,7 +235,7 @@ describe('RestTransport', () => { expect(mockFetch).toHaveBeenCalledTimes(1); const [url, options] = mockFetch.mock.calls[0]; - expect(url).to.equal(`${endpoint}/tasks?status=0`); + expect(url).to.equal(`${endpoint}/tasks`); expect(options?.method).to.equal('GET'); }); @@ -252,7 +252,7 @@ describe('RestTransport', () => { }); const [url] = mockFetch.mock.calls[0]; - expect(url).to.equal(`${endpoint}/tenant1/tasks?status=0`); + expect(url).to.equal(`${endpoint}/tenant1/tasks`); }); }); From 4d7d4477adde9568734eaa07690bb8304fce6ffd Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Tue, 21 Apr 2026 12:52:24 +0000 Subject: [PATCH 04/16] Added additional encodeURIComponent for string safety. --- src/client/transports/rest_transport.ts | 28 +++++++++++++++++-------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/client/transports/rest_transport.ts b/src/client/transports/rest_transport.ts index aeceb33f..b2b0f295 100644 --- a/src/client/transports/rest_transport.ts +++ b/src/client/transports/rest_transport.ts @@ -126,7 +126,9 @@ export class RestTransport implements Transport { options?: RequestOptions ): Promise { const path = this._buildPath( - `/tasks/${params.taskId}/pushNotificationConfigs/${params.id}`, + `/tasks/${encodeURIComponent(params.taskId)}/pushNotificationConfigs/${encodeURIComponent( + params.id + )}`, params.tenant ); const response = await this._sendRequest( @@ -144,7 +146,10 @@ export class RestTransport implements Transport { params: ListTaskPushNotificationConfigsRequest, options?: RequestOptions ): Promise { - const path = this._buildPath(`/tasks/${params.taskId}/pushNotificationConfigs`, params.tenant); + const path = this._buildPath( + `/tasks/${encodeURIComponent(params.taskId)}/pushNotificationConfigs`, + params.tenant + ); const response = await this._sendRequest( 'GET', path, @@ -161,20 +166,22 @@ export class RestTransport implements Transport { options?: RequestOptions ): Promise { const path = this._buildPath( - `/tasks/${params.taskId}/pushNotificationConfigs/${params.id}`, + `/tasks/${encodeURIComponent(params.taskId)}/pushNotificationConfigs/${encodeURIComponent( + params.id + )}`, params.tenant ); await this._sendRequest('DELETE', path, undefined, options, undefined, undefined); } async getTask(params: GetTaskRequest, options?: RequestOptions): Promise { - const queryParams: Record = {}; + const queryParams = new URLSearchParams(); if (params.historyLength !== undefined) { - queryParams.historyLength = params.historyLength.toString(); + queryParams.set('historyLength', params.historyLength.toString()); } - const queryString = new URLSearchParams(queryParams).toString(); + const queryString = queryParams.toString(); const path = this._buildPath( - `/tasks/${params.id}${queryString ? `?${queryString}` : ''}`, + `/tasks/${encodeURIComponent(params.id)}${queryString ? `?${queryString}` : ''}`, params.tenant ); const response = await this._sendRequest( @@ -189,7 +196,7 @@ export class RestTransport implements Transport { } async cancelTask(params: CancelTaskRequest, options?: RequestOptions): Promise { - const path = this._buildPath(`/tasks/${params.id}:cancel`, params.tenant); + const path = this._buildPath(`/tasks/${encodeURIComponent(params.id)}:cancel`, params.tenant); const response = await this._sendRequest( 'POST', path, @@ -234,7 +241,10 @@ export class RestTransport implements Transport { params: SubscribeToTaskRequest, options?: RequestOptions ): AsyncGenerator { - const path = this._buildPath(`/tasks/${params.id}:subscribe`, params.tenant); + const path = this._buildPath( + `/tasks/${encodeURIComponent(params.id)}:subscribe`, + params.tenant + ); yield* this._sendStreamingRequest(path, undefined, options); } From 945473c3ddc9bf5b341be27d648a841b318f8439 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Wed, 22 Apr 2026 07:23:39 +0000 Subject: [PATCH 05/16] Add URI encoding for tenant value. --- src/client/transports/rest_transport.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/transports/rest_transport.ts b/src/client/transports/rest_transport.ts index b2b0f295..c76bf4d9 100644 --- a/src/client/transports/rest_transport.ts +++ b/src/client/transports/rest_transport.ts @@ -61,7 +61,7 @@ export class RestTransport implements Transport { } private _buildPath(path: string, tenant?: string): string { - return tenant ? `/${tenant}${path}` : path; + return tenant ? '/' + encodeURIComponent(tenant) + path : path; } get protocolName(): string { From f72068ab7b1e8501661338f14e36096fa9332f87 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Wed, 22 Apr 2026 09:08:35 +0000 Subject: [PATCH 06/16] Add initial support for multi-tenancy in json-rpc and grpc transports. --- src/server/express/rest_handler.ts | 5 ++++- src/server/grpc/grpc_service.ts | 4 ++-- .../request_handler/a2a_request_handler.ts | 3 ++- .../request_handler/default_request_handler.ts | 3 ++- .../jsonrpc/jsonrpc_transport_handler.ts | 6 +++++- .../transports/rest/rest_transport_handler.ts | 8 ++++++-- test/server/default_request_handler.spec.ts | 8 ++++---- test/server/grpc/grpc_handler.spec.ts | 15 +++++++++++++-- test/server/jsonrpc_transport_handler.spec.ts | 17 +++++++++++++++++ 9 files changed, 55 insertions(+), 14 deletions(-) diff --git a/src/server/express/rest_handler.ts b/src/server/express/rest_handler.ts index cab5db9e..7c0340f3 100644 --- a/src/server/express/rest_handler.ts +++ b/src/server/express/rest_handler.ts @@ -282,7 +282,10 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { */ registerRoute('get', '/extendedAgentCard', async (req, res) => { const context = await buildContext(req); - const result = await restTransportHandler.getAuthenticatedExtendedAgentCard(context); + const result = await restTransportHandler.getAuthenticatedExtendedAgentCard( + { tenant: (req.params.tenant as string) || '' }, + context + ); sendResponse(res, HTTP_STATUS.OK, context, result, AgentCard); }); diff --git a/src/server/grpc/grpc_service.ts b/src/server/grpc/grpc_service.ts index aa285b10..75120b68 100644 --- a/src/server/grpc/grpc_service.ts +++ b/src/server/grpc/grpc_service.ts @@ -202,8 +202,8 @@ export function grpcService(options: GrpcServiceOptions): A2AServiceServer { call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData ): Promise { - return wrapUnary(call, callback, (_params, context) => - requestHandler.getAuthenticatedExtendedAgentCard(context) + return wrapUnary(call, callback, (params, context) => + requestHandler.getAuthenticatedExtendedAgentCard(params, context) ); }, listTasks( diff --git a/src/server/request_handler/a2a_request_handler.ts b/src/server/request_handler/a2a_request_handler.ts index 76569771..1785eca1 100644 --- a/src/server/request_handler/a2a_request_handler.ts +++ b/src/server/request_handler/a2a_request_handler.ts @@ -6,6 +6,7 @@ import { ListTaskPushNotificationConfigsRequest, GetTaskPushNotificationConfigRequest, DeleteTaskPushNotificationConfigRequest, + GetExtendedAgentCardRequest, CancelTaskRequest, GetTaskRequest, SubscribeToTaskRequest, @@ -20,7 +21,7 @@ import { ServerCallContext } from '../context.js'; export interface A2ARequestHandler { getAgentCard(): Promise; - getAuthenticatedExtendedAgentCard(context: ServerCallContext): Promise; + getAuthenticatedExtendedAgentCard(params: GetExtendedAgentCardRequest, context: ServerCallContext): Promise; sendMessage(params: SendMessageRequest, context: ServerCallContext): Promise; diff --git a/src/server/request_handler/default_request_handler.ts b/src/server/request_handler/default_request_handler.ts index 83808ee1..08104120 100644 --- a/src/server/request_handler/default_request_handler.ts +++ b/src/server/request_handler/default_request_handler.ts @@ -21,6 +21,7 @@ import { SendMessageRequest, GetTaskRequest, CancelTaskRequest, + GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, ListTaskPushNotificationConfigsRequest, DeleteTaskPushNotificationConfigRequest, @@ -92,7 +93,7 @@ export class DefaultRequestHandler implements A2ARequestHandler { return this.agentCard; } - async getAuthenticatedExtendedAgentCard(context: ServerCallContext): Promise { + async getAuthenticatedExtendedAgentCard(_params: GetExtendedAgentCardRequest, context: ServerCallContext): Promise { if (!this.agentCard.capabilities?.extendedAgentCard) { throw new UnsupportedOperationError('Agent does not support authenticated extended card.'); } diff --git a/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts b/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts index 38d3b04e..149fbcd4 100644 --- a/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts +++ b/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts @@ -5,6 +5,7 @@ import { SendMessageRequest, SubscribeToTaskRequest, GetTaskRequest, + GetExtendedAgentCardRequest, CancelTaskRequest, TaskPushNotificationConfig, GetTaskPushNotificationConfigRequest, @@ -210,7 +211,10 @@ export class JsonRpcTransportHandler { break; case 'GetExtendedAgentCard': result = AgentCard.toJSON( - await this.requestHandler.getAuthenticatedExtendedAgentCard(context) + await this.requestHandler.getAuthenticatedExtendedAgentCard( + GetExtendedAgentCardRequest.fromJSON(rpcRequest.params), + context + ) ); break; default: diff --git a/src/server/transports/rest/rest_transport_handler.ts b/src/server/transports/rest/rest_transport_handler.ts index 35b90350..113ab3cb 100644 --- a/src/server/transports/rest/rest_transport_handler.ts +++ b/src/server/transports/rest/rest_transport_handler.ts @@ -16,6 +16,7 @@ import { StreamResponse, GetTaskRequest, CancelTaskRequest, + GetExtendedAgentCardRequest, ListTasksRequest, ListTasksResponse, TaskState, @@ -119,8 +120,11 @@ export class RestTransportHandler { /** * Gets the authenticated extended agent card. */ - async getAuthenticatedExtendedAgentCard(context: ServerCallContext): Promise { - return this.requestHandler.getAuthenticatedExtendedAgentCard(context); + async getAuthenticatedExtendedAgentCard( + params: GetExtendedAgentCardRequest, + context: ServerCallContext + ): Promise { + return this.requestHandler.getAuthenticatedExtendedAgentCard(params, context); } /** diff --git a/test/server/default_request_handler.spec.ts b/test/server/default_request_handler.spec.ts index bbe18c81..bfc459e2 100644 --- a/test/server/default_request_handler.spec.ts +++ b/test/server/default_request_handler.spec.ts @@ -3020,7 +3020,7 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { it('getAuthenticatedExtendedAgentCard should fail if the agent card does not support extended agent card', async () => { let caughtError; try { - await handler.getAuthenticatedExtendedAgentCard(serverCallContext); + await handler.getAuthenticatedExtendedAgentCard({ tenant: '' }, serverCallContext); } catch (error: any) { caughtError = error; } finally { @@ -3040,7 +3040,7 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { ); let caughtError; try { - await handler.getAuthenticatedExtendedAgentCard(serverCallContext); + await handler.getAuthenticatedExtendedAgentCard({ tenant: '' }, serverCallContext); } catch (error: any) { caughtError = error; } finally { @@ -3061,7 +3061,7 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { ); const context = new ServerCallContext(undefined, new A2AUser(true)); - const agentCard = await handler.getAuthenticatedExtendedAgentCard(context); + const agentCard = await handler.getAuthenticatedExtendedAgentCard({ tenant: '' }, context); assert.deepEqual(agentCard, extendedAgentCard); }); @@ -3077,7 +3077,7 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { ); const context = new ServerCallContext(undefined, new A2AUser(false)); - const agentCard = await handler.getAuthenticatedExtendedAgentCard(context); + const agentCard = await handler.getAuthenticatedExtendedAgentCard({ tenant: '' }, context); assert(agentCard.capabilities.extensions.length === 1); assert.deepEqual(agentCard.capabilities.extensions[0], { uri: 'requested-extension-uri', diff --git a/test/server/grpc/grpc_handler.spec.ts b/test/server/grpc/grpc_handler.spec.ts index 11b1dac4..27b898fa 100644 --- a/test/server/grpc/grpc_handler.spec.ts +++ b/test/server/grpc/grpc_handler.spec.ts @@ -108,7 +108,7 @@ describe('grpcHandler', () => { describe('getExtendedAgentCard', () => { it('should return agent card via gRPC callback', async () => { - const call = createMockUnaryCall({}); + const call = createMockUnaryCall({ tenant: '' }); const callback = vi.fn(); await handler.getExtendedAgentCard(call, callback); @@ -123,7 +123,7 @@ describe('grpcHandler', () => { (mockRequestHandler.getAuthenticatedExtendedAgentCard as Mock).mockRejectedValue( new TaskNotFoundError('Not Found') ); - const call = createMockUnaryCall({}); + const call = createMockUnaryCall({ tenant: '' }); const callback = vi.fn(); await handler.getExtendedAgentCard(call, callback); @@ -132,6 +132,17 @@ describe('grpcHandler', () => { assert.equal(err.code, grpc.status.NOT_FOUND); assert.equal(err.details, 'Not Found'); }); + + it('should pass tenant from request to request handler', async () => { + const call = createMockUnaryCall({ tenant: 'test-tenant' }); + const callback = vi.fn(); + await handler.getExtendedAgentCard(call, callback); + + expect(mockRequestHandler.getAuthenticatedExtendedAgentCard).toHaveBeenCalledWith( + expect.objectContaining({ tenant: 'test-tenant' }), + expect.anything() + ); + }); }); describe('sendMessage', () => { diff --git a/test/server/jsonrpc_transport_handler.spec.ts b/test/server/jsonrpc_transport_handler.spec.ts index e0a74cfc..e6878e16 100644 --- a/test/server/jsonrpc_transport_handler.spec.ts +++ b/test/server/jsonrpc_transport_handler.spec.ts @@ -201,6 +201,23 @@ describe('JsonRpcTransportHandler', () => { }); }); + describe('Method handling', () => { + it('should pass tenant from params to getAuthenticatedExtendedAgentCard', async () => { + const request = { + jsonrpc: '2.0', + method: 'GetExtendedAgentCard', + id: 1, + params: { tenant: 'test-tenant' }, + }; + await transportHandler.handle(request, defaultContext); + + expect(mockRequestHandler.getAuthenticatedExtendedAgentCard).toHaveBeenCalledWith( + expect.objectContaining({ tenant: 'test-tenant' }), + expect.anything() + ); + }); + }); + describe('Error mapping', () => { it('should map RequestMalformedError to code and message', async () => { const mappedError = JsonRpcTransportHandler.mapToJSONRPCError( From 490562a2e3801180ce04cbf6ef58f0d472404642 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Wed, 22 Apr 2026 11:33:38 +0000 Subject: [PATCH 07/16] Aling all transports to support multi-tenancy. --- .betterer.results | 4 +- src/client/multitransport-client.ts | 8 +- src/client/transports/grpc/grpc_transport.ts | 7 +- src/client/transports/json_rpc_transport.ts | 12 +- src/client/transports/rest_transport.ts | 9 +- src/client/transports/transport.ts | 6 +- src/server/agent_execution/agent_executor.ts | 5 + src/server/context.ts | 8 +- src/server/express/rest_handler.ts | 52 +++- src/server/grpc/grpc_service.ts | 22 +- .../push_notification_store.ts | 38 ++- .../request_handler/a2a_request_handler.ts | 5 +- .../default_request_handler.ts | 25 +- src/server/store.ts | 57 ++++- .../jsonrpc/jsonrpc_transport_handler.ts | 9 + test/client/multitransport-client.spec.ts | 7 +- test/client/transports/grpc_transport.spec.ts | 2 +- test/client/transports/rest_transport.spec.ts | 41 ++- test/e2e.spec.ts | 207 +++++++++++++++- test/server/rest_transport_handler.spec.ts | 5 +- test/server/tenant_isolation.spec.ts | 233 ++++++++++++++++++ 21 files changed, 704 insertions(+), 58 deletions(-) create mode 100644 test/server/tenant_isolation.spec.ts diff --git a/.betterer.results b/.betterer.results index bd9b09e3..1fb94338 100644 --- a/.betterer.results +++ b/.betterer.results @@ -9,8 +9,8 @@ exports[`TypeScript Strict Mode`] = { [327, 15, 4, "tsc: Expected 2 arguments, but got 1.", "2087764327"], [350, 15, 4, "tsc: Expected 2 arguments, but got 1.", "2087764327"] ], - "src/server/transports/jsonrpc/jsonrpc_transport_handler.ts:3231104525": [ - [91, 12, 10, "tsc: Variable \'rpcRequest\' is used before being assigned.", "3927050741"] + "src/server/transports/jsonrpc/jsonrpc_transport_handler.ts:1546714885": [ + [92, 12, 10, "tsc: Variable \'rpcRequest\' is used before being assigned.", "3927050741"] ] }` }; diff --git a/src/client/multitransport-client.ts b/src/client/multitransport-client.ts index 82f1eda3..6c2dc7f3 100644 --- a/src/client/multitransport-client.ts +++ b/src/client/multitransport-client.ts @@ -77,13 +77,19 @@ export class Client { /** * If the current agent card supports the extended feature, it will try to fetch the extended agent card from the server, * Otherwise it will return the current agent card value. + * + * The tenant for the request is inferred from the agent card's interface declaration. */ async getAgentCard(options?: RequestOptions): Promise { if (this.agentCard.capabilities?.extendedAgentCard) { + const tenant = + this.agentCard.supportedInterfaces?.find( + (iface) => iface.protocolBinding === this.transport.protocolName + )?.tenant ?? ''; this.agentCard = await this.executeWithInterceptors( { method: 'getAgentCard' }, options, - (_, options) => this.transport.getExtendedAgentCard(options) + (_, options) => this.transport.getExtendedAgentCard({ tenant }, options) ); } return this.agentCard; diff --git a/src/client/transports/grpc/grpc_transport.ts b/src/client/transports/grpc/grpc_transport.ts index 3b13f88d..9aed2c57 100644 --- a/src/client/transports/grpc/grpc_transport.ts +++ b/src/client/transports/grpc/grpc_transport.ts @@ -69,10 +69,13 @@ export class GrpcTransport implements Transport { return PROTOCOL_NAME; } - async getExtendedAgentCard(options?: RequestOptions): Promise { + async getExtendedAgentCard( + params: GetExtendedAgentCardRequest, + options?: RequestOptions + ): Promise { const rpcResponse = await this._sendGrpcRequest( 'getExtendedAgentCard', - { tenant: '' }, + params, options, this.grpcClient.getExtendedAgentCard.bind(this.grpcClient) ); diff --git a/src/client/transports/json_rpc_transport.ts b/src/client/transports/json_rpc_transport.ts index fdf70cd2..6d8af770 100644 --- a/src/client/transports/json_rpc_transport.ts +++ b/src/client/transports/json_rpc_transport.ts @@ -17,6 +17,7 @@ import { Transport, TransportFactory } from './transport.js'; import { CancelTaskRequest, DeleteTaskPushNotificationConfigRequest, + GetExtendedAgentCardRequest, MessageFns, SendMessageRequest, SubscribeToTaskRequest, @@ -51,12 +52,15 @@ export class JsonRpcTransport implements Transport { return PROTOCOL_NAME; } - async getExtendedAgentCard(options?: RequestOptions): Promise { - const rpcResponse = await this._sendRpcRequest( + async getExtendedAgentCard( + params: GetExtendedAgentCardRequest, + options?: RequestOptions + ): Promise { + const rpcResponse = await this._sendRpcRequest( 'GetExtendedAgentCard', - undefined, + params, options, - undefined + GetExtendedAgentCardRequest ); return AgentCard.fromJSON(rpcResponse.result); } diff --git a/src/client/transports/rest_transport.ts b/src/client/transports/rest_transport.ts index c76bf4d9..4eef8c92 100644 --- a/src/client/transports/rest_transport.ts +++ b/src/client/transports/rest_transport.ts @@ -20,6 +20,7 @@ import { AgentCard, CancelTaskRequest, DeleteTaskPushNotificationConfigRequest, + GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, GetTaskRequest, ListTaskPushNotificationConfigsRequest, @@ -68,10 +69,14 @@ export class RestTransport implements Transport { return PROTOCOL_NAME; } - async getExtendedAgentCard(options?: RequestOptions): Promise { + async getExtendedAgentCard( + params: GetExtendedAgentCardRequest, + options?: RequestOptions + ): Promise { + const path = this._buildPath('/extendedAgentCard', params.tenant); const response = await this._sendRequest( 'GET', - '/extendedAgentCard', + path, undefined, options, undefined, diff --git a/src/client/transports/transport.ts b/src/client/transports/transport.ts index 8685e1c8..2273b701 100644 --- a/src/client/transports/transport.ts +++ b/src/client/transports/transport.ts @@ -9,6 +9,7 @@ import { ListTaskPushNotificationConfigsResponse, DeleteTaskPushNotificationConfigRequest, GetTaskRequest, + GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, SubscribeToTaskRequest, SendMessageResult, @@ -20,7 +21,10 @@ import { RequestOptions } from '../multitransport-client.js'; export interface Transport { get protocolName(): string; - getExtendedAgentCard(options?: RequestOptions): Promise; + getExtendedAgentCard( + params: GetExtendedAgentCardRequest, + options?: RequestOptions + ): Promise; sendMessage(params: SendMessageRequest, options?: RequestOptions): Promise; diff --git a/src/server/agent_execution/agent_executor.ts b/src/server/agent_execution/agent_executor.ts index 8477bdc3..4c7f2b68 100644 --- a/src/server/agent_execution/agent_executor.ts +++ b/src/server/agent_execution/agent_executor.ts @@ -4,6 +4,11 @@ import { RequestContext } from './request_context.js'; export interface AgentExecutor { /** * Executes the agent logic based on the request context and publishes events. + * + * In multi-tenant deployments, the tenant identifier is available via + * `requestContext.context.tenant`. Implementations MAY use this to scope + * agent behavior or data access by tenant. + * * @param requestContext The context of the current request. * @param eventBus The bus to publish execution events to. */ diff --git a/src/server/context.ts b/src/server/context.ts index 37c17c66..fce82392 100644 --- a/src/server/context.ts +++ b/src/server/context.ts @@ -4,11 +4,17 @@ import { User } from './authentication/user.js'; export class ServerCallContext { private readonly _requestedExtensions?: Extensions; private readonly _user?: User; + private readonly _tenant?: string; private _activatedExtensions?: Extensions; - constructor(requestedExtensions?: Extensions, user?: User) { + constructor(requestedExtensions?: Extensions, user?: User, tenant?: string) { this._requestedExtensions = requestedExtensions; this._user = user; + this._tenant = tenant; + } + + get tenant(): string | undefined { + return this._tenant; } get user(): User | undefined { diff --git a/src/server/express/rest_handler.ts b/src/server/express/rest_handler.ts index 7c0340f3..31009ce7 100644 --- a/src/server/express/rest_handler.ts +++ b/src/server/express/rest_handler.ts @@ -108,16 +108,19 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { /** * Builds a ServerCallContext from the Express request. - * Extracts protocol extensions from headers and builds user from request. + * Extracts protocol extensions from headers, builds user from request, + * and extracts tenant from the URL path parameter if present. * * @param req - Express request object - * @returns ServerCallContext with requested extensions and authenticated user + * @returns ServerCallContext with requested extensions, authenticated user, and tenant */ const buildContext = async (req: Request): Promise => { const user = await options.userBuilder(req); + const tenant = (req.params.tenant as string) || undefined; return new ServerCallContext( Extensions.parseServiceParameter(req.header(HTTP_EXTENSION_HEADER)), - user + user, + tenant ); }; @@ -272,6 +275,28 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { router[method](`/:tenant${path}`, asyncHandler(handler)); }; + /** + * Resolves tenant for a request by applying the path-based tenant to the params object. + * If the request body already contains a tenant that differs from the path-based tenant, + * a warning is logged and the path-based tenant takes precedence (since it is the + * canonical source per the spec: "provided as a path parameter"). + * + * @param params - The deserialized request params object with a `tenant` field + * @param req - Express request with potential `req.params.tenant` from URL path + */ + const applyPathTenant = (params: T, req: Request): void => { + const pathTenant = req.params.tenant as string | undefined; + if (pathTenant) { + if (params.tenant && params.tenant !== pathTenant) { + console.warn( + `Tenant mismatch: URL path tenant "${pathTenant}" differs from request body ` + + `tenant "${params.tenant}". Using path tenant as the canonical value.` + ); + } + params.tenant = pathTenant; + } + }; + /** * GET /extendedAgentCard * @@ -303,9 +328,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/message\\:send', async (req, res) => { const context = await buildContext(req); const params = SendMessageRequest.fromJSON(req.body); - if (req.params.tenant) { - params.tenant = req.params.tenant; - } + applyPathTenant(params, req); const result = await restTransportHandler.sendMessage(params, context); const protoResult = ToProto.messageSendResult(result); sendResponse( @@ -332,9 +355,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/message\\:stream', async (req, res) => { const context = await buildContext(req); const params = SendMessageRequest.fromJSON(req.body); - if (req.params.tenant) { - params.tenant = req.params.tenant; - } + applyPathTenant(params, req); const stream = await restTransportHandler.sendMessageStream(params, context); await sendStreamResponse(res, stream, context); }); @@ -394,7 +415,16 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('get', '/tasks', async (req, res) => { const context = await buildContext(req); const query = { ...req.query }; + // For listTasks, tenant comes from the URL path (not from query params). + // The path tenant is already in the context; pass it along in the query object + // so RestTransportHandler can include it in the ListTasksRequest. if (req.params.tenant) { + if (query.tenant && query.tenant !== req.params.tenant) { + console.warn( + `Tenant mismatch: URL path tenant "${req.params.tenant}" differs from query ` + + `param tenant "${query.tenant}". Using path tenant as the canonical value.` + ); + } query.tenant = req.params.tenant; } const result = await restTransportHandler.listTasks(query, context); @@ -436,9 +466,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/tasks/:taskId/pushNotificationConfigs', async (req, res) => { const context = await buildContext(req); const params = TaskPushNotificationConfig.fromJSON(req.body); - if (req.params.tenant) { - params.tenant = req.params.tenant; - } + applyPathTenant(params, req); const result = await restTransportHandler.createTaskPushNotificationConfig(params, context); sendResponse( res, diff --git a/src/server/grpc/grpc_service.ts b/src/server/grpc/grpc_service.ts index 75120b68..3ae51ee1 100644 --- a/src/server/grpc/grpc_service.ts +++ b/src/server/grpc/grpc_service.ts @@ -64,7 +64,8 @@ export function grpcService(options: GrpcServiceOptions): A2AServiceServer { const requestHandler = options.requestHandler; /** - * Helper to wrap Unary calls with common logic (context, metadata, error handling) + * Helper to wrap Unary calls with common logic (context, metadata, error handling). + * Extracts tenant from the request if available and enriches the context. */ const wrapUnaryWithConverter = async ( call: grpc.ServerUnaryCall, @@ -73,7 +74,13 @@ export function grpcService(options: GrpcServiceOptions): A2AServiceServer { converter: (res: TResult) => TRes ) => { try { - const context = await buildContext(call, options.userBuilder); + let context = await buildContext(call, options.userBuilder); + // For gRPC, tenant is inside the request message. Extract it and enrich + // the context so downstream components (stores, executors) can scope by tenant. + const tenant = (call.request as Record)?.tenant as string | undefined; + if (tenant) { + context = new ServerCallContext(context.requestedExtensions, context.user, tenant); + } const result = await handler(call.request, context); call.sendMetadata(buildMetadata(context)); callback(null, converter(result)); @@ -91,14 +98,21 @@ export function grpcService(options: GrpcServiceOptions): A2AServiceServer { }; /** - * Helper to wrap Streaming calls with common logic (context, metadata, error handling) + * Helper to wrap Streaming calls with common logic (context, metadata, error handling). + * Extracts tenant from the request if available and enriches the context. */ const wrapStreaming = async ( call: grpc.ServerWritableStream, handler: (req: TReq, ctx: ServerCallContext) => AsyncGenerator ) => { try { - const context = await buildContext(call, options.userBuilder); + let context = await buildContext(call, options.userBuilder); + // For gRPC, tenant is inside the request message. Extract it and enrich + // the context so downstream components (stores, executors) can scope by tenant. + const tenant = (call.request as Record)?.tenant as string | undefined; + if (tenant) { + context = new ServerCallContext(context.requestedExtensions, context.user, tenant); + } const stream = await handler(call.request, context); call.sendMetadata(buildMetadata(context)); for await (const responsePart of stream) { diff --git a/src/server/push_notification/push_notification_store.ts b/src/server/push_notification/push_notification_store.ts index f96b7bdd..05ae2f26 100644 --- a/src/server/push_notification/push_notification_store.ts +++ b/src/server/push_notification/push_notification_store.ts @@ -1,6 +1,12 @@ import { TaskPushNotificationConfig } from '../../index.js'; import { ServerCallContext } from '../context.js'; +/** + * Interface for push notification configuration storage. + * + * Implementations SHOULD use `context.tenant` (when present) to scope data access, + * ensuring push notification configs from one tenant are not accessible to another. + */ export interface PushNotificationStore { save( taskId: string, @@ -11,15 +17,27 @@ export interface PushNotificationStore { delete(taskId: string, context: ServerCallContext, configId?: string): Promise; } +/** + * In-memory push notification config store with tenant-scoped data isolation. + * Uses `context.tenant` to build composite storage keys, preventing cross-tenant access. + */ export class InMemoryPushNotificationStore implements PushNotificationStore { private store: Map = new Map(); + /** + * Builds a composite storage key from tenant and task ID. + */ + private _storageKey(taskId: string, context: ServerCallContext): string { + return context.tenant ? `${context.tenant}:${taskId}` : taskId; + } + async save( taskId: string, - _context: ServerCallContext, + context: ServerCallContext, pushNotificationConfig: TaskPushNotificationConfig ): Promise { - const configs = this.store.get(taskId) || []; + const key = this._storageKey(taskId, context); + const configs = this.store.get(key) || []; // Set ID if it's not already set if (!pushNotificationConfig.id) { @@ -34,21 +52,23 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { // Add the new/updated config configs.push(pushNotificationConfig); - this.store.set(taskId, configs); + this.store.set(key, configs); } - async load(taskId: string, _context: ServerCallContext): Promise { - const configs = this.store.get(taskId); + async load(taskId: string, context: ServerCallContext): Promise { + const key = this._storageKey(taskId, context); + const configs = this.store.get(key); return configs || []; } - async delete(taskId: string, _context: ServerCallContext, configId?: string): Promise { + async delete(taskId: string, context: ServerCallContext, configId?: string): Promise { // If no configId is provided, use taskId as the configId (backward compatibility) if (configId === undefined) { configId = taskId; } - const configs = this.store.get(taskId); + const key = this._storageKey(taskId, context); + const configs = this.store.get(key); if (!configs) { return; } @@ -59,9 +79,9 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { } if (configs.length === 0) { - this.store.delete(taskId); + this.store.delete(key); } else { - this.store.set(taskId, configs); + this.store.set(key, configs); } } } diff --git a/src/server/request_handler/a2a_request_handler.ts b/src/server/request_handler/a2a_request_handler.ts index 1785eca1..968ee1b2 100644 --- a/src/server/request_handler/a2a_request_handler.ts +++ b/src/server/request_handler/a2a_request_handler.ts @@ -21,7 +21,10 @@ import { ServerCallContext } from '../context.js'; export interface A2ARequestHandler { getAgentCard(): Promise; - getAuthenticatedExtendedAgentCard(params: GetExtendedAgentCardRequest, context: ServerCallContext): Promise; + getAuthenticatedExtendedAgentCard( + params: GetExtendedAgentCardRequest, + context: ServerCallContext + ): Promise; sendMessage(params: SendMessageRequest, context: ServerCallContext): Promise; diff --git a/src/server/request_handler/default_request_handler.ts b/src/server/request_handler/default_request_handler.ts index 08104120..3340a25b 100644 --- a/src/server/request_handler/default_request_handler.ts +++ b/src/server/request_handler/default_request_handler.ts @@ -56,6 +56,26 @@ import { ServerCallContext } from '../context.js'; import { DEFAULT_PAGE_SIZE } from '../../constants.js'; import { TERMINAL_STATE_LIST } from '../utils.js'; +/** + * Default implementation of the A2A request handler. + * + * ## Multi-Tenancy + * + * This handler supports multi-tenant deployments through the `tenant` field present + * on all request objects (per A2A spec Sections 3.1.x and 4.4.6). The tenant value + * flows through the system as follows: + * + * 1. **Transport layer** extracts tenant from the protocol-specific source: + * - REST: URL path prefix (`/:tenant/...`) + * - JSON-RPC: `params.tenant` in the request body + * - gRPC: `tenant` field in the request message + * + * 2. **`ServerCallContext.tenant`** carries the tenant to all downstream components, + * including `TaskStore`, `PushNotificationStore`, and `AgentExecutor`. + * + * 3. **`InMemoryTaskStore`** and **`InMemoryPushNotificationStore`** use `context.tenant` + * to scope data with composite keys (`{tenant}:{id}`), providing tenant isolation. + */ export class DefaultRequestHandler implements A2ARequestHandler { private readonly agentCard: AgentCard; private readonly taskStore: TaskStore; @@ -93,7 +113,10 @@ export class DefaultRequestHandler implements A2ARequestHandler { return this.agentCard; } - async getAuthenticatedExtendedAgentCard(_params: GetExtendedAgentCardRequest, context: ServerCallContext): Promise { + async getAuthenticatedExtendedAgentCard( + _params: GetExtendedAgentCardRequest, + context: ServerCallContext + ): Promise { if (!this.agentCard.capabilities?.extendedAgentCard) { throw new UnsupportedOperationError('Agent does not support authenticated extended card.'); } diff --git a/src/server/store.ts b/src/server/store.ts index f3f6511b..ca0ecc4c 100644 --- a/src/server/store.ts +++ b/src/server/store.ts @@ -6,13 +6,18 @@ import { RequestMalformedError } from '../errors.js'; /** * Simplified interface for task storage providers. * Stores and retrieves the task. + * + * Implementations SHOULD use `context.tenant` (when present) to scope data access. + * Per spec Section 13.1, servers MUST ensure appropriate scope limitation based on + * the authenticated caller's authorization boundaries, which includes tenant isolation + * in multi-tenant deployments. */ export interface TaskStore { /** * Saves a task. * Overwrites existing data if the task ID exists. * @param task The task to save. - * @param context The context of the current call. + * @param context The context of the current call. Use `context.tenant` for tenant-scoped storage. * @returns A promise resolving when the save operation is complete. */ save(task: Task, context: ServerCallContext): Promise; @@ -20,7 +25,7 @@ export interface TaskStore { /** * Loads a task by task ID. * @param taskId The ID of the task to load. - * @param context The context of the current call. + * @param context The context of the current call. Use `context.tenant` for tenant-scoped lookups. * @returns A promise resolving to an object containing the Task, or undefined if not found. */ load(taskId: string, context: ServerCallContext): Promise; @@ -28,7 +33,7 @@ export interface TaskStore { /** * Lists tasks with filtering and pagination. * @param params Filtering and pagination parameters. - * @param context The context of the current call. + * @param context The context of the current call. Use `context.tenant` for tenant-scoped listing. */ list(params: ListTasksRequest, context: ServerCallContext): Promise; } @@ -37,25 +42,34 @@ export interface TaskStore { // InMemoryTaskStore // ======================== // -// Methods in InMemoryTaskStore accept ServerCallContext but do not use it. -// This is intentional to match the TaskStore interface. +// InMemoryTaskStore provides tenant-scoped data isolation using `context.tenant`. +// Tasks are stored with a composite key of `{tenant}:{taskId}` when a tenant is present. +// When no tenant is specified, tasks are stored under the taskId alone (global scope). -// Use Task directly for storage export class InMemoryTaskStore implements TaskStore { private store: Map = new Map(); - async load(taskId: string, _context: ServerCallContext): Promise { - const entry = this.store.get(taskId); + /** + * Builds a composite storage key from tenant and task ID. + * When tenant is present, the key is `{tenant}:{taskId}` to provide tenant isolation. + * When tenant is absent, the key is the taskId alone (global scope). + */ + private _storageKey(taskId: string, context: ServerCallContext): string { + return context.tenant ? `${context.tenant}:${taskId}` : taskId; + } + + async load(taskId: string, context: ServerCallContext): Promise { + const entry = this.store.get(this._storageKey(taskId, context)); // Return copies to prevent external mutation return entry ? { ...entry } : undefined; } - async save(task: Task, _context: ServerCallContext): Promise { + async save(task: Task, context: ServerCallContext): Promise { // Store copies to prevent internal mutation if caller reuses objects - this.store.set(task.id, { ...task }); + this.store.set(this._storageKey(task.id, context), { ...task }); } - async list(params: ListTasksRequest, _context: ServerCallContext): Promise { + async list(params: ListTasksRequest, context: ServerCallContext): Promise { const { contextId, status, @@ -66,7 +80,16 @@ export class InMemoryTaskStore implements TaskStore { includeArtifacts = false, } = params; - let tasks = Array.from(this.store.values()); + let tasks = Array.from(this.store.entries()) + // Filter by tenant: only return tasks whose storage key belongs to the current tenant scope + .filter(([key]) => { + if (context.tenant) { + return key.startsWith(`${context.tenant}:`); + } + // When no tenant is specified, only return global-scope tasks (no ':' prefix from tenanting) + return !key.includes(':') || this._isGlobalKey(key); + }) + .map(([, task]) => task); // Filter by contextId if (contextId) { @@ -154,4 +177,14 @@ export class InMemoryTaskStore implements TaskStore { totalSize, }; } + + /** + * Checks if a key that contains ':' is actually a global-scope key + * (i.e., the task ID itself contains ':'). This is determined by checking + * whether the key exists in the store as a task whose ID matches the full key. + */ + private _isGlobalKey(key: string): boolean { + const task = this.store.get(key); + return task !== undefined && task.id === key; + } } diff --git a/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts b/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts index 149fbcd4..5688082d 100644 --- a/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts +++ b/src/server/transports/jsonrpc/jsonrpc_transport_handler.ts @@ -101,6 +101,15 @@ export class JsonRpcTransportHandler { throw new RequestMalformedError(`Invalid method parameters.`); } + // For JSON-RPC, tenant is inside the params body. Extract it and enrich the + // context so downstream components (stores, executors) can scope by tenant. + const paramsTenant = (rpcRequest.params as Record | undefined)?.tenant as + | string + | undefined; + if (paramsTenant && !context.tenant) { + context = new ServerCallContext(context.requestedExtensions, context.user, paramsTenant); + } + if (method === 'SendStreamingMessage' || method === 'SubscribeToTask') { const params = rpcRequest.params; const agentCard = await this.requestHandler.getAgentCard(); diff --git a/test/client/multitransport-client.spec.ts b/test/client/multitransport-client.spec.ts index d2e7c7bd..f131ddbc 100644 --- a/test/client/multitransport-client.spec.ts +++ b/test/client/multitransport-client.spec.ts @@ -77,8 +77,10 @@ describe('Client', () => { }; client = new Client(transport, agentCardWithExtendedSupport); - let caughtOptions; - transport.getExtendedAgentCard.mockImplementation(async (options) => { + let caughtParams: unknown; + let caughtOptions: unknown; + transport.getExtendedAgentCard.mockImplementation(async (params, options) => { + caughtParams = params; caughtOptions = options; return extendedAgentCard; }); @@ -90,6 +92,7 @@ describe('Client', () => { expect(transport.getExtendedAgentCard).toHaveBeenCalledTimes(1); expect(result).to.equal(extendedAgentCard); + expect(caughtParams).to.deep.equal({ tenant: '' }); expect(caughtOptions).to.equal(expectedOptions); }); diff --git a/test/client/transports/grpc_transport.spec.ts b/test/client/transports/grpc_transport.spec.ts index 10883d66..b1310585 100644 --- a/test/client/transports/grpc_transport.spec.ts +++ b/test/client/transports/grpc_transport.spec.ts @@ -89,7 +89,7 @@ describe('GrpcTransport', () => { const mockCard = createMockAgentCard(); mockUnarySuccess(mockGrpcClient.getExtendedAgentCard as Mock, mockCard); - const result = await transport.getExtendedAgentCard(); + const result = await transport.getExtendedAgentCard({ tenant: '' }); expect(result).toEqual(mockCard); expect(mockGrpcClient.getExtendedAgentCard).toHaveBeenCalled(); diff --git a/test/client/transports/rest_transport.spec.ts b/test/client/transports/rest_transport.spec.ts index 084b249a..1ed37dbc 100644 --- a/test/client/transports/rest_transport.spec.ts +++ b/test/client/transports/rest_transport.spec.ts @@ -290,7 +290,7 @@ describe('RestTransport', () => { mockFetch.mockResolvedValue(createRestResponse(AgentCard.toJSON(mockCard))); - const result = await transport.getExtendedAgentCard(); + const result = await transport.getExtendedAgentCard({ tenant: '' }); expect(result).toEqual(expect.objectContaining(mockCard)); expect(mockFetch).toHaveBeenCalledTimes(1); @@ -299,6 +299,45 @@ describe('RestTransport', () => { expect(url).to.equal(`${endpoint}/extendedAgentCard`); expect(options?.method).to.equal('GET'); }); + + it('should get extended agent card with tenant prefix', async () => { + const mockCard: AgentCard = { + name: 'Test Agent', + description: 'A test agent for testing', + capabilities: { + streaming: true, + pushNotifications: true, + extensions: [], + }, + skills: [], + defaultInputModes: ['text'], + defaultOutputModes: ['text'], + supportedInterfaces: [ + { + url: endpoint, + protocolBinding: 'HTTP+JSON', + tenant: 'my-tenant', + protocolVersion: '1.0.0', + }, + ], + version: '1.0.0', + provider: { + url: '', + organization: '', + }, + securityRequirements: [], + securitySchemes: {}, + documentationUrl: '', + signatures: [], + }; + + mockFetch.mockResolvedValue(createRestResponse(AgentCard.toJSON(mockCard))); + + await transport.getExtendedAgentCard({ tenant: 'my-tenant' }); + + const [url] = mockFetch.mock.calls[0]; + expect(url).to.equal(`${endpoint}/my-tenant/extendedAgentCard`); + }); }); describe('Push Notification Config', () => { diff --git a/test/e2e.spec.ts b/test/e2e.spec.ts index 8b323c1b..64a53421 100644 --- a/test/e2e.spec.ts +++ b/test/e2e.spec.ts @@ -10,7 +10,7 @@ import { RequestContext, } from '../src/server/index.js'; import { AgentEvent } from '../src/server/events/execution_event_bus.js'; -import { AgentCard, Message, Role, TaskState, StreamResponse } from '../src/index.js'; +import { AgentCard, Message, Role, Task, TaskState, StreamResponse } from '../src/index.js'; import { agentCardHandler } from '../src/server/express/agent_card_handler.js'; import { jsonRpcHandler } from '../src/server/express/json_rpc_handler.js'; import { restHandler } from '../src/server/express/rest_handler.js'; @@ -292,6 +292,211 @@ describe('Client E2E tests', () => { }); }); +describe('Multi-tenancy E2E tests', () => { + // Only REST supports tenant-prefixed URL routing. JSON-RPC uses body params, + // and gRPC uses request message fields (both tested via the transport handler unit tests). + describe('[REST] tenant-scoped routing', () => { + let app: Express; + let server: Server; + let agentExecutor: TestAgentExecutor; + let agentCard: AgentCard; + let clientFactory: ClientFactory; + + beforeEach(async () => { + agentExecutor = new TestAgentExecutor(); + agentCard = { + name: 'Test Agent', + description: 'A multi-tenant test agent', + version: '1.0.0', + supportedInterfaces: [ + { + url: 'localhost', + protocolBinding: 'HTTP+JSON', + tenant: 'test-tenant', + protocolVersion: '1.0.0', + }, + ], + capabilities: { + streaming: true, + pushNotifications: true, + extensions: [], + }, + defaultInputModes: ['text/plain'], + defaultOutputModes: ['text/plain'], + skills: [], + provider: { url: '', organization: '' }, + documentationUrl: '', + securityRequirements: [], + securitySchemes: {}, + signatures: [], + }; + const requestHandler = new DefaultRequestHandler( + agentCard, + new InMemoryTaskStore(), + agentExecutor + ); + + app = express(); + app.use( + '/a2a/rest', + restHandler({ requestHandler, userBuilder: UserBuilder.noAuthentication }) + ); + + server = app.listen(); + const address = server.address() as AddressInfo; + agentCard.supportedInterfaces![0].url = `http://localhost:${address.port}/a2a/rest`; + clientFactory = new ClientFactory(); + }); + + afterEach(() => { + server.close(); + }); + + it('should send a message via tenant-prefixed route and retrieve the task', async () => { + const tenant = 'test-tenant'; + agentExecutor.events = [ + AgentEvent.task({ + id: '1', + contextId: '2', + status: { + state: TaskState.TASK_STATE_SUBMITTED, + timestamp: undefined, + message: undefined, + }, + artifacts: [], + history: [], + metadata: {}, + }), + AgentEvent.statusUpdate({ + taskId: '1', + contextId: '2', + status: { + state: TaskState.TASK_STATE_COMPLETED, + timestamp: undefined, + message: undefined, + }, + metadata: {}, + }), + ]; + const client = await clientFactory.createFromAgentCard(agentCard); + + const result = await client.sendMessage({ + tenant, + message: createTestMessage('msg-1', 'Hello from tenant'), + configuration: undefined, + metadata: {}, + }); + + // Result should be a Task (not a Message) since we published task events + expect('id' in result).to.equal(true); + const task = result as Task; + expect(task.status?.state).to.equal(TaskState.TASK_STATE_COMPLETED); + + // Should be able to retrieve the task via the same tenant + const retrieved = await client.getTask({ + id: task.id, + tenant, + historyLength: 10, + }); + expect(retrieved.id).to.equal(task.id); + }); + + it('should isolate tasks between tenants', async () => { + const requestHandler = new DefaultRequestHandler( + agentCard, + new InMemoryTaskStore(), + agentExecutor + ); + + // Create a separate server with a fresh store + const isolationApp = express(); + isolationApp.use( + '/a2a/rest', + restHandler({ requestHandler, userBuilder: UserBuilder.noAuthentication }) + ); + const isolationServer = isolationApp.listen(); + const address = isolationServer.address() as AddressInfo; + + try { + const baseUrl = `http://localhost:${address.port}/a2a/rest`; + + // Send message as tenant-A + agentExecutor.events = [ + AgentEvent.task({ + id: 'task-a', + contextId: 'ctx-a', + status: { + state: TaskState.TASK_STATE_SUBMITTED, + timestamp: undefined, + message: undefined, + }, + artifacts: [], + history: [], + metadata: {}, + }), + AgentEvent.statusUpdate({ + taskId: 'task-a', + contextId: 'ctx-a', + status: { + state: TaskState.TASK_STATE_COMPLETED, + timestamp: undefined, + message: undefined, + }, + metadata: {}, + }), + ]; + + const tenantACard = { + ...agentCard, + supportedInterfaces: [ + { + url: baseUrl, + protocolBinding: 'HTTP+JSON', + tenant: 'tenant-A', + protocolVersion: '1.0.0', + }, + ], + }; + const clientA = await clientFactory.createFromAgentCard(tenantACard); + const resultA = await clientA.sendMessage({ + tenant: 'tenant-A', + message: createTestMessage('msg-a', 'Hello from A'), + configuration: undefined, + metadata: {}, + }); + expect('id' in resultA).to.equal(true); + + // Try to get tenant-A's task as tenant-B -- should fail + const tenantBCard = { + ...agentCard, + supportedInterfaces: [ + { + url: baseUrl, + protocolBinding: 'HTTP+JSON', + tenant: 'tenant-B', + protocolVersion: '1.0.0', + }, + ], + }; + const clientB = await clientFactory.createFromAgentCard(tenantBCard); + try { + await clientB.getTask({ + id: (resultA as any).id, + tenant: 'tenant-B', + historyLength: 0, + }); + // Should not reach here + expect.fail('Expected TaskNotFoundError'); + } catch (error: unknown) { + expect((error as Error).name).to.equal('TaskNotFoundError'); + } + } finally { + isolationServer.close(); + } + }); + }); +}); + const removeUndefinedFields = (obj: any) => JSON.parse(JSON.stringify(obj)); function createTestMessage(id: string, text: string): Message { return { diff --git a/test/server/rest_transport_handler.spec.ts b/test/server/rest_transport_handler.spec.ts index a05a07ba..20bfa580 100644 --- a/test/server/rest_transport_handler.spec.ts +++ b/test/server/rest_transport_handler.spec.ts @@ -137,7 +137,10 @@ describe('RestTransportHandler', () => { describe('getAuthenticatedExtendedAgentCard', () => { it('should return extended agent card from request handler', async () => { - const card = await transportHandler.getAuthenticatedExtendedAgentCard(mockContext); + const card = await transportHandler.getAuthenticatedExtendedAgentCard( + { tenant: '' }, + mockContext + ); expect(card).to.deep.equal(testAgentCard); expect(mockRequestHandler.getAuthenticatedExtendedAgentCard as Mock).toHaveBeenCalledTimes(1); diff --git a/test/server/tenant_isolation.spec.ts b/test/server/tenant_isolation.spec.ts new file mode 100644 index 00000000..e4451e44 --- /dev/null +++ b/test/server/tenant_isolation.spec.ts @@ -0,0 +1,233 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { InMemoryTaskStore } from '../../src/server/store.js'; +import { InMemoryPushNotificationStore } from '../../src/server/push_notification/push_notification_store.js'; +import { ServerCallContext } from '../../src/server/context.js'; +import { Task, TaskState, TaskPushNotificationConfig } from '../../src/index.js'; + +function createContext(tenant?: string): ServerCallContext { + return new ServerCallContext(undefined, undefined, tenant); +} + +function createTask(id: string, contextId: string = 'ctx-1'): Task { + return { + id, + contextId, + status: { + state: TaskState.TASK_STATE_COMPLETED, + timestamp: new Date().toISOString(), + message: undefined, + }, + artifacts: [], + history: [], + metadata: {}, + }; +} + +describe('InMemoryTaskStore tenant isolation', () => { + let store: InMemoryTaskStore; + + beforeEach(() => { + store = new InMemoryTaskStore(); + }); + + it('should save and load a task without tenant (global scope)', async () => { + const ctx = createContext(); + const task = createTask('task-1'); + await store.save(task, ctx); + + const loaded = await store.load('task-1', ctx); + expect(loaded).toBeDefined(); + expect(loaded!.id).to.equal('task-1'); + }); + + it('should save and load a task with tenant', async () => { + const ctx = createContext('tenant-A'); + const task = createTask('task-1'); + await store.save(task, ctx); + + const loaded = await store.load('task-1', ctx); + expect(loaded).toBeDefined(); + expect(loaded!.id).to.equal('task-1'); + }); + + it('should isolate tasks between tenants', async () => { + const ctxA = createContext('tenant-A'); + const ctxB = createContext('tenant-B'); + + await store.save(createTask('task-1'), ctxA); + + // Tenant A can load the task + const loadedA = await store.load('task-1', ctxA); + expect(loadedA).toBeDefined(); + + // Tenant B cannot load the same task + const loadedB = await store.load('task-1', ctxB); + expect(loadedB).toBeUndefined(); + }); + + it('should allow same task ID in different tenants', async () => { + const ctxA = createContext('tenant-A'); + const ctxB = createContext('tenant-B'); + + const taskA = createTask('task-1', 'ctx-A'); + const taskB = createTask('task-1', 'ctx-B'); + + await store.save(taskA, ctxA); + await store.save(taskB, ctxB); + + const loadedA = await store.load('task-1', ctxA); + const loadedB = await store.load('task-1', ctxB); + + expect(loadedA!.contextId).to.equal('ctx-A'); + expect(loadedB!.contextId).to.equal('ctx-B'); + }); + + it('should list only tasks belonging to the tenant', async () => { + const ctxA = createContext('tenant-A'); + const ctxB = createContext('tenant-B'); + + await store.save(createTask('task-a1'), ctxA); + await store.save(createTask('task-a2'), ctxA); + await store.save(createTask('task-b1'), ctxB); + + const listA = await store.list( + { + tenant: 'tenant-A', + contextId: '', + status: undefined, + pageSize: 10, + pageToken: '', + statusTimestampAfter: '', + }, + ctxA + ); + + expect(listA.tasks).toHaveLength(2); + expect(listA.tasks.map((t) => t.id).sort()).toEqual(['task-a1', 'task-a2']); + + const listB = await store.list( + { + tenant: 'tenant-B', + contextId: '', + status: undefined, + pageSize: 10, + pageToken: '', + statusTimestampAfter: '', + }, + ctxB + ); + + expect(listB.tasks).toHaveLength(1); + expect(listB.tasks[0].id).to.equal('task-b1'); + }); + + it('should isolate tenant-scoped tasks from global scope', async () => { + const ctxGlobal = createContext(); + const ctxTenant = createContext('tenant-A'); + + await store.save(createTask('global-task'), ctxGlobal); + await store.save(createTask('tenant-task'), ctxTenant); + + // Global context should not see tenant tasks + const globalList = await store.list( + { + tenant: '', + contextId: '', + status: undefined, + pageSize: 10, + pageToken: '', + statusTimestampAfter: '', + }, + ctxGlobal + ); + expect(globalList.tasks).toHaveLength(1); + expect(globalList.tasks[0].id).to.equal('global-task'); + + // Tenant context should not see global tasks + const tenantList = await store.list( + { + tenant: 'tenant-A', + contextId: '', + status: undefined, + pageSize: 10, + pageToken: '', + statusTimestampAfter: '', + }, + ctxTenant + ); + expect(tenantList.tasks).toHaveLength(1); + expect(tenantList.tasks[0].id).to.equal('tenant-task'); + }); +}); + +describe('InMemoryPushNotificationStore tenant isolation', () => { + let store: InMemoryPushNotificationStore; + + const createConfig = ( + id: string, + taskId: string, + tenant: string = '' + ): TaskPushNotificationConfig => ({ + tenant, + id, + taskId, + url: `https://notify.example.com/${id}`, + token: 'secret', + authentication: undefined, + }); + + beforeEach(() => { + store = new InMemoryPushNotificationStore(); + }); + + it('should isolate configs between tenants', async () => { + const ctxA = createContext('tenant-A'); + const ctxB = createContext('tenant-B'); + + await store.save('task-1', ctxA, createConfig('config-1', 'task-1', 'tenant-A')); + + // Tenant A can load the config + const loadedA = await store.load('task-1', ctxA); + expect(loadedA).toHaveLength(1); + expect(loadedA[0].id).to.equal('config-1'); + + // Tenant B cannot load tenant A's configs + const loadedB = await store.load('task-1', ctxB); + expect(loadedB).toHaveLength(0); + }); + + it('should allow same task ID configs in different tenants', async () => { + const ctxA = createContext('tenant-A'); + const ctxB = createContext('tenant-B'); + + await store.save('task-1', ctxA, createConfig('config-a', 'task-1', 'tenant-A')); + await store.save('task-1', ctxB, createConfig('config-b', 'task-1', 'tenant-B')); + + const loadedA = await store.load('task-1', ctxA); + const loadedB = await store.load('task-1', ctxB); + + expect(loadedA).toHaveLength(1); + expect(loadedA[0].id).to.equal('config-a'); + expect(loadedB).toHaveLength(1); + expect(loadedB[0].id).to.equal('config-b'); + }); + + it('should delete configs only within the tenant scope', async () => { + const ctxA = createContext('tenant-A'); + const ctxB = createContext('tenant-B'); + + await store.save('task-1', ctxA, createConfig('config-1', 'task-1', 'tenant-A')); + await store.save('task-1', ctxB, createConfig('config-1', 'task-1', 'tenant-B')); + + // Delete from tenant A + await store.delete('task-1', ctxA, 'config-1'); + + // Tenant A config is gone + const loadedA = await store.load('task-1', ctxA); + expect(loadedA).toHaveLength(0); + + // Tenant B config still exists + const loadedB = await store.load('task-1', ctxB); + expect(loadedB).toHaveLength(1); + }); +}); From 922bf8bd38f4007791c40008bd2f5ba0225fd170 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Wed, 22 Apr 2026 11:59:26 +0000 Subject: [PATCH 08/16] Reduce code duplication in grpc_service when extracting tenant value. --- src/server/grpc/grpc_service.ts | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/server/grpc/grpc_service.ts b/src/server/grpc/grpc_service.ts index 3ae51ee1..6108126c 100644 --- a/src/server/grpc/grpc_service.ts +++ b/src/server/grpc/grpc_service.ts @@ -74,13 +74,7 @@ export function grpcService(options: GrpcServiceOptions): A2AServiceServer { converter: (res: TResult) => TRes ) => { try { - let context = await buildContext(call, options.userBuilder); - // For gRPC, tenant is inside the request message. Extract it and enrich - // the context so downstream components (stores, executors) can scope by tenant. - const tenant = (call.request as Record)?.tenant as string | undefined; - if (tenant) { - context = new ServerCallContext(context.requestedExtensions, context.user, tenant); - } + const context = await _buildContext(call, options.userBuilder); const result = await handler(call.request, context); call.sendMetadata(buildMetadata(context)); callback(null, converter(result)); @@ -106,13 +100,7 @@ export function grpcService(options: GrpcServiceOptions): A2AServiceServer { handler: (req: TReq, ctx: ServerCallContext) => AsyncGenerator ) => { try { - let context = await buildContext(call, options.userBuilder); - // For gRPC, tenant is inside the request message. Extract it and enrich - // the context so downstream components (stores, executors) can scope by tenant. - const tenant = (call.request as Record)?.tenant as string | undefined; - if (tenant) { - context = new ServerCallContext(context.requestedExtensions, context.user, tenant); - } + const context = await _buildContext(call, options.userBuilder); const stream = await handler(call.request, context); call.sendMetadata(buildMetadata(context)); for await (const responsePart of stream) { @@ -255,7 +243,7 @@ const mapToError = (error: unknown): Partial => { }; }; -const buildContext = async ( +const _buildContext = async ( call: grpc.ServerUnaryCall | grpc.ServerWritableStream, userBuilder: UserBuilder ): Promise => { @@ -263,7 +251,15 @@ const buildContext = async ( const extensionHeaders = call.metadata.get(HTTP_EXTENSION_HEADER); const extensionString = extensionHeaders.map((v) => v.toString()).join(','); - return new ServerCallContext(Extensions.parseServiceParameter(extensionString), user); + let context = new ServerCallContext(Extensions.parseServiceParameter(extensionString), user); + + // For gRPC, tenant is inside the request message. Extract it and enrich + // the context so downstream components (stores, executors) can scope by tenant. + const tenant = (call.request as Record)?.tenant as string | undefined; + if (tenant) { + context = new ServerCallContext(context.requestedExtensions, context.user, tenant); + } + return context; }; const buildMetadata = (context: ServerCallContext): grpc.Metadata => { From 58d877f5870307ee8c933cdfc57924ddf16662e4 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Wed, 22 Apr 2026 13:56:31 +0000 Subject: [PATCH 09/16] Simplify tenant extraction in grpc_service. --- src/server/grpc/grpc_service.ts | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/server/grpc/grpc_service.ts b/src/server/grpc/grpc_service.ts index 6108126c..043a4b5f 100644 --- a/src/server/grpc/grpc_service.ts +++ b/src/server/grpc/grpc_service.ts @@ -250,16 +250,10 @@ const _buildContext = async ( const user = await userBuilder(call); const extensionHeaders = call.metadata.get(HTTP_EXTENSION_HEADER); const extensionString = extensionHeaders.map((v) => v.toString()).join(','); - - let context = new ServerCallContext(Extensions.parseServiceParameter(extensionString), user); - - // For gRPC, tenant is inside the request message. Extract it and enrich - // the context so downstream components (stores, executors) can scope by tenant. const tenant = (call.request as Record)?.tenant as string | undefined; - if (tenant) { - context = new ServerCallContext(context.requestedExtensions, context.user, tenant); - } - return context; + return tenant + ? new ServerCallContext(Extensions.parseServiceParameter(extensionString), user, tenant) + : new ServerCallContext(Extensions.parseServiceParameter(extensionString), user); }; const buildMetadata = (context: ServerCallContext): grpc.Metadata => { From 2bbaa935e0a1cd2c1f9ef8b18e74c9fd37bb775a Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Wed, 22 Apr 2026 14:29:18 +0000 Subject: [PATCH 10/16] Add safety throw if task id includes : as it would make tenant url stacking ambigous. --- src/server/push_notification/push_notification_store.ts | 9 ++++++++- src/server/store.ts | 8 +++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/server/push_notification/push_notification_store.ts b/src/server/push_notification/push_notification_store.ts index 05ae2f26..16e36da8 100644 --- a/src/server/push_notification/push_notification_store.ts +++ b/src/server/push_notification/push_notification_store.ts @@ -1,3 +1,4 @@ +import { RequestMalformedError } from '../../errors.js'; import { TaskPushNotificationConfig } from '../../index.js'; import { ServerCallContext } from '../context.js'; @@ -28,7 +29,13 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { * Builds a composite storage key from tenant and task ID. */ private _storageKey(taskId: string, context: ServerCallContext): string { - return context.tenant ? `${context.tenant}:${taskId}` : taskId; + if (context.tenant) { + return `${context.tenant}:${taskId}`; + } + if (taskId && taskId.includes(':')) { + throw new RequestMalformedError('Task ID cannot contain ":" character for global tasks.'); + } + return taskId; } async save( diff --git a/src/server/store.ts b/src/server/store.ts index ca0ecc4c..e902a4b1 100644 --- a/src/server/store.ts +++ b/src/server/store.ts @@ -55,7 +55,13 @@ export class InMemoryTaskStore implements TaskStore { * When tenant is absent, the key is the taskId alone (global scope). */ private _storageKey(taskId: string, context: ServerCallContext): string { - return context.tenant ? `${context.tenant}:${taskId}` : taskId; + if (context.tenant) { + return `${context.tenant}:${taskId}`; + } + if (taskId && taskId.includes(':')) { + throw new RequestMalformedError('Task ID cannot contain ":" character for global tasks.'); + } + return taskId; } async load(taskId: string, context: ServerCallContext): Promise { From 5792de4d7ad24134f61dcb73ada29632cca0b51d Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Thu, 23 Apr 2026 08:28:13 +0000 Subject: [PATCH 11/16] Use nested map instead of relying on a semicolon naming scheme. --- .../push_notification_store.ts | 57 +++++++++++------- src/server/store.ts | 60 +++++++------------ 2 files changed, 57 insertions(+), 60 deletions(-) diff --git a/src/server/push_notification/push_notification_store.ts b/src/server/push_notification/push_notification_store.ts index 16e36da8..7b88ced3 100644 --- a/src/server/push_notification/push_notification_store.ts +++ b/src/server/push_notification/push_notification_store.ts @@ -1,4 +1,3 @@ -import { RequestMalformedError } from '../../errors.js'; import { TaskPushNotificationConfig } from '../../index.js'; import { ServerCallContext } from '../context.js'; @@ -20,22 +19,33 @@ export interface PushNotificationStore { /** * In-memory push notification config store with tenant-scoped data isolation. - * Uses `context.tenant` to build composite storage keys, preventing cross-tenant access. + * A nested Map structure (tenant -> taskId -> configs[]) is used so that tenant + * scoping is structural, imposing no restrictions on task ID format. */ export class InMemoryPushNotificationStore implements PushNotificationStore { - private store: Map = new Map(); + // Outer map: tenant key ('' for global/no-tenant) -> inner map of taskId -> configs + private store: Map> = new Map(); - /** - * Builds a composite storage key from tenant and task ID. - */ - private _storageKey(taskId: string, context: ServerCallContext): string { - if (context.tenant) { - return `${context.tenant}:${taskId}`; - } - if (taskId && taskId.includes(':')) { - throw new RequestMalformedError('Task ID cannot contain ":" character for global tasks.'); + private _tenantKey(context: ServerCallContext): string { + return context.tenant ?? ''; + } + + private _getTenantBucket( + context: ServerCallContext + ): Map | undefined { + return this.store.get(this._tenantKey(context)); + } + + private _getOrCreateTenantBucket( + context: ServerCallContext + ): Map { + const key = this._tenantKey(context); + let bucket = this.store.get(key); + if (!bucket) { + bucket = new Map(); + this.store.set(key, bucket); } - return taskId; + return bucket; } async save( @@ -43,8 +53,8 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { context: ServerCallContext, pushNotificationConfig: TaskPushNotificationConfig ): Promise { - const key = this._storageKey(taskId, context); - const configs = this.store.get(key) || []; + const bucket = this._getOrCreateTenantBucket(context); + const configs = bucket.get(taskId) || []; // Set ID if it's not already set if (!pushNotificationConfig.id) { @@ -59,12 +69,11 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { // Add the new/updated config configs.push(pushNotificationConfig); - this.store.set(key, configs); + bucket.set(taskId, configs); } async load(taskId: string, context: ServerCallContext): Promise { - const key = this._storageKey(taskId, context); - const configs = this.store.get(key); + const configs = this._getTenantBucket(context)?.get(taskId); return configs || []; } @@ -74,8 +83,12 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { configId = taskId; } - const key = this._storageKey(taskId, context); - const configs = this.store.get(key); + const bucket = this._getTenantBucket(context); + if (!bucket) { + return; + } + + const configs = bucket.get(taskId); if (!configs) { return; } @@ -86,9 +99,7 @@ export class InMemoryPushNotificationStore implements PushNotificationStore { } if (configs.length === 0) { - this.store.delete(key); - } else { - this.store.set(key, configs); + bucket.delete(taskId); } } } diff --git a/src/server/store.ts b/src/server/store.ts index e902a4b1..4518e27b 100644 --- a/src/server/store.ts +++ b/src/server/store.ts @@ -43,36 +43,40 @@ export interface TaskStore { // ======================== // // InMemoryTaskStore provides tenant-scoped data isolation using `context.tenant`. -// Tasks are stored with a composite key of `{tenant}:{taskId}` when a tenant is present. -// When no tenant is specified, tasks are stored under the taskId alone (global scope). +// A nested Map structure (tenant -> taskId -> Task) is used so that tenant scoping +// is structural rather than key-convention based, imposing no restrictions on task ID format. export class InMemoryTaskStore implements TaskStore { - private store: Map = new Map(); + // Outer map: tenant key ('' for global/no-tenant) -> inner map of taskId -> Task + private store: Map> = new Map(); - /** - * Builds a composite storage key from tenant and task ID. - * When tenant is present, the key is `{tenant}:{taskId}` to provide tenant isolation. - * When tenant is absent, the key is the taskId alone (global scope). - */ - private _storageKey(taskId: string, context: ServerCallContext): string { - if (context.tenant) { - return `${context.tenant}:${taskId}`; - } - if (taskId && taskId.includes(':')) { - throw new RequestMalformedError('Task ID cannot contain ":" character for global tasks.'); + private _tenantKey(context: ServerCallContext): string { + return context.tenant ?? ''; + } + + private _getTenantBucket(context: ServerCallContext): Map | undefined { + return this.store.get(this._tenantKey(context)); + } + + private _getOrCreateTenantBucket(context: ServerCallContext): Map { + const key = this._tenantKey(context); + let bucket = this.store.get(key); + if (!bucket) { + bucket = new Map(); + this.store.set(key, bucket); } - return taskId; + return bucket; } async load(taskId: string, context: ServerCallContext): Promise { - const entry = this.store.get(this._storageKey(taskId, context)); + const entry = this._getTenantBucket(context)?.get(taskId); // Return copies to prevent external mutation return entry ? { ...entry } : undefined; } async save(task: Task, context: ServerCallContext): Promise { // Store copies to prevent internal mutation if caller reuses objects - this.store.set(this._storageKey(task.id, context), { ...task }); + this._getOrCreateTenantBucket(context).set(task.id, { ...task }); } async list(params: ListTasksRequest, context: ServerCallContext): Promise { @@ -86,16 +90,8 @@ export class InMemoryTaskStore implements TaskStore { includeArtifacts = false, } = params; - let tasks = Array.from(this.store.entries()) - // Filter by tenant: only return tasks whose storage key belongs to the current tenant scope - .filter(([key]) => { - if (context.tenant) { - return key.startsWith(`${context.tenant}:`); - } - // When no tenant is specified, only return global-scope tasks (no ':' prefix from tenanting) - return !key.includes(':') || this._isGlobalKey(key); - }) - .map(([, task]) => task); + const bucket = this._getTenantBucket(context); + let tasks = bucket ? Array.from(bucket.values()) : []; // Filter by contextId if (contextId) { @@ -183,14 +179,4 @@ export class InMemoryTaskStore implements TaskStore { totalSize, }; } - - /** - * Checks if a key that contains ':' is actually a global-scope key - * (i.e., the task ID itself contains ':'). This is determined by checking - * whether the key exists in the store as a task whose ID matches the full key. - */ - private _isGlobalKey(key: string): boolean { - const task = this.store.get(key); - return task !== undefined && task.id === key; - } } From 41e6423aad9d3bf68544b6ce45f4e71afc2a4080 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Thu, 23 Apr 2026 08:30:21 +0000 Subject: [PATCH 12/16] Applied nit for the ServerCallContext construction with optional tenant field. --- src/server/grpc/grpc_service.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/server/grpc/grpc_service.ts b/src/server/grpc/grpc_service.ts index 043a4b5f..1839001e 100644 --- a/src/server/grpc/grpc_service.ts +++ b/src/server/grpc/grpc_service.ts @@ -251,9 +251,7 @@ const _buildContext = async ( const extensionHeaders = call.metadata.get(HTTP_EXTENSION_HEADER); const extensionString = extensionHeaders.map((v) => v.toString()).join(','); const tenant = (call.request as Record)?.tenant as string | undefined; - return tenant - ? new ServerCallContext(Extensions.parseServiceParameter(extensionString), user, tenant) - : new ServerCallContext(Extensions.parseServiceParameter(extensionString), user); + return new ServerCallContext(Extensions.parseServiceParameter(extensionString), user, tenant); }; const buildMetadata = (context: ServerCallContext): grpc.Metadata => { From a52ecca123f39b351d9b2cdc2bb4242daf627346 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Thu, 23 Apr 2026 08:55:59 +0000 Subject: [PATCH 13/16] Add tenant_transport_decorator so that default tenant is automatically added to all requests when specified. --- src/client/factory.ts | 41 ++- src/client/index.ts | 1 + src/client/multitransport-client.ts | 10 +- .../transports/tenant_transport_decorator.ts | 141 ++++++++ test/client/factory.spec.ts | 35 ++ .../tenant_transport_decorator.spec.ts | 315 ++++++++++++++++++ 6 files changed, 523 insertions(+), 20 deletions(-) create mode 100644 src/client/transports/tenant_transport_decorator.ts create mode 100644 test/client/transports/tenant_transport_decorator.spec.ts diff --git a/src/client/factory.ts b/src/client/factory.ts index b97ccb4e..2e05c4ed 100644 --- a/src/client/factory.ts +++ b/src/client/factory.ts @@ -4,6 +4,7 @@ import { AgentCardResolver } from './card-resolver.js'; import { Client, ClientConfig } from './multitransport-client.js'; import { JsonRpcTransportFactory } from './transports/json_rpc_transport.js'; import { RestTransportFactory } from './transports/rest_transport.js'; +import { TenantTransportDecorator } from './transports/tenant_transport_decorator.js'; import { TransportFactory } from './transports/transport.js'; export interface ClientFactoryOptions { @@ -95,29 +96,41 @@ export class ClientFactory { /** * Creates a new client from the provided agent card. + * + * When the selected `AgentInterface` declares a non-empty `tenant` value + * (per spec Section 4.4.6), the transport is automatically wrapped with a + * {@link TenantTransportDecorator} so the default tenant is applied to every + * request without requiring callers to set it manually. */ async createFromAgentCard(agentCard: AgentCard): Promise { const interfaces = agentCard.supportedInterfaces ?? []; - const urlsPerAgentTransports = new CaseInsensitiveMap(); - for (const i of interfaces) { - const existing = urlsPerAgentTransports.get(i.protocolBinding); - if (!existing || i.protocolVersion === '1.0') { - urlsPerAgentTransports.set(i.protocolBinding, i.url); + + // Track the best interface per protocol binding (prefer version 1.0). + const bestInterfacePerProtocol = new CaseInsensitiveMap<(typeof interfaces)[number]>(); + for (const agentInterface of interfaces) { + const existing = bestInterfacePerProtocol.get(agentInterface.protocolBinding); + if (!existing || agentInterface.protocolVersion === '1.0') { + bestInterfacePerProtocol.set(agentInterface.protocolBinding, agentInterface); } } + const transportsByPreference = [ ...(this.options.preferredTransports ?? []), ...interfaces.map((i) => i.protocolBinding), ]; - for (const transport of transportsByPreference) { - const url = urlsPerAgentTransports.get(transport); - const factory = this.transportsByName.get(transport); - if (factory && url) { - return new Client( - await factory.create(url, agentCard), - agentCard, - this.options.clientConfig - ); + for (const transportName of transportsByPreference) { + const selectedInterface = bestInterfacePerProtocol.get(transportName); + const factory = this.transportsByName.get(transportName); + if (factory && selectedInterface) { + let transport = await factory.create(selectedInterface.url, agentCard); + + // If the agent interface declares a default tenant, wrap the transport + // so the tenant is automatically applied to all requests. + if (selectedInterface.tenant) { + transport = new TenantTransportDecorator(transport, selectedInterface.tenant); + } + + return new Client(transport, agentCard, this.options.clientConfig); } } throw new Error( diff --git a/src/client/index.ts b/src/client/index.ts index ea1ea5f2..91c931be 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -10,6 +10,7 @@ export { } from './card-resolver.js'; export { Client, type ClientConfig, type RequestOptions } from './multitransport-client.js'; export type { Transport, TransportFactory } from './transports/transport.js'; +export { TenantTransportDecorator } from './transports/tenant_transport_decorator.js'; export { ClientFactory, ClientFactoryOptions } from './factory.js'; export { JsonRpcTransportFactory } from './transports/json_rpc_transport.js'; export { RestTransportFactory } from './transports/rest_transport.js'; diff --git a/src/client/multitransport-client.ts b/src/client/multitransport-client.ts index 6c2dc7f3..51fc7ff6 100644 --- a/src/client/multitransport-client.ts +++ b/src/client/multitransport-client.ts @@ -78,18 +78,16 @@ export class Client { * If the current agent card supports the extended feature, it will try to fetch the extended agent card from the server, * Otherwise it will return the current agent card value. * - * The tenant for the request is inferred from the agent card's interface declaration. + * When a default tenant is configured (via `TenantTransportDecorator`, wired + * automatically by `ClientFactory` from `AgentInterface.tenant`), the tenant + * is applied to the request transparently. */ async getAgentCard(options?: RequestOptions): Promise { if (this.agentCard.capabilities?.extendedAgentCard) { - const tenant = - this.agentCard.supportedInterfaces?.find( - (iface) => iface.protocolBinding === this.transport.protocolName - )?.tenant ?? ''; this.agentCard = await this.executeWithInterceptors( { method: 'getAgentCard' }, options, - (_, options) => this.transport.getExtendedAgentCard({ tenant }, options) + (_, options) => this.transport.getExtendedAgentCard({ tenant: '' }, options) ); } return this.agentCard; diff --git a/src/client/transports/tenant_transport_decorator.ts b/src/client/transports/tenant_transport_decorator.ts new file mode 100644 index 00000000..620a48a1 --- /dev/null +++ b/src/client/transports/tenant_transport_decorator.ts @@ -0,0 +1,141 @@ +import { + AgentCard, + CancelTaskRequest, + DeleteTaskPushNotificationConfigRequest, + GetExtendedAgentCardRequest, + GetTaskPushNotificationConfigRequest, + GetTaskRequest, + ListTaskPushNotificationConfigsRequest, + ListTaskPushNotificationConfigsResponse, + ListTasksRequest, + ListTasksResponse, + SendMessageRequest, + StreamResponse, + SubscribeToTaskRequest, + Task, + TaskPushNotificationConfig, +} from '../../index.js'; +import { RequestOptions } from '../multitransport-client.js'; +import { Transport } from './transport.js'; +import { SendMessageResult } from '../../index.js'; + +/** + * A transport decorator that attaches a default tenant to all requests. + * + * When an `AgentInterface` declares a `tenant` value (per spec Section 4.4.6), + * this decorator ensures every outbound request carries that tenant unless the + * caller has already specified one. This mirrors the behavior of the Python SDK's + * `TenantTransportDecorator`. + * + * The factory wires this decorator automatically when `AgentInterface.tenant` is + * non-empty, so callers do not need to manually set tenant on every request. + */ +export class TenantTransportDecorator implements Transport { + constructor( + private readonly base: Transport, + private readonly defaultTenant: string + ) {} + + get protocolName(): string { + return this.base.protocolName; + } + + /** + * Returns the request tenant if non-empty, otherwise falls back to the default. + */ + private _resolveTenant(tenant: string | undefined): string { + return tenant || this.defaultTenant; + } + + async getExtendedAgentCard( + params: GetExtendedAgentCardRequest, + options?: RequestOptions + ): Promise { + return this.base.getExtendedAgentCard( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async sendMessage( + params: SendMessageRequest, + options?: RequestOptions + ): Promise { + return this.base.sendMessage( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async *sendMessageStream( + params: SendMessageRequest, + options?: RequestOptions + ): AsyncGenerator { + yield* this.base.sendMessageStream( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async getTask(params: GetTaskRequest, options?: RequestOptions): Promise { + return this.base.getTask({ ...params, tenant: this._resolveTenant(params.tenant) }, options); + } + + async cancelTask(params: CancelTaskRequest, options?: RequestOptions): Promise { + return this.base.cancelTask({ ...params, tenant: this._resolveTenant(params.tenant) }, options); + } + + async listTasks(params: ListTasksRequest, options?: RequestOptions): Promise { + return this.base.listTasks({ ...params, tenant: this._resolveTenant(params.tenant) }, options); + } + + async createTaskPushNotificationConfig( + params: TaskPushNotificationConfig, + options?: RequestOptions + ): Promise { + return this.base.createTaskPushNotificationConfig( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async getTaskPushNotificationConfig( + params: GetTaskPushNotificationConfigRequest, + options?: RequestOptions + ): Promise { + return this.base.getTaskPushNotificationConfig( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async listTaskPushNotificationConfig( + params: ListTaskPushNotificationConfigsRequest, + options?: RequestOptions + ): Promise { + return this.base.listTaskPushNotificationConfig( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async deleteTaskPushNotificationConfig( + params: DeleteTaskPushNotificationConfigRequest, + options?: RequestOptions + ): Promise { + return this.base.deleteTaskPushNotificationConfig( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } + + async *resubscribeTask( + params: SubscribeToTaskRequest, + options?: RequestOptions + ): AsyncGenerator { + yield* this.base.resubscribeTask( + { ...params, tenant: this._resolveTenant(params.tenant) }, + options + ); + } +} diff --git a/test/client/factory.spec.ts b/test/client/factory.spec.ts index 0677372e..5f671f79 100644 --- a/test/client/factory.spec.ts +++ b/test/client/factory.spec.ts @@ -1,6 +1,7 @@ import { describe, it, beforeEach, expect, vi, Mock } from 'vitest'; import { ClientFactory, ClientFactoryOptions } from '../../src/client/factory.js'; import { Transport } from '../../src/client/transports/transport.js'; +import { TenantTransportDecorator } from '../../src/client/transports/tenant_transport_decorator.js'; import { AgentCard } from '../../src/index.js'; import { Client } from '../../src/client/multitransport-client.js'; import { CallInterceptor } from '../../src/client/interceptors.js'; @@ -299,6 +300,40 @@ describe('ClientFactory', () => { 'a2a/my-agent-card.json' ); }); + + it('should wrap transport with TenantTransportDecorator when interface has tenant', async () => { + agentCard.supportedInterfaces = [ + { + url: 'http://transport1.com', + protocolBinding: 'Transport1', + tenant: 'my-tenant', + protocolVersion: '1.0.0', + }, + ]; + const factory = new ClientFactory({ transports: [mockTransportFactory1] }); + + const client = await factory.createFromAgentCard(agentCard); + + expect(client).to.be.instanceOf(Client); + expect(client.transport).to.be.instanceOf(TenantTransportDecorator); + }); + + it('should NOT wrap transport with TenantTransportDecorator when interface has no tenant', async () => { + agentCard.supportedInterfaces = [ + { + url: 'http://transport1.com', + protocolBinding: 'Transport1', + tenant: '', + protocolVersion: '1.0.0', + }, + ]; + const factory = new ClientFactory({ transports: [mockTransportFactory1] }); + + const client = await factory.createFromAgentCard(agentCard); + + expect(client).to.be.instanceOf(Client); + expect(client.transport).not.to.be.instanceOf(TenantTransportDecorator); + }); }); describe('ClientFactoryOptions.createFrom', () => { diff --git a/test/client/transports/tenant_transport_decorator.spec.ts b/test/client/transports/tenant_transport_decorator.spec.ts new file mode 100644 index 00000000..91cc117a --- /dev/null +++ b/test/client/transports/tenant_transport_decorator.spec.ts @@ -0,0 +1,315 @@ +import { describe, it, beforeEach, expect, vi, Mock } from 'vitest'; +import { TenantTransportDecorator } from '../../../src/client/transports/tenant_transport_decorator.js'; +import { Transport } from '../../../src/client/transports/transport.js'; +import { SendMessageRequest } from '../../../src/types/pb/a2a.js'; + +describe('TenantTransportDecorator', () => { + const DEFAULT_TENANT = 'default-tenant'; + let mockTransport: Record, Mock> & { + protocolName: string; + }; + let decorator: TenantTransportDecorator; + + beforeEach(() => { + mockTransport = { + getExtendedAgentCard: vi.fn().mockResolvedValue({}), + sendMessage: vi.fn().mockResolvedValue({}), + sendMessageStream: vi.fn().mockReturnValue((async function* () {})()), + createTaskPushNotificationConfig: vi.fn().mockResolvedValue({}), + getTaskPushNotificationConfig: vi.fn().mockResolvedValue({}), + listTaskPushNotificationConfig: vi.fn().mockResolvedValue({ configs: [] }), + deleteTaskPushNotificationConfig: vi.fn().mockResolvedValue(undefined), + getTask: vi.fn().mockResolvedValue({}), + cancelTask: vi.fn().mockResolvedValue({}), + listTasks: vi.fn().mockResolvedValue({ tasks: [] }), + resubscribeTask: vi.fn().mockReturnValue((async function* () {})()), + protocolName: 'MockTransport', + }; + decorator = new TenantTransportDecorator(mockTransport, DEFAULT_TENANT); + }); + + it('should expose the base transport protocol name', () => { + expect(decorator.protocolName).to.equal('MockTransport'); + }); + + describe('default tenant application', () => { + it('should apply default tenant to sendMessage when tenant is empty', async () => { + await decorator.sendMessage({ + tenant: '', + message: undefined, + configuration: undefined, + metadata: {}, + }); + + const passedParams = mockTransport.sendMessage.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should preserve caller-specified tenant on sendMessage', async () => { + await decorator.sendMessage({ + tenant: 'custom-tenant', + message: undefined, + configuration: undefined, + metadata: {}, + }); + + const passedParams = mockTransport.sendMessage.mock.calls[0][0]; + expect(passedParams.tenant).to.equal('custom-tenant'); + }); + + it('should apply default tenant to getExtendedAgentCard when tenant is empty', async () => { + await decorator.getExtendedAgentCard({ tenant: '' }); + + const passedParams = mockTransport.getExtendedAgentCard.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to getTask when tenant is empty', async () => { + await decorator.getTask({ id: 'task-1', tenant: '', historyLength: 0 }); + + const passedParams = mockTransport.getTask.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should preserve caller-specified tenant on getTask', async () => { + await decorator.getTask({ id: 'task-1', tenant: 'override', historyLength: 0 }); + + const passedParams = mockTransport.getTask.mock.calls[0][0]; + expect(passedParams.tenant).to.equal('override'); + }); + + it('should apply default tenant to cancelTask when tenant is empty', async () => { + await decorator.cancelTask({ id: 'task-1', tenant: '', metadata: {} }); + + const passedParams = mockTransport.cancelTask.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to listTasks when tenant is empty', async () => { + await decorator.listTasks({ + tenant: '', + contextId: '', + status: undefined, + pageToken: '', + statusTimestampAfter: '', + }); + + const passedParams = mockTransport.listTasks.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to createTaskPushNotificationConfig when tenant is empty', async () => { + await decorator.createTaskPushNotificationConfig({ + tenant: '', + id: 'config-1', + taskId: 'task-1', + url: 'https://example.com', + token: '', + authentication: undefined, + }); + + const passedParams = mockTransport.createTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to getTaskPushNotificationConfig when tenant is empty', async () => { + await decorator.getTaskPushNotificationConfig({ id: 'cfg-1', taskId: 'task-1', tenant: '' }); + + const passedParams = mockTransport.getTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to listTaskPushNotificationConfig when tenant is empty', async () => { + await decorator.listTaskPushNotificationConfig({ + taskId: 'task-1', + tenant: '', + pageSize: 0, + pageToken: '', + }); + + const passedParams = mockTransport.listTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to deleteTaskPushNotificationConfig when tenant is empty', async () => { + await decorator.deleteTaskPushNotificationConfig({ + id: 'cfg-1', + taskId: 'task-1', + tenant: '', + }); + + const passedParams = mockTransport.deleteTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to sendMessageStream when tenant is empty', async () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of decorator.sendMessageStream({ + tenant: '', + message: undefined, + configuration: undefined, + metadata: {}, + })) { + // consume + } + + const passedParams = mockTransport.sendMessageStream.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should apply default tenant to resubscribeTask when tenant is empty', async () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of decorator.resubscribeTask({ id: 'task-1', tenant: '' })) { + // consume + } + + const passedParams = mockTransport.resubscribeTask.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + }); + + describe('caller-specified tenant preservation', () => { + const CALLER_TENANT = 'caller-tenant'; + + it('should preserve caller-specified tenant on getExtendedAgentCard', async () => { + await decorator.getExtendedAgentCard({ tenant: CALLER_TENANT }); + + const passedParams = mockTransport.getExtendedAgentCard.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on cancelTask', async () => { + await decorator.cancelTask({ id: 'task-1', tenant: CALLER_TENANT, metadata: {} }); + + const passedParams = mockTransport.cancelTask.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on listTasks', async () => { + await decorator.listTasks({ + tenant: CALLER_TENANT, + contextId: '', + status: undefined, + pageToken: '', + statusTimestampAfter: '', + }); + + const passedParams = mockTransport.listTasks.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on createTaskPushNotificationConfig', async () => { + await decorator.createTaskPushNotificationConfig({ + tenant: CALLER_TENANT, + id: 'config-1', + taskId: 'task-1', + url: 'https://example.com', + token: '', + authentication: undefined, + }); + + const passedParams = mockTransport.createTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on getTaskPushNotificationConfig', async () => { + await decorator.getTaskPushNotificationConfig({ + id: 'cfg-1', + taskId: 'task-1', + tenant: CALLER_TENANT, + }); + + const passedParams = mockTransport.getTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on listTaskPushNotificationConfig', async () => { + await decorator.listTaskPushNotificationConfig({ + taskId: 'task-1', + tenant: CALLER_TENANT, + pageSize: 0, + pageToken: '', + }); + + const passedParams = mockTransport.listTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on deleteTaskPushNotificationConfig', async () => { + await decorator.deleteTaskPushNotificationConfig({ + id: 'cfg-1', + taskId: 'task-1', + tenant: CALLER_TENANT, + }); + + const passedParams = mockTransport.deleteTaskPushNotificationConfig.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on sendMessageStream', async () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of decorator.sendMessageStream({ + tenant: CALLER_TENANT, + message: undefined, + configuration: undefined, + metadata: {}, + })) { + // consume + } + + const passedParams = mockTransport.sendMessageStream.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + + it('should preserve caller-specified tenant on resubscribeTask', async () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of decorator.resubscribeTask({ + id: 'task-1', + tenant: CALLER_TENANT, + })) { + // consume + } + + const passedParams = mockTransport.resubscribeTask.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(CALLER_TENANT); + }); + }); + + describe('other fields passthrough', () => { + it('should not mutate the original params object', async () => { + const params: SendMessageRequest = { + tenant: '', + message: undefined, + configuration: undefined, + metadata: { key: 'value' }, + }; + const original = { ...params }; + await decorator.sendMessage(params); + + // Original object should be unchanged + expect(params).to.deep.equal(original); + // But the base transport received the resolved tenant + const passedParams = mockTransport.sendMessage.mock.calls[0][0]; + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + + it('should forward all non-tenant fields unchanged', async () => { + await decorator.getTask({ id: 'my-task', tenant: '', historyLength: 42 }); + + const passedParams = mockTransport.getTask.mock.calls[0][0]; + expect(passedParams.id).to.equal('my-task'); + expect(passedParams.historyLength).to.equal(42); + expect(passedParams.tenant).to.equal(DEFAULT_TENANT); + }); + }); + + it('should pass through RequestOptions to the base transport', async () => { + const options = { signal: new AbortController().signal }; + await decorator.sendMessage( + { tenant: '', message: undefined, configuration: undefined, metadata: {} }, + options + ); + + expect(mockTransport.sendMessage.mock.calls[0][1]).to.equal(options); + }); +}); From 8e2aefbe704b61f6dd34d5f0cf301023e83c42d3 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Thu, 23 Apr 2026 09:23:41 +0000 Subject: [PATCH 14/16] Update applyPathTenant to support query parameters as well. The name is also updated to resolvePathTenant. --- .betterer.results | 4 +- src/client/factory.ts | 1 - src/server/express/rest_handler.ts | 61 ++++++++++++++---------------- 3 files changed, 30 insertions(+), 36 deletions(-) diff --git a/.betterer.results b/.betterer.results index 1fb94338..169b85f2 100644 --- a/.betterer.results +++ b/.betterer.results @@ -9,8 +9,8 @@ exports[`TypeScript Strict Mode`] = { [327, 15, 4, "tsc: Expected 2 arguments, but got 1.", "2087764327"], [350, 15, 4, "tsc: Expected 2 arguments, but got 1.", "2087764327"] ], - "src/server/transports/jsonrpc/jsonrpc_transport_handler.ts:1546714885": [ - [92, 12, 10, "tsc: Variable \'rpcRequest\' is used before being assigned.", "3927050741"] + "src/server/transports/jsonrpc/jsonrpc_transport_handler.ts:3366537262": [ + [90, 12, 10, "tsc: Variable \'rpcRequest\' is used before being assigned.", "3927050741"] ] }` }; diff --git a/src/client/factory.ts b/src/client/factory.ts index 2e05c4ed..e49d07f9 100644 --- a/src/client/factory.ts +++ b/src/client/factory.ts @@ -105,7 +105,6 @@ export class ClientFactory { async createFromAgentCard(agentCard: AgentCard): Promise { const interfaces = agentCard.supportedInterfaces ?? []; - // Track the best interface per protocol binding (prefer version 1.0). const bestInterfacePerProtocol = new CaseInsensitiveMap<(typeof interfaces)[number]>(); for (const agentInterface of interfaces) { const existing = bestInterfacePerProtocol.get(agentInterface.protocolBinding); diff --git a/src/server/express/rest_handler.ts b/src/server/express/rest_handler.ts index a0f6fbeb..268cb468 100644 --- a/src/server/express/rest_handler.ts +++ b/src/server/express/rest_handler.ts @@ -276,25 +276,31 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { }; /** - * Resolves tenant for a request by applying the path-based tenant to the params object. - * If the request body already contains a tenant that differs from the path-based tenant, - * a warning is logged and the path-based tenant takes precedence (since it is the - * canonical source per the spec: "provided as a path parameter"). + * Resolves the tenant for a request from the URL path parameter, optionally + * validating it against an existing tenant value from the request body or + * query string. The path-based tenant always takes precedence (per spec: + * "provided as a path parameter"). * - * @param params - The deserialized request params object with a `tenant` field - * @param req - Express request with potential `req.params.tenant` from URL path + * When called with only `req`, returns the path tenant or empty string. + * When called with `req` and an existing `bodyTenant`, logs a warning if + * they conflict and returns the path tenant. + * + * @param req - Express request with potential `req.params.tenant` + * @param bodyTenant - Optional tenant value from the request body or query + * @returns The resolved tenant string */ - const applyPathTenant = (params: T, req: Request): void => { + const resolvePathTenant = (req: Request, bodyTenant?: string): string => { const pathTenant = req.params.tenant as string | undefined; if (pathTenant) { - if (params.tenant && params.tenant !== pathTenant) { + if (bodyTenant && bodyTenant !== pathTenant) { console.warn( - `Tenant mismatch: URL path tenant "${pathTenant}" differs from request body ` + - `tenant "${params.tenant}". Using path tenant as the canonical value.` + `Tenant mismatch: URL path tenant "${pathTenant}" differs from request ` + + `tenant "${bodyTenant}". Using path tenant as the canonical value.` ); } - params.tenant = pathTenant; + return pathTenant; } + return bodyTenant || ''; }; /** @@ -308,7 +314,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('get', '/extendedAgentCard', async (req, res) => { const context = await buildContext(req); const result = await restTransportHandler.getAuthenticatedExtendedAgentCard( - { tenant: (req.params.tenant as string) || '' }, + { tenant: resolvePathTenant(req) }, context ); sendResponse(res, HTTP_STATUS.OK, context, result, AgentCard); @@ -328,7 +334,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/message\\:send', async (req, res) => { const context = await buildContext(req); const params = SendMessageRequest.fromJSON(req.body); - applyPathTenant(params, req); + params.tenant = resolvePathTenant(req, params.tenant); const result = await restTransportHandler.sendMessage(params, context); const protoResult = ToProto.messageSendResult(result); sendResponse( @@ -355,7 +361,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/message\\:stream', async (req, res) => { const context = await buildContext(req); const params = SendMessageRequest.fromJSON(req.body); - applyPathTenant(params, req); + params.tenant = resolvePathTenant(req, params.tenant); const stream = await restTransportHandler.sendMessageStream(params, context); await sendStreamResponse(res, stream, context); }); @@ -377,7 +383,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { req.params.taskId, context, req.query.historyLength, - req.params.tenant + resolvePathTenant(req) ); sendResponse(res, HTTP_STATUS.OK, context, result, Task); }); @@ -398,7 +404,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { const result = await restTransportHandler.cancelTask( req.params.taskId, context, - req.params.tenant + resolvePathTenant(req) ); sendResponse(res, HTTP_STATUS.ACCEPTED, context, result, Task); }); @@ -414,18 +420,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('get', '/tasks', async (req, res) => { const context = await buildContext(req); const query = { ...req.query }; - // For listTasks, tenant comes from the URL path (not from query params). - // The path tenant is already in the context; pass it along in the query object - // so RestTransportHandler can include it in the ListTasksRequest. - if (req.params.tenant) { - if (query.tenant && query.tenant !== req.params.tenant) { - console.warn( - `Tenant mismatch: URL path tenant "${req.params.tenant}" differs from query ` + - `param tenant "${query.tenant}". Using path tenant as the canonical value.` - ); - } - query.tenant = req.params.tenant; - } + query.tenant = resolvePathTenant(req, query.tenant as string | undefined); const result = await restTransportHandler.listTasks(query, context); sendResponse(res, HTTP_STATUS.OK, context, result, ListTasksResponse); }); @@ -446,7 +441,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { const stream = await restTransportHandler.resubscribe( req.params.taskId, context, - req.params.tenant + resolvePathTenant(req) ); await sendStreamResponse(res, stream, context); }); @@ -465,7 +460,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/tasks/:taskId/pushNotificationConfigs', async (req, res) => { const context = await buildContext(req); const params = TaskPushNotificationConfig.fromJSON(req.body); - applyPathTenant(params, req); + params.tenant = resolvePathTenant(req, params.tenant); const result = await restTransportHandler.createTaskPushNotificationConfig(params, context); sendResponse( res, @@ -490,7 +485,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { const result = await restTransportHandler.listTaskPushNotificationConfigs( req.params.taskId, context, - req.params.tenant + resolvePathTenant(req) ); sendResponse( res, @@ -517,7 +512,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { req.params.taskId, req.params.configId, context, - req.params.tenant + resolvePathTenant(req) ); sendResponse( res, @@ -544,7 +539,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { req.params.taskId, req.params.configId, context, - req.params.tenant + resolvePathTenant(req) ); sendResponse(res, HTTP_STATUS.NO_CONTENT, context); }); From 42c0e663eeb24e236c6f837e914d19a5ff552489 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Thu, 23 Apr 2026 09:34:11 +0000 Subject: [PATCH 15/16] Add tenantMiddleware to properly apply tenant logic directly on the hanlder layer. --- src/server/express/rest_handler.ts | 104 +++++++++++++++++------------ 1 file changed, 62 insertions(+), 42 deletions(-) diff --git a/src/server/express/rest_handler.ts b/src/server/express/rest_handler.ts index 268cb468..7ea5227c 100644 --- a/src/server/express/rest_handler.ts +++ b/src/server/express/rest_handler.ts @@ -263,8 +263,61 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { // Route Handlers // ============================================================================ + /** + * Middleware that resolves tenant from the URL path parameter and normalizes + * it into the request so downstream handlers don't need to deal with tenant + * resolution at all. + * + * For tenant-prefixed routes (`/:tenant/...`), the path tenant is the + * canonical source (per spec: "provided as a path parameter"). If the + * request body or query string also carries a tenant that differs, a warning + * is logged and the path tenant wins. + * + * The resolved tenant is written to: + * - `req.body.tenant` for POST / PUT / DELETE requests that may carry a JSON body + * - `req.query.tenant` for GET requests that use query parameters + * + * Non-tenant-prefixed routes pass through unchanged. + */ + const tenantMiddleware = (req: Request, _res: Response, next: () => void): void => { + const pathTenant = req.params.tenant as string | undefined; + if (!pathTenant) { + next(); + return; + } + + // Detect conflict with body tenant (POST / PUT / DELETE with JSON body) + const bodyTenant = req.body?.tenant as string | undefined; + if (bodyTenant && bodyTenant !== pathTenant) { + console.warn( + `Tenant mismatch: URL path tenant "${pathTenant}" differs from request body ` + + `tenant "${bodyTenant}". Using path tenant as the canonical value.` + ); + } + + // Detect conflict with query tenant (GET) + const queryTenant = req.query?.tenant as string | undefined; + if (queryTenant && queryTenant !== pathTenant) { + console.warn( + `Tenant mismatch: URL path tenant "${pathTenant}" differs from query param ` + + `tenant "${queryTenant}". Using path tenant as the canonical value.` + ); + } + + // Normalize: write path tenant into both body and query so handlers can + // read it from whichever source they naturally consume. + if (req.body) { + req.body.tenant = pathTenant; + } + (req.query as Record).tenant = pathTenant; + + next(); + }; + /** * Helper to register routes with and without optional tenant prefix. + * Tenant-prefixed routes get `tenantMiddleware` applied automatically, + * so individual handlers never need to resolve tenant themselves. */ const registerRoute = ( method: 'get' | 'post' | 'delete' | 'put', @@ -272,35 +325,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { handler: AsyncRouteHandler ) => { router[method](path, asyncHandler(handler)); - router[method](`/:tenant${path}`, asyncHandler(handler)); - }; - - /** - * Resolves the tenant for a request from the URL path parameter, optionally - * validating it against an existing tenant value from the request body or - * query string. The path-based tenant always takes precedence (per spec: - * "provided as a path parameter"). - * - * When called with only `req`, returns the path tenant or empty string. - * When called with `req` and an existing `bodyTenant`, logs a warning if - * they conflict and returns the path tenant. - * - * @param req - Express request with potential `req.params.tenant` - * @param bodyTenant - Optional tenant value from the request body or query - * @returns The resolved tenant string - */ - const resolvePathTenant = (req: Request, bodyTenant?: string): string => { - const pathTenant = req.params.tenant as string | undefined; - if (pathTenant) { - if (bodyTenant && bodyTenant !== pathTenant) { - console.warn( - `Tenant mismatch: URL path tenant "${pathTenant}" differs from request ` + - `tenant "${bodyTenant}". Using path tenant as the canonical value.` - ); - } - return pathTenant; - } - return bodyTenant || ''; + router[method](`/:tenant${path}`, tenantMiddleware, asyncHandler(handler)); }; /** @@ -314,7 +339,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('get', '/extendedAgentCard', async (req, res) => { const context = await buildContext(req); const result = await restTransportHandler.getAuthenticatedExtendedAgentCard( - { tenant: resolvePathTenant(req) }, + { tenant: (req.query.tenant as string) || '' }, context ); sendResponse(res, HTTP_STATUS.OK, context, result, AgentCard); @@ -334,7 +359,6 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/message\\:send', async (req, res) => { const context = await buildContext(req); const params = SendMessageRequest.fromJSON(req.body); - params.tenant = resolvePathTenant(req, params.tenant); const result = await restTransportHandler.sendMessage(params, context); const protoResult = ToProto.messageSendResult(result); sendResponse( @@ -361,7 +385,6 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/message\\:stream', async (req, res) => { const context = await buildContext(req); const params = SendMessageRequest.fromJSON(req.body); - params.tenant = resolvePathTenant(req, params.tenant); const stream = await restTransportHandler.sendMessageStream(params, context); await sendStreamResponse(res, stream, context); }); @@ -383,7 +406,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { req.params.taskId, context, req.query.historyLength, - resolvePathTenant(req) + (req.query.tenant as string) || '' ); sendResponse(res, HTTP_STATUS.OK, context, result, Task); }); @@ -404,7 +427,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { const result = await restTransportHandler.cancelTask( req.params.taskId, context, - resolvePathTenant(req) + (req.query.tenant as string) || '' ); sendResponse(res, HTTP_STATUS.ACCEPTED, context, result, Task); }); @@ -419,9 +442,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { */ registerRoute('get', '/tasks', async (req, res) => { const context = await buildContext(req); - const query = { ...req.query }; - query.tenant = resolvePathTenant(req, query.tenant as string | undefined); - const result = await restTransportHandler.listTasks(query, context); + const result = await restTransportHandler.listTasks(req.query, context); sendResponse(res, HTTP_STATUS.OK, context, result, ListTasksResponse); }); @@ -441,7 +462,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { const stream = await restTransportHandler.resubscribe( req.params.taskId, context, - resolvePathTenant(req) + (req.query.tenant as string) || '' ); await sendStreamResponse(res, stream, context); }); @@ -460,7 +481,6 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { registerRoute('post', '/tasks/:taskId/pushNotificationConfigs', async (req, res) => { const context = await buildContext(req); const params = TaskPushNotificationConfig.fromJSON(req.body); - params.tenant = resolvePathTenant(req, params.tenant); const result = await restTransportHandler.createTaskPushNotificationConfig(params, context); sendResponse( res, @@ -485,7 +505,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { const result = await restTransportHandler.listTaskPushNotificationConfigs( req.params.taskId, context, - resolvePathTenant(req) + (req.query.tenant as string) || '' ); sendResponse( res, @@ -512,7 +532,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { req.params.taskId, req.params.configId, context, - resolvePathTenant(req) + (req.query.tenant as string) || '' ); sendResponse( res, @@ -539,7 +559,7 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { req.params.taskId, req.params.configId, context, - resolvePathTenant(req) + (req.query.tenant as string) || '' ); sendResponse(res, HTTP_STATUS.NO_CONTENT, context); }); From 1067cd8d9ebf1d43ed9bc5f7249a40891134a8b9 Mon Sep 17 00:00:00 2001 From: Bartek Gralewicz Date: Thu, 23 Apr 2026 09:50:58 +0000 Subject: [PATCH 16/16] Add drain method to the tenant transport decorator so that no eslint disables are needed. --- .../tenant_transport_decorator.spec.ts | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/test/client/transports/tenant_transport_decorator.spec.ts b/test/client/transports/tenant_transport_decorator.spec.ts index 91cc117a..b56463ba 100644 --- a/test/client/transports/tenant_transport_decorator.spec.ts +++ b/test/client/transports/tenant_transport_decorator.spec.ts @@ -3,6 +3,13 @@ import { TenantTransportDecorator } from '../../../src/client/transports/tenant_ import { Transport } from '../../../src/client/transports/transport.js'; import { SendMessageRequest } from '../../../src/types/pb/a2a.js'; +/** Drains an async generator to completion. */ +async function drain(gen: AsyncGenerator): Promise { + while (!(await gen.next()).done) { + // consume all values + } +} + describe('TenantTransportDecorator', () => { const DEFAULT_TENANT = 'default-tenant'; let mockTransport: Record, Mock> & { @@ -143,25 +150,21 @@ describe('TenantTransportDecorator', () => { }); it('should apply default tenant to sendMessageStream when tenant is empty', async () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of decorator.sendMessageStream({ - tenant: '', - message: undefined, - configuration: undefined, - metadata: {}, - })) { - // consume - } + await drain( + decorator.sendMessageStream({ + tenant: '', + message: undefined, + configuration: undefined, + metadata: {}, + }) + ); const passedParams = mockTransport.sendMessageStream.mock.calls[0][0]; expect(passedParams.tenant).to.equal(DEFAULT_TENANT); }); it('should apply default tenant to resubscribeTask when tenant is empty', async () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of decorator.resubscribeTask({ id: 'task-1', tenant: '' })) { - // consume - } + await drain(decorator.resubscribeTask({ id: 'task-1', tenant: '' })); const passedParams = mockTransport.resubscribeTask.mock.calls[0][0]; expect(passedParams.tenant).to.equal(DEFAULT_TENANT); @@ -247,28 +250,26 @@ describe('TenantTransportDecorator', () => { }); it('should preserve caller-specified tenant on sendMessageStream', async () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of decorator.sendMessageStream({ - tenant: CALLER_TENANT, - message: undefined, - configuration: undefined, - metadata: {}, - })) { - // consume - } + await drain( + decorator.sendMessageStream({ + tenant: CALLER_TENANT, + message: undefined, + configuration: undefined, + metadata: {}, + }) + ); const passedParams = mockTransport.sendMessageStream.mock.calls[0][0]; expect(passedParams.tenant).to.equal(CALLER_TENANT); }); it('should preserve caller-specified tenant on resubscribeTask', async () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of decorator.resubscribeTask({ - id: 'task-1', - tenant: CALLER_TENANT, - })) { - // consume - } + await drain( + decorator.resubscribeTask({ + id: 'task-1', + tenant: CALLER_TENANT, + }) + ); const passedParams = mockTransport.resubscribeTask.mock.calls[0][0]; expect(passedParams.tenant).to.equal(CALLER_TENANT);