From ce536037dc20df4efb686e1093a115f5bc07fd90 Mon Sep 17 00:00:00 2001
From: Yun Zhao <zhaoyun2316@gmail.com>
Date: Fri, 4 Jun 2021 04:11:36 +0800
Subject: [PATCH] fix(tsm1): limit concurrent WAL encodings to reduce memory
 pressure under heavy write load (#20814)

Co-authored-by: zhaoyun.248 <zhaoyun.248@bytedance.com>
---
 pkg/pool/bytes.go            |   2 +-
 tsdb/engine/tsm1/wal.go      |   7 +-
 tsdb/engine/tsm1/wal_test.go | 708 +++++++++++++----------------------
 tsdb/series_partition.go     |   2 +-
 4 files changed, 275 insertions(+), 444 deletions(-)

diff --git a/pkg/pool/bytes.go b/pkg/pool/bytes.go
index be1d2ec958..35db6f2cf5 100644
--- a/pkg/pool/bytes.go
+++ b/pkg/pool/bytes.go
@@ -45,7 +45,7 @@ func (p *Bytes) Put(c []byte) {
 // LimitedBytes is a pool of byte slices that can be re-used.  Slices in
 // this pool will not be garbage collected when not in use.  The pool will
 // hold onto a fixed number of byte slices of a maximum size.  If the pool
-// is empty and max pool size has not been allocated yet, it will return a
+// is empty or the required size is larger than max size, it will return a
 // new byte slice.  Byte slices added to the pool that are over the max size
 // are dropped.
 type LimitedBytes struct {
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index dfd6cf4abb..861ba9f2e5 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -112,7 +112,9 @@ type WAL struct {
 	SegmentSize int
 
 	// statistics for the WAL
-	stats   *WALStatistics
+	stats *WALStatistics
+
+	// limiter limits the max concurrency of waiting WAL writes.
 	limiter limiter.Fixed
 }
 
@@ -409,6 +411,9 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 	// limit how many concurrent encodings can be in flight.  Since we can only
 	// write one at a time to disk, a slow disk can cause the allocations below
 	// to increase quickly.  If we're backed up, wait until others have completed.
+	l.limiter.Take()
+	defer l.limiter.Release()
+
 	bytes := bytesPool.Get(entry.MarshalSize())
 
 	b, err := entry.Encode(bytes)
diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go
index 2a9bacc56b..6cda6be186 100644
--- a/tsdb/engine/tsm1/wal_test.go
+++ b/tsdb/engine/tsm1/wal_test.go
@@ -5,8 +5,10 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
+	"path/filepath"
 	"reflect"
 	"sort"
+	"sync"
 	"testing"
 
 	"github.com/golang/snappy"
@@ -18,8 +20,9 @@ import (
 func TestWALWriter_WriteMulti_Single(t *testing.T) {
 	dir := MustTempDir()
 	defer os.RemoveAll(dir)
-	f := MustTempFile(dir)
-	w := tsm1.NewWALSegmentWriter(f)
+	w := tsm1.NewWAL(dir)
+	defer w.Close()
+	require.NoError(t, w.Open())
 
 	p1 := tsm1.NewValue(1, 1.1)
 	p2 := tsm1.NewValue(1, int64(1))
@@ -28,63 +31,42 @@ func TestWALWriter_WriteMulti_Single(t *testing.T) {
 	p5 := tsm1.NewValue(1, ^uint64(0))
 
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#float":    []tsm1.Value{p1},
-		"cpu,host=A#!~#int":      []tsm1.Value{p2},
-		"cpu,host=A#!~#bool":     []tsm1.Value{p3},
-		"cpu,host=A#!~#string":   []tsm1.Value{p4},
-		"cpu,host=A#!~#unsigned": []tsm1.Value{p5},
+		"cpu,host=A#!~#float":    {p1},
+		"cpu,host=A#!~#int":      {p2},
+		"cpu,host=A#!~#bool":     {p3},
+		"cpu,host=A#!~#string":   {p4},
+		"cpu,host=A#!~#unsigned": {p5},
 	}
 
-	entry := &tsm1.WriteWALEntry{
-		Values: values,
-	}
+	_, err := w.WriteMulti(values)
+	require.NoError(t, err)
 
-	if err := w.Write(mustMarshalEntry(entry)); err != nil {
-		fatal(t, "write points", err)
-	}
+	f, r := mustSegmentReader(t, w)
+	defer r.Close()
 
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
-
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
-
-	r := tsm1.NewWALSegmentReader(f)
-
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err := r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	e, ok := we.(*tsm1.WriteWALEntry)
-	if !ok {
-		t.Fatalf("expected WriteWALEntry: got %#v", e)
-	}
+	require.True(t, ok)
 
 	for k, v := range e.Values {
 		for i, vv := range v {
-			if got, exp := vv.String(), values[k][i].String(); got != exp {
-				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
-			}
+			require.Equal(t, values[k][i].String(), vv.String())
 		}
 	}
 
-	if n := r.Count(); n != MustReadFileSize(f) {
-		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
-	}
+	require.Equal(t, r.Count(), mustReadFileSize(f))
 }
 
 func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) {
 	dir := MustTempDir()
 	defer os.RemoveAll(dir)
-	f := MustTempFile(dir)
-	w := tsm1.NewWALSegmentWriter(f)
+	w := tsm1.NewWAL(dir)
+	defer w.Close()
+	require.NoError(t, w.Open())
 
 	var points []tsm1.Value
 	for i := 0; i < 100000; i++ {
@@ -96,55 +78,35 @@ func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) {
 		"mem,host=A,server=01,foo=bar,tag=really-long#!~#float": points,
 	}
 
-	entry := &tsm1.WriteWALEntry{
-		Values: values,
-	}
+	_, err := w.WriteMulti(values)
+	require.NoError(t, err)
 
-	if err := w.Write(mustMarshalEntry(entry)); err != nil {
-		fatal(t, "write points", err)
-	}
+	f, r := mustSegmentReader(t, w)
+	defer r.Close()
 
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
-
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
-
-	r := tsm1.NewWALSegmentReader(f)
-
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err := r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	e, ok := we.(*tsm1.WriteWALEntry)
-	if !ok {
-		t.Fatalf("expected WriteWALEntry: got %#v", e)
-	}
+	require.True(t, ok)
 
 	for k, v := range e.Values {
 		for i, vv := range v {
-			if got, exp := vv.String(), values[k][i].String(); got != exp {
-				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
-			}
+			require.Equal(t, values[k][i].String(), vv.String())
 		}
 	}
 
-	if n := r.Count(); n != MustReadFileSize(f) {
-		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
-	}
+	require.Equal(t, r.Count(), mustReadFileSize(f))
 }
+
 func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
 	dir := MustTempDir()
 	defer os.RemoveAll(dir)
-	f := MustTempFile(dir)
-	w := tsm1.NewWALSegmentWriter(f)
+	w := tsm1.NewWAL(dir)
+	defer w.Close()
+	require.NoError(t, w.Open())
 
 	p1 := tsm1.NewValue(1, int64(1))
 	p2 := tsm1.NewValue(1, int64(2))
@@ -158,309 +120,171 @@ func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
 	}
 
 	for _, v := range exp {
-		entry := &tsm1.WriteWALEntry{
-			Values: map[string][]tsm1.Value{v.key: v.values},
-		}
-
-		if err := w.Write(mustMarshalEntry(entry)); err != nil {
-			fatal(t, "write points", err)
-		}
-		if err := w.Flush(); err != nil {
-			fatal(t, "flush", err)
-		}
+		_, err := w.WriteMulti(map[string][]tsm1.Value{v.key: v.values})
+		require.NoError(t, err)
 	}
 
-	// Seek back to the beginning of the file for reading
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
-
-	r := tsm1.NewWALSegmentReader(f)
+	f, r := mustSegmentReader(t, w)
+	defer r.Close()
 
 	for _, ep := range exp {
-		if !r.Next() {
-			t.Fatalf("expected next, got false")
-		}
+		require.True(t, r.Next())
 
 		we, err := r.Read()
-		if err != nil {
-			fatal(t, "read entry", err)
-		}
+		require.NoError(t, err)
 
 		e, ok := we.(*tsm1.WriteWALEntry)
-		if !ok {
-			t.Fatalf("expected WriteWALEntry: got %#v", e)
-		}
+		require.True(t, ok)
 
 		for k, v := range e.Values {
-			if got, exp := k, ep.key; got != exp {
-				t.Fatalf("key mismatch. got %v, exp %v", got, exp)
-			}
-
-			if got, exp := len(v), len(ep.values); got != exp {
-				t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
-			}
+			require.Equal(t, k, ep.key)
+			require.Equal(t, len(v), len(ep.values))
 
 			for i, vv := range v {
-				if got, exp := vv.String(), ep.values[i].String(); got != exp {
-					t.Fatalf("points mismatch: got %v, exp %v", got, exp)
-				}
+				require.Equal(t, vv.String(), ep.values[i].String())
 			}
 		}
 	}
 
-	if n := r.Count(); n != MustReadFileSize(f) {
-		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f))
-	}
+	require.Equal(t, r.Count(), mustReadFileSize(f))
 }
 
 func TestWALWriter_WriteDelete_Single(t *testing.T) {
 	dir := MustTempDir()
 	defer os.RemoveAll(dir)
-	f := MustTempFile(dir)
-	w := tsm1.NewWALSegmentWriter(f)
+	w := tsm1.NewWAL(dir)
+	defer w.Close()
+	require.NoError(t, w.Open())
 
-	entry := &tsm1.DeleteWALEntry{
-		Keys: [][]byte{[]byte("cpu")},
-	}
+	keys := [][]byte{[]byte("cpu")}
 
-	if err := w.Write(mustMarshalEntry(entry)); err != nil {
-		fatal(t, "write points", err)
-	}
+	_, err := w.Delete(keys)
+	require.NoError(t, err)
 
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
+	_, r := mustSegmentReader(t, w)
+	defer r.Close()
 
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
-
-	r := tsm1.NewWALSegmentReader(f)
-
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err := r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	e, ok := we.(*tsm1.DeleteWALEntry)
-	if !ok {
-		t.Fatalf("expected WriteWALEntry: got %#v", e)
-	}
+	require.True(t, ok)
 
-	if got, exp := len(e.Keys), len(entry.Keys); got != exp {
-		t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
-	}
-
-	if got, exp := string(e.Keys[0]), string(entry.Keys[0]); got != exp {
-		t.Fatalf("key mismatch: got %v, exp %v", got, exp)
-	}
+	require.Equal(t, len(e.Keys), len(keys))
+	require.Equal(t, string(e.Keys[0]), string(keys[0]))
 }
 
 func TestWALWriter_WriteMultiDelete_Multiple(t *testing.T) {
 	dir := MustTempDir()
 	defer os.RemoveAll(dir)
-	f := MustTempFile(dir)
-	w := tsm1.NewWALSegmentWriter(f)
+	w := tsm1.NewWAL(dir)
+	defer w.Close()
+	require.NoError(t, w.Open())
 
 	p1 := tsm1.NewValue(1, true)
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#value": []tsm1.Value{p1},
+		"cpu,host=A#!~#value": {p1},
 	}
 
-	writeEntry := &tsm1.WriteWALEntry{
-		Values: values,
-	}
+	_, err := w.WriteMulti(values)
+	require.NoError(t, err)
 
-	if err := w.Write(mustMarshalEntry(writeEntry)); err != nil {
-		fatal(t, "write points", err)
-	}
+	deleteKeys := [][]byte{[]byte("cpu,host=A#!~value")}
 
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
+	_, err = w.Delete(deleteKeys)
+	require.NoError(t, err)
 
-	// Write the delete entry
-	deleteEntry := &tsm1.DeleteWALEntry{
-		Keys: [][]byte{[]byte("cpu,host=A#!~value")},
-	}
+	_, r := mustSegmentReader(t, w)
+	defer r.Close()
 
-	if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil {
-		fatal(t, "write points", err)
-	}
-
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
-
-	// Seek back to the beginning of the file for reading
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
-
-	r := tsm1.NewWALSegmentReader(f)
-
-	// Read the write points first
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err := r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	e, ok := we.(*tsm1.WriteWALEntry)
-	if !ok {
-		t.Fatalf("expected WriteWALEntry: got %#v", e)
-	}
+	require.True(t, ok)
 
 	for k, v := range e.Values {
-		if got, exp := len(v), len(values[k]); got != exp {
-			t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
-		}
+		require.Equal(t, len(v), len(values[k]))
 
 		for i, vv := range v {
-			if got, exp := vv.String(), values[k][i].String(); got != exp {
-				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
-			}
+			require.Equal(t, vv.String(), values[k][i].String())
 		}
 	}
 
 	// Read the delete second
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err = r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	de, ok := we.(*tsm1.DeleteWALEntry)
-	if !ok {
-		t.Fatalf("expected DeleteWALEntry: got %#v", e)
-	}
+	require.True(t, ok)
 
-	if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp {
-		t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
-	}
-
-	if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp {
-		t.Fatalf("key mismatch: got %v, exp %v", got, exp)
-	}
+	require.Equal(t, len(de.Keys), len(deleteKeys))
+	require.Equal(t, string(de.Keys[0]), string(deleteKeys[0]))
 }
 
 func TestWALWriter_WriteMultiDeleteRange_Multiple(t *testing.T) {
 	dir := MustTempDir()
 	defer os.RemoveAll(dir)
-	f := MustTempFile(dir)
-	w := tsm1.NewWALSegmentWriter(f)
+	w := tsm1.NewWAL(dir)
+	defer w.Close()
+	require.NoError(t, w.Open())
 
 	p1 := tsm1.NewValue(1, 1.0)
 	p2 := tsm1.NewValue(2, 2.0)
 	p3 := tsm1.NewValue(3, 3.0)
 
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#value": []tsm1.Value{p1, p2, p3},
+		"cpu,host=A#!~#value": {p1, p2, p3},
 	}
 
-	writeEntry := &tsm1.WriteWALEntry{
-		Values: values,
-	}
-
-	if err := w.Write(mustMarshalEntry(writeEntry)); err != nil {
-		fatal(t, "write points", err)
-	}
-
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
+	_, err := w.WriteMulti(values)
+	require.NoError(t, err)
 
 	// Write the delete entry
-	deleteEntry := &tsm1.DeleteRangeWALEntry{
-		Keys: [][]byte{[]byte("cpu,host=A#!~value")},
-		Min:  2,
-		Max:  3,
-	}
+	deleteKeys := [][]byte{[]byte("cpu,host=A#!~value")}
+	deleteMin, deleteMax := int64(2), int64(3)
 
-	if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil {
-		fatal(t, "write points", err)
-	}
+	_, err = w.DeleteRange(deleteKeys, deleteMin, deleteMax)
+	require.NoError(t, err)
 
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
+	_, r := mustSegmentReader(t, w)
+	defer r.Close()
 
-	// Seek back to the beginning of the file for reading
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
-
-	r := tsm1.NewWALSegmentReader(f)
-
-	// Read the write points first
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err := r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	e, ok := we.(*tsm1.WriteWALEntry)
-	if !ok {
-		t.Fatalf("expected WriteWALEntry: got %#v", e)
-	}
+	require.True(t, ok)
 
 	for k, v := range e.Values {
-		if got, exp := len(v), len(values[k]); got != exp {
-			t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
-		}
+		require.Equal(t, len(v), len(values[k]))
 
 		for i, vv := range v {
-			if got, exp := vv.String(), values[k][i].String(); got != exp {
-				t.Fatalf("points mismatch: got %v, exp %v", got, exp)
-			}
+			require.Equal(t, vv.String(), values[k][i].String())
 		}
 	}
 
 	// Read the delete second
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
+	require.True(t, r.Next())
 
 	we, err = r.Read()
-	if err != nil {
-		fatal(t, "read entry", err)
-	}
+	require.NoError(t, err)
 
 	de, ok := we.(*tsm1.DeleteRangeWALEntry)
-	if !ok {
-		t.Fatalf("expected DeleteWALEntry: got %#v", e)
-	}
-
-	if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp {
-		t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
-	}
-
-	if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp {
-		t.Fatalf("key mismatch: got %v, exp %v", got, exp)
-	}
-
-	if got, exp := de.Min, int64(2); got != exp {
-		t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
-	}
-
-	if got, exp := de.Max, int64(3); got != exp {
-		t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
-	}
+	require.True(t, ok)
 
+	require.Equal(t, len(de.Keys), len(deleteKeys))
+	require.Equal(t, string(de.Keys[0]), string(deleteKeys[0]))
+	require.Equal(t, de.Min, deleteMin)
+	require.Equal(t, de.Max, deleteMax)
 }
 
 func TestWAL_ClosedSegments(t *testing.T) {
@@ -468,45 +292,30 @@ func TestWAL_ClosedSegments(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	w := tsm1.NewWAL(dir)
-	if err := w.Open(); err != nil {
-		t.Fatalf("error opening WAL: %v", err)
-	}
+	require.NoError(t, w.Open())
 
 	files, err := w.ClosedSegments()
-	if err != nil {
-		t.Fatalf("error getting closed segments: %v", err)
-	}
+	require.NoError(t, err)
 
-	if got, exp := len(files), 0; got != exp {
-		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
-	}
+	require.Equal(t, len(files), 0)
 
-	if _, err := w.WriteMulti(map[string][]tsm1.Value{
-		"cpu,host=A#!~#value": []tsm1.Value{
+	_, err = w.WriteMulti(map[string][]tsm1.Value{
+		"cpu,host=A#!~#value": {
 			tsm1.NewValue(1, 1.1),
 		},
-	}); err != nil {
-		t.Fatalf("error writing points: %v", err)
-	}
+	})
+	require.NoError(t, err)
 
-	if err := w.Close(); err != nil {
-		t.Fatalf("error closing wal: %v", err)
-	}
+	require.NoError(t, w.Close())
 
 	// Re-open the WAL
 	w = tsm1.NewWAL(dir)
 	defer w.Close()
-	if err := w.Open(); err != nil {
-		t.Fatalf("error opening WAL: %v", err)
-	}
+	require.NoError(t, w.Open())
 
 	files, err = w.ClosedSegments()
-	if err != nil {
-		t.Fatalf("error getting closed segments: %v", err)
-	}
-	if got, exp := len(files), 0; got != exp {
-		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
-	}
+	require.NoError(t, err)
+	require.Equal(t, len(files), 0)
 }
 
 func TestWAL_Delete(t *testing.T) {
@@ -514,41 +323,27 @@ func TestWAL_Delete(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	w := tsm1.NewWAL(dir)
-	if err := w.Open(); err != nil {
-		t.Fatalf("error opening WAL: %v", err)
-	}
+	require.NoError(t, w.Open())
 
 	files, err := w.ClosedSegments()
-	if err != nil {
-		t.Fatalf("error getting closed segments: %v", err)
-	}
 
-	if got, exp := len(files), 0; got != exp {
-		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
-	}
+	require.NoError(t, err)
 
-	if _, err := w.Delete([][]byte{[]byte("cpu")}); err != nil {
-		t.Fatalf("error writing points: %v", err)
-	}
+	require.Equal(t, len(files), 0)
 
-	if err := w.Close(); err != nil {
-		t.Fatalf("error closing wal: %v", err)
-	}
+	_, err = w.Delete([][]byte{[]byte("cpu")})
+	require.NoError(t, err)
+
+	require.NoError(t, w.Close())
 
 	// Re-open the WAL
 	w = tsm1.NewWAL(dir)
 	defer w.Close()
-	if err := w.Open(); err != nil {
-		t.Fatalf("error opening WAL: %v", err)
-	}
+	require.NoError(t, w.Open())
 
 	files, err = w.ClosedSegments()
-	if err != nil {
-		t.Fatalf("error getting closed segments: %v", err)
-	}
-	if got, exp := len(files), 0; got != exp {
-		t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
-	}
+	require.NoError(t, err)
+	require.Equal(t, len(files), 0)
 }
 
 func TestWALWriter_Corrupt(t *testing.T) {
@@ -560,52 +355,40 @@ func TestWALWriter_Corrupt(t *testing.T) {
 
 	p1 := tsm1.NewValue(1, 1.1)
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#float": []tsm1.Value{p1},
+		"cpu,host=A#!~#float": {p1},
 	}
 
 	entry := &tsm1.WriteWALEntry{
 		Values: values,
 	}
-	if err := w.Write(mustMarshalEntry(entry)); err != nil {
-		fatal(t, "write points", err)
-	}
 
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
+	require.NoError(t, w.Write(mustMarshalEntry(entry)))
+	require.NoError(t, w.Flush())
 
 	// Write some random bytes to the file to simulate corruption.
-	if _, err := f.Write(corruption); err != nil {
-		fatal(t, "corrupt WAL segment", err)
-	}
+	_, err := f.Write(corruption)
+	require.NoError(t, err)
 
 	// Create the WAL segment reader.
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
+	_, err = f.Seek(0, io.SeekStart)
+	require.NoError(t, err)
+
 	r := tsm1.NewWALSegmentReader(f)
 
 	// Try to decode two entries.
+	require.True(t, r.Next())
 
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
-	if _, err := r.Read(); err != nil {
-		fatal(t, "read entry", err)
-	}
+	_, err = r.Read()
+	require.NoError(t, err)
 
-	if !r.Next() {
-		t.Fatalf("expected next, got false")
-	}
-	if _, err := r.Read(); err == nil {
-		fatal(t, "read entry did not return err", nil)
-	}
+	require.True(t, r.Next())
+
+	_, err = r.Read()
+	require.Error(t, err)
 
 	// Count should only return size of valid data.
-	expCount := MustReadFileSize(f) - int64(len(corruption))
-	if n := r.Count(); n != expCount {
-		t.Fatalf("wrong count of bytes read, got %d, exp %d", n, expCount)
-	}
+	expCount := mustReadFileSize(f) - int64(len(corruption))
+	require.Equal(t, expCount, r.Count())
 }
 
 // Reproduces a `panic: runtime error: makeslice: cap out of range` when run with
@@ -619,7 +402,7 @@ func TestWALSegmentReader_Corrupt(t *testing.T) {
 	p4 := tsm1.NewValue(1, "string")
 
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#string": []tsm1.Value{p4, p4},
+		"cpu,host=A#!~#string": {p4, p4},
 	}
 
 	entry := &tsm1.WriteWALEntry{
@@ -632,18 +415,12 @@ func TestWALSegmentReader_Corrupt(t *testing.T) {
 	// negative count and a panic when reading the segment.
 	b[25] = 255
 
-	if err := w.Write(typ, b); err != nil {
-		fatal(t, "write points", err)
-	}
-
-	if err := w.Flush(); err != nil {
-		fatal(t, "flush", err)
-	}
+	require.NoError(t, w.Write(typ, b))
+	require.NoError(t, w.Flush())
 
 	// Create the WAL segment reader.
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
-		fatal(t, "seek", err)
-	}
+	_, err := f.Seek(0, io.SeekStart)
+	require.NoError(t, err)
 
 	r := tsm1.NewWALSegmentReader(f)
 	defer r.Close()
@@ -659,48 +436,34 @@ func TestWALRollSegment(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	w := tsm1.NewWAL(dir)
-	if err := w.Open(); err != nil {
-		t.Fatalf("error opening WAL: %v", err)
-	}
+	require.NoError(t, w.Open())
 	const segSize = 1024
 	w.SegmentSize = segSize
 
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#value": []tsm1.Value{tsm1.NewValue(1, 1.0)},
-		"cpu,host=B#!~#value": []tsm1.Value{tsm1.NewValue(1, 1.0)},
-		"cpu,host=C#!~#value": []tsm1.Value{tsm1.NewValue(1, 1.0)},
-	}
-	if _, err := w.WriteMulti(values); err != nil {
-		fatal(t, "write points", err)
+		"cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)},
+		"cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)},
+		"cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)},
 	}
+	_, err := w.WriteMulti(values)
+	require.NoError(t, err)
 
 	files, err := ioutil.ReadDir(w.Path())
-	if err != nil {
-		fatal(t, "ReadDir", err)
-	}
-	if len(files) != 1 {
-		t.Fatalf("unexpected segments size %d", len(files))
-	}
+	require.NoError(t, err)
+	require.Equal(t, 1, len(files))
 
 	encodeSize := files[0].Size()
 
 	for i := 0; i < 100; i++ {
-		if _, err := w.WriteMulti(values); err != nil {
-			fatal(t, "write points", err)
-		}
+		_, err := w.WriteMulti(values)
+		require.NoError(t, err)
 	}
 	files, err = ioutil.ReadDir(w.Path())
-	if err != nil {
-		fatal(t, "ReadDir", err)
-	}
+	require.NoError(t, err)
 	for _, f := range files {
-		if f.Size() > int64(segSize)+encodeSize {
-			t.Fatalf("unexpected segment size %d", f.Size())
-		}
-	}
-	if err := w.Close(); err != nil {
-		t.Fatalf("error closing wal: %v", err)
+		require.True(t, f.Size() <= int64(segSize)+encodeSize)
 	}
+	require.NoError(t, w.Close())
 }
 
 func TestWAL_DiskSize(t *testing.T) {
@@ -798,11 +561,11 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
 	p5 := tsm1.NewValue(1, uint64(1))
 
 	values := map[string][]tsm1.Value{
-		"cpu,host=A#!~#float":    []tsm1.Value{p1, p1},
-		"cpu,host=A#!~#int":      []tsm1.Value{p2, p2},
-		"cpu,host=A#!~#bool":     []tsm1.Value{p3, p3},
-		"cpu,host=A#!~#string":   []tsm1.Value{p4, p4},
-		"cpu,host=A#!~#unsigned": []tsm1.Value{p5, p5},
+		"cpu,host=A#!~#float":    {p1, p1},
+		"cpu,host=A#!~#int":      {p2, p2},
+		"cpu,host=A#!~#bool":     {p3, p3},
+		"cpu,host=A#!~#string":   {p4, p4},
+		"cpu,host=A#!~#unsigned": {p5, p5},
 	}
 
 	w := &tsm1.WriteWALEntry{
@@ -810,9 +573,7 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
 	}
 
 	b, err := w.MarshalBinary()
-	if err != nil {
-		t.Fatalf("unexpected error, got %v", err)
-	}
+	require.NoError(t, err)
 
 	// Test every possible truncation of a write WAL entry
 	for i := 0; i < len(b); i++ {
@@ -820,9 +581,7 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
 		truncated := make([]byte, i)
 		copy(truncated, b[:i])
 		err := w.UnmarshalBinary(truncated)
-		if err != nil && err != tsm1.ErrWALCorrupt {
-			t.Fatalf("unexpected error: %v", err)
-		}
+		require.True(t, err == nil || err == tsm1.ErrWALCorrupt)
 	}
 }
 
@@ -853,21 +612,15 @@ func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) {
 		},
 	}
 
-	for i, example := range examples {
+	for _, example := range examples {
 		w := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(example.In...)}
 		b, err := w.MarshalBinary()
-		if err != nil {
-			t.Fatalf("[example %d] unexpected error, got %v", i, err)
-		}
+		require.NoError(t, err)
 
 		out := &tsm1.DeleteWALEntry{}
-		if err := out.UnmarshalBinary(b); err != nil {
-			t.Fatalf("[example %d] %v", i, err)
-		}
+		require.NoError(t, out.UnmarshalBinary(b))
 
-		if !reflect.DeepEqual(example.Out, out.Keys) {
-			t.Errorf("[example %d] got %v, expected %v", i, out.Keys, example.Out)
-		}
+		require.True(t, reflect.DeepEqual(example.Out, out.Keys))
 	}
 }
 
@@ -877,9 +630,7 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) {
 	}
 
 	b, err := w.MarshalBinary()
-	if err != nil {
-		t.Fatalf("unexpected error, got %v", err)
-	}
+	require.NoError(t, err)
 
 	// Test every possible truncation of a write WAL entry
 	for i := 0; i < len(b); i++ {
@@ -887,9 +638,7 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) {
 		truncated := make([]byte, i)
 		copy(truncated, b[:i])
 		err := w.UnmarshalBinary(truncated)
-		if err != nil && err != tsm1.ErrWALCorrupt {
-			t.Fatalf("unexpected error: %v", err)
-		}
+		require.True(t, err == nil || err == tsm1.ErrWALCorrupt)
 	}
 }
 
@@ -901,9 +650,7 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) {
 	}
 
 	b, err := w.MarshalBinary()
-	if err != nil {
-		t.Fatalf("unexpected error, got %v", err)
-	}
+	require.NoError(t, err)
 
 	// Test every possible truncation of a write WAL entry
 	for i := 0; i < len(b); i++ {
@@ -911,9 +658,81 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) {
 		truncated := make([]byte, i)
 		copy(truncated, b[:i])
 		err := w.UnmarshalBinary(truncated)
-		if err != nil && err != tsm1.ErrWALCorrupt {
-			t.Fatalf("unexpected error: %v", err)
-		}
+		require.True(t, err == nil || err == tsm1.ErrWALCorrupt)
+	}
+}
+
+func BenchmarkWAL_WriteMulti_Concurrency(b *testing.B) {
+	benchmarks := []struct {
+		concurrency int
+	}{
+		{1},
+		{12},
+		{24},
+		{50},
+		{100},
+		{200},
+		{300},
+		{400},
+		{500},
+	}
+
+	for _, bm := range benchmarks {
+		b.Run(fmt.Sprintf("concurrency-%d", bm.concurrency), func(b *testing.B) {
+			points := map[string][]tsm1.Value{}
+			for i := 0; i < 5000; i++ {
+				k := "cpu,host=A#!~#value"
+				points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1))
+			}
+
+			dir := MustTempDir()
+			defer os.RemoveAll(dir)
+
+			w := tsm1.NewWAL(dir)
+			defer w.Close()
+			require.NoError(b, w.Open())
+
+			start := make(chan struct{})
+			stop := make(chan struct{})
+
+			succeed := make(chan struct{}, 1000)
+			defer close(succeed)
+
+			wg := &sync.WaitGroup{}
+			for i := 0; i < bm.concurrency; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+
+					<-start
+
+					for {
+						select {
+						case <-stop:
+							return
+						default:
+							_, err := w.WriteMulti(points)
+							require.NoError(b, err)
+
+							succeed <- struct{}{}
+						}
+					}
+				}()
+			}
+
+			b.ResetTimer()
+
+			close(start)
+
+			for i := 0; i < b.N; i++ {
+				<-succeed
+			}
+
+			b.StopTimer()
+
+			close(stop)
+			wg.Wait()
+		})
 	}
 }
 
@@ -936,9 +755,7 @@ func BenchmarkWALSegmentWriter(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		if err := w.Write(mustMarshalEntry(write)); err != nil {
-			b.Fatalf("unexpected error writing entry: %v", err)
-		}
+		require.NoError(b, w.Write(mustMarshalEntry(write)))
 	}
 }
 
@@ -960,9 +777,7 @@ func BenchmarkWALSegmentReader(b *testing.B) {
 	}
 
 	for i := 0; i < 100; i++ {
-		if err := w.Write(mustMarshalEntry(write)); err != nil {
-			b.Fatalf("unexpected error writing entry: %v", err)
-		}
+		require.NoError(b, w.Write(mustMarshalEntry(write)))
 	}
 
 	r := tsm1.NewWALSegmentReader(f)
@@ -975,15 +790,26 @@ func BenchmarkWALSegmentReader(b *testing.B) {
 
 		for r.Next() {
 			_, err := r.Read()
-			if err != nil {
-				b.Fatalf("unexpected error reading entry: %v", err)
-			}
+			require.NoError(b, err)
 		}
 	}
 }
 
-// MustReadFileSize returns the size of the file, or panics.
-func MustReadFileSize(f *os.File) int64 {
+func mustSegmentReader(t *testing.T, w *tsm1.WAL) (*os.File, *tsm1.WALSegmentReader) {
+	files, err := filepath.Glob(filepath.Join(w.Path(),
+		fmt.Sprintf("%s*.%s", tsm1.WALFilePrefix, tsm1.WALFileExtension)))
+	require.NoError(t, err)
+	require.Equal(t, 1, len(files))
+
+	sort.Strings(files)
+
+	f, err := os.OpenFile(files[0], os.O_CREATE|os.O_RDWR, 0666)
+	require.NoError(t, err)
+	return f, tsm1.NewWALSegmentReader(f)
+}
+
+// mustReadFileSize returns the size of the file, or panics.
+func mustReadFileSize(f *os.File) int64 {
 	stat, err := os.Stat(f.Name())
 	if err != nil {
 		panic(fmt.Sprintf("failed to get size of file at %s: %s", f.Name(), err.Error()))
diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go
index 54e3b9464f..63ad90fc85 100644
--- a/tsdb/series_partition.go
+++ b/tsdb/series_partition.go
@@ -88,7 +88,7 @@ func (p *SeriesPartition) Open() error {
 		p.index = NewSeriesIndex(p.IndexPath())
 		if err := p.index.Open(); err != nil {
 			return err
-		} else if p.index.Recover(p.segments); err != nil {
+		} else if err := p.index.Recover(p.segments); err != nil {
 			return err
 		}