Skip to content

Commit 996cf18

Browse files
committed
Drain previous values on ReadTask
1 parent 241a083 commit 996cf18

1 file changed

Lines changed: 4 additions & 0 deletions

File tree

stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ func (p Pipeline) RunTask(ctx context.Context, task StreamTask) Pipeline {
111111

112112
func ReadFromTask(r io.Reader, format Format) StreamTask {
113113
return StreamFunc(func(s Stream) error {
114+
if err := Drain(s); err != nil {
115+
return err
116+
}
114117
dec := NewDecoder(r, format)
115118
for {
116119
v := new(Value)
@@ -122,6 +125,7 @@ func ReadFromTask(r io.Reader, format Format) StreamTask {
122125
// println("read err", err.Error())
123126
return err
124127
}
128+
125129
if !s.Push(v) {
126130
// println("push failed")
127131
return nil

0 commit comments

Comments
 (0)