forked from a2aproject/a2a-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnotifications_app.py
More file actions
69 lines (56 loc) · 2.02 KB
/
notifications_app.py
File metadata and controls
69 lines (56 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
from typing import Annotated
from fastapi import FastAPI, HTTPException, Path, Request
from pydantic import BaseModel, ValidationError
from a2a.types import Task
class Notification(BaseModel):
    """Encapsulates default push notification data.

    One instance represents a single push notification kept by the
    notification-ingesting application: the task payload that was delivered
    together with the token that accompanied it.
    """

    # The task carried by the notification body.
    task: Task

    # The notification token that was sent along with the task.
    token: str
def create_notifications_app() -> FastAPI:
    """Creates a simple push notification ingesting HTTP+REST application.

    Received notifications are kept in an in-memory dictionary keyed by task
    ID. The dictionary lives in this function's closure, so every call
    returns an application with its own, initially empty, store.

    Returns:
        A FastAPI application exposing:
          * ``POST /notifications`` - ingest a notification.
          * ``GET /tasks/{task_id}/notifications`` - list what was ingested
            for one task.
          * ``GET /health`` - liveness check.
    """
    app = FastAPI()
    store_lock = asyncio.Lock()
    store: dict[str, list[Notification]] = {}

    @app.post('/notifications')
    async def add_notification(request: Request):
        """Endpoint for ingesting notifications from agents.

        It receives a JSON payload describing a task and stores it in-memory
        together with the token from the ``x-a2a-notification-token`` header.

        Raises:
            HTTPException: With status 400 when the token header is missing,
                when the body is not valid JSON, or when the JSON does not
                validate as a ``Task``.
        """
        token = request.headers.get('x-a2a-notification-token')
        if not token:
            raise HTTPException(
                status_code=400,
                detail='Missing "x-a2a-notification-token" header.',
            )

        # Decode the body separately from model validation: a malformed body
        # raises json.JSONDecodeError / UnicodeDecodeError (both ValueError
        # subclasses), which must be reported as a client error rather than
        # escaping as an unhandled 500.
        try:
            payload = await request.json()
        except ValueError as e:
            raise HTTPException(
                status_code=400,
                detail=f'Invalid JSON payload: {e}',
            ) from e

        try:
            task = Task.model_validate(payload)
        except ValidationError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        async with store_lock:
            store.setdefault(task.id, []).append(
                Notification(
                    task=task,
                    token=token,
                )
            )
        return {
            'status': 'received',
        }

    @app.get('/tasks/{task_id}/notifications')
    async def list_notifications_by_task(
        task_id: Annotated[
            str, Path(title='The ID of the task to list the notifications for.')
        ],
    ):
        """Helper endpoint for retrieving ingested notifications for a given task.

        Returns an empty list when nothing has been stored for ``task_id``.
        """
        async with store_lock:
            notifications = store.get(task_id, [])
        return {'notifications': notifications}

    @app.get('/health')
    def health_check():
        """Helper endpoint for checking if the server is up."""
        return {'status': 'ok'}

    return app