From feb7a2842c16ce5b6509115e6d1c630c704d2c53 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 17 Jan 2017 10:49:53 -0800 Subject: [PATCH] Use unbuffered error channels in tests --- client/v2/client_test.go | 14 +++++++++----- services/httpd/listen_test.go | 17 ++++++++++------ tcp/mux_test.go | 19 ++++++++++++------ tsdb/engine/tsm1/cache_race_test.go | 30 +++++++++++++++++++---------- tsdb/engine/tsm1/engine_test.go | 15 ++++++++++----- tsdb/engine/tsm1/ring_test.go | 16 ++++++++++----- tsdb/shard_test.go | 16 +++++++++------ 7 files changed, 84 insertions(+), 43 deletions(-) diff --git a/client/v2/client_test.go b/client/v2/client_test.go index 145086263e..42d8dda05e 100644 --- a/client/v2/client_test.go +++ b/client/v2/client_test.go @@ -258,7 +258,7 @@ func TestClient_Concurrent_Use(t *testing.T) { wg.Add(3) n := 1000 - errC := make(chan error, 3) + errC := make(chan error) go func() { defer wg.Done() bp, err := NewBatchPoints(BatchPointsConfig{}) @@ -293,11 +293,15 @@ func TestClient_Concurrent_Use(t *testing.T) { } }() - wg.Wait() + go func() { + wg.Wait() + close(errC) + }() - close(errC) - if err := <-errC; err != nil { - t.Fatal(err) + for err := range errC { + if err != nil { + t.Error(err) + } } } diff --git a/services/httpd/listen_test.go b/services/httpd/listen_test.go index 4f780152d4..3ef148214a 100644 --- a/services/httpd/listen_test.go +++ b/services/httpd/listen_test.go @@ -82,22 +82,27 @@ func BenchmarkLimitListener(b *testing.B) { wg.Add(b.N) l := httpd.LimitListener(&fakeListener{}, b.N) - errC := make(chan error, 1) + errC := make(chan error) for i := 0; i < b.N; i++ { go func() { + defer wg.Done() c, err := l.Accept() if err != nil { errC <- err return } c.Close() - wg.Done() }() } - wg.Wait() - close(errC) - if err := <-errC; err != nil { - b.Fatal(err) + go func() { + wg.Wait() + close(errC) + }() + + for err := range errC { + if err != nil { + b.Error(err) + } } } diff --git a/tcp/mux_test.go b/tcp/mux_test.go index f99cc540d5..78c041eb57 100644 --- a/tcp/mux_test.go +++ b/tcp/mux_test.go @@ -43,7 +43,7 @@ func TestMux(t *testing.T) { mux.Logger = log.New(ioutil.Discard, "", 0) } - errC := make(chan error, n) + errC := make(chan error) for i := uint8(0); i < n; i++ { ln := mux.Listen(byte(i)) @@ -121,14 +121,21 @@ func TestMux(t *testing.T) { // Close original TCP listener and wait for all goroutines to close. tcpListener.Close() - wg.Wait() - close(errC) - if err := <-errC; err != nil { - t.Fatal(err) + go func() { + wg.Wait() + close(errC) + }() + + ok := true + for err := range errC { + if err != nil { + ok = false + t.Error(err) + } } - return true + return ok }, nil); err != nil { t.Error(err) } diff --git a/tsdb/engine/tsm1/cache_race_test.go b/tsdb/engine/tsm1/cache_race_test.go index 0c4d57dc05..3028724db4 100644 --- a/tsdb/engine/tsm1/cache_race_test.go +++ b/tsdb/engine/tsm1/cache_race_test.go @@ -86,7 +86,7 @@ func TestCacheRace(t *testing.T) { }(s) } - errC := make(chan error, 1) + errC := make(chan error) wg.Add(1) go func() { defer wg.Done() @@ -106,11 +106,16 @@ func TestCacheRace(t *testing.T) { }() close(ch) - wg.Wait() - close(errC) - if err := <-errC; err != nil { - t.Fatal(err) + go func() { + wg.Wait() + close(errC) + }() + + for err := range errC { + if err != nil { + t.Error(err) + } } } @@ -148,7 +153,7 @@ func TestCacheRace2Compacters(t *testing.T) { fileCounter := 0 mapFiles := map[int]bool{} mu := sync.Mutex{} - errC := make(chan error, 1000) + errC := make(chan error) for i := 0; i < 2; i++ { wg.Add(1) go func() { @@ -187,10 +192,15 @@ func TestCacheRace2Compacters(t *testing.T) { }() } close(ch) - wg.Wait() - close(errC) - if err := <-errC; err != nil { - t.Fatal(err) + go func() { + wg.Wait() + close(errC) + }() + + for err := range errC { + if err != nil { + t.Error(err) + } } } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 1381fdccc4..8725b895a3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -822,7 +822,7 @@ func benchmarkEngine_WritePoints_Parallel(b *testing.B, batchSize int) { b.StartTimer() var wg sync.WaitGroup - errC := make(chan error, cpus) + errC := make(chan error) for i := 0; i < cpus; i++ { wg.Add(1) go func(i int) { @@ -835,11 +835,16 @@ func benchmarkEngine_WritePoints_Parallel(b *testing.B, batchSize int) { } }(i) } - wg.Wait() - close(errC) - if err := <-errC; err != nil { - b.Fatal(err) + go func() { + wg.Wait() + close(errC) + }() + + for err := range errC { + if err != nil { + b.Error(err) + } } } } diff --git a/tsdb/engine/tsm1/ring_test.go b/tsdb/engine/tsm1/ring_test.go index 79e9294387..3ba6e092a6 100644 --- a/tsdb/engine/tsm1/ring_test.go +++ b/tsdb/engine/tsm1/ring_test.go @@ -67,7 +67,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup for i := 0; i < runtime.GOMAXPROCS(0); i++ { - errC := make(chan error, n) + errC := make(chan error) wg.Add(1) go func() { defer wg.Done() @@ -77,10 +77,16 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) { } } }() - wg.Wait() - close(errC) - if err := <-errC; err != nil { - b.Fatal(err) + + go func() { + wg.Wait() + close(errC) + }() + + for err := range errC { + if err != nil { + b.Error(err) + } } } } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index fad0cb6b1b..a6a13d4f5d 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -402,7 +402,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - errC := make(chan error, 2) + errC := make(chan error) go func() { defer wg.Done() for i := 0; i < 50; i++ { @@ -434,12 +434,16 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { } }() - wg.Wait() - close(errC) - if err := <-errC; err != nil { - t.Fatal(err) - } + go func() { + wg.Wait() + close(errC) + }() + for err := range errC { + if err != nil { + t.Error(err) + } + } } // Ensures that when a shard is closed, it removes any series meta-data