Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 126 additions & 60 deletions python/flink_agents/api/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,10 @@
#################################################################################
import os
from enum import Enum
from typing import Any

from pyflink.java_gateway import get_gateway

from flink_agents.api.configuration import ConfigOption


def covert_j_option_to_python_option(j_option: Any) -> ConfigOption:
"""Convert a Java config option to a Python config option."""
key = j_option.getKey()
default = j_option.getDefaultValue()
type_name = j_option.getTypeName()

if type_name == "java.lang.String":
config_type = str
elif type_name == "java.lang.Integer":
config_type = int
elif type_name == "java.lang.Long":
config_type = int
elif type_name == "java.lang.Boolean":
config_type = bool
elif type_name == "java.lang.Float":
config_type = float
elif type_name == "java.lang.Double":
config_type = float
else:
msg = f"Unsupported type: {type_name}"
raise TypeError(msg)

return ConfigOption(key, config_type, default)


class AgentConfigOptionsMeta(type):
"""Metaclass for FlinkAgentsCoreOptions."""

def __init__(
cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any]
) -> None:
"""Initialize the metaclass for FlinkAgentsCoreOptions."""
super().__init__(name, bases, attrs)

jvm = get_gateway().jvm
cls.jvm = jvm

def __getattr__(cls, item: str) -> ConfigOption:
j_option = getattr(
cls.jvm.org.apache.flink.agents.api.configuration.AgentConfigOptions,
item,
)

python_option = covert_j_option_to_python_option(j_option)
return python_option


class ErrorHandlingStrategy(Enum):
"""Error handling strategy for Agent.

Expand Down Expand Up @@ -108,26 +58,136 @@ class LoggerType(Enum):
FILE = "file"


class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
"""CoreOptions to manage core configuration parameters for Flink Agents."""
class EventLogLevel(Enum):
"""Log level for event logging.

JOB_IDENTIFIER = ConfigOption(
key="job-identifier",
config_type=str,
default=None,
)
Mirrors the Java ``EventLogLevel`` enum.
"""

OFF = "OFF"
STANDARD = "STANDARD"
VERBOSE = "VERBOSE"


class AgentConfigOptions:
"""CoreOptions to manage core configuration parameters for Flink Agents.

Options are declared explicitly in Python and must stay aligned with the
Java ``AgentConfigOptions`` class.
"""

EVENT_LOGGER_TYPE = ConfigOption(
key="eventLoggerType",
config_type=LoggerType,
default=LoggerType.SLF4J,
)

# Event log level config options
BASE_LOG_DIR = ConfigOption(
key="baseLogDir",
config_type=str,
default=None,
)

PRETTY_PRINT = ConfigOption(
key="prettyPrint",
config_type=bool,
default=False,
)

ACTION_STATE_STORE_BACKEND = ConfigOption(
key="actionStateStoreBackend",
config_type=str,
default=None,
)

KAFKA_BOOTSTRAP_SERVERS = ConfigOption(
key="kafkaBootstrapServers",
config_type=str,
default="localhost:9092",
)

KAFKA_ACTION_STATE_TOPIC = ConfigOption(
key="kafkaActionStateTopic",
config_type=str,
default=None,
)

KAFKA_ACTION_STATE_TOPIC_NUM_PARTITIONS = ConfigOption(
key="kafkaActionStateTopicNumPartitions",
config_type=int,
default=64,
)

KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR = ConfigOption(
key="kafkaActionStateTopicReplicationFactor",
config_type=int,
default=1,
)

FLUSS_BOOTSTRAP_SERVERS = ConfigOption(
key="flussBootstrapServers",
config_type=str,
default="localhost:9123",
)

FLUSS_ACTION_STATE_DATABASE = ConfigOption(
key="flussActionStateDatabase",
config_type=str,
default="flink_agents",
)

FLUSS_ACTION_STATE_TABLE = ConfigOption(
key="flussActionStateTable",
config_type=str,
default=None,
)

FLUSS_ACTION_STATE_TABLE_BUCKETS = ConfigOption(
key="flussActionStateTableBuckets",
config_type=int,
default=64,
)

FLUSS_SECURITY_PROTOCOL = ConfigOption(
key="flussSecurityProtocol",
config_type=str,
default="PLAINTEXT",
)

FLUSS_SASL_MECHANISM = ConfigOption(
key="flussSaslMechanism",
config_type=str,
default="PLAIN",
)

FLUSS_SASL_JAAS_CONFIG = ConfigOption(
key="flussSaslJaasConfig",
config_type=str,
default=None,
)

FLUSS_SASL_USERNAME = ConfigOption(
key="flussSaslUsername",
config_type=str,
default=None,
)

FLUSS_SASL_PASSWORD = ConfigOption(
key="flussSaslPassword",
config_type=str,
default=None,
)

JOB_IDENTIFIER = ConfigOption(
key="job-identifier",
config_type=str,
default=None,
)

EVENT_LOG_LEVEL = ConfigOption(
key="event-log.level",
config_type=str,
default="STANDARD",
config_type=EventLogLevel,
default=EventLogLevel.STANDARD,
)

EVENT_LOG_MAX_STRING_LENGTH = ConfigOption(
Expand All @@ -148,6 +208,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
default=5,
)

EVENT_LISTENERS = ConfigOption(
key="event-listeners",
config_type=list,
default=None,
)


class AgentExecutionOptions:
"""Execution options for Flink Agents."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
from pathlib import Path

from pyflink.util.java_utils import add_jars_to_context_class_loader

from flink_agents.api.core_options import (
AgentConfigOptions,
AgentExecutionOptions,
EventLogLevel,
ShortTermMemoryTtlUpdate,
ShortTermMemoryTtlVisibility,
)

# This script is used to verify that Java-defined configuration options
# (e.g., AgentConfigOptions) are correctly exposed and accessible in the
# Python environment via the JAR file. It loads a Java JAR into the Python
# context and performs basic assertions on the configuration keys, types,
# and default values to ensure compatibility between Java and Python layers.
#
# The JAR file path is relative to this script and should be updated if
# the build structure changes.
# This script verifies that Python configuration options stay aligned with the
# Java AgentConfigOptions / AgentExecutionOptions definitions.
if __name__ == "__main__":
current_dir = Path(__file__).parent

jars = Path(current_dir).glob("../../../../../api/target/flink-agents-api-*.jar")
jars = [f"file:///{jar}" for jar in jars]
add_jars_to_context_class_loader(jars)

assert AgentConfigOptions.BASE_LOG_DIR.get_key() == "baseLogDir"
assert AgentConfigOptions.BASE_LOG_DIR.get_type() is str
assert AgentConfigOptions.BASE_LOG_DIR.get_default_value() is None

assert AgentConfigOptions.EVENT_LOG_LEVEL.get_key() == "event-log.level"
assert AgentConfigOptions.EVENT_LOG_LEVEL.get_type() is EventLogLevel
assert (
AgentConfigOptions.EVENT_LOG_LEVEL.get_default_value() is EventLogLevel.STANDARD
)

assert (
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS.get_key()
== "short-term-memory.state-ttl.ms"
Expand Down
Loading