diff --git a/package-lock.json b/package-lock.json index e0bf30ac..cab1df15 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@eslint/js": "^9.39.1", "@types/express": "^5.0.3", "@types/node": "^22.13.14", + "@types/pg": "^8.16.0", "@types/sinon": "^17.0.4", "@types/supertest": "^6.0.3", "@vitest/coverage-v8": "^4.0.15", @@ -1521,6 +1522,18 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/pg": { + "version": "8.16.0", + "resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.16.0.tgz", + "integrity": "sha512-RmhMd/wD+CF8Dfo+cVIy3RR5cl8CyfXQ0tGgW6XBL8L4LM/UTEbNXYRbLwU6w+CgrKBNbrQWt4FUtTfaU5jSYQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*", + "pg-protocol": "*", + "pg-types": "^2.2.0" + } + }, "node_modules/@types/qs": { "version": "6.14.0", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.14.0.tgz", @@ -5683,6 +5696,40 @@ "dev": true, "license": "MIT" }, + "node_modules/pg-int8": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", + "dev": true, + "license": "ISC", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/pg-protocol": { + "version": "1.10.3", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.10.3.tgz", + "integrity": "sha512-6DIBgBQaTKDJyxnXaLiLR8wBpQQcGWuAESkRBX/t6OwA8YsqP+iVSiond2EDy6Y/dsGk8rh/jtax3js5NeV7JQ==", + "dev": true, + "license": "MIT" + }, + "node_modules/pg-types": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "dev": true, + "license": "MIT", + "dependencies": { + "pg-int8": "1.0.1", + "postgres-array": "~2.0.0", + "postgres-bytea": "~1.0.0", + "postgres-date": "~1.0.4", + "postgres-interval": "^1.1.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/picocolors": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.1.1.tgz", @@ -5797,6 +5844,49 @@ } } }, + "node_modules/postgres-array": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/postgres-bytea": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", + "integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-date": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-interval": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "xtend": "^4.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/prelude-ls": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/prelude-ls/-/prelude-ls-1.2.1.tgz", @@ -8569,6 +8659,16 @@ "dev": true, "license": "ISC" }, + "node_modules/xtend": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=0.4" + } + }, "node_modules/yallist": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", diff --git a/package.json b/package.json index b6ec1a7b..f8828ba2 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,11 @@ "import": "./dist/server/express/index.js", "require": "./dist/server/express/index.cjs" }, + "./server/pgTaskStore": { + "types": "./dist/server/pgTaskStore/index.d.ts", + "import": "./dist/server/pgTaskStore/index.js", + "require": "./dist/server/pgTaskStore/index.cjs" + }, "./client": { "types": "./dist/client/index.d.ts", "import": "./dist/client/index.js", @@ -45,6 +50,7 @@ "@eslint/js": "^9.39.1", "@types/express": "^5.0.3", "@types/node": "^22.13.14", + "@types/pg": "^8.16.0", "@types/sinon": "^17.0.4", "@types/supertest": "^6.0.3", "@vitest/coverage-v8": "^4.0.15", diff --git a/src/server/pgTaskStore/index.ts b/src/server/pgTaskStore/index.ts new file mode 100644 index 00000000..8819ef97 --- /dev/null +++ b/src/server/pgTaskStore/index.ts @@ -0,0 +1,189 @@ +import { Task, TaskStatus } from '../../types.js'; +import { TaskStore } from '../store.js'; +import { A2A_DB_NAMES, getMigrations } from './migrations/index.js'; +import { Pool } from 'pg'; + +/** + * A persistent TaskStore implementation using PostgreSQL. + * + * Assumes the following table schema: + * CREATE TABLE a2a_tasks ( + * id VARCHAR(255) PRIMARY KEY, + * task_data JSONB NOT NULL, + * created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + * updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + * ); + */ +export class PostgresTaskStore implements TaskStore { + private pool: Pool; + private isSetup: boolean = false; + + constructor(pool: Pool) { + this.pool = pool; + } + + async setup(): Promise { + if (this.isSetup) { + return; + } + const client = await this.pool.connect(); + try { + // Ensure the public schema exists (it usually does by default) + await client.query(`CREATE SCHEMA IF NOT EXISTS a2a_public;`); + let version = -1; + const MIGRATIONS = getMigrations(); + + try { + const result = await client.query( + `SELECT v FROM ${A2A_DB_NAMES.MIGRATION} ORDER BY v DESC LIMIT 1` + ); + if (result.rows.length > 0) { + version = result.rows[0].v; + } + } catch (error) { + // Assume table doesn't exist if there's an error + if ( + typeof error === 'object' && + error !== null && + 'code' in error && + typeof error.code === 'string' && + error.code === '42P01' // Postgres error code for undefined_table + ) { + version = -1; + } else { + throw error; + } + } + + for (let v = version + 1; v < MIGRATIONS.length; v += 1) { + await client.query(MIGRATIONS[v]); + await client.query(`INSERT INTO ${A2A_DB_NAMES.MIGRATION} (v) VALUES ($1)`, [v]); + } + + this.isSetup = true; + } finally { + client.release(); + } + } + + public async load(id: string): Promise { + if (!this.isSetup) { + throw new Error('PostgresTaskStore not set up. Call .setup() first.'); + } + const res = await this.pool.query(`SELECT * FROM ${A2A_DB_NAMES.TASKS} WHERE id = $1`, [id]); + if (res.rows.length === 0) { + return undefined; + } + const row = res.rows[0]; + const reconstructedTask: Task = { + id: row.id, + ...row.task_data, // Spread the rest of the task_data + }; + + return reconstructedTask; + } + + public async save(task: Task): Promise { + if (!this.isSetup) { + throw new Error('PostgresTaskStore not set up. Call .setup() first.'); + } + + const taskId = task.id; + + if (!taskId) { + throw new Error('Task object must contain task id properties for persistence.'); + } + + await this.pool.query( + `INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data) + VALUES ($1, $2) + ON CONFLICT (id) DO UPDATE SET + task_data = $2, + updated_at = NOW()`, + [taskId, task] + ); + } + + public async delete(id: string): Promise { + if (!this.isSetup) { + throw new Error('PostgresTaskStore not set up. Call .setup() first.'); + } + await this.pool.query(`DELETE FROM ${A2A_DB_NAMES.TASKS} WHERE id = $1`, [id]); + } + + public async list( + page: string = '1', + pageSize: string = '10', + status?: TaskStatus['state'][], + metadataSearch?: Record + ): Promise<{ + result: Task[]; + page: string; + pageSize: string; + totalNumberOfTasks: number; + }> { + if (!this.isSetup) { + throw new Error('PostgresTaskStore not set up. Call .setup() first.'); + } + + // let jsonbQuery = `task_data ->'metadata' @> jsonb_build_object( + // 'memberId', $1::text, + // 'userId', $2::text + // )`; + let jsonbQuery = ''; + if (metadataSearch) { + jsonbQuery = `task_data ->'metadata' @> jsonb_build_object(${Object.entries(metadataSearch) + .map(([key, _value], index) => `'${key}', $${index + 1}::text`) + .join(',')})`; + } + + let additionalQuery = ''; + if (status) { + additionalQuery = `AND (task_data ->'status' ->> 'state') IN (${status?.map((status) => `'${status}'`).join(',')})`; + } + + const params = [...Object.values(metadataSearch)]; + + const query = `WITH filtered_tasks AS ( + SELECT * + FROM ${A2A_DB_NAMES.TASKS} WHERE ${jsonbQuery} ${additionalQuery} + ), + paginated_tasks AS ( + SELECT * + FROM filtered_tasks + ORDER BY created_at DESC + LIMIT ${parseInt(pageSize)} OFFSET ${(parseInt(page) - 1) * parseInt(pageSize)} + ), + total_filtered_count AS ( + SELECT COUNT(*) AS count FROM filtered_tasks + ) + SELECT + ( + SELECT json_agg(pt ORDER BY pt.created_at DESC) + FROM paginated_tasks pt + ) AS tasks, + total_filtered_count.count + FROM total_filtered_count;`; + + const res = await this.pool.query(query, params); + + return { + result: res.rows[0].tasks + ? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => { + const reconstructedTask: Task = { + id: row.id, + ...row.task_data, + }; + return reconstructedTask; + }) + : [], + page, + pageSize, + totalNumberOfTasks: res.rows[0].count, + }; + } + + async end(): Promise { + await this.pool.end(); + } +} diff --git a/src/server/pgTaskStore/migrations/index.ts b/src/server/pgTaskStore/migrations/index.ts new file mode 100644 index 00000000..b7f27820 --- /dev/null +++ b/src/server/pgTaskStore/migrations/index.ts @@ -0,0 +1,30 @@ +export enum A2A_DB_NAMES { + MIGRATION = 'a2a_public.a2a_migrations', + TASKS = 'a2a_public.a2a_tasks', + METADATA_INDEX = 'idx_metadata_gin', + STATUS_INDEX = 'idx_status_gin', +} + +export const getMigrations = () => [ + ` + CREATE TABLE IF NOT EXISTS ${A2A_DB_NAMES.MIGRATION} ( + v INT PRIMARY KEY + ); + `, + ` + CREATE TABLE IF NOT EXISTS ${A2A_DB_NAMES.TASKS} ( + id VARCHAR(255) PRIMARY KEY, + task_data JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + `, + + `CREATE INDEX IF NOT EXISTS ${A2A_DB_NAMES.METADATA_INDEX} + ON ${A2A_DB_NAMES.TASKS} + USING gin ((task_data->'metadata'));`, + + `CREATE INDEX IF NOT EXISTS ${A2A_DB_NAMES.STATUS_INDEX} + ON ${A2A_DB_NAMES.TASKS} + USING gin ((task_data->'status'));`, +]; diff --git a/src/server/store.ts b/src/server/store.ts index 1510aae7..aa9e3fcd 100644 --- a/src/server/store.ts +++ b/src/server/store.ts @@ -1,4 +1,4 @@ -import { Task } from '../types.js'; +import { Task, TaskStatus } from '../types.js'; import { ServerCallContext } from './context.js'; /** @@ -22,6 +22,28 @@ export interface TaskStore { * @returns A promise resolving to an object containing the Task, or undefined if not found. */ load(taskId: string, context?: ServerCallContext): Promise; + + /** + * Deletes a task by task ID. + * @param taskId The ID of the task to delete. + * @returns A promise resolving when the delete operation is complete. + */ + delete(taskId: string): Promise; + + /** + * Lists tasks. + * @param page The page number will send all tasks if not provided. + * @param pageSize The page size will send all tasks if not provided. + * @param status The list of statuses to filter tasks by. + * @param metadataSearch To filter tasks by metadata keys and values. + * @returns A promise resolving to an object containing the tasks. + */ + list( + page?: string, + pageSize?: string, + status?: TaskStatus['state'][], + metadataSearch?: Record + ): Promise<{ result: Task[]; totalNumberOfTasks: number }>; } // ======================== @@ -42,4 +64,39 @@ export class InMemoryTaskStore implements TaskStore { // Store copies to prevent internal mutation if caller reuses objects this.store.set(task.id, { ...task }); } + + async delete(taskId: string): Promise { + this.store.delete(taskId); + } + + async list( + page?: string, + pageSize?: string, + status?: TaskStatus['state'][], + metadataSearch?: Record + ): Promise<{ result: Task[]; totalNumberOfTasks: number }> { + const tasks = Array.from(this.store.values()); + const filteredTasks = !status + ? tasks + : tasks.filter((task) => status?.includes(task.status.state)); + const filteredTasksByMetadata = !metadataSearch + ? filteredTasks + : filteredTasks.filter((task) => + Object.entries(metadataSearch).every( + ([key, value]) => task.metadata?.[key] === (value as unknown) + ) + ); + const paginatedTasks = + !page || !pageSize + ? filteredTasksByMetadata + : filteredTasksByMetadata.slice( + (Number(page) - 1) * Number(pageSize), + Number(page) * Number(pageSize) + ); + const totalNumberOfTasks = filteredTasksByMetadata.length; + return { + result: paginatedTasks, + totalNumberOfTasks, + }; + } } diff --git a/tsup.config.ts b/tsup.config.ts index 24030b81..cc649c03 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -5,6 +5,7 @@ export default defineConfig({ 'src/index.ts', 'src/server/index.ts', 'src/server/express/index.ts', + 'src/server/pgTaskStore/index.ts', 'src/client/index.ts', ], format: ['esm', 'cjs'],