Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Commit c77230a

Browse files
committed
Update inmem pipe to use go channel
The inmemory pipe implementation may simply use go channels instead of using a pipe. Closes #92 Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
1 parent dbf3d8b commit c77230a

1 file changed

Lines changed: 25 additions & 157 deletions

File tree

inmem.go

Lines changed: 25 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -4,116 +4,31 @@ import (
44
"fmt"
55
"io"
66
"reflect"
7-
"sync"
87
)
98

10-
// Pipe returns an inmemory Sender/Receiver pair.
11-
func Pipe() (Receiver, Sender) {
12-
p := new(pipe)
13-
p.rwait.L = &p.l
14-
p.wwait.L = &p.l
15-
r := &pipeReceiver{p}
16-
w := &pipeSender{p}
17-
return r, w
18-
}
19-
20-
type pipe struct {
21-
rwait sync.Cond
22-
wwait sync.Cond
23-
l sync.Mutex
24-
rl sync.Mutex
25-
wl sync.Mutex
26-
rerr error // if reader closed, error to give writes
27-
werr error // if writer closed, error to give reads
28-
msg interface{}
29-
}
30-
31-
func (p *pipe) send(msg interface{}) error {
32-
var err error
33-
// One writer at a time.
34-
p.wl.Lock()
35-
defer p.wl.Unlock()
36-
37-
p.l.Lock()
38-
defer p.l.Unlock()
39-
p.msg = msg
40-
p.rwait.Signal()
41-
for {
42-
if p.msg == nil {
43-
break
44-
}
45-
if p.rerr != nil {
46-
err = p.rerr
47-
break
48-
}
49-
if p.werr != nil {
50-
err = io.ErrClosedPipe
51-
}
52-
p.wwait.Wait()
53-
}
54-
p.msg = nil // in case of rerr or werr
55-
return err
56-
}
57-
58-
func (p *pipe) receive() (interface{}, error) {
59-
p.rl.Lock()
60-
defer p.rl.Unlock()
9+
type pSender chan<- interface{}
10+
type pReceiver <-chan interface{}
6111

62-
p.l.Lock()
63-
defer p.l.Unlock()
64-
for {
65-
if p.rerr != nil {
66-
return nil, io.ErrClosedPipe
67-
}
68-
if p.msg != nil {
69-
break
70-
}
71-
if p.werr != nil {
72-
return nil, p.werr
73-
}
74-
p.rwait.Wait()
75-
}
76-
msg := p.msg
77-
p.msg = nil
78-
p.wwait.Signal()
79-
return msg, nil
80-
}
81-
82-
func (p *pipe) rclose(err error) {
83-
if err == nil {
84-
err = io.ErrClosedPipe
85-
}
86-
p.l.Lock()
87-
defer p.l.Unlock()
88-
p.rerr = err
89-
p.rwait.Signal()
90-
p.wwait.Signal()
12+
func (s pSender) Close() error {
13+
close(s)
14+
return nil
9115
}
9216

93-
func (p *pipe) wclose(err error) {
94-
if err == nil {
95-
err = io.EOF
96-
}
97-
p.l.Lock()
98-
defer p.l.Unlock()
99-
p.werr = err
100-
p.rwait.Signal()
101-
p.wwait.Signal()
17+
func (s pSender) Send(msg interface{}) error {
18+
s <- msg
19+
return nil
10220
}
10321

10422
type messageDecoder interface {
10523
Decode(v ...interface{}) error
10624
}
10725

108-
type pipeReceiver struct {
109-
p *pipe
110-
}
111-
112-
func (r *pipeReceiver) Receive(msg interface{}) error {
113-
rmsg, err := r.p.receive()
114-
if err != nil {
115-
return err
26+
func (r pReceiver) Receive(msg interface{}) error {
27+
rmsg, ok := <-r
28+
if !ok {
29+
return io.EOF
11630
}
31+
11732
// check type
11833
v := reflect.ValueOf(msg)
11934
rv := reflect.ValueOf(rmsg)
@@ -140,76 +55,29 @@ func (r *pipeReceiver) Receive(msg interface{}) error {
14055
return nil
14156
}
14257

143-
func (r *pipeReceiver) SendTo(dst Sender) (int, error) {
58+
func (r pReceiver) SendTo(dst Sender) (int, error) {
14459
var n int
145-
// If the destination is a pipeSender, we can cheat
146-
pdst, ok := dst.(*pipeSender)
147-
if !ok {
148-
return 0, ErrIncompatibleSender
149-
}
150-
for {
151-
pmsg, err := r.p.receive()
152-
if err == io.EOF {
153-
break
154-
}
155-
if err != nil {
156-
return n, err
157-
}
158-
if err := pdst.p.send(pmsg); err != nil {
159-
return n, err
160-
}
161-
n++
162-
}
163-
return n, nil
164-
}
165-
166-
func (r *pipeReceiver) Close() error {
167-
return r.CloseWithError(nil)
168-
}
169-
170-
func (r *pipeReceiver) CloseWithError(err error) error {
171-
r.p.rclose(err)
172-
return nil
173-
}
174-
175-
// pipeSender
176-
177-
type pipeSender struct {
178-
p *pipe
179-
}
180-
181-
func (w *pipeSender) Send(msg interface{}) error {
182-
return w.p.send(msg)
183-
}
184-
185-
func (w *pipeSender) ReceiveFrom(src Receiver) (int, error) {
186-
var n int
187-
// If the destination is a pipeReceiver, we can cheat
188-
psrc, ok := src.(*pipeReceiver)
189-
if !ok {
190-
return 0, ErrIncompatibleReceiver
191-
}
19260
for {
193-
pmsg, err := psrc.p.receive()
194-
if err == io.EOF {
61+
pmsg, ok := <-r
62+
if !ok {
19563
break
19664
}
197-
if err != nil {
198-
return n, err
199-
}
200-
if err := w.p.send(pmsg); err != nil {
65+
if err := dst.Send(pmsg); err != nil {
20166
return n, err
20267
}
20368
n++
20469
}
20570
return n, nil
20671
}
20772

208-
func (w *pipeSender) Close() error {
209-
return w.CloseWithError(nil)
73+
// Pipe returns an inmemory Sender/Receiver pair.
74+
func Pipe() (Receiver, Sender) {
75+
c := make(chan interface{})
76+
return pReceiver(c), pSender(c)
21077
}
21178

212-
func (w *pipeSender) CloseWithError(err error) error {
213-
w.p.wclose(err)
214-
return nil
79+
// BufferedPipe returns an inmemory buffered pipe.
80+
func BufferedPipe(n int) (Receiver, Sender) {
81+
c := make(chan interface{}, n)
82+
return pReceiver(c), pSender(c)
21583
}

0 commit comments

Comments
 (0)