Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/praisonai-agents/praisonaiagents/gateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
OutboundDeliveryProtocol,
ChannelInfo,
PresenceInfo,
# Home channel and delivery protocols
HomeChannelRegistryProtocol,
DeliveryResolverProtocol,
# Protocol version negotiation
PROTOCOL_VERSION,
MIN_PROTOCOL_VERSION,
MAX_PROTOCOL_VERSION,
ProtocolHello,
ProtocolHelloOk,
GapInfo,
ResumeSnapshot,
)
from .config import (
GatewayConfig,
Expand Down Expand Up @@ -93,6 +104,17 @@ def __getattr__(name: str):
"OutboundDeliveryProtocol",
"ChannelInfo",
"PresenceInfo",
# Home channel and delivery protocols
"HomeChannelRegistryProtocol",
"DeliveryResolverProtocol",
# Protocol version negotiation
"PROTOCOL_VERSION",
"MIN_PROTOCOL_VERSION",
"MAX_PROTOCOL_VERSION",
"ProtocolHello",
"ProtocolHelloOk",
"GapInfo",
"ResumeSnapshot",
# Config (always available)
"GatewayConfig",
"SessionConfig",
Expand Down
61 changes: 59 additions & 2 deletions src/praisonai-agents/praisonaiagents/gateway/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Literal,
Optional,
Protocol,
TypedDict,
Union,
runtime_checkable,
)
Expand Down Expand Up @@ -173,6 +174,7 @@ class GatewayEvent:
timestamp: Event creation time
source: Source identifier (agent_id, client_id, etc.)
target: Target identifier (optional, for directed events)
sequence: Monotonic sequence number for gap detection (optional)
"""

type: Union[EventType, str]
Expand All @@ -181,17 +183,21 @@ class GatewayEvent:
timestamp: float = field(default_factory=time.time)
source: Optional[str] = None
target: Optional[str] = None
sequence: Optional[int] = None # Monotonic sequence for gap detection

def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
result = {
"type": self.type.value if isinstance(self.type, EventType) else self.type,
"data": self.data,
"event_id": self.event_id,
"timestamp": self.timestamp,
"source": self.source,
"target": self.target,
}
if self.sequence is not None:
result["sequence"] = self.sequence
return result

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "GatewayEvent":
Expand All @@ -209,6 +215,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "GatewayEvent":
timestamp=data.get("timestamp", time.time()),
source=data.get("source"),
target=data.get("target"),
sequence=data.get("sequence"),
)


Expand Down Expand Up @@ -1115,7 +1122,6 @@ def lookup(self, session_id: str) -> Optional[Dict[str, Any]]:
...


# ---------------------------------------------------------------------------
# Home Channel and Delivery Routing Protocols
# ---------------------------------------------------------------------------

Expand Down Expand Up @@ -1193,3 +1199,54 @@ def resolve(
List of concrete delivery targets
"""
...


# ---------------------------------------------------------------------------
# Protocol Version Negotiation (Issue #2130)
# ---------------------------------------------------------------------------

# Protocol version constants
PROTOCOL_VERSION = 1
MIN_PROTOCOL_VERSION = 1
MAX_PROTOCOL_VERSION = 1


class ProtocolHello(TypedDict, total=False):
"""Protocol version negotiation handshake request.

Sent by client during join to negotiate protocol version.
"""
min_version: int # Minimum protocol version client supports
max_version: int # Maximum protocol version client supports
features: List[str] # Optional feature flags


class ProtocolHelloOk(TypedDict):
"""Protocol version negotiation response.

Server's response to protocol negotiation.
"""
protocol_version: int # Negotiated protocol version
server_min_version: int # Server's minimum supported version
server_max_version: int # Server's maximum supported version
features: List[str] # Enabled feature flags


class GapInfo(TypedDict):
"""Information about a gap in the event sequence."""
expected_seq: int # Expected sequence number
received_seq: int # Received sequence number
missed_count: int # Number of events missed


class ResumeSnapshot(TypedDict, total=False):
"""Complete snapshot for session resumption.

Provides all necessary state for one-round-trip reconnection.
"""
cursor: int # Resume cursor position
sequence: int # Current sequence number for gap detection
events: List[Dict[str, Any]] # Replayed events since cursor
presence: List[Dict[str, Any]] # Current presence information
health: Dict[str, Any] # Gateway health status
session_state: Dict[str, Any] # Session-specific state
9 changes: 9 additions & 0 deletions src/praisonai/praisonai/gateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

if TYPE_CHECKING:
from .server import WebSocketGateway, GatewaySession
from .client import GatewayClient, BackoffConfig
from .rate_limiter import AuthRateLimiter
from .pairing import PairingStore
from .exec_approval import ExecApprovalManager, get_exec_approval_manager
Expand All @@ -23,6 +24,12 @@ def __getattr__(name: str):
if name == "GatewaySession":
from .server import GatewaySession
return GatewaySession
if name == "GatewayClient":
from .client import GatewayClient
return GatewayClient
if name == "BackoffConfig":
from .client import BackoffConfig
return BackoffConfig
# Security / approval primitives
if name == "AuthRateLimiter":
from .rate_limiter import AuthRateLimiter
Expand Down Expand Up @@ -50,6 +57,8 @@ def __getattr__(name: str):
__all__ = [
"WebSocketGateway",
"GatewaySession",
"GatewayClient",
"BackoffConfig",
"AuthRateLimiter",
"PairingStore",
"ExecApprovalManager",
Expand Down
Loading
Loading