diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_items.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_items.yaml
index d47659d1555..bab229eb410 100644
--- a/snuba/datasets/configuration/events_analytics_platform/entities/eap_items.yaml
+++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_items.yaml
@@ -109,6 +109,7 @@ storage_selector:
   selector: EAPItemsStorageSelector
 
 query_processors:
+  - processor: IndexedNameOptimizer
   - processor: HashBucketFunctionTransformer
     args:
       hash_bucket_names:
diff --git a/snuba/query/processors/logical/indexed_name_optimizer.py b/snuba/query/processors/logical/indexed_name_optimizer.py
new file mode 100644
index 00000000000..bec0de541e7
--- /dev/null
+++ b/snuba/query/processors/logical/indexed_name_optimizer.py
@@ -0,0 +1,165 @@
+from typing import Optional
+
+from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
+
+from snuba import state
+from snuba.query.conditions import (
+    ConditionFunctions,
+    get_first_level_and_conditions,
+)
+from snuba.query.expressions import (
+    Column,
+    Expression,
+    FunctionCall,
+    Literal,
+    SubscriptableReference,
+)
+from snuba.query.logical import Query
+from snuba.query.processors.logical import LogicalQueryProcessor
+from snuba.query.query_settings import QuerySettings
+
+
+class IndexedNameOptimizer(LogicalQueryProcessor):
+    """
+    Point filters on an item type's primary name attribute at ``indexed_name``.
+
+    eap_items promotes one name attribute per item type into a dedicated
+    ``indexed_name`` String column that carries a bloom filter index:
+    ``sentry.op`` for spans, ``sentry.metric.name`` for metrics. The value is
+    also still written to the hashed ``attributes_string`` buckets.
+
+    Left alone, ``attributes_string['sentry.op']`` (resp.
+    ``attributes_string['sentry.metric.name']``) becomes a bucket map lookup
+    (``arrayElement(attributes_string_N, ...)``), which no index covers. This
+    processor swaps the access for the indexed column so the bloom filter can
+    be used::
+
+        equals(arrayElement(attributes_string, 'sentry.op'), 'db.query')
+          -> equals(indexed_name, 'db.query')
+
+    Two spellings of the access are recognised: the
+    ``arrayElement(attributes_string, key)`` form emitted by the EAP RPC
+    builder (see ``snuba.web.rpc.common.common._map_backed_operands``) and the
+    hand-written SnQL ``SubscriptableReference`` form. The processor runs ahead
+    of ``HashBucketFunctionTransformer``, so any access it leaves alone is
+    still mapped to its hashed bucket column downstream.
+
+    Safety: migration ``0057_add_name_column_and_index`` added ``indexed_name``
+    without a backfill. Rows written before the migration — or before the value
+    started being populated — have an empty ``indexed_name`` while
+    ``attributes_string`` still holds the value, so reading ``indexed_name``
+    would silently drop them. The rewrite is therefore gated behind the
+    ``CONFIG_KEY`` runtime flag (default off), which must only be enabled once
+    ``indexed_name`` is fully populated for the live retention window.
+
+    Scope: the promotion is item-type specific, so the rewrite only happens
+    when an ``item_type`` equality condition unambiguously scopes the query to
+    the item type that owns the attribute. In every other case the access is
+    left as is and falls back to the bucket lookup.
+    """
+
+    # Name of the runtime config flag that enables the rewrite. It stays at 0
+    # until ``indexed_name`` is backfilled for the retention window; setting it
+    # lets the bloom filter serve ``sentry.op`` / ``sentry.metric.name`` filters.
+    CONFIG_KEY = "eap_items_use_indexed_name"
+
+    # Maps an item_type proto enum value to the attribute stored in
+    # ``indexed_name`` for that item type.
+    INDEXED_NAME_KEY_BY_ITEM_TYPE: dict[int, str] = {
+        TraceItemType.TRACE_ITEM_TYPE_SPAN: "sentry.op",
+        TraceItemType.TRACE_ITEM_TYPE_METRIC: "sentry.metric.name",
+    }
+
+    INDEXED_NAME_COLUMN = "indexed_name"
+    ATTRIBUTES_STRING_COLUMN = "attributes_string"
+
+    def _indexed_name_key(self, query: Query) -> Optional[str]:
+        """Resolve which attribute ``indexed_name`` holds for ``query``.
+
+        Collects the integer literals of the ``item_type = <literal>``
+        comparisons found among the top-level AND-ed conditions. Returns the
+        promoted attribute name when exactly one distinct item type was found
+        and it is a supported one, otherwise ``None``.
+        """
+        root = query.get_condition()
+        if root is None:
+            return None
+
+        scoped_types: set[int] = set()
+        for term in get_first_level_and_conditions(root):
+            if (
+                isinstance(term, FunctionCall)
+                and term.function_name == ConditionFunctions.EQ
+                and len(term.parameters) == 2
+            ):
+                column_side, value_side = term.parameters
+                if (
+                    isinstance(column_side, Column)
+                    and column_side.column_name == "item_type"
+                    and isinstance(value_side, Literal)
+                    and isinstance(value_side.value, int)
+                ):
+                    scoped_types.add(value_side.value)
+
+        if len(scoped_types) == 1:
+            (only_type,) = scoped_types
+            return self.INDEXED_NAME_KEY_BY_ITEM_TYPE.get(only_type)
+        return None
+
+    def _indexed_name_ref(self, exp: Expression, key: str) -> Optional[Column]:
+        """Translate an ``attributes_string[key]`` access to ``indexed_name``.
+
+        Both the ``arrayElement(attributes_string, key)`` form emitted by the
+        EAP RPC builder and the hand-written ``SubscriptableReference`` form
+        are matched. On a match the returned ``Column`` keeps the alias of
+        ``exp`` and the table name of the ``attributes_string`` column; any
+        other expression yields ``None``.
+        """
+        # Find the column being subscripted with ``key``, if ``exp`` is one of
+        # the two supported access forms.
+        container: Optional[Column] = None
+        if isinstance(exp, SubscriptableReference):
+            if isinstance(exp.key, Literal) and exp.key.value == key:
+                container = exp.column
+        elif (
+            isinstance(exp, FunctionCall)
+            and exp.function_name == "arrayElement"
+            and len(exp.parameters) == 2
+        ):
+            target, subscript = exp.parameters
+            if (
+                isinstance(target, Column)
+                and isinstance(subscript, Literal)
+                and subscript.value == key
+            ):
+                container = target
+
+        if container is None or container.column_name != self.ATTRIBUTES_STRING_COLUMN:
+            return None
+        return Column(exp.alias, container.table_name, self.INDEXED_NAME_COLUMN)
+
+    def process_query(self, query: Query, query_settings: QuerySettings) -> None:
+        """Rewrite promoted-attribute accesses in ``query`` in place.
+
+        Does nothing while the ``CONFIG_KEY`` flag is 0 or when the query is
+        not scoped to a single supported item type.
+        """
+        enabled = state.get_int_config(self.CONFIG_KEY, 0) != 0
+        if not enabled:
+            return
+
+        promoted = self._indexed_name_key(query)
+        if promoted is None:
+            return
+
+        # Re-bind under an explicit ``str`` annotation: inside the closure
+        # mypy would otherwise see the declared Optional[str] again.
+        attribute: str = promoted
+
+        def swap(node: Expression) -> Expression:
+            replacement = self._indexed_name_ref(node, attribute)
+            if replacement is None:
+                return node
+            return replacement
+
+        query.transform_expressions(swap)
diff --git a/tests/query/processors/test_indexed_name_optimizer.py b/tests/query/processors/test_indexed_name_optimizer.py
new file mode 100644
index 00000000000..cf8cea31236
--- /dev/null
+++ b/tests/query/processors/test_indexed_name_optimizer.py
@@ -0,0 +1,269 @@
+from copy import deepcopy
+
+import pytest
+from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
+
+from snuba.clickhouse.columns import ColumnSet
+from snuba.datasets.entities.entity_key import EntityKey
+from snuba.query import SelectedExpression
+from snuba.query.data_source.simple import Entity as QueryEntity
+from snuba.query.dsl import (
+    and_cond,
+    arrayElement,
+    column,
+    equals,
+    in_cond,
+    literal,
+    literals_array,
+)
+from snuba.query.expressions import (
+    Column,
+    Expression,
+    FunctionCall,
+    Literal,
+    SubscriptableReference,
+)
+from snuba.query.logical import Query
+from snuba.query.processors.logical.indexed_name_optimizer import (
+    IndexedNameOptimizer,
+)
+from snuba.query.query_settings import HTTPQuerySettings
+from snuba.state import set_config
+
+# Use the protobuf enum values directly so the tests stay in sync with the
+# production dependency the optimizer keys off of.
+SPAN = TraceItemType.TRACE_ITEM_TYPE_SPAN
+METRIC = TraceItemType.TRACE_ITEM_TYPE_METRIC
+LOG = TraceItemType.TRACE_ITEM_TYPE_LOG
+
+
+def _attr_arr(key: str) -> FunctionCall:
+    """The ``arrayElement(attributes_string, key)`` form the EAP RPC builder
+    emits for a map-backed string filter (see ``_map_backed_operands``)."""
+    return arrayElement(None, column("attributes_string"), literal(key))
+
+
+def _attr_str(key: str, alias: str | None = None) -> SubscriptableReference:
+    """The hand-written SnQL ``attributes_string[key]`` form."""
+    return SubscriptableReference(
+        alias=alias,
+        column=column("attributes_string"),
+        key=literal(key),
+    )
+
+
+def _query(
+    condition: Expression,
+    selected_columns: list[SelectedExpression] | None = None,
+) -> Query:
+    return Query(
+        QueryEntity(EntityKey.EAP_ITEMS, ColumnSet([])),
+        selected_columns=selected_columns or [SelectedExpression("c", column("project_id"))],
+        condition=condition,
+    )
+
+
+test_data = [
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(_attr_arr("sentry.op"), literal("db.query")),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(column("indexed_name"), literal("db.query")),
+            )
+        ),
+        id="span sentry.op (arrayElement) rewritten to indexed_name",
+    ),
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(_attr_str("sentry.op"), literal("db.query")),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(column("indexed_name"), literal("db.query")),
+            )
+        ),
+        id="span sentry.op (SubscriptableReference) rewritten to indexed_name",
+    ),
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(METRIC)),
+                equals(_attr_arr("sentry.metric.name"), literal("my.metric")),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(METRIC)),
+                equals(column("indexed_name"), literal("my.metric")),
+            )
+        ),
+        id="metric sentry.metric.name rewritten to indexed_name",
+    ),
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                in_cond(
+                    _attr_arr("sentry.op"),
+                    literals_array(None, [literal("db.query"), literal("http.client")]),
+                ),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                in_cond(
+                    column("indexed_name"),
+                    literals_array(None, [literal("db.query"), literal("http.client")]),
+                ),
+            )
+        ),
+        id="span sentry.op IN rewritten to indexed_name",
+    ),
+    pytest.param(
+        # The rewrite is a value substitution, so it applies to any operator the
+        # access appears under (the index only helps equals/in, but substituting
+        # is always correct once indexed_name is populated).
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                FunctionCall(None, "notEquals", (_attr_arr("sentry.op"), literal("db.query"))),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                FunctionCall(None, "notEquals", (column("indexed_name"), literal("db.query"))),
+            )
+        ),
+        id="any operator on the access is rewritten (notEquals)",
+    ),
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(_attr_arr("sentry.metric.name"), literal("my.metric")),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(_attr_arr("sentry.metric.name"), literal("my.metric")),
+            )
+        ),
+        id="span query with metric key is left untouched",
+    ),
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(_attr_arr("foo"), literal("bar")),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(SPAN)),
+                equals(_attr_arr("foo"), literal("bar")),
+            )
+        ),
+        id="non-indexed attribute is left untouched",
+    ),
+    pytest.param(
+        _query(equals(_attr_arr("sentry.op"), literal("db.query"))),
+        _query(equals(_attr_arr("sentry.op"), literal("db.query"))),
+        id="no item_type condition leaves bucket lookup",
+    ),
+    pytest.param(
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(LOG)),
+                equals(_attr_arr("sentry.op"), literal("db.query")),
+            )
+        ),
+        _query(
+            and_cond(
+                equals(column("item_type"), literal(LOG)),
+                equals(_attr_arr("sentry.op"), literal("db.query")),
+            )
+        ),
+        id="unsupported item_type leaves bucket lookup",
+    ),
+    pytest.param(
+        _query(
+            condition=equals(column("item_type"), literal(SPAN)),
+            selected_columns=[
+                SelectedExpression("op", _attr_str("sentry.op", alias="op")),
+            ],
+        ),
+        _query(
+            condition=equals(column("item_type"), literal(SPAN)),
+            selected_columns=[
+                SelectedExpression("op", Column("op", None, "indexed_name")),
+            ],
+        ),
+        id="select preserves alias on rewrite",
+    ),
+]
+
+
+@pytest.mark.redis_db
+@pytest.mark.parametrize("pre_format, expected_query", test_data)
+def test_indexed_name_optimizer(pre_format: Query, expected_query: Query) -> None:
+    set_config(IndexedNameOptimizer.CONFIG_KEY, 1)
+    copy = deepcopy(pre_format)
+    IndexedNameOptimizer().process_query(copy, HTTPQuerySettings())
+    assert copy.get_selected_columns() == expected_query.get_selected_columns()
+    assert copy.get_condition() == expected_query.get_condition()
+
+
+@pytest.mark.redis_db
+def test_disabled_by_default_leaves_query_untouched() -> None:
+    set_config(IndexedNameOptimizer.CONFIG_KEY, 0)
+    query = _query(
+        and_cond(
+            equals(column("item_type"), literal(SPAN)),
+            equals(_attr_arr("sentry.op"), literal("db.query")),
+        )
+    )
+    copy = deepcopy(query)
+    IndexedNameOptimizer().process_query(copy, HTTPQuerySettings())
+    assert copy.get_condition() == query.get_condition()
+    assert copy.get_selected_columns() == query.get_selected_columns()
+
+
+@pytest.mark.redis_db
+def test_contradictory_item_types_left_untouched() -> None:
+    set_config(IndexedNameOptimizer.CONFIG_KEY, 1)
+    query = _query(
+        and_cond(
+            equals(column("item_type"), literal(SPAN)),
+            and_cond(
+                equals(column("item_type"), literal(METRIC)),
+                equals(_attr_arr("sentry.op"), literal("db.query")),
+            ),
+        )
+    )
+    copy = deepcopy(query)
+    IndexedNameOptimizer().process_query(copy, HTTPQuerySettings())
+    # ambiguous item_type -> no rewrite; no indexed_name reference is introduced.
+    condition = copy.get_condition()
+    assert condition is not None
+    assert not any(isinstance(e, Column) and e.column_name == "indexed_name" for e in condition)
+    # the original bucket lookup survives untouched.
+    assert any(
+        isinstance(e, FunctionCall)
+        and e.function_name == "arrayElement"
+        and isinstance(e.parameters[1], Literal)
+        and e.parameters[1].value == "sentry.op"
+        for e in condition
+    )
diff --git a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_indexed_name.py b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_indexed_name.py
new file mode 100644
index 00000000000..8ed925873f8
--- /dev/null
+++ b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_indexed_name.py
@@ -0,0 +1,134 @@
+from datetime import UTC, datetime, timedelta
+
+import pytest
+from google.protobuf.timestamp_pb2 import Timestamp
+from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
+    TimeSeriesRequest,
+    TimeSeriesResponse,
+)
+from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
+    AttributeAggregation,
+    AttributeKey,
+    AttributeValue,
+    ExtrapolationMode,
+    Function,
+)
+from sentry_protos.snuba.v1.trace_item_filter_pb2 import (
+    ComparisonFilter,
+    TraceItemFilter,
+)
+from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue
+
+from snuba.datasets.storages.factory import get_storage
+from snuba.datasets.storages.storage_key import StorageKey
+from snuba.query.processors.logical.indexed_name_optimizer import IndexedNameOptimizer
+from snuba.state import set_config
+from snuba.web.rpc.v1.endpoint_time_series import EndpointTimeSeries
+from tests.base import BaseApiTest
+from tests.helpers import write_raw_unprocessed_events
+from tests.web.rpc.v1.test_utils import gen_item_message
+
+# 08:00 UTC on the previous day. Built timezone-aware from the start:
+# ``datetime.utcnow()`` is deprecated since Python 3.12 and returns a naive value.
+BASE_TIME = datetime.now(tz=UTC).replace(
+    hour=8, minute=0, second=0, microsecond=0
+) - timedelta(hours=24)
+
+# Five-minute buckets over a one-hour query window that starts at BASE_TIME.
+GRANULARITY_SECS = 300
+QUERY_DURATION = 3600
+
+# A matching metric is written this many times; each carries value=1.0, so a SUM
+# over the matching items totals MATCHING_COUNT.
+MATCHING_COUNT = 6
+NON_MATCHING_COUNT = 4
+
+
+def _store_metrics() -> None:
+    """Write METRIC items: ``MATCHING_COUNT`` named ``my.metric`` plus
+    ``NON_MATCHING_COUNT`` named ``other.metric``. The rust processor promotes
+    ``sentry.metric.name`` into the ``indexed_name`` column (see
+    ``rust_snuba/src/processors/eap_items.rs``), which is what the optimizer
+    rewrites the name filter onto."""
+    messages: list[bytes] = []
+    for name, count in (("my.metric", MATCHING_COUNT), ("other.metric", NON_MATCHING_COUNT)):
+        for _ in range(count):
+            messages.append(
+                gen_item_message(
+                    start_timestamp=BASE_TIME + timedelta(seconds=60),
+                    type=TraceItemType.TRACE_ITEM_TYPE_METRIC,
+                    remove_default_attributes=True,
+                    attributes={
+                        "sentry.metric.name": AnyValue(string_value=name),
+                        "value": AnyValue(double_value=1.0),
+                    },
+                )
+            )
+    write_raw_unprocessed_events(get_storage(StorageKey("eap_items")), messages)  # type: ignore
+
+
+def _request() -> TimeSeriesRequest:
+    """SUM(value) over metrics named ``my.metric``. The name filter is what the
+    IndexedNameOptimizer rewrites onto the indexed_name column for metrics."""
+    return TimeSeriesRequest(
+        meta=RequestMeta(
+            project_ids=[1],
+            organization_id=1,
+            cogs_category="something",
+            referrer="something",
+            start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+            end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp()) + QUERY_DURATION),
+            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_METRIC,
+        ),
+        filter=TraceItemFilter(
+            comparison_filter=ComparisonFilter(
+                key=AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.metric.name"),
+                op=ComparisonFilter.OP_EQUALS,
+                value=AttributeValue(val_str="my.metric"),
+            )
+        ),
+        aggregations=[
+            AttributeAggregation(
+                aggregate=Function.FUNCTION_SUM,
+                key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="value"),
+                label="sum",
+                extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE,
+            ),
+        ],
+        granularity_secs=GRANULARITY_SECS,
+    )
+
+
+def _total(response: TimeSeriesResponse) -> float:
+    """Add up every data point of every time series in ``response``."""
+    return float(sum(dp.data for ts in response.result_timeseries for dp in ts.data_points))
+
+
+@pytest.mark.eap
+@pytest.mark.redis_db
+class TestTimeSeriesIndexedName(BaseApiTest):
+    def test_metric_name_filter_with_indexed_name_enabled(self) -> None:
+        """A metric-name-filtered time series returns the right rows when the
+        optimizer rewrites the name filter onto the indexed_name column."""
+        _store_metrics()
+        set_config(IndexedNameOptimizer.CONFIG_KEY, 1)
+
+        response = EndpointTimeSeries().execute(_request())
+
+        # Only the my.metric items (value=1.0 each) match; other.metric is excluded.
+        assert _total(response) == float(MATCHING_COUNT)
+
+    def test_indexed_name_rewrite_is_result_preserving(self) -> None:
+        """The rewrite must not change results: the same request returns the same
+        time series with the flag off (bucket lookup) and on (indexed_name)."""
+        _store_metrics()
+
+        set_config(IndexedNameOptimizer.CONFIG_KEY, 0)
+        disabled = EndpointTimeSeries().execute(_request())
+
+        set_config(IndexedNameOptimizer.CONFIG_KEY, 1)
+        enabled = EndpointTimeSeries().execute(_request())
+
+        assert _total(disabled) == float(MATCHING_COUNT)
+        assert list(enabled.result_timeseries) == list(disabled.result_timeseries)