@@ -158,6 +158,7 @@ const {
158158} = require ( 'internal/fs/promises' ) ;
159159
160160const {
161+ validateAbortSignal,
161162 validateBoolean,
162163 validateFunction,
163164 validateInteger,
@@ -859,7 +860,11 @@ function parseHeaderPairs(pairs) {
859860 const block = { __proto__ : null } ;
860861 for ( let n = 0 ; n + 1 < pairs . length ; n += 2 ) {
861862 if ( block [ pairs [ n ] ] !== undefined ) {
862- block [ pairs [ n ] ] = [ block [ pairs [ n ] ] , pairs [ n + 1 ] ] ;
863+ if ( ArrayIsArray ( block [ pairs [ n ] ] ) ) {
864+ ArrayPrototypePush ( block [ pairs [ n ] ] , pairs [ n + 1 ] ) ;
865+ } else {
866+ block [ pairs [ n ] ] = [ block [ pairs [ n ] ] , pairs [ n + 1 ] ] ;
867+ }
863868 } else {
864869 block [ pairs [ n ] ] = pairs [ n + 1 ] ;
865870 }
@@ -1045,9 +1050,9 @@ async function consumeSyncSource(handle, stream, source) {
10451050 if ( await writeBatchWithDrain ( handle , stream , batch ) ) return ;
10461051 }
10471052 handle . endWrite ( ) ;
1048- } catch {
1053+ } catch ( err ) {
10491054 if ( ! stream . destroyed ) {
1050- handle . resetStream ( 0n ) ;
1055+ stream . destroy ( err ) ;
10511056 }
10521057 }
10531058}
@@ -1666,29 +1671,69 @@ class QuicStream {
16661671 }
16671672
16681673 function endSync ( ) {
1674+ // Per the streams/iter spec, endSync and end follow a try-fallback
1675+ // pattern. That is, callers should try endSync first and if it returns
1676+ // -1, then they should call and await end(). This is a signal that sync
1677+ // end is not currently possible. However, we always support sync end
1678+ // here unless the stream is already errored.
16691679 if ( errored ) return - 1 ;
1680+
1681+ // If we're already closed, just return the total bytes written.
16701682 if ( closed ) return totalBytesWritten ;
1683+
1684+ // If we are waiting for drain to complete, we cannot end synchronously.
1685+ if ( drainWakeup != null ) return - 1 ;
1686+
1687+ // Fantastic, we can end synchronously!
16711688 handle . endWrite ( ) ;
16721689 closed = true ;
16731690 return totalBytesWritten ;
16741691 }
16751692
1676- async function end ( options ) {
1693+ async function end ( options = kEmptyObject ) {
1694+ validateObject ( options , 'options' ) ;
1695+ const { signal } = options ;
1696+ if ( signal !== undefined ) {
1697+ validateAbortSignal ( signal , 'options.signal' ) ;
1698+ signal . throwIfAborted ( ) ;
1699+ // TODO(@jasnell): The stream/iter spec allows individual sync end
1700+ // calls to be canceled via an AbortSignal. We currently do not support
1701+ // this, but we can add before the impl is graduated from experimental.
1702+ // At most we do here is check for signal abort at the start of the call.
1703+ }
1704+
1705+ // Per the streams/iter spec, endSync and end follow a try-fallback
1706+ // pattern. That is, callers should try endSync first and if it returns
1707+ // -1, then they should call and await end(). This is a signal that sync
1708+ // end is not currently possible. However, we always support sync end
1709+ // here unless the stream is already errored.
1710+ // While the user should have already called endSync, we call it again
1711+ // here to actually process the end request. At worst it's called twice.
16771712 const n = endSync ( ) ;
1713+
1714+ // A return value of -1 indicates that endSync was not yet able to
1715+ // process the end request, either because we are errored or because we
1716+ // are awaiting drain. If we're errored, throw the error. If we're waiting
1717+ // for drain, await it and then try ending again.
1718+
16781719 if ( n >= 0 ) return n ;
16791720 if ( errored ) throw error ;
1680- drainWakeup = PromiseWithResolvers ( ) ;
1681- await drainWakeup . promise ;
1682- drainWakeup = null ;
1721+
1722+ drainWakeup ??= PromiseWithResolvers ( ) ;
1723+ try {
1724+ await drainWakeup . promise ;
1725+ } finally {
1726+ drainWakeup = null ;
1727+ }
16831728 return endSync ( ) ;
16841729 }
16851730
16861731 function fail ( reason ) {
16871732 if ( closed || errored ) return ;
16881733 errored = true ;
1689- error = reason ;
1734+ error = reason ?? new ERR_INVALID_STATE ( 'Failed' ) ;
16901735 handle . resetStream ( 0n ) ;
1691- if ( drainWakeup ) {
1736+ if ( drainWakeup != null ) {
16921737 drainWakeup . reject ( reason ) ;
16931738 drainWakeup = null ;
16941739 }
@@ -1710,7 +1755,7 @@ class QuicStream {
17101755 [ drainableProtocol ] ( ) {
17111756 if ( closed || errored ) return null ;
17121757 if ( stream . #state. writeDesiredSize > 0 ) return null ;
1713- drainWakeup = PromiseWithResolvers ( ) ;
1758+ drainWakeup ?? = PromiseWithResolvers ( ) ;
17141759 return drainWakeup . promise ;
17151760 } ,
17161761 [ SymbolAsyncDispose ] ( ) {
@@ -1912,6 +1957,9 @@ class QuicStream {
19121957 this . #stats[ kFinishClose ] ( ) ;
19131958 this . #state[ kFinishClose ] ( ) ;
19141959 this . #session[ kRemoveStream ] ( this ) ;
1960+ if ( this . #writer !== undefined ) {
1961+ this . #writer. fail ( error ) ;
1962+ }
19151963 this . #session = undefined ;
19161964 this . #pendingClose. reject = undefined ;
19171965 this . #pendingClose. resolve = undefined ;
0 commit comments