Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Commit 0636b9d

Browse files
author
Solomon Hykes
committed
Merge pull request #38 from dmcgowan/interface_update
2 parents 459978d + 3c28884 commit 0636b9d

41 files changed

Lines changed: 2772 additions & 2814 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

copy.go

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,29 @@ package libchan
22

33
import (
44
"io"
5-
"sync"
65
)
76

8-
func Copy(dst Sender, src Receiver) (int, error) {
9-
var tasks sync.WaitGroup
10-
defer tasks.Wait()
11-
if senderTo, ok := src.(SenderTo); ok {
12-
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender {
13-
return n, err
14-
}
15-
}
16-
if receiverFrom, ok := dst.(ReceiverFrom); ok {
17-
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver {
18-
return n, err
19-
}
20-
}
21-
var (
22-
n int
23-
)
7+
// Copy copies from a receiver to a sender until an EOF is
8+
// received. The number of copies made is returned along
9+
// with any error that may have halted copying prior to an EOF.
10+
func Copy(w Sender, r Receiver) (int, error) {
11+
var n int
2412
for {
25-
msg, err := src.Receive(Ret)
26-
if err == io.EOF {
27-
return n, nil
28-
}
13+
m := make(map[string]interface{})
14+
err := r.Receive(&m)
2915
if err != nil {
30-
return n, err
16+
if err == io.EOF {
17+
break
18+
} else {
19+
return n, err
20+
}
3121
}
32-
if _, err := dst.Send(msg); err != nil {
22+
23+
err = w.Send(m)
24+
if err != nil {
3325
return n, err
3426
}
3527
n++
3628
}
29+
return n, nil
3730
}

copy_test.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package libchan
2+
3+
import (
4+
"io"
5+
"net"
6+
"os"
7+
"runtime/pprof"
8+
"testing"
9+
"time"
10+
)
11+
12+
type ProxiedMessage struct {
13+
Message string
14+
Ret Sender
15+
}
16+
17+
type ProxyAckMessage struct {
18+
N int
19+
MessageLen int
20+
}
21+
22+
func TestChannelProxy(t *testing.T) {
23+
messages := []string{
24+
"Proxied messages",
25+
"Another proxied message",
26+
"Far less interesting message",
27+
"This was ALSO sent over the proxy",
28+
}
29+
client := func(t *testing.T, sender Sender) {
30+
for i, m := range messages {
31+
nestedReceiver, remoteSender := Pipe()
32+
33+
message := &ProxiedMessage{
34+
Message: m,
35+
Ret: remoteSender,
36+
}
37+
38+
err := sender.Send(message)
39+
if err != nil {
40+
t.Fatalf("Error sending message: %s", err)
41+
}
42+
43+
ack := &ProxyAckMessage{}
44+
err = nestedReceiver.Receive(ack)
45+
if err != nil {
46+
t.Fatalf("Error receiving ack: %s", err)
47+
}
48+
49+
if ack.N != i {
50+
t.Fatalf("Unexpected ack value\n\tExpected: %d\n\tActual: %d", i, ack.N)
51+
}
52+
53+
if ack.MessageLen != len(m) {
54+
t.Fatalf("Unexpected ack value\n\tExpected: %d\n\tActual: %d", len(m), ack.MessageLen)
55+
}
56+
}
57+
58+
}
59+
server := func(t *testing.T, receiver Receiver) {
60+
for i, m := range messages {
61+
message := &ProxiedMessage{}
62+
err := receiver.Receive(message)
63+
if err != nil {
64+
t.Fatalf("Error receiving message: %s", err)
65+
}
66+
67+
if message.Message != m {
68+
t.Fatalf("Unexpected message:\n\tExpected: %s\n\tActual: %s", m, message.Message)
69+
}
70+
71+
ack := &ProxyAckMessage{N: i, MessageLen: len(message.Message)}
72+
err = message.Ret.Send(ack)
73+
if err != nil {
74+
t.Fatalf("Error sending ack: %s", err)
75+
}
76+
}
77+
}
78+
SpawnProxyTest(t, client, server, 4)
79+
}
80+
81+
type ProxiedStreamMessage struct {
82+
Stream io.ReadWriteCloser
83+
}
84+
85+
func TestByteStreamProxy(t *testing.T) {
86+
sendString := "Sending a string"
87+
retString := "Returned string"
88+
client := func(t *testing.T, sender Sender) {
89+
bs, bsRemote := net.Pipe()
90+
91+
message := &ProxiedStreamMessage{
92+
Stream: bsRemote,
93+
}
94+
95+
err := sender.Send(message)
96+
if err != nil {
97+
t.Fatalf("Error sending message: %s", err)
98+
}
99+
100+
_, err = bs.Write([]byte(sendString))
101+
if err != nil {
102+
t.Fatalf("Error writing bytes: %s", err)
103+
}
104+
105+
buf := make([]byte, 30)
106+
n, err := bs.Read(buf)
107+
if string(buf[:n]) != retString {
108+
t.Fatalf("Unexpected string value:\n\tExpected: %s\n\tActual: %s", retString, string(buf[:n]))
109+
}
110+
}
111+
server := func(t *testing.T, receiver Receiver) {
112+
message := &ProxiedStreamMessage{}
113+
err := receiver.Receive(message)
114+
if err != nil {
115+
t.Fatalf("Error receiving message: %s", err)
116+
}
117+
118+
buf := make([]byte, 30)
119+
n, err := message.Stream.Read(buf)
120+
if string(buf[:n]) != sendString {
121+
t.Fatalf("Unexpected string value:\n\tExpected: %s\n\tActual: %s", sendString, string(buf[:n]))
122+
}
123+
124+
_, err = message.Stream.Write([]byte(retString))
125+
if err != nil {
126+
t.Fatalf("Error writing bytes: %s", err)
127+
}
128+
}
129+
SpawnProxyTest(t, client, server, 1)
130+
}
131+
132+
func SpawnProxyTest(t *testing.T, client SendTestRoutine, server ReceiveTestRoutine, proxyCount int) {
133+
endClient := make(chan bool)
134+
endServer := make(chan bool)
135+
endProxy := make(chan bool)
136+
137+
receiver1, sender1 := Pipe()
138+
receiver2, sender2 := Pipe()
139+
140+
go func() {
141+
defer close(endProxy)
142+
n, err := Copy(sender2, receiver1)
143+
if err != nil {
144+
t.Errorf("Error proxying: %s", err)
145+
}
146+
err = sender2.Close()
147+
if err != nil {
148+
t.Errorf("Error closing sender: %s", err)
149+
}
150+
if n != proxyCount {
151+
t.Errorf("Wrong proxy count\n\tExpected: %d\n\tActual: %d", proxyCount, n)
152+
}
153+
}()
154+
155+
go func() {
156+
defer close(endClient)
157+
client(t, sender1)
158+
err := sender1.Close()
159+
if err != nil {
160+
t.Errorf("Error closing sender: %s", err)
161+
}
162+
}()
163+
164+
go func() {
165+
defer close(endServer)
166+
server(t, receiver2)
167+
}()
168+
169+
timeout := time.After(RoutineTimeout)
170+
171+
for endClient != nil || endServer != nil {
172+
select {
173+
case <-endProxy:
174+
if t.Failed() {
175+
t.Fatal("Proxy failed")
176+
}
177+
endClient = nil
178+
case <-endClient:
179+
if t.Failed() {
180+
t.Fatal("Client failed")
181+
}
182+
endClient = nil
183+
case <-endServer:
184+
if t.Failed() {
185+
t.Fatal("Server failed")
186+
}
187+
endServer = nil
188+
case <-timeout:
189+
if DumpStackOnTimeout {
190+
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
191+
}
192+
t.Fatal("Timeout")
193+
}
194+
}
195+
}

data/data.go

Lines changed: 0 additions & 119 deletions
This file was deleted.

0 commit comments

Comments
 (0)