Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/agents/memory/openai_conversations_session.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from openai import AsyncOpenAI

from agents.models._openai_shared import get_default_openai_client
Expand All @@ -8,6 +10,9 @@
from .session import SessionABC
from .session_settings import SessionSettings, resolve_session_limit

if TYPE_CHECKING:
from ..run_context import RunContextWrapper


async def start_openai_conversations_session(openai_client: AsyncOpenAI | None = None) -> str:
_maybe_openai_client = openai_client
Expand Down Expand Up @@ -70,7 +75,11 @@ async def _get_session_id(self) -> str:
async def _clear_session_id(self) -> None:
self._session_id = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self,
limit: int | None = None,
wrapper: RunContextWrapper[Any] | None = None,
) -> list[TResponseInputItem]:
session_id = await self._get_session_id()

session_limit = resolve_session_limit(limit, self.session_settings)
Expand All @@ -97,7 +106,11 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:

return all_items # type: ignore

async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self,
items: list[TResponseInputItem],
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
session_id = await self._get_session_id()
if not items:
return
Expand All @@ -107,7 +120,10 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
items=items,
)

async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> TResponseInputItem | None:
session_id = await self._get_session_id()
items = await self.get_items(limit=1)
if not items:
Expand All @@ -118,7 +134,10 @@ async def pop_item(self) -> TResponseInputItem | None:
)
return items[0]

async def clear_session(self) -> None:
async def clear_session(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
session_id = await self._get_session_id()
await self._openai_client.conversations.delete(
conversation_id=session_id,
Expand Down
35 changes: 26 additions & 9 deletions src/agents/memory/openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)

if TYPE_CHECKING:
from ..run_context import RunContextWrapper
from .session import Session

logger = logging.getLogger("openai-agents.openai.compaction")
Expand Down Expand Up @@ -229,8 +230,12 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
f"candidates={len(self._compaction_candidate_items)})"
)

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit)
async def get_items(
self,
limit: int | None = None,
wrapper: RunContextWrapper[Any] | None = None,
) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit, wrapper=wrapper)

async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None:
if self._deferred_response_id is not None:
Expand Down Expand Up @@ -258,8 +263,12 @@ def _get_deferred_compaction_response_id(self) -> str | None:
def _clear_deferred_compaction(self) -> None:
self._deferred_response_id = None

async def add_items(self, items: list[TResponseInputItem]) -> None:
await self.underlying_session.add_items(items)
async def add_items(
self,
items: list[TResponseInputItem],
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
await self.underlying_session.add_items(items, wrapper=wrapper)
if self._compaction_candidate_items is not None:
new_items = _normalize_compaction_session_items(items)
new_candidates = select_compaction_candidate_items(new_items)
Expand All @@ -268,15 +277,21 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
if self._session_items is not None:
self._session_items.extend(_normalize_compaction_session_items(items))

async def pop_item(self) -> TResponseInputItem | None:
popped = await self.underlying_session.pop_item()
async def pop_item(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> TResponseInputItem | None:
popped = await self.underlying_session.pop_item(wrapper=wrapper)
if popped:
self._compaction_candidate_items = None
self._session_items = None
return popped

async def clear_session(self) -> None:
await self.underlying_session.clear_session()
async def clear_session(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
await self.underlying_session.clear_session(wrapper=wrapper)
self._compaction_candidate_items = []
self._session_items = []
self._deferred_response_id = None
Expand All @@ -288,7 +303,9 @@ async def _ensure_compaction_candidates(
if self._compaction_candidate_items is not None and self._session_items is not None:
return (self._compaction_candidate_items[:], self._session_items[:])
Comment on lines +308 to 312
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Key compaction candidate cache by wrapper context

_ensure_compaction_candidates accepts a wrapper but still returns cached candidates/session items without checking which wrapper populated that cache. If one OpenAIResponsesCompactionSession instance is reused across different RunContextWrappers (the new feature this commit enables), the second context can reuse the first context’s cached history, causing compaction decisions and compacted inputs to be computed from the wrong tenant/user data.

Useful? React with 👍 / 👎.


history = _normalize_compaction_session_items(await self.underlying_session.get_items())
history = _normalize_compaction_session_items(
await self.underlying_session.get_items()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Thread wrapper through compaction candidate loading

OpenAIResponsesCompactionSession now advertises wrapper-aware session methods, but _ensure_compaction_candidates still reads history via underlying_session.get_items() without forwarding any wrapper. For underlying sessions that scope storage by RunContextWrapper (tenant/user routing), compaction decisions will be computed from the wrong context (or fail when wrapper is required), and subsequent compaction can rewrite incorrect history. This path needs to accept and propagate the active wrapper consistently.

Useful? React with 👍 / 👎.

)
candidates = select_compaction_candidate_items(history)
self._compaction_candidate_items = candidates
self._session_items = history
Expand Down
69 changes: 58 additions & 11 deletions src/agents/memory/session.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Literal, Protocol, TypeGuard, runtime_checkable
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeGuard, runtime_checkable

from typing_extensions import TypedDict

if TYPE_CHECKING:
from ..items import TResponseInputItem
from ..run_context import RunContextWrapper
from .session_settings import SessionSettings


Expand All @@ -21,36 +22,59 @@ class Session(Protocol):
session_id: str
session_settings: SessionSettings | None = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self,
limit: int | None = None,
wrapper: RunContextWrapper[Any] | None = None,
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
wrapper: Optional run context wrapper providing context and usage info.

Returns:
List of input items representing the conversation history
"""
...

async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self,
items: list[TResponseInputItem],
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional run context wrapper providing context and usage info.
"""
...

async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional run context wrapper providing context and usage info.

Returns:
The most recent item if it exists, None if the session is empty
"""
...

async def clear_session(self) -> None:
"""Clear all items for this session."""
async def clear_session(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional run context wrapper providing context and usage info.
"""
...


Expand All @@ -68,39 +92,62 @@ class SessionABC(ABC):
session_settings: SessionSettings | None = None

@abstractmethod
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self,
limit: int | None = None,
wrapper: RunContextWrapper[Any] | None = None,
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
wrapper: Optional run context wrapper providing context and usage info.

Returns:
List of input items representing the conversation history
"""
...

@abstractmethod
async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self,
items: list[TResponseInputItem],
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional run context wrapper providing context and usage info.
"""
...

@abstractmethod
async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional run context wrapper providing context and usage info.

Returns:
The most recent item if it exists, None if the session is empty
"""
...

@abstractmethod
async def clear_session(self) -> None:
"""Clear all items for this session."""
async def clear_session(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional run context wrapper providing context and usage info.
"""
...


Expand Down
38 changes: 32 additions & 6 deletions src/agents/memory/sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import ClassVar
from typing import TYPE_CHECKING, Any, ClassVar

from ..items import TResponseInputItem
from .session import SessionABC
from .session_settings import SessionSettings, resolve_session_limit

if TYPE_CHECKING:
from ..run_context import RunContextWrapper


class SQLiteSession(SessionABC):
"""SQLite-based implementation of session storage.
Expand Down Expand Up @@ -199,12 +202,17 @@ def _insert_items(self, conn: sqlite3.Connection, items: list[TResponseInputItem
(self.session_id,),
)

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self,
limit: int | None = None,
wrapper: RunContextWrapper[Any] | None = None,
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
wrapper: Optional run context wrapper providing context and usage info.

Returns:
List of input items representing the conversation history
Expand Down Expand Up @@ -254,11 +262,16 @@ def _get_items_sync():

return await asyncio.to_thread(_get_items_sync)

async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self,
items: list[TResponseInputItem],
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional run context wrapper providing context and usage info.
"""
if not items:
return
Expand All @@ -270,9 +283,15 @@ def _add_items_sync():

await asyncio.to_thread(_add_items_sync)

async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional run context wrapper providing context and usage info.

Returns:
The most recent item if it exists, None if the session is empty
"""
Expand Down Expand Up @@ -310,8 +329,15 @@ def _pop_item_sync():

return await asyncio.to_thread(_pop_item_sync)

async def clear_session(self) -> None:
"""Clear all items for this session."""
async def clear_session(
self,
wrapper: RunContextWrapper[Any] | None = None,
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional run context wrapper providing context and usage info.
"""

def _clear_session_sync():
with self._locked_connection() as conn:
Expand Down
Loading