Skip to content
Merged
83 changes: 69 additions & 14 deletions snuba/web/rpc/common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,25 @@
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException


def _in_or_has(value: Expression, array: Expression, *, as_has: bool) -> FunctionCall:
    """Return a predicate testing whether ``value`` is a member of ``array``.

    The same membership test can be spelled two ways, chosen by ``as_has``:

    * ``as_has=False`` produces ``in(value, array)``. ClickHouse keeps a prepared
      set for this form and uses it for partition / primary-key pruning, so it is
      the right choice for WHERE clauses.
    * ``as_has=True`` produces ``has(array, value)``. Use it for expressions that
      end up in a SELECT clause, projection, aggregate condition or ``HAVING``.

    The second spelling exists because, outside of WHERE, the prepared set's
    server-generated ``__set_<Type>_<hash>_<hash>`` identifier leaks into the
    result-block column name. That identifier is not byte-stable across
    mixed-version distributed ClickHouse nodes, and the read then fails with
    ``Code: 10 ... Not found column ... While executing Remote.``
    (SNUBA-9W6, SNUBA-B82). ``has`` over a constant array keeps the array inline
    in the column name and is equivalent to ``value IN (array)`` for scalar
    membership.

    :param value: the expression whose membership is tested.
    :param array: the expression holding the candidate values.
    :param as_has: keyword-only; selects the ``has`` spelling when true.
    :return: the membership function call.
    """
    # Note the swapped argument order: has() takes the array first, in() the value.
    return f.has(array, value) if as_has else in_cond(value, array)


def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
"""Convert an AttributeKey proto to a Snuba Expression.

Expand Down Expand Up @@ -412,16 +431,18 @@ def _analyzer_safe_in_expression(
negated: bool,
ignore_case: bool = False,
guard: bool = True,
membership_as_has: bool = False,
) -> Expression:
"""``IN`` / ``NOT IN`` as ``[not] in(value, set)``, wrapped in
``and(exists, ...)`` only when ``guard`` is set (see ``_map_backed_operands``
and ``_comparison_can_match_column_default``). The value lists carry only
scalars, so the set never contains NULL — the legacy ``has(set, NULL)`` branch
was always ``false``."""
was always ``false``. ``membership_as_has`` emits ``has(set, value)`` instead of
``in(value, set)`` for SELECT-clause use (see ``_in_or_has``)."""
value, exists = _map_backed_operands(k)
if ignore_case:
value = f.lower(value)
membership = in_cond(value, v_expression)
membership = _in_or_has(value, v_expression, as_has=membership_as_has)
present = and_cond(exists, membership) if guard else membership
return not_cond(present) if negated else present

Expand Down Expand Up @@ -595,6 +616,8 @@ def _type_array_includes_scalar_expression(

def _any_attribute_filter_to_expression(
filt: AnyAttributeFilter,
*,
membership_as_has: bool = False,
) -> Expression:
"""Build an expression that searches across attribute values.

Expand All @@ -605,7 +628,10 @@ def _any_attribute_filter_to_expression(

arrayExists(x -> <comparison>(x, value), mapValues(column))

wrapped with NOT(...) for negative ops.
wrapped with NOT(...) for negative ops. ``membership_as_has`` builds the IN/NOT_IN
comparison as ``has(array, x)`` rather than ``in(x, array)`` so the (arrayExists'd)
expression carries no ``__set_*`` prepared-set identifier — required when it lands
in a SELECT-clause aggregate on a mixed-version distributed read (see ``_in_or_has``).
"""
# 1. Extract and validate the comparison value
v = filt.value
Expand Down Expand Up @@ -683,12 +709,13 @@ def _any_attribute_filter_to_expression(
lowered = [literal(s.lower()) for s in v.val_str_array.values]
else:
lowered = [literal(elem.val_str.lower()) for elem in v.val_array.values]
comparison = in_cond(
comparison = _in_or_has(
f.lower(x),
literals_array(None, lowered),
as_has=membership_as_has,
)
else:
comparison = in_cond(x, v_expression)
comparison = _in_or_has(x, v_expression, as_has=membership_as_has)
else:
raise BadSnubaRPCRequestException(
f"Unsupported any_attribute_filter op: {AnyAttributeFilter.Op.Name(filt.op)}"
Expand All @@ -707,31 +734,48 @@ def _any_attribute_filter_to_expression(
def trace_item_filters_to_expression(
item_filter: TraceItemFilter,
attribute_key_to_expression: Callable[[AttributeKey], Expression],
membership_as_has: bool = False,
) -> Expression:
"""
Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024")
This maps those filters into an expression which can be used in a WHERE clause
:param item_filter:
:param membership_as_has: build ``IN``/``NOT IN`` membership as ``has(array, x)``
rather than ``x IN (array)``. Pass ``True`` only when the result lands in a
SELECT clause / projection / aggregate condition / ``HAVING`` — there a constant
``IN`` set leaks an unstable ``__set_*`` identifier into the result-block column
name and breaks mixed-version distributed reads (see ``_in_or_has``). Leave the
default for WHERE clauses, where the prepared ``IN`` set drives pruning.
:return:
"""
if item_filter.HasField("and_filter"):
filters = item_filter.and_filter.filters
if len(filters) == 0:
return literal(True)
elif len(filters) == 1:
return trace_item_filters_to_expression(filters[0], attribute_key_to_expression)
return trace_item_filters_to_expression(
filters[0], attribute_key_to_expression, membership_as_has
)
return and_cond(
*(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters)
*(
trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has)
for x in filters
)
)

if item_filter.HasField("or_filter"):
filters = item_filter.or_filter.filters
if len(filters) == 0:
raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'or' clause")
elif len(filters) == 1:
return trace_item_filters_to_expression(filters[0], attribute_key_to_expression)
return trace_item_filters_to_expression(
filters[0], attribute_key_to_expression, membership_as_has
)
return or_cond(
*(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters)
*(
trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has)
for x in filters
)
)

if item_filter.HasField("not_filter"):
Expand All @@ -740,11 +784,18 @@ def trace_item_filters_to_expression(
raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'not' clause")
elif len(filters) == 1:
return not_cond(
trace_item_filters_to_expression(filters[0], attribute_key_to_expression)
trace_item_filters_to_expression(
filters[0], attribute_key_to_expression, membership_as_has
)
)
return not_cond(
and_cond(
*(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters)
*(
trace_item_filters_to_expression(
x, attribute_key_to_expression, membership_as_has
)
for x in filters
)
)
)

Expand Down Expand Up @@ -965,10 +1016,11 @@ def trace_item_filters_to_expression(
negated=False,
ignore_case=ignore_case,
guard=_comparison_can_match_column_default(k.type, v, value_type),
membership_as_has=membership_as_has,
)
if ignore_case:
k_expression = f.lower(k_expression)
expr = in_cond(k_expression, v_expression)
expr = _in_or_has(k_expression, v_expression, as_has=membership_as_has)
expr_with_null = or_cond(
expr,
and_cond(f.isNull(k_expression), f.has(v_expression, literal(None))),
Expand Down Expand Up @@ -999,10 +1051,11 @@ def trace_item_filters_to_expression(
negated=True,
ignore_case=ignore_case,
guard=_comparison_can_match_column_default(k.type, v, value_type),
membership_as_has=membership_as_has,
)
if ignore_case:
k_expression = f.lower(k_expression)
expr = not_cond(in_cond(k_expression, v_expression))
expr = not_cond(_in_or_has(k_expression, v_expression, as_has=membership_as_has))
Comment thread
cursor[bot] marked this conversation as resolved.
expr_with_null = or_cond(
expr,
and_cond(
Expand All @@ -1027,7 +1080,9 @@ def trace_item_filters_to_expression(
if item_filter.HasField("any_attribute_filter"):
if not state.get_int_config("enable_any_attribute_filter", 1):
return literal(True)
return _any_attribute_filter_to_expression(item_filter.any_attribute_filter)
return _any_attribute_filter_to_expression(
item_filter.any_attribute_filter, membership_as_has=membership_as_has
)

return literal(True)

Expand Down
6 changes: 6 additions & 0 deletions snuba/web/rpc/v1/endpoint_get_traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,11 @@ def _get_trace_item_filter_expressions(
) -> dict[TraceItemType.ValueType, Expression]:
"""
Returns a dict mapping item types to a filter expression for that item type.

These expressions are only ever embedded in the ``filtered_item_count`` countIf
in the SELECT clause, so membership is built as ``has(array, x)``
(``membership_as_has``) to keep the result-block column name stable across
mixed-version distributed ClickHouse nodes (see common._in_or_has).
"""
filters_by_item_type: dict[TraceItemType.ValueType, list[TraceItemFilter]] = defaultdict(
list
Expand All @@ -578,6 +583,7 @@ def _get_trace_item_filter_expressions(
),
),
attribute_key_to_expression,
membership_as_has=True,
),
)

Expand Down
6 changes: 5 additions & 1 deletion snuba/web/rpc/v1/resolvers/common/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ def _get_condition_in_aggregation(
) -> Expression:
condition_in_aggregation: Expression = literal(True)
if isinstance(aggregation, AttributeConditionalAggregation):
# This condition is embedded in SELECT-clause conditional aggregates (countIf,
# sumIf, ...), so build any constant IN-set as has(array, x) to keep the
# result-block column name stable across mixed-version ClickHouse nodes on
# distributed reads (membership_as_has, see common._in_or_has).
condition_in_aggregation = trace_item_filters_to_expression(
aggregation.filter, attribute_key_to_expression
aggregation.filter, attribute_key_to_expression, membership_as_has=True
)
return condition_in_aggregation

Expand Down
21 changes: 19 additions & 2 deletions snuba/web/rpc/v1/resolvers/common/cross_item_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ def get_trace_ids_sql_for_cross_item_query(
Returns:
tuple: (sql_string, query_result) where query_result contains metadata like sampling_tier
"""
# filter_expressions feed the WHERE or_cond (keep the prepared IN-sets for
# partition/primary-key pruning); having_filter_expressions feed the HAVING countIf
# (a SELECT-clause aggregate), where the membership must be has(array, x) so its
# result-block column name is stable across mixed-version ClickHouse nodes
# (membership_as_has, see common._in_or_has).
filter_expressions = []
having_filter_expressions = []
if trace_filters:
converted_trace_filters = [trace_filter for trace_filter in trace_filters]
if isinstance(trace_filters[0], GetTracesRequest.TraceFilter):
Expand All @@ -77,19 +83,30 @@ def get_trace_ids_sql_for_cross_item_query(
]

for trace_filter in converted_trace_filters:
item_type_cond = f.equals(column("item_type"), trace_filter.item_type)
filter_expressions.append(
and_cond(
f.equals(column("item_type"), trace_filter.item_type),
item_type_cond,
trace_item_filters_to_expression(
trace_filter.filter,
attribute_key_to_expression,
),
)
)
having_filter_expressions.append(
and_cond(
item_type_cond,
trace_item_filters_to_expression(
trace_filter.filter,
attribute_key_to_expression,
membership_as_has=True,
),
)
)

if len(filter_expressions) > 1:
trace_item_filters_and_expression = and_cond(
*[f.greater(f.countIf(expression), 0) for expression in filter_expressions]
*[f.greater(f.countIf(expression), 0) for expression in having_filter_expressions]
)
trace_item_filters_or_expression = or_cond(*filter_expressions)
elif len(filter_expressions) == 1:
Expand Down
57 changes: 57 additions & 0 deletions tests/web/rpc/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeKey,
AttributeValue,
ExtrapolationMode,
Function,
IntArray,
Reliability,
)
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter

from snuba.query.expressions import Column, FunctionCall, JsonPath, Literal, SubscriptableReference
from snuba.web.rpc.common.common import (
Expand Down Expand Up @@ -328,3 +331,57 @@ def test_aggregation_to_expression_sum_type_array_raises() -> None:
)
with pytest.raises(BadSnubaRPCRequestException, match="not supported for array attribute"):
aggregation_to_expression(agg, attribute_key_to_expression)


def test_conditional_aggregation_uses_has_for_in_sets() -> None:
    """Conditional aggregates must express constant IN-sets via ``has``.

    Regression guard for SNUBA-9W6 / SNUBA-A1W (mixed-version distributed reads).

    The filter of a conditional aggregation is embedded in a SELECT-clause
    ``countIf``/``sumIf``. If that filter contains a constant ``IN`` set, a
    server-generated ``__set_<Type>_<hash>_<hash>`` identifier is baked into the
    result-block column name. On a mixed-version cluster the two sides hash it
    differently, and the distributed read fails with
    ``Code: 10 ... Not found column ... While executing Remote.``. The membership
    therefore has to be emitted as ``has(array(...), x)``.
    """
    project_ids = [11, 22, 33]

    # An OP_IN comparison on the project id: exactly the kind of filter that used
    # to be rendered as in(x, array(...)).
    project_filter = TraceItemFilter(
        comparison_filter=ComparisonFilter(
            key=AttributeKey(name="sentry.project_id", type=AttributeKey.TYPE_INT),
            op=ComparisonFilter.OP_IN,
            value=AttributeValue(val_int_array=IntArray(values=project_ids)),
        )
    )
    agg = AttributeConditionalAggregation(
        aggregate=Function.FUNCTION_SUM,
        key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="my.field"),
        label="sum(my.field)",
        extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE,
        filter=project_filter,
    )

    expr = aggregation_to_expression(agg, attribute_key_to_expression)

    # Only function calls can be in()/has(); gather them from the expression once.
    function_calls = [node for node in expr if isinstance(node, FunctionCall)]

    # No in() over a constant array may survive (it would reintroduce the __set_* id).
    in_over_arrays: list[FunctionCall] = []
    for call in function_calls:
        if call.function_name != "in" or len(call.parameters) != 2:
            continue
        candidates = call.parameters[1]
        if isinstance(candidates, FunctionCall) and candidates.function_name == "array":
            in_over_arrays.append(call)
    assert not in_over_arrays, (
        "conditional aggregation must not embed an in() over a constant array"
    )

    # ...and the membership is emitted as has(array(<project_ids>), x).
    has_over_pids: list[FunctionCall] = []
    for call in function_calls:
        if call.function_name != "has":
            continue
        haystack = call.parameters[0]
        if not (isinstance(haystack, FunctionCall) and haystack.function_name == "array"):
            continue
        literal_values = [p.value for p in haystack.parameters if isinstance(p, Literal)]
        if literal_values == project_ids:
            has_over_pids.append(call)
    assert has_over_pids, "expected has(array(<project_ids>), x) in the conditional aggregate"
Loading
Loading