fix(write): fix close logic to avoid race from read error

In the case that there is a read error, we would close the lines
channel before sending the error into the read error channel. closing
lines then allows the write goroutine to possibly send in a nil error
before read is able to, causing the main function driving both to
return a nil error. Additionally, it is possible for both reads and
writes to race sending errors into their channels, and the main
goroutine will only read from one, causing the other goroutine to leak.

To fix this, we close lines only after we have sent an error into
the channel, we ensure we read from both errors to make sure that
both have exited, and we unify the channels and add a buffer of size
two to the channel. It is possible for write to exit leaving read blocked
forever, but write only exits with a nil error when read has exited, so
this only happens during an actual write error, just like before.

Channels are hard.
pull/10616/head
Jeff Wendling 2018-12-29 15:08:09 -07:00
parent 3429e8d0c6
commit 9da6fb3410
2 changed files with 13 additions and 19 deletions

View File

@ -39,35 +39,31 @@ func (b *Batcher) Write(ctx context.Context, org, bucket platform.ID, r io.Reade
lines := make(chan []byte)
writeErrC := make(chan error)
go b.write(ctx, org, bucket, lines, writeErrC)
errC := make(chan error, 2)
go b.write(ctx, org, bucket, lines, errC)
go b.read(ctx, r, lines, errC)
readErrC := make(chan error)
go b.read(ctx, r, lines, readErrC)
// loop is needed in the case that the read finishes without an
// error, but, the write has yet to complete.
for {
// we loop twice to check if both read and write have an error. if read exits
// cleanly, then we still want to wait for write.
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-readErrC:
// only exit if the read has an error
// read will have closed the lines channel signaling write to exit
case err := <-errC:
// onky if there is any error, exit immediately.
if err != nil {
return err
}
case err := <-writeErrC:
// if write finishes, exit immediately. reads may block forever
return err
}
}
return nil
}
// read will close the line channel when there is no more data, or an error occurs.
// it is possible for an io.Reader to block forever; Write's context can be
// used to cancel, but, it's possible there will be dangling read go routines.
func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, errC chan<- error) {
defer close(lines)
scanner := bufio.NewScanner(r)
scanner.Split(ScanLines)
for scanner.Scan() {
@ -75,12 +71,10 @@ func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, er
select {
case lines <- scanner.Bytes():
case <-ctx.Done():
close(lines)
errC <- ctx.Err()
return
}
}
close(lines)
errC <- scanner.Err()
}

View File

@ -90,7 +90,7 @@ func TestBatcher_read(t *testing.T) {
args: args{
r: strings.NewReader("m1,t1=v1 f1=1\nm2,t2=v2 f2=2"),
lines: make(chan []byte),
errC: make(chan error),
errC: make(chan error, 1),
},
want: []string{"m1,t1=v1 f1=1\n", "m2,t2=v2 f2=2"},
},
@ -100,7 +100,7 @@ func TestBatcher_read(t *testing.T) {
cancel: true,
r: strings.NewReader("m1,t1=v1 f1=1"),
lines: make(chan []byte),
errC: make(chan error),
errC: make(chan error, 1),
},
want: []string{},
wantErr: true,
@ -110,7 +110,7 @@ func TestBatcher_read(t *testing.T) {
args: args{
r: &errorReader{},
lines: make(chan []byte),
errC: make(chan error),
errC: make(chan error, 1),
},
want: []string{},
wantErr: true,