diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index d15b6e923..b8b7e860d 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -543,14 +543,9 @@ from flink_agents.api.function import JavaFunction class MyAgent(Agent): @action( InputEvent.EVENT_TYPE, - target=JavaFunction( - qualname="com.example.MyHandlers", - method_name="handleInput", - parameter_types=[ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", - ], - ), + # Action signatures are fixed (Event, RunnerContext), so for_action + # fills the Java parameter types for you — only the class and method. + target=JavaFunction.for_action("com.example.MyHandlers", "handleInput"), ) @staticmethod def handle_input(event: Event, ctx: RunnerContext) -> None: diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java new file mode 100644 index 000000000..444129806 --- /dev/null +++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.resource.test; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent.OLLAMA_MODEL; +import static org.apache.flink.agents.resource.test.CrossLanguageTestPreparationUtils.pullModel; + +/** + * End-to-end test for the Java YAML loader with cross-language orchestration actions: a Java host + * loads an agent whose user actions {@code process_input} / {@code process_chat_response} are + * {@code type: python}, while the Java-native built-in chat loop bridges ChatRequest→Response. + * Companion to {@link YamlCrossLanguageTest} (Java actions + Python tool) — here the actions cross + * languages and the built-ins stay native. The math path still drives a cross-language Python tool + * ({@code calculate_bmi}). + */ +public class YamlActionsInPythonCrossLanguageTest { + + private static final Logger LOG = + LoggerFactory.getLogger(YamlActionsInPythonCrossLanguageTest.class); + + private final boolean ollamaReady; + + public YamlActionsInPythonCrossLanguageTest() throws IOException { + ollamaReady = pullModel(OLLAMA_MODEL); + } + + @Test + public void testYamlPythonActionsOnJavaHost() throws Exception { + Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not provided"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream inputStream = + env.fromData( + "Calculate BMI for someone who is 1.75 meters tall and weighs 70 kg", + "Tell me a joke about cats."); + + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + agentsEnv.loadYaml(yamlFixture("yaml_cross_language_actions_in_python.yaml")); + + DataStream outputStream = + agentsEnv + .fromDataStream( + inputStream, (KeySelector) value -> "orderKey") + .apply("yaml_actions_in_python_agent") + .toDataStream(); + + CloseableIterator results = outputStream.collectAsync(); + agentsEnv.execute(); + + List responses = new ArrayList<>(); + while (results.hasNext()) { + responses.add(String.valueOf(results.next())); + } + LOG.info("Python-action cross-language YAML agent responses: {}", responses); + + Assertions.assertEquals( + 2, responses.size(), "expected 2 responses, got " + responses.size()); + + String joined = String.join("\n", responses).toLowerCase(); + Assertions.assertTrue( + joined.contains("22"), String.format("math answer missing '22': %s", responses)); + Assertions.assertTrue( + joined.contains("cat"), + String.format("creative answer missing 'cat': %s", responses)); + } + + private static Path yamlFixture(String name) { + URL resource = + YamlActionsInPythonCrossLanguageTest.class + .getClassLoader() + .getResource("yaml/" + name); + Objects.requireNonNull(resource, "fixture not found on classpath: yaml/" + name); + return Paths.get(resource.getPath()); + } +} diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml new file mode 100644 index 000000000..615489807 --- /dev/null +++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml @@ -0,0 +1,47 @@ +agents: + - name: yaml_actions_in_python_agent + description: | + Java-host cross-language YAML e2e agent whose orchestration actions are + Python. Complements yaml_cross_language_agent (Java actions + Python tool): + here ``process_input`` / ``process_chat_response`` are ``type: python`` and + dispatch to ``yaml_cross_language_actions``, while the Java-native built-in + chat loop bridges ChatRequest→Response. The math path still exercises a + cross-language Python tool (calculate_bmi). + + chat_model_connections: + - name: ollama_connection_java + clazz: ollama + type: java + endpoint: http://localhost:11434 + requestTimeout: 240 + - name: ollama_connection_python + clazz: ollama + request_timeout: 240 + + chat_model_setups: + - name: math_chat_model + clazz: ollama + type: java + connection: ollama_connection_java + model: qwen3:1.7b + tools: [calculate_bmi] + extract_reasoning: true + - name: creative_chat_model + clazz: ollama + connection: ollama_connection_python + model: qwen3:1.7b + extract_reasoning: true + + tools: + - name: calculate_bmi + function: flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:calculate_bmi + + actions: + - name: process_input + type: python + function: flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:process_input + listen_to: [input] + - name: process_chat_response + type: python + function: flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:process_chat_response + listen_to: [chat_response] diff --git a/python/flink_agents/api/function.py b/python/flink_agents/api/function.py index b5597664b..d95375a40 100644 --- a/python/flink_agents/api/function.py +++ b/python/flink_agents/api/function.py @@ -25,10 +25,18 @@ import importlib import inspect from abc import ABC -from typing import Any, Callable, List +from typing import Any, Callable, List, Tuple from pydantic import BaseModel, model_serializer +#: Java parameter types of an action method. Action signatures are fixed +#: ``(Event, RunnerContext)``, so callers never have to spell them out. A tuple +#: keeps the shared constant immutable; callers copy it into their own list. +ACTION_PARAMETER_TYPES: Tuple[str, ...] = ( + "org.apache.flink.agents.api.Event", + "org.apache.flink.agents.api.context.RunnerContext", +) + class Function(BaseModel, ABC): """Marker base class for function descriptors. Pure data — has no @@ -99,6 +107,20 @@ class JavaFunction(Function): method_name: str parameter_types: List[str] + @classmethod + def for_action(cls, qualname: str, method_name: str) -> "JavaFunction": + """Build a descriptor for a Java action, filling the fixed signature. + + Actions always take ``(Event, RunnerContext)``, so ``parameter_types`` + is implied — mirrors the YAML API, which omits it for ``type: java`` + actions. Tools must still pass ``parameter_types`` to the constructor. + """ + return cls( + qualname=qualname, + method_name=method_name, + parameter_types=list(ACTION_PARAMETER_TYPES), + ) + @model_serializer def __serialize(self) -> dict[str, Any]: return { diff --git a/python/flink_agents/api/tests/test_agent_add_action.py b/python/flink_agents/api/tests/test_agent_add_action.py index 98b448b03..ee7e59f2b 100644 --- a/python/flink_agents/api/tests/test_agent_add_action.py +++ b/python/flink_agents/api/tests/test_agent_add_action.py @@ -30,14 +30,16 @@ def _dummy_action(event: Event, ctx: RunnerContext) -> None: def _make_java_function() -> JavaFunction: - return JavaFunction( - qualname="com.example.Handlers", - method_name="handle", - parameter_types=[ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", - ], - ) + return JavaFunction.for_action("com.example.Handlers", "handle") + + +def test_java_function_for_action_fills_fixed_action_signature() -> None: + """``for_action`` omits parameter_types boilerplate; YAML already does this.""" + jf = JavaFunction.for_action("com.example.Handlers", "handle") + assert jf.parameter_types == [ + "org.apache.flink.agents.api.Event", + "org.apache.flink.agents.api.context.RunnerContext", + ] # ── Descriptor pass-through ───────────────────────────────────────────── diff --git a/python/flink_agents/api/tests/test_decorators.py b/python/flink_agents/api/tests/test_decorators.py index 6a4b131fc..94ff8d86f 100644 --- a/python/flink_agents/api/tests/test_decorators.py +++ b/python/flink_agents/api/tests/test_decorators.py @@ -94,14 +94,7 @@ def bad_handler(event: Event, ctx: RunnerContext) -> None: def _java_target() -> JavaFunction: - return JavaFunction( - qualname="com.example.Handlers", - method_name="handle", - parameter_types=[ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", - ], - ) + return JavaFunction.for_action("com.example.Handlers", "handle") def test_action_decorator_with_cross_language_target() -> None: diff --git a/python/flink_agents/api/yaml/loader.py b/python/flink_agents/api/yaml/loader.py index 2a0ad28df..54e5938e2 100644 --- a/python/flink_agents/api/yaml/loader.py +++ b/python/flink_agents/api/yaml/loader.py @@ -29,7 +29,12 @@ from flink_agents.api.agents.agent import Agent from flink_agents.api.chat_message import ChatMessage, MessageRole -from flink_agents.api.function import Function, JavaFunction, PythonFunction +from flink_agents.api.function import ( + ACTION_PARAMETER_TYPES, + Function, + JavaFunction, + PythonFunction, +) from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import ResourceDescriptor, ResourceType from flink_agents.api.skills import Skills, SkillSourceSpec @@ -50,13 +55,6 @@ YamlAgentsDocument, ) -# Default Java parameter types for an action. Action methods in -# flink-agents always have signature (Event, RunnerContext). -_JAVA_ACTION_PARAMETER_TYPES: list[str] = [ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", -] - _DESCRIPTOR_TYPES: Dict[str, ResourceType] = { "chat_model_connections": ResourceType.CHAT_MODEL_CONNECTION, "chat_model_setups": ResourceType.CHAT_MODEL, @@ -159,7 +157,7 @@ def _add_descriptors_to_agent( def _resolve_action_function(action: ActionSpec) -> Function: - parameter_types = _JAVA_ACTION_PARAMETER_TYPES if action.type == "java" else None + parameter_types = ACTION_PARAMETER_TYPES if action.type == "java" else None return resolve_function( name=action.name, function=action.function, diff --git a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py index a59b46fed..f7fb2160b 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py +++ b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py @@ -25,10 +25,6 @@ JAVA_HANDLER_QUALNAME = "org.apache.flink.agents.resource.test.JavaActionHandler" JAVA_HANDLER_METHOD = "multiplyByTwo" -JAVA_HANDLER_PARAMETER_TYPES = [ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", -] class PythonAgentWithJavaActionAgent(Agent): @@ -40,11 +36,7 @@ def __init__(self) -> None: self.add_action( name="multiply_by_two", events=[InputEvent.EVENT_TYPE], - func=JavaFunction( - qualname=JAVA_HANDLER_QUALNAME, - method_name=JAVA_HANDLER_METHOD, - parameter_types=JAVA_HANDLER_PARAMETER_TYPES, - ), + func=JavaFunction.for_action(JAVA_HANDLER_QUALNAME, JAVA_HANDLER_METHOD), ) diff --git a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py new file mode 100644 index 000000000..770d8505c --- /dev/null +++ b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py @@ -0,0 +1,148 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +"""E2E test: Python-host agent whose orchestration actions are Java. + +Companion to ``yaml_cross_language_test.py`` (Python actions + Java tool). +Here ``process_input`` / ``process_chat_response`` are ``type: java`` and +dispatch to ``YamlCrossLanguageActions``; the Python-native built-in +``chat_model_action`` bridges the chat loop, and the math path calls the +cross-language Java ``calculateBMI`` tool. Exercises a Java user action → +Python built-in → Java user action round trip. + +Skipped when the Ollama client/model or the cross-language test-jar is +unavailable. +""" + +import os +import sysconfig +from pathlib import Path + +import pytest +from pyflink.common import Configuration, Encoder, WatermarkStrategy +from pyflink.common.typeinfo import Types +from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment +from pyflink.datastream.connectors.file_system import ( + FileSource, + StreamFormat, + StreamingFileSink, +) + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.e2e_tests.test_utils import ( + assert_tool_invoked, + collect_tool_invocations, + pull_model, +) + +current_dir = Path(__file__).parent +_RESOURCES = current_dir.parent / "resources" +_REPO_ROOT = current_dir.parent.parent.parent.parent +_TEST_JAR = ( + _REPO_ROOT + / "e2e-test" + / "flink-agents-end-to-end-tests-resource-cross-language" + / "target" + / "flink-agents-end-to-end-tests-resource-cross-language-0.3-SNAPSHOT-tests.jar" +) + +os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] + +OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b") +_client = pull_model(OLLAMA_MODEL) + + +@pytest.mark.skipif( + _client is None, + reason="Ollama client is not available or test model is missing.", +) +@pytest.mark.skipif( + not _TEST_JAR.is_file(), + reason=( + "Cross-language test-jar is missing; run " + "'mvn package -DskipTests -pl e2e-test/" + "flink-agents-end-to-end-tests-resource-cross-language' first." + ), +) +def test_yaml_python_host_with_java_actions( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """``load_yaml`` agent whose process_* actions are Java, bridged by Python.""" + monkeypatch.setenv("OLLAMA_CHAT_MODEL", OLLAMA_MODEL) + config = Configuration() + config.set_string("python.pythonpath", sysconfig.get_paths()["purelib"]) + env = StreamExecutionEnvironment.get_execution_environment(config) + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + env.add_jars(f"file://{_TEST_JAR}") + + input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), + f"file:///{_RESOURCES}/yaml_cross_language_input", + ).build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="yaml_actions_in_java_source", + ) + deserialize_datastream = input_datastream.map(lambda x: str(x)) + + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + log_dir = tmp_path / "event_logs" + log_dir.mkdir(parents=True, exist_ok=True) + agents_env.get_config().set_str("baseLogDir", str(log_dir)) + agents_env.load_yaml(_RESOURCES / "yaml_cross_language_actions_in_java.yaml") + + output_datastream = ( + agents_env.from_datastream( + input=deserialize_datastream, key_selector=lambda x: "orderKey" + ) + .apply("yaml_actions_in_java_agent") + .to_datastream() + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + output_datastream.map( + lambda x: str(x).replace("\n", "").replace("\r", ""), Types.STRING() + ).add_sink( + StreamingFileSink.for_row_format( + base_path=str(result_dir.absolute()), + encoder=Encoder.simple_string_encoder(), + ).build() + ) + + agents_env.execute() + + actual_result = [] + for file in result_dir.iterdir(): + if file.is_dir(): + for child in file.iterdir(): + with child.open() as f: + actual_result.extend(f.readlines()) + if file.is_file(): + with file.open() as f: + actual_result.extend(f.readlines()) + + # Math path went through the Java calculateBMI tool — cross-language tool. + assert_tool_invoked( + collect_tool_invocations(log_dir), + "calculateBMI", + {"weightKg": 70, "heightM": 1.75}, + ) + # Creative path uses no tool; its answer mentions a cat. + joined = "\n".join(actual_result).lower() + assert "cat" in joined, f"creative answer missing 'cat': {actual_result!r}" diff --git a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py new file mode 100644 index 000000000..37303c0fe --- /dev/null +++ b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py @@ -0,0 +1,107 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +"""E2E test: a YAML-declared agent whose action body is a Java method. + +Companion to ``yaml_cross_language_test.py`` (Java *tool*) — this covers +the Java *action* path. Parses ``resources/yaml_cross_language_java_action.yaml`` +via ``AgentsExecutionEnvironment.load_yaml`` and runs the declared agent. +The single ``type: java`` action dispatches into +``org.apache.flink.agents.resource.test.JavaActionHandler.multiplyByTwo``, +mirroring ``python_agent_with_java_action_test`` but driven by YAML. The +YAML omits ``parameter_types`` (actions have a fixed signature), so this +also guards the loader's auto-fill. No chat model is involved, so it runs +whenever the cross-language test-jar is present. +""" + +import os +import sysconfig +from pathlib import Path + +import pytest +from pyflink.common import Configuration, Encoder +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.connectors.file_system import StreamingFileSink + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment + +current_dir = Path(__file__).parent +_RESOURCES = current_dir.parent / "resources" +_REPO_ROOT = current_dir.parent.parent.parent.parent +_TEST_JAR = ( + _REPO_ROOT + / "e2e-test" + / "flink-agents-end-to-end-tests-resource-cross-language" + / "target" + / "flink-agents-end-to-end-tests-resource-cross-language-0.3-SNAPSHOT-tests.jar" +) + +os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] + + +@pytest.mark.skipif( + not _TEST_JAR.is_file(), + reason=( + "Cross-language test-jar is missing; run " + "'mvn package -DskipTests -pl e2e-test/" + "flink-agents-end-to-end-tests-resource-cross-language' first." + ), +) +def test_yaml_agent_dispatches_java_action_body(tmp_path: Path) -> None: + """``load_yaml`` → ``apply(by name)`` with a YAML-declared Java action.""" + config = Configuration() + config.set_string("python.pythonpath", sysconfig.get_paths()["purelib"]) + env = StreamExecutionEnvironment.get_execution_environment(config) + env.set_parallelism(1) + env.add_jars(f"file://{_TEST_JAR}") + + input_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.LONG()).map( + lambda x: x + ) + + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + agents_env.load_yaml(_RESOURCES / "yaml_cross_language_java_action.yaml") + output_datastream = ( + agents_env.from_datastream(input=input_stream, key_selector=lambda x: 0) + .apply("yaml_cross_language_java_action_agent") + .to_datastream(Types.LONG()) + ) + + result_dir = tmp_path / "results" + result_dir.mkdir(parents=True, exist_ok=True) + output_datastream.map(lambda x: str(x), Types.STRING()).add_sink( + StreamingFileSink.for_row_format( + base_path=str(result_dir.absolute()), + encoder=Encoder.simple_string_encoder(), + ).build() + ) + + agents_env.execute() + + actual: list[int] = [] + for file in result_dir.iterdir(): + if file.is_dir(): + for child in file.iterdir(): + with child.open() as f: + actual.extend(int(line.strip()) for line in f if line.strip()) + elif file.is_file(): + with file.open() as f: + actual.extend(int(line.strip()) for line in f if line.strip()) + + actual.sort() + assert actual == [2, 4, 6, 8, 10], f"unexpected outputs: {actual}" diff --git a/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml b/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml new file mode 100644 index 000000000..b7acca473 --- /dev/null +++ b/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml @@ -0,0 +1,49 @@ +agents: + - name: yaml_actions_in_java_agent + description: | + Python-host cross-language e2e agent whose orchestration actions are + Java. Complements yaml_cross_language_agent (Python actions + Java tool): + here the user actions ``process_input`` / ``process_chat_response`` are + ``type: java`` and dispatch to ``YamlCrossLanguageActions``, while the + Python-native built-in ``chat_model_action`` bridges ChatRequest→Response. + The math path also exercises a cross-language Java tool (calculateBMI). + + chat_model_connections: + - name: ollama_connection + clazz: ollama + request_timeout: 240.0 + - name: ollama_connection_java + clazz: ollama + type: java + endpoint: http://localhost:11434 + requestTimeout: 240 + + chat_model_setups: + - name: math_chat_model + clazz: ollama + connection: ollama_connection + model: qwen3:1.7b + tools: [calculateBMI] + extract_reasoning: true + - name: creative_chat_model + clazz: ollama + type: java + connection: ollama_connection_java + model: qwen3:1.7b + extract_reasoning: true + + tools: + - name: calculateBMI + type: java + function: org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent:calculateBMI + parameter_types: [java.lang.Double, java.lang.Double] + + actions: + - name: process_input + type: java + function: org.apache.flink.agents.resource.test.YamlCrossLanguageActions:processInput + listen_to: [input] + - name: process_chat_response + type: java + function: org.apache.flink.agents.resource.test.YamlCrossLanguageActions:processChatResponse + listen_to: [chat_response] diff --git a/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml b/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml new file mode 100644 index 000000000..d6c9029bb --- /dev/null +++ b/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml @@ -0,0 +1,15 @@ +agents: + - name: yaml_cross_language_java_action_agent + description: | + YAML-driven cross-language e2e agent whose action body is a Java + static method. Exercises the Python→Java action bridge: the agent + declares a single ``type: java`` action that dispatches into + ``JavaActionHandler.multiplyByTwo``. No ``parameter_types`` is + written — action signatures are fixed (Event, RunnerContext), so the + loader fills them in. + + actions: + - name: multiply_by_two + type: java + function: org.apache.flink.agents.resource.test.JavaActionHandler:multiplyByTwo + listen_to: [input] diff --git a/python/flink_agents/plan/tests/test_agent_plan.py b/python/flink_agents/plan/tests/test_agent_plan.py index 801223e21..9d68f8c1b 100644 --- a/python/flink_agents/plan/tests/test_agent_plan.py +++ b/python/flink_agents/plan/tests/test_agent_plan.py @@ -149,14 +149,7 @@ def test_action_inherited_from_parent_agent_class_is_rejected() -> None: class AgentWithCrossLanguageDecoratedAction(Agent): @action( InputEvent.EVENT_TYPE, - target=JavaFunction( - qualname=_JAVA_HANDLER_QUALNAME, - method_name="handleInput", - parameter_types=[ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", - ], - ), + target=JavaFunction.for_action(_JAVA_HANDLER_QUALNAME, "handleInput"), ) @staticmethod def handle(event: Event, ctx: RunnerContext) -> None: diff --git a/python/flink_agents/plan/tests/test_agent_plan_cross_language.py b/python/flink_agents/plan/tests/test_agent_plan_cross_language.py index 9a4b6d347..fbf467116 100644 --- a/python/flink_agents/plan/tests/test_agent_plan_cross_language.py +++ b/python/flink_agents/plan/tests/test_agent_plan_cross_language.py @@ -60,14 +60,7 @@ def _dummy_action(event: Event, ctx: RunnerContext) -> None: def _make_java_function_descriptor() -> ApiJavaFunction: - return ApiJavaFunction( - qualname="com.example.Handlers", - method_name="handle", - parameter_types=[ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", - ], - ) + return ApiJavaFunction.for_action("com.example.Handlers", "handle") def _make_python_function_descriptor() -> ApiPythonFunction: diff --git a/python/flink_agents/runtime/tests/test_local_runner_cross_language.py b/python/flink_agents/runtime/tests/test_local_runner_cross_language.py index 0bb1502e4..7075fc714 100644 --- a/python/flink_agents/runtime/tests/test_local_runner_cross_language.py +++ b/python/flink_agents/runtime/tests/test_local_runner_cross_language.py @@ -36,14 +36,7 @@ def echo_action(event: Event, ctx: RunnerContext) -> None: def _make_java_function_descriptor() -> ApiJavaFunction: - return ApiJavaFunction( - qualname="com.example.Handlers", - method_name="handle", - parameter_types=[ - "org.apache.flink.agents.api.Event", - "org.apache.flink.agents.api.context.RunnerContext", - ], - ) + return ApiJavaFunction.for_action("com.example.Handlers", "handle") def test_local_runner_dispatches_python_function_action() -> None: