Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Commit 5516ed4

Browse files
committed
Update listener to use stream session
Listener is currently broken when multiple connections are created, using sessions allows proper separation of each connection. Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
1 parent 5d94bed commit 5516ed4

4 files changed

Lines changed: 68 additions & 89 deletions

File tree

http2/listener.go

Lines changed: 18 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,14 @@
11
package http2
22

33
import (
4-
"github.com/docker/libchan"
5-
"github.com/docker/spdystream"
64
"net"
7-
"net/http"
8-
"sync"
95
)
106

117
// ListenSession is a session manager which accepts new
128
// connections and spawns spdy connection managers.
139
type ListenSession struct {
14-
listener net.Listener
15-
streamChan chan *spdystream.Stream
16-
streamLock sync.RWMutex
17-
subStreamChans map[string]chan *spdystream.Stream
18-
auth Authenticator
10+
listener net.Listener
11+
auth Authenticator
1912
}
2013

2114
// NewListenSession creates a new listen session using
@@ -25,78 +18,32 @@ type ListenSession struct {
2518
// ListenSession will not perform tls handshakes.
2619
func NewListenSession(listener net.Listener, auth Authenticator) (*ListenSession, error) {
2720
return &ListenSession{
28-
listener: listener,
29-
streamChan: make(chan *spdystream.Stream),
30-
subStreamChans: make(map[string]chan *spdystream.Stream),
31-
auth: auth,
21+
listener: listener,
22+
auth: auth,
3223
}, nil
3324
}
3425

35-
func (l *ListenSession) streamHandler(stream *spdystream.Stream) {
36-
// TODO authorize stream
37-
stream.SendReply(http.Header{}, false)
38-
streamChan := l.getStreamChan(stream.Parent())
39-
streamChan <- stream
40-
}
41-
42-
func (l *ListenSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
43-
l.streamLock.Lock()
44-
l.subStreamChans[stream.String()] = streamChan
45-
l.streamLock.Unlock()
46-
}
47-
48-
func (l *ListenSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
49-
if stream == nil {
50-
return l.streamChan
51-
}
52-
l.streamLock.RLock()
53-
defer l.streamLock.RUnlock()
54-
streamChan, ok := l.subStreamChans[stream.String()]
55-
if ok {
56-
return streamChan
57-
}
58-
return l.streamChan
26+
// Close closes the underlying listener
27+
func (l *ListenSession) Close() error {
28+
return l.listener.Close()
5929
}
6030

61-
// Serve runs the listen accept loop, spawning connection manager
62-
// for each accepted connection. This function will block until
63-
// the listener is closed or an error occurs on accept.
64-
func (l *ListenSession) Serve() {
31+
// AcceptSessions waits for a new network connections
32+
// and creates a new stream. Connections which fail
33+
// authentication will not be returned.
34+
func (l *ListenSession) AcceptSession() (*StreamSession, error) {
6535
for {
6636
conn, err := l.listener.Accept()
6737
if err != nil {
38+
return nil, err
39+
}
40+
authErr := l.auth(conn)
41+
if authErr != nil {
6842
// TODO log
69-
break
43+
conn.Close()
44+
continue
7045
}
7146

72-
go func() {
73-
authErr := l.auth(conn)
74-
if authErr != nil {
75-
// TODO log
76-
conn.Close()
77-
return
78-
}
79-
80-
spdyConn, spdyErr := spdystream.NewConnection(conn, true)
81-
if spdyErr != nil {
82-
// TODO log
83-
conn.Close()
84-
return
85-
}
86-
87-
go spdyConn.Serve(l.streamHandler)
88-
}()
47+
return newStreamSession(conn, true)
8948
}
9049
}
91-
92-
// Close closes the underlying listener
93-
func (l *ListenSession) Close() error {
94-
return l.listener.Close()
95-
}
96-
97-
// AcceptReceiver waits for a stream to be created in
98-
// the session and returns a receiver for that stream.
99-
func (l *ListenSession) AcceptReceiver() (libchan.Receiver, error) {
100-
stream := <-l.streamChan
101-
return &StreamReceiver{stream: stream, streamChans: l, ret: &StreamSender{stream: stream, streamChans: l}}, nil
102-
}

http2/listener_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ func TestListenSession(t *testing.T) {
1414
t.Fatalf("Error creating listener: %s", listenErr)
1515
}
1616

17-
session, sessionErr := NewListenSession(listener, NoAuthenticator)
18-
if sessionErr != nil {
19-
t.Fatalf("Error creating session: %s", sessionErr)
17+
listenSession, listenSessionErr := NewListenSession(listener, NoAuthenticator)
18+
if listenSessionErr != nil {
19+
t.Fatalf("Error creating session: %s", listenSessionErr)
2020
}
2121

22-
go session.Serve()
23-
2422
end := make(chan bool)
2523
go exerciseServer(t, listen, end)
2624

27-
receiver, receiverErr := session.AcceptReceiver()
25+
session, sessionErr := listenSession.AcceptSession()
26+
if sessionErr != nil {
27+
t.Fatalf("Error accepting session: %s", sessionErr)
28+
}
29+
receiver, receiverErr := session.ReceiverWait()
2830
if receiverErr != nil {
2931
t.Fatalf("Error accepting receiver: %s", receiverErr)
3032
}

http2/stream.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,31 @@ import (
1616
type StreamSession struct {
1717
conn *spdystream.Connection
1818

19-
streamLock sync.Mutex
19+
streamCond *sync.Cond
2020
streamChan chan *spdystream.Stream
2121
subStreamChans map[string]chan *spdystream.Stream
2222
}
2323

2424
func (s *StreamSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
25+
s.streamCond.L.Lock()
26+
defer s.streamCond.L.Unlock()
2527
s.subStreamChans[stream.String()] = streamChan
28+
s.streamCond.Broadcast()
2629
}
2730

2831
func (s *StreamSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
2932
if stream == nil {
3033
return s.streamChan
3134
}
35+
s.streamCond.L.Lock()
36+
defer s.streamCond.L.Unlock()
3237
streamChan, ok := s.subStreamChans[stream.String()]
33-
if ok {
34-
return streamChan
38+
for !ok {
39+
// TODO Test for stream being closed
40+
s.streamCond.Wait()
41+
streamChan, ok = s.subStreamChans[stream.String()]
3542
}
36-
return s.streamChan
43+
return streamChan
3744
}
3845

3946
func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) {
@@ -44,14 +51,21 @@ func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) {
4451

4552
// NewStreamSession creates a new stream session from the
4653
// provided network connection. The network connection is
47-
// expected to already provide a tls session.
54+
// expected to already provide a tls session. NewStreamSession
55+
// should only be called on client connections, server connections
56+
// should be created through ListenSession.
4857
func NewStreamSession(conn net.Conn) (*StreamSession, error) {
58+
return newStreamSession(conn, false)
59+
}
60+
61+
func newStreamSession(conn net.Conn, server bool) (*StreamSession, error) {
4962
session := &StreamSession{
5063
streamChan: make(chan *spdystream.Stream),
5164
subStreamChans: make(map[string]chan *spdystream.Stream),
65+
streamCond: sync.NewCond(new(sync.Mutex)),
5266
}
5367

54-
spdyConn, spdyErr := spdystream.NewConnection(conn, false)
68+
spdyConn, spdyErr := spdystream.NewConnection(conn, server)
5569
if spdyErr != nil {
5670
return nil, spdyErr
5771
}
@@ -76,9 +90,21 @@ func (s *StreamSession) NewSender() (libchan.Sender, error) {
7690
if waitErr != nil {
7791
return nil, waitErr
7892
}
93+
94+
streamChan := make(chan *spdystream.Stream)
95+
s.addStreamChan(stream, streamChan)
7996
return &StreamSender{stream: stream, streamChans: s}, nil
8097
}
8198

99+
// ReceiverWaits waits a new stream to be created and returns a
100+
// receiver with that stream.
101+
func (s *StreamSession) ReceiverWait() (libchan.Receiver, error) {
102+
stream := <-s.streamChan
103+
streamChan := make(chan *spdystream.Stream)
104+
s.addStreamChan(stream, streamChan)
105+
return &StreamReceiver{stream: stream, streamChans: s, ret: &StreamSender{stream: stream, streamChans: s}}, nil
106+
}
107+
82108
// Close closes the underlying network connection, causing
83109
// each stream created by this session to closed.
84110
func (s *StreamSession) Close() error {
@@ -115,6 +141,7 @@ func (s *StreamReceiver) Receive(mode int) (*libchan.Message, error) {
115141
} else {
116142
streamChan := s.streamChans.getStreamChan(s.stream)
117143
stream := <-streamChan
144+
s.streamChans.addStreamChan(stream, make(chan *spdystream.Stream))
118145

119146
data, dataErr := extractDataHeader(stream.Headers())
120147
if dataErr != nil {

http2/stream_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,20 @@ func runServer(listen string, t *testing.T, endChan chan bool) (io.Closer, error
103103
return nil, lErr
104104
}
105105

106-
session, sessionErr := NewListenSession(listener, NoAuthenticator)
107-
if sessionErr != nil {
108-
t.Fatalf("Error creating session: %s", sessionErr)
106+
listenSession, listenSessionErr := NewListenSession(listener, NoAuthenticator)
107+
if listenSessionErr != nil {
108+
t.Fatalf("Error creating session listener: %s", listenSessionErr)
109109
}
110110

111-
go session.Serve()
112-
113111
go func() {
114112
defer close(endChan)
115113

116-
receiver, receiverErr := session.AcceptReceiver()
114+
session, sessionErr := listenSession.AcceptSession()
115+
if sessionErr != nil {
116+
t.Fatalf("Error accepting session: %s", sessionErr)
117+
}
118+
119+
receiver, receiverErr := session.ReceiverWait()
117120
if receiverErr != nil {
118121
t.Fatalf("Error accepting receiver: %s", receiverErr)
119122
}

0 commit comments

Comments
 (0)