Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions api/src/main/java/org/apache/flink/agents/api/EventType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.agents.api;

/**
* Compile-time constants for built-in event types, sourced from each {@code XxxEvent.EVENT_TYPE}.
*
* <p>Usage: {@code @Action(EventType.InputEvent)}.
*/
public final class EventType {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reconsider the EventType aggregation class

EventType.InputEvent duplicates InputEvent.EVENT_TYPE — two sources of truth for the same value. The register() / lookup() / lookupOrSelf() registry has zero callers in production code today.

InputEvent.EVENT_TYPE is already self-documenting and lives in its natural namespace. Suggest deferring the registry until the CEL PR actually needs it, and just using the event class constants directly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I've refactored EventType to be a pure namespace of constants — each one simply re-exports the value from the corresponding event class (e.g., EventType.InputEvent = _InputEvent.EVENT_TYPE), so there's a single source of truth. The register() / lookup() / lookupOrSelf() registry has been removed from this PR and will be introduced later when the CEL PR actually requires dynamic resolution.

Meanwhile, I've also slimmed down the EventType class by removing the internal utility functions — their concrete implementations are deferred to a follow-up PR.


public static final String InputEvent = org.apache.flink.agents.api.InputEvent.EVENT_TYPE;
public static final String OutputEvent = org.apache.flink.agents.api.OutputEvent.EVENT_TYPE;
public static final String ChatRequestEvent =
org.apache.flink.agents.api.event.ChatRequestEvent.EVENT_TYPE;
public static final String ChatResponseEvent =
org.apache.flink.agents.api.event.ChatResponseEvent.EVENT_TYPE;
public static final String ToolRequestEvent =
org.apache.flink.agents.api.event.ToolRequestEvent.EVENT_TYPE;
public static final String ToolResponseEvent =
org.apache.flink.agents.api.event.ToolResponseEvent.EVENT_TYPE;
public static final String ContextRetrievalRequestEvent =
org.apache.flink.agents.api.event.ContextRetrievalRequestEvent.EVENT_TYPE;
public static final String ContextRetrievalResponseEvent =
org.apache.flink.agents.api.event.ContextRetrievalResponseEvent.EVENT_TYPE;

private EventType() {}
}
22 changes: 13 additions & 9 deletions api/src/main/java/org/apache/flink/agents/api/agents/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,43 +56,47 @@ public Map<ResourceType, Map<String, Object>> getResources() {
/**
* Add action to agent.
*
* @param eventTypes The event type strings this action listens to.
* @param triggerConditions Trigger condition strings — each is either an event-type name or a
* future condition-expression form.
* @param method The method of this action, should be static method.
* @param config The optional config can be used by this action.
*/
public Agent addAction(
String[] eventTypes, Method method, @Nullable Map<String, Object> config) {
return addAction(method.getName(), eventTypes, JavaFunction.fromMethod(method), config);
String[] triggerConditions, Method method, @Nullable Map<String, Object> config) {
return addAction(
method.getName(), triggerConditions, JavaFunction.fromMethod(method), config);
}

/**
* Add action to agent.
*
* @param eventTypes The event type strings this action listens to.
* @param triggerConditions Trigger condition strings — each is either an event-type name or a
* future condition-expression form.
* @param method The method of this action, should be static method.
*/
public Agent addAction(String[] eventTypes, Method method) {
return addAction(eventTypes, method, null);
public Agent addAction(String[] triggerConditions, Method method) {
return addAction(triggerConditions, method, null);
}

/**
* Add action to agent.
*
* @param name The action name. Must be unique within this agent.
* @param eventTypes The event type strings this action listens to.
* @param triggerConditions Trigger condition strings — each is either an event-type name or a
* future condition-expression form.
* @param function The api-layer function descriptor; will be promoted to a plan-layer
* executable at {@code AgentPlan} construction.
* @param config Optional config for this action.
*/
public Agent addAction(
String name,
String[] eventTypes,
String[] triggerConditions,
Function function,
@Nullable Map<String, Object> config) {
if (actions.containsKey(name)) {
throw new IllegalArgumentException(String.format("Action %s already defined.", name));
}
actions.put(name, new Tuple3<>(eventTypes, function, config));
actions.put(name, new Tuple3<>(triggerConditions, function, config));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventType;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.annotation.Action;
Expand Down Expand Up @@ -168,7 +169,7 @@ public static void startAction(Event event, RunnerContext ctx) {
ctx.sendEvent(new ChatRequestEvent(DEFAULT_CHAT_MODEL, inputMessages, outputSchema));
}

@Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
@Action(EventType.ChatResponseEvent)
public static void stopAction(Event event, RunnerContext ctx) {
ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
ChatMessage response = chatResponse.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,24 @@
import java.lang.annotation.Target;

/**
* Annotation for marking a method as an agent action.
* Marks a method as an agent action triggered by matching events.
*
* <p>This annotation specifies which event types the action should respond to. The annotated method
* will be triggered when any of the specified event types occur.
* <p>Each {@link #value()} entry is an event type name string. Use the {@code EVENT_TYPE} constants
* on built-in event classes, the {@link org.apache.flink.agents.api.EventType} constants, or plain
* strings for custom events. Multiple entries combine with OR.
*
* <p>Events are specified as type strings via {@link #listenEventTypes()}. Use the {@code
* EVENT_TYPE} constants on built-in event classes for standard events, or plain strings for custom
* events.
* <pre>{@code
* // Built-in event type via the EventType constant
* @Action(EventType.InputEvent)
*
* <p>Example usage:
* // Equivalent via the legacy class constant
* @Action(InputEvent.EVENT_TYPE)
*
* <pre>{@code
* @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
* public void handleInput(Event event, RunnerContext ctx) { ... }
* // User-defined event type
* @Action("MyCustomEvent")
*
* @Action(listenEventTypes = {InputEvent.EVENT_TYPE, "MyCustomEvent"})
* public void handleMultiple(Event event, RunnerContext ctx) { ... }
* // Multiple types (OR semantics)
* @Action({EventType.InputEvent, "MyCustomEvent"})
* }</pre>
*
* <p>For a cross-language action, set {@link #target()} to a {@link PythonFunction} with a
Expand All @@ -49,7 +50,7 @@
*
* <pre>{@code
* @Action(
* listenEventTypes = {InputEvent.EVENT_TYPE},
* value = EventType.InputEvent,
* target = @PythonFunction(module = "my_pkg.handlers", qualname = "handle_input"))
* public void handleInput(Event event, RunnerContext ctx) {
* throw new UnsupportedOperationException("cross-language stub");
Expand All @@ -60,11 +61,11 @@
@Retention(RetentionPolicy.RUNTIME)
public @interface Action {
/**
* List of event type strings that this action should respond to.
*
* @return Array of event type strings
* Event type name strings; multiple entries have OR semantics. Named {@code value} (not {@code
* triggerConditions}) to enable the {@code @Action({...})} shorthand (JLS §9.7.3); corresponds
* to Python's {@code *trigger_conditions}.
*/
String[] listenEventTypes();
String[] value();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we use value as the parameter name, whereas Python’s Action decorator uses trigger_conditions.

I understand this discrepancy arises because in Java annotations, only a member named value can be used without explicitly specifying the parameter name during declaration.

Therefore, to improve usability for users, I believe this inconsistency is acceptable. However, we should probably explain this in the comments.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — added a one-sentence note on explaining:

  • (1) The JLS §9.7.3 reason for the name.
  • (2) Its mapping to Python's *trigger_conditions.
    Kept it tight to avoid bloating the annotation javadoc.


/**
* Cross-language target. When {@link PythonFunction#module()} is non-empty, dispatch routes to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,11 @@ static Function resolveActionFunction(ActionSpec action) {
/** Register an action on the agent with event aliases resolved. */
static void addActionToAgent(Agent agent, ActionSpec action) {
Function fn = resolveActionFunction(action);
String[] events =
action.getListenTo().stream().map(Aliases::resolveEventType).toArray(String[]::new);
String[] triggerConditions =
action.getTriggerConditions().stream()
.map(Aliases::resolveEventType)
.toArray(String[]::new);
Map<String, Object> config = action.getConfig();
agent.addAction(action.getName(), events, fn, config);
agent.addAction(action.getName(), triggerConditions, fn, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,34 @@
import java.util.List;
import java.util.Map;

/** Action referencing a user function plus the event types it listens to. */
/**
* Action referencing a user function plus its trigger conditions.
*
* <p>Each entry in {@code trigger_conditions} is either an event-type name (bare identifier) or a
* future condition-expression form — the runtime classifies the string when it loads the plan.
*/
@JsonIgnoreProperties(ignoreUnknown = false)
public final class ActionSpec {
private final String name;
private final String function;
private final List<String> listenTo;
private final List<String> triggerConditions;
private final Map<String, Object> config;
private final Language type;

@JsonCreator
public ActionSpec(
@JsonProperty(value = "name", required = true) String name,
@JsonProperty("function") String function,
@JsonProperty(value = "listen_to", required = true) List<String> listenTo,
@JsonProperty(value = "trigger_conditions", required = true)
List<String> triggerConditions,
@JsonProperty("config") Map<String, Object> config,
@JsonProperty("type") Language type) {
if (listenTo == null || listenTo.isEmpty()) {
throw new IllegalArgumentException("listen_to must not be empty");
if (triggerConditions == null || triggerConditions.isEmpty()) {
throw new IllegalArgumentException("trigger_conditions must not be empty");
}
this.name = name;
this.function = function;
this.listenTo = listenTo;
this.triggerConditions = triggerConditions;
this.config = config;
this.type = type;
}
Expand All @@ -60,8 +66,8 @@ public String getFunction() {
return function;
}

public List<String> getListenTo() {
return listenTo;
public List<String> getTriggerConditions() {
return triggerConditions;
}

public Map<String, Object> getConfig() {
Expand Down
47 changes: 47 additions & 0 deletions api/src/test/java/org/apache/flink/agents/api/EventTypeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.agents.api;

import org.apache.flink.agents.api.event.ChatRequestEvent;
import org.apache.flink.agents.api.event.ChatResponseEvent;
import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
import org.apache.flink.agents.api.event.ToolRequestEvent;
import org.apache.flink.agents.api.event.ToolResponseEvent;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/** Tests for {@link EventType}. */
class EventTypeTest {

@Test
void builtInConstantsMatchEventClassConstants() {
assertEquals(InputEvent.EVENT_TYPE, EventType.InputEvent);
assertEquals(OutputEvent.EVENT_TYPE, EventType.OutputEvent);
assertEquals(ChatRequestEvent.EVENT_TYPE, EventType.ChatRequestEvent);
assertEquals(ChatResponseEvent.EVENT_TYPE, EventType.ChatResponseEvent);
assertEquals(ToolRequestEvent.EVENT_TYPE, EventType.ToolRequestEvent);
assertEquals(ToolResponseEvent.EVENT_TYPE, EventType.ToolResponseEvent);
assertEquals(
ContextRetrievalRequestEvent.EVENT_TYPE, EventType.ContextRetrievalRequestEvent);
assertEquals(
ContextRetrievalResponseEvent.EVENT_TYPE, EventType.ContextRetrievalResponseEvent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void actionDefaultsToPython(@TempDir Path tmp) throws Exception {
+ " actions:\n"
+ " - name: act\n"
+ " function: pkg.mod:fn\n"
+ " listen_to: [input]\n");
+ " trigger_conditions: [input]\n");
LoadedFile out = YamlLoader.buildAgents(file);
Tuple3<String[], Function, Map<String, Object>> entry =
out.getAgents().get("a").getActions().get("act");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,31 @@ class ActionSpecTest {
@Test
void minimal() throws Exception {
ActionSpec spec =
M.readValue("name: a\nfunction: pkg:fn\nlisten_to: [input]\n", ActionSpec.class);
M.readValue(
"name: a\nfunction: pkg:fn\ntrigger_conditions: [input]\n",
ActionSpec.class);
assertThat(spec.getName()).isEqualTo("a");
assertThat(spec.getFunction()).isEqualTo("pkg:fn");
assertThat(spec.getListenTo()).containsExactly("input");
assertThat(spec.getTriggerConditions()).containsExactly("input");
assertThat(spec.getConfig()).isNull();
assertThat(spec.getType()).isNull();
}

@Test
void rejectsEmptyListenTo() {
void rejectsEmptyTriggerConditions() {
assertThatThrownBy(
() ->
M.readValue(
"name: a\nfunction: x:y\nlisten_to: []\n",
"name: a\nfunction: x:y\ntrigger_conditions: []\n",
ActionSpec.class))
.hasMessageContaining("listen_to");
.hasMessageContaining("trigger_conditions");
}

@Test
void typeJava() throws Exception {
ActionSpec spec =
M.readValue(
"name: a\nfunction: X:m\nlisten_to: [input]\ntype: java\n",
"name: a\nfunction: X:m\ntrigger_conditions: [input]\ntype: java\n",
ActionSpec.class);
assertThat(spec.getType()).isEqualTo(Language.JAVA);
}
Expand All @@ -64,7 +66,7 @@ void rejectsUnknownProperty() {
assertThatThrownBy(
() ->
M.readValue(
"name: a\nfunction: x:y\nlisten_to: [input]\nextra: 1\n",
"name: a\nfunction: x:y\ntrigger_conditions: [input]\nextra: 1\n",
ActionSpec.class))
.hasMessageContaining("extra");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void actionEntryCanBeStringOrFullSpec() throws Exception {
+ " - shared_one\n"
+ " - name: own\n"
+ " function: pkg:fn\n"
+ " listen_to: [input]\n";
+ " trigger_conditions: [input]\n";
AgentSpec spec = M.readValue(yaml, AgentSpec.class);
assertThat(spec.getActions()).hasSize(2);
assertThat(spec.getActions().get(0).isReference()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void sharedSectionsAtFileLevel() throws Exception {
String yaml =
"agents:\n - name: a\n"
+ "chat_model_connections:\n - name: shared\n clazz: x.Y\n"
+ "actions:\n - name: shared_a\n function: pkg:fn\n listen_to: [input]\n";
+ "actions:\n - name: shared_a\n function: pkg:fn\n trigger_conditions: [input]\n";
YamlAgentsDocument doc = M.readValue(yaml, YamlAgentsDocument.class);
assertThat(doc.getChatModelConnections()).hasSize(1);
assertThat(doc.getActions()).hasSize(1);
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/resources/yaml/fixtures/dup_agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ agents:
- name: increment
function: org.apache.flink.agents.api.yaml.LoaderTargets:increment
type: java
listen_to: [input]
trigger_conditions: [input]
- name: dup
actions:
- name: decrement
function: org.apache.flink.agents.api.yaml.LoaderTargets:decrement
type: java
listen_to: [input]
trigger_conditions: [input]
4 changes: 2 additions & 2 deletions api/src/test/resources/yaml/fixtures/multi_agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ agents:
- name: increment
function: org.apache.flink.agents.api.yaml.LoaderTargets:increment
type: java
listen_to: [input]
trigger_conditions: [input]
- name: a2
actions:
- name: decrement
function: org.apache.flink.agents.api.yaml.LoaderTargets:decrement
type: java
listen_to: [input]
trigger_conditions: [input]
Loading
Loading