Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
"import": "./dist/server/express/index.js",
"require": "./dist/server/express/index.cjs"
},
"./server/pgTaskStore": {
"types": "./dist/server/pgTaskStore/index.d.ts",
"import": "./dist/server/pgTaskStore/index.js",
"require": "./dist/server/pgTaskStore/index.cjs"
},
"./client": {
"types": "./dist/client/index.d.ts",
"import": "./dist/client/index.js",
Expand All @@ -45,6 +50,7 @@
"@eslint/js": "^9.39.1",
"@types/express": "^5.0.3",
"@types/node": "^22.13.14",
"@types/pg": "^8.16.0",
"@types/sinon": "^17.0.4",
"@types/supertest": "^6.0.3",
"@vitest/coverage-v8": "^4.0.15",
Expand Down
189 changes: 189 additions & 0 deletions src/server/pgTaskStore/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import { Task, TaskStatus } from '../../types.js';
import { TaskStore } from '../store.js';
import { A2A_DB_NAMES, getMigrations } from './migrations/index.js';
import { Pool } from 'pg';

/**
* A persistent TaskStore implementation using PostgreSQL.
*
* Assumes the following table schema:
* CREATE TABLE a2a_tasks (
* id VARCHAR(255) PRIMARY KEY,
* task_data JSONB NOT NULL,
* created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
* updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
* );
*/
export class PostgresTaskStore implements TaskStore {
private pool: Pool;
private isSetup: boolean = false;

constructor(pool: Pool) {
this.pool = pool;
}

async setup(): Promise<void> {
if (this.isSetup) {
return;
}
const client = await this.pool.connect();
try {
// Ensure the public schema exists (it usually does by default)
await client.query(`CREATE SCHEMA IF NOT EXISTS a2a_public;`);
let version = -1;
const MIGRATIONS = getMigrations();

try {
const result = await client.query(
`SELECT v FROM ${A2A_DB_NAMES.MIGRATION} ORDER BY v DESC LIMIT 1`
);
if (result.rows.length > 0) {
version = result.rows[0].v;
}
} catch (error) {
// Assume table doesn't exist if there's an error
if (
typeof error === 'object' &&
error !== null &&
'code' in error &&
typeof error.code === 'string' &&
error.code === '42P01' // Postgres error code for undefined_table
) {
version = -1;
} else {
throw error;
}
}

for (let v = version + 1; v < MIGRATIONS.length; v += 1) {
await client.query(MIGRATIONS[v]);
await client.query(`INSERT INTO ${A2A_DB_NAMES.MIGRATION} (v) VALUES ($1)`, [v]);
}
Comment on lines +58 to +61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The migration loop does not use a transaction. If a migration script runs successfully but the subsequent INSERT into the migrations table fails, the database will be in an inconsistent state. On the next run, the setup will not re-run the successful migration, but the version will not be recorded. It's a best practice to wrap each migration and its version update in a transaction to ensure atomicity.

      for (let v = version + 1; v < MIGRATIONS.length; v += 1) {
        await client.query('BEGIN');
        try {
          await client.query(MIGRATIONS[v]);
          await client.query(`INSERT INTO ${A2A_DB_NAMES.MIGRATION} (v) VALUES ($1)`, [v]);
          await client.query('COMMIT');
        } catch (e) {
          await client.query('ROLLBACK');
          throw e;
        }
      }


this.isSetup = true;
} finally {
client.release();
}
}

public async load(id: string): Promise<Task | undefined> {
if (!this.isSetup) {
throw new Error('PostgresTaskStore not set up. Call .setup() first.');
}
const res = await this.pool.query(`SELECT * FROM ${A2A_DB_NAMES.TASKS} WHERE id = $1`, [id]);
if (res.rows.length === 0) {
return undefined;
}
const row = res.rows[0];
const reconstructedTask: Task = {
id: row.id,
...row.task_data, // Spread the rest of the task_data
};

return reconstructedTask;
}

public async save(task: Task): Promise<void> {
if (!this.isSetup) {
throw new Error('PostgresTaskStore not set up. Call .setup() first.');
}

const taskId = task.id;

if (!taskId) {
throw new Error('Task object must contain task id properties for persistence.');
}

await this.pool.query(
`INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data)
VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET
task_data = $2,
updated_at = NOW()`,
[taskId, task]
);
Comment on lines +91 to +104
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The save method stores the entire task object in the task_data JSONB column, which includes the id property. Since the id is already stored in a dedicated id column, this is redundant. To avoid data duplication and keep the task_data column clean, I suggest destructuring the id from the task object and storing only the remaining data.

    const { id: taskId, ...taskData } = task;

    if (!taskId) {
      throw new Error('Task object must contain task id properties for persistence.');
    }

    await this.pool.query(
      `INSERT INTO ${A2A_DB_NAMES.TASKS} (id, task_data)
       VALUES ($1, $2)
       ON CONFLICT (id) DO UPDATE SET
         task_data = $2,
         updated_at = NOW()`,
      [taskId, taskData]
    );

}

public async delete(id: string): Promise<void> {
if (!this.isSetup) {
throw new Error('PostgresTaskStore not set up. Call .setup() first.');
}
await this.pool.query(`DELETE FROM ${A2A_DB_NAMES.TASKS} WHERE id = $1`, [id]);
}

public async list(
page: string = '1',
pageSize: string = '10',
status?: TaskStatus['state'][],
metadataSearch?: Record<string, unknown>
): Promise<{
result: Task[];
page: string;
pageSize: string;
totalNumberOfTasks: number;
}> {
if (!this.isSetup) {
throw new Error('PostgresTaskStore not set up. Call .setup() first.');
}

// let jsonbQuery = `task_data ->'metadata' @> jsonb_build_object(
// 'memberId', $1::text,
// 'userId', $2::text
// )`;
let jsonbQuery = '';
if (metadataSearch) {
jsonbQuery = `task_data ->'metadata' @> jsonb_build_object(${Object.entries(metadataSearch)
.map(([key, _value], index) => `'${key}', $${index + 1}::text`)
.join(',')})`;
}

let additionalQuery = '';
if (status) {
additionalQuery = `AND (task_data ->'status' ->> 'state') IN (${status?.map((status) => `'${status}'`).join(',')})`;
}

const params = [...Object.values(metadataSearch)];

const query = `WITH filtered_tasks AS (
SELECT *
FROM ${A2A_DB_NAMES.TASKS} WHERE ${jsonbQuery} ${additionalQuery}
),
paginated_tasks AS (
SELECT *
FROM filtered_tasks
ORDER BY created_at DESC
LIMIT ${parseInt(pageSize)} OFFSET ${(parseInt(page) - 1) * parseInt(pageSize)}
),
total_filtered_count AS (
SELECT COUNT(*) AS count FROM filtered_tasks
)
SELECT
(
SELECT json_agg(pt ORDER BY pt.created_at DESC)
FROM paginated_tasks pt
) AS tasks,
total_filtered_count.count
FROM total_filtered_count;`;

const res = await this.pool.query(query, params);
Comment on lines +133 to +168
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The query construction in the list method has several critical issues:

  1. SQL Injection Vulnerability: Both metadataSearch keys (line 136) and status values (line 142) are directly interpolated into the SQL query string. This makes the application vulnerable to SQL injection attacks. All external input should be treated as untrusted and passed as query parameters.
  2. Incorrect WHERE clause construction: The WHERE clause is built in a way that can lead to SQL syntax errors. For instance, if metadataSearch is not provided but status is, the query will contain WHERE AND ..., which is invalid.

I suggest refactoring the query and parameter building logic to use parameterized queries correctly for all dynamic values and to construct the WHERE clause safely. This addresses both the security vulnerabilities and the logic bug.

    const params: unknown[] = [];
    const whereClauses: string[] = [];

    if (metadataSearch && Object.keys(metadataSearch).length > 0) {
      params.push(JSON.stringify(metadataSearch));
      whereClauses.push(`task_data->'metadata' @> $${params.length}::jsonb`);
    }

    if (status && status.length > 0) {
      params.push(status);
      whereClauses.push(`(task_data->'status'->>'state') = ANY($${params.length}::text[])`);
    }

    const whereClause = whereClauses.length > 0 ? `WHERE ${whereClauses.join(' AND ')}` : '';

    const pageNum = parseInt(page, 10);
    const pageSizeNum = parseInt(pageSize, 10);

    params.push(pageSizeNum);
    params.push((pageNum - 1) * pageSizeNum);

    const query = `WITH filtered_tasks AS (
        SELECT *
        FROM ${A2A_DB_NAMES.TASKS} ${whereClause}
      ),
      paginated_tasks AS (
        SELECT *
        FROM filtered_tasks
        ORDER BY created_at DESC
        LIMIT $${params.length - 1} OFFSET $${params.length}
      ),
      total_filtered_count AS (
        SELECT COUNT(*) AS count FROM filtered_tasks
      )
      SELECT
        (
          SELECT json_agg(pt ORDER BY pt.created_at DESC)
          FROM paginated_tasks pt
        ) AS tasks,
        total_filtered_count.count
      FROM total_filtered_count;`;

    const res = await this.pool.query(query, params);


return {
result: res.rows[0].tasks
? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => {
const reconstructedTask: Task = {
id: row.id,
...row.task_data,
};
return reconstructedTask;
})
: [],
page,
pageSize,
totalNumberOfTasks: res.rows[0].count,
};
Comment on lines +170 to +183
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The list method's return type does not match the TaskStore interface it implements. The interface expects Promise<{ result: Task[]; totalNumberOfTasks: number }>, but this implementation returns page and pageSize as well. This violates the interface contract and will likely cause type errors in consuming code.

    return {
      result: res.rows[0].tasks
        ? res.rows[0].tasks.map((row: { id: string; task_data: Task }) => {
            const reconstructedTask: Task = {
              id: row.id,
              ...row.task_data,
            };
            return reconstructedTask;
          })
        : [],
      totalNumberOfTasks: res.rows[0].count,
    };

}

async end(): Promise<void> {
await this.pool.end();
}
}
30 changes: 30 additions & 0 deletions src/server/pgTaskStore/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
export enum A2A_DB_NAMES {
MIGRATION = 'a2a_public.a2a_migrations',
TASKS = 'a2a_public.a2a_tasks',
METADATA_INDEX = 'idx_metadata_gin',
STATUS_INDEX = 'idx_status_gin',
}

export const getMigrations = () => [
`
CREATE TABLE IF NOT EXISTS ${A2A_DB_NAMES.MIGRATION} (
v INT PRIMARY KEY
);
`,
`
CREATE TABLE IF NOT EXISTS ${A2A_DB_NAMES.TASKS} (
id VARCHAR(255) PRIMARY KEY,
task_data JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
`,

`CREATE INDEX IF NOT EXISTS ${A2A_DB_NAMES.METADATA_INDEX}
ON ${A2A_DB_NAMES.TASKS}
USING gin ((task_data->'metadata'));`,

`CREATE INDEX IF NOT EXISTS ${A2A_DB_NAMES.STATUS_INDEX}
ON ${A2A_DB_NAMES.TASKS}
USING gin ((task_data->'status'));`,
];
Loading
Loading