Add optional in-band exception stream transformer to Node ResultSet#803
Add optional in-band exception stream transformer to Node ResultSet#803Copilot wants to merge 1 commit into
Conversation
|
|
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
Adds an opt-in Node-side Transform stream to detect ClickHouse’s in-band HTTP exception trailer (__exception__ block appended after a 200 response in 25.11+), and wires it into ResultSet.stream() behind a new exceptionStream?: boolean option (only activated when the x-clickhouse-exception-tag header is present).
Changes:
- Introduces
ClickHouseExceptionStream(plus related error types) that forwards bytes while retaining a bounded tail and parsing the exception trailer at end-of-stream. - Adds
exceptionStream?: booleanto NodeResultSetand conditionally inserts the transformer into the internal streaming pipeline. - Adds unit tests covering pass-through, detection across chunk boundaries, truncated blocks, and ResultSet integration behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| packages/client-node/src/utils/index.ts | Re-exports the new exception-stream utility from the utils barrel. |
| packages/client-node/src/utils/exception_stream.ts | Implements the ClickHouseExceptionStream Transform and exception parsing logic. |
| packages/client-node/src/result_set.ts | Adds an exceptionStream option and conditionally wires the transformer into ResultSet.stream(). |
| packages/client-node/tests/unit/node_exception_stream.test.ts | Adds unit/integration-style tests for the transformer and ResultSet wiring. |
| constructor(opts: ClickHouseExceptionStreamOptions) { | ||
| super() | ||
| if (!opts.tag) { | ||
| throw new TypeError( | ||
| 'tag is required (value of the X-ClickHouse-Exception-Tag header)', | ||
| ) | ||
| } | ||
| this.tag = Buffer.from(opts.tag, 'ascii') | ||
| this.maxBlock = Math.max( | ||
| opts.maxBlockSize ?? DEFAULT_MAX_BLOCK_SIZE, | ||
| DEFAULT_MAX_BLOCK_SIZE, | ||
| ) | ||
| this.throwOnException = opts.throwOnException ?? true | ||
| this.openPattern = Buffer.concat([CRLF, MARKER, CRLF, this.tag]) | ||
| } |
| override _transform( | ||
| chunk: Buffer, | ||
| _enc: BufferEncoding, | ||
| cb: TransformCallback, | ||
| ): void { | ||
| this.buf = this.buf.length === 0 ? chunk : Buffer.concat([this.buf, chunk]) | ||
| // Anything older than the last `maxBlock` bytes cannot be part of a | ||
| // <= maxBlock block sitting at the very end, so release it as confirmed data. | ||
| if (this.buf.length > this.maxBlock) { | ||
| const releaseLen = this.buf.length - this.maxBlock | ||
| const release = this.buf.subarray(0, releaseLen) | ||
| this.buf = this.buf.subarray(releaseLen) | ||
| this.push(release) | ||
| } | ||
| cb() | ||
| } |
| const source = this.consume() | ||
| const pipelineCb = function pipelineCb(err: NodeJS.ErrnoException | null) { | ||
| if ( | ||
| err && | ||
| err.name !== 'AbortError' && | ||
| err.message !== resultSetClosedMessage | ||
| ) { | ||
| logError(err) | ||
| } | ||
| } | ||
|
|
||
| // Optionally plug the in-band exception transformer into the internal stream. | ||
| // Only used when explicitly enabled and the server advertised an exception tag. | ||
| const pipeline = | ||
| this.exceptionStream && this.exceptionTag !== undefined | ||
| ? Stream.pipeline( | ||
| source, | ||
| new ClickHouseExceptionStream({ tag: this.exceptionTag }), | ||
| toRows, | ||
| pipelineCb, | ||
| ) | ||
| : Stream.pipeline(source, toRows, pipelineCb) |
| import { getAsText } from './utils' | ||
| import { ClickHouseExceptionStream } from './utils/exception_stream' |
Summary
Adopts a stream transformer that parses ClickHouse's in-band HTTP exception block (the format-agnostic
__exception__trailer appended after a 200 status, 25.11+). It is wired into the NodeResultSetas an opt-in transformer over the internal stream, off by default. The compression handling from the original draft was dropped since the client already decodesContent-Encodingupstream.packages/client-node/src/utils/exception_stream.ts):ClickHouseExceptionStreampasses result bytes through while retaining a 16 KiB sliding tail, then parses the block backwards at end-of-stream. On detection it errors withClickHouseException(or, withthrowOnException: false, ends cleanly and exposes it viagetException()/clickhouse-exceptionevent). Also exportsClickHouseStreamErrorandDEFAULT_MAX_BLOCK_SIZE.zlib,decompressorFor,clickHouseResponseParser, and thecompose/header-bag plumbing; the transformer now expects an already-decompressed body.result_set.ts): newexceptionStream?: booleanoption. When enabled and theX-ClickHouse-Exception-Tagheader is present, the transformer is inserted between the consumed stream and the row parser; otherwise the pipeline is unchanged.This is an initial adoption; tighter integration (public exports, reconciliation with the existing
extractErrorAtTheEndOfChunkpath, guaranteed delivery of pre-exception rows) is left for a follow-up.Checklist