diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml index 6068562ffc..fbb3904e26 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml index ac5d5541dd..5f14c14a3d 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml index 5a11dab279..04a0242eb8 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512_ro.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml index 7514d26e34..0261bc624b 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml index 837bdb998c..e0c6d4799e 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64_ro.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml index d1cf865fc1..298f091e02 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml index d1f1d1c816..61886ad8f6 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8_ro.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml index fe8454262f..c7c8a637b5 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_items_ro.yaml @@ -16,6 +16,8 @@ schema: { name: item_type, type: UInt, args: { size: 8 } }, { name: timestamp, type: DateTime }, { name: trace_id, type: UUID }, + { name: conversation_id, type: UUID }, + { name: session_id, type: UUID }, { name: item_id, type: UInt, args: { size: 128 } }, { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sampling_factor, type: Float, args: { size: 64 } }, @@ -120,7 +122,7 @@ query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor args: - columns: [trace_id] + columns: [trace_id, conversation_id, session_id] - processor: HexIntColumnProcessor args: columns: [item_id] diff --git a/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py b/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py new file mode 100644 index 0000000000..ca2cc15bb0 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0060_add_conversation_id_and_session_id.py @@ -0,0 +1,218 @@ +from typing import Callable, List + +from snuba.clickhouse.columns import Column +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.snuba_migrations.events_analytics_platform.templates import ( + SAMPLING_WEIGHTS, + downsample_mv_select, + get_eap_items_columns, + swap_downsample_materialized_views, +) +from snuba.utils.schemas import UUID, Array, Bool, Float, Int, Map, String + +storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +ro_storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM_RO +table_name_prefix = "eap_items_1" +sampling_weights = SAMPLING_WEIGHTS + +# New UUID identifier columns, added after `trace_id`. +new_columns: List[Column[Modifiers]] = [ + Column("conversation_id", UUID()), + Column("session_id", UUID()), +] +# Chained so the on-disk order is trace_id -> conversation_id -> session_id. +add_column_after = ["trace_id", "conversation_id"] + +# Bloom-filter indexes (like `bf_indexed_name`) on the local MergeTree tables. +index_type = "bloom_filter" +index_granularity = 1 +new_column_indexes = [(f"bf_{column.name}", column.name) for column in new_columns] + +# master is at mv_7 (migration 0059); bump to mv_8 so the regenerated downsample +# views also project the new columns. +mv_old_version = 7 +mv_new_version = mv_old_version + 1 + +# Array-attribute map columns added by migration 0059 (mv_7), appended after the +# last base column. Reconstructed here so the rebuilt views match the mv_7 column +# set without touching the shared get_eap_items_columns() helper. +_codec = Modifiers(codecs=["ZSTD(1)"]) +array_attribute_columns: List[Column[Modifiers]] = [ + Column("attributes_array_string", Map(String(), Array(String()), modifiers=_codec)), + Column("attributes_array_int", Map(String(), Array(Int(64)), modifiers=_codec)), + Column("attributes_array_float", Map(String(), Array(Float(64)), modifiers=_codec)), + Column("attributes_array_bool", Map(String(), Array(Bool()), modifiers=_codec)), +] + + +def _mv7_columns() -> List[Column[Modifiers]]: + """The mv_7 column list: base columns plus the array columns from migration 0059.""" + columns = get_eap_items_columns() + columns.extend(array_attribute_columns) + return columns + + +def _mv_columns_with_new() -> List[Column[Modifiers]]: + """The mv_7 column list with the new columns inserted after `trace_id`. + + Built locally rather than in ``get_eap_items_columns`` so the earlier + migrations (0058, 0059) that call it are unaffected. + """ + columns = _mv7_columns() + insert_at = next(i for i, c in enumerate(columns) if c.name == "trace_id") + 1 + return columns[:insert_at] + list(new_columns) + columns[insert_at:] + + +def _query_for_weight(columns: List[Column[Modifiers]]) -> Callable[[int], str]: + # Un-perturbed per-item hash, matching the current mv_7 sampling. + def inner(sampling_weight: int) -> str: + return downsample_mv_select( + columns, + sampling_weight, + where_predicate=f"cityHash64(item_id) % {sampling_weight}", + ) + + return inner + + +def _table_prefixes() -> List[str]: + return [table_name_prefix] + [ + f"eap_items_1_downsample_{sampling_weight}" for sampling_weight in sampling_weights + ] + + +def _local_tables() -> List[str]: + return [f"{prefix}_local" for prefix in _table_prefixes()] + + +def _ro_dist_tables() -> List[str]: + # Read-only distributed tables (CREATE TABLE ... AS) don't inherit schema + # changes from their source, so the columns are added explicitly. + return [f"{table_name_prefix}_dist_ro"] + [ + f"eap_items_1_downsample_{sampling_weight}_dist_ro" for sampling_weight in sampling_weights + ] + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> List[SqlOperation]: + ops: List[SqlOperation] = [] + + for prefix in _table_prefixes(): + for column, after in zip(new_columns, add_column_after): + ops.extend( + [ + operations.AddColumn( + storage_set=storage_set, + table_name=f"{prefix}_local", + column=column, + after=after, + target=OperationTarget.LOCAL, + ), + operations.AddColumn( + storage_set=storage_set, + table_name=f"{prefix}_dist", + column=column, + after=after, + target=OperationTarget.DISTRIBUTED, + ), + ] + ) + + for ro_table in _ro_dist_tables(): + for column, after in zip(new_columns, add_column_after): + ops.append( + operations.AddColumn( + storage_set=ro_storage_set, + table_name=ro_table, + column=column, + after=after, + target=OperationTarget.DISTRIBUTED, + ) + ) + + for table_name in _local_tables(): + for index_name, index_expression in new_column_indexes: + ops.append( + operations.AddIndex( + storage_set=storage_set, + table_name=table_name, + index_name=index_name, + index_expression=index_expression, + index_type=index_type, + granularity=index_granularity, + target=OperationTarget.LOCAL, + ) + ) + + # Recreate the downsample views so they project the new columns (added above). + mv_columns = _mv_columns_with_new() + ops.extend( + swap_downsample_materialized_views( + columns=mv_columns, + create_version=mv_new_version, + drop_version=mv_old_version, + query_for_weight=_query_for_weight(mv_columns), + ) + ) + + return ops + + def backwards_ops(self) -> List[SqlOperation]: + # Restore the mv_7 views before dropping the columns they read from. + base_columns = _mv7_columns() + ops: List[SqlOperation] = list( + swap_downsample_materialized_views( + columns=base_columns, + create_version=mv_old_version, + drop_version=mv_new_version, + query_for_weight=_query_for_weight(base_columns), + ) + ) + + for table_name in _local_tables(): + for index_name, _ in new_column_indexes: + ops.append( + operations.DropIndex( + storage_set=storage_set, + table_name=table_name, + index_name=index_name, + target=OperationTarget.LOCAL, + ) + ) + + for prefix in _table_prefixes(): + for column in reversed(new_columns): + ops.extend( + [ + operations.DropColumn( + storage_set=storage_set, + table_name=f"{prefix}_dist", + column_name=column.name, + target=OperationTarget.DISTRIBUTED, + ), + operations.DropColumn( + storage_set=storage_set, + table_name=f"{prefix}_local", + column_name=column.name, + target=OperationTarget.LOCAL, + ), + ] + ) + + for ro_table in _ro_dist_tables(): + for column in reversed(new_columns): + ops.append( + operations.DropColumn( + storage_set=ro_storage_set, + table_name=ro_table, + column_name=column.name, + target=OperationTarget.DISTRIBUTED, + ) + ) + + return ops