diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py index f5247616c..d196777c4 100644 --- a/python/flink_agents/api/core_options.py +++ b/python/flink_agents/api/core_options.py @@ -17,60 +17,10 @@ ################################################################################# import os from enum import Enum -from typing import Any - -from pyflink.java_gateway import get_gateway from flink_agents.api.configuration import ConfigOption -def covert_j_option_to_python_option(j_option: Any) -> ConfigOption: - """Convert a Java config option to a Python config option.""" - key = j_option.getKey() - default = j_option.getDefaultValue() - type_name = j_option.getTypeName() - - if type_name == "java.lang.String": - config_type = str - elif type_name == "java.lang.Integer": - config_type = int - elif type_name == "java.lang.Long": - config_type = int - elif type_name == "java.lang.Boolean": - config_type = bool - elif type_name == "java.lang.Float": - config_type = float - elif type_name == "java.lang.Double": - config_type = float - else: - msg = f"Unsupported type: {type_name}" - raise TypeError(msg) - - return ConfigOption(key, config_type, default) - - -class AgentConfigOptionsMeta(type): - """Metaclass for FlinkAgentsCoreOptions.""" - - def __init__( - cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any] - ) -> None: - """Initialize the metaclass for FlinkAgentsCoreOptions.""" - super().__init__(name, bases, attrs) - - jvm = get_gateway().jvm - cls.jvm = jvm - - def __getattr__(cls, item: str) -> ConfigOption: - j_option = getattr( - cls.jvm.org.apache.flink.agents.api.configuration.AgentConfigOptions, - item, - ) - - python_option = covert_j_option_to_python_option(j_option) - return python_option - - class ErrorHandlingStrategy(Enum): """Error handling strategy for Agent. @@ -108,14 +58,23 @@ class LoggerType(Enum): FILE = "file" -class AgentConfigOptions(metaclass=AgentConfigOptionsMeta): - """CoreOptions to manage core configuration parameters for Flink Agents.""" +class EventLogLevel(Enum): + """Log level for event logging. - JOB_IDENTIFIER = ConfigOption( - key="job-identifier", - config_type=str, - default=None, - ) + Mirrors the Java ``EventLogLevel`` enum. + """ + + OFF = "OFF" + STANDARD = "STANDARD" + VERBOSE = "VERBOSE" + + +class AgentConfigOptions: + """CoreOptions to manage core configuration parameters for Flink Agents. + + Options are declared explicitly in Python and must stay aligned with the + Java ``AgentConfigOptions`` class. + """ EVENT_LOGGER_TYPE = ConfigOption( key="eventLoggerType", @@ -123,11 +82,112 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta): default=LoggerType.SLF4J, ) - # Event log level config options + BASE_LOG_DIR = ConfigOption( + key="baseLogDir", + config_type=str, + default=None, + ) + + PRETTY_PRINT = ConfigOption( + key="prettyPrint", + config_type=bool, + default=False, + ) + + ACTION_STATE_STORE_BACKEND = ConfigOption( + key="actionStateStoreBackend", + config_type=str, + default=None, + ) + + KAFKA_BOOTSTRAP_SERVERS = ConfigOption( + key="kafkaBootstrapServers", + config_type=str, + default="localhost:9092", + ) + + KAFKA_ACTION_STATE_TOPIC = ConfigOption( + key="kafkaActionStateTopic", + config_type=str, + default=None, + ) + + KAFKA_ACTION_STATE_TOPIC_NUM_PARTITIONS = ConfigOption( + key="kafkaActionStateTopicNumPartitions", + config_type=int, + default=64, + ) + + KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR = ConfigOption( + key="kafkaActionStateTopicReplicationFactor", + config_type=int, + default=1, + ) + + FLUSS_BOOTSTRAP_SERVERS = ConfigOption( + key="flussBootstrapServers", + config_type=str, + default="localhost:9123", + ) + + FLUSS_ACTION_STATE_DATABASE = ConfigOption( + key="flussActionStateDatabase", + config_type=str, + default="flink_agents", + ) + + FLUSS_ACTION_STATE_TABLE = ConfigOption( + key="flussActionStateTable", + config_type=str, + default=None, + ) + + FLUSS_ACTION_STATE_TABLE_BUCKETS = ConfigOption( + key="flussActionStateTableBuckets", + config_type=int, + default=64, + ) + + FLUSS_SECURITY_PROTOCOL = ConfigOption( + key="flussSecurityProtocol", + config_type=str, + default="PLAINTEXT", + ) + + FLUSS_SASL_MECHANISM = ConfigOption( + key="flussSaslMechanism", + config_type=str, + default="PLAIN", + ) + + FLUSS_SASL_JAAS_CONFIG = ConfigOption( + key="flussSaslJaasConfig", + config_type=str, + default=None, + ) + + FLUSS_SASL_USERNAME = ConfigOption( + key="flussSaslUsername", + config_type=str, + default=None, + ) + + FLUSS_SASL_PASSWORD = ConfigOption( + key="flussSaslPassword", + config_type=str, + default=None, + ) + + JOB_IDENTIFIER = ConfigOption( + key="job-identifier", + config_type=str, + default=None, + ) + EVENT_LOG_LEVEL = ConfigOption( key="event-log.level", - config_type=str, - default="STANDARD", + config_type=EventLogLevel, + default=EventLogLevel.STANDARD, ) EVENT_LOG_MAX_STRING_LENGTH = ConfigOption( @@ -148,6 +208,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta): default=5, ) + EVENT_LISTENERS = ConfigOption( + key="event-listeners", + config_type=list, + default=None, + ) + class AgentExecutionOptions: """Execution options for Flink Agents.""" diff --git a/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py index b7251ee17..984ea92ae 100644 --- a/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py +++ b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py @@ -15,36 +15,27 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# -from pathlib import Path - -from pyflink.util.java_utils import add_jars_to_context_class_loader - from flink_agents.api.core_options import ( AgentConfigOptions, AgentExecutionOptions, + EventLogLevel, ShortTermMemoryTtlUpdate, ShortTermMemoryTtlVisibility, ) -# This script is used to verify that Java-defined configuration options -# (e.g., AgentConfigOptions) are correctly exposed and accessible in the -# Python environment via the JAR file. It loads a Java JAR into the Python -# context and performs basic assertions on the configuration keys, types, -# and default values to ensure compatibility between Java and Python layers. -# -# The JAR file path is relative to this script and should be updated if -# the build structure changes. +# This script verifies that Python configuration options stay aligned with the +# Java AgentConfigOptions / AgentExecutionOptions definitions. if __name__ == "__main__": - current_dir = Path(__file__).parent - - jars = Path(current_dir).glob("../../../../../api/target/flink-agents-api-*.jar") - jars = [f"file:///{jar}" for jar in jars] - add_jars_to_context_class_loader(jars) - assert AgentConfigOptions.BASE_LOG_DIR.get_key() == "baseLogDir" assert AgentConfigOptions.BASE_LOG_DIR.get_type() is str assert AgentConfigOptions.BASE_LOG_DIR.get_default_value() is None + assert AgentConfigOptions.EVENT_LOG_LEVEL.get_key() == "event-log.level" + assert AgentConfigOptions.EVENT_LOG_LEVEL.get_type() is EventLogLevel + assert ( + AgentConfigOptions.EVENT_LOG_LEVEL.get_default_value() is EventLogLevel.STANDARD + ) + assert ( AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS.get_key() == "short-term-memory.state-ttl.ms"