-
Notifications
You must be signed in to change notification settings - Fork 136
feat: add pg task store #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,189 @@ | ||
| import { Task, TaskStatus } from '../../types.js'; | ||
| import { TaskStore } from '../store.js'; | ||
| import { A2A_DB_NAMES, getMigrations } from './migrations/index.js'; | ||
| import { Pool } from 'pg'; | ||
|
|
||
| /** | ||
| * A persistent TaskStore implementation using PostgreSQL. | ||
| * | ||
| * Assumes the following table schema: | ||
| * CREATE TABLE a2a_tasks ( | ||
| * id VARCHAR(255) PRIMARY KEY, | ||
| * task_data JSONB NOT NULL, | ||
| * created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), | ||
| * updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() | ||
| * ); | ||
| */ | ||
| export class PostgresTaskStore implements TaskStore { | ||
| private pool: Pool; | ||
| private isSetup: boolean = false; | ||
|
|
||
| constructor(pool: Pool) { | ||
| this.pool = pool; | ||
| } | ||
|
|
||
| async setup(): Promise<void> { | ||
| if (this.isSetup) { | ||
| return; | ||
| } | ||
| const client = await this.pool.connect(); | ||
| try { | ||
| // Ensure the public schema exists (it usually does by default) | ||
| await client.query(`CREATE SCHEMA IF NOT EXISTS a2a_public;`); | ||
| let version = -1; | ||
| const MIGRATIONS = getMigrations(); | ||
|
|
||
| try { | ||
| const result = await client.query( | ||
| `SELECT v FROM ${A2A_DB_NAMES.MIGRATION} ORDER BY v DESC LIMIT 1` | ||
| ); | ||
| if (result.rows.length > 0) { | ||
| version = result.rows[0].v; | ||
| } | ||
| } catch (error) { | ||
| // Assume table doesn't exist if there's an error | ||
| if ( | ||
| typeof error === 'object' && | ||
| error !== null && | ||
| 'code' in error && | ||
| typeof error.code === 'string' && | ||
| error.code === '42P01' // Postgres error code for undefined_table | ||
| ) { | ||
| version = -1; | ||
| } else { | ||
| throw error; | ||
| } | ||
| } | ||
|
|
||
| for (let v = version + 1; v < MIGRATIONS.length; v += 1) { | ||
| await client.query(MIGRATIONS[v]); | ||
| await client.query(`INSERT INTO ${A2A_DB_NAMES.MIGRATION} (v) VALUES ($1)`, [v]); | ||
| } | ||
|
|
||
| this.isSetup = true; | ||
| } finally { | ||
| client.release(); | ||
| } | ||
| } | ||
|
|
||
| public async load(id: string): Promise<Task | undefined> { | ||
| if (!this.isSetup) { | ||
| throw new Error('PostgresTaskStore not set up. Call .setup() first.'); | ||
| } | ||
| const res = await this.pool.query(`SELECT * FROM ${A2A_DB_NAMES.TASKS} WHERE id = $1`, [id]); | ||
| if (res.rows.length === 0) { | ||
| return undefined; | ||
| } | ||
| const row = res.rows[0]; | ||
| const reconstructedTask: Task = { | ||
| id: row.id, | ||
| ...row.task_data, // Spread the rest of the task_data | ||
| }; | ||
|
|
||
| return reconstructedTask; | ||
| } | ||
|
|
||
| public async save(task: Task): Promise<void> { | ||
| if (!this.isSetup) { | ||
| throw new Error('PostgresTaskStore not set up. Call .setup() first.'); | ||
| } | ||
|
|
||
| const taskId = task.id; | ||
|
|
||
| if (!taskId) { | ||
| throw new Error('Task object must contain task id properties for persistence.'); | ||
| } | ||
|
|
||
| await this.pool.query( | ||
| `INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data) | ||
| VALUES ($1, $2) | ||
| ON CONFLICT (id) DO UPDATE SET | ||
| task_data = $2, | ||
| updated_at = NOW()`, | ||
| [taskId, task] | ||
| ); | ||
|
Comment on lines
+91
to
+104
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The const { id: taskId, ...taskData } = task;
if (!taskId) {
throw new Error('Task object must contain task id properties for persistence.');
}
await this.pool.query(
`INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data)
VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET
task_data = $2,
updated_at = NOW()`,
[taskId, taskData]
); |
||
| } | ||
|
|
||
| public async delete(id: string): Promise<void> { | ||
| if (!this.isSetup) { | ||
| throw new Error('PostgresTaskStore not set up. Call .setup() first.'); | ||
| } | ||
| await this.pool.query(`DELETE FROM ${A2A_DB_NAMES.TASKS} WHERE id = $1`, [id]); | ||
| } | ||
|
|
||
| public async list( | ||
| page: string = '1', | ||
| pageSize: string = '10', | ||
| status?: TaskStatus['state'][], | ||
| metadataSearch?: Record<string, unknown> | ||
| ): Promise<{ | ||
| result: Task[]; | ||
| page: string; | ||
| pageSize: string; | ||
| totalNumberOfTasks: number; | ||
| }> { | ||
| if (!this.isSetup) { | ||
| throw new Error('PostgresTaskStore not set up. Call .setup() first.'); | ||
| } | ||
|
|
||
| // let jsonbQuery = `task_data ->'metadata' @> jsonb_build_object( | ||
| // 'memberId', $1::text, | ||
| // 'userId', $2::text | ||
| // )`; | ||
| let jsonbQuery = ''; | ||
| if (metadataSearch) { | ||
| jsonbQuery = `task_data ->'metadata' @> jsonb_build_object(${Object.entries(metadataSearch) | ||
| .map(([key, _value], index) => `'${key}', $${index + 1}::text`) | ||
| .join(',')})`; | ||
| } | ||
|
|
||
| let additionalQuery = ''; | ||
| if (status) { | ||
| additionalQuery = `AND (task_data ->'status' ->> 'state') IN (${status?.map((status) => `'${status}'`).join(',')})`; | ||
| } | ||
|
|
||
| const params = [...Object.values(metadataSearch)]; | ||
|
|
||
| const query = `WITH filtered_tasks AS ( | ||
| SELECT * | ||
| FROM ${A2A_DB_NAMES.TASKS} WHERE ${jsonbQuery} ${additionalQuery} | ||
| ), | ||
| paginated_tasks AS ( | ||
| SELECT * | ||
| FROM filtered_tasks | ||
| ORDER BY created_at DESC | ||
| LIMIT ${parseInt(pageSize)} OFFSET ${(parseInt(page) - 1) * parseInt(pageSize)} | ||
| ), | ||
| total_filtered_count AS ( | ||
| SELECT COUNT(*) AS count FROM filtered_tasks | ||
| ) | ||
| SELECT | ||
| ( | ||
| SELECT json_agg(pt ORDER BY pt.created_at DESC) | ||
| FROM paginated_tasks pt | ||
| ) AS tasks, | ||
| total_filtered_count.count | ||
| FROM total_filtered_count;`; | ||
|
|
||
| const res = await this.pool.query(query, params); | ||
|
Comment on lines
+133
to
+168
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The query construction in the
I suggest refactoring the query and parameter building logic to use parameterized queries correctly for all dynamic values and to construct the const params: unknown[] = [];
const whereClauses: string[] = [];
if (metadataSearch && Object.keys(metadataSearch).length > 0) {
params.push(JSON.stringify(metadataSearch));
whereClauses.push(`task_data->'metadata' @> $${params.length}::jsonb`);
}
if (status && status.length > 0) {
params.push(status);
whereClauses.push(`(task_data->'status'->>'state') = ANY($${params.length}::text[])`);
}
const whereClause = whereClauses.length > 0 ? `WHERE ${whereClauses.join(' AND ')}` : '';
const pageNum = parseInt(page, 10);
const pageSizeNum = parseInt(pageSize, 10);
params.push(pageSizeNum);
params.push((pageNum - 1) * pageSizeNum);
const query = `WITH filtered_tasks AS (
SELECT *
FROM ${A2A_DB_NAMES.TASKS} ${whereClause}
),
paginated_tasks AS (
SELECT *
FROM filtered_tasks
ORDER BY created_at DESC
LIMIT $${params.length - 1} OFFSET $${params.length}
),
total_filtered_count AS (
SELECT COUNT(*) AS count FROM filtered_tasks
)
SELECT
(
SELECT json_agg(pt ORDER BY pt.created_at DESC)
FROM paginated_tasks pt
) AS tasks,
total_filtered_count.count
FROM total_filtered_count;`;
const res = await this.pool.query(query, params); |
||
|
|
||
| return { | ||
| result: res.rows[0].tasks | ||
| ? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => { | ||
| const reconstructedTask: Task = { | ||
| id: row.id, | ||
| ...row.task_data, | ||
| }; | ||
| return reconstructedTask; | ||
| }) | ||
| : [], | ||
| page, | ||
| pageSize, | ||
| totalNumberOfTasks: res.rows[0].count, | ||
| }; | ||
|
Comment on lines
+170
to
+183
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The return {
result: res.rows[0].tasks
? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => {
const reconstructedTask: Task = {
id: row.id,
...row.task_data,
};
return reconstructedTask;
})
: [],
totalNumberOfTasks: res.rows[0].count,
}; |
||
| } | ||
|
|
||
| async end(): Promise<void> { | ||
| await this.pool.end(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| export enum A2A_DB_NAMES { | ||
| MIGRATION = 'a2a_public.a2a_migrations', | ||
| TASKS = 'a2a_public.a2a_tasks', | ||
| METADATA_INDEX = 'idx_metadata_gin', | ||
| STATUS_INDEX = 'idx_status_gin', | ||
| } | ||
|
|
||
| export const getMigrations = () => [ | ||
| ` | ||
| CREATE TABLE IF NOT EXISTS ${A2A_DB_NAMES.MIGRATION} ( | ||
| v INT PRIMARY KEY | ||
| ); | ||
| `, | ||
| ` | ||
| CREATE TABLE IF NOT EXISTS ${A2A_DB_NAMES.TASKS} ( | ||
| id VARCHAR(255) PRIMARY KEY, | ||
| task_data JSONB NOT NULL, | ||
| created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), | ||
| updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() | ||
| ); | ||
| `, | ||
|
|
||
| `CREATE INDEX IF NOT EXISTS ${A2A_DB_NAMES.METADATA_INDEX} | ||
| ON ${A2A_DB_NAMES.TASKS} | ||
| USING gin ((task_data->'metadata'));`, | ||
|
|
||
| `CREATE INDEX IF NOT EXISTS ${A2A_DB_NAMES.STATUS_INDEX} | ||
| ON ${A2A_DB_NAMES.TASKS} | ||
| USING gin ((task_data->'status'));`, | ||
| ]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The migration loop does not use a transaction. If a migration script runs successfully but the subsequent
`INSERT` into the migrations table fails, the database will be in an inconsistent state. On the next run, the setup will not re-run the successful migration, but the version will not be recorded. It's a best practice to wrap each migration and its version update in a transaction to ensure atomicity.