diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index 3d7c27328c6..d196a66e802 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -50,6 +50,25 @@ from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +def _in_or_has(value: Expression, array: Expression, *, as_has: bool) -> FunctionCall: + """Build the membership test ``value IN array``. + + Returns ``in(value, array)`` by default, so ClickHouse keeps a prepared set for + partition/primary-key pruning — correct for WHERE clauses. When ``as_has`` is set, + returns ``has(array, value)`` instead, for expressions that land in a SELECT clause + / projection / aggregate condition / ``HAVING``. There the prepared set's + server-generated ``__set___`` identifier leaks into the + result-block column name and is not byte-stable across mixed-version distributed + ClickHouse nodes, which fails the read with + ``Code: 10 ... Not found column ... While executing Remote.`` (SNUBA-9W6, SNUBA-B82). + ``has`` over a constant array keeps the array inline in the column name and is + equivalent to ``value IN (array)`` for scalar membership. + """ + if as_has: + return f.has(array, value) + return in_cond(value, array) + + def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: """Convert an AttributeKey proto to a Snuba Expression. @@ -412,16 +431,18 @@ def _analyzer_safe_in_expression( negated: bool, ignore_case: bool = False, guard: bool = True, + membership_as_has: bool = False, ) -> Expression: """``IN`` / ``NOT IN`` as ``[not] in(value, set)``, wrapped in ``and(exists, ...)`` only when ``guard`` is set (see ``_map_backed_operands`` and ``_comparison_can_match_column_default``). The value lists carry only scalars, so the set never contains NULL — the legacy ``has(set, NULL)`` branch - was always ``false``.""" + was always ``false``. ``membership_as_has`` emits ``has(set, value)`` instead of + ``in(value, set)`` for SELECT-clause use (see ``_in_or_has``).""" value, exists = _map_backed_operands(k) if ignore_case: value = f.lower(value) - membership = in_cond(value, v_expression) + membership = _in_or_has(value, v_expression, as_has=membership_as_has) present = and_cond(exists, membership) if guard else membership return not_cond(present) if negated else present @@ -595,6 +616,8 @@ def _type_array_includes_scalar_expression( def _any_attribute_filter_to_expression( filt: AnyAttributeFilter, + *, + membership_as_has: bool = False, ) -> Expression: """Build an expression that searches across attribute values. @@ -605,7 +628,10 @@ def _any_attribute_filter_to_expression( arrayExists(x -> (x, value), mapValues(column)) - wrapped with NOT(...) for negative ops. + wrapped with NOT(...) for negative ops. ``membership_as_has`` builds the IN/NOT_IN + comparison as ``has(array, x)`` rather than ``in(x, array)`` so the (arrayExists'd) + expression carries no ``__set_*`` prepared-set identifier — required when it lands + in a SELECT-clause aggregate on a mixed-version distributed read (see ``_in_or_has``). """ # 1. Extract and validate the comparison value v = filt.value @@ -683,12 +709,13 @@ def _any_attribute_filter_to_expression( lowered = [literal(s.lower()) for s in v.val_str_array.values] else: lowered = [literal(elem.val_str.lower()) for elem in v.val_array.values] - comparison = in_cond( + comparison = _in_or_has( f.lower(x), literals_array(None, lowered), + as_has=membership_as_has, ) else: - comparison = in_cond(x, v_expression) + comparison = _in_or_has(x, v_expression, as_has=membership_as_has) else: raise BadSnubaRPCRequestException( f"Unsupported any_attribute_filter op: {AnyAttributeFilter.Op.Name(filt.op)}" @@ -707,11 +734,18 @@ def _any_attribute_filter_to_expression( def trace_item_filters_to_expression( item_filter: TraceItemFilter, attribute_key_to_expression: Callable[[AttributeKey], Expression], + membership_as_has: bool = False, ) -> Expression: """ Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") This maps those filters into an expression which can be used in a WHERE clause :param item_filter: + :param membership_as_has: build ``IN``/``NOT IN`` membership as ``has(array, x)`` + rather than ``x IN (array)``. Pass ``True`` only when the result lands in a + SELECT clause / projection / aggregate condition / ``HAVING`` — there a constant + ``IN`` set leaks an unstable ``__set_*`` identifier into the result-block column + name and breaks mixed-version distributed reads (see ``_in_or_has``). Leave the + default for WHERE clauses, where the prepared ``IN`` set drives pruning. :return: """ if item_filter.HasField("and_filter"): @@ -719,9 +753,14 @@ def trace_item_filters_to_expression( if len(filters) == 0: return literal(True) elif len(filters) == 1: - return trace_item_filters_to_expression(filters[0], attribute_key_to_expression) + return trace_item_filters_to_expression( + filters[0], attribute_key_to_expression, membership_as_has + ) return and_cond( - *(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters) + *( + trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has) + for x in filters + ) ) if item_filter.HasField("or_filter"): @@ -729,9 +768,14 @@ def trace_item_filters_to_expression( if len(filters) == 0: raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'or' clause") elif len(filters) == 1: - return trace_item_filters_to_expression(filters[0], attribute_key_to_expression) + return trace_item_filters_to_expression( + filters[0], attribute_key_to_expression, membership_as_has + ) return or_cond( - *(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters) + *( + trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has) + for x in filters + ) ) if item_filter.HasField("not_filter"): @@ -740,11 +784,18 @@ def trace_item_filters_to_expression( raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'not' clause") elif len(filters) == 1: return not_cond( - trace_item_filters_to_expression(filters[0], attribute_key_to_expression) + trace_item_filters_to_expression( + filters[0], attribute_key_to_expression, membership_as_has + ) ) return not_cond( and_cond( - *(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters) + *( + trace_item_filters_to_expression( + x, attribute_key_to_expression, membership_as_has + ) + for x in filters + ) ) ) @@ -965,10 +1016,11 @@ def trace_item_filters_to_expression( negated=False, ignore_case=ignore_case, guard=_comparison_can_match_column_default(k.type, v, value_type), + membership_as_has=membership_as_has, ) if ignore_case: k_expression = f.lower(k_expression) - expr = in_cond(k_expression, v_expression) + expr = _in_or_has(k_expression, v_expression, as_has=membership_as_has) expr_with_null = or_cond( expr, and_cond(f.isNull(k_expression), f.has(v_expression, literal(None))), @@ -999,10 +1051,11 @@ def trace_item_filters_to_expression( negated=True, ignore_case=ignore_case, guard=_comparison_can_match_column_default(k.type, v, value_type), + membership_as_has=membership_as_has, ) if ignore_case: k_expression = f.lower(k_expression) - expr = not_cond(in_cond(k_expression, v_expression)) + expr = not_cond(_in_or_has(k_expression, v_expression, as_has=membership_as_has)) expr_with_null = or_cond( expr, and_cond( @@ -1027,7 +1080,9 @@ def trace_item_filters_to_expression( if item_filter.HasField("any_attribute_filter"): if not state.get_int_config("enable_any_attribute_filter", 1): return literal(True) - return _any_attribute_filter_to_expression(item_filter.any_attribute_filter) + return _any_attribute_filter_to_expression( + item_filter.any_attribute_filter, membership_as_has=membership_as_has + ) return literal(True) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index a7a7a9a18f1..7096690d012 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -560,6 +560,11 @@ def _get_trace_item_filter_expressions( ) -> dict[TraceItemType.ValueType, Expression]: """ Returns a dict mapping item types to a filter expression for that item type. + + These expressions are only ever embedded in the ``filtered_item_count`` countIf + in the SELECT clause, so membership is built as ``has(array, x)`` + (``membership_as_has``) to keep the result-block column name stable across + mixed-version distributed ClickHouse nodes (see common._in_or_has). """ filters_by_item_type: dict[TraceItemType.ValueType, list[TraceItemFilter]] = defaultdict( list @@ -578,6 +583,7 @@ def _get_trace_item_filter_expressions( ), ), attribute_key_to_expression, + membership_as_has=True, ), ) diff --git a/snuba/web/rpc/v1/resolvers/common/aggregation.py b/snuba/web/rpc/v1/resolvers/common/aggregation.py index 5cc7a0135cb..674e0cc69ad 100644 --- a/snuba/web/rpc/v1/resolvers/common/aggregation.py +++ b/snuba/web/rpc/v1/resolvers/common/aggregation.py @@ -66,8 +66,12 @@ def _get_condition_in_aggregation( ) -> Expression: condition_in_aggregation: Expression = literal(True) if isinstance(aggregation, AttributeConditionalAggregation): + # This condition is embedded in SELECT-clause conditional aggregates (countIf, + # sumIf, ...), so build any constant IN-set as has(array, x) to keep the + # result-block column name stable across mixed-version ClickHouse nodes on + # distributed reads (membership_as_has, see common._in_or_has). condition_in_aggregation = trace_item_filters_to_expression( - aggregation.filter, attribute_key_to_expression + aggregation.filter, attribute_key_to_expression, membership_as_has=True ) return condition_in_aggregation diff --git a/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py b/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py index fc6fb1389d7..b51ddf6adef 100644 --- a/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py +++ b/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py @@ -65,7 +65,13 @@ def get_trace_ids_sql_for_cross_item_query( Returns: tuple: (sql_string, query_result) where query_result contains metadata like sampling_tier """ + # filter_expressions feed the WHERE or_cond (keep the prepared IN-sets for + # partition/primary-key pruning); having_filter_expressions feed the HAVING countIf + # (a SELECT-clause aggregate), where the membership must be has(array, x) so its + # result-block column name is stable across mixed-version ClickHouse nodes + # (membership_as_has, see common._in_or_has). filter_expressions = [] + having_filter_expressions = [] if trace_filters: converted_trace_filters = [trace_filter for trace_filter in trace_filters] if isinstance(trace_filters[0], GetTracesRequest.TraceFilter): @@ -77,19 +83,30 @@ def get_trace_ids_sql_for_cross_item_query( ] for trace_filter in converted_trace_filters: + item_type_cond = f.equals(column("item_type"), trace_filter.item_type) filter_expressions.append( and_cond( - f.equals(column("item_type"), trace_filter.item_type), + item_type_cond, trace_item_filters_to_expression( trace_filter.filter, attribute_key_to_expression, ), ) ) + having_filter_expressions.append( + and_cond( + item_type_cond, + trace_item_filters_to_expression( + trace_filter.filter, + attribute_key_to_expression, + membership_as_has=True, + ), + ) + ) if len(filter_expressions) > 1: trace_item_filters_and_expression = and_cond( - *[f.greater(f.countIf(expression), 0) for expression in filter_expressions] + *[f.greater(f.countIf(expression), 0) for expression in having_filter_expressions] ) trace_item_filters_or_expression = or_cond(*filter_expressions) elif len(filter_expressions) == 1: diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py index 8c590b7c05f..cfb938ec7d8 100644 --- a/tests/web/rpc/test_aggregation.py +++ b/tests/web/rpc/test_aggregation.py @@ -6,10 +6,13 @@ ) from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( AttributeKey, + AttributeValue, ExtrapolationMode, Function, + IntArray, Reliability, ) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter from snuba.query.expressions import Column, FunctionCall, JsonPath, Literal, SubscriptableReference from snuba.web.rpc.common.common import ( @@ -328,3 +331,57 @@ def test_aggregation_to_expression_sum_type_array_raises() -> None: ) with pytest.raises(BadSnubaRPCRequestException, match="not supported for array attribute"): aggregation_to_expression(agg, attribute_key_to_expression) + + +def test_conditional_aggregation_uses_has_for_in_sets() -> None: + """Regression guard for SNUBA-9W6 / SNUBA-A1W (mixed-version distributed reads). + + A conditional aggregation's filter is embedded in a SELECT-clause ``countIf``/ + ``sumIf``. A constant ``IN`` set there bakes a server-generated + ``__set___`` identifier into the result-block column name; on a + mixed-version cluster the two sides hash it differently and the distributed read + fails with ``Code: 10 ... Not found column ... While executing Remote.``. The + membership must therefore be built as ``has(array(...), x)`` instead. + """ + project_ids = [11, 22, 33] + agg = AttributeConditionalAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="my.field"), + label="sum(my.field)", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="sentry.project_id", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_IN, + value=AttributeValue(val_int_array=IntArray(values=project_ids)), + ) + ), + ) + + expr = aggregation_to_expression(agg, attribute_key_to_expression) + + # No in() over a constant array may survive (it would reintroduce the __set_* id). + in_over_arrays = [ + e + for e in expr + if isinstance(e, FunctionCall) + and e.function_name == "in" + and len(e.parameters) == 2 + and isinstance(e.parameters[1], FunctionCall) + and e.parameters[1].function_name == "array" + ] + assert not in_over_arrays, ( + "conditional aggregation must not embed an in() over a constant array" + ) + + # ...and the membership is emitted as has(array(), x). + has_over_pids = [ + e + for e in expr + if isinstance(e, FunctionCall) + and e.function_name == "has" + and isinstance(e.parameters[0], FunctionCall) + and e.parameters[0].function_name == "array" + and [p.value for p in e.parameters[0].parameters if isinstance(p, Literal)] == project_ids + ] + assert has_over_pids, "expected has(array(), x) in the conditional aggregate" diff --git a/tests/web/rpc/test_common.py b/tests/web/rpc/test_common.py index a2f8a599457..d6ce06c540b 100644 --- a/tests/web/rpc/test_common.py +++ b/tests/web/rpc/test_common.py @@ -733,6 +733,60 @@ def test_unsupported_value_type_raises(self) -> None: with pytest.raises(BadSnubaRPCRequestException, match="does not have a value"): _any_attribute_filter_to_expression(filt) + @staticmethod + def _in_over_arrays(expr: Expression) -> list[FunctionCall]: + return [ + e + for e in expr + if isinstance(e, FunctionCall) + and e.function_name == "in" + and len(e.parameters) == 2 + and isinstance(e.parameters[1], FunctionCall) + and e.parameters[1].function_name == "array" + ] + + @staticmethod + def _has_over(expr: Expression, values: list[str]) -> list[FunctionCall]: + return [ + e + for e in expr + if isinstance(e, FunctionCall) + and e.function_name == "has" + and isinstance(e.parameters[0], FunctionCall) + and e.parameters[0].function_name == "array" + and [p.value for p in e.parameters[0].parameters if isinstance(p, Literal)] == values + ] + + @pytest.mark.parametrize("op", [AnyAttributeFilter.OP_IN, AnyAttributeFilter.OP_NOT_IN]) + def test_membership_as_has_for_select_clause(self, op: AnyAttributeFilter.Op.ValueType) -> None: + """Regression guard for SNUBA-9W6 / SNUBA-A1W (mixed-version distributed reads). + + An any-attribute ``IN``/``NOT_IN`` builds ``in(x, array(...))`` inside an + ``arrayExists`` lambda. When that filter lands in a SELECT-clause aggregate, the + constant ``IN`` set's ``__set_`` identifier leaks into the result-block + column name and breaks reads across mixed-version ``Remote`` nodes. With + ``membership_as_has=True`` the comparison must be ``has(array(...), x)`` instead; + the default (WHERE) keeps ``in()`` so the prepared set still drives pruning. + """ + values = ["error", "internal_error"] + filt = AnyAttributeFilter( + op=op, + value=AttributeValue( + val_array=Array(values=[AttributeValue(val_str=v) for v in values]) + ), + ) + + # Default (WHERE) keeps the in() set inside arrayExists. + where_expr = _any_attribute_filter_to_expression(filt) + assert self._in_over_arrays(where_expr), "WHERE form must keep in() over the constant array" + + # SELECT-clause form uses has(array, x) and builds no in() set. + select_expr = _any_attribute_filter_to_expression(filt, membership_as_has=True) + assert not self._in_over_arrays(select_expr), ( + "membership_as_has must replace in() over a constant array with has()" + ) + assert self._has_over(select_expr, values), "expected has(array(values), x) in the lambda" + @pytest.mark.eap @pytest.mark.redis_db diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index 3d1b9d14272..30b94387792 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -21,7 +21,11 @@ ResponseMeta, TraceItemType, ) -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeKey, + AttributeValue, + IntArray, +) from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( ComparisonFilter, TraceItemFilter, @@ -31,6 +35,11 @@ from snuba import state from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey +from snuba.query.expressions import FunctionCall, Literal +from snuba.web.rpc.common.common import ( + attribute_key_to_expression, + trace_item_filters_to_expression, +) from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.endpoint_get_traces import EndpointGetTraces from tests.base import BaseApiTest @@ -911,6 +920,69 @@ def trace_filter( ) +def _in_calls_over_arrays(expression: Any) -> list[FunctionCall]: + """All ``in(x, array(...))`` calls in ``expression`` — the form that builds a + constant prepared set (``__set_*``).""" + return [ + exp + for exp in expression + if isinstance(exp, FunctionCall) + and exp.function_name == "in" + and len(exp.parameters) == 2 + and isinstance(exp.parameters[1], FunctionCall) + and exp.parameters[1].function_name == "array" + ] + + +def test_filter_membership_as_has_for_select_clause() -> None: + """Regression guard for SNUBA-9W6 (mixed-version distributed reads). + + The ``filtered_item_count`` filter (and every conditional aggregate) is embedded in + a SELECT-clause ``countIf``. A constant ``IN`` set there bakes a server-generated + ``__set___`` identifier into the result-block column name; on a + mixed-version cluster the two sides hash it differently and the distributed read + fails with ``Code: 10 ... Not found column ... While executing Remote.``. + + ``trace_item_filters_to_expression(..., membership_as_has=True)`` therefore builds + the membership as ``has(array(...), x)`` — byte-stable across versions — while the + default keeps ``x IN (array)`` so WHERE clauses retain their prepared set for + partition/primary-key pruning. + """ + project_ids = [11, 22, 33, 44] + proto_filter = TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="sentry.project_id", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_IN, + value=AttributeValue(val_int_array=IntArray(values=project_ids)), + ), + ) + + # Default (WHERE) form keeps the prepared IN-set over the constant array. + where_expr = trace_item_filters_to_expression(proto_filter, attribute_key_to_expression) + assert _in_calls_over_arrays(where_expr), ( + "WHERE-form filters must keep in() over the constant array (needed for pruning)" + ) + + # SELECT-clause form (membership_as_has=True) builds has(array, x) and no IN-set. + select_expr = trace_item_filters_to_expression( + proto_filter, attribute_key_to_expression, membership_as_has=True + ) + assert not _in_calls_over_arrays(select_expr), ( + "membership_as_has must build has() instead of in() over a constant array " + "to avoid the unstable __set_* identifier" + ) + has_over_project_ids = [ + exp + for exp in select_expr + if isinstance(exp, FunctionCall) + and exp.function_name == "has" + and isinstance(exp.parameters[0], FunctionCall) + and exp.parameters[0].function_name == "array" + and [p.value for p in exp.parameters[0].parameters if isinstance(p, Literal)] == project_ids + ] + assert has_over_project_ids, "expected has(array(), x) with membership_as_has" + + @pytest.mark.clickhouse_db @pytest.mark.redis_db class TestEndpointGetTracesCrossItem(TestEndpointGetTraces):