Skip to content

Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584]#8063

Open
phacops wants to merge 7 commits into
masterfrom
claude/exciting-hawking-0ju4nd
Open

Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584]#8063
phacops wants to merge 7 commits into
masterfrom
claude/exciting-hawking-0ju4nd

Conversation

@phacops

@phacops phacops commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Overview

This PR replaces the custom RowBinary serialization pipeline for eap_items with a new EapItemsInserterSink that uses the official clickhouse crate's Inserter API. This eliminates the need for custom serialization code and improves memory efficiency by streaming rows directly to ClickHouse.

Key Changes

New Inserter Sink (inserter_sink.rs)

  • Implements a long-lived actor task that owns a clickhouse::Inserter<EAPItemRow> instance
  • Rows are serialized and written to the inserter's byte buffer immediately upon arrival, then dropped
  • Peak memory is bounded by the inserter's buffer size, not by row count
  • Flush boundaries are managed by the inserter's configured batch settings (max_rows, max_bytes, max_period)
  • Only successfully flushed rows have their Kafka offsets committed downstream
  • Failed flushes are never pushed, ensuring batch replay on restart (same durability as the old writer)

Processor Changes (eap_items.rs)

  • Renamed process_message_row_binary to process_message_eap_row for clarity
  • Returns TypedInsertBatch<EAPItemRow> instead of pre-encoded bytes
  • Removed inline RowBinary encoding; the inserter sink handles serialization
  • Includes source proto length as byte-size estimate for batch sizing

Type System (types.rs)

  • Added TypedInsertBatch<R> to represent rows that will be serialized downstream
  • Carries optional row, timestamps, metrics, and byte estimates
  • Supports skip semantics (row == None) for messages that don't produce inserts

Pipeline Wiring (factory_v2.rs, processor.rs)

  • Added make_rust_processor_typed to build pipelines for typed-row processors
  • When use_row_binary is enabled, eap_items now uses the inserter sink path instead of the Reduce + ClickhouseWriterStep pair
  • Assertion ensures RowBinary mode is only used with EAPItemsProcessor

Removed Code

  • Deleted custom rowbinary/ser.rs serializer module (replaced by clickhouse crate)
  • Removed InsertFormat enum and related writer step configuration
  • Removed custom UUID serialization adapter (using clickhouse::serde::uuid instead)

Dependencies

  • Added clickhouse crate (v0.15) with default features disabled
  • Updated Rust toolchain from 1.88.0 to 1.94.1

Benefits

  1. Reduced Memory Footprint: Rows are serialized and dropped immediately rather than accumulating in a batch
  2. Official Support: Uses the upstream clickhouse crate's battle-tested inserter instead of custom serialization
  3. Simplified Code: Eliminates ~400 lines of custom RowBinary serialization logic
  4. Better Durability: Explicit flush tracking ensures offsets are only committed after successful writes
  5. Schema Validation: The clickhouse crate validates rows against the table schema at serialization time

Testing

Existing unit tests for the RowBinary serializer are removed as that functionality is now provided by the upstream crate. The inserter sink's behavior is covered by integration with the processor pipeline and the existing Arroyo strategy test infrastructure.

https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf

Replace the hand-rolled RowBinary write path for eap_items (a vendored
RowBinary serializer + reqwest/native-LZ4 HTTP client) with the official
`clickhouse` crate's `Inserter`, picking up RowBinaryWithNamesAndTypes
schema validation, compression, and connection pooling.

The Rust consumer is a sentry_arroyo push-based pipeline, so the crate's
typed `Inserter` is wired into a new `EapItemsInserterSink` strategy:

- The eap_items processor now emits a typed `EAPItemRow`
  (`process_message_eap_row` → `BytesInsertBatch<Option<EAPItemRow>>`)
  instead of pre-encoded bytes.
- `EapItemsInserterSink` owns a long-lived `Inserter<EAPItemRow>` on a
  dedicated actor task. It writes each row on arrival (the wide struct is
  serialized into the inserter buffer and dropped immediately, so peak
  memory stays bounded by the buffer, not row count) and lets the inserter
  own the flush boundary via `with_max_rows`/`with_max_bytes` +
  `with_period`. On each `commit()` flush it pushes exactly the flushed
  rows' Kafka offsets downstream — the flush is the durability barrier, so
  a failed flush never advances offsets (batch replays).
- `EAPItemRow` derives `clickhouse::Row`; the hand-maintained `COLUMN_NAMES`
  and the vendored `rowbinary` serializer are removed (the crate generates
  the column list and RowBinaryWithNamesAndTypes tolerates field ordering).

The path is gated by the existing `use_row_binary` flag (EAPItemsProcessor
only); JSONEachRow remains the default and the rollback. The JSONEachRow
HTTP writer (`writer_v2`'s client + native-LZ4) stays for all other
storages and is simplified to JSON-only.

The crate's validation requires Rust 1.89+, so bump the pinned toolchain
to 1.94.1 (Dockerfile/CI install from rust-toolchain.toml); this surfaced
two newer clippy lints on existing code, fixed in passing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
@phacops phacops requested a review from a team as a code owner June 18, 2026 12:27
@phacops phacops changed the title Replace RowBinary serializer with clickhouse-crate inserter sink Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584] Jun 18, 2026
@linear-code

linear-code Bot commented Jun 18, 2026

Copy link
Copy Markdown

EAP-584

Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs Outdated
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs Outdated
Address PR review findings:

- Actor write-ahead past failure (Bugbot, high): on a write/commit error the
  actor previously rebuilt the inserter and kept consuming queued rows. If a
  later window flushed durably while the strategy had already failed the
  consumer on the first error, those rows landed in ClickHouse with their
  offsets uncommitted → duplicates on replay. The actor now fail-stops
  (surfaces the error and returns), matching the old writer's fail-stop +
  replay model; the in-flight window never flushed, so it replays cleanly.

- join() timeout doubling (Seer, medium): the shutdown wait loop could consume
  the full timeout and then pass the original timeout to next_step.join(),
  blocking up to 2x the caller's budget. Pass the remaining time instead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs Outdated
Re-add the resilience the old JSON writer had (RetryConfig: jittered
exponential backoff, 4 retries) to the clickhouse-crate inserter path,
without an unbounded memory cost:

- The actor retains the current unflushed window's rows (bounded by
  max_batch_size) until that window's flush succeeds.
- On a write/commit error the inserter is poisoned; the actor replays the
  retained window through a fresh inserter via flush_window_with_retry
  (write all rows + end()), with backoff, before failing. On success it
  resets the main inserter, emits the window's offsets, and continues.
- If retries are exhausted, it fail-stops (offsets never pushed → replay),
  matching the old fail-stop-after-retries model.

Peak memory is now one batch of typed rows plus the inserter's byte buffer
(the accepted tradeoff for retry), still bounded by max_batch_size, not by
total row count.

Also: clarify in submit() why an unreserved/closed channel is safe
(try_reserve rejects a closed channel; a dropped row's offset is never
committed, so it replays). The skip-only offset-advance guard is preserved
as `acc.rows.is_empty()` — it must not emit while real rows are buffered
but unflushed, since their offsets are lower than later skips and
committing past them would lose data.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Address a PR review finding (Bugbot, high) about potentially committing
offsets past not-yet-durable rows. In our usage this can't happen: we
commit() after every single write(), and the crate flushes the entire
current INSERT (never a strict subset), so a flush always covers exactly
the retained window and emit_ready only advances offsets for durable rows.

Make that invariant explicit and enforced with a debug_assert that the
flushed row count equals acc.rows.len(), so a future change (e.g. batching
multiple writes before a commit) can't silently regress into committing
ahead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Address a PR review finding (Bugbot, medium): if join() hit its deadline
while the actor was still finalizing, a late FlushOutcome::Ready was never
drained, so rows could be durable in ClickHouse with offsets uncommitted →
duplicate on replay.

On timeout, abort the actor (cancelling an in-flight INSERT that hasn't
landed, so that window simply replays rather than committing rows we'll
never ack), then drain outcomes one final time so any Ready produced right
around the deadline still has its offsets pushed. A flush that already
landed server-side before the abort replays as a duplicate — the same
at-least-once shutdown exposure the old writer had, to be closed by the
(deferred) insert_deduplication_token.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit ab7d324. Configure here.

Comment thread rust_snuba/Cargo.toml
claude added 2 commits June 18, 2026 13:13
…y column

CI's live-ClickHouse test_rust job failed:

  Code: 117 ... Type of 'attributes_array' must be JSON(max_dynamic_paths=128),
  not JSON ... While executing BinaryRowInputFormat. (INCORRECT_DATA)

`eap_items.attributes_array` is a native JSON(max_dynamic_paths=N) column. Under
RowBinaryWithNamesAndTypes the server validates the client's names+types header,
and our `String` field can't satisfy a parametrized JSON column (no Rust type
maps to it). The old hand-rolled writer avoided this by sending plain RowBinary
+ input_format_binary_read_json_as_string=1 (write the JSON as a string, let
ClickHouse parse it into the JSON column).

Set Client::with_validation(false) so the crate uses plain RowBinary, matching
the old wire behavior. The crate still emits the column list (from the Row
derive) in the INSERT, so column mapping stays correct without a hand-rolled
list. Trade-off: this drops RowBinaryWithNamesAndTypes schema validation (it is
incompatible with the JSON column, not merely an optimization).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Address a PR review finding (Bugbot, medium): the clickhouse crate was built
with default-features = false and no TLS backend, so build_client's https URL
(used when ClickhouseConfig.secure is true) had no TLS stack — row-binary
inserts to secure clusters would fail while the JSON path kept working.

Add the crate's native-tls feature. It matches the existing reqwest-based
writer's TLS (OpenSSL + system roots) and reuses openssl-sys, which reqwest
already pulls into the build, so no new system dependency is introduced. Builds
clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants