diff --git a/api/src/main/java/org/apache/flink/agents/api/EventType.java b/api/src/main/java/org/apache/flink/agents/api/EventType.java index f486a6d0e..3c3ef74bd 100644 --- a/api/src/main/java/org/apache/flink/agents/api/EventType.java +++ b/api/src/main/java/org/apache/flink/agents/api/EventType.java @@ -18,6 +18,12 @@ package org.apache.flink.agents.api; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + /** * Compile-time constants for built-in event types, sourced from each {@code XxxEvent.EVENT_TYPE}. * @@ -40,5 +46,25 @@ public final class EventType { public static final String ContextRetrievalResponseEvent = org.apache.flink.agents.api.event.ContextRetrievalResponseEvent.EVENT_TYPE; + /** + * Returns all built-in constants as an unmodifiable {@code name → event-type value} map. + * Enumerated reflectively from the {@code public static final String} fields of this class so + * newly added constants are picked up automatically. Iteration order is unspecified. + */ + public static Map allConstants() { + Map constants = new LinkedHashMap<>(); + for (Field field : EventType.class.getFields()) { + if (Modifier.isStatic(field.getModifiers()) && field.getType() == String.class) { + try { + constants.put(field.getName(), (String) field.get(null)); + } catch (IllegalAccessException e) { + // Unreachable: getFields() only returns public fields. + throw new IllegalStateException("Cannot read EventType." + field.getName(), e); + } + } + } + return Collections.unmodifiableMap(constants); + } + private EventType() {} } diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java index c39997da1..cb7de8069 100644 --- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java +++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java @@ -33,6 +33,17 @@ public class AgentConfigOptions { public static final ConfigOption EVENT_LOGGER_TYPE = new ConfigOption<>("eventLoggerType", LoggerType.class, LoggerType.SLF4J); + /** + * Controls how the CEL condition evaluator handles runtime exceptions and non-Boolean results. + * Defaults to {@link CelEvaluationFailurePolicy#WARN_AND_SKIP} for streaming safety; set to + * {@link CelEvaluationFailurePolicy#FAIL} on strict-semantics pipelines to trigger failover. + */ + public static final ConfigOption CEL_EVALUATION_FAILURE_POLICY = + new ConfigOption<>( + "celEvaluationFailurePolicy", + CelEvaluationFailurePolicy.class, + CelEvaluationFailurePolicy.WARN_AND_SKIP); + /** The config parameter specifies the directory for the FileEvent file. */ public static final ConfigOption BASE_LOG_DIR = new ConfigOption<>("baseLogDir", String.class, null); diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/CelEvaluationFailurePolicy.java b/api/src/main/java/org/apache/flink/agents/api/configuration/CelEvaluationFailurePolicy.java new file mode 100644 index 000000000..8990d8224 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/configuration/CelEvaluationFailurePolicy.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.api.configuration; + +/** Behaviour when a CEL trigger condition throws or returns a non-Boolean at evaluation time. */ +public enum CelEvaluationFailurePolicy { + /** Log WARN and treat the action as not matching. Streaming-safe default. */ + WARN_AND_SKIP, + /** Rethrow as {@code IllegalStateException}; triggers Flink task failover. */ + FAIL +} diff --git a/dist/src/main/resources/META-INF/NOTICE b/dist/src/main/resources/META-INF/NOTICE index afc811a14..bee13e2e3 100644 --- a/dist/src/main/resources/META-INF/NOTICE +++ b/dist/src/main/resources/META-INF/NOTICE @@ -14,6 +14,8 @@ This project bundles the following dependencies under the Apache Software Licens - com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.2 - com.fasterxml.jackson.module:jackson-module-kotlin:2.18.2 - com.fasterxml:classmate:1.7.0 +- dev.cel:cel:0.12.0 +- dev.cel:protobuf:0.12.0 - org.apache.logging.log4j:log4j-api:2.23.1 - org.apache.logging.log4j:log4j-core:2.23.1 - org.apache.logging.log4j:log4j-slf4j-impl:2.23.1 @@ -182,6 +184,8 @@ This project bundles the following dependencies under the BSD 3-Clause license. See bundled license files for details. - com.google.protobuf:protobuf-java:3.25.5 +- com.google.re2j:re2j:1.8 +- org.antlr:antlr4-runtime:4.13.2 - org.ow2.asm:asm:9.3 This project bundles the following dependencies under the EPL2 license. diff --git a/dist/src/main/resources/META-INF/licenses/LICENSE.antlr4-runtime b/dist/src/main/resources/META-INF/licenses/LICENSE.antlr4-runtime new file mode 100644 index 000000000..5d2769415 --- /dev/null +++ b/dist/src/main/resources/META-INF/licenses/LICENSE.antlr4-runtime @@ -0,0 +1,28 @@ +Copyright (c) 2012-2022 The ANTLR Project. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +1. Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +3. Neither name of copyright holders nor the names of its contributors +may be used to endorse or promote products derived from this software +without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/dist/src/main/resources/META-INF/licenses/LICENSE.re2j b/dist/src/main/resources/META-INF/licenses/LICENSE.re2j new file mode 100644 index 000000000..b620ae68f --- /dev/null +++ b/dist/src/main/resources/META-INF/licenses/LICENSE.re2j @@ -0,0 +1,32 @@ +This is a work derived from Russ Cox's RE2 in Go, whose license +http://golang.org/LICENSE is as follows: + +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + + * Neither the name of Google Inc. nor the names of its contributors + may be used to endorse or promote products derived from this + software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/e2e-test/cel-fixtures/cel_conformance_cases.yaml b/e2e-test/cel-fixtures/cel_conformance_cases.yaml new file mode 100644 index 000000000..f5d1e9a43 --- /dev/null +++ b/e2e-test/cel-fixtures/cel_conformance_cases.yaml @@ -0,0 +1,284 @@ +# CEL trigger condition conformance test corpus. +# Shared by Java (CelConditionEvaluatorTest) and Python (test_cel_pipeline_conformance). +# Each case is evaluated end-to-end: parse → validate → check → eval (no AST rewriting). +# +# Activation contract: type/EventType are framework-owned; attributes is the single-level +# merge of output.* > root > input.*; merged keys are also promoted to the top level; id is +# the user attribute when present, else the event UUID. + +- name: null_condition_passes + condition: null + event: { id: abc, type: test, attributes: {} } + expected: true + +- name: empty_condition_passes + condition: '' + event: { id: abc, type: test, attributes: {} } + expected: true + +- name: filter_by_type_match + condition: 'type == "_input_event"' + event: { id: abc, type: _input_event, attributes: {} } + expected: true + +- name: filter_by_type_mismatch + condition: 'type == "other"' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: filter_by_id + condition: 'id == "550e8400-e29b-41d4-a716-446655440000"' + event: { id: '550e8400-e29b-41d4-a716-446655440000', type: test, attributes: {} } + expected: true + +- name: in_operator_for_map_key + condition: '"score" in attributes && attributes["score"] >= 3' + event: { id: abc, type: test, attributes: { score: 5 } } + expected: true + +- name: in_operator_missing_key + condition: '"score" in attributes && attributes["score"] >= 3' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: has_macro_on_map + condition: 'has(attributes.score) && attributes.score >= 3' + event: { id: abc, type: test, attributes: { score: 5 } } + expected: true + +- name: has_macro_missing_key + condition: 'has(attributes.missing) && attributes.missing == true' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: nested_map_access + condition: 'has(attributes.meta) && attributes.meta.source == "api"' + event: { id: abc, type: test, attributes: { meta: { source: api } } } + expected: true + +- name: combined_type_and_attribute + condition: 'type == "review" && has(attributes.score) && attributes.score >= 3' + event: { id: abc, type: review, attributes: { score: 4 } } + expected: true + +- name: boolean_attribute + condition: 'has(attributes.success) && attributes.success == true' + event: { id: abc, type: test, attributes: { success: true } } + expected: true + +- name: string_attribute + condition: 'has(attributes.model) && attributes.model == "gpt-4"' + event: { id: abc, type: _chat_request_event, attributes: { model: gpt-4 } } + expected: true + +- name: numeric_comparison_less_than + condition: 'has(attributes.retry_count) && attributes.retry_count < 3' + event: { id: abc, type: test, attributes: { retry_count: 1 } } + expected: true + +- name: numeric_comparison_fails + condition: 'has(attributes.retry_count) && attributes.retry_count < 3' + event: { id: abc, type: test, attributes: { retry_count: 5 } } + expected: false + +- name: json_string_auto_parse_object + condition: 'has(attributes.input) && has(attributes.input.id) && attributes.input.id == "B000YFSR4W"' + event: + id: abc + type: _input_event + attributes: + input: '{"id": "B000YFSR4W", "review": "comfy fit"}' + expected: true + +- name: json_string_auto_parse_nested_mismatch + condition: 'has(attributes.input) && has(attributes.input.id) && attributes.input.id == "OTHER"' + event: + id: abc + type: _input_event + attributes: + input: '{"id": "B000YFSR4W", "review": "comfy fit"}' + expected: false + +- name: json_string_auto_parse_array + condition: 'has(attributes.tags) && attributes.tags[0] == "sports"' + event: + id: abc + type: test + attributes: + tags: '["sports", "shoes"]' + expected: true + +- name: plain_string_not_parsed + condition: 'has(attributes.name) && attributes.name == "hello world"' + event: { id: abc, type: test, attributes: { name: hello world } } + expected: true + +- name: invalid_json_string_kept_as_string + condition: 'has(attributes.broken) && attributes.broken == "{not json"' + event: { id: abc, type: test, attributes: { broken: '{not json' } } + expected: true + +- name: empty_attributes_has_returns_false + condition: 'has(attributes.key)' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: deeply_nested_json_string_3_levels + condition: 'has(attributes.data) && has(attributes.data.order) && attributes.data.order.id == "O100"' + event: + id: abc + type: test + attributes: + data: '{"order": {"id": "O100", "amount": 500}}' + expected: true + +- name: numeric_int_equals_int + condition: 'has(attributes.count) && attributes.count == 42' + event: { id: abc, type: test, attributes: { count: 42 } } + expected: true + +- name: or_condition_first_branch_true + condition: 'has(attributes.a) && attributes.a == 1 || has(attributes.b) && attributes.b == 2' + event: { id: abc, type: test, attributes: { a: 1 } } + expected: true + +- name: or_condition_second_branch_true + condition: '(has(attributes.a) && attributes.a == 1) || (has(attributes.b) && attributes.b == 2)' + event: { id: abc, type: test, attributes: { b: 2 } } + expected: true + +- name: dot_chain_deep_nested_access + condition: 'has(region.width) && region.width.score >= 80' + event: { id: abc, type: test, attributes: { region: { width: { score: 95 } } } } + expected: true + +- name: dot_chain_deep_nested_mismatch + condition: 'has(region.width) && region.width.score >= 80' + event: { id: abc, type: test, attributes: { region: { width: { score: 50 } } } } + expected: false + +- name: dot_chain_six_levels + condition: 'has(input.region) && input.region.width.score.month12 == 88' + event: + id: abc + type: test + attributes: + input: { region: { width: { score: { month12: 88 } } } } + expected: true + +- name: or_short_circuit_left_true_skips_right_error + condition: '(type == "hit") || (attributes.nope.deep > 3)' + event: { id: abc, type: hit, attributes: {} } + expected: true + +- name: and_short_circuit_has_guards_missing_field_access + condition: 'has(attributes.missing) && (attributes.missing.deep > 3)' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: empty_json_object_string_parsed_has_returns_false + condition: 'has(attributes.config.missing_key)' + event: { id: abc, type: test, attributes: { config: '{}' } } + expected: false + +- name: nested_json_string_recursively_parsed + condition: 'has(attributes.outer.inner) && attributes.outer.inner.id == 1' + event: + id: abc + type: test + attributes: + outer: '{"inner": "{\"id\": 1}"}' + expected: true + +- name: event_type_select_expr_folded_to_literal + condition: 'type == EventType.InputEvent' + event: { id: abc, type: _input_event, attributes: {} } + expected: true + +- name: int64_max_in_range + condition: 'attributes.x == 9223372036854775807' + event: { id: abc, type: test, attributes: { x: 9223372036854775807 } } + expected: true + +# ----- Flattened-activation contract (Method D) ----- + +- name: bare_root_attribute_access + condition: 'score >= 3' + event: { id: abc, type: test, attributes: { score: 5 } } + expected: true + +- name: has_on_bare_attribute_present + condition: 'has(score) && score >= 3' + event: { id: abc, type: test, attributes: { score: 5 } } + expected: true + +- name: has_on_bare_attribute_missing + condition: 'has(score) && score >= 3' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: output_subkey_wins_over_root_attribute + condition: 'k == "out"' + event: { id: abc, type: test, attributes: { k: root, output: { k: out } } } + expected: true + +- name: root_attribute_wins_over_input_subkey + condition: 'k == "root"' + event: { id: abc, type: test, attributes: { k: root, input: { k: in } } } + expected: true + +- name: user_id_attribute_overrides_event_uuid + condition: 'id == "tenant-42"' + event: { id: abc, type: test, attributes: { id: tenant-42 } } + expected: true + +- name: framework_type_wins_over_attribute_type + condition: 'type == "real"' + event: { id: abc, type: real, attributes: { type: fake } } + expected: true + +- name: attribute_type_still_reachable_via_attributes + condition: 'attributes.type == "fake"' + event: { id: abc, type: real, attributes: { type: fake } } + expected: true + +- name: flatten_is_single_level_only + condition: 'mylist.name == "x"' + event: { id: abc, type: test, attributes: { mylist: { name: x } } } + expected: true + +- name: event_type_constant_combined_with_bare_attribute + condition: 'type == EventType.InputEvent && score > 0.8' + event: { id: abc, type: _input_event, attributes: { score: 0.9 } } + expected: true + +# ----- Nested has() short-circuit (has(a.b.c) desugaring) ----- +# has(a.b.c) desugars to has(a) && has(a.b) && has(a.b.c): a missing intermediate (or a +# wholly-absent root) short-circuits to false in BOTH engines, never throwing "no such key". +# Without this, cel-java threw on the intermediate select and WARN_AND_SKIP turned it into a +# silent missed trigger, while cel-python returned false — a cross-language divergence. + +- name: has_nested_intermediate_missing_returns_false + condition: 'has(attributes.meta.source)' + event: { id: abc, type: test, attributes: {} } + expected: false + +- name: has_nested_deep_all_present + condition: 'has(attributes.meta.source)' + event: { id: abc, type: test, attributes: { meta: { source: api } } } + expected: true + +- name: has_nested_guard_then_access + condition: 'has(attributes.meta.source) && attributes.meta.source == "api"' + event: { id: abc, type: test, attributes: { meta: { source: api } } } + expected: true + +- name: has_bare_root_nested_intermediate_missing_returns_false + condition: 'has(value.user.tier)' + event: { id: abc, type: test, attributes: { value: { other: 1 } } } + expected: false + +- name: has_bare_root_nested_wholly_absent_returns_false + condition: 'has(value.user.tier)' + event: { id: abc, type: test, attributes: {} } + expected: false diff --git a/e2e-test/cel-fixtures/disallowed_macros.yaml b/e2e-test/cel-fixtures/disallowed_macros.yaml new file mode 100644 index 000000000..1a755945b --- /dev/null +++ b/e2e-test/cel-fixtures/disallowed_macros.yaml @@ -0,0 +1,39 @@ +# CEL macro whitelist test fixture. +# Shared by Java (CelExpressionFacadeTest) and Python (test_cel_facade). +# Phase-1 policy: only `has()` is allowed; all others are rejected. + +reject: + # Free-form calls + - "exists(x, x > 0)" + - "exists_one(x, x == 1)" + - "all(x, x > 0)" + - "filter(x, x > 0)" + - "map(x, x + 1)" + # Method-style calls (most common CEL usage) + - "attributes.tags.exists(x, x == 'a')" + - "attributes.scores.all(s, s >= 60)" + - "attributes.items.filter(i, i.active == true)" + - "attributes.items.map(i, i.name)" + - "attributes.tags.exists_one(t, t == 'urgent')" + # Nested in logical operators + - "has(attributes.x) && attributes.list.all(t, t > 0)" + - "type == '_input_event' || attributes.items.exists(i, i.price > 100)" + # Nested macros inside macro arguments + - "attributes.groups.exists(g, g.members.all(m, m.active == true))" + +accept: + # has() is allowed + - "has(attributes.score)" + - "has(attributes.user_id) && attributes.score > 10" + # Basic operators, no macros + - "type == '_input_event'" + - "attributes.score >= 90" + - "attributes.count != 0 && type == '_output_event'" + # Macro name inside string literal (not a call) + - "type == 'exists_check_event'" + - "attributes.name == 'filter_result'" + - "attributes.label == 'has all filter map exists'" + # Macro name as identifier substring (not a call) + - "attributes.existing == true" + - "attributes.mapKey == 'value'" + - "attributes.filter_count > 5" diff --git a/examples/pom.xml b/examples/pom.xml index 26f7f9f20..8fdaa7ad0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -65,6 +65,13 @@ under the License. ${project.version} provided + + + + dev.cel + cel + ${cel.version} + \ No newline at end of file diff --git a/plan/pom.xml b/plan/pom.xml index 02df3c2c3..8281fd1ab 100644 --- a/plan/pom.xml +++ b/plan/pom.xml @@ -50,6 +50,11 @@ under the License. com.fasterxml.jackson.core jackson-databind + + dev.cel + cel + ${cel.version} + io.github.bonede diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java index 8af6ca5f1..af57fc799 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java @@ -85,6 +85,13 @@ public class AgentPlan implements Serializable { /** Mapping from event type string to list of actions that should be triggered by the event. */ private Map> actionsByEvent; + /** + * Actions with at least one CEL expression — need runtime evaluation, not just the + * actionsByEvent map. Transient; rebuilt by {@link #rebuildActionsWithCel()} after + * deserialization. + */ + private transient List actionsWithCel = new ArrayList<>(); + /** Two-level mapping of resource type to resource name to resource provider. */ private Map> resourceProviders; @@ -95,6 +102,7 @@ public AgentPlan(Map actions, Map> actionsB this.actionsByEvent = actionsByEvent; this.resourceProviders = new HashMap<>(); this.config = new AgentConfiguration(); + rebuildActionsWithCel(); } public AgentPlan( @@ -105,6 +113,7 @@ public AgentPlan( this.actionsByEvent = actionsByEvent; this.resourceProviders = resourceProviders; this.config = new AgentConfiguration(); + rebuildActionsWithCel(); } public AgentPlan( @@ -116,6 +125,7 @@ public AgentPlan( this.actionsByEvent = actionsByEvent; this.resourceProviders = resourceProviders; this.config = config; + rebuildActionsWithCel(); } /** @@ -180,6 +190,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE this.actionsByEvent = agentPlan.getActionsByEvent(); this.resourceProviders = agentPlan.getResourceProviders(); this.config = agentPlan.getConfig(); + rebuildActionsWithCel(); } private void extractActions( @@ -199,24 +210,51 @@ private void extractActions( // Create an Action Action action = new Action(actionName, function, triggerConditions, config); - - // Add to actions map - actions.put(action.getName(), action); - - // Add to actionsByEvent map - for (String eventTypeName : triggerConditions) { - actionsByEvent.computeIfAbsent(eventTypeName, k -> new ArrayList<>()).add(action); - } + registerAction(action); } private void addBuiltAction(Action action) { - // Add to actions map - actions.put(action.getName(), action); + registerAction(action); + } - // Add to actionsByEvent map + /** + * Registers an action into both {@link #actions}, {@link #actionsByEvent} (using type-only + * entries from {@link Action#getListenEventTypes()}), and {@link #actionsWithCel} if the action + * carries any CEL expression. + */ + private void registerAction(Action action) { + actions.put(action.getName(), action); for (String eventTypeName : action.getListenEventTypes()) { actionsByEvent.computeIfAbsent(eventTypeName, k -> new ArrayList<>()).add(action); } + if (action.hasCelCondition()) { + actionsWithCel.add(action); + } + } + + /** + * Rebuilds {@link #actionsWithCel} from {@link #actions}. Used after deserialization (where + * actionsWithCel is transient). + */ + private void rebuildActionsWithCel() { + if (actionsWithCel == null) { + actionsWithCel = new ArrayList<>(); + } else { + actionsWithCel.clear(); + } + if (actions == null) { + return; + } + for (Action action : actions.values()) { + if (action.hasCelCondition()) { + actionsWithCel.add(action); + } + } + } + + /** Returns the list of actions that require CEL evaluation. */ + public List getActionsWithCel() { + return actionsWithCel; } private void extractActionsFromAgent(Agent agent) throws Exception { diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java index 18e3b83b4..fa9945b71 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java @@ -23,11 +23,16 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.context.RunnerContext; import org.apache.flink.agents.plan.Function; +import org.apache.flink.agents.plan.condition.ParsedCondition; +import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression; +import org.apache.flink.agents.plan.condition.ParsedCondition.TypeMatch; import org.apache.flink.agents.plan.serializer.ActionJsonDeserializer; import org.apache.flink.agents.plan.serializer.ActionJsonSerializer; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -35,8 +40,8 @@ /** * Representation of an agent action with unified trigger conditions. * - *

Each entry of {@code triggerConditions} is an event type name string. Multiple entries combine - * with OR. + *

Each entry of {@code triggerConditions} is either a plain event-type name (matched against + * {@code event.getType()}) or a CEL expression. Multiple entries combine with OR. */ @JsonSerialize(using = ActionJsonSerializer.class) @JsonDeserialize(using = ActionJsonDeserializer.class) @@ -45,6 +50,12 @@ public class Action { private final Function exec; private final List triggerConditions; + /** + * Transient cache of classified {@link #triggerConditions}: CEL AST isn't Kryo-serialisable, so + * it is rebuilt lazily after deserialization via {@link #parsedConditions()}. + */ + private transient List parsedConditions; + // TODO: support nested map/list with non primitive type value. @Nullable private final Map config; @@ -54,13 +65,34 @@ public Action( List triggerConditions, @Nullable Map config) throws Exception { + if (triggerConditions == null || triggerConditions.isEmpty()) { + throw new IllegalArgumentException( + "Action '" + name + "' must have at least one entry in 'triggerConditions'"); + } this.name = name; this.exec = exec; this.triggerConditions = triggerConditions; this.config = config; + + // Build + validate eagerly so a bad condition fails at construction, not at runtime. + this.parsedConditions = buildParsedConditions(name, triggerConditions); + exec.checkSignature(new Class[] {Event.class, RunnerContext.class}); } + private static List buildParsedConditions( + String name, List triggerConditions) { + List parsed = new ArrayList<>(triggerConditions.size()); + for (String entry : triggerConditions) { + if (entry == null || entry.isEmpty()) { + throw new IllegalArgumentException( + "Action '" + name + "' has a null/empty trigger entry"); + } + parsed.add(ParsedCondition.classify(entry)); + } + return Collections.unmodifiableList(parsed); + } + public Action(String name, Function exec, List triggerConditions) throws Exception { this(name, exec, triggerConditions, null); } @@ -73,19 +105,43 @@ public Function getExec() { return exec; } - /** Returns the full trigger conditions list. */ + /** Returns the full trigger conditions list (type names and CEL expressions). */ public List getTriggerConditions() { return triggerConditions; } - /** - * Returns event-type names. Kept for callers that still consume the old naming; in this PR all - * trigger entries are plain event-type names so the list is identical to {@link - * #getTriggerConditions()}. A follow-up PR introduces CEL expressions and overrides this to - * filter out non-type entries. - */ + /** Returns parsed conditions in declaration order (unmodifiable). */ + public List getParsedConditions() { + return parsedConditions(); + } + + /** Lazily rebuilds parsedConditions on first access after deserialization. */ + private synchronized List parsedConditions() { + if (parsedConditions == null) { + parsedConditions = buildParsedConditions(name, triggerConditions); + } + return parsedConditions; + } + + /** Returns event-type names extracted from {@link TypeMatch} entries (CEL entries skipped). */ public List getListenEventTypes() { - return triggerConditions; + List typeNames = new ArrayList<>(); + for (ParsedCondition pc : parsedConditions()) { + if (pc instanceof TypeMatch) { + typeNames.add(pc.source()); + } + } + return typeNames; + } + + /** Returns whether this action carries at least one CEL expression entry. */ + public boolean hasCelCondition() { + for (ParsedCondition pc : parsedConditions()) { + if (pc instanceof CelExpression) { + return true; + } + } + return false; } @Nullable diff --git a/plan/src/main/java/org/apache/flink/agents/plan/condition/CelMacroPolicy.java b/plan/src/main/java/org/apache/flink/agents/plan/condition/CelMacroPolicy.java new file mode 100644 index 000000000..3d5385ee1 --- /dev/null +++ b/plan/src/main/java/org/apache/flink/agents/plan/condition/CelMacroPolicy.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.plan.condition; + +import com.google.common.collect.ImmutableList; +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.ast.CelExpr; +import dev.cel.common.navigation.CelNavigableAst; +import dev.cel.parser.CelMacro; +import dev.cel.parser.CelMacroExprFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; + +/** + * CEL macro rules for trigger conditions: the custom {@code has()} macro, the macro whitelist, and + * the reserved identifiers. + */ +public final class CelMacroPolicy { + + /** + * Custom parse-time {@code has()} macro: {@code has(a.b)} tests field presence on {@code a}; + * {@code has(score)} tests presence of key {@code score} in the {@code attributes} map. + */ + public static final CelMacro HAS = CelMacro.newGlobalMacro("has", 1, CelMacroPolicy::expandHas); + + /** CEL's logical-AND overload id; used to AND-join the desugared has() presence chain. */ + private static final String LOGICAL_AND_FUNCTION = "_&&_"; + + private static Optional expandHas( + CelMacroExprFactory exprFactory, CelExpr target, ImmutableList arguments) { + CelExpr arg = arguments.get(0); + if (arg.exprKind().getKind() == CelExpr.ExprKind.Kind.SELECT && !arg.select().testOnly()) { + return Optional.of(expandHasChain(exprFactory, arg)); + } + if (arg.exprKind().getKind() == CelExpr.ExprKind.Kind.IDENT + && !RESERVED_IDENTIFIERS.contains(arg.ident().name())) { + // has(score) → key presence in the attributes map. + return Optional.of( + exprFactory.newSelect( + exprFactory.newIdentifier("attributes"), arg.ident().name(), true)); + } + return Optional.of( + exprFactory.reportError( + "invalid argument to has() macro: expected a field selection like" + + " has(a.b) or an attribute name like has(score)")); + } + + /** + * Desugars select chain {@code a.b...z} into {@code has(a) && has(a.b) && ... && has(a.b...z)}. + * The {@code &&} short-circuits, so a missing intermediate key yields false instead of letting + * cel-java throw on a deeper select. + */ + private static CelExpr expandHasChain(CelMacroExprFactory exprFactory, CelExpr select) { + CelExpr operand = select.select().operand(); + CelExpr presence = exprFactory.newSelect(operand, select.select().field(), true); + if (operand.exprKind().getKind() == CelExpr.ExprKind.Kind.SELECT + && !operand.select().testOnly()) { + // Recurse into the deeper levels first so they are AND-ed to the left of this one. + return exprFactory.newGlobalCall( + LOGICAL_AND_FUNCTION, expandHasChain(exprFactory, operand), presence); + } + if (operand.exprKind().getKind() == CelExpr.ExprKind.Kind.IDENT + && !RESERVED_IDENTIFIERS.contains(operand.ident().name())) { + // Guard a bare root like has(score): key presence in the attributes map. + return exprFactory.newGlobalCall( + LOGICAL_AND_FUNCTION, + exprFactory.newSelect( + exprFactory.newIdentifier("attributes"), operand.ident().name(), true), + presence); + } + return presence; + } + + /** The complete set of CEL standard macro names. */ + public static final Set CEL_STANDARD_MACROS = + Set.of("has", "exists", "exists_one", "all", "filter", "map"); + + /** Macros allowed in trigger condition expressions. */ + public static final Set ALLOWED_MACROS = Set.of("has"); + + /** Returns the first disallowed macro call found in {@code ast}, or empty if none. */ + public static Optional findFirstDisallowedMacro(CelAbstractSyntaxTree ast) { + return CelNavigableAst.fromAst(ast) + .getRoot() + .allNodes() + .filter(node -> node.getKind() == CelExpr.ExprKind.Kind.CALL) + .map(node -> node.expr().call().function()) + .filter(fn -> CEL_STANDARD_MACROS.contains(fn) && !ALLOWED_MACROS.contains(fn)) + .findFirst(); + } + + /** Formats the disallowed-macro error message; kept aligned with the Python template. */ + public static String formatDisallowedMessage(String macro, String source) { + return "CEL expression uses disallowed macro '" + + macro + + "': \"" + + source + + "\". Only allows: " + + new TreeSet<>(ALLOWED_MACROS) + + "."; + } + + /** Names rejected as bare event-type aliases and never shadowed by user attributes. */ + public static final Set RESERVED_IDENTIFIERS; + + static { + Set set = + new HashSet<>( + Set.of( + // Framework-owned activation variables. + "type", + "attributes", + "EventType", + // CEL literals. + "true", + "false", + "null", + // CEL operators / type-conversion functions / container types. + "in", + "int", + "uint", + "double", + "string", + "bool", + "bytes", + "list")); + // All CEL standard macro names. + set.addAll(CEL_STANDARD_MACROS); + RESERVED_IDENTIFIERS = Collections.unmodifiableSet(set); + } + + private CelMacroPolicy() {} +} diff --git a/plan/src/main/java/org/apache/flink/agents/plan/condition/ParsedCondition.java b/plan/src/main/java/org/apache/flink/agents/plan/condition/ParsedCondition.java new file mode 100644 index 000000000..123efb119 --- /dev/null +++ b/plan/src/main/java/org/apache/flink/agents/plan/condition/ParsedCondition.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.plan.condition; + +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.CelOptions; +import dev.cel.common.CelValidationException; +import dev.cel.common.ast.CelExpr; +import dev.cel.parser.CelParser; +import dev.cel.parser.CelParserFactory; + +import java.util.Objects; + +/** + * A parsed {@code Action.triggerConditions} entry — either {@link TypeMatch} or {@link + * CelExpression}. {@link #classify} turns a raw entry string into one of the two. + */ +public interface ParsedCondition { + + /** Original user-written entry string. */ + String source(); + + /** Mirrors the runtime facade caps so a too-deep / too-long expression fails at classify. */ + CelOptions CEL_OPTIONS = + CelOptions.current() + .maxParseRecursionDepth(32) + .maxExpressionCodePointSize(8_192) + .build(); + + /** + * Parser with the custom {@code has()} macro and the same resource caps as the runtime facade + * parser. + */ + CelParser CEL_PARSER = + CelParserFactory.standardCelParserBuilder() + .setOptions(CEL_OPTIONS) + .addMacros(CelMacroPolicy.HAS) + .build(); + + /** + * Parses a {@code triggerConditions} entry: a non-reserved bare-identifier root becomes a + * {@link TypeMatch}, everything else a {@link CelExpression}. + */ + static ParsedCondition classify(String source) { + if (source == null || source.isEmpty()) { + throw new IllegalArgumentException( + "ParsedCondition.classify: source must be non-null and non-empty"); + } + CelAbstractSyntaxTree ast; + try { + ast = CEL_PARSER.parse(source).getAst(); + } catch (CelValidationException e) { + throw new IllegalArgumentException( + "Invalid CEL expression: \"" + source + "\" — " + e.getMessage(), e); + } + CelExpr root = ast.getExpr(); + if (root.exprKind().getKind() == CelExpr.ExprKind.Kind.IDENT) { + String name = root.ident().name(); + if (CelMacroPolicy.RESERVED_IDENTIFIERS.contains(name)) { + throw new IllegalArgumentException( + "'" + + name + + "' is a CEL reserved keyword and cannot be used as an " + + "event type name. Did you mean: @action(\"" + + name + + " == 'xxx'\") or @action(\"attributes." + + name + + "\")?"); + } + return new TypeMatch(name); + } + return new CelExpression(source); + } + + /** A plain event-type match. {@link #source()} is compared against {@code Event.getType()}. */ + final class TypeMatch implements ParsedCondition { + + private final String source; + + public TypeMatch(String source) { + this.source = source; + } + + @Override + public String source() { + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TypeMatch)) return false; + return source.equals(((TypeMatch) o).source); + } + + @Override + public int hashCode() { + return Objects.hash(source); + } + + @Override + public String toString() { + return "TypeMatch{source=" + source + "}"; + } + } + + /** A CEL expression. Source-only; compiled elsewhere. */ + final class CelExpression implements ParsedCondition { + + private final String source; + + public CelExpression(String source) { + this.source = source; + } + + @Override + public String source() { + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CelExpression)) return false; + return source.equals(((CelExpression) o).source); + } + + @Override + public int hashCode() { + return Objects.hash(source); + } + + @Override + public String toString() { + return "CelExpression{source=" + source + "}"; + } + } +} diff --git a/plan/src/test/java/org/apache/flink/agents/plan/actions/ActionConstructionFailureTest.java b/plan/src/test/java/org/apache/flink/agents/plan/actions/ActionConstructionFailureTest.java new file mode 100644 index 000000000..bda5a6fb3 --- /dev/null +++ b/plan/src/test/java/org/apache/flink/agents/plan/actions/ActionConstructionFailureTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.plan.actions; + +import org.apache.flink.agents.plan.Function; +import org.apache.flink.agents.plan.JavaFunction; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +/** Pin the "silent → loud" contract: invalid CEL fails at Action construction time. */ +class ActionConstructionFailureTest { + + /** Reusable executor — Action's constructor only validates the signature. */ + private static Function execFn() throws Exception { + return new JavaFunction( + "org.apache.flink.agents.plan.actions.ActionConstructionFailureTest", + "noopExec", + new Class[] { + org.apache.flink.agents.api.Event.class, + org.apache.flink.agents.api.context.RunnerContext.class, + }); + } + + /** Dummy executor referenced reflectively from {@link #execFn()}. */ + public static void noopExec( + org.apache.flink.agents.api.Event e, + org.apache.flink.agents.api.context.RunnerContext c) {} + + // -- Loud failure on syntactically invalid CEL -- + + @Test + void construction_throwsOnInvalidCel_trailingOperator() throws Exception { + Function fn = execFn(); + assertThatThrownBy(() -> new Action("bad", fn, Collections.singletonList("type =="))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("type =="); + } + + @Test + void construction_throwsOnInvalidCel_danglingPlus() throws Exception { + Function fn = execFn(); + assertThatThrownBy(() -> new Action("bad", fn, Collections.singletonList("event.size +"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("event.size +"); + } + + @Test + void construction_throwsOnInvalidCel_unbalancedParen() throws Exception { + Function fn = execFn(); + assertThatThrownBy( + () -> new Action("bad", fn, Collections.singletonList("has(attributes.k"))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void construction_throwsOnInvalidCel_inMixedList() throws Exception { + // The bad entry is sandwiched between two well-formed ones; the loud-failure contract + // says we don't silently drop it just because the rest of the list is fine. + Function fn = execFn(); + assertThatThrownBy( + () -> + new Action( + "bad", + fn, + Arrays.asList( + "InputEvent", "type ==", "has(attributes.k)"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("type =="); + } + + // -- Pre-existing structural validations -- + + @Test + void construction_throwsOnEmptyContains() throws Exception { + Function fn = execFn(); + assertThatThrownBy(() -> new Action("bad", fn, Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must have at least one entry"); + } + + @Test + void construction_throwsOnNullContains() throws Exception { + Function fn = execFn(); + assertThatThrownBy(() -> new Action("bad", fn, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must have at least one entry"); + } + + // -- Negative control: well-formed inputs -- + + @Test + void construction_succeedsForWellFormedInputs() throws Exception { + Function fn = execFn(); + assertDoesNotThrow( + () -> + new Action( + "ok", + fn, + Arrays.asList( + "InputEvent", + "_input_event", + "type == 'x' && id > 0", + "has(attributes.user_id)"))); + } +} diff --git a/plan/src/test/java/org/apache/flink/agents/plan/actions/ActionParsedConditionsTest.java b/plan/src/test/java/org/apache/flink/agents/plan/actions/ActionParsedConditionsTest.java new file mode 100644 index 000000000..0f829b018 --- /dev/null +++ b/plan/src/test/java/org/apache/flink/agents/plan/actions/ActionParsedConditionsTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.plan.actions; + +import org.apache.flink.agents.api.EventType; +import org.apache.flink.agents.plan.Function; +import org.apache.flink.agents.plan.JavaFunction; +import org.apache.flink.agents.plan.condition.ParsedCondition; +import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression; +import org.apache.flink.agents.plan.condition.ParsedCondition.TypeMatch; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for parsedConditions: classify, round-trip, and derived views. */ +class ActionParsedConditionsTest { + + /** Reusable executor — Action's constructor only validates the signature. */ + private static Function execFn() throws Exception { + return new JavaFunction( + "org.apache.flink.agents.plan.actions.ActionParsedConditionsTest", + "noopExec", + new Class[] { + org.apache.flink.agents.api.Event.class, + org.apache.flink.agents.api.context.RunnerContext.class, + }); + } + + /** Dummy executor referenced reflectively from {@link #execFn()}. */ + public static void noopExec( + org.apache.flink.agents.api.Event e, + org.apache.flink.agents.api.context.RunnerContext c) {} + + private static List celSources(Action action) { + return action.getParsedConditions().stream() + .filter(pc -> pc instanceof CelExpression) + .map(pc -> ((CelExpression) pc).source()) + .collect(Collectors.toList()); + } + + @Test + void parsedConditions_singleTypeName_resolvesToTypeMatch() throws Exception { + Action a = new Action("a", execFn(), Collections.singletonList(EventType.InputEvent)); + + List pcs = a.getParsedConditions(); + assertEquals(1, pcs.size()); + assertTrue(pcs.get(0) instanceof TypeMatch); + + TypeMatch tm = (TypeMatch) pcs.get(0); + assertEquals(EventType.InputEvent, tm.source()); + assertEquals(EventType.InputEvent, tm.source()); + + assertEquals(Collections.singletonList(EventType.InputEvent), a.getListenEventTypes()); + assertEquals(Collections.emptyList(), celSources(a)); + } + + @Test + void parsedConditions_rawEventTypeString_isPreservedUnchanged() throws Exception { + Action a = new Action("a", execFn(), Collections.singletonList("_input_event")); + + ParsedCondition pc = a.getParsedConditions().get(0); + assertTrue(pc instanceof TypeMatch); + TypeMatch tm = (TypeMatch) pc; + assertEquals("_input_event", tm.source()); + assertEquals("_input_event", tm.source()); + } + + @Test + void parsedConditions_celExpression_resolvesToCelExpression() throws Exception { + Action a = + new Action( + "a", + execFn(), + Collections.singletonList("type == EventType.InputEvent && id > 0")); + + List pcs = a.getParsedConditions(); + assertEquals(1, pcs.size()); + assertTrue(pcs.get(0) instanceof CelExpression); + assertEquals("type == EventType.InputEvent && id > 0", pcs.get(0).source()); + + assertEquals(Collections.emptyList(), a.getListenEventTypes()); + assertEquals( + Collections.singletonList("type == EventType.InputEvent && id > 0"), celSources(a)); + } + + @Test + void parsedConditions_hasCall_isClassifiedAsCel() throws Exception { + Action a = new Action("a", execFn(), Collections.singletonList("has(attributes.user_id)")); + assertTrue(a.getParsedConditions().get(0) instanceof CelExpression); + } + + @Test + void parsedConditions_mixedList_preservesOrderAndKinds() throws Exception { + List contains = + Arrays.asList( + EventType.InputEvent, // TypeMatch + "_chat_response_event", // TypeMatch (raw) + "type == 'x' && id > 0", // CelExpression + EventType.OutputEvent, // TypeMatch + "has(attributes.k)"); // CelExpression + Action a = new Action("a", execFn(), contains); + + List pcs = a.getParsedConditions(); + assertEquals(5, pcs.size()); + + assertTrue(pcs.get(0) instanceof TypeMatch); + assertEquals(EventType.InputEvent, ((TypeMatch) pcs.get(0)).source()); + + assertTrue(pcs.get(1) instanceof TypeMatch); + assertEquals("_chat_response_event", ((TypeMatch) pcs.get(1)).source()); + + assertTrue(pcs.get(2) instanceof CelExpression); + + assertTrue(pcs.get(3) instanceof TypeMatch); + assertEquals(EventType.OutputEvent, ((TypeMatch) pcs.get(3)).source()); + + assertTrue(pcs.get(4) instanceof CelExpression); + + assertEquals( + Arrays.asList(EventType.InputEvent, "_chat_response_event", EventType.OutputEvent), + a.getListenEventTypes()); + assertEquals(Arrays.asList("type == 'x' && id > 0", "has(attributes.k)"), celSources(a)); + } + + @Test + void parsedConditions_rejectsNullOrEmptyEntry() throws Exception { + List withEmpty = Arrays.asList(EventType.InputEvent, "", "type == 'x'"); + assertThrows(IllegalArgumentException.class, () -> new Action("a", execFn(), withEmpty)); + + List withNull = Arrays.asList(EventType.InputEvent, null, "type == 'x'"); + assertThrows(IllegalArgumentException.class, () -> new Action("a", execFn(), withNull)); + } + + @Test + void parsedConditions_listIsUnmodifiable() throws Exception { + Action a = new Action("a", execFn(), Collections.singletonList(EventType.InputEvent)); + List pcs = a.getParsedConditions(); + assertNotNull(pcs); + assertThrows( + UnsupportedOperationException.class, + () -> pcs.add(new CelExpression("type == 'y'"))); + } + + @Test + void parsedConditions_hasCelCondition_isTrueWhenAnyCelEntryPresent() throws Exception { + Action a = new Action("a", execFn(), Arrays.asList(EventType.InputEvent, "type == 'x'")); + assertTrue(a.hasCelCondition()); + + Action b = new Action("b", execFn(), Collections.singletonList(EventType.InputEvent)); + assertEquals(false, b.hasCelCondition()); + } + + @Test + void parsedConditions_listenEventTypes_returnsTypeMatchNamesOnly() throws Exception { + Action a = + new Action( + "a", + execFn(), + Arrays.asList(EventType.InputEvent, "type == 'x'", EventType.OutputEvent)); + assertEquals( + Arrays.asList(EventType.InputEvent, EventType.OutputEvent), + a.getListenEventTypes()); + } +} diff --git a/plan/src/test/java/org/apache/flink/agents/plan/condition/ParsedConditionTest.java b/plan/src/test/java/org/apache/flink/agents/plan/condition/ParsedConditionTest.java new file mode 100644 index 000000000..a05f4bc4d --- /dev/null +++ b/plan/src/test/java/org/apache/flink/agents/plan/condition/ParsedConditionTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.plan.condition; + +import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression; +import org.apache.flink.agents.plan.condition.ParsedCondition.TypeMatch; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link ParsedCondition}: value semantics of TypeMatch/CelExpression, and {@link + * ParsedCondition#classify} (corpus equivalence, regression guards, and routing). + */ +class ParsedConditionTest { + + // ================================================================== + // Value semantics + // ================================================================== + + @Test + void typeMatch_storesSource() { + TypeMatch tm = new TypeMatch("_input_event"); + assertEquals("_input_event", tm.source()); + assertTrue(((ParsedCondition) tm) instanceof TypeMatch); + assertFalse(((ParsedCondition) tm) instanceof CelExpression); + } + + @Test + void typeMatch_equalityAndHash() { + TypeMatch a = new TypeMatch("_input_event"); + TypeMatch b = new TypeMatch("_input_event"); + TypeMatch c = new TypeMatch("_output_event"); + assertEquals(a, b); + assertEquals(a.hashCode(), b.hashCode()); + assertNotEquals(a, c); + } + + @Test + void celExpression_storesSource() { + CelExpression ce = new CelExpression("type == EventType.InputEvent && id > 0"); + assertEquals("type == EventType.InputEvent && id > 0", ce.source()); + assertFalse(((ParsedCondition) ce) instanceof TypeMatch); + assertTrue(((ParsedCondition) ce) instanceof CelExpression); + } + + @Test + void celExpression_equalityAndHash() { + CelExpression a = new CelExpression("type == 'x'"); + CelExpression b = new CelExpression("type == 'x'"); + CelExpression c = new CelExpression("type == 'y'"); + assertEquals(a, b); + assertEquals(a.hashCode(), b.hashCode()); + assertNotEquals(a, c); + } + + @Test + void celExpression_isNotEqualToTypeMatchWithSameSource() { + CelExpression ce = new CelExpression("InputEvent"); + TypeMatch tm = new TypeMatch("InputEvent"); + assertNotEquals(ce, tm); + assertNotEquals(tm, ce); + } + + // ================================================================== + // classify(): corpus. Format: input, expected_kind, expected_event_type (null for CEL). + // ================================================================== + + static Stream classifierCorpus() { + return Stream.of( + // ----- bare identifiers are TypeMatch (no short-name translation) ----- + Arguments.of("InputEvent", "TypeMatch", "InputEvent"), + Arguments.of("OutputEvent", "TypeMatch", "OutputEvent"), + Arguments.of("ChatRequestEvent", "TypeMatch", "ChatRequestEvent"), + Arguments.of("ChatResponseEvent", "TypeMatch", "ChatResponseEvent"), + Arguments.of("ToolRequestEvent", "TypeMatch", "ToolRequestEvent"), + Arguments.of("ToolResponseEvent", "TypeMatch", "ToolResponseEvent"), + Arguments.of( + "ContextRetrievalRequestEvent", + "TypeMatch", + "ContextRetrievalRequestEvent"), + Arguments.of( + "ContextRetrievalResponseEvent", + "TypeMatch", + "ContextRetrievalResponseEvent"), + + // ----- raw event-type strings & passthrough ----- + Arguments.of("_input_event", "TypeMatch", "_input_event"), + Arguments.of("_my_custom_event", "TypeMatch", "_my_custom_event"), + Arguments.of("CustomEvent", "TypeMatch", "CustomEvent"), + Arguments.of("in_progress_event", "TypeMatch", "in_progress_event"), + + // ----- equality / inequality ----- + Arguments.of("type == 'x'", "CelExpression", null), + Arguments.of("type != 'x'", "CelExpression", null), + + // ----- numeric comparisons ----- + Arguments.of("id > 0", "CelExpression", null), + Arguments.of("id < 0", "CelExpression", null), + Arguments.of("id >= 0", "CelExpression", null), + Arguments.of("id <= 0", "CelExpression", null), + + // ----- logical combinators ----- + Arguments.of("a == 1 && b == 2", "CelExpression", null), + Arguments.of("a == 1 || b == 2", "CelExpression", null), + + // ----- has() macro & in keyword ----- + Arguments.of("has(attributes.user_id)", "CelExpression", null), + Arguments.of("type in ['a', 'b']", "CelExpression", null), + + // ----- attribute access shapes ----- + Arguments.of("attributes.score >= 3", "CelExpression", null), + Arguments.of("attributes.score < 0", "CelExpression", null), + Arguments.of("attributes['k'] == 'v'", "CelExpression", null), + Arguments.of("type in ['_input_event']", "CelExpression", null), + + // ----- size / null / has compounds ----- + Arguments.of("size(attributes) > 0", "CelExpression", null), + Arguments.of("category == 'premium' && score > 5", "CelExpression", null), + Arguments.of("id != null", "CelExpression", null), + Arguments.of("has(attributes.k)", "CelExpression", null), + + // ----- EventType-qualified comparisons ----- + Arguments.of("type == EventType.InputEvent", "CelExpression", null), + Arguments.of("type == EventType.InputEvent && id > 0", "CelExpression", null)); + } + + @ParameterizedTest(name = "[{index}] classify(\"{0}\") -> {1}") + @MethodSource("classifierCorpus") + void astClassifierMatchesCorpus(String input, String expectedKind, String expectedEventType) { + ParsedCondition pc = ParsedCondition.classify(input); + + switch (expectedKind) { + case "TypeMatch": + assertThat(pc) + .as("expected TypeMatch for input %s", input) + .isInstanceOf(TypeMatch.class); + assertThat(pc.source()) + .as("source must round-trip unchanged for input %s", input) + .isEqualTo(input); + assertThat(((TypeMatch) pc).source()) + .as("eventType disagreement on input %s", input) + .isEqualTo(expectedEventType); + break; + case "CelExpression": + assertThat(pc) + .as("expected CelExpression for input %s", input) + .isInstanceOf(CelExpression.class); + assertThat(pc.source()) + .as("source must round-trip unchanged for input %s", input) + .isEqualTo(input); + assertThat(expectedEventType) + .as( + "CelExpression cases must declare null expected_event_type (input %s)", + input) + .isNull(); + break; + default: + throw new IllegalStateException( + "Unknown expected_kind '" + expectedKind + "' for input " + input); + } + } + + // Regression: inputs the old regex classifier silently mis-routed as TypeMatch. + + static Stream formerlySilentRegressions() { + return Stream.of( + "event.someFlag", // SELECT root + "!event.flag", // CALL root, op `!_` + "myBoolFunc()", // CALL root, function call + "'OrderEvent'", // CONSTANT root, string literal + "true", // CONSTANT root, boolean literal + "event.size + 1 > 0" // CALL root, comparison + ); + } + + @ParameterizedTest(name = "[{index}] {0} -> CelExpression (was silent TypeMatch)") + @MethodSource("formerlySilentRegressions") + void fixesSilentRegexMisclassifications(String input) { + ParsedCondition pc = ParsedCondition.classify(input); + assertThat(pc) + .as( + "input %s must classify as CelExpression — was silently TypeMatch under the" + + " regex classifier", + input) + .isInstanceOf(CelExpression.class); + assertThat(pc.source()).isEqualTo(input); + } + + // -- Failure mode: silent → loud -- + + @Test + void invalidCelThrows_atClassifyTime() { + assertThatThrownBy(() -> ParsedCondition.classify("type ==")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid CEL expression"); + assertThatThrownBy(() -> ParsedCondition.classify("event.size +")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid CEL expression"); + } + + @Test + void rejectsNullOrEmptyInput() { + assertThatThrownBy(() -> ParsedCondition.classify(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("non-null"); + assertThatThrownBy(() -> ParsedCondition.classify("")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("non-null"); + } + + // -- Reserved-keyword rejection -- + + @Test + void rejectsReservedKeywordAsEventTypeName() { + // "type" is a framework-owned activation variable; using it as a bare event type must + // fail loudly. + assertThatThrownBy(() -> ParsedCondition.classify("type")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("reserved keyword") + .hasMessageContaining("@action(\"type == "); + } + + @Test + void idIsNotReservedAndClassifiesAsTypeMatch() { + // "id" is user-overridable, not framework-reserved — a bare "id" entry is a legal event + // type alias. + ParsedCondition pc = ParsedCondition.classify("id"); + assertThat(pc).isInstanceOf(TypeMatch.class); + assertThat(pc.source()).isEqualTo("id"); + } + + @Test + void hasOnBareAttributeNameClassifiesAsCelExpression() { + // Custom has() macro accepts bare attribute names at parse time. + ParsedCondition pc = ParsedCondition.classify("has(score)"); + assertThat(pc).isInstanceOf(CelExpression.class); + assertThat(pc.source()).isEqualTo("has(score)"); + } +} diff --git a/plan/src/test/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializerTest.java b/plan/src/test/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializerTest.java index b6c2170b5..1c675f881 100644 --- a/plan/src/test/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializerTest.java +++ b/plan/src/test/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializerTest.java @@ -37,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for {@link ActionJsonSerializer}. */ @@ -147,27 +148,19 @@ public void testSerializeMultipleEventTypes() throws Exception { } @Test - public void testSerializeEmptyEventTypes() throws Exception { - // Create a JavaFunction - JavaFunction function = - new JavaFunction( - "org.apache.flink.agents.plan.TestAction", - "legal", - new Class[] {Event.class, RunnerContext.class}); - - // Create an Action with an empty event types list - Action action = new Action("emptyEventsAction", function, Collections.emptyList()); - - // Serialize the action to JSON - String json = new ObjectMapper().writeValueAsString(action); - - // Verify the JSON contains the expected fields - assertTrue( - json.contains("\"name\":\"emptyEventsAction\""), - "JSON should contain the action name"); - assertTrue( - json.contains("\"trigger_conditions\":[]"), - "JSON should contain an empty trigger conditions array"); + public void testSerializeEmptyEventTypes() { + // Empty trigger conditions are now rejected at Action construction time + // (the CEL feature requires at least one entry to evaluate against). + assertThrows( + IllegalArgumentException.class, + () -> { + JavaFunction function = + new JavaFunction( + "org.apache.flink.agents.plan.TestAction", + "legal", + new Class[] {Event.class, RunnerContext.class}); + new Action("emptyEventsAction", function, Collections.emptyList()); + }); } @Test diff --git a/pom.xml b/pom.xml index 4728fa589..57b8c8185 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ under the License. 3.27.7 5.14.2 1.15.4 + 0.12.0 true diff --git a/runtime/pom.xml b/runtime/pom.xml index 422c5c320..c7b78ccab 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -193,9 +193,22 @@ under the License. jackson-datatype-jsr310 ${jackson.version} + + dev.cel + cel + ${cel.version} + + + + src/test/resources + + + ${project.basedir}/../e2e-test/cel-fixtures + + diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/condition/ActionRouter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/condition/ActionRouter.java new file mode 100644 index 000000000..d4e2f819f --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/condition/ActionRouter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.configuration.AgentConfigOptions; +import org.apache.flink.agents.plan.AgentPlan; +import org.apache.flink.agents.plan.actions.Action; +import org.apache.flink.agents.plan.condition.ParsedCondition; +import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +/** + * Routes an event to matching actions: type-index fast path first, then CEL slow path. + * + *

Each action fires at most once per event; typed hits ordered before CEL hits. + */ +public final class ActionRouter { + + private final AgentPlan agentPlan; + + /** Null when the plan contains no CEL expressions. */ + private CelConditionEvaluator conditionEvaluator; + + public ActionRouter(AgentPlan agentPlan) { + if (agentPlan == null) { + throw new IllegalArgumentException("ActionRouter: agentPlan must not be null"); + } + this.agentPlan = agentPlan; + } + + /** Pre-compiles all CEL expressions in the plan. */ + public void open() { + List celExpressions = new ArrayList<>(); + for (Action action : agentPlan.getActions().values()) { + for (ParsedCondition pc : action.getParsedConditions()) { + if (pc instanceof CelExpression) { + celExpressions.add((CelExpression) pc); + } + } + } + if (celExpressions.isEmpty()) { + return; + } + conditionEvaluator = + new CelConditionEvaluator( + agentPlan + .getConfig() + .get(AgentConfigOptions.CEL_EVALUATION_FAILURE_POLICY)); + conditionEvaluator.initPrograms(celExpressions); + } + + /** Returns actions to fire for {@code event}: typed hits first, then CEL hits. */ + public List route(Event event) { + List typedHits = + agentPlan + .getActionsByEvent() + .getOrDefault(event.getType(), Collections.emptyList()); + + // CEL candidates = actions with at least one CEL entry, excluding those already + // matched by typed routing for this event type. This avoids double-firing. + List celCandidates; + List withCel = agentPlan.getActionsWithCel(); + if (withCel.isEmpty()) { + celCandidates = Collections.emptyList(); + } else { + celCandidates = new ArrayList<>(); + for (Action a : withCel) { + if (!typedHits.contains(a)) { + celCandidates.add(a); + } + } + } + + if (celCandidates.isEmpty()) { + return typedHits; + } + + // Preserves typed-first ordering and deduplicates. + LinkedHashSet matched = new LinkedHashSet<>(typedHits); + + Map activation = null; + for (Action a : celCandidates) { + // Within-action OR: first matching CEL expression admits the action. + for (ParsedCondition pc : a.getParsedConditions()) { + if (!(pc instanceof CelExpression)) { + continue; + } + if (activation == null) { + activation = conditionEvaluator.createActivation(event); + } + if (conditionEvaluator.evaluate((CelExpression) pc, activation)) { + matched.add(a); + break; + } + } + } + return new ArrayList<>(matched); + } + + /** Idempotent. */ + public void close() { + if (conditionEvaluator != null) { + conditionEvaluator.close(); + conditionEvaluator = null; + } + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java new file mode 100644 index 000000000..cd41f60b1 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.cel.runtime.CelEvaluationException; +import dev.cel.runtime.CelRuntime; +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy; +import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Evaluates CEL condition expressions against event data. */ +public class CelConditionEvaluator { + + private static final Logger LOG = LoggerFactory.getLogger(CelConditionEvaluator.class); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** Frozen after {@link #initPrograms}; cleared by {@link #close}. */ + @Nullable private Map programCache; + + private final CelEvaluationFailurePolicy failurePolicy; + + public CelConditionEvaluator() { + this(CelEvaluationFailurePolicy.WARN_AND_SKIP); + } + + public CelConditionEvaluator(CelEvaluationFailurePolicy failurePolicy) { + this.failurePolicy = failurePolicy; + } + + /** Pre-compiles {@code expressions} and freezes the cache. Nulls are skipped. */ + public void initPrograms(Collection expressions) { + Map programs = new HashMap<>(); + for (CelExpression expression : expressions) { + if (expression == null) { + continue; + } + String source = expression.source(); + programs.computeIfAbsent(source, CelExpressionFacade::toProgram); + } + this.programCache = Collections.unmodifiableMap(programs); + } + + public void close() { + programCache = null; + } + + /** Evaluates {@code expression} (which must have been pre-compiled). Null returns true. */ + public boolean evaluate(@Nullable CelExpression expression, Map activation) { + if (expression == null) { + return true; + } + String source = expression.source(); + try { + CelRuntime.Program program = programCache.get(source); + if (program == null) { + throw new IllegalStateException( + "CEL condition was not pre-compiled via initPrograms(): \"" + + source + + "\""); + } + return evaluateProgram(source, program, activation); + } catch (CelEvaluationException e) { + if (failurePolicy == CelEvaluationFailurePolicy.FAIL) { + throw new IllegalStateException( + "CEL condition evaluation failed for '" + source + "'", e); + } + LOG.warn("CEL condition evaluation failed for '{}', skipping action", source, e); + return false; + } + } + + private boolean evaluateProgram( + String condition, CelRuntime.Program program, Map activation) + throws CelEvaluationException { + Object result = program.eval(activation); + if (result instanceof Boolean) { + return (Boolean) result; + } + String msg = + String.format( + "CEL condition '%s' returned non-boolean type %s, treating as false", + condition, result == null ? "null" : result.getClass().getName()); + if (failurePolicy == CelEvaluationFailurePolicy.FAIL) { + throw new IllegalStateException(msg); + } + LOG.warn(msg); + return false; + } + + /** + * Builds the CEL activation. Contract (mirror of Python {@code cel_facade}): + * + *

    + *
  • {@code type} and {@code EventType} are framework-owned and always win. + *
  • {@code attributes} holds the single-level merge of user data: {@code output.*} subkeys, + * then root attribute fields, then {@code input.*} subkeys ({@code output > root > input} + * on collision, via {@link Map#putIfAbsent}). Only one level is flattened — nested fields + * stay nested ({@code mylist.name}, not {@code name}). + *
  • Every merged attribute is also promoted to the activation top level, so conditions can + * use bare identifiers ({@code score > 0.8}) without any AST rewriting. Framework keys + * are never shadowed. + *
  • {@code id} is the user-supplied {@code id} attribute when present, otherwise falls back + * to the event UUID. + *
+ * + *

JSON-shaped strings auto-parse first; narrow numerics widen to long/double. + */ + @SuppressWarnings("unchecked") + public Map createActivation(Event event) { + Map activation = new HashMap<>(); + activation.put("type", event.getType()); + activation.put("EventType", CelExpressionFacade.EVENT_TYPE_CONSTANTS); + + Object normalizedAttrs = normalizeValue(event.getAttributes(), 0); + Map merged = new HashMap<>(); + if (normalizedAttrs instanceof Map) { + Map attrs = (Map) normalizedAttrs; + + // Precedence: output subkeys > root attributes > input subkeys (putIfAbsent keeps the + // earliest insertion). Root iteration includes the "input"/"output" maps themselves, + // so nested paths like input.region.width keep working. + Object outputObj = attrs.get("output"); + if (outputObj instanceof Map) { + ((Map) outputObj).forEach(merged::putIfAbsent); + } + attrs.forEach(merged::putIfAbsent); + Object inputObj = attrs.get("input"); + if (inputObj instanceof Map) { + ((Map) inputObj).forEach(merged::putIfAbsent); + } + } + + activation.put("attributes", merged); + // Promote to top level for bare-identifier access; framework keys win on collision. + merged.forEach(activation::putIfAbsent); + // Event UUID only as fallback — a user-supplied id attribute takes precedence. + activation.putIfAbsent("id", event.getId().toString()); + + return activation; + } + + /** + * Maximum recursion depth for {@link #normalizeValue}. Past this depth, strings are kept as + * plain strings rather than parsed as JSON (graceful degrade, mirror of Python {@code + * _MAX_NORMALIZE_DEPTH}). Prevents stack blow-up on adversarial nested JSON input. + */ + static final int MAX_NORMALIZE_DEPTH = 16; + + /** JSON-looking strings → Map/List; narrow numerics widened to long/double for CEL. */ + @SuppressWarnings("unchecked") + private static Object normalizeValue(Object value, int depth) { + if (value == null) { + return null; + } + if (value instanceof String) { + // Depth cap reached — keep the raw string (mirrors Python's _MAX_NORMALIZE_DEPTH). + if (depth >= MAX_NORMALIZE_DEPTH) { + return value; + } + String s = ((String) value).trim(); + if (s.length() >= 2 + && ((s.charAt(0) == '{' && s.charAt(s.length() - 1) == '}') + || (s.charAt(0) == '[' && s.charAt(s.length() - 1) == ']'))) { + try { + return normalizeValue(MAPPER.readValue(s, Object.class), depth + 1); + } catch (Exception ignored) { + // Not valid JSON — fall through as plain string. + } + } + return value; + } + if (value instanceof Map) { + Map src = (Map) value; + Map dst = new HashMap<>(src.size()); + for (Map.Entry entry : src.entrySet()) { + dst.put(entry.getKey(), normalizeValue(entry.getValue(), depth + 1)); + } + return dst; + } + if (value instanceof List) { + List src = (List) value; + List dst = new ArrayList<>(src.size()); + for (Object item : src) { + dst.add(normalizeValue(item, depth + 1)); + } + return dst; + } + if (value instanceof Byte || value instanceof Short || value instanceof Integer) { + return ((Number) value).longValue(); + } + if (value instanceof Float) { + return ((Float) value).doubleValue(); + } + if (value instanceof BigInteger) { + BigInteger bigInt = (BigInteger) value; + if (bigInt.bitLength() < 64) { + return bigInt.longValue(); + } + throw new IllegalArgumentException( + "CEL normalizeValue: BigInteger value overflows int64: " + bigInt); + } + if (value instanceof BigDecimal) { + BigDecimal bigDec = (BigDecimal) value; + LOG.debug( + "CEL normalizeValue: converting BigDecimal to double (possible precision loss): {}", + bigDec); + return bigDec.doubleValue(); + } + return value; + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelExpressionFacade.java b/runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelExpressionFacade.java new file mode 100644 index 000000000..350fef4ab --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelExpressionFacade.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.CelOptions; +import dev.cel.common.CelValidationException; +import dev.cel.common.ast.CelExpr; +import dev.cel.common.navigation.CelNavigableAst; +import dev.cel.common.types.CelType; +import dev.cel.common.types.MapType; +import dev.cel.common.types.SimpleType; +import dev.cel.compiler.CelCompiler; +import dev.cel.compiler.CelCompilerBuilder; +import dev.cel.compiler.CelCompilerFactory; +import dev.cel.parser.CelParser; +import dev.cel.parser.CelParserFactory; +import dev.cel.runtime.CelEvaluationException; +import dev.cel.runtime.CelRuntime; +import dev.cel.runtime.CelRuntimeFactory; +import org.apache.flink.agents.api.EventType; +import org.apache.flink.agents.plan.condition.CelMacroPolicy; +import org.apache.flink.annotation.VisibleForTesting; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * CEL Parse → Validate → Check → Program pipeline. Compiled {@link CelRuntime.Program} instances + * are cached process-wide by source string. + */ +public final class CelExpressionFacade { + + /** Immutable {@code constant name → value} map bound to the CEL {@code EventType} variable. */ + static final Map EVENT_TYPE_CONSTANTS = Map.copyOf(EventType.allConstants()); + + /** Cel-java has no wall-clock timeout, so these caps bound expression size/depth. */ + private static final CelOptions CEL_OPTIONS = + CelOptions.current() + .maxExpressionCodePointSize(8_192) + .maxParseRecursionDepth(32) + .comprehensionMaxIterations(1_000) + .build(); + + /** + * Only the custom has() macro is enabled; all others are rejected by {@link CelMacroPolicy}. + */ + private static final CelParser CEL_PARSER = + CelParserFactory.standardCelParserBuilder() + .setOptions(CEL_OPTIONS) + .addMacros(CelMacroPolicy.HAS) + .build(); + + /** + * Vars always declared at type-check; mirrors {@link CelConditionEvaluator#createActivation}. + */ + private static final Map BASE_VARS = + Map.of( + "type", + SimpleType.STRING, + "id", + SimpleType.DYN, + "EventType", + MapType.create(SimpleType.STRING, SimpleType.STRING), + "attributes", + MapType.create(SimpleType.STRING, SimpleType.DYN)); + + private static final CelRuntime CEL_RUNTIME = + CelRuntimeFactory.standardCelRuntimeBuilder().setOptions(CEL_OPTIONS).build(); + + /** Process-wide bounded LRU cache of compiled CEL programs, keyed by source string. */ + static final int PROGRAM_CACHE_MAX_SIZE = 1024; + + private static final Map PROGRAM_CACHE = + Collections.synchronizedMap( + new LinkedHashMap( + 256, 0.75f, /* accessOrder */ true) { + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > PROGRAM_CACHE_MAX_SIZE; + } + }); + + private CelExpressionFacade() {} + + /** Parses {@code source} into an untyped AST (no type-check). */ + public static CelAbstractSyntaxTree parse(String source) { + if (source == null || source.isEmpty()) { + throw new IllegalArgumentException( + "CelExpressionFacade.parse: source must be non-null and non-empty"); + } + try { + return CEL_PARSER.parse(source).getAst(); + } catch (CelValidationException e) { + throw new IllegalArgumentException( + "Invalid CEL expression: \"" + source + "\" — " + e.getMessage(), e); + } + } + + /** Compiles {@code source} into a cached, thread-safe program. */ + public static CelRuntime.Program toProgram(String source) { + if (source == null || source.isEmpty()) { + throw new IllegalArgumentException( + "CelExpressionFacade.toProgram: source must be non-null and non-empty"); + } + return PROGRAM_CACHE.computeIfAbsent(source, CelExpressionFacade::compile); + } + + private static CelRuntime.Program compile(String source) { + CelAbstractSyntaxTree parsed; + try { + parsed = parse(source); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Invalid CEL condition expression: \"" + source + "\" — " + e.getMessage(), e); + } + return compileFromAst(source, parsed); + } + + private static CelRuntime.Program compileFromAst(String source, CelAbstractSyntaxTree parsed) { + CelMacroPolicy.findFirstDisallowedMacro(parsed) + .ifPresent( + macro -> { + throw new IllegalArgumentException( + CelMacroPolicy.formatDisallowedMessage(macro, source)); + }); + + validateEventTypeReferences(parsed); + + try { + CelAbstractSyntaxTree checked = compilerFor(parsed).check(parsed).getAst(); + return CEL_RUNTIME.createProgram(checked); + } catch (CelValidationException | CelEvaluationException e) { + throw new IllegalArgumentException( + "Invalid CEL condition expression: \"" + source + "\" — " + e.getMessage(), e); + } + } + + /** + * Builds a type-checker for {@code parsed}: base vars plus every identifier appearing in the + * expression declared as DYN. + */ + private static CelCompiler compilerFor(CelAbstractSyntaxTree parsed) { + CelCompilerBuilder builder = + CelCompilerFactory.standardCelCompilerBuilder().setOptions(CEL_OPTIONS); + BASE_VARS.forEach(builder::addVar); + for (String ident : collectIdentifiers(parsed)) { + if (!BASE_VARS.containsKey(ident)) { + builder.addVar(ident, SimpleType.DYN); + } + } + return builder.build(); + } + + /** + * Throws {@link IllegalArgumentException} when any {@code EventType.X} in {@code ast} names an + * unknown constant. + */ + private static void validateEventTypeReferences(CelAbstractSyntaxTree ast) { + CelNavigableAst.fromAst(ast) + .getRoot() + .allNodes() + .filter(node -> node.getKind() == CelExpr.ExprKind.Kind.SELECT) + .map(node -> node.expr().select()) + .filter( + select -> + select.operand().getKind() == CelExpr.ExprKind.Kind.IDENT + && "EventType".equals(select.operand().ident().name())) + .forEach( + select -> { + if (!EVENT_TYPE_CONSTANTS.containsKey(select.field())) { + throw new IllegalArgumentException( + "Unknown EventType constant: EventType." + select.field()); + } + }); + } + + private static Set collectIdentifiers(CelAbstractSyntaxTree ast) { + return CelNavigableAst.fromAst(ast) + .getRoot() + .allNodes() + .filter(node -> node.getKind() == CelExpr.ExprKind.Kind.IDENT) + .map(node -> node.expr().ident().name()) + .collect(Collectors.toSet()); + } + + @VisibleForTesting + static void clearProgramCacheForTests() { + PROGRAM_CACHE.clear(); + } + + @VisibleForTesting + static long programCacheSizeForTests() { + return PROGRAM_CACHE.size(); + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 665551bba..11f0d4216 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -266,7 +266,7 @@ private void processEvent(Object key, Event event) throws Exception { } // We then obtain the triggered action and add ActionTasks to the waiting processing // queue. - List triggerActions = eventRouter.getActionsTriggeredBy(event, agentPlan); + List triggerActions = eventRouter.getActionsTriggeredBy(event); if (triggerActions != null && !triggerActions.isEmpty()) { for (Action triggerAction : triggerActions) { stateManager.addActionTask(createActionTask(key, triggerAction, event)); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java index 7eecc5d26..ab6350ecc 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java @@ -30,6 +30,7 @@ import org.apache.flink.agents.api.logger.LoggerType; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.plan.actions.Action; +import org.apache.flink.agents.runtime.condition.ActionRouter; import org.apache.flink.agents.runtime.eventlog.FileEventLogger; import org.apache.flink.agents.runtime.eventlog.Slf4jEventLogger; import org.apache.flink.agents.runtime.metrics.BuiltInMetrics; @@ -87,6 +88,13 @@ class EventRouter implements AutoCloseable { private final EventLogger eventLogger; private final List eventListeners; private final AgentPlan agentPlan; + + /** + * CEL-aware router; owns event → action resolution including CEL conditions. Built and opened + * at construction. + */ + private final ActionRouter actionRouter; + private StreamRecord reusedStreamRecord; private SegmentedQueue keySegmentQueue; private BuiltInMetrics builtInMetrics; @@ -101,6 +109,8 @@ class EventRouter implements AutoCloseable { this.inputIsJava = inputIsJava; this.eventLogger = eventLogger; this.eventListeners = new ArrayList<>(); + this.actionRouter = new ActionRouter(agentPlan); + this.actionRouter.open(); } /** @@ -216,8 +226,8 @@ OUT getOutputFromOutputEvent(Event event, PythonActionExecutor pythonActionExecu } } - List getActionsTriggeredBy(Event event, AgentPlan agentPlan) { - return agentPlan.getActionsTriggeredBy(event.getType()); + List getActionsTriggeredBy(Event event) { + return actionRouter.route(event); } /** @@ -308,6 +318,7 @@ private static EventLogger createEventLogger(AgentPlan agentPlan) { @Override public void close() throws Exception { + actionRouter.close(); if (eventLogger != null) { eventLogger.close(); } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/condition/ActionRouterTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/ActionRouterTest.java new file mode 100644 index 000000000..e9b857a6e --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/ActionRouterTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.configuration.AgentConfigOptions; +import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.plan.AgentPlan; +import org.apache.flink.agents.plan.JavaFunction; +import org.apache.flink.agents.plan.actions.Action; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for ActionRouter: typed-only, CEL-only, mixed-dedup, and lifecycle. */ +class ActionRouterTest { + + /** Stub handler — only its signature matters for {@link Action} construction. */ + public static void handlerStub(Event event, RunnerContext ctx) {} + + private ActionRouter router; + + @AfterEach + void tearDown() { + if (router != null) { + router.close(); + router = null; + } + // Reset facade cache so the "no CEL in plan ⇔ evaluator is null" invariant stays clean + // under repeated runs. + CelExpressionFacade.clearProgramCacheForTests(); + } + + // -- Helpers -- + + private static JavaFunction execStub() throws Exception { + return new JavaFunction( + ActionRouterTest.class.getName(), + "handlerStub", + new Class[] {Event.class, RunnerContext.class}); + } + + /** Builds an AgentPlan mirroring extraction-time indexing. */ + private static AgentPlan planOf(Action... actions) { + Map byName = new HashMap<>(); + Map> byType = new HashMap<>(); + for (Action a : actions) { + byName.put(a.getName(), a); + List typeNames = a.getListenEventTypes(); + for (String t : typeNames) { + byType.computeIfAbsent(t, k -> new java.util.ArrayList<>()).add(a); + } + } + return new AgentPlan(byName, byType); + } + + // -- Constructor + lifecycle -- + + @Test + void constructor_rejectsNullPlan() { + assertThatThrownBy(() -> new ActionRouter(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("agentPlan"); + } + + @Test + void close_isIdempotent() throws Exception { + Action a = new Action("celOnly", execStub(), List.of("type == 'x'"), null); + router = new ActionRouter(planOf(a)); + router.open(); + router.close(); + router.close(); // must not throw + } + + // -- Routing: typed-only fast path -- + + @Test + void route_typedOnly_returnsTypedHitsWithoutCelEvaluation() throws Exception { + Action onInput = new Action("onInput", execStub(), List.of(InputEvent.EVENT_TYPE), null); + Action onOutput = new Action("onOutput", execStub(), List.of(OutputEvent.EVENT_TYPE), null); + router = new ActionRouter(planOf(onInput, onOutput)); + router.open(); + + List matched = router.route(new InputEvent("hello")); + assertThat(matched).containsExactly(onInput); + } + + @Test + void route_typedOnly_returnsEmptyListForUnknownEventType() throws Exception { + Action onInput = new Action("onInput", execStub(), List.of(InputEvent.EVENT_TYPE), null); + router = new ActionRouter(planOf(onInput)); + router.open(); + + // Unknown event type, no CEL candidates → empty trigger set. + Event other = new Event("_some_other_event") {}; + assertThat(router.route(other)).isEmpty(); + } + + // -- Routing: CEL-only slow path -- + + @Test + void route_celOnly_firesActionWhenAnyCelExpressionMatches() throws Exception { + // Two CEL exprs on one action — within-action OR semantics; first matches. + Action onPriceOrType = + new Action( + "onPriceOrType", + execStub(), + Arrays.asList( + "type == '_input_event'", + "has(attributes.priceX) && attributes.priceX > 100"), + null); + router = new ActionRouter(planOf(onPriceOrType)); + router.open(); + + assertThat(router.route(new InputEvent("anything"))).containsExactly(onPriceOrType); + } + + @Test + void route_celOnly_doesNotFireWhenNoCelExpressionMatches() throws Exception { + // CEL gates on attributes.priceX — field absent in InputEvent attributes. + Action onPrice = + new Action( + "onPrice", + execStub(), + List.of("has(attributes.priceX) && attributes.priceX > 100"), + null); + router = new ActionRouter(planOf(onPrice)); + router.open(); + + assertThat(router.route(new InputEvent("anything"))).isEmpty(); + } + + @Test + void route_celOnly_shortCircuitsOnFirstMatchingExpression() throws Exception { + // Two matching exprs on one action — still fires it exactly once (OR semantics). + Action onMulti = + new Action( + "onMulti", + execStub(), + Arrays.asList("type == '_input_event'", "type == '_input_event'"), + null); + router = new ActionRouter(planOf(onMulti)); + router.open(); + + List matched = router.route(new InputEvent("x")); + assertThat(matched).containsExactly(onMulti); + } + + // -- Routing: mixed typed + CEL → dedup -- + + @Test + void route_mixedActionAppearsExactlyOnce() throws Exception { + // Type + CEL both match the same action — must dedupe to one entry (H1 regression). + Action mixed = + new Action( + "mixed", + execStub(), + Arrays.asList(InputEvent.EVENT_TYPE, "type == '_input_event'"), + null); + router = new ActionRouter(planOf(mixed)); + router.open(); + + List matched = router.route(new InputEvent("x")); + assertThat(matched).containsExactly(mixed); // exactly once + } + + @Test + void route_typedAndCelMix_preservesTypedFirstThenCelOrdering() throws Exception { + // Contract: typed hits ordered before CEL hits. + Action pureType = new Action("pureType", execStub(), List.of(InputEvent.EVENT_TYPE), null); + Action pureCel = new Action("pureCel", execStub(), List.of("type == '_input_event'"), null); + router = new ActionRouter(planOf(pureType, pureCel)); + router.open(); + + List matched = router.route(new InputEvent("x")); + assertThat(matched).containsExactly(pureType, pureCel); + } + + // -- Routing: CEL failure policy from plan config -- + + @Test + void route_failPolicyFromConfig_throwsOnConditionEvaluationError() throws Exception { + // Compiles fine; errors at runtime because `attributes.nonexistent` is unset. + Action a = new Action("celFail", execStub(), List.of("attributes.nonexistent > 3"), null); + AgentPlan plan = planOf(a); + plan.getConfig() + .set( + AgentConfigOptions.CEL_EVALUATION_FAILURE_POLICY, + CelEvaluationFailurePolicy.FAIL); + + router = new ActionRouter(plan); + router.open(); + + assertThatThrownBy(() -> router.route(new Event("any_type") {})) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("CEL condition evaluation failed"); + } + + @Test + void route_defaultWarnAndSkip_swallowsConditionEvaluationError() throws Exception { + // Same expression, same event — but no policy set in config. Default must remain + // WARN_AND_SKIP, so route() returns empty rather than throwing. + Action a = + new Action("celWarnSkip", execStub(), List.of("attributes.nonexistent > 3"), null); + router = new ActionRouter(planOf(a)); + router.open(); + + assertThat(router.route(new Event("any_type") {})).isEmpty(); + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluatorTest.java new file mode 100644 index 000000000..99f2f5aec --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluatorTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy; +import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link CelConditionEvaluator}. */ +class CelConditionEvaluatorTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()); + + private CelConditionEvaluator evaluator; + + /** Test case from conformance JSON. */ + static class ConformanceTestCase { + String name; + String condition; + Map event; + boolean expected; + + public String getName() { + return name; + } + + public String getCondition() { + return condition; + } + + public Map getEvent() { + return event; + } + + public boolean isExpected() { + return expected; + } + } + + @BeforeEach + void setUp() throws IOException { + evaluator = new CelConditionEvaluator(); + // Pre-compile every condition from the conformance JSON. + List testCases = loadConformanceCases(); + Collection conditions = new ArrayList<>(); + for (ConformanceTestCase tc : testCases) { + if (tc.getCondition() != null && !tc.getCondition().isEmpty()) { + conditions.add(new CelExpression(tc.getCondition())); + } + } + evaluator.initPrograms(conditions); + } + + private static List loadConformanceCases() throws IOException { + try (InputStream is = + CelConditionEvaluatorTest.class.getResourceAsStream( + "/cel_conformance_cases.yaml")) { + if (is == null) { + throw new IOException("cel_conformance_cases.yaml not found"); + } + return OBJECT_MAPPER.readValue(is, new TypeReference>() {}); + } + } + + private Event buildEvent(Map eventData) { + String id = (String) eventData.get("id"); + String type = (String) eventData.get("type"); + @SuppressWarnings("unchecked") + Map attributes = + (Map) eventData.getOrDefault("attributes", new HashMap<>()); + UUID uuid; + try { + // Reuse the raw id when it's a valid UUID so id-based filters work in conformance + // cases. + uuid = UUID.fromString(id); + } catch (IllegalArgumentException e) { + uuid = UUID.nameUUIDFromBytes(id.getBytes()); + } + Event event = new Event(uuid, type, attributes); + return event; + } + + static Stream conformanceCases() throws IOException { + return loadConformanceCases().stream(); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("conformanceCases") + void testConformanceCases(ConformanceTestCase testCase) { + Event event = buildEvent(testCase.getEvent()); + Map activation = evaluator.createActivation(event); + CelExpression expr = + (testCase.getCondition() == null || testCase.getCondition().isEmpty()) + ? null + : new CelExpression(testCase.getCondition()); + boolean result = evaluator.evaluate(expr, activation); + assertThat(result).isEqualTo(testCase.isExpected()); + } + + @Test + void testFailPolicyThrowsOnEvaluationError() { + CelConditionEvaluator failEvaluator = + new CelConditionEvaluator(CelEvaluationFailurePolicy.FAIL); + // Pre-compile a condition that will fail at runtime + CelExpression cond = new CelExpression("attributes.nonexistent > 3"); + failEvaluator.initPrograms(List.of(cond)); + + Event event = new Event("test_type"); + Map activation = failEvaluator.createActivation(event); + + assertThatThrownBy(() -> failEvaluator.evaluate(cond, activation)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("CEL condition evaluation failed"); + } + + @Test + void testInitProgramsInvalidExpressionThrows() { + CelConditionEvaluator testEvaluator = new CelConditionEvaluator(); + assertThatThrownBy(() -> testEvaluator.initPrograms(List.of(new CelExpression("type ==")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid CEL condition expression"); + } + + @Test + void testNormalizeValueBigIntegerWithinLongRange() { + // BigInteger within Long range should pass through normalizeValue cleanly. + CelConditionEvaluator testEvaluator = new CelConditionEvaluator(); + CelExpression condition = new CelExpression("attributes.amount > 1000"); + testEvaluator.initPrograms(List.of(condition)); + + Event event = new Event("test_type"); + event.getAttributes().put("amount", java.math.BigInteger.valueOf(9999999999999L)); + + Map activation = testEvaluator.createActivation(event); + assertThat(testEvaluator.evaluate(condition, activation)).isTrue(); + } + + @Test + void testNormalizeValueBigIntegerOverflowThrows() { + // BigInteger exceeding Long.MAX_VALUE should throw IllegalArgumentException + Event event = new Event("test_type"); + java.math.BigInteger overflow = + java.math.BigInteger.valueOf(Long.MAX_VALUE).multiply(java.math.BigInteger.TEN); + event.getAttributes().put("amount", overflow); + + assertThatThrownBy(() -> evaluator.createActivation(event)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("overflows int64"); + } + + @Test + void testNormalizeValueBigDecimalConvertedToDouble() { + CelConditionEvaluator testEvaluator = new CelConditionEvaluator(); + CelExpression condition = new CelExpression("attributes.score > 3.14"); + testEvaluator.initPrograms(List.of(condition)); + + Event event = new Event("test_type"); + event.getAttributes().put("score", new java.math.BigDecimal("99.99")); + + Map activation = testEvaluator.createActivation(event); + assertThat(testEvaluator.evaluate(condition, activation)).isTrue(); + } + + @Test + void evaluate_warnAndSkipReturnsFalseOnRuntimeError() { + // Positive assertion for the default WARN_AND_SKIP policy: runtime errors are swallowed + // and the action is treated as not matching, with the failure surfaced via WARN log. + CelConditionEvaluator e = new CelConditionEvaluator(); + CelExpression cond = new CelExpression("attributes.nope > 3"); + e.initPrograms(List.of(cond)); + + Event event = new Event("test_type"); + Map activation = e.createActivation(event); + assertThat(e.evaluate(cond, activation)).isFalse(); + } + + // ----- Nested has() short-circuit (has(a.b.c) desugaring) ----- + // These use the FAIL policy on purpose: under the default WARN_AND_SKIP a thrown + // CelEvaluationException is swallowed and also reported as false, so it cannot tell a + // genuine false apart from a silently-skipped error. FAIL re-throws, so asserting false + // here proves the macro short-circuits a missing level instead of evaluating an operand + // that errors with "key '...' is not present in map". + + @Test + void evaluate_nestedHasIntermediateMissing_returnsFalseWithoutThrowing() { + CelConditionEvaluator failEvaluator = + new CelConditionEvaluator(CelEvaluationFailurePolicy.FAIL); + CelExpression cond = new CelExpression("has(attributes.meta.source)"); + failEvaluator.initPrograms(List.of(cond)); + + // No attributes ⇒ intermediate 'meta' absent. The old single-testOnly desugaring + // evaluated attributes.meta and threw; the chain desugaring short-circuits to false. + Event event = new Event("test_type"); + Map activation = failEvaluator.createActivation(event); + assertThat(failEvaluator.evaluate(cond, activation)).isFalse(); + } + + @Test + void evaluate_nestedHasBareRootWhollyAbsent_returnsFalseWithoutThrowing() { + CelConditionEvaluator failEvaluator = + new CelConditionEvaluator(CelEvaluationFailurePolicy.FAIL); + CelExpression cond = new CelExpression("has(value.user.tier)"); + failEvaluator.initPrograms(List.of(cond)); + + // Bare root 'value' is wholly absent: the has(value) guard short-circuits before the + // unbound 'value' lookup can throw. + Event event = new Event("test_type"); + Map activation = failEvaluator.createActivation(event); + assertThat(failEvaluator.evaluate(cond, activation)).isFalse(); + } + + @Test + void evaluate_nestedHasFullPathPresent_returnsTrue() { + CelConditionEvaluator failEvaluator = + new CelConditionEvaluator(CelEvaluationFailurePolicy.FAIL); + CelExpression cond = new CelExpression("has(attributes.meta.source)"); + failEvaluator.initPrograms(List.of(cond)); + + Event event = new Event("test_type"); + Map meta = new HashMap<>(); + meta.put("source", "api"); + event.getAttributes().put("meta", meta); + Map activation = failEvaluator.createActivation(event); + assertThat(failEvaluator.evaluate(cond, activation)).isTrue(); + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelExpressionFacadeTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelExpressionFacadeTest.java new file mode 100644 index 000000000..4eb41b8d0 --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelExpressionFacadeTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import dev.cel.runtime.CelEvaluationException; +import dev.cel.runtime.CelRuntime; +import org.apache.flink.agents.plan.condition.CelMacroPolicy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link CelExpressionFacade}: parse validation and toProgram compilation + caching. */ +class CelExpressionFacadeTest { + + @BeforeEach + void clearCache() { + // Per-process cache keyed by source string — clear between tests so cache assertions are + // deterministic. + CelExpressionFacade.clearProgramCacheForTests(); + } + + @ParameterizedTest(name = "rejects disallowed macro {0}") + @ValueSource( + strings = { + "[1, 2, 3].all(x, x > 0)", + "[1, 2, 3].exists(x, x > 0)", + "attributes.tags.exists(x, x == 'a')", + "has(attributes.x) && attributes.list.all(t, t > 0)" + }) + void toProgram_rejectsDisallowedMacroCalls(String source) { + assertThatThrownBy(() -> CelExpressionFacade.toProgram(source)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("disallowed macro"); + } + + @Test + void disallowedMacroMessage_isByteIdenticalToPython() { + // Must match Python ``cel_facade._format_disallowed_message`` byte-for-byte. CI + // ``cel-conformance.yml`` validates this alignment via diff. + String expected = + "CEL expression uses disallowed macro 'all': \"test_source\". " + + "Only allows: [has]."; + assertThat(CelMacroPolicy.formatDisallowedMessage("all", "test_source")) + .isEqualTo(expected); + } + + @Test + void toProgram_acceptsMacroNameInStringLiteral() { + // "exists" inside a string literal is not a macro call — should compile fine. + CelRuntime.Program program = CelExpressionFacade.toProgram("type == 'exists_check_event'"); + assertThat(program).isNotNull(); + } + + @Test + void toProgram_acceptsMacroNameAsFieldSubstring() { + // "existing" contains "exist" but is not a macro call. + CelRuntime.Program program = CelExpressionFacade.toProgram("existing == true"); + assertThat(program).isNotNull(); + } + + @Test + void toProgram_doesNotMisclassifyFieldNamedAllAsDisallowedMacro() { + // `attributes.all` is a field access, not a macro call. AST-based check (vs. old + // error-message string match) won't false-positive on the substring "all". + CelRuntime.Program program = CelExpressionFacade.toProgram("attributes.all == 'value'"); + assertThat(program).isNotNull(); + } + + @Test + void toProgram_findsDisallowedMacroInDeeplyNestedExpression() { + // Macro buried inside arg of another macro arg — AST walker must find it. + assertThatThrownBy( + () -> + CelExpressionFacade.toProgram( + "has(attributes.x) && (attributes.y > 0 || attributes.tags.exists(t, t == 'a'))")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("'exists'"); + } + + @Test + void parse_rejectsHasOnNonFieldArgument() { + // has(1 + 1) is neither a field selection nor an attribute name. + assertThatThrownBy(() -> CelExpressionFacade.toProgram("has(1 + 1)")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("invalid argument to has() macro"); + } + + // -- EventType map access -- + + @Test + void toProgram_unknownEventTypeConstantFailsAtPlanLoad() { + assertThatThrownBy(() -> CelExpressionFacade.toProgram("type == EventType.NotARealEvent")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown EventType constant: EventType.NotARealEvent"); + } + + // -- Dynamic identifier declaration -- + + @Test + void toProgram_userSuppliedIdSupportsNonStringComparison() throws CelEvaluationException { + // id is declared DYN: users may supply numeric ids via attributes. + CelRuntime.Program program = CelExpressionFacade.toProgram("id > 0"); + Map activation = new HashMap<>(); + activation.put("id", 42L); + assertThat(program.eval(activation)).isEqualTo(true); + } + + // -- toProgram(String) -- + + @Test + void toProgram_fromString_returnsRunnableProgram() throws CelEvaluationException { + CelRuntime.Program program = CelExpressionFacade.toProgram("type == 'a'"); + Map activation = new HashMap<>(); + activation.put("id", "42"); + activation.put("type", "a"); + activation.put("attributes", new HashMap()); + Object result = program.eval(activation); + assertThat(result).isEqualTo(true); + } + + @Test + void toProgram_fromString_throwsOnInvalidSource() { + assertThatThrownBy(() -> CelExpressionFacade.toProgram("type ==")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid CEL condition expression"); + } + + @Test + void toProgram_fromString_throwsOnNullOrEmpty() { + assertThatThrownBy(() -> CelExpressionFacade.toProgram((String) null)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> CelExpressionFacade.toProgram("")) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void toProgram_fromString_isCachedBySource() { + CelRuntime.Program first = CelExpressionFacade.toProgram("type == 'x'"); + CelRuntime.Program second = CelExpressionFacade.toProgram("type == 'x'"); + assertThat(second).isSameAs(first); // cache hit + } + + // -- String functions: contains / startsWith / endsWith / matches -- + + static Stream stringFunctionCases() { + return Stream.of( + Arguments.of("name.contains(\"foo\")", "name", "hello_foo_bar", true), + Arguments.of("name.contains(\"xyz\")", "name", "hello_foo_bar", false), + Arguments.of("name.contains(\"\")", "name", "anything", true), + Arguments.of("name.startsWith(\"pre_\")", "name", "pre_order_123", true), + Arguments.of("name.startsWith(\"pre_\")", "name", "order_pre_123", false), + Arguments.of("name.startsWith(\"\")", "name", "anything", true), + Arguments.of("name.endsWith(\".json\")", "name", "config.json", true), + Arguments.of("name.endsWith(\".json\")", "name", "config.yaml", false), + Arguments.of("name.matches(\"^order_[0-9]+$\")", "name", "order_42", true), + Arguments.of("name.matches(\"^order_[0-9]+$\")", "name", "order_abc", false), + Arguments.of("name.matches(\"[\")", "name", "test", CelEvaluationException.class)); + } + + @ParameterizedTest(name = "{0} on {1}={2} → {3}") + @MethodSource("stringFunctionCases") + void stringFunctions_evaluateOrThrow( + String source, String attrName, String attrValue, Object expected) + throws CelEvaluationException { + CelRuntime.Program program = CelExpressionFacade.toProgram(source); + Map attrs = new HashMap<>(); + attrs.put(attrName, attrValue); + Map activation = new HashMap<>(); + activation.put("id", "1"); + activation.put("type", "t"); + activation.put("attributes", attrs); + // Flattened contract: bare identifiers read from the activation top level. + activation.put(attrName, attrValue); + + if (expected instanceof Class) { + @SuppressWarnings("unchecked") + Class exceptionClass = (Class) expected; + assertThatThrownBy(() -> program.eval(activation)).isInstanceOf(exceptionClass); + } else { + assertThat(program.eval(activation)).isEqualTo(expected); + } + } + + // -- Fixture-driven tests from disallowed_macros.yaml -- + + private static final String FIXTURE_RESOURCE = "/disallowed_macros.yaml"; + + @SuppressWarnings("unchecked") + private static Map> loadMacroFixture() throws IOException { + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); + InputStream is = CelExpressionFacadeTest.class.getResourceAsStream(FIXTURE_RESOURCE); + assertThat(is) + .as("Shared fixture resource must exist on classpath: %s", FIXTURE_RESOURCE) + .isNotNull(); + return yamlMapper.readValue(is, Map.class); + } + + static Stream rejectFixtureCases() throws IOException { + return loadMacroFixture().get("reject").stream().map(Arguments::of); + } + + static Stream acceptFixtureCases() throws IOException { + return loadMacroFixture().get("accept").stream().map(Arguments::of); + } + + @ParameterizedTest(name = "REJECT: {0}") + @MethodSource("rejectFixtureCases") + void fixture_rejectDisallowedMacro(String expression) { + assertThatThrownBy(() -> CelExpressionFacade.toProgram(expression)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("disallowed macro"); + } + + @ParameterizedTest(name = "ACCEPT: {0}") + @MethodSource("acceptFixtureCases") + void fixture_acceptAllowedExpression(String expression) { + CelRuntime.Program program = CelExpressionFacade.toProgram(expression); + assertThat(program).isNotNull(); + } + + // -- Resource guards -- + + @Nested + class ResourceGuards { + @Test + void parse_oversizedSource_rejected() { + String huge = "true" + " || true".repeat(2000); // ~16K chars + assertThatThrownBy(() -> CelExpressionFacade.parse(huge)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid CEL expression"); + } + + @Test + void parse_overlyNestedExpression_rejected() { + // Parenthesis nesting reliably hits maxParseRecursionDepth (member-access chains + // are left-recursive in ANTLR4 and parsed iteratively, so they don't). + String deep = "(".repeat(40) + "true" + ")".repeat(40); + assertThatThrownBy(() -> CelExpressionFacade.parse(deep)) + .isInstanceOf(IllegalArgumentException.class); + } + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelResourceLimitsTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelResourceLimitsTest.java new file mode 100644 index 000000000..84b2960e2 --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/condition/CelResourceLimitsTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.runtime.condition; + +import org.apache.flink.agents.api.Event; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for CEL runtime resource limits: PROGRAM_CACHE LRU + normalize depth cap. */ +class CelResourceLimitsTest { + + @BeforeEach + void clearCache() { + CelExpressionFacade.clearProgramCacheForTests(); + } + + @Test + void programCache_evictsAtMaxSize() { + // Insert MAX_SIZE + 1 unique sources; cache must cap exactly at MAX_SIZE + // (LinkedHashMap.removeEldestEntry is strict, unlike Caffeine's approximate TinyLFU). + for (int i = 0; i <= CelExpressionFacade.PROGRAM_CACHE_MAX_SIZE; i++) { + CelExpressionFacade.toProgram("attributes.field_" + i + " == " + i); + } + long size = CelExpressionFacade.programCacheSizeForTests(); + assertThat(size) + .as( + "program cache must cap at exactly %d, got %d", + CelExpressionFacade.PROGRAM_CACHE_MAX_SIZE, size) + .isEqualTo(CelExpressionFacade.PROGRAM_CACHE_MAX_SIZE); + } + + @Test + @SuppressWarnings("unchecked") + void normalizeValue_preservesDeepJsonStringAsPlain() { + // Build a nested map of depth (MAX + 2) with a JSON-shaped string at the bottom. + String innerJson = "{\"deep_key\":\"deep_value\"}"; + Object inner = innerJson; + for (int i = 0; i <= CelConditionEvaluator.MAX_NORMALIZE_DEPTH; i++) { + Map wrap = new HashMap<>(); + wrap.put("wrap", inner); + inner = wrap; + } + Event event = new Event("_input_event", (Map) inner); + + CelConditionEvaluator evaluator = new CelConditionEvaluator(); + Map activation = evaluator.createActivation(event); + Map attrs = (Map) activation.get("attributes"); + + Object cur = attrs; + for (int i = 0; i <= CelConditionEvaluator.MAX_NORMALIZE_DEPTH; i++) { + assertThat(cur).as("layer %d must still be a Map", i).isInstanceOf(Map.class); + Map m = (Map) cur; + cur = m.get("wrap"); + } + assertThat(cur) + .as("deep JSON string must NOT be parsed once past depth limit") + .isInstanceOf(String.class) + .isEqualTo(innerJson); + } + + @Test + @SuppressWarnings("unchecked") + void normalizeValue_stillExpandsShallowJsonString() { + Map attrs = new HashMap<>(); + attrs.put("input", "{\"foo\":\"bar\",\"n\":42}"); + Event event = new Event("_input_event", attrs); + + CelConditionEvaluator evaluator = new CelConditionEvaluator(); + Map activation = evaluator.createActivation(event); + Map activationAttrs = (Map) activation.get("attributes"); + Object input = activationAttrs.get("input"); + assertThat(input) + .as("shallow JSON-shaped string must still be parsed into a Map") + .isInstanceOf(Map.class); + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java index 97f994b42..da6bafd14 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java @@ -72,7 +72,7 @@ void getActionsTriggeredByReturnsActionsForJavaEventClass() throws Exception { EventRouter router = new EventRouter<>(plan, /* inputIsJava */ true); - List triggered = router.getActionsTriggeredBy(new InputEvent(0L), plan); + List triggered = router.getActionsTriggeredBy(new InputEvent(0L)); assertThat(triggered).containsExactly(action); }