Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Commit 224dfb0

Browse files
committed
Update spdy implementation to use provider interface
Define and use the simple provider interface instead of relying on spdystream directly. Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
1 parent 87eb956 commit 224dfb0

7 files changed

Lines changed: 242 additions & 192 deletions

File tree

spdy/encode.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (c *channel) encodeExtension(iv reflect.Value) (int, []byte, error) {
114114
return 0, nil, errors.New("bad type")
115115
}
116116
if v.session != c.session {
117-
streamCopy, err := c.session.createByteStream()
117+
streamCopy, err := c.createByteStream()
118118
if err != nil {
119119
return 0, nil, err
120120
}
@@ -133,7 +133,7 @@ func (c *channel) encodeExtension(iv reflect.Value) (int, []byte, error) {
133133
return 2, b, err
134134
case io.Reader:
135135
// Either ReadWriteCloser, ReadWriter, or ReadCloser
136-
streamCopy, err := c.session.createByteStream()
136+
streamCopy, err := c.createByteStream()
137137
if err != nil {
138138
return 0, nil, err
139139
}
@@ -154,7 +154,7 @@ func (c *channel) encodeExtension(iv reflect.Value) (int, []byte, error) {
154154
b, err := streamCopy.(*byteStream).streamBytes()
155155
return 2, b, err
156156
case io.Writer:
157-
streamCopy, err := c.session.createByteStream()
157+
streamCopy, err := c.createByteStream()
158158
if err != nil {
159159
return 0, nil, err
160160
}
@@ -208,13 +208,13 @@ func (c *channel) encodeExtension(iv reflect.Value) (int, []byte, error) {
208208
return 0, nil, nil
209209
}
210210

211-
func (s *Transport) decodeStream(v reflect.Value, b []byte) error {
211+
func (c *channel) decodeStream(v reflect.Value, b []byte) error {
212212
referenceID, err := decodeReferenceID(b)
213213
if err != nil {
214214
return err
215215
}
216216

217-
bs := s.getByteStream(referenceID)
217+
bs := c.session.getByteStream(referenceID)
218218
if bs != nil {
219219
v.Set(reflect.ValueOf(bs))
220220
}
@@ -226,6 +226,6 @@ func (c *channel) initializeExtensions() *msgpack.Extensions {
226226
exts := msgpack.NewExtensions()
227227
exts.SetEncoder(c.encodeExtension)
228228
exts.AddDecoder(1, reflect.TypeOf(&channel{}), c.decodeChannel)
229-
exts.AddDecoder(2, reflect.TypeOf(&byteStream{}), c.session.decodeStream)
229+
exts.AddDecoder(2, reflect.TypeOf(&byteStream{}), c.decodeStream)
230230
return exts
231231
}

spdy/listener.go

Lines changed: 0 additions & 49 deletions
This file was deleted.

spdy/pipe.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,48 @@
11
package spdy
22

33
import (
4+
"io"
45
"net"
56

67
"github.com/docker/libchan"
78
)
89

910
type pipeSender struct {
10-
session *Transport
11+
session libchan.Transport
1112
sender *channel
1213
}
1314

1415
type pipeReceiver struct {
15-
session *Transport
16+
session libchan.Transport
1617
receiver *channel
1718
}
1819

1920
// Pipe creates a top-level channel pipe using an in memory transport.
2021
func Pipe() (libchan.Receiver, libchan.Sender, error) {
2122
c1, c2 := net.Pipe()
2223

23-
s1, err := newSession(c1, false)
24+
s1, err := NewSpdyStreamProvider(c1, false)
2425
if err != nil {
2526
return nil, nil, err
2627
}
28+
t1 := NewTransport(s1)
2729

28-
s2, err := newSession(c2, true)
30+
s2, err := NewSpdyStreamProvider(c2, true)
2931
if err != nil {
3032
return nil, nil, err
3133
}
34+
t2 := NewTransport(s2)
3235

3336
var receiver libchan.Receiver
3437
waitError := make(chan error)
3538

3639
go func() {
3740
var err error
38-
receiver, err = s2.WaitReceiveChannel()
41+
receiver, err = t2.WaitReceiveChannel()
3942
waitError <- err
4043
}()
4144

42-
sender, senderErr := s1.NewSendChannel()
45+
sender, senderErr := t1.NewSendChannel()
4346
if senderErr != nil {
4447
c1.Close()
4548
c2.Close()
@@ -52,7 +55,7 @@ func Pipe() (libchan.Receiver, libchan.Sender, error) {
5255
c2.Close()
5356
return nil, nil, receiveErr
5457
}
55-
return &pipeReceiver{s2, receiver.(*channel)}, &pipeSender{s1, sender.(*channel)}, nil
58+
return &pipeReceiver{t2, receiver.(*channel)}, &pipeSender{t1, sender.(*channel)}, nil
5659
}
5760

5861
func (p *pipeSender) Send(message interface{}) error {
@@ -64,7 +67,10 @@ func (p *pipeSender) Close() error {
6467
if err != nil {
6568
return err
6669
}
67-
return p.session.Close()
70+
if closer, ok := p.session.(io.Closer); ok {
71+
return closer.Close()
72+
}
73+
return nil
6874
}
6975

7076
func (p *pipeReceiver) Receive(message interface{}) error {

0 commit comments

Comments
 (0)