Skip to content

Commit 2d2d31e

Browse files
committed
feat(server): implement resource scoping for tasks and push notifications
Introduces caller indentity isolation to ensure clients only access authorized resources, as mandated by the A2A spec. - Add 'owner' field to `TaskMixin` and `PushNotificationConfig` database models. - Add 'last_updated' field to `TaskMixin` for optimized sorting and indexing. - Update `DatabaseTaskStore`, `InMemoryTaskStore` and `DatabasePushNotificationConfigStore` to use `OwnerResolver`. - Add relevant Unit tests. - Add Alembic configuration to enable users to update their own databases with non-optional `owner` field in `tasks` table.
1 parent 5d42015 commit 2d2d31e

17 files changed

Lines changed: 823 additions & 80 deletions

alembic.ini

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# A generic, single database configuration.
2+
3+
[alembic]
4+
5+
# database URL. This is consumed by the user-maintained env.py script only.
6+
# other means of configuring database URLs may be customized within the env.py
7+
# file.
8+
# IMPORTANT: This is a placeholder and an example, and should be replaced with your actual database URL.
9+
sqlalchemy.url = sqlite+aiosqlite:///./test.db
10+
11+
12+
# Logging configuration
13+
[loggers]
14+
keys = root,sqlalchemy,alembic
15+
16+
[handlers]
17+
keys = console
18+
19+
[formatters]
20+
keys = generic
21+
22+
[logger_root]
23+
level = WARNING
24+
handlers = console
25+
qualname =
26+
27+
[logger_sqlalchemy]
28+
level = WARNING
29+
handlers =
30+
qualname = sqlalchemy.engine
31+
32+
[logger_alembic]
33+
level = INFO
34+
handlers =
35+
qualname = alembic
36+
37+
[handler_console]
38+
class = StreamHandler
39+
args = (sys.stderr,)
40+
level = NOTSET
41+
formatter = generic
42+
43+
[formatter_generic]
44+
format = %(levelname)-5.5s [%(name)s] %(message)s
45+
datefmt = %H:%M:%S

alembic/README

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Database Migrations with Alembic
2+
3+
This directory contains database migration scripts for the A2A SDK, managed by [Alembic](https://alembic.sqlalchemy.org/).
4+
5+
## Configuration
6+
7+
- `alembic.ini`: Global configuration for Alembic, including the database URL.
8+
- `env.py`: Python script that runs when the Alembic environment is invoked. It configures the SQLAlchemy engine and connects it to the migration context.
9+
- `versions/`: Directory containing individual migration scripts.
10+
11+
## Common Commands
12+
13+
All commands should be run from the project root using `uv run`.
14+
15+
### Viewing Status
16+
```bash
17+
# View current migration version of the database
18+
uv run alembic current
19+
20+
# View migration history
21+
uv run alembic history --verbose
22+
```
23+
24+
### Running Migrations
25+
```bash
26+
# Upgrade to the latest version
27+
uv run alembic upgrade head
28+
29+
# Downgrade by one version
30+
uv run alembic downgrade base
31+
```
32+
33+
### Creating Migrations
34+
```bash
35+
# Create a new migration manually
36+
uv run alembic revision -m "description of changes"
37+
38+
# Create a new migration automatically (detects changes in models.py)
39+
uv run alembic revision --autogenerate -m "description of changes"
40+
```
41+
42+
## Troubleshooting
43+
44+
### "duplicate column name" error
45+
If you see an error like `sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) duplicate column name: owner`, it usually means the column was already created (perhaps by `Base.metadata.create_all()` in tests or development) but Alembic doesn't know about it yet.
46+
47+
To fix this, "stamp" the database to tell Alembic it is already at the latest version:
48+
```bash
49+
uv run alembic stamp head
50+
```
51+
52+
## How to add a new migration
53+
1. Modify the models in `src/a2a/server/models.py`.
54+
2. Run `uv run alembic revision --autogenerate -m "Add new field to Task"`.
55+
3. Review the generated script in `alembic/versions/`.
56+
4. Apply the migration with `uv run alembic upgrade head`.

alembic/env.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import asyncio
2+
3+
from logging.config import fileConfig
4+
5+
from sqlalchemy import pool
6+
from sqlalchemy.ext.asyncio import async_engine_from_config
7+
8+
from a2a.server.models import Base
9+
from alembic import context
10+
11+
12+
# this is the Alembic Config object, which provides
13+
# access to the values within the .ini file in use.
14+
config = context.config
15+
16+
# Interpret the config file for Python logging.
17+
# This line sets up loggers basically.
18+
if config.config_file_name is not None:
19+
fileConfig(config.config_file_name)
20+
21+
# add your model's MetaData object here for 'autogenerate' support
22+
target_metadata = Base.metadata
23+
24+
# other values from the config, defined by the needs of env.py,
25+
# can be acquired:
26+
# my_important_option = config.get_main_option("my_important_option")
27+
# ... etc.
28+
29+
30+
def run_migrations_offline() -> None:
31+
"""Run migrations in 'offline' mode.
32+
33+
This configures the context with just a URL
34+
and not an Engine, though an Engine is acceptable
35+
here as well. By skipping the Engine creation
36+
we don't even need a DBAPI to be available.
37+
38+
Calls to context.execute() here emit the given string to the
39+
script output.
40+
41+
"""
42+
url = config.get_main_option('sqlalchemy.url')
43+
context.configure(
44+
url=url,
45+
target_metadata=target_metadata,
46+
literal_binds=True,
47+
dialect_opts={'paramstyle': 'named'},
48+
)
49+
50+
with context.begin_transaction():
51+
context.run_migrations()
52+
53+
54+
def do_run_migrations(connection):
55+
context.configure(connection=connection, target_metadata=target_metadata)
56+
57+
with context.begin_transaction():
58+
context.run_migrations()
59+
60+
61+
async def run_async_migrations():
62+
"""In this scenario we need to create an Engine
63+
and associate a connection with the context.
64+
"""
65+
connectable = async_engine_from_config(
66+
config.get_section(config.config_ini_section),
67+
prefix='sqlalchemy.',
68+
poolclass=pool.NullPool,
69+
)
70+
71+
async with connectable.connect() as connection:
72+
await connection.run_sync(do_run_migrations)
73+
74+
await connectable.dispose()
75+
76+
77+
def run_migrations_online():
78+
"""Run migrations in 'online' mode."""
79+
asyncio.run(run_async_migrations())
80+
81+
82+
if context.is_offline_mode():
83+
run_migrations_offline()
84+
else:
85+
run_migrations_online()

alembic/script.py.mako

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""${message}
2+
3+
Revision ID: ${up_revision}
4+
Revises: ${down_revision | comma,n}
5+
Create Date: ${create_date}
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
${imports if imports else ""}
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = ${repr(up_revision)}
16+
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
17+
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
18+
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
19+
20+
21+
def upgrade() -> None:
22+
"""Upgrade schema."""
23+
${upgrades if upgrades else "pass"}
24+
25+
26+
def downgrade() -> None:
27+
"""Downgrade schema."""
28+
${downgrades if downgrades else "pass"}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""add_owner_to_task
2+
3+
Revision ID: 6419d2d130f6
4+
Revises:
5+
Create Date: 2026-02-17 09:23:06.758085
6+
7+
"""
8+
9+
from collections.abc import Sequence
10+
11+
import sqlalchemy as sa
12+
13+
from alembic import op
14+
15+
16+
# revision identifiers, used by Alembic.
17+
revision: str = '6419d2d130f6'
18+
down_revision: str | Sequence[str] | None = None
19+
branch_labels: str | Sequence[str] | None = None
20+
depends_on: str | Sequence[str] | None = None
21+
22+
23+
def upgrade() -> None:
24+
"""Upgrade schema."""
25+
op.add_column(
26+
'tasks',
27+
sa.Column(
28+
'owner',
29+
sa.String(255),
30+
nullable=False,
31+
server_default='unknown', # Set your desired default value here
32+
),
33+
)
34+
35+
36+
def downgrade() -> None:
37+
"""Downgrade schema."""
38+
op.drop_column('tasks', 'owner')

pyproject.toml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,89 @@ docstring-code-format = true
323323
docstring-code-line-length = "dynamic"
324324
quote-style = "single"
325325
indent-style = "space"
326+
327+
328+
[tool.alembic]
329+
330+
# path to migration scripts.
331+
# this is typically a path given in POSIX (e.g. forward slashes)
332+
# format, relative to the token %(here)s which refers to the location of this
333+
# ini file
334+
script_location = "%(here)s/alembic"
335+
336+
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
337+
# Uncomment the line below if you want the files to be prepended with date and time
338+
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
339+
# for all available tokens
340+
# file_template = "%%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s"
341+
# Or organize into date-based subdirectories (requires recursive_version_locations = true)
342+
# file_template = "%%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s"
343+
344+
# additional paths to be prepended to sys.path. defaults to the current working directory.
345+
prepend_sys_path = [
346+
"."
347+
]
348+
349+
# timezone to use when rendering the date within the migration file
350+
# as well as the filename.
351+
# If specified, requires the tzdata library which can be installed by adding
352+
# `alembic[tz]` to the pip requirements.
353+
# string value is passed to ZoneInfo()
354+
# leave blank for localtime
355+
# timezone =
356+
357+
# max length of characters to apply to the "slug" field
358+
# truncate_slug_length = 40
359+
360+
# set to 'true' to run the environment during
361+
# the 'revision' command, regardless of autogenerate
362+
# revision_environment = false
363+
364+
# set to 'true' to allow .pyc and .pyo files without
365+
# a source .py file to be detected as revisions in the
366+
# versions/ directory
367+
# sourceless = false
368+
369+
# version location specification; This defaults
370+
# to <script_location>/versions. When using multiple version
371+
# directories, initial revisions must be specified with --version-path.
372+
# version_locations = [
373+
# "%(here)s/alembic/versions",
374+
# "%(here)s/foo/bar"
375+
# ]
376+
377+
378+
# set to 'true' to search source files recursively
379+
# in each "version_locations" directory
380+
# new in Alembic version 1.10
381+
# recursive_version_locations = false
382+
383+
# the output encoding used when revision files
384+
# are written from script.py.mako
385+
# output_encoding = "utf-8"
386+
387+
# This section defines scripts or Python functions that are run
388+
# on newly generated revision scripts. See the documentation for further
389+
# detail and examples
390+
# [[tool.alembic.post_write_hooks]]
391+
# format using "black" - use the console_scripts runner,
392+
# against the "black" entrypoint
393+
# name = "black"
394+
# type = "console_scripts"
395+
# entrypoint = "black"
396+
# options = "-l 79 REVISION_SCRIPT_FILENAME"
397+
#
398+
# [[tool.alembic.post_write_hooks]]
399+
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
400+
# name = "ruff"
401+
# type = "module"
402+
# module = "ruff"
403+
# options = "check --fix REVISION_SCRIPT_FILENAME"
404+
#
405+
# [[tool.alembic.post_write_hooks]]
406+
# Alternatively, use the exec runner to execute a binary found on your PATH
407+
# name = "ruff"
408+
# type = "exec"
409+
# executable = "ruff"
410+
# options = "check --fix REVISION_SCRIPT_FILENAME"
411+

src/a2a/server/models.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import datetime
2+
13
from typing import TYPE_CHECKING, Any, Generic, TypeVar
24

35

@@ -16,7 +18,7 @@ def override(func): # noqa: ANN001, ANN201
1618

1719

1820
try:
19-
from sqlalchemy import JSON, Dialect, LargeBinary, String
21+
from sqlalchemy import JSON, Dialect, Index, LargeBinary, String
2022
from sqlalchemy.orm import (
2123
DeclarativeBase,
2224
Mapped,
@@ -127,6 +129,8 @@ class TaskMixin:
127129
kind: Mapped[str] = mapped_column(
128130
String(16), nullable=False, default='task'
129131
)
132+
owner: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
133+
last_updated: Mapped[datetime] = mapped_column(String(22), nullable=True)
130134

131135
# Properly typed Pydantic fields with automatic serialization
132136
status: Mapped[TaskStatus] = mapped_column(PydanticType(TaskStatus))
@@ -152,6 +156,17 @@ def __repr__(self) -> str:
152156
f'context_id="{self.context_id}", status="{self.status}")>'
153157
)
154158

159+
@declared_attr
160+
@classmethod
161+
def __table_args__(cls) -> tuple[Any, ...]:
162+
"""Define a unique index (owner, last_updated) for each table that uses the mixin."""
163+
tablename = getattr(cls, '__tablename__', 'tasks')
164+
return (
165+
Index(
166+
f'idx_{tablename}_owner_last_updated', 'owner', 'last_updated'
167+
),
168+
)
169+
155170

156171
def create_task_model(
157172
table_name: str = 'tasks', base: type[DeclarativeBase] = Base
@@ -212,6 +227,7 @@ class PushNotificationConfigMixin:
212227
task_id: Mapped[str] = mapped_column(String(36), primary_key=True)
213228
config_id: Mapped[str] = mapped_column(String(255), primary_key=True)
214229
config_data: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
230+
owner: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
215231

216232
@override
217233
def __repr__(self) -> str:

0 commit comments

Comments
 (0)