Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Commit 6f7b9df

Browse files
committed
Reduce transport interface and expose underlying spdy transport
Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
1 parent 1e6ebd2 commit 6f7b9df

7 files changed

Lines changed: 67 additions & 76 deletions

File tree

libchan.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package libchan
22

33
import (
44
"io"
5-
"net"
65
)
76

87
// Transport represents a connection which can multiplex channels and
@@ -16,27 +15,6 @@ type Transport interface {
1615
// WaitReceiveChannel waits for a new channel be created by the
1716
// remote end of the transport calling NewSendChannel.
1817
WaitReceiveChannel() (Receiver, error)
19-
20-
// RegisterConn registers a network connection to be used
21-
// by inbound messages referring to the connection
22-
// with the registered connection's local and remote address.
23-
// Note: a connection does not need to be registered before
24-
// being sent in a message, but does need to be registered
25-
// to by the receiver of a message. If registration should be
26-
// automatic, register a listener instead.
27-
RegisterConn(net.Conn) error
28-
29-
// RegisterListener accepts all connections from the listener
30-
// and immediately registers them.
31-
RegisterListener(net.Listener)
32-
33-
// Unregister removes the connection from the list of known
34-
// connections. This should be called when a connection is
35-
// closed and no longer expected in inbound messages.
36-
// Failure to unregister connections will increase memory
37-
// usage since the transport is not notified of closed
38-
// connections to automatically unregister.
39-
Unregister(net.Conn)
4018
}
4119

4220
// Sender is a channel which sent messages of any content

spdy/conn_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type RecvMultiMessage struct {
2525

2626
func TestMultiTcpByteStream(t *testing.T) {
2727
wait := make(chan bool)
28-
client := func(t *testing.T, sender libchan.Sender, s *session) {
28+
client := func(t *testing.T, sender libchan.Sender, s *SpdyTransport) {
2929
<-wait
3030
both, connErr := net.Dial("tcp", "localhost:9272")
3131
if connErr != nil {
@@ -97,7 +97,7 @@ func TestMultiTcpByteStream(t *testing.T) {
9797
in.Close()
9898
out.Close()
9999
}
100-
server := func(t *testing.T, receiver libchan.Receiver, s *session) {
100+
server := func(t *testing.T, receiver libchan.Receiver, s *SpdyTransport) {
101101
listener, listenerErr := net.Listen("tcp", "localhost:9272")
102102
if listenerErr != nil {
103103
t.Fatalf("Error creating byte stream listener: %s", listenerErr)

spdy/encode.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/dmcgowan/go/codec"
1111
)
1212

13-
func (s *session) encodeChannel(v reflect.Value) ([]byte, error) {
13+
func (s *SpdyTransport) encodeChannel(v reflect.Value) ([]byte, error) {
1414
rc := v.Interface().(channel)
1515
if rc.stream == nil {
1616
return nil, errors.New("bad type")
@@ -36,7 +36,7 @@ func (s *session) encodeChannel(v reflect.Value) ([]byte, error) {
3636
return buf[:(written + 1)], nil
3737
}
3838

39-
func (s *session) decodeChannel(v reflect.Value, b []byte) error {
39+
func (s *SpdyTransport) decodeChannel(v reflect.Value, b []byte) error {
4040
rc := v.Interface().(channel)
4141

4242
if b[0] == 0x01 {
@@ -62,7 +62,7 @@ func (s *session) decodeChannel(v reflect.Value, b []byte) error {
6262
return nil
6363
}
6464

65-
func (s *session) encodeStream(v reflect.Value) ([]byte, error) {
65+
func (s *SpdyTransport) encodeStream(v reflect.Value) ([]byte, error) {
6666
bs := v.Interface().(byteStream)
6767
if bs.ReferenceId == 0 {
6868
return nil, errors.New("bad type")
@@ -73,7 +73,7 @@ func (s *session) encodeStream(v reflect.Value) ([]byte, error) {
7373
return buf[:written], nil
7474
}
7575

76-
func (s *session) decodeStream(v reflect.Value, b []byte) error {
76+
func (s *SpdyTransport) decodeStream(v reflect.Value, b []byte) error {
7777
referenceId, readN := binary.Uvarint(b)
7878
if readN == 0 {
7979
return errors.New("bad reference id")
@@ -87,7 +87,7 @@ func (s *session) decodeStream(v reflect.Value, b []byte) error {
8787
return nil
8888
}
8989

90-
func (s *session) waitConn(network, local, remote string, timeout time.Duration) (net.Conn, error) {
90+
func (s *SpdyTransport) waitConn(network, local, remote string, timeout time.Duration) (net.Conn, error) {
9191
timeoutChan := time.After(timeout)
9292
connChan := make(chan net.Conn)
9393

@@ -173,7 +173,7 @@ func decodeString3(b []byte) (string, string, string, error) {
173173
return s1, s2, s3, nil
174174
}
175175

176-
func (s *session) encodeNetConn(v reflect.Value) ([]byte, error) {
176+
func (s *SpdyTransport) encodeNetConn(v reflect.Value) ([]byte, error) {
177177
var conn net.Conn
178178
switch t := v.Interface().(type) {
179179
case net.TCPConn:
@@ -190,7 +190,7 @@ func (s *session) encodeNetConn(v reflect.Value) ([]byte, error) {
190190
return encodeString3(conn.LocalAddr().Network(), conn.RemoteAddr().String(), conn.LocalAddr().String())
191191
}
192192

193-
func (s *session) decodeNetConn(v reflect.Value, b []byte) error {
193+
func (s *SpdyTransport) decodeNetConn(v reflect.Value, b []byte) error {
194194
network, local, remote, err := decodeString3(b)
195195
if err != nil {
196196
return err
@@ -205,7 +205,7 @@ func (s *session) decodeNetConn(v reflect.Value, b []byte) error {
205205
return nil
206206
}
207207

208-
func (s *session) initializeHandler() *codec.MsgpackHandle {
208+
func (s *SpdyTransport) initializeHandler() *codec.MsgpackHandle {
209209
mh := &codec.MsgpackHandle{WriteExt: true}
210210
mh.RawToString = true
211211
err := mh.AddExt(reflect.TypeOf(channel{}), 0x01, s.encodeChannel, s.decodeChannel)

spdy/listener.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,36 @@ package spdy
22

33
import (
44
"net"
5-
6-
"github.com/docker/libchan"
75
)
86

9-
// SessionListener is a listener which accepts new
10-
// connections angi rd spawns spdy sessions.
11-
type SessionListener struct {
7+
// TransportListener is a listener which accepts new
8+
// connections angi rd spawns spdy transports.
9+
type TransportListener struct {
1210
listener net.Listener
1311
auth Authenticator
1412
}
1513

16-
// NewSessionListener creates a new listen session using
14+
// NewTransportListener creates a new listen transport using
1715
// a network listeners and function to authenticate
18-
// new connections. SessionListener expects tls session
16+
// new connections. TransportListener expects tls session
1917
// handling to occur by the authenticator or the listener,
20-
// SessionListener will not perform tls handshakes.
21-
func NewSessionListener(listener net.Listener, auth Authenticator) (*SessionListener, error) {
22-
return &SessionListener{
18+
// TransportListener will not perform tls handshakes.
19+
func NewTransportListener(listener net.Listener, auth Authenticator) (*TransportListener, error) {
20+
return &TransportListener{
2321
listener: listener,
2422
auth: auth,
2523
}, nil
2624
}
2725

2826
// Close closes the underlying listener
29-
func (l *SessionListener) Close() error {
27+
func (l *TransportListener) Close() error {
3028
return l.listener.Close()
3129
}
3230

33-
// AcceptSessions waits for a new network connections
31+
// AcceptTransport waits for a new network connections
3432
// and creates a new stream. Connections which fail
3533
// authentication will not be returned.
36-
func (l *SessionListener) AcceptSession() (libchan.Transport, error) {
34+
func (l *TransportListener) AcceptTransport() (*SpdyTransport, error) {
3735
for {
3836
conn, err := l.listener.Accept()
3937
if err != nil {

spdy/pipe.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88
)
99

1010
type pipeSender struct {
11-
session *session
11+
session *SpdyTransport
1212
sender libchan.Sender
1313
}
1414

1515
type pipeReceiver struct {
16-
session *session
16+
session *SpdyTransport
1717
receiver libchan.Receiver
1818
}
1919

spdy/session.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ var (
2424
ErrWrongDirection = errors.New("Wrong channel direction")
2525
)
2626

27-
// session is a transport session on top of a network
27+
// SpdyTransport is a transport session on top of a network
2828
// connection using spdy.
29-
type session struct {
29+
type SpdyTransport struct {
3030
conn *spdystream.Connection
3131
handler codec.Handle
3232

@@ -44,32 +44,32 @@ type session struct {
4444

4545
type channel struct {
4646
stream *spdystream.Stream
47-
session *session
47+
session *SpdyTransport
4848
direction direction
4949
}
5050

51-
// NewClientSession creates a new stream session from the
51+
// NewClientTransport creates a new stream transport from the
5252
// provided network connection. The network connection is
5353
// expected to already provide a tls session.
54-
func NewClientSession(conn net.Conn) (libchan.Transport, error) {
54+
func NewClientTransport(conn net.Conn) (*SpdyTransport, error) {
5555
return newSession(conn, false)
5656
}
5757

58-
// NewServerSession creates a new stream session from the
58+
// NewServerTransport creates a new stream transport from the
5959
// provided network connection. The network connection is
6060
// expected to already have completed the tls handshake.
61-
func NewServerSession(conn net.Conn) (libchan.Transport, error) {
61+
func NewServerTransport(conn net.Conn) (*SpdyTransport, error) {
6262
return newSession(conn, true)
6363
}
6464

65-
func newSession(conn net.Conn, server bool) (*session, error) {
65+
func newSession(conn net.Conn, server bool) (*SpdyTransport, error) {
6666
var referenceCounter uint64
6767
if server {
6868
referenceCounter = 2
6969
} else {
7070
referenceCounter = 1
7171
}
72-
session := &session{
72+
session := &SpdyTransport{
7373
streamChan: make(chan *spdystream.Stream),
7474
referenceCounter: referenceCounter,
7575
byteStreamC: sync.NewCond(new(sync.Mutex)),
@@ -91,7 +91,7 @@ func newSession(conn net.Conn, server bool) (*session, error) {
9191
return session, nil
9292
}
9393

94-
func (s *session) newStreamHandler(stream *spdystream.Stream) {
94+
func (s *SpdyTransport) newStreamHandler(stream *spdystream.Stream) {
9595
referenceIdString := stream.Headers().Get("libchan-ref")
9696

9797
returnHeaders := http.Header{}
@@ -120,7 +120,7 @@ func (s *session) newStreamHandler(stream *spdystream.Stream) {
120120
stream.SendReply(returnHeaders, finish)
121121
}
122122

123-
func (s *session) getByteStream(referenceId uint64) *byteStream {
123+
func (s *SpdyTransport) getByteStream(referenceId uint64) *byteStream {
124124
s.byteStreamC.L.Lock()
125125
bs, ok := s.byteStreams[referenceId]
126126
if !ok {
@@ -131,7 +131,7 @@ func (s *session) getByteStream(referenceId uint64) *byteStream {
131131
return bs
132132
}
133133

134-
func (s *session) dial(referenceId uint64) (*byteStream, error) {
134+
func (s *SpdyTransport) dial(referenceId uint64) (*byteStream, error) {
135135
headers := http.Header{}
136136
headers.Set("libchan-ref", strconv.FormatUint(referenceId, 10))
137137
stream, streamErr := s.conn.CreateStream(headers, nil, false)
@@ -145,7 +145,7 @@ func (s *session) dial(referenceId uint64) (*byteStream, error) {
145145
return bs, nil
146146
}
147147

148-
func (s *session) createByteStream() (io.ReadWriteCloser, error) {
148+
func (s *SpdyTransport) createByteStream() (io.ReadWriteCloser, error) {
149149
s.referenceLock.Lock()
150150
referenceId := s.referenceCounter
151151
s.referenceCounter = referenceId + 2
@@ -172,7 +172,14 @@ func addrKey(local, remote string) string {
172172
return string(b)
173173
}
174174

175-
func (s *session) RegisterConn(conn net.Conn) error {
175+
// RegisterConn registers a network connection to be used
176+
// by inbound messages referring to the connection
177+
// with the registered connection's local and remote address.
178+
// Note: a connection does not need to be registered before
179+
// being sent in a message, but does need to be registered
180+
// to by the receiver of a message. If registration should be
181+
// automatic, register a listener instead.
182+
func (s *SpdyTransport) RegisterConn(conn net.Conn) error {
176183
s.netConnC.L.Lock()
177184
defer s.netConnC.L.Unlock()
178185
networkType, ok := s.networks[conn.LocalAddr().Network()]
@@ -186,7 +193,9 @@ func (s *session) RegisterConn(conn net.Conn) error {
186193
return nil
187194
}
188195

189-
func (s *session) RegisterListener(listener net.Listener) {
196+
// RegisterListener accepts all connections from the listener
197+
// and immediately registers them.
198+
func (s *SpdyTransport) RegisterListener(listener net.Listener) {
190199
go func() {
191200
for {
192201
conn, err := listener.Accept()
@@ -203,18 +212,24 @@ func (s *session) RegisterListener(listener net.Listener) {
203212
}()
204213
}
205214

206-
func (s *session) Unregister(net.Conn) {
215+
// Unregister removes the connection from the list of known
216+
// connections. This should be called when a connection is
217+
// closed and no longer expected in inbound messages.
218+
// Failure to unregister connections will increase memory
219+
// usage since the transport is not notified of closed
220+
// connections to automatically unregister.
221+
func (s *SpdyTransport) Unregister(net.Conn) {
207222

208223
}
209224

210-
func (s *session) Close() error {
225+
func (s *SpdyTransport) Close() error {
211226
return s.conn.Close()
212227
}
213228

214229
// NewSendChannel creates and returns a new send channel. The receive
215230
// end will get picked up on the remote end through the remote calling
216231
// WaitReceiveChannel.
217-
func (s *session) NewSendChannel() (libchan.Sender, error) {
232+
func (s *SpdyTransport) NewSendChannel() (libchan.Sender, error) {
218233
stream, streamErr := s.conn.CreateStream(http.Header{}, nil, false)
219234
if streamErr != nil {
220235
return nil, streamErr
@@ -224,7 +239,7 @@ func (s *session) NewSendChannel() (libchan.Sender, error) {
224239

225240
// WaitReceiveChannel waits for a new channel be created by a remote
226241
// call to NewSendChannel.
227-
func (s *session) WaitReceiveChannel() (libchan.Receiver, error) {
242+
func (s *SpdyTransport) WaitReceiveChannel() (libchan.Receiver, error) {
228243
stream, ok := <-s.streamChan
229244
if !ok {
230245
return nil, io.EOF

0 commit comments

Comments
 (0)