@@ -60,6 +60,7 @@ func (s *streamSession) newByteStream() (io.ReadWriteCloser, error) {
6060 bs := & byteStream {
6161 Conn : c1 ,
6262 referenceID : s .referenceID ,
63+ session : s ,
6364 }
6465 s .referenceLock .Lock ()
6566 s .byteStreams [s .referenceID ] = bs
@@ -157,53 +158,62 @@ func (s *streamSession) decodeStream(v reflect.Value, b []byte) error {
157158 return nil
158159}
159160
160- func (s * streamSession ) encodeWrapper (v reflect.Value ) ([]byte , error ) {
161- wrapper := v .Interface ().(ByteStreamWrapper )
162- bs , err := s .newByteStream ()
163- if err != nil {
164- return nil , err
165- }
161+ // Wrappers to cue a copy of a previously decoded object
166162
167- go func () {
168- io .Copy (bs , wrapper )
169- bs .Close ()
170- }()
163+ type pipeSenderWrapper struct { * pipeSender }
164+ type pipeReceiverWrapper struct { * pipeReceiver }
165+ type byteStreamWrapper struct { * byteStream }
171166
172- go func () {
173- io .Copy (wrapper , bs )
174- wrapper .Close ()
175- }()
167+ func (s * streamSession ) decodeSenderWrapper (v reflect.Value , b []byte ) error {
168+ ps := & pipeSender {}
169+ v .FieldByName ("pipeSender" ).Set (reflect .ValueOf (ps ))
170+ return s .decodeSender (reflect .ValueOf (ps ).Elem (), b )
171+ }
176172
177- return s .encodeStream (reflect .ValueOf (bs ).Elem ())
173+ func (s * streamSession ) decodeReceiverWrapper (v reflect.Value , b []byte ) error {
174+ pr := & pipeReceiver {}
175+ v .FieldByName ("pipeReceiver" ).Set (reflect .ValueOf (pr ))
176+ return s .decodeReceiver (reflect .ValueOf (pr ).Elem (), b )
178177}
179178
180- func (s * streamSession ) decodeWrapper (v reflect.Value , b []byte ) error {
179+ func (s * streamSession ) decodeByteStreamWrapper (v reflect.Value , b []byte ) error {
181180 bs := & byteStream {}
182- s .decodeStream (reflect .ValueOf (bs ).Elem (), b )
183- v .FieldByName ("ReadWriteCloser" ).Set (reflect .ValueOf (bs ))
184- return nil
181+ v .FieldByName ("byteStream" ).Set (reflect .ValueOf (bs ))
182+ return s .decodeStream (reflect .ValueOf (bs ).Elem (), b )
185183}
186184
185+ // Internal definitions early, does not follow protocol because does not
186+ // interact with external processes
187187func getMsgPackHandler (session * streamSession ) * codec.MsgpackHandle {
188188 mh := & codec.MsgpackHandle {WriteExt : true }
189189 mh .RawToString = true
190190
191- err := mh .AddExt (reflect .TypeOf (pipeReceiver {}), 1 , session . encodeReceiver , session .decodeReceiver )
191+ err := mh .AddExt (reflect .TypeOf (pipeReceiverWrapper {}), 1 , nil , session .decodeReceiverWrapper )
192192 if err != nil {
193193 panic (err )
194194 }
195195
196- err = mh .AddExt (reflect .TypeOf (pipeSender {}), 2 , session .encodeSender , session . decodeSender )
196+ err = mh .AddExt (reflect .TypeOf (pipeReceiver {}), 1 , session .encodeReceiver , nil )
197197 if err != nil {
198198 panic (err )
199199 }
200200
201- err = mh .AddExt (reflect .TypeOf (byteStream {}), 3 , session . encodeStream , session .decodeStream )
201+ err = mh .AddExt (reflect .TypeOf (pipeSenderWrapper {}), 2 , nil , session .decodeSenderWrapper )
202202 if err != nil {
203203 panic (err )
204204 }
205205
206- err = mh .AddExt (reflect .TypeOf (ByteStreamWrapper {}), 4 , session .encodeWrapper , session .decodeWrapper )
206+ err = mh .AddExt (reflect .TypeOf (pipeSender {}), 2 , session .encodeSender , nil )
207+ if err != nil {
208+ panic (err )
209+ }
210+
211+ err = mh .AddExt (reflect .TypeOf (byteStreamWrapper {}), 3 , nil , session .decodeByteStreamWrapper )
212+ if err != nil {
213+ panic (err )
214+ }
215+
216+ err = mh .AddExt (reflect .TypeOf (byteStream {}), 3 , session .encodeStream , nil )
207217 if err != nil {
208218 panic (err )
209219 }
@@ -219,7 +229,11 @@ type pipeSender struct {
219229}
220230
221231func (w * pipeSender ) Send (message interface {}) error {
222- return w .encoder .Encode (message )
232+ mCopy , mErr := w .copyMessage (message )
233+ if mErr != nil {
234+ return mErr
235+ }
236+ return w .encoder .Encode (mCopy )
223237}
224238
225239func (w * pipeSender ) Close () error {
@@ -261,3 +275,115 @@ type byteStream struct {
261275 referenceID uint64
262276 session * streamSession
263277}
278+
279+ func (w * pipeSender ) copyMessage (message interface {}) (interface {}, error ) {
280+ mapCopy , mapOk := message .(map [string ]interface {})
281+ if mapOk {
282+ return w .copyChannelMessage (mapCopy )
283+ }
284+ return w .copyStructure (message )
285+ }
286+
287+ func (w * pipeSender ) copyValue (v interface {}) (interface {}, error ) {
288+ switch val := v .(type ) {
289+ case * byteStream :
290+ if val .session != w .session {
291+ return w .copyByteStream (val )
292+ }
293+ case * pipeSender :
294+ if val .session != w .session {
295+ return w .copySender (val )
296+ }
297+ case * pipeReceiver :
298+ if val .session != w .session {
299+ return w .copyReceiver (val )
300+ }
301+ case io.ReadWriteCloser :
302+ return w .copyByteStream (val )
303+ case Sender :
304+ return w .copySender (val )
305+ case Receiver :
306+ return w .copyReceiver (val )
307+ case map [string ]interface {}:
308+ return w .copyChannelMessage (val )
309+ case struct {}:
310+ return w .copyStructure (v )
311+ case * struct {}:
312+ return w .copyStructure (v )
313+ default :
314+ }
315+ return v , nil
316+ }
317+
318+ func (w * pipeSender ) copySender (val Sender ) (Sender , error ) {
319+ recv , send , err := w .CreateNestedReceiver ()
320+ if err != nil {
321+ return nil , err
322+ }
323+ go func () {
324+ Copy (val , recv )
325+ val .Close ()
326+ }()
327+ return send , nil
328+ }
329+
330+ func (w * pipeSender ) copyReceiver (val Receiver ) (Receiver , error ) {
331+ send , recv , err := w .CreateNestedSender ()
332+ if err != nil {
333+ return nil , err
334+ }
335+ go func () {
336+ Copy (send , val )
337+ send .Close ()
338+ }()
339+ return recv , nil
340+ }
341+
342+ func (w * pipeSender ) copyByteStream (stream io.ReadWriteCloser ) (io.ReadWriteCloser , error ) {
343+ streamCopy , err := w .session .newByteStream ()
344+ if err != nil {
345+ return nil , err
346+ }
347+ go func () {
348+ io .Copy (streamCopy , stream )
349+ streamCopy .Close ()
350+ }()
351+ go func () {
352+ io .Copy (stream , streamCopy )
353+ stream .Close ()
354+ }()
355+ return streamCopy , nil
356+ }
357+
358+ func (w * pipeSender ) copyChannelMessage (m map [string ]interface {}) (interface {}, error ) {
359+ mCopy := make (map [string ]interface {})
360+ for k , v := range m {
361+ vCopy , vErr := w .copyValue (v )
362+ if vErr != nil {
363+ return nil , vErr
364+ }
365+ mCopy [k ] = vCopy
366+ }
367+
368+ return mCopy , nil
369+ }
370+
371+ func (w * pipeSender ) copyStructure (m interface {}) (interface {}, error ) {
372+ v := reflect .ValueOf (m )
373+ if v .Kind () == reflect .Ptr {
374+ v = v .Elem ()
375+ }
376+ if v .Kind () != reflect .Struct {
377+ return nil , errors .New ("invalid non struct type" )
378+ }
379+ mCopy := make (map [string ]interface {})
380+ t := v .Type ()
381+ for i := 0 ; i < v .NumField (); i ++ {
382+ vCopy , vErr := w .copyValue (v .Field (i ).Interface ())
383+ if vErr != nil {
384+ return nil , vErr
385+ }
386+ mCopy [t .Field (i ).Name ] = vCopy
387+ }
388+ return mCopy , nil
389+ }
0 commit comments