@@ -16,24 +16,31 @@ import (
1616type StreamSession struct {
1717 conn * spdystream.Connection
1818
19- streamLock sync.Mutex
19+ streamCond * sync.Cond
2020 streamChan chan * spdystream.Stream
2121 subStreamChans map [string ]chan * spdystream.Stream
2222}
2323
2424func (s * StreamSession ) addStreamChan (stream * spdystream.Stream , streamChan chan * spdystream.Stream ) {
25+ s .streamCond .L .Lock ()
26+ defer s .streamCond .L .Unlock ()
2527 s .subStreamChans [stream .String ()] = streamChan
28+ s .streamCond .Broadcast ()
2629}
2730
2831func (s * StreamSession ) getStreamChan (stream * spdystream.Stream ) chan * spdystream.Stream {
2932 if stream == nil {
3033 return s .streamChan
3134 }
35+ s .streamCond .L .Lock ()
36+ defer s .streamCond .L .Unlock ()
3237 streamChan , ok := s .subStreamChans [stream .String ()]
33- if ok {
34- return streamChan
38+ for ! ok {
39+ // TODO Test for stream being closed
40+ s .streamCond .Wait ()
41+ streamChan , ok = s .subStreamChans [stream .String ()]
3542 }
36- return s . streamChan
43+ return streamChan
3744}
3845
3946func (s * StreamSession ) newStreamHandler (stream * spdystream.Stream ) {
@@ -44,14 +51,21 @@ func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) {
4451
4552// NewStreamSession creates a new stream session from the
4653// provided network connection. The network connection is
47- // expected to already provide a tls session.
54+ // expected to already provide a tls session. NewStreamSession
55+ // should only be called on client connections, server connections
56+ // should be created through ListenSession.
4857func NewStreamSession (conn net.Conn ) (* StreamSession , error ) {
58+ return newStreamSession (conn , false )
59+ }
60+
61+ func newStreamSession (conn net.Conn , server bool ) (* StreamSession , error ) {
4962 session := & StreamSession {
5063 streamChan : make (chan * spdystream.Stream ),
5164 subStreamChans : make (map [string ]chan * spdystream.Stream ),
65+ streamCond : sync .NewCond (new (sync.Mutex )),
5266 }
5367
54- spdyConn , spdyErr := spdystream .NewConnection (conn , false )
68+ spdyConn , spdyErr := spdystream .NewConnection (conn , server )
5569 if spdyErr != nil {
5670 return nil , spdyErr
5771 }
@@ -76,9 +90,21 @@ func (s *StreamSession) NewSender() (libchan.Sender, error) {
7690 if waitErr != nil {
7791 return nil , waitErr
7892 }
93+
94+ streamChan := make (chan * spdystream.Stream )
95+ s .addStreamChan (stream , streamChan )
7996 return & StreamSender {stream : stream , streamChans : s }, nil
8097}
8198
99+ // ReceiverWaits waits a new stream to be created and returns a
100+ // receiver with that stream.
101+ func (s * StreamSession ) ReceiverWait () (libchan.Receiver , error ) {
102+ stream := <- s .streamChan
103+ streamChan := make (chan * spdystream.Stream )
104+ s .addStreamChan (stream , streamChan )
105+ return & StreamReceiver {stream : stream , streamChans : s , ret : & StreamSender {stream : stream , streamChans : s }}, nil
106+ }
107+
82108// Close closes the underlying network connection, causing
83109// each stream created by this session to closed.
84110func (s * StreamSession ) Close () error {
@@ -115,6 +141,7 @@ func (s *StreamReceiver) Receive(mode int) (*libchan.Message, error) {
115141 } else {
116142 streamChan := s .streamChans .getStreamChan (s .stream )
117143 stream := <- streamChan
144+ s .streamChans .addStreamChan (stream , make (chan * spdystream.Stream ))
118145
119146 data , dataErr := extractDataHeader (stream .Headers ())
120147 if dataErr != nil {
0 commit comments