Skip to content

Add optional in-band exception stream transformer to Node ResultSet#803

Open
Copilot wants to merge 1 commit into
mainfrom
copilot/adopt-ch-exception-parser
Open

Add optional in-band exception stream transformer to Node ResultSet#803
Copilot wants to merge 1 commit into
mainfrom
copilot/adopt-ch-exception-parser

Conversation

Copilot AI commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

Summary

Adopts a stream transformer that parses ClickHouse's in-band HTTP exception block (the format-agnostic __exception__ trailer appended after a 200 status, 25.11+). It is wired into the Node ResultSet as an opt-in transformer over the internal stream, off by default. The compression handling from the original draft was dropped since the client already decodes Content-Encoding upstream.

  • New transformer (packages/client-node/src/utils/exception_stream.ts): ClickHouseExceptionStream passes result bytes through while retaining a 16 KiB sliding tail, then parses the block backwards at end-of-stream. On detection it errors with ClickHouseException (or, with throwOnException: false, ends cleanly and exposes it via getException() / clickhouse-exception event). Also exports ClickHouseStreamError and DEFAULT_MAX_BLOCK_SIZE.
  • Compression removed: dropped zlib, decompressorFor, clickHouseResponseParser, and the compose/header-bag plumbing; the transformer now expects an already-decompressed body.
  • ResultSet wiring (result_set.ts): new exceptionStream?: boolean option. When enabled and the X-ClickHouse-Exception-Tag header is present, the transformer is inserted between the consumed stream and the row parser; otherwise the pipeline is unchanged.
const rs = ResultSet.instance({
  stream, format, query_id, log_error, response_headers,
  exceptionStream: true, // opt-in; no-op without the exception-tag header
})

This is an initial adoption; tighter integration (public exports, reconciliation with the existing extractErrorAtTheEndOfChunk path, guaranteed delivery of pre-exception rows) is left for a follow-up.

Checklist

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG
  • For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

@CLAassistant

Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@codecov

codecov Bot commented Jun 3, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 75.70093% with 26 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
packages/client-node/src/utils/exception_stream.ts 74.00% 10 Missing and 16 partials ⚠️

📢 Thoughts on this report? Let us know!

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in Node-side Transform stream to detect ClickHouse’s in-band HTTP exception trailer (__exception__ block appended after a 200 response in 25.11+), and wires it into ResultSet.stream() behind a new exceptionStream?: boolean option (only activated when the x-clickhouse-exception-tag header is present).

Changes:

  • Introduces ClickHouseExceptionStream (plus related error types) that forwards bytes while retaining a bounded tail and parsing the exception trailer at end-of-stream.
  • Adds exceptionStream?: boolean to Node ResultSet and conditionally inserts the transformer into the internal streaming pipeline.
  • Adds unit tests covering pass-through, detection across chunk boundaries, truncated blocks, and ResultSet integration behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
packages/client-node/src/utils/index.ts Re-exports the new exception-stream utility from the utils barrel.
packages/client-node/src/utils/exception_stream.ts Implements the ClickHouseExceptionStream Transform and exception parsing logic.
packages/client-node/src/result_set.ts Adds an exceptionStream option and conditionally wires the transformer into ResultSet.stream().
packages/client-node/tests/unit/node_exception_stream.test.ts Adds unit/integration-style tests for the transformer and ResultSet wiring.

Comment on lines +101 to +115
constructor(opts: ClickHouseExceptionStreamOptions) {
super()
if (!opts.tag) {
throw new TypeError(
'tag is required (value of the X-ClickHouse-Exception-Tag header)',
)
}
this.tag = Buffer.from(opts.tag, 'ascii')
this.maxBlock = Math.max(
opts.maxBlockSize ?? DEFAULT_MAX_BLOCK_SIZE,
DEFAULT_MAX_BLOCK_SIZE,
)
this.throwOnException = opts.throwOnException ?? true
this.openPattern = Buffer.concat([CRLF, MARKER, CRLF, this.tag])
}
Comment on lines +122 to +137
override _transform(
chunk: Buffer,
_enc: BufferEncoding,
cb: TransformCallback,
): void {
this.buf = this.buf.length === 0 ? chunk : Buffer.concat([this.buf, chunk])
// Anything older than the last `maxBlock` bytes cannot be part of a
// <= maxBlock block sitting at the very end, so release it as confirmed data.
if (this.buf.length > this.maxBlock) {
const releaseLen = this.buf.length - this.maxBlock
const release = this.buf.subarray(0, releaseLen)
this.buf = this.buf.subarray(releaseLen)
this.push(release)
}
cb()
}
Comment on lines +216 to +237
const source = this.consume()
const pipelineCb = function pipelineCb(err: NodeJS.ErrnoException | null) {
if (
err &&
err.name !== 'AbortError' &&
err.message !== resultSetClosedMessage
) {
logError(err)
}
}

// Optionally plug the in-band exception transformer into the internal stream.
// Only used when explicitly enabled and the server advertised an exception tag.
const pipeline =
this.exceptionStream && this.exceptionTag !== undefined
? Stream.pipeline(
source,
new ClickHouseExceptionStream({ tag: this.exceptionTag }),
toRows,
pipelineCb,
)
: Stream.pipeline(source, toRows, pipelineCb)
Comment on lines 24 to +25
import { getAsText } from './utils'
import { ClickHouseExceptionStream } from './utils/exception_stream'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants