From 20396f8f893df82a0696c8c5e5989ad201c8be22 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sat, 13 Jun 2026 23:20:28 -0700 Subject: [PATCH] [api] Remove dead no-arg getExecutionEnvironment() referencing a non-existent LocalExecutionEnvironment The no-arg getExecutionEnvironment() and the env==null branch of the two-arg overload reflectively loaded org.apache.flink.agents.runtime.env.LocalExecutionEnvironment, a class that does not exist (the runtime env/ package ships only RemoteExecutionEnvironment) and never existed in git history. Both paths could therefore only throw at runtime, while the Javadoc promised a local execution environment for testing. No Java code calls the no-arg form; every example and test passes a real StreamExecutionEnvironment. Remove the no-arg overload and the dead branch, and require a non-null StreamExecutionEnvironment (fail-fast). Java agents always run on a Flink environment. --- .../api/AgentsExecutionEnvironment.java | 72 ++++++------------- .../api/AgentsExecutionEnvironmentTest.java | 46 ++++++++++++ 2 files changed, 67 insertions(+), 51 deletions(-) create mode 100644 api/src/test/java/org/apache/flink/agents/api/AgentsExecutionEnvironmentTest.java diff --git a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java index b1555f069..cb4a3c6b2 100644 --- a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java +++ b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -72,75 +73,44 @@ public Map> getResources() { } /** - * Get agents execution environment. + * Get agents execution environment for the given Flink {@link StreamExecutionEnvironment}. * - *

Factory method that creates an appropriate execution environment based on the provided - * StreamExecutionEnvironment. If no environment is provided, a local execution environment is - * returned for testing and development. + *

Agents execute on a Flink environment, so {@code env} is required. * - *

When integrating with Flink DataStream/Table APIs, users should pass the Flink - * StreamExecutionEnvironment to enable remote execution capabilities. - * - * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local - * execution environment will be created. + * @param env StreamExecutionEnvironment that agents execute on. Must not be null. * @param tEnv Optional StreamTableEnvironment for table-to-stream conversion. - * @return AgentsExecutionEnvironment appropriate for the execution context. + * @return AgentsExecutionEnvironment backed by the given Flink environment. */ public static AgentsExecutionEnvironment getExecutionEnvironment( StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) { - if (env == null) { - // Return local execution environment for testing/development - try { - Class localEnvClass = - Class.forName( - "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment"); - return (AgentsExecutionEnvironment) - localEnvClass.getDeclaredConstructor().newInstance(); - } catch (Exception e) { - throw new RuntimeException("Failed to create LocalExecutionEnvironment", e); - } - } else { - // Return remote execution environment for Flink integration - try { - Class remoteEnvClass = - Class.forName( - "org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment"); - return (AgentsExecutionEnvironment) - remoteEnvClass - .getDeclaredConstructor( - StreamExecutionEnvironment.class, - StreamTableEnvironment.class) - .newInstance(env, tEnv); - } catch (Exception e) { - throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e); - } + Preconditions.checkNotNull(env, "StreamExecutionEnvironment must not be null."); + // Return remote execution environment for Flink integration + try { + Class remoteEnvClass = + Class.forName("org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment"); + return (AgentsExecutionEnvironment) + remoteEnvClass + .getDeclaredConstructor( + StreamExecutionEnvironment.class, StreamTableEnvironment.class) + .newInstance(env, tEnv); + } catch (Exception e) { + throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e); } } /** - * Convenience method to get execution environment without Flink StreamTableEnvironment. If - * StreamTableEnvironment is needed during execution, the environment will auto crate using + * Convenience method to get execution environment without a Flink StreamTableEnvironment. If a + * StreamTableEnvironment is needed during execution, it is created automatically from the * StreamExecutionEnvironment. * - *

* @param env Optional StreamExecutionEnvironment for remote execution. If null, a local - * execution environment will be created. - * - * @return Remote execution environment for testing and development. + * @param env StreamExecutionEnvironment that agents execute on. Must not be null. + * @return AgentsExecutionEnvironment backed by the given Flink environment. */ public static AgentsExecutionEnvironment getExecutionEnvironment( StreamExecutionEnvironment env) { return getExecutionEnvironment(env, null); } - /** - * Convenience method to get execution environment without Flink integration. - * - * @return Local execution environment for testing and development. - */ - public static AgentsExecutionEnvironment getExecutionEnvironment() { - return getExecutionEnvironment(null); - } - /** * Returns a writable configuration object for setting configuration values. * diff --git a/api/src/test/java/org/apache/flink/agents/api/AgentsExecutionEnvironmentTest.java b/api/src/test/java/org/apache/flink/agents/api/AgentsExecutionEnvironmentTest.java new file mode 100644 index 000000000..21b402db7 --- /dev/null +++ b/api/src/test/java/org/apache/flink/agents/api/AgentsExecutionEnvironmentTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link AgentsExecutionEnvironment} factory methods. */ +class AgentsExecutionEnvironmentTest { + + /** + * A Flink {@link StreamExecutionEnvironment} is required. Agents run on Flink and there is no + * in-process Java environment, so a null env must fail fast at the factory, with a message + * naming the missing argument, rather than later. + */ + @Test + void getExecutionEnvironmentRejectsNullStreamEnv() { + assertThatThrownBy( + () -> + AgentsExecutionEnvironment.getExecutionEnvironment( + (StreamExecutionEnvironment) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("StreamExecutionEnvironment"); + assertThatThrownBy(() -> AgentsExecutionEnvironment.getExecutionEnvironment(null, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("StreamExecutionEnvironment"); + } +}