-
Notifications
You must be signed in to change notification settings - Fork 429
Expand file tree
/
Copy pathinmemory_task_store.py
More file actions
58 lines (48 loc) · 1.95 KB
/
inmemory_task_store.py
File metadata and controls
58 lines (48 loc) · 1.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio
import logging
from a2a.server.context import ServerCallContext
from a2a.server.tasks.task_store import TaskStore
from a2a.types import Task
logger = logging.getLogger(__name__)
class InMemoryTaskStore(TaskStore):
"""In-memory implementation of TaskStore.
Stores task objects in a dictionary in memory. Task data is lost when the
server process stops.
"""
def __init__(self) -> None:
"""Initializes the InMemoryTaskStore."""
logger.debug('Initializing InMemoryTaskStore')
self.tasks: dict[str, Task] = {}
self.lock = asyncio.Lock()
async def save(
self, task: Task, context: ServerCallContext | None = None
) -> None:
"""Saves or updates a task in the in-memory store."""
async with self.lock:
self.tasks[task.id] = task
logger.debug('Task %s saved successfully.', task.id)
async def get(
self, task_id: str, context: ServerCallContext | None = None
) -> Task | None:
"""Retrieves a task from the in-memory store by ID."""
async with self.lock:
logger.debug('Attempting to get task with id: %s', task_id)
task = self.tasks.get(task_id)
if task:
logger.debug('Task %s retrieved successfully.', task_id)
else:
logger.debug('Task %s not found in store.', task_id)
return task
async def delete(
self, task_id: str, context: ServerCallContext | None = None
) -> None:
"""Deletes a task from the in-memory store by ID."""
async with self.lock:
logger.debug('Attempting to delete task with id: %s', task_id)
if task_id in self.tasks:
del self.tasks[task_id]
logger.debug('Task %s deleted successfully.', task_id)
else:
logger.warning(
'Attempted to delete nonexistent task with id: %s', task_id
)