Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,16 @@ The key of the pairs stored in `MemoryObject` must be a string. The supported va

**Python** is restricted to recursively *checkpoint-stable* values:

- **Primitive Types**: `None`, `bool`, `int`, `float`, `str`
- **Primitive Types**: `None`, `bool`, `int`, `float`, `str`, `bytes`
- **Collections**: `list`, and `dict` with `str` keys (values are recursively validated)
- **Memory Object**: A nested `MemoryObject` created via `new_object()`.

Anything else — Pydantic models, `uuid.UUID`, `Enum`, custom classes, `tuple`, `set`, or a `dict` with non-`str` keys — is **rejected by `set()` with a `TypeError`**. `bytes` is not supported yet.
Anything else — Pydantic models, `uuid.UUID`, `Enum`, custom classes, `tuple`, `set`, `bytearray`, or a `dict` with non-`str` keys — is **rejected by `set()` with a `TypeError`**. Exact `bytes` is supported (it converts to a native Java `byte[]`), but `bytearray` and `bytes` subclasses are not — Pemja wraps them as non-checkpoint-stable objects rather than materializing a `byte[]`.

This is because Python values are converted across the Pemja boundary into Flink state, and only the types above materialize into native, checkpoint-stable JVM values; other objects would be stored as wrappers that fail on state restore. To store a richer object, materialize it to a primitive form first (e.g. `model.model_dump(mode="json")` for a Pydantic model, or `str(value)` for a UUID) and reconstruct it on read.

{{< hint warning >}}
Python memory values must be checkpoint-stable primitives, unlike the Java contract which also supports POJOs and Kryo-serializable objects. Python values materialize across the Pemja boundary before reaching Flink state, so models and other objects must be materialized first with `model_dump(mode="json")` (or `str(...)`) and reconstructed on read.
Python memory values must be checkpoint-stable primitives, unlike the Java contract which also supports POJOs and Kryo-serializable objects. Python values materialize across the Pemja boundary before reaching Flink state, so models and other objects must be materialized first with `model_dump(mode="json")` (or `str(...)`) and reconstructed on read. Use exact `bytes` rather than `bytearray` for binary values, since only `bytes` materializes into a native `byte[]`.
{{< /hint >}}

### Read & Write
Expand Down
11 changes: 7 additions & 4 deletions python/flink_agents/api/memory_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
# Exact builtin types Pemja materializes into native, checkpoint-stable JVM values.
# Exact-type (not isinstance): a str/int Enum or numpy scalar is a subclass that Pemja
# PyObject-wraps despite passing isinstance — accepting it would defeat the validator.
_CHECKPOINT_STABLE_SCALARS = (bool, int, float, str)
# Exact `bytes` converts to a Java byte[]; bytearray and bytes-subclasses are PyObject-
# wrapped, and the exact-type check excludes both for free.
_CHECKPOINT_STABLE_SCALARS = (bool, int, float, str, bytes)


def validate_memory_value(path: str, value: Any) -> None:
Expand All @@ -45,7 +47,7 @@ def validate_memory_value(path: str, value: Any) -> None:
The memory path the value is being set at, used to build the error breadcrumb.
value: Any
The value to validate. Must be recursively composed of None, bool, int, float,
str, list, or dict with str keys.
str, bytes, list, or dict with str keys.
"""
_validate(value, f"value at memory path {path!r}")

Expand Down Expand Up @@ -77,8 +79,9 @@ def _validate(value: Any, where: str) -> None:
msg = (
f"{where} has type {type(value).__name__!r}, which is not checkpoint-stable. "
f"Python memory values must be recursively composed of None, bool, int, float, "
f"str, list, or dict with str keys, because they cross the Pemja boundary into "
f"Flink state and non-primitive objects cannot be safely checkpointed/restored. "
f"str, bytes, list, or dict with str keys, because they cross the Pemja boundary "
f"into Flink state and non-primitive objects cannot be safely "
f"checkpointed/restored. "
f"Materialize it first, e.g. str(value) for a UUID, value.model_dump(mode='json')"
f" for a Pydantic model, or list(value) for a tuple/set."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,19 @@ class _Plain:
pass


class _BytesSub(bytes):
pass


def test_accepts_none_and_scalars() -> None:
for value in (None, True, False, 0, 1, -3, 3.14, "", "hello"):
for value in (None, True, False, 0, 1, -3, 3.14, "", "hello", b"", b"hello"):
validate_memory_value("p", value)


def test_accepts_nested_list_and_dict() -> None:
validate_memory_value("p", [1, "a", [2, 3], {"k": [4, None]}])
validate_memory_value("p", {"a": 1, "b": {"c": [True, "x"]}})
validate_memory_value("p", [b"x", {"k": b"y"}])


def test_rejects_pydantic_model() -> None:
Expand All @@ -67,6 +72,14 @@ def test_rejects_tuple_set_frozenset() -> None:
validate_memory_value("p", value)


def test_rejects_bytearray_and_bytes_subclass() -> None:
# Exact `bytes` is accepted, but bytearray and bytes-subclasses are PyObject-wrapped
# by Pemja; the exact-type check must reject them.
for value in (bytearray(b"x"), _BytesSub(b"x")):
with pytest.raises(TypeError, match="not checkpoint-stable"):
validate_memory_value("p", value)


def test_rejects_str_enum() -> None:
# str-Enum passes isinstance(str) but is PyObject-wrapped by Pemja; the
# exact-type check must reject it.
Expand Down
Loading