From 827232f56c9fe5956249fd32dfa9471d686ee68b Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:25:21 +0000 Subject: [PATCH 1/7] feat(eap-items): insert via the official clickhouse crate inserter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the hand-rolled RowBinary write path for eap_items (a vendored RowBinary serializer + reqwest/native-LZ4 HTTP client) with the official `clickhouse` crate's `Inserter`, picking up RowBinaryWithNamesAndTypes schema validation, compression, and connection pooling. The Rust consumer is a sentry_arroyo push-based pipeline, so the crate's typed `Inserter` is wired into a new `EapItemsInserterSink` strategy: - The eap_items processor now emits a typed `EAPItemRow` (`process_message_eap_row` → `BytesInsertBatch>`) instead of pre-encoded bytes. - `EapItemsInserterSink` owns a long-lived `Inserter` on a dedicated actor task. It writes each row on arrival (the wide struct is serialized into the inserter buffer and dropped immediately, so peak memory stays bounded by the buffer, not row count) and lets the inserter own the flush boundary via `with_max_rows`/`with_max_bytes` + `with_period`. On each `commit()` flush it pushes exactly the flushed rows' Kafka offsets downstream — the flush is the durability barrier, so a failed flush never advances offsets (batch replays). - `EAPItemRow` derives `clickhouse::Row`; the hand-maintained `COLUMN_NAMES` and the vendored `rowbinary` serializer are removed (the crate generates the column list and RowBinaryWithNamesAndTypes tolerates field ordering). The path is gated by the existing `use_row_binary` flag (EAPItemsProcessor only); JSONEachRow remains the default and the rollback. The JSONEachRow HTTP writer (`writer_v2`'s client + native-LZ4) stays for all other storages and is simplified to JSON-only. The crate's validation requires Rust 1.89+, so bump the pinned toolchain to 1.94.1 (Dockerfile/CI install from rust-toolchain.toml); this surfaced two newer clippy lints on existing code, fixed in passing. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- rust_snuba/Cargo.lock | 156 ++++- rust_snuba/Cargo.toml | 5 + rust_snuba/rust-toolchain.toml | 2 +- rust_snuba/src/factory_v2.rs | 318 ++++----- rust_snuba/src/processors/eap_items.rs | 147 ++--- rust_snuba/src/processors/generic_metrics.rs | 2 +- .../accepted_outcomes/produce_outcome.rs | 2 +- .../strategies/clickhouse/inserter_sink.rs | 603 ++++++++++++++++++ rust_snuba/src/strategies/clickhouse/mod.rs | 2 +- .../strategies/clickhouse/rowbinary/mod.rs | 36 -- .../strategies/clickhouse/rowbinary/ser.rs | 418 ------------ .../src/strategies/clickhouse/writer_v2.rs | 88 +-- rust_snuba/src/strategies/processor.rs | 91 ++- rust_snuba/src/types.rs | 49 ++ 14 files changed, 1137 insertions(+), 782 deletions(-) create mode 100644 rust_snuba/src/strategies/clickhouse/inserter_sink.rs delete mode 100644 rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs delete mode 100644 rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index b21216974ad..a33fda1f7eb 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -550,12 +550,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "bnum" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966" + [[package]] name = "borrow-or-share" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c" +[[package]] +name = "bstr" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" +dependencies = [ + "memchr", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -573,6 +588,9 @@ name = "bytes" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +dependencies = [ + "serde", +] [[package]] name = "cadence" @@ -701,6 +719,57 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "clickhouse" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8063696febb0a10a6fb9df1460c52c509188f1d19da2157694ab3ca0feffc74" +dependencies = [ + "bnum", + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-macros", + "clickhouse-types", + "futures-channel", + "futures-util", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "lz4_flex", + "polonius-the-crab", + "quanta", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "clickhouse-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals 0.29.1", + "syn 2.0.117", +] + +[[package]] +name = "clickhouse-types" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a5efddc880ce9e2573bd867413d9056fa2bea0206af88dec21e72178b9dc74" +dependencies = [ + "bytes", + "thiserror 2.0.18", +] + [[package]] name = "cmake" version = "0.1.57" @@ -1563,6 +1632,17 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "higher-kinded-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e690f8474c6c5d8ff99656fcbc195a215acc3949481a8b0b3351c838972dc776" +dependencies = [ + "macro_rules_attribute", + "never-say-never", + "paste", +] + [[package]] name = "hostname" version = "0.4.1" @@ -2362,6 +2442,22 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "macro_rules_attribute" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" +dependencies = [ + "macro_rules_attribute-proc_macro", + "paste", +] + +[[package]] +name = "macro_rules_attribute-proc_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" + [[package]] name = "matchers" version = "0.2.0" @@ -2448,6 +2544,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never-say-never" +version = "6.6.666" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -2701,6 +2803,12 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathfinding" version = "4.9.1" @@ -2909,6 +3017,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "polonius-the-crab" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec242d7eccbb2fd8b3b5b6e3cf89f94a91a800f469005b44d154359609f8af72" +dependencies = [ + "higher-kinded-types", + "never-say-never", +] + [[package]] name = "portable-atomic" version = "1.11.1" @@ -3083,6 +3201,21 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.40" @@ -3163,6 +3296,15 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.11.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -3400,6 +3542,7 @@ dependencies = [ "cadence", "chrono", "cityhash-rs", + "clickhouse", "criterion", "ctrlc", "data-encoding", @@ -3582,7 +3725,7 @@ checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ "proc-macro2", "quote", - "serde_derive_internals", + "serde_derive_internals 0.26.0", "syn 1.0.109", ] @@ -3893,6 +4036,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "serde_json" version = "1.0.149" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 39f69d6c3fd..3c49e77a3ea 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -27,6 +27,11 @@ anyhow = { version = "1.0.69", features = ["backtrace"] } cadence = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } cityhash-rs = "1.0.1" +clickhouse = { version = "0.15", default-features = false, features = [ + "inserter", + "lz4", + "uuid", +] } ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0" futures = "0.3.21" diff --git a/rust_snuba/rust-toolchain.toml b/rust_snuba/rust-toolchain.toml index e88baf106b9..9cf4a67c474 100644 --- a/rust_snuba/rust-toolchain.toml +++ b/rust_snuba/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.88.0" +channel = "1.94.1" diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 83acfeb9503..089af11eb1b 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -26,12 +26,14 @@ use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; use crate::strategies::blq_router::BLQRouter; -use crate::strategies::clickhouse::writer_v2::{ClickhouseWriterStep, InsertFormat}; +use crate::strategies::clickhouse::inserter_sink::EapItemsInserterSink; +use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; use crate::strategies::healthcheck::HealthCheck as SnubaHealthCheck; use crate::strategies::join_timeout::SetJoinTimeout; use crate::strategies::processor::{ - get_schema, make_rust_processor, make_rust_processor_with_replacements, validate_schema, + get_schema, make_rust_processor, make_rust_processor_typed, + make_rust_processor_with_replacements, validate_schema, }; use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; @@ -87,32 +89,29 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } } + // The Reduce accumulator closure returns Result<_, SubmitError<_>>, whose + // Err variant is large (it wraps a Message); that shape is dictated by + // arroyo's API, not us. + #[allow(clippy::result_large_err)] fn create(&self) -> Box> { - let (insert_format, process_fn_override, insert_columns): ( - InsertFormat, - Option, - Option<&'static [&'static str]>, - ) = if self.use_row_binary { - tracing::info!("Using RowBinary wire format"); + // When `use_row_binary` is set, eap_items inserts go through the + // official clickhouse-crate inserter sink (RowBinaryWithNamesAndTypes + + // schema validation), bypassing the JSONEachRow `Reduce` + + // `ClickhouseWriterStep` pair. It is only wired for EAPItemsProcessor. + let use_eap_crate_path = self.use_row_binary; + if use_eap_crate_path { let processor_name = self .storage_config .message_processor .python_class_name .as_str(); - let (func, columns): ( - crate::processors::ProcessingFunction, - &'static [&'static str], - ) = match processor_name { - "EAPItemsProcessor" => ( - crate::processors::eap_items::process_message_row_binary, - crate::processors::eap_items::EAPItemRow::COLUMN_NAMES, - ), - name => panic!("RowBinary not supported for processor: {name}"), - }; - (InsertFormat::RowBinary, Some(func), Some(columns)) - } else { - (InsertFormat::JsonEachRow, None, None) - }; + assert_eq!( + processor_name, "EAPItemsProcessor", + "use_row_binary (clickhouse-crate inserter) is only supported for \ + EAPItemsProcessor, got {processor_name}", + ); + tracing::info!("Using clickhouse-crate RowBinary inserter for eap_items"); + } // Commit offsets let next_step = CommitOffsets::new(Duration::from_secs(1)); @@ -151,136 +150,158 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); - let next_step = ClickhouseWriterStep::new( - next_step, - self.storage_config.clickhouse_cluster.clone(), - self.storage_config.clickhouse_table_name.clone(), - false, - &self.clickhouse_concurrency, - self.storage_config.name.clone(), - insert_format, - insert_columns, - ); - - let accumulator = Arc::new( - |batch: BytesInsertBatch, small_batch: Message>| { - Ok(batch.merge(small_batch.into_payload())) - }, - ); - - let compute_batch_size: fn(&BytesInsertBatch) -> usize = - match self.max_batch_size_calculation { - config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), - config::BatchSizeCalculation::Rows => |batch| batch.len(), - }; - - let next_step = Reduce::new( - next_step, - accumulator, - Arc::new(move || { - BytesInsertBatch::::new( - RowData::default(), - None, - None, - None, - Default::default(), - CogsData::default(), - ) - }), - self.max_batch_size, - self.max_batch_time, - compute_batch_size, - // we need to enable this to deal with storages where we skip 100% of values. - // we still need to commit there - ) - .flush_empty_batches(true); - - // RowBinary can only be emitted by the Rust processor (the Python path - // always returns JSONEachRow bytes). If the storage opted into - // RowBinary, force the Rust processor branch — otherwise we'd POST - // JSON bytes under `FORMAT RowBinary` and ClickHouse would reject the - // batch. The previous early-return for RowBinary did this implicitly. - let use_rust_processor = self.use_rust_processor || self.use_row_binary; - - // Transform messages - let next_step = match ( - use_rust_processor, - processors::get_processing_function( - &self.storage_config.message_processor.python_class_name, - ), - ) { - ( - true, - Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(func)), - ) => { - let (replacements_config, replacements_destination) = - self.replacements_config.clone().unwrap(); - - let producer = KafkaProducer::new(replacements_config); - let replacements_step = ProduceReplacements::new( - next_step, - producer, - replacements_destination, - &self.replacements_concurrency, - false, - ); - - make_rust_processor_with_replacements( - replacements_step, - func, - &self.logical_topic_name, - self.enforce_schema, - &self.processing_concurrency, - config::ProcessorConfig { - env_config: self.env_config.clone(), - storage_name: self.storage_config.name.clone(), - }, - self.stop_at_timestamp, - ) - } - (true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => { - // For storages opted into RowBinary, swap the registered JSON - // processor for its RowBinary sibling. Same signature, same - // pipeline; only the encoding inside the processor differs. - let func = process_fn_override.unwrap_or(func); - make_rust_processor( - next_step, - func, - &self.logical_topic_name, - self.enforce_schema, - &self.processing_concurrency, - config::ProcessorConfig { - env_config: self.env_config.clone(), - storage_name: self.storage_config.name.clone(), - }, - self.stop_at_timestamp, - ) - } - ( + let next_step: Box> = if use_eap_crate_path { + // eap_items: typed processor → long-lived clickhouse-crate inserter + // sink (which replaces Reduce + ClickhouseWriterStep for this path). + let sink = EapItemsInserterSink::new( + next_step, + self.storage_config.clickhouse_cluster.clone(), + self.storage_config.clickhouse_table_name.clone(), + self.storage_config.name.clone(), + &self.clickhouse_concurrency, + self.max_batch_size, + self.max_batch_time, + self.max_batch_size_calculation, false, - Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(_)), - ) => { - panic!("Consumer with replacements cannot be run in hybrid-mode"); - } - _ => { - let schema = get_schema(&self.logical_topic_name, self.enforce_schema); + ); + make_rust_processor_typed( + sink, + crate::processors::eap_items::process_message_eap_row, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, + ) + } else { + let next_step = ClickhouseWriterStep::new( + next_step, + self.storage_config.clickhouse_cluster.clone(), + self.storage_config.clickhouse_table_name.clone(), + false, + &self.clickhouse_concurrency, + self.storage_config.name.clone(), + ); + + let accumulator = Arc::new( + |batch: BytesInsertBatch, + small_batch: Message>| { + Ok(batch.merge(small_batch.into_payload())) + }, + ); + + let compute_batch_size: fn(&BytesInsertBatch) -> usize = + match self.max_batch_size_calculation { + config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), + config::BatchSizeCalculation::Rows => |batch| batch.len(), + }; + + let next_step = Reduce::new( + next_step, + accumulator, + Arc::new(move || { + BytesInsertBatch::::new( + RowData::default(), + None, + None, + None, + Default::default(), + CogsData::default(), + ) + }), + self.max_batch_size, + self.max_batch_time, + compute_batch_size, + // we need to enable this to deal with storages where we skip 100% of values. + // we still need to commit there + ) + .flush_empty_batches(true); + + let use_rust_processor = self.use_rust_processor; + + // Transform messages + let next_step = match ( + use_rust_processor, + processors::get_processing_function( + &self.storage_config.message_processor.python_class_name, + ), + ) { + ( + true, + Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements( + func, + )), + ) => { + let (replacements_config, replacements_destination) = + self.replacements_config.clone().unwrap(); + + let producer = KafkaProducer::new(replacements_config); + let replacements_step = ProduceReplacements::new( + next_step, + producer, + replacements_destination, + &self.replacements_concurrency, + false, + ); - Box::new(RunTaskInThreads::new( - PythonTransformStep::new( + make_rust_processor_with_replacements( + replacements_step, + func, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, + ) + } + (true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => { + make_rust_processor( next_step, - self.storage_config.message_processor.clone(), - self.processing_concurrency.concurrency, - self.python_max_queue_depth, + func, + &self.logical_topic_name, + self.enforce_schema, + &self.processing_concurrency, + config::ProcessorConfig { + env_config: self.env_config.clone(), + storage_name: self.storage_config.name.clone(), + }, + self.stop_at_timestamp, ) - .unwrap(), - SchemaValidator { - schema, - enforce_schema: self.enforce_schema, - }, - &self.processing_concurrency, - Some("validate_schema"), - )) - } + } + ( + false, + Some(processors::ProcessingFunctionType::ProcessingFunctionWithReplacements(_)), + ) => { + panic!("Consumer with replacements cannot be run in hybrid-mode"); + } + _ => { + let schema = get_schema(&self.logical_topic_name, self.enforce_schema); + + Box::new(RunTaskInThreads::new( + PythonTransformStep::new( + next_step, + self.storage_config.message_processor.clone(), + self.processing_concurrency.concurrency, + self.python_max_queue_depth, + ) + .unwrap(), + SchemaValidator { + schema, + enforce_schema: self.enforce_schema, + }, + &self.processing_concurrency, + Some("validate_schema"), + )) + } + }; + + next_step }; // force message processor to drop all in-flight messages, as it is not worth the time @@ -420,6 +441,7 @@ mod tests { } } + #[allow(clippy::result_large_err)] fn build_reduce( next_step: RecordingStep, max_batch_size: usize, diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index ebfe8ba25dd..1e0e4917a9a 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -19,9 +19,10 @@ use crate::processors::utils::{ record_invalid_timestamp_metric, SilencedDLQMessage, }; use crate::runtime_config::get_str_config; -use crate::strategies::clickhouse::rowbinary; use crate::types::CogsData; -use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; +use crate::types::{ + item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, TypedInsertBatch, +}; /// Runtime config key prefix. Per-storage key /// `eap_items_dlq_grace_period_min:`: a non-negative integer @@ -196,27 +197,32 @@ pub fn process_message( Ok(batch) } -pub fn process_message_row_binary( +/// Production RowBinary path: build the typed `EAPItemRow` and hand it +/// downstream to the `clickhouse`-crate inserter sink, which serializes it +/// itself (`write(&row)`) the moment it arrives and drops the wide struct, +/// keeping only the inserter's byte buffer in memory. No bytes are encoded +/// here — the crate owns RowBinary encoding + schema validation. +pub fn process_message_eap_row( msg: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, -) -> anyhow::Result { +) -> anyhow::Result> { + // Source proto length, used as the byte-size estimate for byte-based batch + // sizing (the sink also reports the exact encoded size after each flush). + let num_bytes = msg.payload().map(|p| p.len()).unwrap_or(0); let processed = process_eap_item(msg, config)?; if processed.should_skip { - return Ok(InsertBatch::skip()); + return Ok(TypedInsertBatch::skip()); } let row = EAPItemRow::try_from(processed.eap_item)?; - - // Encode the row to RowBinary bytes inline so the wide typed struct (~80 - // Vec<(String, _)> buckets) drops here instead of riding the pipeline to - // the writer step. The batch downstream sees only a compact Vec. - let mut encoded_rows = Vec::new(); - rowbinary::serialize_into(&mut encoded_rows, &row)?; - - let mut batch = InsertBatch::from_encoded_rows(encoded_rows, 1, processed.origin_timestamp); - batch.cogs_data = Some(processed.cogs_data); - batch.item_type_metrics = Some(processed.item_type_metrics); - Ok(batch) + Ok(TypedInsertBatch { + row: Some(row), + origin_timestamp: processed.origin_timestamp, + sentry_received_timestamp: None, + cogs_data: Some(processed.cogs_data), + item_type_metrics: Some(processed.item_type_metrics), + num_bytes, + }) } /// Test-only: returns the typed `EAPItemRow` (plus the metadata fields the @@ -481,13 +487,13 @@ impl AttributeMap { } seq_attrs! { -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, clickhouse::Row)] pub struct EAPItemRow { organization_id: u64, project_id: u64, item_type: u8, timestamp: u32, - #[serde(with = "crate::strategies::clickhouse::rowbinary::uuid")] + #[serde(with = "clickhouse::serde::uuid")] trace_id: Uuid, item_id: u128, @@ -513,48 +519,6 @@ pub struct EAPItemRow { } } -seq_attrs! { -impl EAPItemRow { - /// Column names in struct (= wire) order. We MUST pass this list to - /// ClickHouse on insert (`INSERT INTO t (col1, col2, ...) FORMAT RowBinary`) - /// because the on-disk column order in `eap_items_1_local` differs from - /// the struct order: - /// - /// * `client_sample_rate` and `server_sample_rate` were added with - /// identical `AFTER sampling_factor` in migration 0048, so the table - /// ends up with the pair reversed (server before client). - /// * The struct interleaves `attributes_string_N, attributes_float_N` for - /// each `N` (per the `seq_attrs!` expansion), while the initial table - /// put all `attributes_string_*` first, then all `attributes_float_*`. - /// - /// Without an explicit column list, ClickHouse falls back to the table's - /// positional order and misreads bytes (the integration test hits a - /// `CANNOT_READ_ALL_DATA` deep inside the maps section). - pub(crate) const COLUMN_NAMES: &'static [&'static str] = &[ - "organization_id", - "project_id", - "item_type", - "timestamp", - "trace_id", - "item_id", - "indexed_name", - "sampling_weight", - "sampling_factor", - "client_sample_rate", - "server_sample_rate", - "retention_days", - "downsampled_retention_days", - "attributes_bool", - "attributes_int", - #( - concat!("attributes_string_", stringify!(N)), - concat!("attributes_float_", stringify!(N)), - )* - "attributes_array", - ]; -} -} - impl TryFrom for EAPItemRow { type Error = anyhow::Error; @@ -1003,7 +967,9 @@ mod tests { /// adding/reordering fields on `EAPItemRow` also updates this list. #[test] fn test_column_names_match_struct_layout() { - let names = EAPItemRow::COLUMN_NAMES; + // Column names now come from the `clickhouse::Row` derive (declaration + // order). This still guards that field names/order match the schema. + let names = ::COLUMN_NAMES; // 12 scalars + indexed_name + attributes_bool + attributes_int + 80 buckets + attributes_array assert_eq!(names.len(), 96); assert_eq!(names[0], "organization_id"); @@ -1615,11 +1581,11 @@ mod tests { } /// End-to-end test of the production RowBinary path against a live - /// ClickHouse instance: process_message_row_binary produces bytes via the - /// vendored serializer, those bytes are POSTed verbatim with - /// `FORMAT RowBinary`, and the row is read back via `FORMAT JSON`. This - /// is the cross-boundary check that our wire format matches what - /// ClickHouse expects — pure unit tests can't catch that. + /// ClickHouse instance: `process_message_eap_row` produces a typed + /// `EAPItemRow`, the official clickhouse-crate `Inserter` serializes + + /// inserts it (RowBinaryWithNamesAndTypes + schema validation), and the row + /// is read back via `FORMAT JSON`. This is the cross-boundary check that our + /// types match the live schema — pure unit tests can't catch that. #[tokio::test] async fn test_row_binary_clickhouse_insert() { let host = std::env::var("CLICKHOUSE_HOST").unwrap_or("127.0.0.1".to_string()); @@ -1628,6 +1594,8 @@ mod tests { .parse() .unwrap(); let database = std::env::var("CLICKHOUSE_DATABASE").unwrap_or("default".to_string()); + let user = std::env::var("CLICKHOUSE_USER").unwrap_or("default".to_string()); + let password = std::env::var("CLICKHOUSE_PASSWORD").unwrap_or_default(); let base_url = format!("http://{host}:{http_port}"); let http = reqwest::Client::new(); @@ -1677,35 +1645,26 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - // Production path: encodes the row to RowBinary bytes inside the processor. - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + // Production path: build the typed row, then insert it through the + // official clickhouse-crate inserter (RowBinaryWithNamesAndTypes + + // schema validation). Exercises the real EAPItemRow → live-schema + // mapping (UUID, Map columns, DateTime, UInt128, …). + let batch = process_message_eap_row(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - assert_eq!(batch.rows.num_rows, 1); - - // Insert: POST the pre-encoded bytes with FORMAT RowBinary. We must - // pass the column list — the struct's wire order does NOT match the - // table's on-disk column order (see EAPItemRow::COLUMN_NAMES). - let insert_query = format!( - "INSERT INTO eap_items_1_local ({}) FORMAT RowBinary", - EAPItemRow::COLUMN_NAMES.join(", "), - ); - let insert_resp = http - .post(&base_url) - .header("X-ClickHouse-Database", &database) - .query(&[ - ("query", insert_query.as_str()), - ("input_format_binary_read_json_as_string", "1"), - ("insert_deduplicate", "0"), - ]) - .body(batch.rows.encoded_rows.clone()) - .send() - .await - .expect("Insert request failed to send"); - assert!( - insert_resp.status().is_success(), - "Insert failed: {}", - insert_resp.text().await.unwrap_or_default() - ); + let row = batch.row.expect("row should be present"); + + let client = clickhouse::Client::default() + .with_url(base_url.as_str()) + .with_user(user) + .with_password(password) + .with_database(database.clone()) + .with_setting("input_format_binary_read_json_as_string", "1"); + let mut inserter = client + .inserter::("eap_items_1_local") + .with_max_rows(1); + inserter.write(&row).await.expect("write row"); + let quantities = inserter.end().await.expect("flush insert"); + assert_eq!(quantities.rows, 1); // Read it back via FORMAT JSON. We use organization_id (primary key prefix) // for a deterministic lookup. ClickHouse's FORMAT JSON renders 64-bit diff --git a/rust_snuba/src/processors/generic_metrics.rs b/rust_snuba/src/processors/generic_metrics.rs index 8fcac903d2c..d62a6c0d08f 100644 --- a/rust_snuba/src/processors/generic_metrics.rs +++ b/rust_snuba/src/processors/generic_metrics.rs @@ -166,7 +166,7 @@ fn decode_encoded_into_numeric_array( where T: Decodable, { - if data.len() % T::SIZE == 0 { + if data.len().is_multiple_of(T::SIZE) { Ok(data .chunks_exact(T::SIZE) .map(TryInto::try_into) diff --git a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs index 23979cf52e4..f279dbdcf65 100644 --- a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs +++ b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs @@ -87,7 +87,7 @@ impl TaskRunner // Pace produces to avoid flooding the producer queue with a single // large batch all at once. produced_count += 1; - if produced_count % throttle_batch_size == 0 { + if produced_count.is_multiple_of(throttle_batch_size) { tokio::time::sleep(throttle_sleep).await; } } diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs new file mode 100644 index 00000000000..2c2d19c1269 --- /dev/null +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -0,0 +1,603 @@ +//! Arroyo sink that inserts `eap_items` rows through the official `clickhouse` +//! crate's [`Inserter`]. +//! +//! Shape: the processor hands this strategy a *typed* [`EAPItemRow`] per +//! message (wrapped in `BytesInsertBatch>`; `None` == a +//! skipped/empty message that still carries offsets). A long-lived actor task +//! owns one [`clickhouse::inserter::Inserter`] and writes each row the moment +//! it arrives — the wide struct is serialized into the inserter's byte buffer +//! and dropped immediately, so peak memory stays bounded by the buffer, not by +//! row count. +//! +//! The inserter owns the flush boundary: it is configured with our batch +//! settings (`with_max_rows`/`with_max_bytes` + `with_period`), and we drive it +//! with [`Inserter::commit`] (never `force_commit`). When `commit()` reports a +//! real flush (`Quantities.rows > 0`), the rows it flushed are durably in +//! ClickHouse; only then do we push *their* Kafka offsets downstream toward +//! `CommitOffsets`. A failed flush is never pushed, so its offsets are not +//! committed and the batch replays on restart — the same durability barrier the +//! old writer had. + +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime}; + +use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; +use sentry_arroyo::processing::strategies::{ + merge_commit_request, CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, + SubmitError, +}; +use sentry_arroyo::types::{Message, Partition}; +use sentry_arroyo::utils::timing::Deadline; +use sentry_arroyo::{counter, timer}; +use tokio::sync::mpsc; + +use crate::config::{BatchSizeCalculation, ClickhouseConfig}; +use crate::processors::eap_items::EAPItemRow; +use crate::runtime_config::{get_load_balancing_config, get_max_insert_block_size}; +use crate::types::BytesInsertBatch; + +/// Upper bound on typed rows in flight between `submit` and the actor. Kept +/// small (and independent of `max_batch_size`) so the channel never becomes a +/// `Vec` accumulator — the inserter's byte buffer is the only place +/// a whole batch lives. When full, `submit` returns `MessageRejected` and +/// arroyo back-pressures upstream. +const WORK_CHANNEL_CAPACITY: usize = 1024; + +/// Work handed from the strategy (sync) to the actor (async), in arrival order. +enum WorkItem { + Row { + // Boxed because `EAPItemRow` is wide (~96 columns); keeps the channel + // item small and the enum cheap to move. + row: Box, + committable: Vec<(Partition, u64)>, + meta: BytesInsertBatch<()>, + }, + Skip { + committable: Vec<(Partition, u64)>, + meta: BytesInsertBatch<()>, + }, +} + +/// A flush succeeded (or a skip-only window elapsed): these offsets + metadata +/// are safe to push downstream. +struct ReadyFlush { + committable: BTreeMap, + meta: BytesInsertBatch<()>, + bytes: u64, + rows: u64, + elapsed: Duration, +} + +/// Result handed from the actor back to the strategy, drained in `poll`. +enum FlushOutcome { + /// Boxed because [`ReadyFlush`] is much larger than the `Err` variant. + Ready(Box), + /// A write/commit/end failed: the offsets it would have covered are + /// discarded (never pushed) so the batch replays. + Err(String), +} + +/// Metadata accumulated for the current unflushed window. Rows themselves live +/// only in the inserter's byte buffer. +#[derive(Default)] +struct Acc { + committable: BTreeMap, + meta: BytesInsertBatch<()>, + rows_written: usize, +} + +impl Acc { + fn merge(&mut self, committable: Vec<(Partition, u64)>, meta: BytesInsertBatch<()>) { + for (partition, offset) in committable { + let entry = self.committable.entry(partition).or_insert(offset); + *entry = (*entry).max(offset); + } + self.meta.merge_metadata(meta); + } + + fn has_offsets(&self) -> bool { + !self.committable.is_empty() + } +} + +/// Take the accumulated window and send it downstream as a ready flush. +fn emit_ready( + out_tx: &mpsc::UnboundedSender, + acc: &mut Acc, + bytes: u64, + rows: u64, + elapsed: Duration, +) { + let taken = std::mem::take(acc); + let _ = out_tx.send(FlushOutcome::Ready(Box::new(ReadyFlush { + committable: taken.committable, + meta: taken.meta, + bytes, + rows, + elapsed, + }))); +} + +fn build_client(cfg: &ClickhouseConfig) -> clickhouse::Client { + let scheme = if cfg.secure { "https" } else { "http" }; + clickhouse::Client::default() + .with_url(format!("{}://{}:{}", scheme, cfg.host, cfg.http_port)) + .with_user(cfg.user.clone()) + .with_password(cfg.password.clone()) + .with_database(cfg.database.clone()) + // Mirror the old writer's URL params: synchronous distributed insert and + // "treat binary strings bound for JSON columns as JSON text". + .with_setting("insert_distributed_sync", "1") + .with_setting("input_format_binary_read_json_as_string", "1") +} + +fn make_inserter( + client: &clickhouse::Client, + table: &str, + storage_name: &str, + max_rows: u64, + max_bytes: u64, + max_batch_time: Duration, +) -> clickhouse::inserter::Inserter { + let lb = get_load_balancing_config(storage_name); + let mut inserter = client + .inserter::(table) + .with_max_rows(max_rows) + .with_max_bytes(max_bytes) + .with_period(Some(max_batch_time)) + .with_setting("load_balancing", lb.load_balancing); + if let Some(first_offset) = lb.first_offset { + inserter = inserter.with_setting("load_balancing_first_offset", first_offset); + } + if let Some(block_size) = get_max_insert_block_size(storage_name) { + inserter = inserter.with_setting("max_insert_block_size", block_size.to_string()); + } + inserter +} + +#[allow(clippy::too_many_arguments)] +async fn run_inserter_actor( + config: ClickhouseConfig, + table: String, + storage_name: String, + max_rows: u64, + max_bytes: u64, + max_batch_time: Duration, + skip_write: bool, + mut work_rx: mpsc::Receiver, + out_tx: mpsc::UnboundedSender, +) { + let client = build_client(&config); + let mut inserter = make_inserter( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + let mut acc = Acc::default(); + + // The crate only evaluates `with_period` inside `commit()`, so we call + // `commit()` on a tick. Ticking finer than `max_batch_time` makes the + // period flush fire promptly instead of slipping a whole window. + let tick_period = (max_batch_time / 4).max(Duration::from_millis(10)); + let mut interval = tokio::time::interval(tick_period); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval.tick().await; // consume the immediate first tick + + // Bounds how often a skip-only window pushes offsets, so we don't emit a + // downstream message every tick when no rows are flowing. + let mut skip_deadline = Deadline::new(max_batch_time); + + macro_rules! on_flush_err { + ($e:expr) => {{ + let _ = out_tx.send(FlushOutcome::Err($e.to_string())); + // `write` panics if reused after an error — rebuild, and drop the + // doomed window so its offsets are never pushed (→ replay). + inserter = make_inserter( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + acc = Acc::default(); + skip_deadline = Deadline::new(max_batch_time); + }}; + } + + loop { + tokio::select! { + maybe = work_rx.recv() => match maybe { + Some(WorkItem::Row { row, committable, meta }) => { + acc.merge(committable, meta); + acc.rows_written += 1; + if skip_write { + continue; + } + let start = SystemTime::now(); + if let Err(e) = inserter.write(&*row).await { + on_flush_err!(e); + continue; + } + match inserter.commit().await { + Ok(q) if q.rows > 0 => { + let elapsed = start.elapsed().unwrap_or_default(); + emit_ready(&out_tx, &mut acc, q.bytes, q.rows, elapsed); + skip_deadline = Deadline::new(max_batch_time); + } + Ok(_) => {} + Err(e) => on_flush_err!(e), + } + } + Some(WorkItem::Skip { committable, meta }) => acc.merge(committable, meta), + None => break, // strategy dropped the sender: finalize below + }, + _ = interval.tick() => { + if skip_write { + if acc.has_offsets() && skip_deadline.has_elapsed() { + let rows = acc.rows_written as u64; + emit_ready(&out_tx, &mut acc, 0, rows, Duration::ZERO); + skip_deadline = Deadline::new(max_batch_time); + } + continue; + } + let start = SystemTime::now(); + match inserter.commit().await { + Ok(q) if q.rows > 0 => { + let elapsed = start.elapsed().unwrap_or_default(); + emit_ready(&out_tx, &mut acc, q.bytes, q.rows, elapsed); + skip_deadline = Deadline::new(max_batch_time); + } + Ok(_) => { + // Period elapsed with nothing to flush (skip-only + // window): advance offsets so the consumer doesn't stall. + if acc.has_offsets() && acc.rows_written == 0 && skip_deadline.has_elapsed() { + emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); + skip_deadline = Deadline::new(max_batch_time); + } + } + Err(e) => on_flush_err!(e), + } + } + } + } + + // Finalize: flush the last partial window. + if skip_write { + if acc.has_offsets() { + let rows = acc.rows_written as u64; + emit_ready(&out_tx, &mut acc, 0, rows, Duration::ZERO); + } + return; + } + let start = SystemTime::now(); + match inserter.end().await { + Ok(q) => { + if acc.has_offsets() { + let elapsed = start.elapsed().unwrap_or_default(); + emit_ready(&out_tx, &mut acc, q.bytes, q.rows, elapsed); + } + } + Err(e) => { + let _ = out_tx.send(FlushOutcome::Err(e.to_string())); + } + } +} + +pub struct EapItemsInserterSink { + next_step: N, + /// `None` once the stream has been closed (in `join`/`terminate`). + work_tx: Option>, + out_rx: mpsc::UnboundedReceiver, + actor: Option>, + /// A flushed message that the next step rejected; retried before pulling more. + message_carried_over: Option>>, + commit_request_carried_over: Option, + actor_done: bool, +} + +impl EapItemsInserterSink +where + N: ProcessingStrategy> + 'static, +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + next_step: N, + cluster: ClickhouseConfig, + table: String, + storage_name: String, + concurrency: &ConcurrencyConfig, + max_batch_size: usize, + max_batch_time: Duration, + batch_size_calculation: BatchSizeCalculation, + skip_write: bool, + ) -> Self { + let (max_rows, max_bytes) = match batch_size_calculation { + BatchSizeCalculation::Rows => (max_batch_size as u64, u64::MAX), + BatchSizeCalculation::Bytes => (u64::MAX, max_batch_size as u64), + }; + + let (work_tx, work_rx) = mpsc::channel::(WORK_CHANNEL_CAPACITY); + let (out_tx, out_rx) = mpsc::unbounded_channel::(); + + let actor = concurrency.handle().spawn(run_inserter_actor( + cluster, + table, + storage_name, + max_rows, + max_bytes, + max_batch_time, + skip_write, + work_rx, + out_tx, + )); + + Self { + next_step, + work_tx: Some(work_tx), + out_rx, + actor: Some(actor), + message_carried_over: None, + commit_request_carried_over: None, + actor_done: false, + } + } + + /// Retry a downstream message that was previously rejected. + fn submit_carried_over(&mut self) -> Result<(), StrategyError> { + if let Some(message) = self.message_carried_over.take() { + match self.next_step.submit(message) { + Ok(()) => {} + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.message_carried_over = Some(message); + } + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), + } + } + Ok(()) + } + + /// Drain ready flushes from the actor and push them downstream. Stops on + /// downstream backpressure (carry the message and retry next poll). + fn drain_outcomes(&mut self) -> Result<(), StrategyError> { + while self.message_carried_over.is_none() { + match self.out_rx.try_recv() { + Ok(FlushOutcome::Ready(ready)) => { + let ReadyFlush { + committable, + meta, + bytes, + rows, + elapsed, + } = *ready; + timer!("insertions.batch_write_ms", elapsed); + counter!("insertions.batch_write_bytes", bytes as i64); + counter!("insertions.batch_write_msgs", rows as i64); + meta.record_message_latency(); + meta.emit_item_type_metrics(); + let message = Message::new_any_message(meta, committable); + match self.next_step.submit(message) { + Ok(()) => {} + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.message_carried_over = Some(message); + } + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), + } + } + Ok(FlushOutcome::Err(e)) => { + counter!( + "rust_consumer.clickhouse_insert_error", + 1, + "status" => "insert_error", + "retried" => "false" + ); + tracing::error!("ClickHouse inserter flush failed: {}", e); + return Err(StrategyError::Other(e.into())); + } + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => { + self.actor_done = true; + break; + } + } + } + Ok(()) + } +} + +impl ProcessingStrategy>> for EapItemsInserterSink +where + N: ProcessingStrategy> + 'static, +{ + fn poll(&mut self) -> Result, StrategyError> { + let commit_request = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), commit_request); + self.submit_carried_over()?; + self.drain_outcomes()?; + Ok(self.commit_request_carried_over.take()) + } + + fn submit( + &mut self, + message: Message>>, + ) -> Result<(), SubmitError>>> { + // Downstream is backpressured, or we're shutting down: reject upstream too. + if self.message_carried_over.is_some() { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + } + let Some(work_tx) = self.work_tx.as_ref() else { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + }; + // Reserve a slot first so we never destructure `message` on the reject path. + let Ok(permit) = work_tx.try_reserve() else { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + }; + + let committable: Vec<(Partition, u64)> = message.committable().collect(); + let (row, meta) = message.into_payload().take(); + let item = match row { + Some(row) => WorkItem::Row { + row: Box::new(row), + committable, + meta, + }, + None => WorkItem::Skip { committable, meta }, + }; + permit.send(item); + Ok(()) + } + + fn terminate(&mut self) { + // Drop the sender and abort the actor (abandons any open INSERT; the + // server discards an un-`end()`ed insert, and un-pushed offsets replay). + self.work_tx = None; + if let Some(handle) = self.actor.take() { + handle.abort(); + } + self.next_step.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + // Closing the sender makes the actor finalize (flush via `end()`), then + // drop `out_tx` so `out_rx` disconnects. + self.work_tx = None; + let deadline = timeout.map(Deadline::new); + + loop { + let commit_request = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), commit_request); + self.submit_carried_over()?; + self.drain_outcomes()?; + + if self.actor_done && self.message_carried_over.is_none() { + break; + } + if let Some(deadline) = &deadline { + if deadline.has_elapsed() { + tracing::warn!( + "inserter sink join timed out; partial batch left unacked (will replay)" + ); + break; + } + } + // Let the actor make progress on its runtime threads. + std::thread::sleep(Duration::from_millis(1)); + } + + let next_commit = self.next_step.join(timeout)?; + Ok(merge_commit_request( + self.commit_request_carried_over.take(), + next_commit, + )) + } +} + +impl Drop for EapItemsInserterSink { + fn drop(&mut self) { + // Abort the actor if it's still around so a dropped sink never flushes a + // partial INSERT (whose offsets were never pushed) behind our back. + if let Some(handle) = self.actor.take() { + handle.abort(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + use sentry_arroyo::processing::strategies::ProcessingStrategy; + use sentry_arroyo::types::{Partition, Topic}; + + /// Records the committable of every message it receives. + struct RecordingStep { + committables: Arc>>>, + } + + impl ProcessingStrategy> for RecordingStep { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + fn submit( + &mut self, + message: Message>, + ) -> Result<(), SubmitError>> { + let committable: BTreeMap = message.committable().collect(); + self.committables.lock().unwrap().push(committable); + Ok(()) + } + fn terminate(&mut self) {} + fn join(&mut self, _: Option) -> Result, StrategyError> { + Ok(None) + } + } + + fn test_cluster() -> ClickhouseConfig { + ClickhouseConfig { + host: "127.0.0.1".to_string(), + port: 9000, + secure: false, + http_port: 8123, + user: "default".to_string(), + password: "".to_string(), + database: "default".to_string(), + } + } + + /// With `skip_write`, the actor never touches ClickHouse, so this exercises + /// the full submit → actor → drain → downstream path (and `offset + 1` + /// next-to-consume semantics) without a live cluster. Skipped (`None`) rows + /// must still advance offsets, mirroring `flush_empty_batches`. + #[test] + fn skipped_messages_advance_offsets_without_clickhouse() { + crate::testutils::initialize_python(); + + let recorded = Arc::new(Mutex::new(Vec::new())); + let next_step = RecordingStep { + committables: recorded.clone(), + }; + let concurrency = ConcurrencyConfig::new(1); + + let mut sink = EapItemsInserterSink::new( + next_step, + test_cluster(), + "eap_items_1_local".to_string(), + "test_storage".to_string(), + &concurrency, + 1000, + Duration::from_millis(50), + BatchSizeCalculation::Rows, + true, // skip_write + ); + + let partition = Partition::new(Topic::new("snuba-items"), 0); + for offset in 0..5u64 { + let message = Message::new_broker_message( + BytesInsertBatch::>::default(), + partition, + offset, + chrono::Utc::now(), + ); + sink.submit(message).unwrap(); + } + + // join() closes the stream so the actor finalizes and flushes the + // accumulated offsets, which we then drain downstream. + sink.join(Some(Duration::from_secs(5))).unwrap(); + + let recorded = recorded.lock().unwrap(); + let max_offset = recorded + .iter() + .filter_map(|c| c.get(&partition).copied()) + .max() + .expect("offsets should have been pushed downstream"); + // Highest consumed offset is 4 → next-to-consume committable is 5. + assert_eq!(max_offset, 5); + } +} diff --git a/rust_snuba/src/strategies/clickhouse/mod.rs b/rust_snuba/src/strategies/clickhouse/mod.rs index 45a3d852766..905b7251b15 100644 --- a/rust_snuba/src/strategies/clickhouse/mod.rs +++ b/rust_snuba/src/strategies/clickhouse/mod.rs @@ -1,2 +1,2 @@ -pub mod rowbinary; +pub mod inserter_sink; pub mod writer_v2; diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs deleted file mode 100644 index 880bf3aafda..00000000000 --- a/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! Minimal RowBinary serializer for inserts into ClickHouse. -//! -//! Vendored from the `clickhouse` crate (Apache-2.0 / MIT) and reduced to the -//! shape `rust_snuba` actually emits: fixed-width integers, f32/f64, bool, str, -//! raw byte slices (UUID / FixedString), length-prefixed sequences, tuples and -//! structs. We do this in-process so the typed rows can be serialized to bytes -//! inside the processor and dropped immediately, instead of riding the pipeline -//! all the way to the writer step. - -mod ser; - -// `Error` is part of the module's public surface (it's the error type of -// `serialize_into`). It isn't referenced by name anywhere in the crate today -// — callers propagate via `?` into `anyhow::Error` — but keep the re-export -// so consumers can match on the variant if they ever need to. -#[allow(unused_imports)] -pub use ser::{serialize_into, Error}; - -pub mod uuid { - //! Serde adapter for `#[serde(with = "...")]` on `Uuid` fields. ClickHouse - //! stores UUID as two little-endian u64 halves, so the canonical 16-byte - //! UUID has each half reversed before being written. - - use serde::Serializer; - use uuid::Uuid; - - pub fn serialize(uuid: &Uuid, serializer: S) -> Result - where - S: Serializer, - { - let mut bytes = *uuid.as_bytes(); - bytes[..8].reverse(); - bytes[8..].reverse(); - serializer.serialize_bytes(&bytes) - } -} diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs deleted file mode 100644 index e17fab74d7b..00000000000 --- a/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs +++ /dev/null @@ -1,418 +0,0 @@ -use std::fmt; - -use serde::ser::{ - self, Impossible, Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, - SerializeTupleStruct, -}; - -/// Append the RowBinary encoding of `value` to `buf`. -pub fn serialize_into(buf: &mut Vec, value: &T) -> Result<(), Error> -where - T: Serialize + ?Sized, -{ - let mut serializer = Serializer { buf }; - value.serialize(&mut serializer) -} - -#[derive(Debug)] -pub enum Error { - Message(String), - /// Sequences and maps must report a known length so we can emit the - /// LEB128 length prefix up front. - SequenceLengthRequired, - Unsupported(&'static str), -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Error::Message(s) => f.write_str(s), - Error::SequenceLengthRequired => { - f.write_str("rowbinary: sequences and maps must have a known length") - } - Error::Unsupported(what) => write!(f, "rowbinary: unsupported serde type: {what}"), - } - } -} - -impl std::error::Error for Error {} - -impl ser::Error for Error { - fn custom(msg: T) -> Self { - Error::Message(msg.to_string()) - } -} - -struct Serializer<'a> { - buf: &'a mut Vec, -} - -fn write_uvarint(buf: &mut Vec, mut v: u64) { - while v >= 0x80 { - buf.push((v as u8) | 0x80); - v >>= 7; - } - buf.push(v as u8); -} - -macro_rules! impl_le { - ($($name:ident => $ty:ty),+ $(,)?) => {$( - #[inline] - fn $name(self, v: $ty) -> Result<(), Error> { - self.buf.extend_from_slice(&v.to_le_bytes()); - Ok(()) - } - )+}; -} - -impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - - type SerializeSeq = Self; - type SerializeTuple = Self; - type SerializeTupleStruct = Self; - type SerializeMap = Self; - type SerializeStruct = Self; - type SerializeTupleVariant = Impossible<(), Error>; - type SerializeStructVariant = Impossible<(), Error>; - - #[inline] - fn serialize_bool(self, v: bool) -> Result<(), Error> { - self.buf.push(v as u8); - Ok(()) - } - - #[inline] - fn serialize_u8(self, v: u8) -> Result<(), Error> { - self.buf.push(v); - Ok(()) - } - - #[inline] - fn serialize_i8(self, v: i8) -> Result<(), Error> { - self.buf.push(v as u8); - Ok(()) - } - - impl_le! { - serialize_i16 => i16, - serialize_i32 => i32, - serialize_i64 => i64, - serialize_i128 => i128, - serialize_u16 => u16, - serialize_u32 => u32, - serialize_u64 => u64, - serialize_u128 => u128, - serialize_f32 => f32, - serialize_f64 => f64, - } - - fn serialize_char(self, v: char) -> Result<(), Error> { - let mut tmp = [0u8; 4]; - self.serialize_str(v.encode_utf8(&mut tmp)) - } - - fn serialize_str(self, v: &str) -> Result<(), Error> { - write_uvarint(self.buf, v.len() as u64); - self.buf.extend_from_slice(v.as_bytes()); - Ok(()) - } - - /// Raw byte write — no length prefix. Matches RowBinary's fixed-width - /// encoding for UUID and FixedString(N). Variable-length byte sequences - /// should be modeled as `Vec` and hit `serialize_seq`. - fn serialize_bytes(self, v: &[u8]) -> Result<(), Error> { - self.buf.extend_from_slice(v); - Ok(()) - } - - fn serialize_none(self) -> Result<(), Error> { - self.buf.push(1); - Ok(()) - } - - fn serialize_some(self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - self.buf.push(0); - value.serialize(self) - } - - fn serialize_unit(self) -> Result<(), Error> { - Ok(()) - } - - fn serialize_unit_struct(self, _name: &'static str) -> Result<(), Error> { - Ok(()) - } - - fn serialize_unit_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - ) -> Result<(), Error> { - Err(Error::Unsupported("enum variant")) - } - - fn serialize_newtype_struct(self, _name: &'static str, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(self) - } - - fn serialize_newtype_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - _value: &T, - ) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - Err(Error::Unsupported("enum variant")) - } - - fn serialize_seq(self, len: Option) -> Result { - let len = len.ok_or(Error::SequenceLengthRequired)?; - write_uvarint(self.buf, len as u64); - Ok(self) - } - - fn serialize_tuple(self, _len: usize) -> Result { - Ok(self) - } - - fn serialize_tuple_struct( - self, - _name: &'static str, - _len: usize, - ) -> Result { - Ok(self) - } - - fn serialize_tuple_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - _len: usize, - ) -> Result { - Err(Error::Unsupported("enum variant")) - } - - fn serialize_map(self, len: Option) -> Result { - let len = len.ok_or(Error::SequenceLengthRequired)?; - write_uvarint(self.buf, len as u64); - Ok(self) - } - - fn serialize_struct( - self, - _name: &'static str, - _len: usize, - ) -> Result { - Ok(self) - } - - fn serialize_struct_variant( - self, - _name: &'static str, - _index: u32, - _variant: &'static str, - _len: usize, - ) -> Result { - Err(Error::Unsupported("enum variant")) - } -} - -impl<'a, 'b> SerializeSeq for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_element(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeTuple for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_element(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeTupleStruct for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_field(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeMap for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_key(&mut self, key: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - key.serialize(&mut **self) - } - fn serialize_value(&mut self, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -impl<'a, 'b> SerializeStruct for &'a mut Serializer<'b> { - type Ok = (); - type Error = Error; - fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Error> - where - T: ?Sized + Serialize, - { - value.serialize(&mut **self) - } - fn end(self) -> Result<(), Error> { - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use serde::Serialize; - - fn encode(v: &T) -> Vec { - let mut buf = Vec::new(); - serialize_into(&mut buf, v).unwrap(); - buf - } - - #[test] - fn primitives_are_little_endian() { - assert_eq!(encode(&0x01u8), vec![0x01]); - assert_eq!(encode(&0x0102u16), vec![0x02, 0x01]); - assert_eq!(encode(&0x01020304u32), vec![0x04, 0x03, 0x02, 0x01]); - assert_eq!( - encode(&0x0102030405060708u64), - vec![0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01], - ); - assert_eq!(encode(&true), vec![1]); - assert_eq!(encode(&false), vec![0]); - } - - #[test] - fn strings_get_uvarint_length_prefix() { - assert_eq!(encode(&""), vec![0]); - assert_eq!(encode(&"abc"), vec![3, b'a', b'b', b'c']); - // 130-char string crosses the single-byte LEB128 boundary (0x80). - let s = "x".repeat(130); - let out = encode(&s); - assert_eq!(&out[..2], &[0x82, 0x01]); // 130 = 0b1_0000010 → 0x82 0x01 - assert_eq!(out.len(), 132); - } - - #[test] - fn bytes_are_raw_no_prefix() { - // serialize_bytes is the FixedString path — no length prefix. - // (We use this for UUIDs.) - struct Raw(Vec); - impl Serialize for Raw { - fn serialize(&self, s: S) -> Result { - s.serialize_bytes(&self.0) - } - } - let out = encode(&Raw(vec![0xaa, 0xbb, 0xcc])); - assert_eq!(out, vec![0xaa, 0xbb, 0xcc]); - } - - #[test] - fn sequences_prefix_then_elements() { - let v: Vec = vec![1, 2, 3]; - // 3 elements, then each u16 LE. - assert_eq!(encode(&v), vec![3, 1, 0, 2, 0, 3, 0]); - } - - #[test] - fn map_like_vec_of_pairs_matches_clickhouse_map_shape() { - // Map(String, UInt8) is wire-equivalent to Array(Tuple(String, UInt8)): - // uvarint length, then concatenated (string-prefix, key, u8) tuples. - let v: Vec<(String, u8)> = vec![("a".into(), 7), ("bb".into(), 9)]; - assert_eq!( - encode(&v), - vec![ - 2, // len - 1, b'a', 7, // ("a", 7) - 2, b'b', b'b', 9, // ("bb", 9) - ], - ); - } - - #[test] - fn uuid_adapter_emits_16_bytes_with_halves_reversed() { - use uuid::Uuid; - // 11223344-5566-7788-99aa-bbccddeeff00 - let id = Uuid::from_bytes([ - 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, - 0xff, 0x00, - ]); - #[derive(Serialize)] - struct W { - #[serde(with = "super::super::uuid")] - id: Uuid, - } - let out = encode(&W { id }); - assert_eq!( - out, - vec![ - 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, // first half reversed - 0x00, 0xff, 0xee, 0xdd, 0xcc, 0xbb, 0xaa, 0x99, // second half reversed - ], - ); - } - - #[test] - fn struct_fields_serialize_in_declaration_order() { - #[derive(Serialize)] - struct S { - a: u8, - b: u16, - c: bool, - } - let out = encode(&S { - a: 1, - b: 2, - c: true, - }); - assert_eq!(out, vec![1, 2, 0, 1]); - } -} diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 04c7ae65dd1..14f9f29dd3a 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -74,34 +74,10 @@ pub struct ClickhouseWriterStep { inner: RunTaskInThreads, BytesInsertBatch<()>, anyhow::Error, N>, } -/// Wire format the writer step posts to ClickHouse with. Picking the right -/// value here is the only thing the consumer needs to know — the pipeline -/// shape, batching, and retry behavior are identical across formats. -#[derive(Clone, Copy, Debug)] -pub enum InsertFormat { - JsonEachRow, - RowBinary, -} - -impl InsertFormat { - fn as_str(self) -> &'static str { - match self { - InsertFormat::JsonEachRow => "JSONEachRow", - InsertFormat::RowBinary => "RowBinary", - } - } -} - impl ClickhouseWriterStep where N: ProcessingStrategy> + 'static, { - /// `columns`: if `Some`, expand into the SQL as - /// `INSERT INTO {table} (col1, col2, ...) FORMAT ...`. Required for - /// `RowBinary`, which would otherwise fall back to the table's positional - /// column order — a footgun whenever wire order and table order diverge. - /// For `JSONEachRow`, pass `None` to preserve historical behavior. - #[allow(clippy::too_many_arguments)] pub fn new( next_step: N, cluster_config: ClickhouseConfig, @@ -109,8 +85,6 @@ where skip_write: bool, concurrency: &ConcurrencyConfig, storage_name: String, - format: InsertFormat, - columns: Option<&'static [&'static str]>, ) -> Self { let inner = RunTaskInThreads::new( next_step, @@ -119,8 +93,6 @@ where &cluster_config.clone(), &table, storage_name, - format, - columns, )), skip_write, ), @@ -182,13 +154,7 @@ pub struct ClickhouseClient { } impl ClickhouseClient { - pub fn new( - config: &ClickhouseConfig, - table: &str, - storage_name: String, - format: InsertFormat, - columns: Option<&[&str]>, - ) -> ClickhouseClient { + pub fn new(config: &ClickhouseConfig, table: &str, storage_name: String) -> ClickhouseClient { let mut headers = HeaderMap::with_capacity(6); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate")); @@ -214,21 +180,8 @@ impl ClickhouseClient { // the same wire format `clickhouse-rs` used and what // `clickhouse-compressor` produces. Distinct from HTTP-standard // `Content-Encoding: lz4`, which would need `enable_http_compression=1`. - let mut base_url = - format!("{scheme}://{host}:{port}?insert_distributed_sync=1&decompress=1"); - if matches!(format, InsertFormat::RowBinary) { - // RowBinary cannot represent JSON values natively; tell ClickHouse - // to treat any binary string targeting a JSON column as JSON text. - base_url.push_str("&input_format_binary_read_json_as_string=1"); - } - let columns_clause = match columns { - Some(cols) => format!(" ({})", cols.join(", ")), - None => String::new(), - }; - let query = format!( - "INSERT INTO {table}{columns_clause} FORMAT {fmt}", - fmt = format.as_str(), - ); + let base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1&decompress=1"); + let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); ClickhouseClient { client: Client::new(), @@ -442,13 +395,7 @@ mod tests { crate::testutils::initialize_python(); let config = make_test_config(); println!("config: {config:?}"); - let client = ClickhouseClient::new( - &config, - "querylog_local", - "test_storage".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let client = ClickhouseClient::new(&config, "querylog_local", "test_storage".to_string()); let url = client.build_url(); assert!(url.contains("load_balancing=in_order")); @@ -464,13 +411,7 @@ mod tests { fn test_url_with_runtime_config_override() { crate::testutils::initialize_python(); let config = make_test_config(); - let client = ClickhouseClient::new( - &config, - "test_table", - "writer_v2_lb_test".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let client = ClickhouseClient::new(&config, "test_table", "writer_v2_lb_test".to_string()); // Default: in_order let url = client.build_url(); @@ -500,8 +441,6 @@ mod tests { &config, "test_table", "writer_v2_block_size_test".to_string(), - InsertFormat::JsonEachRow, - None, ); // Default (key absent): no suffix. @@ -517,13 +456,8 @@ mod tests { assert!(url.contains("&max_insert_block_size=2000000")); // A different storage isn't affected. - let other_client = ClickhouseClient::new( - &config, - "test_table", - "writer_v2_other_storage".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let other_client = + ClickhouseClient::new(&config, "test_table", "writer_v2_other_storage".to_string()); let url = other_client.build_url(); assert!(!url.contains("max_insert_block_size")); @@ -638,13 +572,7 @@ mod tests { database: "default".to_string(), }; - let client = ClickhouseClient::new( - &config, - "test_table", - "test_storage".to_string(), - InsertFormat::JsonEachRow, - None, - ); + let client = ClickhouseClient::new(&config, "test_table", "test_storage".to_string()); let start_time = Instant::now(); let result = client diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index 02365873d0a..56f7ffd0a4e 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -20,7 +20,7 @@ use crate::processors::utils::SilencedDLQMessage; use crate::processors::{ProcessingFunction, ProcessingFunctionWithReplacements}; use crate::types::{ BytesInsertBatch, CommitLogEntry, CommitLogOffsets, InsertBatch, InsertOrReplacement, - KafkaMessageMetadata, RowData, + KafkaMessageMetadata, RowData, TypedInsertBatch, }; use tokio::time::Instant; @@ -98,6 +98,95 @@ pub fn make_rust_processor( )) } +/// Build the next-step message for a *typed*-row processor (e.g. the +/// `clickhouse`-crate inserter path). Mirrors the commit-log / latency / cogs +/// wiring of `make_rust_processor`'s `result_to_next_msg`, but carries the +/// typed row downstream (as `Option`; `None` == skip) instead of bytes. +fn typed_result_to_next_msg( + transformed: TypedInsertBatch, + partition: Partition, + offset: u64, + timestamp: DateTime, + stop_at_timestamp: Option, +) -> anyhow::Result>>> { + // Don't process any more messages + if let Some(stop) = stop_at_timestamp { + if stop < timestamp.timestamp() { + let payload = BytesInsertBatch::default(); + return Ok(Message::new_broker_message( + payload, partition, offset, timestamp, + )); + } + } + + let num_bytes = transformed.num_bytes; + let mut payload = BytesInsertBatch::from_rows(transformed.row) + .with_num_bytes(num_bytes) + .with_message_timestamp(timestamp) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( + partition.index, + CommitLogEntry { + offset: offset + 1, + orig_message_ts: timestamp, + received_p99: transformed.origin_timestamp.into_iter().collect(), + }, + )]))) + .with_cogs_data(transformed.cogs_data.unwrap_or_default()); + + if let Some(ts) = transformed.origin_timestamp { + payload = payload.with_origin_timestamp(ts); + } + if let Some(ts) = transformed.sentry_received_timestamp { + payload = payload.with_sentry_received_timestamp(ts); + } + if let Some(metrics) = transformed.item_type_metrics { + payload = payload.with_item_type_metrics(metrics); + } + + Ok(Message::new_broker_message( + payload, partition, offset, timestamp, + )) +} + +/// Like [`make_rust_processor`], but the processing function returns a typed +/// row ([`TypedInsertBatch`]) that is handed downstream verbatim (wrapped in +/// `BytesInsertBatch>`) for a sink that serializes it itself — e.g. +/// the `clickhouse`-crate inserter for `eap_items`. +pub fn make_rust_processor_typed( + next_step: impl ProcessingStrategy>> + 'static, + func: fn( + KafkaPayload, + KafkaMessageMetadata, + config: &ProcessorConfig, + ) -> anyhow::Result>, + schema_name: &str, + enforce_schema: bool, + concurrency: &ConcurrencyConfig, + processor_config: ProcessorConfig, + stop_at_timestamp: Option, +) -> Box> +where + R: Clone + Send + Sync + 'static, +{ + let schema = get_schema(schema_name, enforce_schema); + + let task_runner = MessageProcessor { + schema, + enforce_schema, + func, + result_to_next_msg: typed_result_to_next_msg::, + processor_config, + stop_at_timestamp, + }; + + Box::new(RunTaskInThreads::new( + next_step, + task_runner, + concurrency, + Some("process_message"), + )) +} + pub fn make_rust_processor_with_replacements( next_step: impl ProcessingStrategy>> + 'static, func: ProcessingFunctionWithReplacements, diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index b84b0e5fe7f..3283d5ad4f2 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -250,6 +250,39 @@ impl InsertBatch { } } +/// The return value of processors that hand a *typed* row to a downstream sink +/// that serializes it itself (e.g. the `clickhouse`-crate inserter path for +/// `eap_items`), instead of pre-encoding bytes like [`InsertBatch`]. +/// +/// `row == None` models a skip/stop: the message still carries offsets + +/// metadata so they advance, but there is no row to insert. +#[derive(Clone, Debug)] +pub struct TypedInsertBatch { + pub row: Option, + pub origin_timestamp: Option>, + pub sentry_received_timestamp: Option>, + pub cogs_data: Option, + pub item_type_metrics: Option, + /// Best-effort encoded-size estimate used for byte-based batch sizing + /// before the row is serialized (the sink also has the exact size after a + /// flush). The eap_items processor uses the source proto's `payload.len()`. + pub num_bytes: usize, +} + +impl TypedInsertBatch { + /// A skipped/empty message: no row, no metrics, zero bytes. + pub fn skip() -> Self { + Self { + row: None, + origin_timestamp: None, + sentry_received_timestamp: None, + cogs_data: None, + item_type_metrics: None, + num_bytes: 0, + } + } +} + #[derive(Clone, Debug, Default)] pub struct BytesInsertBatch { pub rows: R, @@ -416,6 +449,22 @@ impl BytesInsertBatch { (self.rows, new) } + /// Merge another batch's *metadata* (everything except `rows`) into this + /// one. Used by the inserter sink, which streams rows into a long-lived + /// `clickhouse::Inserter` and only needs to accumulate the metadata for the + /// flush boundary — mirrors the metadata half of + /// [`BytesInsertBatch::::merge`]. + pub fn merge_metadata(&mut self, other: BytesInsertBatch<()>) { + self.num_bytes += other.num_bytes; + self.commit_log_offsets.merge(other.commit_log_offsets); + self.message_timestamp.merge(other.message_timestamp); + self.origin_timestamp.merge(other.origin_timestamp); + self.sentry_received_timestamp + .merge(other.sentry_received_timestamp); + self.cogs_data.merge(other.cogs_data); + self.item_type_metrics.merge(other.item_type_metrics); + } + pub fn record_message_latency(&self) { let write_time = Utc::now(); From 125884eda28b90520841c1873965cb78078b7f7a Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:35:22 +0000 Subject: [PATCH 2/7] fix(eap-items): fail-stop the inserter actor and bound join timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR review findings: - Actor write-ahead past failure (Bugbot, high): on a write/commit error the actor previously rebuilt the inserter and kept consuming queued rows. If a later window flushed durably while the strategy had already failed the consumer on the first error, those rows landed in ClickHouse with their offsets uncommitted → duplicates on replay. The actor now fail-stops (surfaces the error and returns), matching the old writer's fail-stop + replay model; the in-flight window never flushed, so it replays cleanly. - join() timeout doubling (Seer, medium): the shutdown wait loop could consume the full timeout and then pass the original timeout to next_step.join(), blocking up to 2x the caller's budget. Pass the remaining time instead. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- .../strategies/clickhouse/inserter_sink.rs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs index 2c2d19c1269..287e439181c 100644 --- a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -192,19 +192,15 @@ async fn run_inserter_actor( macro_rules! on_flush_err { ($e:expr) => {{ + // Fail-stop: surface the error and stop the actor immediately. We do + // NOT keep consuming/inserting later rows. The strategy fails the + // consumer on this error (→ restart + replay from the last committed + // offset), so any rows written ahead would be durably inserted but + // have their offsets uncommitted, producing duplicates on replay. + // The in-flight window never flushed successfully, so it replays + // cleanly. This matches the old writer's fail-stop + replay model. let _ = out_tx.send(FlushOutcome::Err($e.to_string())); - // `write` panics if reused after an error — rebuild, and drop the - // doomed window so its offsets are never pushed (→ replay). - inserter = make_inserter( - &client, - &table, - &storage_name, - max_rows, - max_bytes, - max_batch_time, - ); - acc = Acc::default(); - skip_deadline = Deadline::new(max_batch_time); + return; }}; } @@ -220,7 +216,6 @@ async fn run_inserter_actor( let start = SystemTime::now(); if let Err(e) = inserter.write(&*row).await { on_flush_err!(e); - continue; } match inserter.commit().await { Ok(q) if q.rows > 0 => { @@ -489,7 +484,10 @@ where std::thread::sleep(Duration::from_millis(1)); } - let next_commit = self.next_step.join(timeout)?; + // Pass the *remaining* time, not the original timeout — the wait loop + // above may already have consumed part of it, so reusing `timeout` here + // could block for up to 2x the caller's budget. + let next_commit = self.next_step.join(deadline.map(|d| d.remaining()))?; Ok(merge_commit_request( self.commit_request_carried_over.take(), next_commit, From 11ecadab77afbae3894cbd513192d9f16aee4df0 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:46:14 +0000 Subject: [PATCH 3/7] feat(eap-items): retry transient insert errors with backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-add the resilience the old JSON writer had (RetryConfig: jittered exponential backoff, 4 retries) to the clickhouse-crate inserter path, without an unbounded memory cost: - The actor retains the current unflushed window's rows (bounded by max_batch_size) until that window's flush succeeds. - On a write/commit error the inserter is poisoned; the actor replays the retained window through a fresh inserter via flush_window_with_retry (write all rows + end()), with backoff, before failing. On success it resets the main inserter, emits the window's offsets, and continues. - If retries are exhausted, it fail-stops (offsets never pushed → replay), matching the old fail-stop-after-retries model. Peak memory is now one batch of typed rows plus the inserter's byte buffer (the accepted tradeoff for retry), still bounded by max_batch_size, not by total row count. Also: clarify in submit() why an unreserved/closed channel is safe (try_reserve rejects a closed channel; a dropped row's offset is never committed, so it replays). The skip-only offset-advance guard is preserved as `acc.rows.is_empty()` — it must not emit while real rows are buffered but unflushed, since their offsets are lower than later skips and committing past them would lose data. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- .../strategies/clickhouse/inserter_sink.rs | 242 ++++++++++++++---- 1 file changed, 196 insertions(+), 46 deletions(-) diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs index 287e439181c..4307ccbe832 100644 --- a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -5,18 +5,20 @@ //! message (wrapped in `BytesInsertBatch>`; `None` == a //! skipped/empty message that still carries offsets). A long-lived actor task //! owns one [`clickhouse::inserter::Inserter`] and writes each row the moment -//! it arrives — the wide struct is serialized into the inserter's byte buffer -//! and dropped immediately, so peak memory stays bounded by the buffer, not by -//! row count. +//! it arrives. To allow retrying transient insert errors, the rows of the +//! current unflushed window are retained until that window's flush succeeds, so +//! peak memory is bounded by one batch (`max_batch_size`) of rows plus the +//! inserter's serialized byte buffer — not by total row count. //! //! The inserter owns the flush boundary: it is configured with our batch //! settings (`with_max_rows`/`with_max_bytes` + `with_period`), and we drive it //! with [`Inserter::commit`] (never `force_commit`). When `commit()` reports a //! real flush (`Quantities.rows > 0`), the rows it flushed are durably in //! ClickHouse; only then do we push *their* Kafka offsets downstream toward -//! `CommitOffsets`. A failed flush is never pushed, so its offsets are not -//! committed and the batch replays on restart — the same durability barrier the -//! old writer had. +//! `CommitOffsets`. On a flush error the retained window is replayed through a +//! fresh inserter with exponential backoff; if retries are exhausted the actor +//! fail-stops, the offsets are never pushed, and the batch replays on restart — +//! the same durability barrier (plus retry) the old writer had. use std::collections::BTreeMap; use std::time::{Duration, SystemTime}; @@ -31,18 +33,25 @@ use sentry_arroyo::utils::timing::Deadline; use sentry_arroyo::{counter, timer}; use tokio::sync::mpsc; +use clickhouse::inserter::Quantities; + use crate::config::{BatchSizeCalculation, ClickhouseConfig}; use crate::processors::eap_items::EAPItemRow; use crate::runtime_config::{get_load_balancing_config, get_max_insert_block_size}; use crate::types::BytesInsertBatch; /// Upper bound on typed rows in flight between `submit` and the actor. Kept -/// small (and independent of `max_batch_size`) so the channel never becomes a -/// `Vec` accumulator — the inserter's byte buffer is the only place -/// a whole batch lives. When full, `submit` returns `MessageRejected` and -/// arroyo back-pressures upstream. +/// small (independent of `max_batch_size`) so the channel itself isn't an +/// unbounded backlog — when full, `submit` returns `MessageRejected` and arroyo +/// back-pressures upstream. (The current window's rows are retained in the +/// actor for retry; that buffer is bounded by `max_batch_size`.) const WORK_CHANNEL_CAPACITY: usize = 1024; +/// Insert retry policy for transient ClickHouse/network errors, mirroring the +/// old JSON writer's `RetryConfig` (jittered exponential backoff). +const MAX_INSERT_RETRIES: usize = 4; +const INITIAL_RETRY_BACKOFF_MS: u64 = 500; + /// Work handed from the strategy (sync) to the actor (async), in arrival order. enum WorkItem { Row { @@ -77,13 +86,15 @@ enum FlushOutcome { Err(String), } -/// Metadata accumulated for the current unflushed window. Rows themselves live -/// only in the inserter's byte buffer. +/// The current unflushed window. To support retrying transient insert errors, +/// the window's rows are retained here (bounded by `max_batch_size`) until its +/// flush succeeds — the only place a batch of typed rows lives, in addition to +/// the inserter's serialized byte buffer. #[derive(Default)] struct Acc { + rows: Vec, committable: BTreeMap, meta: BytesInsertBatch<()>, - rows_written: usize, } impl Acc { @@ -100,7 +111,8 @@ impl Acc { } } -/// Take the accumulated window and send it downstream as a ready flush. +/// Push the accumulated window downstream as a ready flush, then reset the +/// accumulator — the rows are now durable. fn emit_ready( out_tx: &mpsc::UnboundedSender, acc: &mut Acc, @@ -108,10 +120,12 @@ fn emit_ready( rows: u64, elapsed: Duration, ) { - let taken = std::mem::take(acc); + acc.rows.clear(); + let committable = std::mem::take(&mut acc.committable); + let meta = std::mem::take(&mut acc.meta); let _ = out_tx.send(FlushOutcome::Ready(Box::new(ReadyFlush { - committable: taken.committable, - meta: taken.meta, + committable, + meta, bytes, rows, elapsed, @@ -155,6 +169,79 @@ fn make_inserter( inserter } +/// Durably insert `rows` as a single INSERT, retrying transient failures with +/// jittered exponential backoff. Used to recover a window after the long-lived +/// inserter hit a write/commit error (which poisons it). Returns the inserted +/// `Quantities` on success, or the last error string after exhausting retries. +#[allow(clippy::too_many_arguments)] +async fn flush_window_with_retry( + client: &clickhouse::Client, + table: &str, + storage_name: &str, + max_rows: u64, + max_bytes: u64, + max_batch_time: Duration, + rows: &[EAPItemRow], +) -> Result { + if rows.is_empty() { + return Ok(Quantities::ZERO); + } + let mut backoff = Duration::from_millis(INITIAL_RETRY_BACKOFF_MS); + let mut last_err = String::from("unknown error"); + for attempt in 0..=MAX_INSERT_RETRIES { + // Fresh inserter each attempt; write the whole window then force-flush + // via end(). We never call commit() here, so size/period thresholds are + // irrelevant — end() flushes all buffered rows as one INSERT. + let mut inserter = make_inserter( + client, + table, + storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + let mut write_err: Option = None; + for row in rows { + if let Err(e) = inserter.write(row).await { + write_err = Some(e.to_string()); + break; + } + } + match write_err { + None => match inserter.end().await { + Ok(quantities) => return Ok(quantities), + Err(e) => last_err = e.to_string(), + }, + Some(e) => last_err = e, + } + if attempt < MAX_INSERT_RETRIES { + counter!( + "rust_consumer.clickhouse_insert_error", + 1, + "status" => "insert_error", + "retried" => "true" + ); + tracing::warn!( + "ClickHouse insert failed (attempt {}/{}): {}", + attempt + 1, + MAX_INSERT_RETRIES + 1, + last_err + ); + // ±10% jitter so consumers don't retry in lockstep. + let jitter = 1.0 + (rand::random::() * 0.2 - 0.1); + tokio::time::sleep(backoff.mul_f64(jitter)).await; + backoff = backoff.saturating_mul(2); + } + } + counter!( + "rust_consumer.clickhouse_insert_error", + 1, + "status" => "insert_error", + "retried" => "false" + ); + Err(last_err) +} + #[allow(clippy::too_many_arguments)] async fn run_inserter_actor( config: ClickhouseConfig, @@ -190,17 +277,50 @@ async fn run_inserter_actor( // downstream message every tick when no rows are flowing. let mut skip_deadline = Deadline::new(max_batch_time); - macro_rules! on_flush_err { - ($e:expr) => {{ - // Fail-stop: surface the error and stop the actor immediately. We do - // NOT keep consuming/inserting later rows. The strategy fails the - // consumer on this error (→ restart + replay from the last committed - // offset), so any rows written ahead would be durably inserted but - // have their offsets uncommitted, producing duplicates on replay. - // The in-flight window never flushed successfully, so it replays - // cleanly. This matches the old writer's fail-stop + replay model. - let _ = out_tx.send(FlushOutcome::Err($e.to_string())); - return; + // On a write/commit error the long-lived inserter is poisoned. Replay the + // retained window through a fresh inserter with backoff; on success reset + // the main inserter, emit the offsets, and carry on. If retries are + // exhausted, fail-stop: surface the error and return (the consumer restarts + // and replays from the last committed offset). We deliberately do NOT keep + // consuming later rows on an unrecoverable error — they would be durably + // inserted with uncommitted offsets, producing duplicates on replay. + macro_rules! recover_or_fail { + ($start:expr) => {{ + match flush_window_with_retry( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + &acc.rows, + ) + .await + { + Ok(quantities) => { + inserter = make_inserter( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + ); + let elapsed = $start.elapsed().unwrap_or_default(); + emit_ready( + &out_tx, + &mut acc, + quantities.bytes, + quantities.rows, + elapsed, + ); + skip_deadline = Deadline::new(max_batch_time); + } + Err(e) => { + let _ = out_tx.send(FlushOutcome::Err(e)); + return; + } + } }}; } @@ -209,13 +329,15 @@ async fn run_inserter_actor( maybe = work_rx.recv() => match maybe { Some(WorkItem::Row { row, committable, meta }) => { acc.merge(committable, meta); - acc.rows_written += 1; if skip_write { continue; } + // Retain the row for retry, then write it from the buffer. + acc.rows.push(*row); let start = SystemTime::now(); - if let Err(e) = inserter.write(&*row).await { - on_flush_err!(e); + if inserter.write(acc.rows.last().unwrap()).await.is_err() { + recover_or_fail!(start); + continue; } match inserter.commit().await { Ok(q) if q.rows > 0 => { @@ -224,7 +346,7 @@ async fn run_inserter_actor( skip_deadline = Deadline::new(max_batch_time); } Ok(_) => {} - Err(e) => on_flush_err!(e), + Err(_) => recover_or_fail!(start), } } Some(WorkItem::Skip { committable, meta }) => acc.merge(committable, meta), @@ -233,8 +355,7 @@ async fn run_inserter_actor( _ = interval.tick() => { if skip_write { if acc.has_offsets() && skip_deadline.has_elapsed() { - let rows = acc.rows_written as u64; - emit_ready(&out_tx, &mut acc, 0, rows, Duration::ZERO); + emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); skip_deadline = Deadline::new(max_batch_time); } continue; @@ -247,37 +368,60 @@ async fn run_inserter_actor( skip_deadline = Deadline::new(max_batch_time); } Ok(_) => { - // Period elapsed with nothing to flush (skip-only - // window): advance offsets so the consumer doesn't stall. - if acc.has_offsets() && acc.rows_written == 0 && skip_deadline.has_elapsed() { + // Period elapsed with no buffered rows (skip-only window): + // advance offsets so the consumer doesn't stall. We must + // NOT emit while real rows are buffered-but-unflushed — + // their offsets are lower than later skips, so committing + // past them would lose data. + if acc.has_offsets() && acc.rows.is_empty() && skip_deadline.has_elapsed() { emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); skip_deadline = Deadline::new(max_batch_time); } } - Err(e) => on_flush_err!(e), + Err(_) => recover_or_fail!(start), } } } } - // Finalize: flush the last partial window. + // Finalize: flush the last partial window (with the same retry on error). if skip_write { if acc.has_offsets() { - let rows = acc.rows_written as u64; - emit_ready(&out_tx, &mut acc, 0, rows, Duration::ZERO); + emit_ready(&out_tx, &mut acc, 0, 0, Duration::ZERO); } return; } let start = SystemTime::now(); - match inserter.end().await { - Ok(q) => { + let final_result = match inserter.end().await { + Ok(quantities) => Ok(quantities), + Err(_) => { + flush_window_with_retry( + &client, + &table, + &storage_name, + max_rows, + max_bytes, + max_batch_time, + &acc.rows, + ) + .await + } + }; + match final_result { + Ok(quantities) => { if acc.has_offsets() { let elapsed = start.elapsed().unwrap_or_default(); - emit_ready(&out_tx, &mut acc, q.bytes, q.rows, elapsed); + emit_ready( + &out_tx, + &mut acc, + quantities.bytes, + quantities.rows, + elapsed, + ); } } Err(e) => { - let _ = out_tx.send(FlushOutcome::Err(e.to_string())); + let _ = out_tx.send(FlushOutcome::Err(e)); } } } @@ -427,7 +571,13 @@ where let Some(work_tx) = self.work_tx.as_ref() else { return Err(SubmitError::MessageRejected(MessageRejected { message })); }; - // Reserve a slot first so we never destructure `message` on the reject path. + // Reserve a slot first so we never destructure `message` on the reject + // path. `try_reserve` also fails (→ MessageRejected) if the actor exited + // and closed the channel, so we never silently accept a row the actor + // can't receive. `Permit::send` is infallible once reserved; and even in + // the narrow race where the actor exits between reserve and send, the + // dropped row's offset is never committed (offsets only advance via the + // downstream flushes we emit), so it simply replays. let Ok(permit) = work_tx.try_reserve() else { return Err(SubmitError::MessageRejected(MessageRejected { message })); }; From 68f0c3cf3f61dbf2033a9513b82535d257d77c94 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:52:25 +0000 Subject: [PATCH 4/7] fix(eap-items): assert flush count equals retained window in emit_ready Address a PR review finding (Bugbot, high) about potentially committing offsets past not-yet-durable rows. In our usage this can't happen: we commit() after every single write(), and the crate flushes the entire current INSERT (never a strict subset), so a flush always covers exactly the retained window and emit_ready only advances offsets for durable rows. Make that invariant explicit and enforced with a debug_assert that the flushed row count equals acc.rows.len(), so a future change (e.g. batching multiple writes before a commit) can't silently regress into committing ahead. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- .../src/strategies/clickhouse/inserter_sink.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs index 4307ccbe832..74aff8a426c 100644 --- a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -113,6 +113,15 @@ impl Acc { /// Push the accumulated window downstream as a ready flush, then reset the /// accumulator — the rows are now durable. +/// +/// Invariant: `rows` (the count actually flushed) must equal the retained +/// window `acc.rows.len()`. We hold it because we `commit()` after every single +/// `write()`, and the crate's flush ends the *entire* current INSERT (it never +/// flushes a strict subset) — so a flush always covers exactly the retained +/// window, and emitting all of `acc.committable` only ever advances offsets for +/// rows that are now durable. The assert guards against a future change (e.g. +/// batching multiple writes before a commit) silently breaking that and +/// committing offsets past not-yet-durable rows. fn emit_ready( out_tx: &mpsc::UnboundedSender, acc: &mut Acc, @@ -120,6 +129,12 @@ fn emit_ready( rows: u64, elapsed: Duration, ) { + debug_assert_eq!( + rows as usize, + acc.rows.len(), + "flush count must equal the retained window; committing more offsets \ + than were made durable would lose data", + ); acc.rows.clear(); let committable = std::mem::take(&mut acc.committable); let meta = std::mem::take(&mut acc.meta); From ab7d32457d7ee55037001fa1b3627f14bdf64b4c Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 13:09:27 +0000 Subject: [PATCH 5/7] fix(eap-items): drain late flush outcomes on join timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address a PR review finding (Bugbot, medium): if join() hit its deadline while the actor was still finalizing, a late FlushOutcome::Ready was never drained, so rows could be durable in ClickHouse with offsets uncommitted → duplicate on replay. On timeout, abort the actor (cancelling an in-flight INSERT that hasn't landed, so that window simply replays rather than committing rows we'll never ack), then drain outcomes one final time so any Ready produced right around the deadline still has its offsets pushed. A flush that already landed server-side before the abort replays as a duplicate — the same at-least-once shutdown exposure the old writer had, to be closed by the (deferred) insert_deduplication_token. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- .../strategies/clickhouse/inserter_sink.rs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs index 74aff8a426c..76431957a6c 100644 --- a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -627,6 +627,7 @@ where self.work_tx = None; let deadline = timeout.map(Deadline::new); + let mut timed_out = false; loop { let commit_request = self.next_step.poll()?; self.commit_request_carried_over = @@ -639,9 +640,7 @@ where } if let Some(deadline) = &deadline { if deadline.has_elapsed() { - tracing::warn!( - "inserter sink join timed out; partial batch left unacked (will replay)" - ); + timed_out = true; break; } } @@ -649,6 +648,25 @@ where std::thread::sleep(Duration::from_millis(1)); } + if timed_out { + // Stop the actor so it can't durably flush a window whose offsets we + // can no longer observe within the deadline: aborting cancels an + // in-flight INSERT that hasn't landed (→ that window simply replays) + // rather than leaving it running to commit rows we'll never ack. A + // flush that already landed server-side replays as a duplicate — + // the same at-least-once shutdown exposure the old writer had, to be + // closed by the (deferred) insert_deduplication_token. + tracing::warn!( + "inserter sink join timed out; aborting actor, partial batch will replay" + ); + if let Some(handle) = self.actor.take() { + handle.abort(); + } + } + // Final drain: emit any `Ready` the actor produced right around the + // deadline so its offsets are committed instead of replaying. + self.drain_outcomes()?; + // Pass the *remaining* time, not the original timeout — the wait loop // above may already have consumed part of it, so reusing `timeout` here // could block for up to 2x the caller's budget. From 411c894bb648c8e3979f62e799efdb32a08e3461 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 13:13:19 +0000 Subject: [PATCH 6/7] fix(eap-items): disable crate validation for the JSON attributes_array column CI's live-ClickHouse test_rust job failed: Code: 117 ... Type of 'attributes_array' must be JSON(max_dynamic_paths=128), not JSON ... While executing BinaryRowInputFormat. (INCORRECT_DATA) `eap_items.attributes_array` is a native JSON(max_dynamic_paths=N) column. Under RowBinaryWithNamesAndTypes the server validates the client's names+types header, and our `String` field can't satisfy a parametrized JSON column (no Rust type maps to it). The old hand-rolled writer avoided this by sending plain RowBinary + input_format_binary_read_json_as_string=1 (write the JSON as a string, let ClickHouse parse it into the JSON column). Set Client::with_validation(false) so the crate uses plain RowBinary, matching the old wire behavior. The crate still emits the column list (from the Row derive) in the INSERT, so column mapping stays correct without a hand-rolled list. Trade-off: this drops RowBinaryWithNamesAndTypes schema validation (it is incompatible with the JSON column, not merely an optimization). Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- rust_snuba/src/processors/eap_items.rs | 9 ++++++--- .../src/strategies/clickhouse/inserter_sink.rs | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 1e0e4917a9a..348d106da2a 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -1646,9 +1646,11 @@ mod tests { }; // Production path: build the typed row, then insert it through the - // official clickhouse-crate inserter (RowBinaryWithNamesAndTypes + - // schema validation). Exercises the real EAPItemRow → live-schema - // mapping (UUID, Map columns, DateTime, UInt128, …). + // official clickhouse-crate inserter. Exercises the real EAPItemRow → + // live-schema mapping (UUID, Map columns, DateTime, UInt128, the + // JSON attributes_array column, …). Validation is off (plain RowBinary + // + input_format_binary_read_json_as_string) — see build_client in + // inserter_sink.rs for why the JSON column requires this. let batch = process_message_eap_row(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = batch.row.expect("row should be present"); @@ -1658,6 +1660,7 @@ mod tests { .with_user(user) .with_password(password) .with_database(database.clone()) + .with_validation(false) .with_setting("input_format_binary_read_json_as_string", "1"); let mut inserter = client .inserter::("eap_items_1_local") diff --git a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs index 76431957a6c..7a66260c637 100644 --- a/rust_snuba/src/strategies/clickhouse/inserter_sink.rs +++ b/rust_snuba/src/strategies/clickhouse/inserter_sink.rs @@ -10,6 +10,11 @@ //! peak memory is bounded by one batch (`max_batch_size`) of rows plus the //! inserter's serialized byte buffer — not by total row count. //! +//! Wire format is plain `RowBinary` (crate validation is off — see +//! `build_client`: the `attributes_array` JSON column can't be sent under +//! `RowBinaryWithNamesAndTypes`). The crate still emits the column list from the +//! `Row` derive in the INSERT, so column mapping is correct. +//! //! The inserter owns the flush boundary: it is configured with our batch //! settings (`with_max_rows`/`with_max_bytes` + `with_period`), and we drive it //! with [`Inserter::commit`] (never `force_commit`). When `commit()` reports a @@ -154,6 +159,16 @@ fn build_client(cfg: &ClickhouseConfig) -> clickhouse::Client { .with_user(cfg.user.clone()) .with_password(cfg.password.clone()) .with_database(cfg.database.clone()) + // Validation OFF → plain `RowBinary` (no names+types header). The + // `eap_items.attributes_array` column is a native `JSON(max_dynamic_paths=N)` + // type; under `RowBinaryWithNamesAndTypes` the server validates the + // header types and rejects our `String` field for that column + // ("Type of 'attributes_array' must be JSON(...)"). Plain RowBinary plus + // `input_format_binary_read_json_as_string=1` (below) writes the JSON as + // a string and lets ClickHouse parse it into the JSON column — exactly + // what the old hand-rolled writer did. The crate still emits the column + // list (from the `Row` derive) in the INSERT, so mapping stays correct. + .with_validation(false) // Mirror the old writer's URL params: synchronous distributed insert and // "treat binary strings bound for JSON columns as JSON text". .with_setting("insert_distributed_sync", "1") From e80e8373bf0438f747b4f7d95316d881ab258972 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 13:15:47 +0000 Subject: [PATCH 7/7] fix(eap-items): enable native-tls so secure ClickHouse clusters work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address a PR review finding (Bugbot, medium): the clickhouse crate was built with default-features = false and no TLS backend, so build_client's https URL (used when ClickhouseConfig.secure is true) had no TLS stack — row-binary inserts to secure clusters would fail while the JSON path kept working. Add the crate's native-tls feature. It matches the existing reqwest-based writer's TLS (OpenSSL + system roots) and reuses openssl-sys, which reqwest already pulls into the build, so no new system dependency is introduced. Builds clean. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf --- rust_snuba/Cargo.lock | 1 + rust_snuba/Cargo.toml | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index a33fda1f7eb..a5b5edf1b74 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -735,6 +735,7 @@ dependencies = [ "futures-util", "http-body-util", "hyper 1.6.0", + "hyper-tls 0.6.0", "hyper-util", "lz4_flex", "polonius-the-crab", diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 3c49e77a3ea..67f4e889924 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -31,6 +31,10 @@ clickhouse = { version = "0.15", default-features = false, features = [ "inserter", "lz4", "uuid", + # TLS for `secure: true` clusters (https URLs). native-tls matches the + # existing reqwest-based writer (OpenSSL + system roots), and openssl-sys is + # already pulled in by reqwest, so this adds no new system dependency. + "native-tls", ] } ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0"