From a6edc34f0d2831a05cb72acdbb97022670e37bec Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Wed, 17 Jun 2026 17:02:13 -0700 Subject: [PATCH 1/4] feat(eap-items): add conversation_id and session_id columns Add `conversation_id` and `session_id` UUID columns to the EAP items table, all three downsample tiers, and the read-only variants, plus the materialized views that populate the downsampled tables. - Migration 0059 adds the columns (after `indexed_name`) to the local, distributed, and read-only distributed tables, adds bloom_filter indexes on the local tables, and recreates the downsample MVs (mv_6 -> mv_7) so the new columns are populated on the read paths. - Storage configs declare the new UUID columns and register them with the UUIDColumnProcessor alongside `trace_id`. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_013H6Jsmx7v293rnQM1cwMwS --- .../storages/eap_items.yaml | 4 +- .../storages/eap_items_downsample_512.yaml | 4 +- .../storages/eap_items_downsample_512_ro.yaml | 4 +- .../storages/eap_items_downsample_64.yaml | 4 +- .../storages/eap_items_downsample_64_ro.yaml | 4 +- .../storages/eap_items_downsample_8.yaml | 4 +- .../storages/eap_items_downsample_8_ro.yaml | 4 +- .../storages/eap_items_ro.yaml | 4 +- ...0059_add_conversation_id_and_session_id.py | 214 ++++++++++++++++++ 9 files changed, 238 insertions(+), 8 deletions(-) create mode 100644 snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml index df5eaf469bb..30894a55851 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml @@ -21,6 +21,8 @@ schema: { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: downsampled_retention_days, type: UInt, args: { size: 16 } }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, @@ -115,7 +117,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml index 8adc09873c9..2f52e7c2fc2 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml @@ -22,6 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, @@ -115,7 +117,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml index 30b471aa223..5016de27948 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml @@ -22,6 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, @@ -115,7 +117,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml index f0863958384..821788a0c41 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml @@ -22,6 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, @@ -116,7 +118,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml index e9461de4f53..6ab7a41efd7 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml @@ -22,6 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, @@ -116,7 +118,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml index 0fa40fec786..134850528eb 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml @@ -22,6 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, @@ -115,7 +117,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml index 32b0595c9b6..27450e63cbf 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml @@ -22,6 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, @@ -115,7 +117,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml index 74a77e3d262..8558b9c8ff3 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml @@ -21,6 +21,8 @@ schema: { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: downsampled_retention_days, type: UInt, args: { size: 16 } }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, @@ -115,7 +117,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py b/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py new file mode 100644 index 00000000000..08505458835 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py @@ -0,0 +1,214 @@ +from typing import Callable, List + +from snuba.clickhouse.columns import Column +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.snuba_migrations.events_analytics_platform.templates import ( + SAMPLING_WEIGHTS, + downsample_mv_select, + get_eap_items_columns, + swap_downsample_materialized_views, +) +from snuba.utils.schemas import UUID + +storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +ro_storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM_RO +table_name_prefix = "eap_items_1" +sampling_weights = SAMPLING_WEIGHTS + +# Two new identifier columns, stored as UUID to match the existing `trace_id` +# column. They are added right after `indexed_name` so the on-disk order stays +# stable: indexed_name, conversation_id, session_id. +new_columns: List[Column[Modifiers]] = [ + Column("conversation_id", UUID()), + Column("session_id", UUID()), +] +# `after` targets chained so the final order is +# indexed_name -> conversation_id -> session_id. +add_column_after = ["indexed_name", "conversation_id"] + +# Bloom-filter indexes on the new columns (mirrors `bf_indexed_name` from +# migration 0057) so equality lookups can skip granules. Indexes live on the +# MergeTree local tables only. +index_type = "bloom_filter" +index_granularity = 1 +new_column_indexes = [(f"bf_{column.name}", column.name) for column in new_columns] + +# master is at mv_6 (migration 0058_nest_downsample_tiers), so this migration +# bumps 6 -> 7. The downsample materialized views select an explicit column list +# from eap_items_1_local, so the new columns have to be added to the views (not +# just the tables) for them to be populated on the downsampled read paths. +mv_old_version = 6 +mv_new_version = mv_old_version + 1 + + +def _mv_columns_with_new() -> List[Column[Modifiers]]: + """The mv_6 column list with the new columns inserted after `indexed_name`. + + Built locally instead of in ``get_eap_items_columns`` because migration 0058 + runs before this one and also calls ``get_eap_items_columns``; adding the new + columns there would make 0058 build its views referencing columns that do not + exist yet when it runs. + """ + columns = get_eap_items_columns() + insert_at = next(i for i, c in enumerate(columns) if c.name == "indexed_name") + 1 + return columns[:insert_at] + list(new_columns) + columns[insert_at:] + + +def _query_for_weight(columns: List[Column[Modifiers]]) -> Callable[[int], str]: + # Predicate matches the current (mv_6) nested-subset sampling introduced by + # migration 0058: a single, un-perturbed hash on item_id. + def inner(sampling_weight: int) -> str: + return downsample_mv_select( + columns, + sampling_weight, + where_predicate=f"cityHash64(item_id) % {sampling_weight}", + ) + + return inner + + +def _table_prefixes() -> List[str]: + return [table_name_prefix] + [ + f"eap_items_1_downsample_{sampling_weight}" for sampling_weight in sampling_weights + ] + + +def _local_tables() -> List[str]: + return [f"{prefix}_local" for prefix in _table_prefixes()] + + +def _ro_dist_tables() -> List[str]: + # The read-only distributed tables (created via `CREATE TABLE ... AS`) do not + # inherit schema changes from their source tables, so the columns must be + # added explicitly for queries on the read path to see them. + return [f"{table_name_prefix}_dist_ro"] + [ + f"eap_items_1_downsample_{sampling_weight}_dist_ro" for sampling_weight in sampling_weights + ] + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> List[SqlOperation]: + ops: List[SqlOperation] = [] + + for prefix in _table_prefixes(): + for column, after in zip(new_columns, add_column_after): + ops.extend( + [ + operations.AddColumn( + storage_set=storage_set, + table_name=f"{prefix}_local", + column=column, + after=after, + target=OperationTarget.LOCAL, + ), + operations.AddColumn( + storage_set=storage_set, + table_name=f"{prefix}_dist", + column=column, + after=after, + target=OperationTarget.DISTRIBUTED, + ), + ] + ) + + for ro_table in _ro_dist_tables(): + for column, after in zip(new_columns, add_column_after): + ops.append( + operations.AddColumn( + storage_set=ro_storage_set, + table_name=ro_table, + column=column, + after=after, + target=OperationTarget.DISTRIBUTED, + ) + ) + + for table_name in _local_tables(): + for index_name, index_expression in new_column_indexes: + ops.append( + operations.AddIndex( + storage_set=storage_set, + table_name=table_name, + index_name=index_name, + index_expression=index_expression, + index_type=index_type, + granularity=index_granularity, + target=OperationTarget.LOCAL, + ) + ) + + # Recreate the downsample materialized views so they select the new + # columns. The columns must already exist on eap_items_1_local and the + # downsampled tables (added above) before the views are recreated. + mv_columns = _mv_columns_with_new() + ops.extend( + swap_downsample_materialized_views( + columns=mv_columns, + create_version=mv_new_version, + drop_version=mv_old_version, + query_for_weight=_query_for_weight(mv_columns), + ) + ) + + return ops + + def backwards_ops(self) -> List[SqlOperation]: + # Restore the previous materialized views (which do not reference the new + # columns) before dropping the columns the new views read from. + base_columns = get_eap_items_columns() + ops: List[SqlOperation] = list( + swap_downsample_materialized_views( + columns=base_columns, + create_version=mv_old_version, + drop_version=mv_new_version, + query_for_weight=_query_for_weight(base_columns), + ) + ) + + for table_name in _local_tables(): + for index_name, _ in new_column_indexes: + ops.append( + operations.DropIndex( + storage_set=storage_set, + table_name=table_name, + index_name=index_name, + target=OperationTarget.LOCAL, + ) + ) + + for prefix in _table_prefixes(): + for column in reversed(new_columns): + ops.extend( + [ + operations.DropColumn( + storage_set=storage_set, + table_name=f"{prefix}_dist", + column_name=column.name, + target=OperationTarget.DISTRIBUTED, + ), + operations.DropColumn( + storage_set=storage_set, + table_name=f"{prefix}_local", + column_name=column.name, + target=OperationTarget.LOCAL, + ), + ] + ) + + for ro_table in _ro_dist_tables(): + for column in reversed(new_columns): + ops.append( + operations.DropColumn( + storage_set=ro_storage_set, + table_name=ro_table, + column_name=column.name, + target=OperationTarget.DISTRIBUTED, + ) + ) + + return ops From 52d459bd4004dbc99c7edb938c0b2b8b968b39b0 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 16:56:42 +0000 Subject: [PATCH 2/4] feat(eap-items): place conversation_id and session_id after trace_id Move the two new UUID columns so the on-disk order is trace_id -> conversation_id -> session_id, grouping the UUID identifier columns together. Updates the AddColumn `after` targets, the downsample materialized-view column list, and all eight storage configs to match. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_019JsLYHmP1AGxNuNNFazfTP --- .../storages/eap_items.yaml | 4 ++-- .../storages/eap_items_downsample_512.yaml | 4 ++-- .../storages/eap_items_downsample_512_ro.yaml | 4 ++-- .../storages/eap_items_downsample_64.yaml | 4 ++-- .../storages/eap_items_downsample_64_ro.yaml | 4 ++-- .../storages/eap_items_downsample_8.yaml | 4 ++-- .../storages/eap_items_downsample_8_ro.yaml | 4 ++-- .../storages/eap_items_ro.yaml | 4 ++-- .../0059_add_conversation_id_and_session_id.py | 12 ++++++------ 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml index 30894a55851..e65bd13c052 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml @@ -16,13 +16,13 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: downsampled_retention_days, type: UInt, args: { size: 16 } }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml index 2f52e7c2fc2..c6888a40649 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml @@ -16,14 +16,14 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml index 5016de27948..25a60931b3d 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml @@ -16,14 +16,14 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml index 821788a0c41..217d25c63b5 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml @@ -16,14 +16,14 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml index 6ab7a41efd7..954ae575848 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml @@ -16,14 +16,14 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml index 134850528eb..7b59a86ef88 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml @@ -16,14 +16,14 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml index 27450e63cbf..75ec0715d38 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml @@ -16,14 +16,14 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, { name: attributes_int, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } }, diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml index 8558b9c8ff3..c71a6b3b33f 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml @@ -16,13 +16,13 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: retention_days, type: UInt, args: { size: 16 } }, { name: indexed_name, type: String }, - { name: conversation_id, type: UUID }, - { name: session_id, type: UUID }, { name: downsampled_retention_days, type: UInt, args: { size: 16 } }, { name: attributes_bool, type: Map, args: { key: { type: String }, value: { type: Bool } } }, diff --git a/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py b/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py index 08505458835..9182a9b9e9c 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py +++ b/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py @@ -19,15 +19,15 @@ sampling_weights = SAMPLING_WEIGHTS # Two new identifier columns, stored as UUID to match the existing `trace_id` -# column. They are added right after `indexed_name` so the on-disk order stays -# stable: indexed_name, conversation_id, session_id. +# column. They are added right after `trace_id` so the on-disk order stays +# stable: trace_id, conversation_id, session_id. new_columns: List[Column[Modifiers]] = [ Column("conversation_id", UUID()), Column("session_id", UUID()), ] # `after` targets chained so the final order is -# indexed_name -> conversation_id -> session_id. -add_column_after = ["indexed_name", "conversation_id"] +# trace_id -> conversation_id -> session_id. +add_column_after = ["trace_id", "conversation_id"] # Bloom-filter indexes on the new columns (mirrors `bf_indexed_name` from # migration 0057) so equality lookups can skip granules. Indexes live on the @@ -45,7 +45,7 @@ def _mv_columns_with_new() -> List[Column[Modifiers]]: - """The mv_6 column list with the new columns inserted after `indexed_name`. + """The mv_6 column list with the new columns inserted after `trace_id`. Built locally instead of in ``get_eap_items_columns`` because migration 0058 runs before this one and also calls ``get_eap_items_columns``; adding the new @@ -53,7 +53,7 @@ def _mv_columns_with_new() -> List[Column[Modifiers]]: exist yet when it runs. """ columns = get_eap_items_columns() - insert_at = next(i for i, c in enumerate(columns) if c.name == "indexed_name") + 1 + insert_at = next(i for i, c in enumerate(columns) if c.name == "trace_id") + 1 return columns[:insert_at] + list(new_columns) + columns[insert_at:] From 9c780452025cbd370db93e2db5dc12f116f6134e Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 17:02:36 +0000 Subject: [PATCH 3/4] feat(eap-items): renumber migration to 0060 and rebase MV onto mv_7 master landed 0059_add_array_attribute_map_columns, which both takes the 0059 number and bumps the downsample materialized views from mv_6 to mv_7. Resolve the collision after merging master: - Rename this migration 0059 -> 0060 (the migration loader rejects duplicate numbers). - Bump the downsample views mv_7 -> mv_8 instead of mv_6 -> mv_7, and build the view column list from the mv_7 set (base columns + the four attributes_array_* columns from 0059) with conversation_id/session_id inserted after trace_id, so the regenerated views match the post-0059 on-disk layout. backwards_ops restores the mv_7 column set. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_019JsLYHmP1AGxNuNNFazfTP --- ...060_add_conversation_id_and_session_id.py} | 58 +++++++++++++------ 1 file changed, 41 insertions(+), 17 deletions(-) rename snuba/snuba_migrations/events_analytics_platform/{0059_add_conversation_id_and_session_id.py => 0060_add_conversation_id_and_session_id.py} (76%) diff --git a/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py b/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py similarity index 76% rename from snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py rename to snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py index 9182a9b9e9c..6b70411edf4 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0059_add_conversation_id_and_session_id.py +++ b/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py @@ -11,7 +11,7 @@ get_eap_items_columns, swap_downsample_materialized_views, ) -from snuba.utils.schemas import UUID +from snuba.utils.schemas import UUID, Array, Bool, Float, Int, Map, String storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM ro_storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM_RO @@ -36,30 +36,54 @@ index_granularity = 1 new_column_indexes = [(f"bf_{column.name}", column.name) for column in new_columns] -# master is at mv_6 (migration 0058_nest_downsample_tiers), so this migration -# bumps 6 -> 7. The downsample materialized views select an explicit column list -# from eap_items_1_local, so the new columns have to be added to the views (not -# just the tables) for them to be populated on the downsampled read paths. -mv_old_version = 6 +# master is at mv_7 (migration 0059_add_array_attribute_map_columns), so this +# migration bumps 7 -> 8. The downsample materialized views select an explicit +# column list from eap_items_1_local, so the new columns have to be added to the +# views (not just the tables) for them to be populated on the downsampled read +# paths. +mv_old_version = 7 mv_new_version = mv_old_version + 1 +# The four array-attribute map columns added by migration +# 0059_add_array_attribute_map_columns (which created mv_7), appended after the +# last base column (`attributes_float_39`). They are reconstructed here — rather +# than added to ``get_eap_items_columns`` — so this migration can rebuild the +# views with the exact mv_7 column set. Adding them to the shared helper would +# break the earlier migrations (0058) that call it and run before 0059 existed. +_codec = Modifiers(codecs=["ZSTD(1)"]) +array_attribute_columns: List[Column[Modifiers]] = [ + Column("attributes_array_string", Map(String(), Array(String()), modifiers=_codec)), + Column("attributes_array_int", Map(String(), Array(Int(64)), modifiers=_codec)), + Column("attributes_array_float", Map(String(), Array(Float(64)), modifiers=_codec)), + Column("attributes_array_bool", Map(String(), Array(Bool()), modifiers=_codec)), +] + + +def _mv7_columns() -> List[Column[Modifiers]]: + """The mv_7 downsample-view column list: the base columns plus the + array-attribute map columns added by migration 0059.""" + columns = get_eap_items_columns() + columns.extend(array_attribute_columns) + return columns + def _mv_columns_with_new() -> List[Column[Modifiers]]: - """The mv_6 column list with the new columns inserted after `trace_id`. + """The mv_7 column list with the new columns inserted after `trace_id`. - Built locally instead of in ``get_eap_items_columns`` because migration 0058 - runs before this one and also calls ``get_eap_items_columns``; adding the new - columns there would make 0058 build its views referencing columns that do not - exist yet when it runs. + Built locally instead of in ``get_eap_items_columns`` because earlier + migrations (0058, 0059) also call ``get_eap_items_columns`` and run before + this one; adding the new columns there would make them build their views + referencing columns that do not exist yet when they run. """ - columns = get_eap_items_columns() + columns = _mv7_columns() insert_at = next(i for i, c in enumerate(columns) if c.name == "trace_id") + 1 return columns[:insert_at] + list(new_columns) + columns[insert_at:] def _query_for_weight(columns: List[Column[Modifiers]]) -> Callable[[int], str]: - # Predicate matches the current (mv_6) nested-subset sampling introduced by - # migration 0058: a single, un-perturbed hash on item_id. + # Predicate matches the current (mv_7) nested-subset sampling introduced by + # migration 0058 and carried through 0059: a single, un-perturbed hash on + # item_id. def inner(sampling_weight: int) -> str: return downsample_mv_select( columns, @@ -158,9 +182,9 @@ def forwards_ops(self) -> List[SqlOperation]: return ops def backwards_ops(self) -> List[SqlOperation]: - # Restore the previous materialized views (which do not reference the new - # columns) before dropping the columns the new views read from. - base_columns = get_eap_items_columns() + # Restore the previous (mv_7) materialized views (which do not reference + # the new columns) before dropping the columns the new views read from. + base_columns = _mv7_columns() ops: List[SqlOperation] = list( swap_downsample_materialized_views( columns=base_columns, From d41c3bdcd5b7783bca7746439406a6cd1faae8aa Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 21:35:52 +0000 Subject: [PATCH 4/4] ref(eap-items): trim migration comments to essentials Condense the verbose explanatory comments in the migration and keep the version references current (mv_7 -> mv_8, predecessor migration 0059). Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_019JsLYHmP1AGxNuNNFazfTP --- ...0060_add_conversation_id_and_session_id.py | 52 ++++++------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py b/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py index 6b70411edf4..ca2cc15bb09 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py +++ b/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py @@ -18,38 +18,27 @@ table_name_prefix = "eap_items_1" sampling_weights = SAMPLING_WEIGHTS -# Two new identifier columns, stored as UUID to match the existing `trace_id` -# column. They are added right after `trace_id` so the on-disk order stays -# stable: trace_id, conversation_id, session_id. +# New UUID identifier columns, added after `trace_id`. new_columns: List[Column[Modifiers]] = [ Column("conversation_id", UUID()), Column("session_id", UUID()), ] -# `after` targets chained so the final order is -# trace_id -> conversation_id -> session_id. +# Chained so the on-disk order is trace_id -> conversation_id -> session_id. add_column_after = ["trace_id", "conversation_id"] -# Bloom-filter indexes on the new columns (mirrors `bf_indexed_name` from -# migration 0057) so equality lookups can skip granules. Indexes live on the -# MergeTree local tables only. +# Bloom-filter indexes (like `bf_indexed_name`) on the local MergeTree tables. index_type = "bloom_filter" index_granularity = 1 new_column_indexes = [(f"bf_{column.name}", column.name) for column in new_columns] -# master is at mv_7 (migration 0059_add_array_attribute_map_columns), so this -# migration bumps 7 -> 8. The downsample materialized views select an explicit -# column list from eap_items_1_local, so the new columns have to be added to the -# views (not just the tables) for them to be populated on the downsampled read -# paths. +# master is at mv_7 (migration 0059); bump to mv_8 so the regenerated downsample +# views also project the new columns. mv_old_version = 7 mv_new_version = mv_old_version + 1 -# The four array-attribute map columns added by migration -# 0059_add_array_attribute_map_columns (which created mv_7), appended after the -# last base column (`attributes_float_39`). They are reconstructed here — rather -# than added to ``get_eap_items_columns`` — so this migration can rebuild the -# views with the exact mv_7 column set. Adding them to the shared helper would -# break the earlier migrations (0058) that call it and run before 0059 existed. +# Array-attribute map columns added by migration 0059 (mv_7), appended after the +# last base column. Reconstructed here so the rebuilt views match the mv_7 column +# set without touching the shared get_eap_items_columns() helper. _codec = Modifiers(codecs=["ZSTD(1)"]) array_attribute_columns: List[Column[Modifiers]] = [ Column("attributes_array_string", Map(String(), Array(String()), modifiers=_codec)), @@ -60,8 +49,7 @@ def _mv7_columns() -> List[Column[Modifiers]]: - """The mv_7 downsample-view column list: the base columns plus the - array-attribute map columns added by migration 0059.""" + """The mv_7 column list: base columns plus the array columns from migration 0059.""" columns = get_eap_items_columns() columns.extend(array_attribute_columns) return columns @@ -70,10 +58,8 @@ def _mv7_columns() -> List[Column[Modifiers]]: def _mv_columns_with_new() -> List[Column[Modifiers]]: """The mv_7 column list with the new columns inserted after `trace_id`. - Built locally instead of in ``get_eap_items_columns`` because earlier - migrations (0058, 0059) also call ``get_eap_items_columns`` and run before - this one; adding the new columns there would make them build their views - referencing columns that do not exist yet when they run. + Built locally rather than in ``get_eap_items_columns`` so the earlier + migrations (0058, 0059) that call it are unaffected. """ columns = _mv7_columns() insert_at = next(i for i, c in enumerate(columns) if c.name == "trace_id") + 1 @@ -81,9 +67,7 @@ def _mv_columns_with_new() -> List[Column[Modifiers]]: def _query_for_weight(columns: List[Column[Modifiers]]) -> Callable[[int], str]: - # Predicate matches the current (mv_7) nested-subset sampling introduced by - # migration 0058 and carried through 0059: a single, un-perturbed hash on - # item_id. + # Un-perturbed per-item hash, matching the current mv_7 sampling. def inner(sampling_weight: int) -> str: return downsample_mv_select( columns, @@ -105,9 +89,8 @@ def _local_tables() -> List[str]: def _ro_dist_tables() -> List[str]: - # The read-only distributed tables (created via `CREATE TABLE ... AS`) do not - # inherit schema changes from their source tables, so the columns must be - # added explicitly for queries on the read path to see them. + # Read-only distributed tables (CREATE TABLE ... AS) don't inherit schema + # changes from their source, so the columns are added explicitly. return [f"{table_name_prefix}_dist_ro"] + [ f"eap_items_1_downsample_{sampling_weight}_dist_ro" for sampling_weight in sampling_weights ] @@ -166,9 +149,7 @@ def forwards_ops(self) -> List[SqlOperation]: ) ) - # Recreate the downsample materialized views so they select the new - # columns. The columns must already exist on eap_items_1_local and the - # downsampled tables (added above) before the views are recreated. + # Recreate the downsample views so they project the new columns (added above). mv_columns = _mv_columns_with_new() ops.extend( swap_downsample_materialized_views( @@ -182,8 +163,7 @@ def forwards_ops(self) -> List[SqlOperation]: return ops def backwards_ops(self) -> List[SqlOperation]: - # Restore the previous (mv_7) materialized views (which do not reference - # the new columns) before dropping the columns the new views read from. + # Restore the mv_7 views before dropping the columns they read from. base_columns = _mv7_columns() ops: List[SqlOperation] = list( swap_downsample_materialized_views(