Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584]#8063
Open
phacops wants to merge 7 commits into
Replace the hand-rolled RowBinary write path for eap_items (a vendored RowBinary serializer + reqwest/native-LZ4 HTTP client) with the official `clickhouse` crate's `Inserter`, picking up RowBinaryWithNamesAndTypes schema validation, compression, and connection pooling.

The Rust consumer is a sentry_arroyo push-based pipeline, so the crate's typed `Inserter` is wired into a new `EapItemsInserterSink` strategy:

- The eap_items processor now emits a typed `EAPItemRow` (`process_message_eap_row` → `BytesInsertBatch<Option<EAPItemRow>>`) instead of pre-encoded bytes.
- `EapItemsInserterSink` owns a long-lived `Inserter<EAPItemRow>` on a dedicated actor task. It writes each row on arrival (the wide struct is serialized into the inserter buffer and dropped immediately, so peak memory stays bounded by the buffer, not row count) and lets the inserter own the flush boundary via `with_max_rows`/`with_max_bytes` + `with_period`. On each `commit()` flush it pushes exactly the flushed rows' Kafka offsets downstream. The flush is the durability barrier, so a failed flush never advances offsets (the batch replays).
- `EAPItemRow` derives `clickhouse::Row`; the hand-maintained `COLUMN_NAMES` and the vendored `rowbinary` serializer are removed (the crate generates the column list and RowBinaryWithNamesAndTypes tolerates field ordering).

The path is gated by the existing `use_row_binary` flag (EAPItemsProcessor only); JSONEachRow remains the default and the rollback. The JSONEachRow HTTP writer (`writer_v2`'s client + native-LZ4) stays for all other storages and is simplified to JSON-only.

The crate's validation requires Rust 1.89+, so bump the pinned toolchain to 1.94.1 (Dockerfile/CI install from rust-toolchain.toml); this surfaced two newer clippy lints on existing code, fixed in passing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Address PR review findings:

- Actor write-ahead past failure (Bugbot, high): on a write/commit error the actor previously rebuilt the inserter and kept consuming queued rows. If a later window flushed durably while the strategy had already failed the consumer on the first error, those rows landed in ClickHouse with their offsets uncommitted → duplicates on replay. The actor now fail-stops (surfaces the error and returns), matching the old writer's fail-stop + replay model; the in-flight window never flushed, so it replays cleanly.
- join() timeout doubling (Seer, medium): the shutdown wait loop could consume the full timeout and then pass the original timeout to next_step.join(), blocking up to 2x the caller's budget. Pass the remaining time instead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
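The `join()` budget fix described above can be illustrated with a minimal std-only sketch. This is not the PR's code: `remaining_budget` and `join_with_budget` are hypothetical names, and the two closures stand in for the shutdown wait loop and `next_step.join()`.

```rust
use std::time::{Duration, Instant};

/// Time left before `deadline`, clamped to zero once the deadline has passed.
fn remaining_budget(deadline: Instant, now: Instant) -> Duration {
    deadline.saturating_duration_since(now)
}

/// Hypothetical two-phase join: wait on our own actor first, then hand the
/// next step only what is left of the caller's budget (not the full timeout).
fn join_with_budget(
    timeout: Duration,
    wait_for_actor: impl FnOnce(Duration),
    join_next_step: impl FnOnce(Duration),
) {
    let deadline = Instant::now() + timeout;
    wait_for_actor(timeout);
    join_next_step(remaining_budget(deadline, Instant::now()));
}
```

With this shape the total wait is bounded by the caller's timeout, because the second phase can never receive more than the time the first phase left unused.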
Re-add the resilience the old JSON writer had (RetryConfig: jittered exponential backoff, 4 retries) to the clickhouse-crate inserter path, without an unbounded memory cost:

- The actor retains the current unflushed window's rows (bounded by max_batch_size) until that window's flush succeeds.
- On a write/commit error the inserter is poisoned; the actor replays the retained window through a fresh inserter via flush_window_with_retry (write all rows + end()), with backoff, before failing. On success it resets the main inserter, emits the window's offsets, and continues.
- If retries are exhausted, it fail-stops (offsets never pushed → replay), matching the old fail-stop-after-retries model.

Peak memory is now one batch of typed rows plus the inserter's byte buffer (the accepted tradeoff for retry), still bounded by max_batch_size, not by total row count.

Also: clarify in submit() why an unreserved/closed channel is safe (try_reserve rejects a closed channel; a dropped row's offset is never committed, so it replays). The skip-only offset-advance guard is preserved as `acc.rows.is_empty()`: it must not emit while real rows are buffered but unflushed, since their offsets are lower than later skips and committing past them would lose data.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
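The retry-then-fail-stop behaviour described above might look roughly like the following std-only sketch. It borrows the name `flush_window_with_retry` from the commit message but is not the PR's implementation: the `flush` closure stands in for "write all rows to a fresh inserter, then `end()`", the base delay and cap are assumed values, and jitter and sleeping are injected so the logic stays deterministic.

```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-based): `base * 2^attempt`, capped at `cap`,
/// then scaled into [50%, 100%] by `jitter` (expected in 0.0..=1.0).
fn backoff_delay(base: Duration, cap: Duration, attempt: u32, jitter: f64) -> Duration {
    let exponential = base.saturating_mul(2u32.saturating_pow(attempt)).min(cap);
    exponential.mul_f64(0.5 + 0.5 * jitter.clamp(0.0, 1.0))
}

/// Replays one retained window until it flushes or retries are exhausted.
/// Returns the number of retries used, or the last error (the fail-stop case).
fn flush_window_with_retry<R, E>(
    rows: &[R],
    max_retries: u32,
    mut flush: impl FnMut(&[R]) -> Result<(), E>,
    mut jitter: impl FnMut() -> f64,
    mut sleep: impl FnMut(Duration),
) -> Result<u32, E> {
    // Assumed values for illustration; the source only states "4 retries".
    let (base, cap) = (Duration::from_millis(100), Duration::from_secs(5));
    let mut attempt = 0;
    loop {
        match flush(rows) {
            Ok(()) => return Ok(attempt),
            // Out of retries: surface the error so offsets are never pushed.
            Err(err) if attempt >= max_retries => return Err(err),
            Err(_) => {
                sleep(backoff_delay(base, cap, attempt, jitter()));
                attempt += 1;
            }
        }
    }
}
```

Because the rows are only borrowed, the caller keeps the retained window until the function returns `Ok`, which is what bounds memory to one batch.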
Address a PR review finding (Bugbot, high) about potentially committing offsets past not-yet-durable rows. In our usage this can't happen: we commit() after every single write(), and the crate flushes the entire current INSERT (never a strict subset), so a flush always covers exactly the retained window and emit_ready only advances offsets for durable rows.

Make that invariant explicit and enforced with a debug_assert that the flushed row count equals acc.rows.len(), so a future change (e.g. batching multiple writes before a commit) can't silently regress into committing ahead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
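As an illustration of the invariant described above, here is a minimal sketch of a retained window that only releases offsets once a flush has covered every retained row. The `Window` type, its fields, and the use of `String` as a stand-in for the typed row are assumptions for the example; only the idea of asserting the flushed count against the retained row count comes from the commit message.

```rust
/// Rows written but not yet flushed, with the Kafka offset of each row.
#[derive(Default)]
struct Window {
    rows: Vec<String>, // stand-in for the typed rows retained for retry
    offsets: Vec<u64>,
}

impl Window {
    fn push(&mut self, row: String, offset: u64) {
        self.rows.push(row);
        self.offsets.push(offset);
    }

    /// Called after a successful flush that reported `flushed_rows`.
    /// Returns the highest offset that is now safe to commit, if any.
    fn emit_ready(&mut self, flushed_rows: usize) -> Option<u64> {
        // A flush must cover exactly the retained window; anything else would
        // mean advancing offsets past rows that are not durable yet.
        debug_assert_eq!(flushed_rows, self.rows.len());
        let highest = self.offsets.iter().copied().max();
        self.rows.clear();
        self.offsets.clear();
        highest
    }
}
```

The assertion is checked only in debug builds, so it guards against a future refactor in tests and CI without adding cost to release builds.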
Address a PR review finding (Bugbot, medium): if join() hit its deadline while the actor was still finalizing, a late FlushOutcome::Ready was never drained, so rows could be durable in ClickHouse with offsets uncommitted → duplicate on replay.

On timeout, abort the actor (cancelling an in-flight INSERT that hasn't landed, so that window simply replays rather than committing rows we'll never ack), then drain outcomes one final time so any Ready produced right around the deadline still has its offsets pushed. A flush that already landed server-side before the abort replays as a duplicate. This is the same at-least-once shutdown exposure the old writer had, to be closed by the (deferred) insert_deduplication_token.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit ab7d324.
…y column

CI's live-ClickHouse test_rust job failed:

  Code: 117 ... Type of 'attributes_array' must be JSON(max_dynamic_paths=128), not JSON ... While executing BinaryRowInputFormat. (INCORRECT_DATA)

`eap_items.attributes_array` is a native JSON(max_dynamic_paths=N) column. Under RowBinaryWithNamesAndTypes the server validates the client's names+types header, and our `String` field can't satisfy a parametrized JSON column (no Rust type maps to it). The old hand-rolled writer avoided this by sending plain RowBinary + input_format_binary_read_json_as_string=1 (write the JSON as a string, let ClickHouse parse it into the JSON column).

Set Client::with_validation(false) so the crate uses plain RowBinary, matching the old wire behavior. The crate still emits the column list (from the Row derive) in the INSERT, so column mapping stays correct without a hand-rolled list.

Trade-off: this drops RowBinaryWithNamesAndTypes schema validation (it is incompatible with the JSON column, not merely an optimization).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Address a PR review finding (Bugbot, medium): the clickhouse crate was built with default-features = false and no TLS backend, so build_client's https URL (used when ClickhouseConfig.secure is true) had no TLS stack. Row-binary inserts to secure clusters would fail while the JSON path kept working.

Add the crate's native-tls feature. It matches the existing reqwest-based writer's TLS (OpenSSL + system roots) and reuses openssl-sys, which reqwest already pulls into the build, so no new system dependency is introduced. Builds clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf

Overview
This PR replaces the custom RowBinary serialization pipeline for `eap_items` with a new `EapItemsInserterSink` that uses the official `clickhouse` crate's `Inserter` API. This eliminates the need for custom serialization code and improves memory efficiency by streaming rows directly to ClickHouse.

Key Changes

New Inserter Sink (`inserter_sink.rs`)
- Owns a long-lived `clickhouse::Inserter<EAPItemRow>` instance
- Flush limits owned by the inserter (`max_rows`, `max_bytes`, `max_period`)

Processor Changes (`eap_items.rs`)
- Renamed `process_message_row_binary` to `process_message_eap_row` for clarity
- Emits `TypedInsertBatch<EAPItemRow>` instead of pre-encoded bytes

Type System (`types.rs`)
- Added `TypedInsertBatch<R>` to represent rows that will be serialized downstream
- Skip entries (`row == None`) for messages that don't produce inserts

Pipeline Wiring (`factory_v2.rs`, `processor.rs`)
- Added `make_rust_processor_typed` to build pipelines for typed-row processors
- When `use_row_binary` is enabled, eap_items now uses the inserter sink path instead of the `Reduce` + `ClickhouseWriterStep` pair
- Applies to `EAPItemsProcessor` only

Removed Code
- `rowbinary/ser.rs` serializer module (replaced by `clickhouse` crate)
- `InsertFormat` enum and related writer step configuration
- Custom UUID serialization (uses `clickhouse::serde::uuid` instead)

Dependencies
- `clickhouse` crate (v0.15) with default features disabled

Benefits
- Uses the `clickhouse` crate's battle-tested inserter instead of custom serialization
- The `clickhouse` crate validates rows against the table schema at serialization time

Testing
Existing unit tests for the RowBinary serializer are removed as that functionality is now provided by the upstream crate. The inserter sink's behavior is covered by integration with the processor pipeline and the existing Arroyo strategy test infrastructure.
https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf