@@ -11,6 +11,7 @@ import (
1111 "github.com/dmcgowan/go/codec"
1212)
1313
14+ // Pipe returns an inmemory Sender/Receiver pair.
1415func Pipe () (Receiver , Sender ) {
1516 session := createStreamSession ()
1617 return session .createPipe ()
@@ -25,15 +26,15 @@ type streamSession struct {
2526 handler codec.Handle
2627
2728 referenceLock sync.Mutex
28- referenceId uint64
29+ referenceID uint64
2930 byteStreams map [uint64 ]* byteStream
3031}
3132
3233func createStreamSession () * streamSession {
3334 session := & streamSession {
3435 pipeReaders : make (map [uint64 ]* io.PipeReader ),
3536 pipeWriters : make (map [uint64 ]* io.PipeWriter ),
36- referenceId : 2 ,
37+ referenceID : 2 ,
3738 byteStreams : make (map [uint64 ]* byteStream ),
3839 }
3940 session .handler = getMsgPackHandler (session )
@@ -43,110 +44,110 @@ func createStreamSession() *streamSession {
4344func (s * streamSession ) createPipe () (Receiver , Sender ) {
4445 r , w := io .Pipe ()
4546 s .pipeLock .Lock ()
46- pipeId := s .pipeCount + 1
47- s .pipeCount = pipeId
48- s .pipeReaders [pipeId ] = r
49- s .pipeWriters [pipeId ] = w
47+ pipeID := s .pipeCount + 1
48+ s .pipeCount = pipeID
49+ s .pipeReaders [pipeID ] = r
50+ s .pipeWriters [pipeID ] = w
5051 s .pipeLock .Unlock ()
5152
52- recv := & pipeReceiver {pipeId , s , r , codec .NewDecoder (r , s .handler )}
53- send := & pipeSender {pipeId , s , w , codec .NewEncoder (w , s .handler )}
53+ recv := & pipeReceiver {pipeID , s , r , codec .NewDecoder (r , s .handler )}
54+ send := & pipeSender {pipeID , s , w , codec .NewEncoder (w , s .handler )}
5455 return recv , send
5556}
5657
5758func (s * streamSession ) newByteStream () (io.ReadWriteCloser , error ) {
5859 c1 , c2 := net .Pipe ()
5960 bs := & byteStream {
6061 Conn : c1 ,
61- referenceId : s .referenceId ,
62+ referenceID : s .referenceID ,
6263 }
6364 s .referenceLock .Lock ()
64- s .byteStreams [s .referenceId ] = bs
65- s .byteStreams [s .referenceId + 1 ] = & byteStream {
65+ s .byteStreams [s .referenceID ] = bs
66+ s .byteStreams [s .referenceID + 1 ] = & byteStream {
6667 Conn : c2 ,
67- referenceId : s .referenceId + 1 ,
68+ referenceID : s .referenceID + 1 ,
6869 session : s ,
6970 }
70- s .referenceId = s .referenceId + 2
71+ s .referenceID = s .referenceID + 2
7172 s .referenceLock .Unlock ()
7273
7374 return bs , nil
7475}
7576
7677func (s * streamSession ) encodeReceiver (v reflect.Value ) ([]byte , error ) {
7778 bs := v .Interface ().(pipeReceiver )
78- if bs .pipeId == 0 {
79+ if bs .pipeID == 0 {
7980 return nil , errors .New ("bad type" )
8081 }
8182 var buf [8 ]byte
82- written := binary .PutUvarint (buf [:], uint64 (bs .pipeId ))
83+ written := binary .PutUvarint (buf [:], uint64 (bs .pipeID ))
8384
8485 return buf [:written ], nil
8586}
8687
8788func (s * streamSession ) decodeReceiver (v reflect.Value , b []byte ) error {
88- pipeId , readN := binary .Uvarint (b )
89+ pipeID , readN := binary .Uvarint (b )
8990 if readN == 0 {
9091 return errors .New ("bad reference id" )
9192 }
9293
93- r , ok := s .pipeReaders [pipeId ]
94+ r , ok := s .pipeReaders [pipeID ]
9495 if ! ok {
95- return errors .New ("Receiver does not exist" )
96+ return errors .New ("receiver does not exist" )
9697 }
9798
98- v .Set (reflect .ValueOf (pipeReceiver {pipeId , s , r , codec .NewDecoder (r , s .handler )}))
99+ v .Set (reflect .ValueOf (pipeReceiver {pipeID , s , r , codec .NewDecoder (r , s .handler )}))
99100
100101 return nil
101102}
102103
103104func (s * streamSession ) encodeSender (v reflect.Value ) ([]byte , error ) {
104105 sender := v .Interface ().(pipeSender )
105- if sender .pipeId == 0 {
106+ if sender .pipeID == 0 {
106107 return nil , errors .New ("bad type" )
107108 }
108109 var buf [8 ]byte
109- written := binary .PutUvarint (buf [:], uint64 (sender .pipeId ))
110+ written := binary .PutUvarint (buf [:], uint64 (sender .pipeID ))
110111
111112 return buf [:written ], nil
112113}
113114
114115func (s * streamSession ) decodeSender (v reflect.Value , b []byte ) error {
115- pipeId , readN := binary .Uvarint (b )
116+ pipeID , readN := binary .Uvarint (b )
116117 if readN == 0 {
117118 return errors .New ("bad reference id" )
118119 }
119120
120- w , ok := s .pipeWriters [pipeId ]
121+ w , ok := s .pipeWriters [pipeID ]
121122 if ! ok {
122- return errors .New ("Receiver does not exist" )
123+ return errors .New ("receiver does not exist" )
123124 }
124125
125- v .Set (reflect .ValueOf (pipeSender {pipeId , s , w , codec .NewEncoder (w , s .handler )}))
126+ v .Set (reflect .ValueOf (pipeSender {pipeID , s , w , codec .NewEncoder (w , s .handler )}))
126127
127128 return nil
128129}
129130
130131func (s * streamSession ) encodeStream (v reflect.Value ) ([]byte , error ) {
131132 bs := v .Interface ().(byteStream )
132- if bs .referenceId == 0 {
133+ if bs .referenceID == 0 {
133134 return nil , errors .New ("bad type" )
134135 }
135136 var buf [8 ]byte
136- written := binary .PutUvarint (buf [:], uint64 (bs .referenceId )^ 0x01 )
137+ written := binary .PutUvarint (buf [:], uint64 (bs .referenceID )^ 0x01 )
137138
138139 return buf [:written ], nil
139140}
140141
141142func (s * streamSession ) decodeStream (v reflect.Value , b []byte ) error {
142- referenceId , readN := binary .Uvarint (b )
143+ referenceID , readN := binary .Uvarint (b )
143144 if readN == 0 {
144145 return errors .New ("bad reference id" )
145146 }
146147
147- bs , ok := s .byteStreams [referenceId ]
148+ bs , ok := s .byteStreams [referenceID ]
148149 if ! ok {
149- return errors .New ("Byte stream does not exist" )
150+ return errors .New ("byte stream does not exist" )
150151 }
151152
152153 if bs != nil {
@@ -211,7 +212,7 @@ func getMsgPackHandler(session *streamSession) *codec.MsgpackHandle {
211212}
212213
213214type pipeSender struct {
214- pipeId uint64
215+ pipeID uint64
215216 session * streamSession
216217 p * io.PipeWriter
217218 encoder * codec.Encoder
@@ -241,7 +242,7 @@ func (w *pipeSender) CreateNestedSender() (Sender, Receiver, error) {
241242}
242243
243244type pipeReceiver struct {
244- pipeId uint64
245+ pipeID uint64
245246 session * streamSession
246247 p * io.PipeReader
247248 decoder * codec.Decoder
@@ -257,6 +258,6 @@ func (r *pipeReceiver) Close() error {
257258
258259type byteStream struct {
259260 net.Conn
260- referenceId uint64
261+ referenceID uint64
261262 session * streamSession
262263}
0 commit comments