
[Bug] Python AgentConfigOptions starts PyFlink gateway during Pemja worker initialization #866

Description

@aka-dag

Search before asking

  • I searched in the issues and found nothing similar.

Description

When a Python action is initialized in the TaskManager-side Pemja worker, PythonActionExecutor.open() imports flink_agents.runtime.flink_runner_context.
This import chain eventually imports flink_agents.api.core_options:
flink_runner_context -> resource_cache -> resource_context -> agent_plan -> chat_model_action -> core_options
While core_options is being imported, AgentConfigOptionsMeta resolves the Java-side AgentConfigOptions through pyflink.java_gateway.get_gateway(). This launches a PyFlink gateway process from inside the Pemja worker. That is unexpected: Python is already embedded in the Flink JVM and should reach Java objects through Pemja rather than by launching a separate PyFlink gateway.
In deployment environments where the embedded worker cannot spawn the gateway server process, action initialization fails before the operator is opened.

Expected behavior:

Importing Flink Agents runtime modules in the Pemja worker should not start a PyFlink Java gateway. Python config options should be available without calling get_gateway() during runtime initialization.
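The expected behavior can be guarded by a regression check that forbids the gateway entry point while the runtime modules are imported. A minimal sketch, assuming core_options reaches get_gateway through the pyflink.java_gateway module attribute during import (either `from pyflink.java_gateway import get_gateway` or `pyflink.java_gateway.get_gateway()`); the helper names forbid_call, assert_imports_without_gateway and GatewayLaunchAttempted are hypothetical.

```python
import importlib
import sys
from contextlib import contextmanager
from typing import Iterator


class GatewayLaunchAttempted(RuntimeError):
    """Raised when an import tries to call the forbidden gateway function."""


@contextmanager
def forbid_call(module_name: str = "pyflink.java_gateway",
                attr: str = "get_gateway") -> Iterator[None]:
    """Temporarily replace module_name.attr with a function that always raises."""
    module = importlib.import_module(module_name)
    original = getattr(module, attr)

    def _forbidden(*args, **kwargs):
        raise GatewayLaunchAttempted(f"{module_name}.{attr}() was called")

    setattr(module, attr, _forbidden)
    try:
        yield
    finally:
        setattr(module, attr, original)  # always restore the real function


def assert_imports_without_gateway(*targets: str, **forbid_kwargs: str) -> None:
    """Import each target while the gateway entry point is forbidden.

    Run this in a fresh interpreter: modules (and their dependencies) already
    in sys.modules are not re-executed, so the check would prove nothing.
    """
    cached = [name for name in targets if name in sys.modules]
    if cached:
        raise ValueError(f"already imported, check would be meaningless: {cached}")
    with forbid_call(**forbid_kwargs):
        for name in targets:
            importlib.import_module(name)
```

In a dedicated test process this would be called as `assert_imports_without_gateway("flink_agents.runtime.flink_runner_context")`; with the current metaclass it should raise GatewayLaunchAttempted instead of spawning a process.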

Actual behavior:

core_options calls pyflink.java_gateway.get_gateway() during import, and the TaskManager task fails during ActionExecutionOperator.open().

Relevant sanitized stack trace:

pemja.core.PythonException: <class 'FileNotFoundError'>: 2
    at .../subprocess._execute_child(subprocess.py:1955)
    at .../subprocess.__init__(subprocess.py:1026)
    at .../pyflink/pyflink_gateway_server.launch_gateway_server_process(...)
    at .../pyflink/java_gateway.launch_gateway(...)
    at .../pyflink/java_gateway.get_gateway(...)
    at .../flink_agents/api/core_options.py:61
    at .../flink_agents/api/core_options.py:111
    at .../flink_agents/plan/actions/chat_model_action.py:33
    at .../flink_agents/plan/agent_plan.py:39
    at .../flink_agents/runtime/resource_context.py:26
    at .../flink_agents/runtime/resource_cache.py:25
    at .../flink_agents/runtime/flink_runner_context.py:54
    at <string>.<module>(<string>:2)
    at pemja.core.PythonInterpreter.exec(Native Method)
    at org.apache.flink.agents.runtime.python.utils.PythonActionExecutor.open(PythonActionExecutor.java:98)
    at org.apache.flink.agents.runtime.operator.PythonBridgeManager.initPythonActionExecutor(PythonBridgeManager.java:249)
    at org.apache.flink.agents.runtime.operator.PythonBridgeManager.open(PythonBridgeManager.java:173)
    at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.open(ActionExecutionOperator.java:172)

Proposed fix:

Remove the Java-backed dynamic lookup from Python AgentConfigOptions.

Instead of using AgentConfigOptionsMeta.__getattr__ to resolve missing options from Java via get_gateway(), define the supported config options explicitly in both Java and Python. This avoids starting a PyFlink gateway in the Pemja worker and makes the Python option surface deterministic.
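The explicit-declaration approach could look roughly like the sketch below. It assumes a simple ConfigOption value type; that class, the get_value helper, and all key strings, types and defaults are hypothetical placeholders, and the real ones must mirror the Java AgentConfigOptions.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class ConfigOption:
    """Hypothetical option type: key, value type and default only."""
    key: str
    config_type: type
    default: Any = None


class AgentConfigOptions:
    """Plain class, no metaclass: every option is a literal class attribute,
    so reading one never touches the JVM or PyFlink."""

    # Placeholder keys, types and defaults for illustration only.
    BASE_LOG_DIR = ConfigOption("placeholder.base-log-dir", str, None)
    PRETTY_PRINT = ConfigOption("placeholder.pretty-print", bool, False)
    EVENT_LISTENERS = ConfigOption("placeholder.event-listeners", list, None)
    # Action state store options would be declared the same way.


def get_value(config: Mapping[str, Any], option: ConfigOption) -> Any:
    """Hypothetical lookup: the configured value, else the option's default."""
    return config.get(option.key, option.default)
```

Because there is no __getattr__ fallback, a misspelled or undeclared option fails fast with AttributeError instead of triggering a Java lookup.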

The fix should likely include:

  • Remove AgentConfigOptionsMeta and covert_j_option_to_python_option.
  • Change AgentConfigOptions to a normal Python class.
  • Add explicit Python declarations for Java-side options that are intended to be exposed in Python, such as BASE_LOG_DIR, PRETTY_PRINT, action state store options, and EVENT_LISTENERS.
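Declaring the same options on both sides invites drift, so a build-time parity check may be worth adding. A minimal sketch, assuming the Java key list is supplied by the caller (for example exported by a Java-side test); it does not read anything from the JVM. ConfigOption, ExampleOptions, python_option_keys and option_drift are hypothetical names, and the keys are placeholders.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass(frozen=True)
class ConfigOption:
    """Hypothetical option type with just the fields this check needs."""
    key: str
    config_type: type
    default: Any = None


def python_option_keys(options_cls: type) -> set[str]:
    """Collect keys of ConfigOption attributes declared directly on the class."""
    return {v.key for v in vars(options_cls).values() if isinstance(v, ConfigOption)}


def option_drift(options_cls: type, java_keys: Iterable[str]) -> dict[str, set[str]]:
    """Report keys declared on only one side; both sets are empty when in sync."""
    py_keys = python_option_keys(options_cls)
    j_keys = set(java_keys)
    return {"missing_in_python": j_keys - py_keys, "missing_in_java": py_keys - j_keys}


class ExampleOptions:
    PRETTY_PRINT = ConfigOption("placeholder.pretty-print", bool, False)


# Java declares one option that Python has not mirrored yet.
drift = option_drift(ExampleOptions, ["placeholder.pretty-print", "placeholder.base-log-dir"])
```

Here drift reports "placeholder.base-log-dir" under missing_in_python; whether every Java option must be exposed in Python is a policy choice, so a real check might compare against an allow-list instead.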

How to reproduce

Run a Flink Agents job containing a Python action in a TaskManager/Pemja deployment environment where the embedded Python worker cannot start a PyFlink gateway server process.

During operator initialization, ActionExecutionOperator.open() calls PythonActionExecutor.open(), which executes Python imports and fails while importing flink_agents.api.core_options.

Version and environment

Flink Agents: 0.3-SNAPSHOT
Flink: 2.1-based distribution
Python: 3.11

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata


Assignees: No one assigned
Labels: bug (Issue Type: Something isn't working as expected.), priority/major (Default priority of the PR or issue.)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
