@@ -32,23 +32,37 @@ func (c *channel) copyValue(v interface{}) (interface{}, error) {
3232 }
3333 return c .copySender (val )
3434 }
35+ case * channelWrapper :
36+ if val .direction == inbound {
37+ return c .copyReceiver (val )
38+ }
39+ return c .copySender (val )
3540 case * net.TCPConn :
3641 // Do nothing until socket support is added
3742 case * net.UDPConn :
3843 // Do nothing until socket support is added
3944 case io.ReadWriteCloser :
4045 return c .copyByteStream (val )
46+ case io.ReadCloser :
47+ return c .copyByteReadStream (val )
48+ case io.WriteCloser :
49+ return c .copyByteWriteStream (val )
4150 case libchan.Sender :
4251 return c .copySender (val )
4352 case libchan.Receiver :
4453 return c .copyReceiver (val )
4554 case map [string ]interface {}:
4655 return c .copyChannelMessage (val )
47- case struct {}:
48- return c .copyStructure (v )
49- case * struct {}:
50- return c .copyStructure (v )
56+ case map [interface {}]interface {}:
57+ return c .copyChannelInterfaceMessage (val )
5158 default :
59+ if rv := reflect .ValueOf (v ); rv .Kind () == reflect .Ptr {
60+ if rv .Elem ().Kind () == reflect .Struct {
61+ return c .copyStructValue (rv .Elem ())
62+ }
63+ } else if rv .Kind () == reflect .Struct {
64+ return c .copyStructValue (rv )
65+ }
5266 }
5367 return v , nil
5468}
@@ -93,6 +107,31 @@ func (c *channel) copyByteStream(stream io.ReadWriteCloser) (io.ReadWriteCloser,
93107 return streamCopy , nil
94108}
95109
110+ func (c * channel ) copyByteReadStream (stream io.ReadCloser ) (io.ReadCloser , error ) {
111+ streamCopy , err := c .session .createByteStream ()
112+ if err != nil {
113+ return nil , err
114+ }
115+ go func () {
116+ io .Copy (streamCopy , stream )
117+ streamCopy .Close ()
118+ stream .Close ()
119+ }()
120+ return streamCopy , nil
121+ }
122+
123+ func (c * channel ) copyByteWriteStream (stream io.WriteCloser ) (io.WriteCloser , error ) {
124+ streamCopy , err := c .session .createByteStream ()
125+ if err != nil {
126+ return nil , err
127+ }
128+ go func () {
129+ io .Copy (stream , streamCopy )
130+ stream .Close ()
131+ }()
132+ return streamCopy , nil
133+ }
134+
96135func (c * channel ) copyChannelMessage (m map [string ]interface {}) (interface {}, error ) {
97136 mCopy := make (map [string ]interface {})
98137 for k , v := range m {
@@ -106,6 +145,23 @@ func (c *channel) copyChannelMessage(m map[string]interface{}) (interface{}, err
106145 return mCopy , nil
107146}
108147
148+ func (c * channel ) copyChannelInterfaceMessage (m map [interface {}]interface {}) (interface {}, error ) {
149+ mCopy := make (map [string ]interface {})
150+ for k , v := range m {
151+ vCopy , vErr := c .copyValue (v )
152+ if vErr != nil {
153+ return nil , vErr
154+ }
155+ keyStr , ok := k .(string )
156+ if ! ok {
157+ return nil , errors .New ("invalid non string key" )
158+ }
159+ mCopy [keyStr ] = vCopy
160+ }
161+
162+ return mCopy , nil
163+ }
164+
109165func (c * channel ) copyStructure (m interface {}) (interface {}, error ) {
110166 v := reflect .ValueOf (m )
111167 if v .Kind () == reflect .Ptr {
@@ -114,6 +170,10 @@ func (c *channel) copyStructure(m interface{}) (interface{}, error) {
114170 if v .Kind () != reflect .Struct {
115171 return nil , errors .New ("invalid non struct type" )
116172 }
173+ return c .copyStructValue (v )
174+ }
175+
176+ func (c * channel ) copyStructValue (v reflect.Value ) (interface {}, error ) {
117177 mCopy := make (map [string ]interface {})
118178 t := v .Type ()
119179 for i := 0 ; i < v .NumField (); i ++ {
0 commit comments