diff --git a/pkg/pool/bytes.go b/pkg/pool/bytes.go index be1d2ec958..35db6f2cf5 100644 --- a/pkg/pool/bytes.go +++ b/pkg/pool/bytes.go @@ -45,7 +45,7 @@ func (p *Bytes) Put(c []byte) { // LimitedBytes is a pool of byte slices that can be re-used. Slices in // this pool will not be garbage collected when not in use. The pool will // hold onto a fixed number of byte slices of a maximum size. If the pool -// is empty and max pool size has not been allocated yet, it will return a +// is empty or the required size is larger than max size, it will return a // new byte slice. Byte slices added to the pool that are over the max size // are dropped. type LimitedBytes struct { diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index dfd6cf4abb..861ba9f2e5 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -112,7 +112,9 @@ type WAL struct { SegmentSize int // statistics for the WAL - stats *WALStatistics + stats *WALStatistics + + // limiter limits the max concurrency of waiting WAL writes. limiter limiter.Fixed } @@ -409,6 +411,9 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) { // limit how many concurrent encodings can be in flight. Since we can only // write one at a time to disk, a slow disk can cause the allocations below // to increase quickly. If we're backed up, wait until others have completed. + l.limiter.Take() + defer l.limiter.Release() + bytes := bytesPool.Get(entry.MarshalSize()) b, err := entry.Encode(bytes) diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go index 2a9bacc56b..6cda6be186 100644 --- a/tsdb/engine/tsm1/wal_test.go +++ b/tsdb/engine/tsm1/wal_test.go @@ -5,8 +5,10 @@ import ( "io" "io/ioutil" "os" + "path/filepath" "reflect" "sort" + "sync" "testing" "github.com/golang/snappy" @@ -18,8 +20,9 @@ import ( func TestWALWriter_WriteMulti_Single(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - f := MustTempFile(dir) - w := tsm1.NewWALSegmentWriter(f) + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(t, w.Open()) p1 := tsm1.NewValue(1, 1.1) p2 := tsm1.NewValue(1, int64(1)) @@ -28,63 +31,42 @@ func TestWALWriter_WriteMulti_Single(t *testing.T) { p5 := tsm1.NewValue(1, ^uint64(0)) values := map[string][]tsm1.Value{ - "cpu,host=A#!~#float": []tsm1.Value{p1}, - "cpu,host=A#!~#int": []tsm1.Value{p2}, - "cpu,host=A#!~#bool": []tsm1.Value{p3}, - "cpu,host=A#!~#string": []tsm1.Value{p4}, - "cpu,host=A#!~#unsigned": []tsm1.Value{p5}, + "cpu,host=A#!~#float": {p1}, + "cpu,host=A#!~#int": {p2}, + "cpu,host=A#!~#bool": {p3}, + "cpu,host=A#!~#string": {p4}, + "cpu,host=A#!~#unsigned": {p5}, } - entry := &tsm1.WriteWALEntry{ - Values: values, - } + _, err := w.WriteMulti(values) + require.NoError(t, err) - if err := w.Write(mustMarshalEntry(entry)); err != nil { - fatal(t, "write points", err) - } + f, r := mustSegmentReader(t, w) + defer r.Close() - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } - - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } - - r := tsm1.NewWALSegmentReader(f) - - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err := r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) e, ok := we.(*tsm1.WriteWALEntry) - if !ok { - t.Fatalf("expected WriteWALEntry: got %#v", e) - } + require.True(t, ok) for k, v := range e.Values { for i, vv := range v { - if got, exp := vv.String(), values[k][i].String(); got != exp { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, values[k][i].String(), vv.String()) } } - if n := r.Count(); n != MustReadFileSize(f) { - t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) - } + require.Equal(t, r.Count(), mustReadFileSize(f)) } func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - f := MustTempFile(dir) - w := tsm1.NewWALSegmentWriter(f) + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(t, w.Open()) var points []tsm1.Value for i := 0; i < 100000; i++ { @@ -96,55 +78,35 @@ func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) { "mem,host=A,server=01,foo=bar,tag=really-long#!~#float": points, } - entry := &tsm1.WriteWALEntry{ - Values: values, - } + _, err := w.WriteMulti(values) + require.NoError(t, err) - if err := w.Write(mustMarshalEntry(entry)); err != nil { - fatal(t, "write points", err) - } + f, r := mustSegmentReader(t, w) + defer r.Close() - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } - - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } - - r := tsm1.NewWALSegmentReader(f) - - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err := r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) e, ok := we.(*tsm1.WriteWALEntry) - if !ok { - t.Fatalf("expected WriteWALEntry: got %#v", e) - } + require.True(t, ok) for k, v := range e.Values { for i, vv := range v { - if got, exp := vv.String(), values[k][i].String(); got != exp { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, values[k][i].String(), vv.String()) } } - if n := r.Count(); n != MustReadFileSize(f) { - t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) - } + require.Equal(t, r.Count(), mustReadFileSize(f)) } + func TestWALWriter_WriteMulti_Multiple(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - f := MustTempFile(dir) - w := tsm1.NewWALSegmentWriter(f) + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(t, w.Open()) p1 := tsm1.NewValue(1, int64(1)) p2 := tsm1.NewValue(1, int64(2)) @@ -158,309 +120,171 @@ func TestWALWriter_WriteMulti_Multiple(t *testing.T) { } for _, v := range exp { - entry := &tsm1.WriteWALEntry{ - Values: map[string][]tsm1.Value{v.key: v.values}, - } - - if err := w.Write(mustMarshalEntry(entry)); err != nil { - fatal(t, "write points", err) - } - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + _, err := w.WriteMulti(map[string][]tsm1.Value{v.key: v.values}) + require.NoError(t, err) } - // Seek back to the beginning of the file for reading - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } - - r := tsm1.NewWALSegmentReader(f) + f, r := mustSegmentReader(t, w) + defer r.Close() for _, ep := range exp { - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err := r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) e, ok := we.(*tsm1.WriteWALEntry) - if !ok { - t.Fatalf("expected WriteWALEntry: got %#v", e) - } + require.True(t, ok) for k, v := range e.Values { - if got, exp := k, ep.key; got != exp { - t.Fatalf("key mismatch. got %v, exp %v", got, exp) - } - - if got, exp := len(v), len(ep.values); got != exp { - t.Fatalf("values length mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, k, ep.key) + require.Equal(t, len(v), len(ep.values)) for i, vv := range v { - if got, exp := vv.String(), ep.values[i].String(); got != exp { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, vv.String(), ep.values[i].String()) } } } - if n := r.Count(); n != MustReadFileSize(f) { - t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) - } + require.Equal(t, r.Count(), mustReadFileSize(f)) } func TestWALWriter_WriteDelete_Single(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - f := MustTempFile(dir) - w := tsm1.NewWALSegmentWriter(f) + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(t, w.Open()) - entry := &tsm1.DeleteWALEntry{ - Keys: [][]byte{[]byte("cpu")}, - } + keys := [][]byte{[]byte("cpu")} - if err := w.Write(mustMarshalEntry(entry)); err != nil { - fatal(t, "write points", err) - } + _, err := w.Delete(keys) + require.NoError(t, err) - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + _, r := mustSegmentReader(t, w) + defer r.Close() - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } - - r := tsm1.NewWALSegmentReader(f) - - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err := r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) e, ok := we.(*tsm1.DeleteWALEntry) - if !ok { - t.Fatalf("expected WriteWALEntry: got %#v", e) - } + require.True(t, ok) - if got, exp := len(e.Keys), len(entry.Keys); got != exp { - t.Fatalf("key length mismatch: got %v, exp %v", got, exp) - } - - if got, exp := string(e.Keys[0]), string(entry.Keys[0]); got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, len(e.Keys), len(keys)) + require.Equal(t, string(e.Keys[0]), string(keys[0])) } func TestWALWriter_WriteMultiDelete_Multiple(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - f := MustTempFile(dir) - w := tsm1.NewWALSegmentWriter(f) + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(t, w.Open()) p1 := tsm1.NewValue(1, true) values := map[string][]tsm1.Value{ - "cpu,host=A#!~#value": []tsm1.Value{p1}, + "cpu,host=A#!~#value": {p1}, } - writeEntry := &tsm1.WriteWALEntry{ - Values: values, - } + _, err := w.WriteMulti(values) + require.NoError(t, err) - if err := w.Write(mustMarshalEntry(writeEntry)); err != nil { - fatal(t, "write points", err) - } + deleteKeys := [][]byte{[]byte("cpu,host=A#!~value")} - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + _, err = w.Delete(deleteKeys) + require.NoError(t, err) - // Write the delete entry - deleteEntry := &tsm1.DeleteWALEntry{ - Keys: [][]byte{[]byte("cpu,host=A#!~value")}, - } + _, r := mustSegmentReader(t, w) + defer r.Close() - if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil { - fatal(t, "write points", err) - } - - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } - - // Seek back to the beginning of the file for reading - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } - - r := tsm1.NewWALSegmentReader(f) - - // Read the write points first - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err := r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) e, ok := we.(*tsm1.WriteWALEntry) - if !ok { - t.Fatalf("expected WriteWALEntry: got %#v", e) - } + require.True(t, ok) for k, v := range e.Values { - if got, exp := len(v), len(values[k]); got != exp { - t.Fatalf("values length mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, len(v), len(values[k])) for i, vv := range v { - if got, exp := vv.String(), values[k][i].String(); got != exp { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, vv.String(), values[k][i].String()) } } // Read the delete second - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err = r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) de, ok := we.(*tsm1.DeleteWALEntry) - if !ok { - t.Fatalf("expected DeleteWALEntry: got %#v", e) - } + require.True(t, ok) - if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp { - t.Fatalf("key length mismatch: got %v, exp %v", got, exp) - } - - if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, len(de.Keys), len(deleteKeys)) + require.Equal(t, string(de.Keys[0]), string(deleteKeys[0])) } func TestWALWriter_WriteMultiDeleteRange_Multiple(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) - f := MustTempFile(dir) - w := tsm1.NewWALSegmentWriter(f) + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(t, w.Open()) p1 := tsm1.NewValue(1, 1.0) p2 := tsm1.NewValue(2, 2.0) p3 := tsm1.NewValue(3, 3.0) values := map[string][]tsm1.Value{ - "cpu,host=A#!~#value": []tsm1.Value{p1, p2, p3}, + "cpu,host=A#!~#value": {p1, p2, p3}, } - writeEntry := &tsm1.WriteWALEntry{ - Values: values, - } - - if err := w.Write(mustMarshalEntry(writeEntry)); err != nil { - fatal(t, "write points", err) - } - - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + _, err := w.WriteMulti(values) + require.NoError(t, err) // Write the delete entry - deleteEntry := &tsm1.DeleteRangeWALEntry{ - Keys: [][]byte{[]byte("cpu,host=A#!~value")}, - Min: 2, - Max: 3, - } + deleteKeys := [][]byte{[]byte("cpu,host=A#!~value")} + deleteMin, deleteMax := int64(2), int64(3) - if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil { - fatal(t, "write points", err) - } + _, err = w.DeleteRange(deleteKeys, deleteMin, deleteMax) + require.NoError(t, err) - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + _, r := mustSegmentReader(t, w) + defer r.Close() - // Seek back to the beginning of the file for reading - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } - - r := tsm1.NewWALSegmentReader(f) - - // Read the write points first - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err := r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) e, ok := we.(*tsm1.WriteWALEntry) - if !ok { - t.Fatalf("expected WriteWALEntry: got %#v", e) - } + require.True(t, ok) for k, v := range e.Values { - if got, exp := len(v), len(values[k]); got != exp { - t.Fatalf("values length mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, len(v), len(values[k])) for i, vv := range v { - if got, exp := vv.String(), values[k][i].String(); got != exp { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, vv.String(), values[k][i].String()) } } // Read the delete second - if !r.Next() { - t.Fatalf("expected next, got false") - } + require.True(t, r.Next()) we, err = r.Read() - if err != nil { - fatal(t, "read entry", err) - } + require.NoError(t, err) de, ok := we.(*tsm1.DeleteRangeWALEntry) - if !ok { - t.Fatalf("expected DeleteWALEntry: got %#v", e) - } - - if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp { - t.Fatalf("key length mismatch: got %v, exp %v", got, exp) - } - - if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } - - if got, exp := de.Min, int64(2); got != exp { - t.Fatalf("min time mismatch: got %v, exp %v", got, exp) - } - - if got, exp := de.Max, int64(3); got != exp { - t.Fatalf("min time mismatch: got %v, exp %v", got, exp) - } + require.True(t, ok) + require.Equal(t, len(de.Keys), len(deleteKeys)) + require.Equal(t, string(de.Keys[0]), string(deleteKeys[0])) + require.Equal(t, de.Min, deleteMin) + require.Equal(t, de.Max, deleteMax) } func TestWAL_ClosedSegments(t *testing.T) { @@ -468,45 +292,30 @@ func TestWAL_ClosedSegments(t *testing.T) { defer os.RemoveAll(dir) w := tsm1.NewWAL(dir) - if err := w.Open(); err != nil { - t.Fatalf("error opening WAL: %v", err) - } + require.NoError(t, w.Open()) files, err := w.ClosedSegments() - if err != nil { - t.Fatalf("error getting closed segments: %v", err) - } + require.NoError(t, err) - if got, exp := len(files), 0; got != exp { - t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) - } + require.Equal(t, len(files), 0) - if _, err := w.WriteMulti(map[string][]tsm1.Value{ - "cpu,host=A#!~#value": []tsm1.Value{ + _, err = w.WriteMulti(map[string][]tsm1.Value{ + "cpu,host=A#!~#value": { tsm1.NewValue(1, 1.1), }, - }); err != nil { - t.Fatalf("error writing points: %v", err) - } + }) + require.NoError(t, err) - if err := w.Close(); err != nil { - t.Fatalf("error closing wal: %v", err) - } + require.NoError(t, w.Close()) // Re-open the WAL w = tsm1.NewWAL(dir) defer w.Close() - if err := w.Open(); err != nil { - t.Fatalf("error opening WAL: %v", err) - } + require.NoError(t, w.Open()) files, err = w.ClosedSegments() - if err != nil { - t.Fatalf("error getting closed segments: %v", err) - } - if got, exp := len(files), 0; got != exp { - t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) - } + require.NoError(t, err) + require.Equal(t, len(files), 0) } func TestWAL_Delete(t *testing.T) { @@ -514,41 +323,27 @@ func TestWAL_Delete(t *testing.T) { defer os.RemoveAll(dir) w := tsm1.NewWAL(dir) - if err := w.Open(); err != nil { - t.Fatalf("error opening WAL: %v", err) - } + require.NoError(t, w.Open()) files, err := w.ClosedSegments() - if err != nil { - t.Fatalf("error getting closed segments: %v", err) - } - if got, exp := len(files), 0; got != exp { - t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) - } + require.NoError(t, err) - if _, err := w.Delete([][]byte{[]byte("cpu")}); err != nil { - t.Fatalf("error writing points: %v", err) - } + require.Equal(t, len(files), 0) - if err := w.Close(); err != nil { - t.Fatalf("error closing wal: %v", err) - } + _, err = w.Delete([][]byte{[]byte("cpu")}) + require.NoError(t, err) + + require.NoError(t, w.Close()) // Re-open the WAL w = tsm1.NewWAL(dir) defer w.Close() - if err := w.Open(); err != nil { - t.Fatalf("error opening WAL: %v", err) - } + require.NoError(t, w.Open()) files, err = w.ClosedSegments() - if err != nil { - t.Fatalf("error getting closed segments: %v", err) - } - if got, exp := len(files), 0; got != exp { - t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) - } + require.NoError(t, err) + require.Equal(t, len(files), 0) } func TestWALWriter_Corrupt(t *testing.T) { @@ -560,52 +355,40 @@ func TestWALWriter_Corrupt(t *testing.T) { p1 := tsm1.NewValue(1, 1.1) values := map[string][]tsm1.Value{ - "cpu,host=A#!~#float": []tsm1.Value{p1}, + "cpu,host=A#!~#float": {p1}, } entry := &tsm1.WriteWALEntry{ Values: values, } - if err := w.Write(mustMarshalEntry(entry)); err != nil { - fatal(t, "write points", err) - } - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + require.NoError(t, w.Write(mustMarshalEntry(entry))) + require.NoError(t, w.Flush()) // Write some random bytes to the file to simulate corruption. - if _, err := f.Write(corruption); err != nil { - fatal(t, "corrupt WAL segment", err) - } + _, err := f.Write(corruption) + require.NoError(t, err) // Create the WAL segment reader. - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } + _, err = f.Seek(0, io.SeekStart) + require.NoError(t, err) + r := tsm1.NewWALSegmentReader(f) // Try to decode two entries. + require.True(t, r.Next()) - if !r.Next() { - t.Fatalf("expected next, got false") - } - if _, err := r.Read(); err != nil { - fatal(t, "read entry", err) - } + _, err = r.Read() + require.NoError(t, err) - if !r.Next() { - t.Fatalf("expected next, got false") - } - if _, err := r.Read(); err == nil { - fatal(t, "read entry did not return err", nil) - } + require.True(t, r.Next()) + + _, err = r.Read() + require.Error(t, err) // Count should only return size of valid data. - expCount := MustReadFileSize(f) - int64(len(corruption)) - if n := r.Count(); n != expCount { - t.Fatalf("wrong count of bytes read, got %d, exp %d", n, expCount) - } + expCount := mustReadFileSize(f) - int64(len(corruption)) + require.Equal(t, expCount, r.Count()) } // Reproduces a `panic: runtime error: makeslice: cap out of range` when run with @@ -619,7 +402,7 @@ func TestWALSegmentReader_Corrupt(t *testing.T) { p4 := tsm1.NewValue(1, "string") values := map[string][]tsm1.Value{ - "cpu,host=A#!~#string": []tsm1.Value{p4, p4}, + "cpu,host=A#!~#string": {p4, p4}, } entry := &tsm1.WriteWALEntry{ @@ -632,18 +415,12 @@ func TestWALSegmentReader_Corrupt(t *testing.T) { // negative count and a panic when reading the segment. b[25] = 255 - if err := w.Write(typ, b); err != nil { - fatal(t, "write points", err) - } - - if err := w.Flush(); err != nil { - fatal(t, "flush", err) - } + require.NoError(t, w.Write(typ, b)) + require.NoError(t, w.Flush()) // Create the WAL segment reader. - if _, err := f.Seek(0, io.SeekStart); err != nil { - fatal(t, "seek", err) - } + _, err := f.Seek(0, io.SeekStart) + require.NoError(t, err) r := tsm1.NewWALSegmentReader(f) defer r.Close() @@ -659,48 +436,34 @@ func TestWALRollSegment(t *testing.T) { defer os.RemoveAll(dir) w := tsm1.NewWAL(dir) - if err := w.Open(); err != nil { - t.Fatalf("error opening WAL: %v", err) - } + require.NoError(t, w.Open()) const segSize = 1024 w.SegmentSize = segSize values := map[string][]tsm1.Value{ - "cpu,host=A#!~#value": []tsm1.Value{tsm1.NewValue(1, 1.0)}, - "cpu,host=B#!~#value": []tsm1.Value{tsm1.NewValue(1, 1.0)}, - "cpu,host=C#!~#value": []tsm1.Value{tsm1.NewValue(1, 1.0)}, - } - if _, err := w.WriteMulti(values); err != nil { - fatal(t, "write points", err) + "cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)}, + "cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)}, + "cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)}, } + _, err := w.WriteMulti(values) + require.NoError(t, err) files, err := ioutil.ReadDir(w.Path()) - if err != nil { - fatal(t, "ReadDir", err) - } - if len(files) != 1 { - t.Fatalf("unexpected segments size %d", len(files)) - } + require.NoError(t, err) + require.Equal(t, 1, len(files)) encodeSize := files[0].Size() for i := 0; i < 100; i++ { - if _, err := w.WriteMulti(values); err != nil { - fatal(t, "write points", err) - } + _, err := w.WriteMulti(values) + require.NoError(t, err) } files, err = ioutil.ReadDir(w.Path()) - if err != nil { - fatal(t, "ReadDir", err) - } + require.NoError(t, err) for _, f := range files { - if f.Size() > int64(segSize)+encodeSize { - t.Fatalf("unexpected segment size %d", f.Size()) - } - } - if err := w.Close(); err != nil { - t.Fatalf("error closing wal: %v", err) + require.True(t, f.Size() <= int64(segSize)+encodeSize) } + require.NoError(t, w.Close()) } func TestWAL_DiskSize(t *testing.T) { @@ -798,11 +561,11 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { p5 := tsm1.NewValue(1, uint64(1)) values := map[string][]tsm1.Value{ - "cpu,host=A#!~#float": []tsm1.Value{p1, p1}, - "cpu,host=A#!~#int": []tsm1.Value{p2, p2}, - "cpu,host=A#!~#bool": []tsm1.Value{p3, p3}, - "cpu,host=A#!~#string": []tsm1.Value{p4, p4}, - "cpu,host=A#!~#unsigned": []tsm1.Value{p5, p5}, + "cpu,host=A#!~#float": {p1, p1}, + "cpu,host=A#!~#int": {p2, p2}, + "cpu,host=A#!~#bool": {p3, p3}, + "cpu,host=A#!~#string": {p4, p4}, + "cpu,host=A#!~#unsigned": {p5, p5}, } w := &tsm1.WriteWALEntry{ @@ -810,9 +573,7 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { } b, err := w.MarshalBinary() - if err != nil { - t.Fatalf("unexpected error, got %v", err) - } + require.NoError(t, err) // Test every possible truncation of a write WAL entry for i := 0; i < len(b); i++ { @@ -820,9 +581,7 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { truncated := make([]byte, i) copy(truncated, b[:i]) err := w.UnmarshalBinary(truncated) - if err != nil && err != tsm1.ErrWALCorrupt { - t.Fatalf("unexpected error: %v", err) - } + require.True(t, err == nil || err == tsm1.ErrWALCorrupt) } } @@ -853,21 +612,15 @@ func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) { }, } - for i, example := range examples { + for _, example := range examples { w := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(example.In...)} b, err := w.MarshalBinary() - if err != nil { - t.Fatalf("[example %d] unexpected error, got %v", i, err) - } + require.NoError(t, err) out := &tsm1.DeleteWALEntry{} - if err := out.UnmarshalBinary(b); err != nil { - t.Fatalf("[example %d] %v", i, err) - } + require.NoError(t, out.UnmarshalBinary(b)) - if !reflect.DeepEqual(example.Out, out.Keys) { - t.Errorf("[example %d] got %v, expected %v", i, out.Keys, example.Out) - } + require.True(t, reflect.DeepEqual(example.Out, out.Keys)) } } @@ -877,9 +630,7 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) { } b, err := w.MarshalBinary() - if err != nil { - t.Fatalf("unexpected error, got %v", err) - } + require.NoError(t, err) // Test every possible truncation of a write WAL entry for i := 0; i < len(b); i++ { @@ -887,9 +638,7 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) { truncated := make([]byte, i) copy(truncated, b[:i]) err := w.UnmarshalBinary(truncated) - if err != nil && err != tsm1.ErrWALCorrupt { - t.Fatalf("unexpected error: %v", err) - } + require.True(t, err == nil || err == tsm1.ErrWALCorrupt) } } @@ -901,9 +650,7 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) { } b, err := w.MarshalBinary() - if err != nil { - t.Fatalf("unexpected error, got %v", err) - } + require.NoError(t, err) // Test every possible truncation of a write WAL entry for i := 0; i < len(b); i++ { @@ -911,9 +658,81 @@ func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) { truncated := make([]byte, i) copy(truncated, b[:i]) err := w.UnmarshalBinary(truncated) - if err != nil && err != tsm1.ErrWALCorrupt { - t.Fatalf("unexpected error: %v", err) - } + require.True(t, err == nil || err == tsm1.ErrWALCorrupt) + } +} + +func BenchmarkWAL_WriteMulti_Concurrency(b *testing.B) { + benchmarks := []struct { + concurrency int + }{ + {1}, + {12}, + {24}, + {50}, + {100}, + {200}, + {300}, + {400}, + {500}, + } + + for _, bm := range benchmarks { + b.Run(fmt.Sprintf("concurrency-%d", bm.concurrency), func(b *testing.B) { + points := map[string][]tsm1.Value{} + for i := 0; i < 5000; i++ { + k := "cpu,host=A#!~#value" + points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1)) + } + + dir := MustTempDir() + defer os.RemoveAll(dir) + + w := tsm1.NewWAL(dir) + defer w.Close() + require.NoError(b, w.Open()) + + start := make(chan struct{}) + stop := make(chan struct{}) + + succeed := make(chan struct{}, 1000) + defer close(succeed) + + wg := &sync.WaitGroup{} + for i := 0; i < bm.concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + <-start + + for { + select { + case <-stop: + return + default: + _, err := w.WriteMulti(points) + require.NoError(b, err) + + succeed <- struct{}{} + } + } + }() + } + + b.ResetTimer() + + close(start) + + for i := 0; i < b.N; i++ { + <-succeed + } + + b.StopTimer() + + close(stop) + wg.Wait() + }) } } @@ -936,9 +755,7 @@ func BenchmarkWALSegmentWriter(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - if err := w.Write(mustMarshalEntry(write)); err != nil { - b.Fatalf("unexpected error writing entry: %v", err) - } + require.NoError(b, w.Write(mustMarshalEntry(write))) } } @@ -960,9 +777,7 @@ func BenchmarkWALSegmentReader(b *testing.B) { } for i := 0; i < 100; i++ { - if err := w.Write(mustMarshalEntry(write)); err != nil { - b.Fatalf("unexpected error writing entry: %v", err) - } + require.NoError(b, w.Write(mustMarshalEntry(write))) } r := tsm1.NewWALSegmentReader(f) @@ -975,15 +790,26 @@ func BenchmarkWALSegmentReader(b *testing.B) { for r.Next() { _, err := r.Read() - if err != nil { - b.Fatalf("unexpected error reading entry: %v", err) - } + require.NoError(b, err) } } } -// MustReadFileSize returns the size of the file, or panics. -func MustReadFileSize(f *os.File) int64 { +func mustSegmentReader(t *testing.T, w *tsm1.WAL) (*os.File, *tsm1.WALSegmentReader) { + files, err := filepath.Glob(filepath.Join(w.Path(), + fmt.Sprintf("%s*.%s", tsm1.WALFilePrefix, tsm1.WALFileExtension))) + require.NoError(t, err) + require.Equal(t, 1, len(files)) + + sort.Strings(files) + + f, err := os.OpenFile(files[0], os.O_CREATE|os.O_RDWR, 0666) + require.NoError(t, err) + return f, tsm1.NewWALSegmentReader(f) +} + +// mustReadFileSize returns the size of the file, or panics. +func mustReadFileSize(f *os.File) int64 { stat, err := os.Stat(f.Name()) if err != nil { panic(fmt.Sprintf("failed to get size of file at %s: %s", f.Name(), err.Error())) diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index 54e3b9464f..63ad90fc85 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -88,7 +88,7 @@ func (p *SeriesPartition) Open() error { p.index = NewSeriesIndex(p.IndexPath()) if err := p.index.Open(); err != nil { return err - } else if p.index.Recover(p.segments); err != nil { + } else if err := p.index.Recover(p.segments); err != nil { return err }