-
Notifications
You must be signed in to change notification settings - Fork 429
Expand file tree
/
Copy pathqueue_manager.py
More file actions
35 lines (23 loc) · 1.23 KB
/
queue_manager.py
File metadata and controls
35 lines (23 loc) · 1.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from abc import ABC, abstractmethod
from a2a.server.events.event_queue import EventQueueLegacy
class QueueManager(ABC):
    """Abstract contract for managing one event queue per task.

    Concrete managers supply the five coroutines declared here, which
    together cover a queue's lifecycle: registering it, looking it up,
    tapping it, and closing it.
    """

    @abstractmethod
    async def add(self, task_id: str, queue: EventQueueLegacy) -> None:
        """Register ``queue`` as the event queue for ``task_id``.

        Args:
            task_id: Identifier of the task the queue belongs to.
            queue: Event queue to associate with that task.

        Note:
            This module's ``TaskQueueExists`` is documented as the error
            for adding a queue under an already-used task ID; whether a
            given implementation raises it is not fixed by this interface.
        """

    @abstractmethod
    async def get(self, task_id: str) -> EventQueueLegacy | None:
        """Look up the event queue registered for ``task_id``.

        Args:
            task_id: Identifier of the task whose queue is wanted.

        Returns:
            The task's event queue, or ``None`` (presumably when no queue
            is registered for the task; confirm against implementations).
        """

    @abstractmethod
    async def tap(self, task_id: str) -> EventQueueLegacy | None:
        """Create a child event queue (a "tap") of an existing task's queue.

        Args:
            task_id: Identifier of the task whose queue should be tapped.

        Returns:
            The child queue, or ``None`` (presumably when the task has no
            queue to tap; confirm against implementations).
        """

    @abstractmethod
    async def close(self, task_id: str) -> None:
        """Close the event queue for ``task_id`` and stop tracking it.

        Args:
            task_id: Identifier of the task whose queue should be closed.

        Note:
            This module's ``NoTaskQueue`` is documented as the error for
            closing a queue that does not exist; whether a given
            implementation raises it is not fixed by this interface.
        """

    @abstractmethod
    async def create_or_tap(self, task_id: str) -> EventQueueLegacy:
        """Return a queue for ``task_id``, creating or tapping as needed.

        A new queue is created when the task has none; otherwise the
        existing queue is tapped.

        Args:
            task_id: Identifier of the task that needs a queue.

        Returns:
            The newly created queue, or a tap of the existing one.
        """
class TaskQueueExists(Exception):  # noqa: N818
    """Signals that a queue is already registered for a task ID.

    Raised when a caller attempts to add a queue for a task ID that
    already has one.
    """
class NoTaskQueue(Exception):  # noqa: N818
    """Signals that no queue is registered for a task ID.

    Raised when a caller attempts to access or close a queue for a task
    ID that has none.
    """