From 9da6fb34101c617b72514a5b0c02ce021ac01216 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Sat, 29 Dec 2018 15:08:09 -0700 Subject: [PATCH] fix(write): fix close logic to avoid race from read error In the case that there is a read error, we would close the lines channel before sending the error into the read error channel. closing lines then allows the write goroutine to possibly send in a nil error before read is able to, causing the main function driving both to return a nil error. Additionally, it is possible for both reads and writes to race sending errors into their channels, and the main goroutine will only read from one, causing the other goroutine to leak. To fix this, we close lines only after we have sent an error into the channel, we ensure we read from both errors to make sure that both have exited, and we unify the channels and add a buffer of size two to the channel. It is possible for write to exit leaving read blocked forever, but write only exits with a nil error when read has exited, so this only happens during an actual write error, just like before. Channels are hard. --- write/batcher.go | 26 ++++++++++---------------- write/batcher_test.go | 6 +++--- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/write/batcher.go b/write/batcher.go index a7ece64abd..0aef2c1b3b 100644 --- a/write/batcher.go +++ b/write/batcher.go @@ -39,35 +39,31 @@ func (b *Batcher) Write(ctx context.Context, org, bucket platform.ID, r io.Reade lines := make(chan []byte) - writeErrC := make(chan error) - go b.write(ctx, org, bucket, lines, writeErrC) + errC := make(chan error, 2) + go b.write(ctx, org, bucket, lines, errC) + go b.read(ctx, r, lines, errC) - readErrC := make(chan error) - go b.read(ctx, r, lines, readErrC) - - // loop is needed in the case that the read finishes without an - // error, but, the write has yet to complete. - for { + // we loop twice to check if both read and write have an error. if read exits + // cleanly, then we still want to wait for write. + for i := 0; i < 2; i++ { select { case <-ctx.Done(): return ctx.Err() - case err := <-readErrC: - // only exit if the read has an error - // read will have closed the lines channel signaling write to exit + case err := <-errC: + // onky if there is any error, exit immediately. if err != nil { return err } - case err := <-writeErrC: - // if write finishes, exit immediately. reads may block forever - return err } } + return nil } // read will close the line channel when there is no more data, or an error occurs. // it is possible for an io.Reader to block forever; Write's context can be // used to cancel, but, it's possible there will be dangling read go routines. func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, errC chan<- error) { + defer close(lines) scanner := bufio.NewScanner(r) scanner.Split(ScanLines) for scanner.Scan() { @@ -75,12 +71,10 @@ func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, er select { case lines <- scanner.Bytes(): case <-ctx.Done(): - close(lines) errC <- ctx.Err() return } } - close(lines) errC <- scanner.Err() } diff --git a/write/batcher_test.go b/write/batcher_test.go index 8bd9edf0ca..1a1f78e377 100644 --- a/write/batcher_test.go +++ b/write/batcher_test.go @@ -90,7 +90,7 @@ func TestBatcher_read(t *testing.T) { args: args{ r: strings.NewReader("m1,t1=v1 f1=1\nm2,t2=v2 f2=2"), lines: make(chan []byte), - errC: make(chan error), + errC: make(chan error, 1), }, want: []string{"m1,t1=v1 f1=1\n", "m2,t2=v2 f2=2"}, }, @@ -100,7 +100,7 @@ func TestBatcher_read(t *testing.T) { cancel: true, r: strings.NewReader("m1,t1=v1 f1=1"), lines: make(chan []byte), - errC: make(chan error), + errC: make(chan error, 1), }, want: []string{}, wantErr: true, @@ -110,7 +110,7 @@ func TestBatcher_read(t *testing.T) { args: args{ r: &errorReader{}, lines: make(chan []byte), - errC: make(chan error), + errC: make(chan error, 1), }, want: []string{}, wantErr: true,