11package spdy
22
33import (
4- "bytes "
4+ "bufio "
55 "errors"
66 "io"
77 "net"
@@ -48,6 +48,11 @@ type channel struct {
4848 stream * spdystream.Stream
4949 session * Transport
5050 direction direction
51+ encodeLock sync.Mutex
52+ encoder * msgpack.Encoder
53+ decodeLock sync.Mutex
54+ decoder * msgpack.Decoder
55+ buffer * bufio.Writer
5156}
5257
5358// NewClientTransport creates a new stream transport from the
@@ -126,6 +131,7 @@ func (s *Transport) newStreamHandler(stream *spdystream.Stream) {
126131 stream : stream ,
127132 session : s ,
128133 }
134+
129135 s .channelC .L .Lock ()
130136 s .channels [referenceID ] = c
131137 s .channelC .Broadcast ()
@@ -306,21 +312,19 @@ func (c *channel) Send(message interface{}) error {
306312 return ErrWrongDirection
307313 }
308314
309- buf := bytes . NewBuffer ( nil )
310- encoder := msgpack . NewEncoder ( buf )
311- encoder . AddExtensions ( c . initializeExtensions ())
312- encodeErr := encoder . Encode ( message )
313- if encodeErr != nil {
314- return encodeErr
315+ c . encodeLock . Lock ( )
316+ defer c . encodeLock . Unlock ( )
317+ if c . encoder == nil {
318+ c . buffer = bufio . NewWriter ( c . stream )
319+ c . encoder = msgpack . NewEncoder ( c . buffer )
320+ c . encoder . AddExtensions ( c . initializeExtensions ())
315321 }
316322
317- // TODO check length of buf
318- _ , writeErr := c .stream .Write (buf .Bytes ())
319- if writeErr != nil {
320- return writeErr
323+ if err := c .encoder .Encode (message ); err != nil {
324+ return err
321325 }
322326
323- return nil
327+ return c . buffer . Flush ()
324328}
325329
326330// Receive receives a message sent across the channel from
@@ -329,21 +333,20 @@ func (c *channel) Receive(message interface{}) error {
329333 if c .direction == outbound {
330334 return ErrWrongDirection
331335 }
332- buf , readErr := c . stream . ReadData ()
333- if readErr != nil {
334- if readErr == io . EOF {
335- c . stream . Close ()
336- }
337- return readErr
336+
337+ c . decodeLock . Lock ()
338+ defer c . decodeLock . Unlock ()
339+ if c . decoder == nil {
340+ c . decoder = msgpack . NewDecoder ( c . stream )
341+ c . decoder . AddExtensions ( c . initializeExtensions ())
338342 }
339343
340- decoder := msgpack .NewDecoder (bytes .NewReader (buf ))
341- decoder .AddExtensions (c .initializeExtensions ())
342- decodeErr := decoder .Decode (message )
343- if decodeErr != nil {
344- return decodeErr
344+ decodeErr := c .decoder .Decode (message )
345+ if decodeErr == io .EOF {
346+ c .stream .Close ()
347+ c .decoder = nil
345348 }
346- return nil
349+ return decodeErr
347350}
348351
349352func (c * channel ) SendTo (dst libchan.Sender ) (int , error ) {
0 commit comments