Search before asking
Description
When a Python action is initialized in the TaskManager-side Pemja worker, PythonActionExecutor.open() imports flink_agents.runtime.flink_runner_context.
This import chain eventually imports flink_agents.api.core_options:
flink_runner_context -> resource_cache -> resource_context -> agent_plan -> chat_model_action -> core_options
During core_options import, AgentConfigOptionsMeta resolves Java-side AgentConfigOptions through pyflink.java_gateway.get_gateway(). This starts a PyFlink gateway process from inside the Pemja worker, which is unexpected because Python is already embedded in the Flink JVM and should access Java objects through Pemja instead of launching a separate PyFlink gateway.
In some deployment environments, this causes action initialization to fail before the operator is opened.
Expected behavior:
Importing Flink Agents runtime modules in the Pemja worker should not start a PyFlink Java gateway. Python config options should be available without calling get_gateway() during runtime initialization.
Actual behavior:
core_options calls pyflink.java_gateway.get_gateway() during import, and the TaskManager task fails during ActionExecutionOperator.open().
Relevant sanitized stack trace:
pemja.core.PythonException: <class 'FileNotFoundError'>: 2
at .../subprocess._execute_child(subprocess.py:1955)
at .../subprocess.__init__(subprocess.py:1026)
at .../pyflink/pyflink_gateway_server.launch_gateway_server_process(...)
at .../pyflink/java_gateway.launch_gateway(...)
at .../pyflink/java_gateway.get_gateway(...)
at .../flink_agents/api/core_options.py:61
at .../flink_agents/api/core_options.py:111
at .../flink_agents/plan/actions/chat_model_action.py:33
at .../flink_agents/plan/agent_plan.py:39
at .../flink_agents/runtime/resource_context.py:26
at .../flink_agents/runtime/resource_cache.py:25
at .../flink_agents/runtime/flink_runner_context.py:54
at <string>.<module>(<string>:2)
at pemja.core.PythonInterpreter.exec(Native Method)
at org.apache.flink.agents.runtime.python.utils.PythonActionExecutor.open(PythonActionExecutor.java:98)
at org.apache.flink.agents.runtime.operator.PythonBridgeManager.initPythonActionExecutor(PythonBridgeManager.java:249)
at org.apache.flink.agents.runtime.operator.PythonBridgeManager.open(PythonBridgeManager.java:173)
at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.open(ActionExecutionOperator.java:172)
Proposed fix:
Remove the Java-backed dynamic lookup from Python AgentConfigOptions.
Instead of using AgentConfigOptionsMeta.__getattr__ to resolve missing options from Java via get_gateway(), define the supported config options explicitly in both Java and Python. This avoids starting a PyFlink gateway in the Pemja worker and makes the Python option surface deterministic.
The fix should likely include:
- Remove AgentConfigOptionsMeta and covert_j_option_to_python_option.
- Change AgentConfigOptions to a normal Python class.
- Add explicit Python declarations for Java-side options that are intended to be exposed in Python, such as BASE_LOG_DIR, PRETTY_PRINT, action state store options, and EVENT_LISTENERS.
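A minimal sketch of what the explicit declarations could look like is below. The ConfigOption dataclass, its fields, the key strings, and the default values are all assumptions made for illustration; the real option type, keys, and defaults should be taken from the existing Python and Java code.

```python
from dataclasses import dataclass
from typing import Generic, Optional, Type, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class ConfigOption(Generic[T]):
    """Hypothetical option descriptor; the project's real type may differ."""

    key: str
    config_type: Type[T]
    default: Optional[T] = None


class AgentConfigOptions:
    """Plain class: every option is declared here, with no Java lookup."""

    # Key strings and defaults below are placeholders, not the real values.
    BASE_LOG_DIR = ConfigOption("baseLogDir", str)
    PRETTY_PRINT = ConfigOption("prettyPrint", bool, False)
    EVENT_LISTENERS = ConfigOption("eventListeners", list)
```

With a plain class, accessing an undeclared option raises AttributeError immediately instead of falling through to a gateway call, which makes the option surface visible in one place. The trade-off is that the Python and Java declarations have to be kept in sync by hand or by a test.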
How to reproduce
Run a Flink Agents job containing a Python action in a TaskManager/Pemja deployment environment where the embedded Python worker cannot start a PyFlink gateway server process.
During operator initialization, ActionExecutionOperator.open() calls PythonActionExecutor.open(), which executes Python imports and fails while importing flink_agents.api.core_options.
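Outside a cluster, the failing condition can be approximated by making process launches fail during the import. The helper below is a sketch under assumptions: import_with_subprocess_blocked is a hypothetical name, and the patch only intercepts launches that go through the subprocess.Popen attribute (which subprocess.run and friends do), so code holding its own earlier reference to Popen would not be caught.

```python
import importlib
import subprocess
from unittest import mock


def import_with_subprocess_blocked(module_name):
    """Import a module while any subprocess.Popen call raises FileNotFoundError.

    Mimics an environment where the embedded worker cannot start a
    gateway server process. A module already present in sys.modules is
    returned as-is and its top-level code is not re-executed.
    """

    def _blocked(*args, **kwargs):
        raise FileNotFoundError(2, "subprocess launch blocked for this check")

    # The original Popen is restored when the with-block exits.
    with mock.patch.object(subprocess, "Popen", side_effect=_blocked):
        return importlib.import_module(module_name)
```

Calling import_with_subprocess_blocked("flink_agents.api.core_options") in a fresh interpreter with the affected version installed would be expected to raise if the import reaches the gateway launch, and to succeed once the lookup is removed. The same helper could serve as a regression check.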
Version and environment
Flink Agents: 0.3-SNAPSHOT
Flink: 2.1-based distribution
Python: 3.11
Are you willing to submit a PR?