influxdb/tsdb/engine/tsm1/wal_test.go

822 lines
18 KiB
Go

package tsm1_test
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/golang/snappy"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)
// NewWAL is a test shorthand around tsm1.NewWAL that supplies an empty
// tsdb.EngineTags value, since the tags are only used for metrics.
func NewWAL(path string, maxConcurrentWrites int, maxWriteDelay time.Duration) *tsm1.WAL {
	noTags := tsdb.EngineTags{}
	wal := tsm1.NewWAL(path, maxConcurrentWrites, maxWriteDelay, noTags)
	return wal
}
// TestWALWriter_WriteMulti_Single writes one batch containing a float, an
// integer, a boolean, a string and an unsigned value through WAL.WriteMulti,
// then reads the single segment back and checks that the decoded entry holds
// the same values and that the reader consumed the whole file.
func TestWALWriter_WriteMulti_Single(t *testing.T) {
	dir := t.TempDir()
	w := NewWAL(dir, 0, 0)
	defer w.Close()
	require.NoError(t, w.Open())

	p1 := tsm1.NewValue(1, 1.1)
	p2 := tsm1.NewValue(1, int64(1))
	p3 := tsm1.NewValue(1, true)
	p4 := tsm1.NewValue(1, "string")
	p5 := tsm1.NewValue(1, ^uint64(0))

	values := map[string][]tsm1.Value{
		"cpu,host=A#!~#float":    {p1},
		"cpu,host=A#!~#int":      {p2},
		"cpu,host=A#!~#bool":     {p3},
		"cpu,host=A#!~#string":   {p4},
		"cpu,host=A#!~#unsigned": {p5},
	}

	_, err := w.WriteMulti(context.Background(), values)
	require.NoError(t, err)

	f, r := mustSegmentReader(t, w)
	defer r.Close()

	require.True(t, r.Next())
	we, err := r.Read()
	require.NoError(t, err)

	e, ok := we.(*tsm1.WriteWALEntry)
	require.True(t, ok)

	// Compare sizes before contents: ranging over the decoded entry alone
	// would pass vacuously if keys or values had been dropped.
	require.Len(t, e.Values, len(values))
	for k, v := range e.Values {
		require.Len(t, v, len(values[k]))
		for i, vv := range v {
			require.Equal(t, values[k][i].String(), vv.String())
		}
	}

	// The number of bytes the reader reports must match the file size.
	require.Equal(t, mustReadFileSize(f), r.Count())
}
// TestWALWriter_WriteMulti_LargeBatch writes 100000 integer points under two
// long series keys in a single WriteMulti call and checks that the segment
// decodes back to the same values and that the reader consumed the whole file.
func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) {
	dir := t.TempDir()
	w := NewWAL(dir, 0, 0)
	defer w.Close()
	require.NoError(t, w.Open())

	const numPoints = 100000
	points := make([]tsm1.Value, 0, numPoints)
	for i := 0; i < numPoints; i++ {
		points = append(points, tsm1.NewValue(int64(i), int64(1)))
	}

	values := map[string][]tsm1.Value{
		"cpu,host=A,server=01,foo=bar,tag=really-long#!~#float": points,
		"mem,host=A,server=01,foo=bar,tag=really-long#!~#float": points,
	}

	_, err := w.WriteMulti(context.Background(), values)
	require.NoError(t, err)

	f, r := mustSegmentReader(t, w)
	defer r.Close()

	require.True(t, r.Next())
	we, err := r.Read()
	require.NoError(t, err)

	e, ok := we.(*tsm1.WriteWALEntry)
	require.True(t, ok)

	// Compare sizes before contents: ranging over the decoded entry alone
	// would pass vacuously if keys or values had been dropped.
	require.Len(t, e.Values, len(values))
	for k, v := range e.Values {
		require.Len(t, v, len(values[k]))
		for i, vv := range v {
			require.Equal(t, values[k][i].String(), vv.String())
		}
	}

	// The number of bytes the reader reports must match the file size.
	require.Equal(t, mustReadFileSize(f), r.Count())
}
// TestWALWriter_WriteMulti_Multiple issues two separate WriteMulti calls, one
// key each, and checks that the segment yields two write entries in the same
// order with the same key and values, and that the reader consumed the whole
// file.
func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
	dir := t.TempDir()
	w := NewWAL(dir, 0, 0)
	defer w.Close()
	require.NoError(t, w.Open())

	p1 := tsm1.NewValue(1, int64(1))
	p2 := tsm1.NewValue(1, int64(2))

	exp := []struct {
		key    string
		values []tsm1.Value
	}{
		{"cpu,host=A#!~#value", []tsm1.Value{p1}},
		{"cpu,host=B#!~#value", []tsm1.Value{p2}},
	}

	for _, v := range exp {
		_, err := w.WriteMulti(context.Background(), map[string][]tsm1.Value{v.key: v.values})
		require.NoError(t, err)
	}

	f, r := mustSegmentReader(t, w)
	defer r.Close()

	for _, ep := range exp {
		require.True(t, r.Next())
		we, err := r.Read()
		require.NoError(t, err)

		e, ok := we.(*tsm1.WriteWALEntry)
		require.True(t, ok)

		// Every write above carried exactly one key; without this check an
		// empty decoded entry would skip the loop below and pass.
		require.Len(t, e.Values, 1)
		for k, v := range e.Values {
			require.Equal(t, ep.key, k)
			require.Len(t, v, len(ep.values))
			for i, vv := range v {
				require.Equal(t, ep.values[i].String(), vv.String())
			}
		}
	}

	// The number of bytes the reader reports must match the file size.
	require.Equal(t, mustReadFileSize(f), r.Count())
}
// TestWALWriter_WriteDelete_Single records a single-key delete in the WAL and
// checks that the segment decodes to a DeleteWALEntry carrying that key.
func TestWALWriter_WriteDelete_Single(t *testing.T) {
	wal := NewWAL(t.TempDir(), 0, 0)
	defer wal.Close()
	require.NoError(t, wal.Open())

	want := [][]byte{[]byte("cpu")}
	_, writeErr := wal.Delete(context.Background(), want)
	require.NoError(t, writeErr)

	_, reader := mustSegmentReader(t, wal)
	defer reader.Close()

	require.True(t, reader.Next())
	entry, readErr := reader.Read()
	require.NoError(t, readErr)

	del, isDelete := entry.(*tsm1.DeleteWALEntry)
	require.True(t, isDelete)
	require.Equal(t, len(del.Keys), len(want))
	require.Equal(t, string(del.Keys[0]), string(want[0]))
}
// TestWALWriter_WriteMultiDelete_Multiple writes one value and then a delete
// for the same key, and checks that the segment yields the write entry first
// and the delete entry second, each with the expected contents.
func TestWALWriter_WriteMultiDelete_Multiple(t *testing.T) {
	dir := t.TempDir()
	w := NewWAL(dir, 0, 0)
	defer w.Close()
	require.NoError(t, w.Open())

	p1 := tsm1.NewValue(1, true)
	values := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {p1},
	}

	_, err := w.WriteMulti(context.Background(), values)
	require.NoError(t, err)

	// Write the delete entry. The key uses the same "#!~#" separator as the
	// key written above so that it names the same series/field.
	deleteKeys := [][]byte{[]byte("cpu,host=A#!~#value")}
	_, err = w.Delete(context.Background(), deleteKeys)
	require.NoError(t, err)

	_, r := mustSegmentReader(t, w)
	defer r.Close()

	// Read the write first
	require.True(t, r.Next())
	we, err := r.Read()
	require.NoError(t, err)

	e, ok := we.(*tsm1.WriteWALEntry)
	require.True(t, ok)

	for k, v := range e.Values {
		require.Equal(t, len(v), len(values[k]))
		for i, vv := range v {
			require.Equal(t, vv.String(), values[k][i].String())
		}
	}

	// Read the delete second
	require.True(t, r.Next())
	we, err = r.Read()
	require.NoError(t, err)

	de, ok := we.(*tsm1.DeleteWALEntry)
	require.True(t, ok)
	require.Equal(t, len(de.Keys), len(deleteKeys))
	require.Equal(t, string(de.Keys[0]), string(deleteKeys[0]))
}
// TestWALWriter_WriteMultiDeleteRange_Multiple writes three values and then a
// range delete for the same key, and checks that the segment yields the write
// entry first and a DeleteRangeWALEntry second with the expected key and
// min/max bounds.
func TestWALWriter_WriteMultiDeleteRange_Multiple(t *testing.T) {
	dir := t.TempDir()
	w := NewWAL(dir, 0, 0)
	defer w.Close()
	require.NoError(t, w.Open())

	p1 := tsm1.NewValue(1, 1.0)
	p2 := tsm1.NewValue(2, 2.0)
	p3 := tsm1.NewValue(3, 3.0)

	values := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {p1, p2, p3},
	}

	_, err := w.WriteMulti(context.Background(), values)
	require.NoError(t, err)

	// Write the delete entry. The key uses the same "#!~#" separator as the
	// key written above so that it names the same series/field.
	deleteKeys := [][]byte{[]byte("cpu,host=A#!~#value")}
	deleteMin, deleteMax := int64(2), int64(3)
	_, err = w.DeleteRange(context.Background(), deleteKeys, deleteMin, deleteMax)
	require.NoError(t, err)

	_, r := mustSegmentReader(t, w)
	defer r.Close()

	// Read the write first
	require.True(t, r.Next())
	we, err := r.Read()
	require.NoError(t, err)

	e, ok := we.(*tsm1.WriteWALEntry)
	require.True(t, ok)

	for k, v := range e.Values {
		require.Equal(t, len(v), len(values[k]))
		for i, vv := range v {
			require.Equal(t, vv.String(), values[k][i].String())
		}
	}

	// Read the delete second
	require.True(t, r.Next())
	we, err = r.Read()
	require.NoError(t, err)

	de, ok := we.(*tsm1.DeleteRangeWALEntry)
	require.True(t, ok)
	require.Equal(t, len(de.Keys), len(deleteKeys))
	require.Equal(t, string(de.Keys[0]), string(deleteKeys[0]))
	require.Equal(t, de.Min, deleteMin)
	require.Equal(t, de.Max, deleteMax)
}
func TestWAL_ClosedSegments(t *testing.T) {
dir := t.TempDir()
w := NewWAL(dir, 0, 0)
require.NoError(t, w.Open())
files, err := w.ClosedSegments()
require.NoError(t, err)
require.Equal(t, len(files), 0)
_, err = w.WriteMulti(context.Background(), map[string][]tsm1.Value{
"cpu,host=A#!~#value": {
tsm1.NewValue(1, 1.1),
},
})
require.NoError(t, err)
require.NoError(t, w.Close())
// Re-open the WAL
w = NewWAL(dir, 0, 0)
defer w.Close()
require.NoError(t, w.Open())
files, err = w.ClosedSegments()
require.NoError(t, err)
require.Equal(t, len(files), 0)
}
// TestWAL_Delete checks that ClosedSegments reports nothing on a freshly
// opened WAL, and still reports nothing after a delete followed by closing and
// re-opening a WAL on the same directory.
func TestWAL_Delete(t *testing.T) {
	walDir := t.TempDir()

	initial := NewWAL(walDir, 0, 0)
	require.NoError(t, initial.Open())

	before, listErr := initial.ClosedSegments()
	require.NoError(t, listErr)
	require.Equal(t, len(before), 0)

	_, delErr := initial.Delete(context.Background(), [][]byte{[]byte("cpu")})
	require.NoError(t, delErr)
	require.NoError(t, initial.Close())

	// Open a second WAL over the same directory.
	reopened := NewWAL(walDir, 0, 0)
	defer reopened.Close()
	require.NoError(t, reopened.Open())

	after, listErr := reopened.ClosedSegments()
	require.NoError(t, listErr)
	require.Equal(t, len(after), 0)
}
// TestWALWriter_Corrupt writes one valid entry followed by a few raw bytes
// straight to the segment file, then checks that the reader decodes the first
// entry, returns an error for the trailing bytes, and that Count only covers
// the valid data.
func TestWALWriter_Corrupt(t *testing.T) {
	dir := t.TempDir()
	f := MustTempFile(t, dir)
	w := tsm1.NewWALSegmentWriter(f)
	corruption := []byte{1, 4, 0, 0, 0}

	p1 := tsm1.NewValue(1, 1.1)
	values := map[string][]tsm1.Value{
		"cpu,host=A#!~#float": {p1},
	}

	entry := &tsm1.WriteWALEntry{
		Values: values,
	}
	require.NoError(t, w.Write(mustMarshalEntry(entry)))
	require.NoError(t, w.Flush())

	// Append raw bytes directly to the file to simulate corruption.
	_, err := f.Write(corruption)
	require.NoError(t, err)

	// Create the WAL segment reader.
	_, err = f.Seek(0, io.SeekStart)
	require.NoError(t, err)
	r := tsm1.NewWALSegmentReader(f)
	// Close the reader on exit, as TestWALSegmentReader_Corrupt does, so the
	// segment file is not left open when the test returns or fails early.
	defer r.Close()

	// Try to decode two entries: the first is valid, the second is not.
	require.True(t, r.Next())
	_, err = r.Read()
	require.NoError(t, err)

	require.True(t, r.Next())
	_, err = r.Read()
	require.Error(t, err)

	// Count should only return size of valid data.
	expCount := mustReadFileSize(f) - int64(len(corruption))
	require.Equal(t, expCount, r.Count())
}
// TestWALSegmentReader_Corrupt reproduces a
// `panic: runtime error: makeslice: cap out of range` when run with
// GOARCH=386 go test -run TestWALSegmentReader_Corrupt -v ./tsdb/engine/tsm1/
//
// The test makes no assertions on what is read back; it only has to get
// through the segment without panicking.
func TestWALSegmentReader_Corrupt(t *testing.T) {
	segFile := MustTempFile(t, t.TempDir())
	writer := tsm1.NewWALSegmentWriter(segFile)

	strVal := tsm1.NewValue(1, "string")
	entry := &tsm1.WriteWALEntry{
		Values: map[string][]tsm1.Value{
			"cpu,host=A#!~#string": {strVal, strVal},
		},
	}

	entryType, payload := mustMarshalEntry(entry)

	// Overwrite one byte of the marshalled payload. Per the original note this
	// makes the nvals field overflow on 32 bit systems, which produced a
	// negative count and a panic when reading the segment.
	payload[25] = 255
	require.NoError(t, writer.Write(entryType, payload))
	require.NoError(t, writer.Flush())

	// Rewind and read the segment back from the start.
	_, seekErr := segFile.Seek(0, io.SeekStart)
	require.NoError(t, seekErr)

	reader := tsm1.NewWALSegmentReader(segFile)
	defer reader.Close()

	// Drain every entry; results and errors are deliberately discarded.
	for reader.Next() {
		_, _ = reader.Read()
	}
}
// TestWALRollSegment sets a 1024-byte SegmentSize, measures the on-disk size
// of one batch, writes the batch 100 more times and checks that no file in the
// WAL directory is larger than the segment size plus one batch.
func TestWALRollSegment(t *testing.T) {
	wal := NewWAL(t.TempDir(), 0, 0)
	require.NoError(t, wal.Open())

	const segSize = 1024
	wal.SegmentSize = segSize

	batch := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)},
		"cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)},
		"cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)},
	}

	// The first write leaves exactly one file whose size is the encoded size
	// of a single batch.
	_, writeErr := wal.WriteMulti(context.Background(), batch)
	require.NoError(t, writeErr)

	entries, readErr := os.ReadDir(wal.Path())
	require.NoError(t, readErr)
	require.Equal(t, 1, len(entries))

	firstInfo, infoErr := entries[0].Info()
	require.NoError(t, infoErr)
	encodeSize := firstInfo.Size()

	for remaining := 100; remaining > 0; remaining-- {
		_, err := wal.WriteMulti(context.Background(), batch)
		require.NoError(t, err)
	}

	entries, readErr = os.ReadDir(wal.Path())
	require.NoError(t, readErr)

	for idx := range entries {
		info, err := entries[idx].Info()
		require.NoError(t, err)
		require.True(t, info.Size() <= int64(segSize)+encodeSize)
	}

	require.NoError(t, wal.Close())
}
// TestWAL_DiskSize checks WAL.DiskSizeBytes against file sizes read from the
// WAL directory after each step of a sequence: open, a small write, many
// writes, close and re-open, and removal of the closed segments.
func TestWAL_DiskSize(t *testing.T) {
	// test compares the sizes of the files under w.Path() with
	// w.DiskSizeBytes(). The file that sorts last by name is counted as "cur"
	// and all others as "old". oldZero and curZero state whether each of those
	// totals is expected to be zero.
	test := func(w *tsm1.WAL, oldZero, curZero bool) {
		// get disk size by reading file
		files, err := os.ReadDir(w.Path())
		require.NoError(t, err)

		// Order by file name so the last element is the highest-named file.
		sort.Slice(files, func(i, j int) bool {
			return files[i].Name() < files[j].Name()
		})

		var old, cur int64
		if len(files) > 0 {
			// Last file by name: counted as the current segment.
			file, err := files[len(files)-1].Info()
			require.NoError(t, err)
			cur = file.Size()
			// Every earlier file: counted towards the old total.
			for i := 0; i < len(files)-1; i++ {
				file, err := files[i].Info()
				require.NoError(t, err)
				old += file.Size()
			}
		}

		// test zero size condition: each total must be zero exactly when the
		// corresponding flag says so.
		require.False(t, oldZero && old > 0)
		require.False(t, !oldZero && old == 0)
		require.False(t, curZero && cur > 0)
		require.False(t, !curZero && cur == 0)

		// test method DiskSizeBytes
		require.Equal(t, old+cur, w.DiskSizeBytes(), "total disk size")
	}

	dir := t.TempDir()
	w := NewWAL(dir, 0, 0)

	const segSize = 1024
	w.SegmentSize = segSize

	// open
	require.NoError(t, w.Open())

	test(w, true, true)

	// write some values, the total size of these values does not exceed segSize(1024),
	// so rollSegment will not be triggered
	values := map[string][]tsm1.Value{
		"cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)},
		"cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)},
		"cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)},
	}
	_, err := w.WriteMulti(context.Background(), values)
	require.NoError(t, err)

	test(w, true, false)

	// write some values, the total size of these values exceeds segSize(1024),
	// so rollSegment will be triggered
	for i := 0; i < 100; i++ {
		_, err := w.WriteMulti(context.Background(), values)
		require.NoError(t, err)
	}

	test(w, false, false)

	// reopen: the same WAL value is closed and opened again
	require.NoError(t, w.Close())
	require.NoError(t, w.Open())

	test(w, false, false)

	// remove: after dropping the closed segments only the current file is
	// expected to have a non-zero size
	closedSegments, err := w.ClosedSegments()
	require.NoError(t, err)
	require.NoError(t, w.Remove(closedSegments))

	test(w, true, false)

	require.NoError(t, w.Close())
}
// TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt marshals a write entry
// holding every value type and feeds each proper prefix of the encoding to
// UnmarshalBinary, accepting only a nil error or tsm1.ErrWALCorrupt.
func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
	floatVal := tsm1.NewValue(1, 1.1)
	intVal := tsm1.NewValue(1, int64(1))
	boolVal := tsm1.NewValue(1, true)
	strVal := tsm1.NewValue(1, "string")
	uintVal := tsm1.NewValue(1, uint64(1))

	entry := &tsm1.WriteWALEntry{
		Values: map[string][]tsm1.Value{
			"cpu,host=A#!~#float":    {floatVal, floatVal},
			"cpu,host=A#!~#int":      {intVal, intVal},
			"cpu,host=A#!~#bool":     {boolVal, boolVal},
			"cpu,host=A#!~#string":   {strVal, strVal},
			"cpu,host=A#!~#unsigned": {uintVal, uintVal},
		},
	}

	encoded, marshalErr := entry.MarshalBinary()
	require.NoError(t, marshalErr)

	// Try every truncation length from 0 up to len(encoded)-1.
	for n := range encoded {
		// Copy into a slice of exactly n bytes so that reading past the
		// truncation point cannot land in spare capacity of the original.
		prefix := make([]byte, n)
		copy(prefix, encoded)

		decodeErr := entry.UnmarshalBinary(prefix)
		acceptable := decodeErr == nil || decodeErr == tsm1.ErrWALCorrupt
		require.True(t, acceptable)
	}
}
// TestDeleteWALEntry_UnmarshalBinary round-trips several key lists through
// DeleteWALEntry.MarshalBinary and UnmarshalBinary and compares the decoded
// keys with the expected byte slices. A single empty key is expected to decode
// to nil keys.
func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) {
	type roundTrip struct {
		In  []string
		Out [][]byte
	}

	cases := []roundTrip{
		{In: []string{""}, Out: nil},
		{In: []string{"foo"}, Out: [][]byte{[]byte("foo")}},
		{In: []string{"foo", "bar"}, Out: [][]byte{[]byte("foo"), []byte("bar")}},
		{
			In:  []string{"foo", "bar", "z", "abc"},
			Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("abc")},
		},
		{
			In:  []string{"foo", "bar", "z", "a"},
			Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("a")},
		},
	}

	for idx := range cases {
		tc := cases[idx]

		src := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(tc.In...)}
		encoded, err := src.MarshalBinary()
		require.NoError(t, err)

		var decoded tsm1.DeleteWALEntry
		require.NoError(t, decoded.UnmarshalBinary(encoded))
		require.True(t, reflect.DeepEqual(tc.Out, decoded.Keys))
	}
}
// TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt marshals a two-key
// delete entry and feeds each proper prefix of the encoding to
// UnmarshalBinary, accepting only a nil error or tsm1.ErrWALCorrupt.
func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) {
	entry := &tsm1.DeleteWALEntry{
		Keys: [][]byte{[]byte("foo"), []byte("bar")},
	}

	encoded, marshalErr := entry.MarshalBinary()
	require.NoError(t, marshalErr)

	// Try every truncation length of the delete entry, from 0 up to
	// len(encoded)-1.
	for size := 0; size < len(encoded); size++ {
		// A fresh slice of exactly size bytes, so reading past the truncation
		// point cannot land in spare capacity of the original.
		cut := make([]byte, size)
		copy(cut, encoded[:size])

		decodeErr := entry.UnmarshalBinary(cut)
		require.True(t, decodeErr == nil || decodeErr == tsm1.ErrWALCorrupt)
	}
}
// TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt marshals a
// two-key range-delete entry (Min 1, Max 2) and feeds each proper prefix of
// the encoding to UnmarshalBinary, accepting only a nil error or
// tsm1.ErrWALCorrupt.
func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) {
	entry := &tsm1.DeleteRangeWALEntry{
		Keys: [][]byte{[]byte("foo"), []byte("bar")},
		Min:  1,
		Max:  2,
	}

	full, marshalErr := entry.MarshalBinary()
	require.NoError(t, marshalErr)

	// Try every truncation length of the range-delete entry, from 0 up to
	// len(full)-1.
	for end := range full {
		// A fresh slice of exactly end bytes, so reading past the truncation
		// point cannot land in spare capacity of the original.
		partial := make([]byte, end)
		copy(partial, full[:end])

		decodeErr := entry.UnmarshalBinary(partial)
		tolerated := decodeErr == nil || decodeErr == tsm1.ErrWALCorrupt
		require.True(t, tolerated)
	}
}
// BenchmarkWAL_WriteMulti_Concurrency measures WAL.WriteMulti throughput with
// a varying number of concurrent writer goroutines. Each sub-benchmark starts
// the writers, waits for b.N successful writes of a 5000-point batch, then
// stops the writers and waits for them to exit.
func BenchmarkWAL_WriteMulti_Concurrency(b *testing.B) {
	benchmarks := []struct {
		concurrency int
	}{
		{1},
		{12},
		{24},
		{50},
		{100},
		{200},
		{300},
		{400},
		{500},
	}

	for _, bm := range benchmarks {
		b.Run(fmt.Sprintf("concurrency-%d", bm.concurrency), func(b *testing.B) {
			points := map[string][]tsm1.Value{}
			for i := 0; i < 5000; i++ {
				k := "cpu,host=A#!~#value"
				points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1))
			}

			dir := b.TempDir()
			w := NewWAL(dir, 0, 0)
			defer w.Close()
			require.NoError(b, w.Open())

			start := make(chan struct{})
			stop := make(chan struct{})
			succeed := make(chan struct{}, 1000)
			// One slot per writer: a writer reports at most one error and then
			// exits, so sending on this channel never blocks.
			failed := make(chan error, bm.concurrency)

			wg := &sync.WaitGroup{}
			for i := 0; i < bm.concurrency; i++ {
				wg.Add(1)
				go func() {
					defer wg.Done()
					<-start
					for {
						select {
						case <-stop:
							return
						default:
						}

						// FailNow (and therefore require.*) may only be called
						// from the goroutine running the benchmark, so errors
						// are handed back to it instead of asserted here.
						if _, err := w.WriteMulti(context.Background(), points); err != nil {
							failed <- err
							return
						}

						// Also watch stop while reporting success so a writer
						// cannot stay blocked on a full channel once the
						// benchmark loop has stopped receiving.
						select {
						case succeed <- struct{}{}:
						case <-stop:
							return
						}
					}
				}()
			}

			b.ResetTimer()

			close(start)

			// Wait for b.N successful writes, or bail out on the first error
			// rather than waiting for successes that may never arrive.
			var writeErr error
			for i := 0; i < b.N && writeErr == nil; i++ {
				select {
				case <-succeed:
				case writeErr = <-failed:
				}
			}

			b.StopTimer()

			close(stop)
			wg.Wait()

			require.NoError(b, writeErr)
		})
	}
}
// BenchmarkWALSegmentWriter times marshalling a 5000-point write entry and
// writing it to a segment writer backed by a temp file, b.N times.
func BenchmarkWALSegmentWriter(b *testing.B) {
	const seriesKey = "cpu,host=A#!~#value"

	batch := map[string][]tsm1.Value{}
	for ts := 0; ts < 5000; ts++ {
		batch[seriesKey] = append(batch[seriesKey], tsm1.NewValue(int64(ts), 1.1))
	}

	segFile := MustTempFile(b, b.TempDir())
	writer := tsm1.NewWALSegmentWriter(segFile)
	entry := &tsm1.WriteWALEntry{Values: batch}

	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		// Marshalling happens inside the timed loop, as in the original.
		typ, payload := mustMarshalEntry(entry)
		require.NoError(b, writer.Write(typ, payload))
	}
}
// BenchmarkWALSegmentReader writes 100 copies of a 5000-point write entry to a
// temp segment file and then times reading the segment back. The file is
// rewound before every iteration with the timer stopped.
func BenchmarkWALSegmentReader(b *testing.B) {
	points := map[string][]tsm1.Value{}
	for i := 0; i < 5000; i++ {
		k := "cpu,host=A#!~#value"
		points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1))
	}

	dir := b.TempDir()
	f := MustTempFile(b, dir)
	w := tsm1.NewWALSegmentWriter(f)

	write := &tsm1.WriteWALEntry{
		Values: points,
	}

	for i := 0; i < 100; i++ {
		require.NoError(b, w.Write(mustMarshalEntry(write)))
	}
	// Flush before reading back, as the corruption tests in this file do, so
	// no written data is left pending in the writer.
	require.NoError(b, w.Flush())

	r := tsm1.NewWALSegmentReader(f)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		// A failed rewind would make the timed loop below read nothing, so
		// check it instead of ignoring the error.
		_, err := f.Seek(0, io.SeekStart)
		require.NoError(b, err)
		b.StartTimer()

		for r.Next() {
			_, err := r.Read()
			require.NoError(b, err)
		}
	}
}
// mustSegmentReader opens the only WAL segment file under w.Path() and returns
// it together with a segment reader over it. The test fails immediately if the
// glob fails, if the directory does not contain exactly one segment file, or
// if the file cannot be opened.
func mustSegmentReader(t *testing.T, w *tsm1.WAL) (*os.File, *tsm1.WALSegmentReader) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	files, err := filepath.Glob(filepath.Join(w.Path(),
		fmt.Sprintf("%s*.%s", tsm1.WALFilePrefix, tsm1.WALFileExtension)))
	require.NoError(t, err)
	// Exactly one match is required, so no ordering of the matches is needed.
	require.Equal(t, 1, len(files))

	f, err := os.OpenFile(files[0], os.O_CREATE|os.O_RDWR, 0666)
	require.NoError(t, err)
	return f, tsm1.NewWALSegmentReader(f)
}
// mustReadFileSize stats the path returned by f.Name() and returns the size
// in bytes. It panics if the stat call fails.
func mustReadFileSize(f *os.File) int64 {
	name := f.Name()
	info, statErr := os.Stat(name)
	if statErr == nil {
		return info.Size()
	}
	panic(fmt.Sprintf("failed to get size of file at %s: %s", name, statErr.Error()))
}
// mustMarshalEntry encodes entry and snappy-compresses the result, returning
// the entry's type together with the compressed bytes. It panics if encoding
// fails.
func mustMarshalEntry(entry tsm1.WALEntry) (tsm1.WalEntryType, []byte) {
	buf := make([]byte, 1024<<2)

	b, err := entry.Encode(buf)
	if err != nil {
		panic(fmt.Sprintf("error encoding: %v", err))
	}

	// snappy.Encode requires that dst and src do not overlap, so pass a nil
	// dst and let it allocate the output instead of handing it b twice.
	return entry.Type(), snappy.Encode(nil, b)
}