diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index b21216974ad..a5b5edf1b74 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -550,12 +550,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "bnum" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966" + [[package]] name = "borrow-or-share" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c" +[[package]] +name = "bstr" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" +dependencies = [ + "memchr", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -573,6 +588,9 @@ name = "bytes" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +dependencies = [ + "serde", +] [[package]] name = "cadence" @@ -701,6 +719,58 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "clickhouse" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8063696febb0a10a6fb9df1460c52c509188f1d19da2157694ab3ca0feffc74" +dependencies = [ + "bnum", + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-macros", + "clickhouse-types", + "futures-channel", + "futures-util", + "http-body-util", + "hyper 1.6.0", + "hyper-tls 0.6.0", + "hyper-util", + "lz4_flex", + "polonius-the-crab", + "quanta", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "clickhouse-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals 0.29.1", + "syn 2.0.117", +] + +[[package]] +name = "clickhouse-types" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a5efddc880ce9e2573bd867413d9056fa2bea0206af88dec21e72178b9dc74" +dependencies = [ + "bytes", + "thiserror 2.0.18", +] + [[package]] name = "cmake" version = "0.1.57" @@ -1563,6 +1633,17 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "higher-kinded-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e690f8474c6c5d8ff99656fcbc195a215acc3949481a8b0b3351c838972dc776" +dependencies = [ + "macro_rules_attribute", + "never-say-never", + "paste", +] + [[package]] name = "hostname" version = "0.4.1" @@ -2362,6 +2443,22 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "macro_rules_attribute" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" +dependencies = [ + "macro_rules_attribute-proc_macro", + "paste", +] + +[[package]] +name = "macro_rules_attribute-proc_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" + [[package]] name = "matchers" version = "0.2.0" @@ -2448,6 +2545,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never-say-never" +version = "6.6.666" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -2701,6 +2804,12 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathfinding" version = "4.9.1" @@ -2909,6 +3018,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "polonius-the-crab" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec242d7eccbb2fd8b3b5b6e3cf89f94a91a800f469005b44d154359609f8af72" +dependencies = [ + "higher-kinded-types", + "never-say-never", +] + [[package]] name = "portable-atomic" version = "1.11.1" @@ -3083,6 +3202,21 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.40" @@ -3163,6 +3297,15 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.11.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -3400,6 +3543,7 @@ dependencies = [ "cadence", "chrono", "cityhash-rs", + "clickhouse", "criterion", "ctrlc", "data-encoding", @@ -3582,7 +3726,7 @@ checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ "proc-macro2", "quote", - "serde_derive_internals", + "serde_derive_internals 0.26.0", "syn 1.0.109", ] @@ -3893,6 +4037,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "serde_json" version = "1.0.149" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 39f69d6c3fd..67f4e889924 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -27,6 +27,15 @@ anyhow = { version = "1.0.69", features = ["backtrace"] } cadence = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } cityhash-rs = "1.0.1" +clickhouse = { version = "0.15", default-features = false, features = [ + "inserter", + "lz4", + "uuid", + # TLS for `secure: true` clusters (https URLs). native-tls matches the + # existing reqwest-based writer (OpenSSL + system roots), and openssl-sys is + # already pulled in by reqwest, so this adds no new system dependency. + "native-tls", +] } ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0" futures = "0.3.21" diff --git a/rust_snuba/rust-toolchain.toml b/rust_snuba/rust-toolchain.toml index e88baf106b9..9cf4a67c474 100644 --- a/rust_snuba/rust-toolchain.toml +++ b/rust_snuba/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.88.0" +channel = "1.94.1" diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 83acfeb9503..089af11eb1b 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -26,12 +26,14 @@ use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; use crate::strategies::blq_router::BLQRouter; -use crate::strategies::clickhouse::writer_v2::{ClickhouseWriterStep, InsertFormat}; +use crate::strategies::clickhouse::inserter_sink::EapItemsInserterSink; +use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; use crate::strategies::healthcheck::HealthCheck as SnubaHealthCheck; use crate::strategies::join_timeout::SetJoinTimeout; use crate::strategies::processor::{ - get_schema, make_rust_processor, make_rust_processor_with_replacements, validate_schema, + get_schema, make_rust_processor, make_rust_processor_typed, + make_rust_processor_with_replacements, validate_schema, }; use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; @@ -87,32 +89,29 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } } + // The Reduce accumulator closure returns Result<_, SubmitError<_>>, whose + // Err variant is large (it wraps a Message); that shape is dictated by + // arroyo's API, not us. + #[allow(clippy::result_large_err)] fn create(&self) -> Box> { - let (insert_format, process_fn_override, insert_columns): ( - InsertFormat, - Option, - Option<&'static [&'static str]>, - ) = if self.use_row_binary { - tracing::info!("Using RowBinary wire format"); + // When `use_row_binary` is set, eap_items inserts go through the + // official clickhouse-crate inserter sink (RowBinaryWithNamesAndTypes + + // schema validation), bypassing the JSONEachRow `Reduce` + + // `ClickhouseWriterStep` pair. It is only wired for EAPItemsProcessor. + let use_eap_crate_path = self.use_row_binary; + if use_eap_crate_path { let processor_name = self .storage_config .message_processor .python_class_name .as_str(); - let (func, columns): ( - crate::processors::ProcessingFunction, - &'static [&'static str], - ) = match processor_name { - "EAPItemsProcessor" => ( - crate::processors::eap_items::process_message_row_binary, - crate::processors::eap_items::EAPItemRow::COLUMN_NAMES, - ), - name => panic!("RowBinary not supported for processor: {name}"), - }; - (InsertFormat::RowBinary, Some(func), Some(columns)) - } else { - (InsertFormat::JsonEachRow, None, None) - }; + assert_eq!( + processor_name, "EAPItemsProcessor", + "use_row_binary (clickhouse-crate inserter) is only supported for \ + EAPItemsProcessor, got {processor_name}", + ); + tracing::info!("Using clickhouse-crate RowBinary inserter for eap_items"); + } // Commit offsets let next_step = CommitOffsets::new(Duration::from_secs(1)); @@ -151,136 +150,158 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); - let next_step = ClickhouseWriterStep::new( - next_step, - self.storage_config.clickhouse_cluster.clone(), - self.storage_config.clickhouse_table_name.clone(), - false, - &self.clickhouse_concurrency, - self.storage_config.name.clone(), - insert_format, - insert_columns, - ); - - let accumulator = Arc::new( - |batch: BytesInsertBatch, small_batch: Message>| { - Ok(batch.merge(small_batch.into_payload())) - }, - ); - - let compute_batch_size: fn(&BytesInsertBatch) -> usize = - match self.max_batch_size_calculation { - config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), - config::BatchSizeCalculation::Rows => |batch| batch.len(), - }; - - let next_step = Reduce::new( - next_step, - accumulator, - Arc::new(move || { - BytesInsertBatch::::new( - RowData::default(), - None, - None, - None, - Default::default(), - CogsData::default(), - ) - }), - self.max_batch_size, - self.max_batch_time, - compute_batch_size, - // we need to enable this to deal with storages where we skip 100% of values. - // we still need to commit there - ) - .flush_empty_batches(true); - - // RowBinary can only be emitted by the Rust processor (the Python path - // always returns JSONEachRow bytes). If the storage opted into - // RowBinary, force the Rust processor branch — otherwise we'd POST - // JSON bytes under `FORMAT RowBinary` and ClickHouse would reject the - // batch. The previous early-return for RowBinary did this implicitly. - let use_rust_processor = self.use_rust_processor || self.use_row_binary; - - // Transform messages - let next_step = match ( - use_rust_processor, - processors::get_processing_function( - &self.storage_config.message_processor.python_class_name, - ), - ) { - ( - true, - Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(func)), - ) => { - let (replacements_config, replacements_destination) = - self.replacements_config.clone().unwrap(); - - let producer = KafkaProducer::new(replacements_config); - let replacements_step = ProduceReplacements::new( - next_step, - producer, - replacements_destination, - &self.replacements_concurrency, - false, - ); - - make_rust_processor_with_replacements( - replacements_step, - func, - &self.logical_topic_name, - self.enforce_schema, - &self.processing_concurrency, - config::ProcessorConfig { - env_config: self.env_config.clone(), - storage_name: self.storage_config.name.clone(), - }, - self.stop_at_timestamp, - ) - } - (true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => { - // For storages opted into RowBinary, swap the registered JSON - // processor for its RowBinary sibling. Same signature, same - // pipeline; only the encoding inside the processor differs. - let func = process_fn_override.unwrap_or(func); - make_rust_processor( - next_step, - func, - &self.logical_topic_name, - self.enforce_schema, - &self.processing_concurrency, - config::ProcessorConfig { - env_config: self.env_config.clone(), - storage_name: self.storage_config.name.clone(), - }, - self.stop_at_timestamp, - ) - } - ( + let next_step: Box> = if use_eap_crate_path { + // eap_items: typed processor → long-lived clickhouse-crate inserter + // sink (which replaces Reduce + ClickhouseWriterStep for this path). + let sink = EapItemsInserterSink::new( + next_step, + self.storage_config.clickhouse_cluster.clone(), + self.storage_config.clickhouse_table_name.clone(), + self.storage_config.name.clone(), + &self.clickhouse_concurrency, + self.max_batch_size, + self.max_batch_time, + self.max_batch_size_calculation, false, - Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(_)), - ) => { - panic!("Consumer with replacements cannot be run in hybrid-mode"); - } - _ => { - let schema = get_schema(&self.logical_topic_name, self.enforce_schema); + ); + make_rust_processor_typed( + sink, + crate::processors::eap_items::process_message_eap_row, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, + ) + } else { + let next_step = ClickhouseWriterStep::new( + next_step, + self.storage_config.clickhouse_cluster.clone(), + self.storage_config.clickhouse_table_name.clone(), + false, + &self.clickhouse_concurrency, + self.storage_config.name.clone(), + ); + + let accumulator = Arc::new( + |batch: BytesInsertBatch, + small_batch: Message>| { + Ok(batch.merge(small_batch.into_payload())) + }, + ); + + let compute_batch_size: fn(&BytesInsertBatch) -> usize = + match self.max_batch_size_calculation { + config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), + config::BatchSizeCalculation::Rows => |batch| batch.len(), + }; + + let next_step = Reduce::new( + next_step, + accumulator, + Arc::new(move || { + BytesInsertBatch::::new( + RowData::default(), + None, + None, + None, + Default::default(), + CogsData::default(), + ) + }), + self.max_batch_size, + self.max_batch_time, + compute_batch_size, + // we need to enable this to deal with storages where we skip 100% of values. + // we still need to commit there + ) + .flush_empty_batches(true); + + let use_rust_processor = self.use_rust_processor; + + // Transform messages + let next_step = match ( + use_rust_processor, + processors::get_processing_function( + &self.storage_config.message_processor.python_class_name, + ), + ) { + ( + true, + Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements( + func, + )), + ) => { + let (replacements_config, replacements_destination) = + self.replacements_config.clone().unwrap(); + + let producer = KafkaProducer::new(replacements_config); + let replacements_step = ProduceReplacements::new( + next_step, + producer, + replacements_destination, + &self.replacements_concurrency, + false, + ); - Box::new(RunTaskInThreads::new( - PythonTransformStep::new( + make_rust_processor_with_replacements( + replacements_step, + func, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, + ) + } + (true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => { + make_rust_processor( next_step, - self.storage_config.message_processor.clone(), - self.processing_concurrency.concurrency, - self.python_max_queue_depth, + func, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, ) - .unwrap(), - SchemaValidator { - schema, - enforce_schema: self.enforce_schema, - }, - &self.processing_concurrency, - Some("validate_schema"), - )) - } + } + ( + false, + Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(_)), + ) => { + panic!("Consumer with replacements cannot be run in hybrid-mode"); + } + _ => { + let schema = get_schema(&self.logical_topic_name, self.enforce_schema); + + Box::new(RunTaskInThreads::new( + PythonTransformStep::new( + next_step, + self.storage_config.message_processor.clone(), + self.processing_concurrency.concurrency, + self.python_max_queue_depth, + ) + .unwrap(), + SchemaValidator { + schema, + enforce_schema: self.enforce_schema, + }, + &self.processing_concurrency, + Some("validate_schema"), + )) + } + }; + + next_step }; // force message processor to drop all in-flight messages, as it is not worth the time @@ -420,6 +441,7 @@ mod tests { } } + #[allow(clippy::result_large_err)] fn build_reduce( next_step: RecordingStep, max_batch_size: usize, diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index ebfe8ba25dd..348d106da2a 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -19,9 +19,10 @@ use crate::processors::utils::{ record_invalid_timestamp_metric, SilencedDLQMessage, }; use crate::runtime_config::get_str_config; -use crate::strategies::clickhouse::rowbinary; use crate::types::CogsData; -use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; +use crate::types::{ + item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, TypedInsertBatch, +}; /// Runtime config key prefix. Per-storage key /// `eap_items_dlq_grace_period_min:`: a non-negative integer @@ -196,27 +197,32 @@ pub fn process_message( Ok(batch) } -pub fn process_message_row_binary( +/// Production RowBinary path: build the typed `EAPItemRow` and hand it +/// downstream to the `clickhouse`-crate inserter sink, which serializes it +/// itself (`write(&row)`) the moment it arrives and drops the wide struct, +/// keeping only the inserter's byte buffer in memory. No bytes are encoded +/// here — the crate owns RowBinary encoding + schema validation. +pub fn process_message_eap_row( msg: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, -) -> anyhow::Result { +) -> anyhow::Result> { + // Source proto length, used as the byte-size estimate for byte-based batch + // sizing (the sink also reports the exact encoded size after each flush). + let num_bytes = msg.payload().map(|p| p.len()).unwrap_or(0); let processed = process_eap_item(msg, config)?; if processed.should_skip { - return Ok(InsertBatch::skip()); + return Ok(TypedInsertBatch::skip()); } let row = EAPItemRow::try_from(processed.eap_item)?; - - // Encode the row to RowBinary bytes inline so the wide typed struct (~80 - // Vec<(String, _)> buckets) drops here instead of riding the pipeline to - // the writer step. The batch downstream sees only a compact Vec. - let mut encoded_rows = Vec::new(); - rowbinary::serialize_into(&mut encoded_rows, &row)?; - - let mut batch = InsertBatch::from_encoded_rows(encoded_rows, 1, processed.origin_timestamp); - batch.cogs_data = Some(processed.cogs_data); - batch.item_type_metrics = Some(processed.item_type_metrics); - Ok(batch) + Ok(TypedInsertBatch { + row: Some(row), + origin_timestamp: processed.origin_timestamp, + sentry_received_timestamp: None, + cogs_data: Some(processed.cogs_data), + item_type_metrics: Some(processed.item_type_metrics), + num_bytes, + }) } /// Test-only: returns the typed `EAPItemRow` (plus the metadata fields the @@ -481,13 +487,13 @@ impl AttributeMap { } seq_attrs! { -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, clickhouse::Row)] pub struct EAPItemRow { organization_id: u64, project_id: u64, item_type: u8, timestamp: u32, - #[serde(with = "crate::strategies::clickhouse::rowbinary::uuid")] + #[serde(with = "clickhouse::serde::uuid")] trace_id: Uuid, item_id: u128, @@ -513,48 +519,6 @@ pub struct EAPItemRow { } } -seq_attrs! { -impl EAPItemRow { - /// Column names in struct (= wire) order. We MUST pass this list to - /// ClickHouse on insert (`INSERT INTO t (col1, col2, ...) FORMAT RowBinary`) - /// because the on-disk column order in `eap_items_1_local` differs from - /// the struct order: - /// - /// * `client_sample_rate` and `server_sample_rate` were added with - /// identical `AFTER sampling_factor` in migration 0048, so the table - /// ends up with the pair reversed (server before client). - /// * The struct interleaves `attributes_string_N, attributes_float_N` for - /// each `N` (per the `seq_attrs!` expansion), while the initial table - /// put all `attributes_string_*` first, then all `attributes_float_*`. - /// - /// Without an explicit column list, ClickHouse falls back to the table's - /// positional order and misreads bytes (the integration test hits a - /// `CANNOT_READ_ALL_DATA` deep inside the maps section). - pub(crate) const COLUMN_NAMES: &'static [&'static str] = &[ - "organization_id", - "project_id", - "item_type", - "timestamp", - "trace_id", - "item_id", - "indexed_name", - "sampling_weight", - "sampling_factor", - "client_sample_rate", - "server_sample_rate", - "retention_days", - "downsampled_retention_days", - "attributes_bool", - "attributes_int", - #( - concat!("attributes_string_", stringify!(N)), - concat!("attributes_float_", stringify!(N)), - )* - "attributes_array", - ]; -} -} - impl TryFrom for EAPItemRow { type Error = anyhow::Error; @@ -1003,7 +967,9 @@ mod tests { /// adding/reordering fields on `EAPItemRow` also updates this list. #[test] fn test_column_names_match_struct_layout() { - let names = EAPItemRow::COLUMN_NAMES; + // Column names now come from the `clickhouse::Row` derive (declaration + // order). This still guards that field names/order match the schema. + let names = ::COLUMN_NAMES; // 12 scalars + indexed_name + attributes_bool + attributes_int + 80 buckets + attributes_array assert_eq!(names.len(), 96); assert_eq!(names[0], "organization_id"); @@ -1615,11 +1581,11 @@ mod tests { } /// End-to-end test of the production RowBinary path against a live - /// ClickHouse instance: process_message_row_binary produces bytes via the - /// vendored serializer, those bytes are POSTed verbatim with - /// `FORMAT RowBinary`, and the row is read back via `FORMAT JSON`. This - /// is the cross-boundary check that our wire format matches what - /// ClickHouse expects — pure unit tests can't catch that. + /// ClickHouse instance: `process_message_eap_row` produces a typed + /// `EAPItemRow`, the official clickhouse-crate `Inserter` serializes + + /// inserts it (RowBinaryWithNamesAndTypes + schema validation), and the row + /// is read back via `FORMAT JSON`. This is the cross-boundary check that our + /// types match the live schema — pure unit tests can't catch that. #[tokio::test] async fn test_row_binary_clickhouse_insert() { let host = std::env::var("CLICKHOUSE_HOST").unwrap_or("127.0.0.1".to_string()); @@ -1628,6 +1594,8 @@ mod tests { .parse() .unwrap(); let database = std::env::var("CLICKHOUSE_DATABASE").unwrap_or("default".to_string()); + let user = std::env::var("CLICKHOUSE_USER").unwrap_or("default".to_string()); + let password = std::env::var("CLICKHOUSE_PASSWORD").unwrap_or_default(); let base_url = format!("http://{host}:{http_port}"); let http = reqwest::Client::new(); @@ -1677,35 +1645,29 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - // Production path: encodes the row to RowBinary bytes inside the processor. - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + // Production path: build the typed row, then insert it through the + // official clickhouse-crate inserter. Exercises the real EAPItemRow → + // live-schema mapping (UUID, Map columns, DateTime, UInt128, the + // JSON attributes_array column, …). Validation is off (plain RowBinary + // + input_format_binary_read_json_as_string) — see build_client in + // inserter_sink.rs for why the JSON column requires this. + let batch = process_message_eap_row(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - assert_eq!(batch.rows.num_rows, 1); - - // Insert: POST the pre-encoded bytes with FORMAT RowBinary. We must - // pass the column list — the struct's wire order does NOT match the - // table's on-disk column order (see EAPItemRow::COLUMN_NAMES). - let insert_query = format!( - "INSERT INTO eap_items_1_local ({}) FORMAT RowBinary", - EAPItemRow::COLUMN_NAMES.join(", "), - ); - let insert_resp = http - .post(&base_url) - .header("X-ClickHouse-Database", &database) - .query(&[ - ("query", insert_query.as_str()), - ("input_format_binary_read_json_as_string", "1"), - ("insert_deduplicate", "0"), - ]) - .body(batch.rows.encoded_rows.clone()) - .send() - .await - .expect("Insert request failed to send"); - assert!( - insert_resp.status().is_success(), - "Insert failed: {}", - insert_resp.text().await.unwrap_or_default() - ); + let row = batch.row.expect("row should be present"); + + let client = clickhouse::Client::default() + .with_url(base_url.as_str()) + .with_user(user) + .with_password(password) + .with_database(database.clone()) + .with_validation(false) + .with_setting("input_format_binary_read_json_as_string", "1"); + let mut inserter = client + .inserter::("eap_items_1_local") + .with_max_rows(1); + inserter.write(&row).await.expect("write row"); + let quantities = inserter.end().await.expect("flush insert"); + assert_eq!(quantities.rows, 1); // Read it back via FORMAT JSON. We use organization_id (primary key prefix) // for a deterministic lookup. ClickHouse's FORMAT JSON renders 64-bit diff --git a/rust_snuba/src/processors/generic_metrics.rs b/rust_snuba/src/processors/generic_metrics.rs index 8fcac903d2c..d62a6c0d08f 100644 --- a/rust_snuba/src/processors/generic_metrics.rs +++ b/rust_snuba/src/processors/generic_metrics.rs @@ -166,7 +166,7 @@ fn decode_encoded_into_numeric_array( where T: Decodable, { - if data.len() % T::SIZE == 0 { + if data.len().is_multiple_of(T::SIZE) { Ok(data .chunks_exact(T::SIZE) .map(TryInto::try_into) diff --git a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs index 23979cf52e4..f279dbdcf65 100644 --- a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs +++ b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs @@ -87,7 +87,7 @@ impl TaskRunner // Pace produces to avoid flooding the producer queue with a single // large batch all at once. produced_count += 1; - if produced_count % throttle_batch_size == 0 { + if produced_count.is_multiple_of(throttle_batch_size) { tokio::time::sleep(throttle_sleep).await; } } diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs new file mode 100644 index 00000000000..7a66260c637 --- /dev/null +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -0,0 +1,799 @@ +//! Arroyo sink that inserts `eap_items` rows through the official `clickhouse` +//! crate's [`Inserter`]. +//! +//! Shape: the processor hands this strategy a *typed* [`EAPItemRow`] per +//! message (wrapped in `BytesInsertBatch>`; `None` == a +//! skipped/empty message that still carries offsets). A long-lived actor task +//! owns one [`clickhouse::inserter::Inserter`] and writes each row the moment +//! it arrives. To allow retrying transient insert errors, the rows of the +//! current unflushed window are retained until that window's flush succeeds, so +//! peak memory is bounded by one batch (`max_batch_size`) of rows plus the +//! inserter's serialized byte buffer — not by total row count. +//! +//! Wire format is plain `RowBinary` (crate validation is off — see +//! `build_client`: the `attributes_array` JSON column can't be sent under +//! `RowBinaryWithNamesAndTypes`). The crate still emits the column list from the +//! `Row` derive in the INSERT, so column mapping is correct. +//! +//! The inserter owns the flush boundary: it is configured with our batch +//! settings (`with_max_rows`/`with_max_bytes` + `with_period`), and we drive it +//! with [`Inserter::commit`] (never `force_commit`). When `commit()` reports a +//! real flush (`Quantities.rows > 0`), the rows it flushed are durably in +//! ClickHouse; only then do we push *their* Kafka offsets downstream toward +//! `CommitOffsets`. On a flush error the retained window is replayed through a +//! fresh inserter with exponential backoff; if retries are exhausted the actor +//! fail-stops, the offsets are never pushed, and the batch replays on restart — +//! the same durability barrier (plus retry) the old writer had. + +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime}; + +use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; +use sentry_arroyo::processing::strategies::{ + merge_commit_request, CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, + SubmitError, +}; +use sentry_arroyo::types::{Message, Partition}; +use sentry_arroyo::utils::timing::Deadline; +use sentry_arroyo::{counter, timer}; +use tokio::sync::mpsc; + +use clickhouse::inserter::Quantities; + +use crate::config::{BatchSizeCalculation, ClickhouseConfig}; +use crate::processors::eap_items::EAPItemRow; +use crate::runtime_config::{get_load_balancing_config, get_max_insert_block_size}; +use crate::types::BytesInsertBatch; + +/// Upper bound on typed rows in flight between `submit` and the actor. Kept +/// small (independent of `max_batch_size`) so the channel itself isn't an +/// unbounded backlog — when full, `submit` returns `MessageRejected` and arroyo +/// back-pressures upstream. (The current window's rows are retained in the +/// actor for retry; that buffer is bounded by `max_batch_size`.) +const WORK_CHANNEL_CAPACITY: usize = 1024; + +/// Insert retry policy for transient ClickHouse/network errors, mirroring the +/// old JSON writer's `RetryConfig` (jittered exponential backoff). +const MAX_INSERT_RETRIES: usize = 4; +const INITIAL_RETRY_BACKOFF_MS: u64 = 500; + +/// Work handed from the strategy (sync) to the actor (async), in arrival order. +enum WorkItem { + Row { + // Boxed because `EAPItemRow` is wide (~96 columns); keeps the channel + // item small and the enum cheap to move. + row: Box, + committable: Vec<(Partition, u64)>, + meta: BytesInsertBatch<()>, + }, + Skip { + committable: Vec<(Partition, u64)>, + meta: BytesInsertBatch<()>, + }, +} + +/// A flush succeeded (or a skip-only window elapsed): these offsets + metadata +/// are safe to push downstream. +struct ReadyFlush { + committable: BTreeMap, + meta: BytesInsertBatch<()>, + bytes: u64, + rows: u64, + elapsed: Duration, +} + +/// Result handed from the actor back to the strategy, drained in `poll`. +enum FlushOutcome { + /// Boxed because [`ReadyFlush`] is much larger than the `Err` variant. + Ready(Box), + /// A write/commit/end failed: the offsets it would have covered are + /// discarded (never pushed) so the batch replays. + Err(String), +} + +/// The current unflushed window. To support retrying transient insert errors, +/// the window's rows are retained here (bounded by `max_batch_size`) until its +/// flush succeeds — the only place a batch of typed rows lives, in addition to +/// the inserter's serialized byte buffer. +#[derive(Default)] +struct Acc { + rows: Vec, + committable: BTreeMap, + meta: BytesInsertBatch<()>, +} + +impl Acc { + fn merge(&mut self, committable: Vec<(Partition, u64)>, meta: BytesInsertBatch<()>) { + for (partition, offset) in committable { + let entry = self.committable.entry(partition).or_insert(offset); + *entry = (*entry).max(offset); + } + self.meta.merge_metadata(meta); + } + + fn has_offsets(&self) -> bool { + !self.committable.is_empty() + } +} + +/// Push the accumulated window downstream as a ready flush, then reset the +/// accumulator — the rows are now durable. +/// +/// Invariant: `rows` (the count actually flushed) must equal the retained +/// window `acc.rows.len()`. We hold it because we `commit()` after every single +/// `write()`, and the crate's flush ends the *entire* current INSERT (it never +/// flushes a strict subset) — so a flush always covers exactly the retained +/// window, and emitting all of `acc.committable` only ever advances offsets for +/// rows that are now durable. The assert guards against a future change (e.g. +/// batching multiple writes before a commit) silently breaking that and +/// committing offsets past not-yet-durable rows. +fn emit_ready( + out_tx: &mpsc::UnboundedSender, + acc: &mut Acc, + bytes: u64, + rows: u64, + elapsed: Duration, +) { + debug_assert_eq!( + rows as usize, + acc.rows.len(), + "flush count must equal the retained window; committing more offsets \ + than were made durable would lose data", + ); + acc.rows.clear(); + let committable = std::mem::take(&mut acc.committable); + let meta = std::mem::take(&mut acc.meta); + let _ = out_tx.send(FlushOutcome::Ready(Box::new(ReadyFlush { + committable, + meta, + bytes, + rows, + elapsed, + }))); +} + +fn build_client(cfg: &ClickhouseConfig) -> clickhouse::Client { + let scheme = if cfg.secure { "https" } else { "http" }; + clickhouse::Client::default() + .with_url(format!("{}://{}:{}", scheme, cfg.host, cfg.http_port)) + .with_user(cfg.user.clone()) + .with_password(cfg.password.clone()) + .with_database(cfg.database.clone()) + // Validation OFF → plain `RowBinary` (no names+types header). The + // `eap_items.attributes_array` column is a native `JSON(max_dynamic_paths=N)` + // type; under `RowBinaryWithNamesAndTypes` the server validates the + // header types and rejects our `String` field for that column + // ("Type of 'attributes_array' must be JSON(...)"). Plain RowBinary plus + // `input_format_binary_read_json_as_string=1` (below) writes the JSON as + // a string and lets ClickHouse parse it into the JSON column — exactly + // what the old hand-rolled writer did. The crate still emits the column + // list (from the `Row` derive) in the INSERT, so mapping stays correct. + .with_validation(false) + // Mirror the old writer's URL params: synchronous distributed insert and + // "treat binary strings bound for JSON columns as JSON text". + .with_setting("insert_distributed_sync", "1") + .with_setting("input_format_binary_read_json_as_string", "1") +} + +fn make_inserter( + client: &clickhouse::Client, + table: &str, + storage_name: &str, + max_rows: u64, + max_bytes: u64, + max_batch_time: Duration, +) -> clickhouse::inserter::Inserter { + let lb = get_load_balancing_config(storage_name); + let mut inserter = client + .inserter::(table) + .with_max_rows(max_rows) + .with_max_bytes(max_bytes) + .with_period(Some(max_batch_time)) + .with_setting("load_balancing", lb.load_balancing); + if let Some(first_offset) = lb.first_offset { + inserter = inserter.with_setting("load_balancing_first_offset", first_offset); + } + if let Some(block_size) = get_max_insert_block_size(storage_name) { + inserter = inserter.with_setting("max_insert_block_size", block_size.to_string()); + } + inserter +} + +/// Durably insert `rows` as a single INSERT, retrying transient failures with +/// jittered exponential backoff. Used to recover a window after the long-lived +/// inserter hit a write/commit error (which poisons it). Returns the inserted +/// `Quantities` on success, or the last error string after exhausting retries. +#[allow(clippy::too_many_arguments)] +async fn flush_window_with_retry( + client: &clickhouse::Client, + table: &str, + storage_name: &str, + max_rows: u64, + max_bytes: u64, + max_batch_time: Duration, + rows: &[EAPItemRow], +) -> Result { + if rows.is_empty() { + return Ok(Quantities::ZERO); + } + let mut backoff = Duration::from_millis(INITIAL_RETRY_BACKOFF_MS); + let mut last_err = String::from("unknown error"); + for attempt in 0..=MAX_INSERT_RETRIES { + // Fresh inserter each attempt; write the whole window then force-flush + // via end(). We never call commit() here, so size/period thresholds are + // irrelevant — end() flushes all buffered rows as one INSERT. + let mut inserter = make_inserter( + client, + table, + storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + let mut write_err: Option = None; + for row in rows { + if let Err(e) = inserter.write(row).await { + write_err = Some(e.to_string()); + break; + } + } + match write_err { + None => match inserter.end().await { + Ok(quantities) => return Ok(quantities), + Err(e) => last_err = e.to_string(), + }, + Some(e) => last_err = e, + } + if attempt < MAX_INSERT_RETRIES { + counter!( + "rust_consumer.clickhouse_insert_error", + 1, + "status" => "insert_error", + "retried" => "true" + ); + tracing::warn!( + "ClickHouse insert failed (attempt {}/{}): {}", + attempt + 1, + MAX_INSERT_RETRIES + 1, + last_err + ); + // ±10% jitter so consumers don't retry in lockstep. + let jitter = 1.0 + (rand::random::() * 0.2 - 0.1); + tokio::time::sleep(backoff.mul_f64(jitter)).await; + backoff = backoff.saturating_mul(2); + } + } + counter!( + "rust_consumer.clickhouse_insert_error", + 1, + "status" => "insert_error", + "retried" => "false" + ); + Err(last_err) +} + +#[allow(clippy::too_many_arguments)] +async fn run_inserter_actor( + config: ClickhouseConfig, + table: String, + storage_name: String, + max_rows: u64, + max_bytes: u64, + max_batch_time: Duration, + skip_write: bool, + mut work_rx: mpsc::Receiver, + out_tx: mpsc::UnboundedSender, +) { + let client = build_client(&config); + let mut inserter = make_inserter( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + let mut acc = Acc::default(); + + // The crate only evaluates `with_period` inside `commit()`, so we call + // `commit()` on a tick. Ticking finer than `max_batch_time` makes the + // period flush fire promptly instead of slipping a whole window. + let tick_period = (max_batch_time / 4).max(Duration::from_millis(10)); + let mut interval = tokio::time::interval(tick_period); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval.tick().await; // consume the immediate first tick + + // Bounds how often a skip-only window pushes offsets, so we don't emit a + // downstream message every tick when no rows are flowing. + let mut skip_deadline = Deadline::new(max_batch_time); + + // On a write/commit error the long-lived inserter is poisoned. Replay the + // retained window through a fresh inserter with backoff; on success reset + // the main inserter, emit the offsets, and carry on. If retries are + // exhausted, fail-stop: surface the error and return (the consumer restarts + // and replays from the last committed offset). We deliberately do NOT keep + // consuming later rows on an unrecoverable error — they would be durably + // inserted with uncommitted offsets, producing duplicates on replay. + macro_rules! recover_or_fail { + ($start:expr) => {{ + match flush_window_with_retry( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + &acc.rows, + ) + .await + { + Ok(quantities) => { + inserter = make_inserter( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + let elapsed = $start.elapsed().unwrap_or_default(); + emit_ready( + &out_tx, + &mut acc, + quantities.bytes, + quantities.rows, + elapsed, + ); + skip_deadline = Deadline::new(max_batch_time); + } + Err(e) => { + let _ = out_tx.send(FlushOutcome::Err(e)); + return; + } + } + }}; + } + + loop { + tokio::select! { + maybe = work_rx.recv() => match maybe { + Some(WorkItem::Row { row, committable, meta }) => { + acc.merge(committable, meta); + if skip_write { + continue; + } + // Retain the row for retry, then write it from the buffer. + acc.rows.push(*row); + let start = SystemTime::now(); + if inserter.write(acc.rows.last().unwrap()).await.is_err() { + recover_or_fail!(start); + continue; + } + match inserter.commit().await { + Ok(q) if q.rows > 0 => { + let elapsed = start.elapsed().unwrap_or_default(); + emit_ready(&out_tx, &mut acc, q.bytes, q.rows, elapsed); + skip_deadline = Deadline::new(max_batch_time); + } + Ok(_) => {} + Err(_) => recover_or_fail!(start), + } + } + Some(WorkItem::Skip { committable, meta }) => acc.merge(committable, meta), + None => break, // strategy dropped the sender: finalize below + }, + _ = interval.tick() => { + if skip_write { + if acc.has_offsets() && skip_deadline.has_elapsed() { + emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); + skip_deadline = Deadline::new(max_batch_time); + } + continue; + } + let start = SystemTime::now(); + match inserter.commit().await { + Ok(q) if q.rows > 0 => { + let elapsed = start.elapsed().unwrap_or_default(); + emit_ready(&out_tx, &mut acc, q.bytes, q.rows, elapsed); + skip_deadline = Deadline::new(max_batch_time); + } + Ok(_) => { + // Period elapsed with no buffered rows (skip-only window): + // advance offsets so the consumer doesn't stall. We must + // NOT emit while real rows are buffered-but-unflushed — + // their offsets are lower than later skips, so committing + // past them would lose data. + if acc.has_offsets() && acc.rows.is_empty() && skip_deadline.has_elapsed() { + emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); + skip_deadline = Deadline::new(max_batch_time); + } + } + Err(_) => recover_or_fail!(start), + } + } + } + } + + // Finalize: flush the last partial window (with the same retry on error). + if skip_write { + if acc.has_offsets() { + emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); + } + return; + } + let start = SystemTime::now(); + let final_result = match inserter.end().await { + Ok(quantities) => Ok(quantities), + Err(_) => { + flush_window_with_retry( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + &acc.rows, + ) + .await + } + }; + match final_result { + Ok(quantities) => { + if acc.has_offsets() { + let elapsed = start.elapsed().unwrap_or_default(); + emit_ready( + &out_tx, + &mut acc, + quantities.bytes, + quantities.rows, + elapsed, + ); + } + } + Err(e) => { + let _ = out_tx.send(FlushOutcome::Err(e)); + } + } +} + +pub struct EapItemsInserterSink { + next_step: N, + /// `None` once the stream has been closed (in `join`/`terminate`). + work_tx: Option>, + out_rx: mpsc::UnboundedReceiver, + actor: Option>, + /// A flushed message that the next step rejected; retried before pulling more. + message_carried_over: Option>>, + commit_request_carried_over: Option, + actor_done: bool, +} + +impl EapItemsInserterSink +where + N: ProcessingStrategy> + 'static, +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + next_step: N, + cluster: ClickhouseConfig, + table: String, + storage_name: String, + concurrency: &ConcurrencyConfig, + max_batch_size: usize, + max_batch_time: Duration, + batch_size_calculation: BatchSizeCalculation, + skip_write: bool, + ) -> Self { + let (max_rows, max_bytes) = match batch_size_calculation { + BatchSizeCalculation::Rows => (max_batch_size as u64, u64::MAX), + BatchSizeCalculation::Bytes => (u64::MAX, max_batch_size as u64), + }; + + let (work_tx, work_rx) = mpsc::channel::(WORK_CHANNEL_CAPACITY); + let (out_tx, out_rx) = mpsc::unbounded_channel::(); + + let actor = concurrency.handle().spawn(run_inserter_actor( + cluster, + table, + storage_name, + max_rows, + max_bytes, + max_batch_time, + skip_write, + work_rx, + out_tx, + )); + + Self { + next_step, + work_tx: Some(work_tx), + out_rx, + actor: Some(actor), + message_carried_over: None, + commit_request_carried_over: None, + actor_done: false, + } + } + + /// Retry a downstream message that was previously rejected. + fn submit_carried_over(&mut self) -> Result<(), StrategyError> { + if let Some(message) = self.message_carried_over.take() { + match self.next_step.submit(message) { + Ok(()) => {} + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.message_carried_over = Some(message); + } + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), + } + } + Ok(()) + } + + /// Drain ready flushes from the actor and push them downstream. Stops on + /// downstream backpressure (carry the message and retry next poll). + fn drain_outcomes(&mut self) -> Result<(), StrategyError> { + while self.message_carried_over.is_none() { + match self.out_rx.try_recv() { + Ok(FlushOutcome::Ready(ready)) => { + let ReadyFlush { + committable, + meta, + bytes, + rows, + elapsed, + } = *ready; + timer!("insertions.batch_write_ms", elapsed); + counter!("insertions.batch_write_bytes", bytes as i64); + counter!("insertions.batch_write_msgs", rows as i64); + meta.record_message_latency(); + meta.emit_item_type_metrics(); + let message = Message::new_any_message(meta, committable); + match self.next_step.submit(message) { + Ok(()) => {} + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.message_carried_over = Some(message); + } + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), + } + } + Ok(FlushOutcome::Err(e)) => { + counter!( + "rust_consumer.clickhouse_insert_error", + 1, + "status" => "insert_error", + "retried" => "false" + ); + tracing::error!("ClickHouse inserter flush failed: {}", e); + return Err(StrategyError::Other(e.into())); + } + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => { + self.actor_done = true; + break; + } + } + } + Ok(()) + } +} + +impl ProcessingStrategy>> for EapItemsInserterSink +where + N: ProcessingStrategy> + 'static, +{ + fn poll(&mut self) -> Result, StrategyError> { + let commit_request = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), commit_request); + self.submit_carried_over()?; + self.drain_outcomes()?; + Ok(self.commit_request_carried_over.take()) + } + + fn submit( + &mut self, + message: Message>>, + ) -> Result<(), SubmitError>>> { + // Downstream is backpressured, or we're shutting down: reject upstream too. + if self.message_carried_over.is_some() { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + } + let Some(work_tx) = self.work_tx.as_ref() else { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + }; + // Reserve a slot first so we never destructure `message` on the reject + // path. `try_reserve` also fails (→ MessageRejected) if the actor exited + // and closed the channel, so we never silently accept a row the actor + // can't receive. `Permit::send` is infallible once reserved; and even in + // the narrow race where the actor exits between reserve and send, the + // dropped row's offset is never committed (offsets only advance via the + // downstream flushes we emit), so it simply replays. + let Ok(permit) = work_tx.try_reserve() else { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + }; + + let committable: Vec<(Partition, u64)> = message.committable().collect(); + let (row, meta) = message.into_payload().take(); + let item = match row { + Some(row) => WorkItem::Row { + row: Box::new(row), + committable, + meta, + }, + None => WorkItem::Skip { committable, meta }, + }; + permit.send(item); + Ok(()) + } + + fn terminate(&mut self) { + // Drop the sender and abort the actor (abandons any open INSERT; the + // server discards an un-`end()`ed insert, and un-pushed offsets replay). + self.work_tx = None; + if let Some(handle) = self.actor.take() { + handle.abort(); + } + self.next_step.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + // Closing the sender makes the actor finalize (flush via `end()`), then + // drop `out_tx` so `out_rx` disconnects. + self.work_tx = None; + let deadline = timeout.map(Deadline::new); + + let mut timed_out = false; + loop { + let commit_request = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), commit_request); + self.submit_carried_over()?; + self.drain_outcomes()?; + + if self.actor_done && self.message_carried_over.is_none() { + break; + } + if let Some(deadline) = &deadline { + if deadline.has_elapsed() { + timed_out = true; + break; + } + } + // Let the actor make progress on its runtime threads. + std::thread::sleep(Duration::from_millis(1)); + } + + if timed_out { + // Stop the actor so it can't durably flush a window whose offsets we + // can no longer observe within the deadline: aborting cancels an + // in-flight INSERT that hasn't landed (→ that window simply replays) + // rather than leaving it running to commit rows we'll never ack. A + // flush that already landed server-side replays as a duplicate — + // the same at-least-once shutdown exposure the old writer had, to be + // closed by the (deferred) insert_deduplication_token. + tracing::warn!( + "inserter sink join timed out; aborting actor, partial batch will replay" + ); + if let Some(handle) = self.actor.take() { + handle.abort(); + } + } + // Final drain: emit any `Ready` the actor produced right around the + // deadline so its offsets are committed instead of replaying. + self.drain_outcomes()?; + + // Pass the *remaining* time, not the original timeout — the wait loop + // above may already have consumed part of it, so reusing `timeout` here + // could block for up to 2x the caller's budget. + let next_commit = self.next_step.join(deadline.map(|d| d.remaining()))?; + Ok(merge_commit_request( + self.commit_request_carried_over.take(), + next_commit, + )) + } +} + +impl Drop for EapItemsInserterSink { + fn drop(&mut self) { + // Abort the actor if it's still around so a dropped sink never flushes a + // partial INSERT (whose offsets were never pushed) behind our back. + if let Some(handle) = self.actor.take() { + handle.abort(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + use sentry_arroyo::processing::strategies::ProcessingStrategy; + use sentry_arroyo::types::{Partition, Topic}; + + /// Records the committable of every message it receives. + struct RecordingStep { + committables: Arc>>>, + } + + impl ProcessingStrategy> for RecordingStep { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + fn submit( + &mut self, + message: Message>, + ) -> Result<(), SubmitError>> { + let committable: BTreeMap = message.committable().collect(); + self.committables.lock().unwrap().push(committable); + Ok(()) + } + fn terminate(&mut self) {} + fn join(&mut self, _: Option) -> Result, StrategyError> { + Ok(None) + } + } + + fn test_cluster() -> ClickhouseConfig { + ClickhouseConfig { + host: "127.0.0.1".to_string(), + port: 9000, + secure: false, + http_port: 8123, + user: "default".to_string(), + password: "".to_string(), + database: "default".to_string(), + } + } + + /// With `skip_write`, the actor never touches ClickHouse, so this exercises + /// the full submit → actor → drain → downstream path (and `offset + 1` + /// next-to-consume semantics) without a live cluster. Skipped (`None`) rows + /// must still advance offsets, mirroring `flush_empty_batches`. + #[test] + fn skipped_messages_advance_offsets_without_clickhouse() { + crate::testutils::initialize_python(); + + let recorded = Arc::new(Mutex::new(Vec::new())); + let next_step = RecordingStep { + committables: recorded.clone(), + }; + let concurrency = ConcurrencyConfig::new(1); + + let mut sink = EapItemsInserterSink::new( + next_step, + test_cluster(), + "eap_items_1_local".to_string(), + "test_storage".to_string(), + &concurrency, + 1000, + Duration::from_millis(50), + BatchSizeCalculation::Rows, + true, // skip_write + ); + + let partition = Partition::new(Topic::new("snuba-items"), 0); + for offset in 0..5u64 { + let message = Message::new_broker_message( + BytesInsertBatch::>::default(), + partition, + offset, + chrono::Utc::now(), + ); + sink.submit(message).unwrap(); + } + + // join() closes the stream so the actor finalizes and flushes the + // accumulated offsets, which we then drain downstream. + sink.join(Some(Duration::from_secs(5))).unwrap(); + + let recorded = recorded.lock().unwrap(); + let max_offset = recorded + .iter() + .filter_map(|c| c.get(&partition).copied()) + .max() + .expect("offsets should have been pushed downstream"); + // Highest consumed offset is 4 → next-to-consume committable is 5. + assert_eq!(max_offset, 5); + } +} diff --git a/rust_snuba/src/strategies/clickhouse/mod.rs b/rust_snuba/src/strategies/clickhouse/mod.rs index 45a3d852766..905b7251b15 100644 --- a/rust_snuba/src/strategies/clickhouse/mod.rs +++ b/rust_snuba/src/strategies/clickhouse/mod.rs @@ -1,2 +1,2 @@ -pub mod rowbinary; +pub mod inserter_sink; pub mod writer_v2; diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs deleted file mode 100644 index 880bf3aafda..00000000000 --- a/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! Minimal RowBinary serializer for inserts into ClickHouse. -//! -//! Vendored from the `clickhouse` crate (Apache-2.0 / MIT) and reduced to the -//! shape `rust_snuba` actually emits: fixed-width integers, f32/f64, bool, str, -//! raw byte slices (UUID / FixedString), length-prefixed sequences, tuples and -//! structs. We do this in-process so the typed rows can be serialized to bytes -//! inside the processor and dropped immediately, instead of riding the pipeline -//! all the way to the writer step. - -mod ser; - -// `Error` is part of the module's public surface (it's the error type of -// `serialize_into`). It isn't referenced by name anywhere in the crate today -// — callers propagate via `?` into `anyhow::Error` — but keep the re-export -// so consumers can match on the variant if they ever need to. -#[allow(unused_imports)] -pub use ser::{serialize_into, Error}; - -pub mod uuid { - //! Serde adapter for `#[serde(with = "...")]` on `Uuid` fields. ClickHouse - //! stores UUID as two little-endian u64 halves, so the canonical 16-byte - //! UUID has each half reversed before being written. - - use serde::Serializer; - use uuid::Uuid; - - pub fn serialize(uuid: &Uuid, serializer: S) -> Result - where - S: Serializer, - { - let mut bytes = *uuid.as_bytes(); - bytes[..8].reverse(); - bytes[8..].reverse(); - serializer.serialize_bytes(&bytes) - } -} diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs deleted file mode 100644 index e17fab74d7b..00000000000 --- a/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs +++ /dev/null @@ -1,418 +0,0 @@ -use std::fmt; - -use serde::ser::{ - self, Impossible, Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, - SerializeTupleStruct, -}; - -/// Append the RowBinary encoding of `value` to `buf`. -pub fn serialize_into(buf: &mut Vec, value: &T) -> Result<(), Error> -where - T: Serialize + ?Sized, -{ - let mut serializer = Serializer { buf }; - value.serialize(&mut serializer) -} - -#[derive(Debug)] -pub enum Error { - Message(String), - /// Sequences and maps must report a known length so we can emit the - /// LEB128 length prefix up front. - SequenceLengthRequired, - Unsupported(&'static str), -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Error::Message(s) => f.write_str(s), - Error::SequenceLengthRequired => { - f.write_str("rowbinary: sequences and maps must have a known length") - } - Error::Unsupported(what) => write!(f, "rowbinary: unsupported serde type: {what}"), - } - } -} - -impl std::error::Error for Error {} - -impl ser::Error for Error { - fn custom(msg: T) -> Self { - Error::Message(msg.to_string()) - } -} - -struct Serializer<'a> { - buf: &'a mut Vec, -} - -fn write_uvarint(buf: &mut Vec, mut v: u64) { - while v >= 0x80 { - buf.push((v as u8) | 0x80); - v >>= 7; - } - buf.push(v as u8); -} - -macro_rules! impl_le { - ($($name:ident => $ty:ty),+ $(,)?) => {$( - #[inline] - fn $name(self, v: $ty) -> Result<(), Error> { - self.buf.extend_from_slice(&v.to_le_bytes()); - Ok(()) - } - )+}; -} - -impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - - type SerializeSeq = Self; - type SerializeTuple = Self; - type SerializeTupleStruct = Self; - type SerializeMap = Self; - type SerializeStruct = Self; - type SerializeTupleVariant = Impossible<(), Error>; - type SerializeStructVariant = Impossible<(), Error>; - - #[inline] - fn serialize_bool(self, v: bool) -> Result<(), Error> { - self.buf.push(v as u8); - Ok(()) - } - - #[inline] - fn serialize_u8(self, v: u8) -> Result<(), Error> { - self.buf.push(v); - Ok(()) - } - - #[inline] - fn serialize_i8(self, v: i8) -> Result<(), Error> { - self.buf.push(v as u8); - Ok(()) - } - - impl_le! { - serialize_i16 => i16, - serialize_i32 => i32, - serialize_i64 => i64, - serialize_i128 => i128, - serialize_u16 => u16, - serialize_u32 => u32, - serialize_u64 => u64, - serialize_u128 => u128, - serialize_f32 => f32, - serialize_f64 => f64, - } - - fn serialize_char(self, v: char) -> Result<(), Error> { - let mut tmp = [0u8; 4]; - self.serialize_str(v.encode_utf8(&mut tmp)) - } - - fn serialize_str(self, v: &str) -> Result<(), Error> { - write_uvarint(self.buf, v.len() as u64); - self.buf.extend_from_slice(v.as_bytes()); - Ok(()) - } - - /// Raw byte write — no length prefix. Matches RowBinary's fixed-width - /// encoding for UUID and FixedString(N). Variable-length byte sequences - /// should be modeled as `Vec` and hit `serialize_seq`. - fn serialize_bytes(self, v: &[u8]) -> Result<(), Error> { - self.buf.extend_from_slice(v); - Ok(()) - } - - fn serialize_none(self) -> Result<(), Error> { - self.buf.push(1); - Ok(()) - } - - fn serialize_some(self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - self.buf.push(0); - value.serialize(self) - } - - fn serialize_unit(self) -> Result<(), Error> { - Ok(()) - } - - fn serialize_unit_struct(self, _name: &'static str) -> Result<(), Error> { - Ok(()) - } - - fn serialize_unit_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - ) -> Result<(), Error> { - Err(Error::Unsupported("enum variant")) - } - - fn serialize_newtype_struct(self, _name: &'static str, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(self) - } - - fn serialize_newtype_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - _value: &T, - ) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - Err(Error::Unsupported("enum variant")) - } - - fn serialize_seq(self, len: Option) -> Result { - let len = len.ok_or(Error::SequenceLengthRequired)?; - write_uvarint(self.buf, len as u64); - Ok(self) - } - - fn serialize_tuple(self, _len: usize) -> Result { - Ok(self) - } - - fn serialize_tuple_struct( - self, - _name: &'static str, - _len: usize, - ) -> Result { - Ok(self) - } - - fn serialize_tuple_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - _len: usize, - ) -> Result { - Err(Error::Unsupported("enum variant")) - } - - fn serialize_map(self, len: Option) -> Result { - let len = len.ok_or(Error::SequenceLengthRequired)?; - write_uvarint(self.buf, len as u64); - Ok(self) - } - - fn serialize_struct( - self, - _name: &'static str, - _len: usize, - ) -> Result { - Ok(self) - } - - fn serialize_struct_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - _len: usize, - ) -> Result { - Err(Error::Unsupported("enum variant")) - } -} - -impl<'a, 'b> SerializeSeq for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_element(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeTuple for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_element(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeTupleStruct for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_field(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeMap for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_key(&mut self, key: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - key.serialize(&mut **self) - } - fn serialize_value(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeStruct for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use serde::Serialize; - - fn encode(v: &T) -> Vec { - let mut buf = Vec::new(); - serialize_into(&mut buf, v).unwrap(); - buf - } - - #[test] - fn primitives_are_little_endian() { - assert_eq!(encode(&0x01u8), vec![0x01]); - assert_eq!(encode(&0x0102u16), vec![0x02, 0x01]); - assert_eq!(encode(&0x01020304u32), vec![0x04, 0x03, 0x02, 0x01]); - assert_eq!( - encode(&0x0102030405060708u64), - vec![0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01], - ); - assert_eq!(encode(&true), vec![1]); - assert_eq!(encode(&false), vec![0]); - } - - #[test] - fn strings_get_uvarint_length_prefix() { - assert_eq!(encode(&""), vec![0]); - assert_eq!(encode(&"abc"), vec![3, b'a', b'b', b'c']); - // 130-char string crosses the single-byte LEB128 boundary (0x80). - let s = "x".repeat(130); - let out = encode(&s); - assert_eq!(&out[..2], &[0x82, 0x01]); // 130 = 0b1_0000010 → 0x82 0x01 - assert_eq!(out.len(), 132); - } - - #[test] - fn bytes_are_raw_no_prefix() { - // serialize_bytes is the FixedString path — no length prefix. - // (We use this for UUIDs.) - struct Raw(Vec); - impl Serialize for Raw { - fn serialize(&self, s: S) -> Result { - s.serialize_bytes(&self.0) - } - } - let out = encode(&Raw(vec![0xaa, 0xbb, 0xcc])); - assert_eq!(out, vec![0xaa, 0xbb, 0xcc]); - } - - #[test] - fn sequences_prefix_then_elements() { - let v: Vec = vec![1, 2, 3]; - // 3 elements, then each u16 LE. - assert_eq!(encode(&v), vec![3, 1, 0, 2, 0, 3, 0]); - } - - #[test] - fn map_like_vec_of_pairs_matches_clickhouse_map_shape() { - // Map(String, UInt8) is wire-equivalent to Array(Tuple(String, UInt8)): - // uvarint length, then concatenated (string-prefix, key, u8) tuples. - let v: Vec<(String, u8)> = vec![("a".into(), 7), ("bb".into(), 9)]; - assert_eq!( - encode(&v), - vec![ - 2, // len - 1, b'a', 7, // ("a", 7) - 2, b'b', b'b', 9, // ("bb", 9) - ], - ); - } - - #[test] - fn uuid_adapter_emits_16_bytes_with_halves_reversed() { - use uuid::Uuid; - // 11223344-5566-7788-99aa-bbccddeeff00 - let id = Uuid::from_bytes([ - 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, - 0xff, 0x00, - ]); - #[derive(Serialize)] - struct W { - #[serde(with = "super::super::uuid")] - id: Uuid, - } - let out = encode(&W { id }); - assert_eq!( - out, - vec![ - 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, // first half reversed - 0x00, 0xff, 0xee, 0xdd, 0xcc, 0xbb, 0xaa, 0x99, // second half reversed - ], - ); - } - - #[test] - fn struct_fields_serialize_in_declaration_order() { - #[derive(Serialize)] - struct S { - a: u8, - b: u16, - c: bool, - } - let out = encode(&S { - a: 1, - b: 2, - c: true, - }); - assert_eq!(out, vec![1, 2, 0, 1]); - } -} diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 04c7ae65dd1..14f9f29dd3a 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -74,34 +74,10 @@ pub struct ClickhouseWriterStep { inner: RunTaskInThreads, BytesInsertBatch<()>, anyhow::Error, N>, } -/// Wire format the writer step posts to ClickHouse with. Picking the right -/// value here is the only thing the consumer needs to know — the pipeline -/// shape, batching, and retry behavior are identical across formats. -#[derive(Clone, Copy, Debug)] -pub enum InsertFormat { - JsonEachRow, - RowBinary, -} - -impl InsertFormat { - fn as_str(self) -> &'static str { - match self { - InsertFormat::JsonEachRow => "JSONEachRow", - InsertFormat::RowBinary => "RowBinary", - } - } -} - impl ClickhouseWriterStep where N: ProcessingStrategy> + 'static, { - /// `columns`: if `Some`, expand into the SQL as - /// `INSERT INTO {table} (col1, col2, ...) FORMAT ...`. Required for - /// `RowBinary`, which would otherwise fall back to the table's positional - /// column order — a footgun whenever wire order and table order diverge. - /// For `JSONEachRow`, pass `None` to preserve historical behavior. - #[allow(clippy::too_many_arguments)] pub fn new( next_step: N, cluster_config: ClickhouseConfig, @@ -109,8 +85,6 @@ where skip_write: bool, concurrency: &ConcurrencyConfig, storage_name: String, - format: InsertFormat, - columns: Option<&'static [&'static str]>, ) -> Self { let inner = RunTaskInThreads::new( next_step, @@ -119,8 +93,6 @@ where &cluster_config.clone(), &table, storage_name, - format, - columns, )), skip_write, ), @@ -182,13 +154,7 @@ pub struct ClickhouseClient { } impl ClickhouseClient { - pub fn new( - config: &ClickhouseConfig, - table: &str, - storage_name: String, - format: InsertFormat, - columns: Option<&[&str]>, - ) -> ClickhouseClient { + pub fn new(config: &ClickhouseConfig, table: &str, storage_name: String) -> ClickhouseClient { let mut headers = HeaderMap::with_capacity(6); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate")); @@ -214,21 +180,8 @@ impl ClickhouseClient { // the same wire format `clickhouse-rs` used and what // `clickhouse-compressor` produces. Distinct from HTTP-standard // `Content-Encoding: lz4`, which would need `enable_http_compression=1`. - let mut base_url = - format!("{scheme}://{host}:{port}?insert_distributed_sync=1&decompress=1"); - if matches!(format, InsertFormat::RowBinary) { - // RowBinary cannot represent JSON values natively; tell ClickHouse - // to treat any binary string targeting a JSON column as JSON text. - base_url.push_str("&input_format_binary_read_json_as_string=1"); - } - let columns_clause = match columns { - Some(cols) => format!(" ({})", cols.join(", ")), - None => String::new(), - }; - let query = format!( - "INSERT INTO {table}{columns_clause} FORMAT {fmt}", - fmt = format.as_str(), - ); + let base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1&decompress=1"); + let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); ClickhouseClient { client: Client::new(), @@ -442,13 +395,7 @@ mod tests { crate::testutils::initialize_python(); let config = make_test_config(); println!("config: {config:?}"); - let client = ClickhouseClient::new( - &config, - "querylog_local", - "test_storage".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let client = ClickhouseClient::new(&config, "querylog_local", "test_storage".to_string()); let url = client.build_url(); assert!(url.contains("load_balancing=in_order")); @@ -464,13 +411,7 @@ mod tests { fn test_url_with_runtime_config_override() { crate::testutils::initialize_python(); let config = make_test_config(); - let client = ClickhouseClient::new( - &config, - "test_table", - "writer_v2_lb_test".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let client = ClickhouseClient::new(&config, "test_table", "writer_v2_lb_test".to_string()); // Default: in_order let url = client.build_url(); @@ -500,8 +441,6 @@ mod tests { &config, "test_table", "writer_v2_block_size_test".to_string(), - InsertFormat::JsonEachRow, - None, ); // Default (key absent): no suffix. @@ -517,13 +456,8 @@ mod tests { assert!(url.contains("&max_insert_block_size=2000000")); // A different storage isn't affected. - let other_client = ClickhouseClient::new( - &config, - "test_table", - "writer_v2_other_storage".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let other_client = + ClickhouseClient::new(&config, "test_table", "writer_v2_other_storage".to_string()); let url = other_client.build_url(); assert!(!url.contains("max_insert_block_size")); @@ -638,13 +572,7 @@ mod tests { database: "default".to_string(), }; - let client = ClickhouseClient::new( - &config, - "test_table", - "test_storage".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let client = ClickhouseClient::new(&config, "test_table", "test_storage".to_string()); let start_time = Instant::now(); let result = client diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index 02365873d0a..56f7ffd0a4e 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -20,7 +20,7 @@ use crate::processors::utils::SilencedDLQMessage; use crate::processors::{ProcessingFunction, ProcessingFunctionWithReplacements}; use crate::types::{ BytesInsertBatch, CommitLogEntry, CommitLogOffsets, InsertBatch, InsertOrReplacement, - KafkaMessageMetadata, RowData, + KafkaMessageMetadata, RowData, TypedInsertBatch, }; use tokio::time::Instant; @@ -98,6 +98,95 @@ pub fn make_rust_processor( )) } +/// Build the next-step message for a *typed*-row processor (e.g. the +/// `clickhouse`-crate inserter path). Mirrors the commit-log / latency / cogs +/// wiring of `make_rust_processor`'s `result_to_next_msg`, but carries the +/// typed row downstream (as `Option`; `None` == skip) instead of bytes. +fn typed_result_to_next_msg( + transformed: TypedInsertBatch, + partition: Partition, + offset: u64, + timestamp: DateTime, + stop_at_timestamp: Option, +) -> anyhow::Result>>> { + // Don't process any more messages + if let Some(stop) = stop_at_timestamp { + if stop < timestamp.timestamp() { + let payload = BytesInsertBatch::default(); + return Ok(Message::new_broker_message( + payload, partition, offset, timestamp, + )); + } + } + + let num_bytes = transformed.num_bytes; + let mut payload = BytesInsertBatch::from_rows(transformed.row) + .with_num_bytes(num_bytes) + .with_message_timestamp(timestamp) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( + partition.index, + CommitLogEntry { + offset: offset + 1, + orig_message_ts: timestamp, + received_p99: transformed.origin_timestamp.into_iter().collect(), + }, + )]))) + .with_cogs_data(transformed.cogs_data.unwrap_or_default()); + + if let Some(ts) = transformed.origin_timestamp { + payload = payload.with_origin_timestamp(ts); + } + if let Some(ts) = transformed.sentry_received_timestamp { + payload = payload.with_sentry_received_timestamp(ts); + } + if let Some(metrics) = transformed.item_type_metrics { + payload = payload.with_item_type_metrics(metrics); + } + + Ok(Message::new_broker_message( + payload, partition, offset, timestamp, + )) +} + +/// Like [`make_rust_processor`], but the processing function returns a typed +/// row ([`TypedInsertBatch`]) that is handed downstream verbatim (wrapped in +/// `BytesInsertBatch>`) for a sink that serializes it itself — e.g. +/// the `clickhouse`-crate inserter for `eap_items`. +pub fn make_rust_processor_typed( + next_step: impl ProcessingStrategy>> + 'static, + func: fn( + KafkaPayload, + KafkaMessageMetadata, + config: &ProcessorConfig, + ) -> anyhow::Result>, + schema_name: &str, + enforce_schema: bool, + concurrency: &ConcurrencyConfig, + processor_config: ProcessorConfig, + stop_at_timestamp: Option, +) -> Box> +where + R: Clone + Send + Sync + 'static, +{ + let schema = get_schema(schema_name, enforce_schema); + + let task_runner = MessageProcessor { + schema, + enforce_schema, + func, + result_to_next_msg: typed_result_to_next_msg::, + processor_config, + stop_at_timestamp, + }; + + Box::new(RunTaskInThreads::new( + next_step, + task_runner, + concurrency, + Some("process_message"), + )) +} + pub fn make_rust_processor_with_replacements( next_step: impl ProcessingStrategy>> + 'static, func: ProcessingFunctionWithReplacements, diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index b84b0e5fe7f..3283d5ad4f2 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -250,6 +250,39 @@ impl InsertBatch { } } +/// The return value of processors that hand a *typed* row to a downstream sink +/// that serializes it itself (e.g. the `clickhouse`-crate inserter path for +/// `eap_items`), instead of pre-encoding bytes like [`InsertBatch`]. +/// +/// `row == None` models a skip/stop: the message still carries offsets + +/// metadata so they advance, but there is no row to insert. +#[derive(Clone, Debug)] +pub struct TypedInsertBatch { + pub row: Option, + pub origin_timestamp: Option>, + pub sentry_received_timestamp: Option>, + pub cogs_data: Option, + pub item_type_metrics: Option, + /// Best-effort encoded-size estimate used for byte-based batch sizing + /// before the row is serialized (the sink also has the exact size after a + /// flush). The eap_items processor uses the source proto's `payload.len()`. + pub num_bytes: usize, +} + +impl TypedInsertBatch { + /// A skipped/empty message: no row, no metrics, zero bytes. + pub fn skip() -> Self { + Self { + row: None, + origin_timestamp: None, + sentry_received_timestamp: None, + cogs_data: None, + item_type_metrics: None, + num_bytes: 0, + } + } +} + #[derive(Clone, Debug, Default)] pub struct BytesInsertBatch { pub rows: R, @@ -416,6 +449,22 @@ impl BytesInsertBatch { (self.rows, new) } + /// Merge another batch's *metadata* (everything except `rows`) into this + /// one. Used by the inserter sink, which streams rows into a long-lived + /// `clickhouse::Inserter` and only needs to accumulate the metadata for the + /// flush boundary — mirrors the metadata half of + /// [`BytesInsertBatch::::merge`]. + pub fn merge_metadata(&mut self, other: BytesInsertBatch<()>) { + self.num_bytes += other.num_bytes; + self.commit_log_offsets.merge(other.commit_log_offsets); + self.message_timestamp.merge(other.message_timestamp); + self.origin_timestamp.merge(other.origin_timestamp); + self.sentry_received_timestamp + .merge(other.sentry_received_timestamp); + self.cogs_data.merge(other.cogs_data); + self.item_type_metrics.merge(other.item_type_metrics); + } + pub fn record_message_latency(&self) { let write_time = Utc::now();