From 0b51c5940b7035de3bf1a725291ddb4f30cc2b32 Mon Sep 17 00:00:00 2001 From: rosemaryYuan <91046107+rosemarYuan@users.noreply.github.com> Date: Fri, 3 Jul 2026 21:28:35 +0800 Subject: [PATCH] [api][runtime][test] Add @JsonCreator to built-in event subclasses for ActionStateSerde durable recovery --- .../agents/api/event/ChatRequestEvent.java | 7 +- .../agents/api/event/ChatResponseEvent.java | 7 +- .../event/ContextRetrievalRequestEvent.java | 7 +- .../event/ContextRetrievalResponseEvent.java | 7 +- .../agents/api/event/ToolRequestEvent.java | 7 +- .../agents/api/event/ToolResponseEvent.java | 7 +- .../actionstate/ActionStateSerdeTest.java | 70 +++++++++++++++++++ 7 files changed, 106 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java index a9a49e685..1dab26a44 100644 --- a/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java +++ b/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.api.event; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.chat.messages.ChatMessage; @@ -62,7 +64,10 @@ public ChatRequestEvent(String model, List messages) { this(model, messages, null, null); } - public ChatRequestEvent(UUID id, Map attributes) { + @JsonCreator + public ChatRequestEvent( + @JsonProperty("id") UUID id, + @JsonProperty("attributes") Map attributes) { super(id, EVENT_TYPE, attributes); } diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ChatResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ChatResponseEvent.java index 12402986a..d68fda3ca 100644 --- a/api/src/main/java/org/apache/flink/agents/api/event/ChatResponseEvent.java +++ b/api/src/main/java/org/apache/flink/agents/api/event/ChatResponseEvent.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.api.event; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.chat.messages.ChatMessage; @@ -46,7 +48,10 @@ public ChatResponseEvent( setAttr("total_retry_wait_sec", totalRetryWaitSec); } - public ChatResponseEvent(UUID id, Map attributes) { + @JsonCreator + public ChatResponseEvent( + @JsonProperty("id") UUID id, + @JsonProperty("attributes") Map attributes) { super(id, EVENT_TYPE, attributes); } diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java index 2ee8ae01c..387319c95 100644 --- a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java +++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalRequestEvent.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.api.event; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.agents.api.Event; import java.util.HashMap; @@ -43,7 +45,10 @@ public ContextRetrievalRequestEvent(String query, String vectorStore, int maxRes setAttr("max_results", maxResults); } - public ContextRetrievalRequestEvent(UUID id, Map attributes) { + @JsonCreator + public ContextRetrievalRequestEvent( + @JsonProperty("id") UUID id, + @JsonProperty("attributes") Map attributes) { super(id, EVENT_TYPE, attributes); } diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java index 35861ce2d..37b4a76fc 100644 --- a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java +++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.api.event; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.vectorstores.Document; @@ -43,7 +45,10 @@ public ContextRetrievalResponseEvent(UUID requestId, String query, List(documents)); } - public ContextRetrievalResponseEvent(UUID id, Map attributes) { + @JsonCreator + public ContextRetrievalResponseEvent( + @JsonProperty("id") UUID id, + @JsonProperty("attributes") Map attributes) { super(id, EVENT_TYPE, attributes); } diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ToolRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ToolRequestEvent.java index c14991ba4..9a24ef7ea 100644 --- a/api/src/main/java/org/apache/flink/agents/api/event/ToolRequestEvent.java +++ b/api/src/main/java/org/apache/flink/agents/api/event/ToolRequestEvent.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.api.event; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.agents.api.Event; import java.util.HashMap; @@ -37,7 +39,10 @@ public ToolRequestEvent(String model, List> toolCalls) { setAttr("tool_calls", toolCalls); } - public ToolRequestEvent(UUID id, Map attributes) { + @JsonCreator + public ToolRequestEvent( + @JsonProperty("id") UUID id, + @JsonProperty("attributes") Map attributes) { super(id, EVENT_TYPE, attributes); } diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ToolResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ToolResponseEvent.java index 498ea3f07..eb5d639f3 100644 --- a/api/src/main/java/org/apache/flink/agents/api/event/ToolResponseEvent.java +++ b/api/src/main/java/org/apache/flink/agents/api/event/ToolResponseEvent.java @@ -18,7 +18,9 @@ package org.apache.flink.agents.api.event; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.tools.ToolResponse; @@ -57,7 +59,10 @@ public ToolResponseEvent( this(requestId, responses, success, error, Map.of()); } - public ToolResponseEvent(UUID id, Map attributes) { + @JsonCreator + public ToolResponseEvent( + @JsonProperty("id") UUID id, + @JsonProperty("attributes") Map attributes) { super(id, EVENT_TYPE, attributes); } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java index e0c4de0bf..d9c76a5cf 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java @@ -20,7 +20,17 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; import org.apache.flink.agents.api.context.MemoryUpdate; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent; +import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent; +import org.apache.flink.agents.api.event.ToolRequestEvent; +import org.apache.flink.agents.api.event.ToolResponseEvent; +import org.apache.flink.agents.api.tools.ToolResponse; +import org.apache.flink.agents.api.vectorstores.Document; import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; @@ -29,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.*; @@ -333,4 +344,63 @@ public void testKafkaSederDelegatesToActionStateSerde() throws Exception { assertNull(kafkaSeder.serialize("test-topic", null)); assertNull(kafkaSeder.deserialize("test-topic", null)); } + + @Test + public void testBuiltinChatToolAndContextEventsRoundTripThroughOutputEvents() throws Exception { + ChatMessage msg = new ChatMessage(MessageRole.USER, "hello"); + UUID requestId = UUID.randomUUID(); + Document doc = new Document("doc content", Map.of("source", "unit-test"), "doc-1"); + + // Built-in events are persisted both as the triggering taskEvent and as + // outputEvents; cover both paths. + ActionState originalState = new ActionState(new ChatRequestEvent("myModel", List.of(msg))); + originalState.addEvent(new ChatRequestEvent("myModel", List.of(msg))); + originalState.addEvent(new ChatResponseEvent(requestId, msg)); + originalState.addEvent(new ToolRequestEvent("myModel", List.of(Map.of("name", "myTool")))); + originalState.addEvent( + new ToolResponseEvent( + requestId, + Map.of("call-1", ToolResponse.success("result")), + Map.of("call-1", true), + Map.of())); + originalState.addEvent(new ContextRetrievalRequestEvent("query text", "myVectorStore", 5)); + originalState.addEvent( + new ContextRetrievalResponseEvent(requestId, "query text", List.of(doc))); + + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); + + assertEquals(ChatRequestEvent.class, deserializedState.getTaskEvent().getClass()); + + List outputEvents = deserializedState.getOutputEvents(); + assertEquals(6, outputEvents.size()); + assertEquals(ChatRequestEvent.class, outputEvents.get(0).getClass()); + assertEquals(ChatResponseEvent.class, outputEvents.get(1).getClass()); + assertEquals(ToolRequestEvent.class, outputEvents.get(2).getClass()); + assertEquals(ToolResponseEvent.class, outputEvents.get(3).getClass()); + assertEquals(ContextRetrievalRequestEvent.class, outputEvents.get(4).getClass()); + assertEquals(ContextRetrievalResponseEvent.class, outputEvents.get(5).getClass()); + + // Replayed events are consumed through fromEvent() (see ChatModelAction, + // ToolCallAction, ContextRetrievalAction), which must restore nested + // attributes degraded to raw maps by the JSON round-trip back to their + // typed forms. + ChatRequestEvent chatRequest = ChatRequestEvent.fromEvent(outputEvents.get(0)); + assertEquals("hello", chatRequest.getMessages().get(0).getContent()); + assertEquals(MessageRole.USER, chatRequest.getMessages().get(0).getRole()); + + ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(outputEvents.get(1)); + assertEquals(requestId, chatResponse.getRequestId()); + assertEquals("hello", chatResponse.getResponse().getContent()); + + ToolResponseEvent toolResponse = ToolResponseEvent.fromEvent(outputEvents.get(3)); + assertEquals(requestId, toolResponse.getRequestId()); + assertEquals("result", toolResponse.getResponses().get("call-1").getResult()); + + ContextRetrievalResponseEvent retrievalResponse = + ContextRetrievalResponseEvent.fromEvent(outputEvents.get(5)); + assertEquals(requestId, retrievalResponse.getRequestId()); + assertEquals("doc content", retrievalResponse.getDocuments().get(0).getContent()); + assertEquals("doc-1", retrievalResponse.getDocuments().get(0).getId()); + } }