// influxdb/tsdb/engine/tsm1/engine_test.go

package tsm1_test
import (
"archive/tar"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql"
)
/*
// Ensure engine can load the metadata index after reopening.
func TestEngine_LoadMetadataIndex(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
if err := e.WritePointsString(`cpu,host=A value=1.1 1000000000`); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Ensure we can close and load index from the WAL
if err := e.Reopen(); err != nil {
t.Fatal(err)
}
// Load metadata index.
index := MustNewDatabaseIndex("db")
if err := e.LoadMetadataIndex(1, index); err != nil {
t.Fatal(err)
}
// Verify index is correct.
m, err := index.Measurement([]byte("cpu"))
if err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags(), models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags())
}
// write the snapshot, ensure we can close and load index from TSM
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("error writing snapshot: %s", err.Error())
}
// Ensure we can close and load index from the WAL
if err := e.Reopen(); err != nil {
t.Fatal(err)
}
// Load metadata index.
index = MustNewDatabaseIndex("db")
if err := e.LoadMetadataIndex(1, index); err != nil {
t.Fatal(err)
}
// Verify index is correct.
if m, err = index.Measurement([]byte("cpu")); err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags(), models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags())
}
// Write a new point and ensure we can close and load index from TSM and WAL
if err := e.WritePoints([]models.Point{
MustParsePointString("cpu,host=B value=1.2 2000000000"),
}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Ensure we can close and load index from the TSM & WAL
if err := e.Reopen(); err != nil {
t.Fatal(err)
}
// Load metadata index.
index = MustNewDatabaseIndex("db")
if err := e.LoadMetadataIndex(1, index); err != nil {
t.Fatal(err)
}
// Verify index is correct.
if m, err = index.Measurement([]byte("cpu")); err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags(), models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags())
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags(), models.NewTags(map[string]string{"host": "B"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags())
}
}
*/
// Ensure that deletes written only to the WAL clear out the data from the cache on restart.
func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=B value=1.2 2000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Remove series.
if err := e.DeleteSeriesRange([][]byte{[]byte("cpu,host=A")}, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %s", err.Error())
}
// Ensure we can close and load index from the WAL
if err := e.Reopen(); err != nil {
t.Fatal(err)
}
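// The deleted series should have no values left in the cache, while the
// remaining series should still have its single value.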
if exp, got := 0, len(e.Cache.Values(tsm1.SeriesFieldKeyBytes("cpu,host=A", "value"))); exp != got {
t.Fatalf("unexpected number of values: got: %d. exp: %d", got, exp)
}
if exp, got := 1, len(e.Cache.Values(tsm1.SeriesFieldKeyBytes("cpu,host=B", "value"))); exp != got {
t.Fatalf("unexpected number of values: got: %d. exp: %d", got, exp)
}
}
// Ensure that the engine will back up any TSM files created since the passed-in time.
func TestEngine_Backup(t *testing.T) {
// Generate temporary file.
f, _ := ioutil.TempFile("", "tsm")
f.Close()
os.Remove(f.Name())
walPath := filepath.Join(f.Name(), "wal")
os.MkdirAll(walPath, 0777)
defer os.RemoveAll(f.Name())
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")
// Write those points to the engine.
db := path.Base(f.Name())
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex(db)
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p1}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p2}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
b := bytes.NewBuffer(nil)
if err := e.Backup(b, "", time.Unix(0, 0)); err != nil {
t.Fatalf("failed to backup: %s", err.Error())
}
tr := tar.NewReader(b)
if len(e.FileStore.Files()) != 2 {
t.Fatalf("file count wrong: exp: %d, got: %d", 2, len(e.FileStore.Files()))
}
fileNames := map[string]bool{}
for _, f := range e.FileStore.Files() {
fileNames[filepath.Base(f.Path())] = true
}
th, err := tr.Next()
for err == nil {
if !fileNames[th.Name] {
t.Errorf("Extra file in backup: %q", th.Name)
}
delete(fileNames, th.Name)
th, err = tr.Next()
}
if err != nil && err != io.EOF {
t.Fatalf("Problem reading tar header: %s", err)
}
for f := range fileNames {
t.Errorf("File missing from backup: %s", f)
}
if t.Failed() {
t.FailNow()
}
lastBackup := time.Now()
// We have to sleep for a second because last-modified times only have second-level
// precision, so this test won't work properly unless the new file is at least a
// second newer than the previous one.
time.Sleep(time.Second)
if err := e.WritePoints([]models.Point{p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
b = bytes.NewBuffer(nil)
if err := e.Backup(b, "", lastBackup); err != nil {
t.Fatalf("failed to backup: %s", err.Error())
}
tr = tar.NewReader(b)
th, err = tr.Next()
if err != nil {
t.Fatalf("error getting next tar header: %s", err.Error())
}
mostRecentFile := e.FileStore.Files()[e.FileStore.Count()-1].Path()
if !strings.Contains(mostRecentFile, th.Name) || th.Name == "" {
t.Fatalf("file name doesn't match:\n\tgot: %s\n\texp: %s", th.Name, mostRecentFile)
}
}
// Ensure engine can create an ascending iterator for cached values.
func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
t.Parallel()
e := MustOpenDefaultEngine()
defer e.Close()
// e.CreateMeasurement("cpu")
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
`cpu,host=A value=1.3 3000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: true,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(query.FloatIterator)
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(0): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
t.Fatalf("unexpected point(0): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(1): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
t.Fatalf("unexpected point(1): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(2): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
t.Fatalf("unexpected point(2): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("expected eof, got error: %v", err)
} else if p != nil {
t.Fatalf("expected eof: %v", p)
}
}
// Ensure engine can create a descending iterator for cached values.
func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
t.Parallel()
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
`cpu,host=A value=1.3 3000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: false,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(query.FloatIterator)
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(0): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
t.Fatalf("unexpected point(0): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unepxected error(1): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
t.Fatalf("unexpected point(1): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(2): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
t.Fatalf("unexpected point(2): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("expected eof, got error: %v", err)
} else if p != nil {
t.Fatalf("expected eof: %v", p)
}
}
// Ensure engine can create an ascending iterator for tsm values.
func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
t.Parallel()
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
`cpu,host=A value=1.3 3000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
e.MustWriteSnapshot()
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: true,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(query.FloatIterator)
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(0): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
t.Fatalf("unexpected point(0): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(1): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
t.Fatalf("unexpected point(1): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(2): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
t.Fatalf("unexpected point(2): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("expected eof, got error: %v", err)
} else if p != nil {
t.Fatalf("expected eof: %v", p)
}
}
// Ensure engine can create a descending iterator for tsm values.
func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
t.Parallel()
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
`cpu,host=A value=1.3 3000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
e.MustWriteSnapshot()
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: false,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(query.FloatIterator)
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(0): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
t.Fatalf("unexpected point(0): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(1): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) {
t.Fatalf("unexpected point(1): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(2): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
t.Fatalf("unexpected point(2): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("expected eof, got error: %v", err)
} else if p != nil {
t.Fatalf("expected eof: %v", p)
}
}
// Ensure engine can create an iterator with auxiliary fields.
func TestEngine_CreateIterator_Aux(t *testing.T) {
t.Parallel()
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
`cpu,host=A value=1.2 2000000000`,
`cpu,host=A value=1.3 3000000000`,
`cpu,host=A F=200 3000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "F"}},
Dimensions: []string{"host"},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: true,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(query.FloatIterator)
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(0): %v", err)
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1, Aux: []interface{}{float64(100)}}) {
t.Fatalf("unexpected point(0): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(1): %v", err)
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2, Aux: []interface{}{(*float64)(nil)}}) {
t.Fatalf("unexpected point(1): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(2): %v", err)
} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3, Aux: []interface{}{float64(200)}}) {
t.Fatalf("unexpected point(2): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("expected eof, got error: %v", err)
} else if p != nil {
t.Fatalf("expected eof: %v", p)
}
}
// Ensure engine can create an iterator with a condition.
func TestEngine_CreateIterator_Condition(t *testing.T) {
t.Parallel()
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
e.SetFieldName([]byte("cpu"), "X")
e.SetFieldName([]byte("cpu"), "Y")
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
`cpu,host=A Y=100 1000000000`,
`cpu,host=A value=1.2 2000000000`,
`cpu,host=A value=1.3 3000000000`,
`cpu,host=A X=20 3000000000`,
`cpu,host=A Y=200 3000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
Condition: influxql.MustParseExpr(`X = 10 OR Y > 150`),
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: true,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(query.FloatIterator)
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected error(0): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) {
t.Fatalf("unexpected point(0): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("unexpected point(1): %v", err)
} else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) {
t.Fatalf("unexpected point(1): %v", p)
}
if p, err := fitr.Next(); err != nil {
t.Fatalf("expected eof, got error: %v", err)
} else if p != nil {
t.Fatalf("expected eof: %v", p)
}
}
// Ensures that deleting series from TSM files with multiple fields removes all the series.
func TestEngine_DeleteSeries(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e := NewEngine(index)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
panic(err)
}
defer e.Close()
if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
keys := e.FileStore.Keys()
if exp, got := 3, len(keys); exp != got {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}
if err := e.DeleteSeriesRange([][]byte{[]byte("cpu,host=A")}, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
keys = e.FileStore.Keys()
if exp, got := 1, len(keys); exp != got {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}
exp := "cpu,host=B#!~#value"
if _, ok := keys[exp]; !ok {
t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
}
})
}
}
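// Ensure the engine reports a zero last-modified time before any writes, and that
// the time advances after writes, snapshots, and series deletions.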
func TestEngine_LastModified(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e := NewEngine(index)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if lm := e.LastModified(); !lm.IsZero() {
t.Fatalf("expected zero time, got %v", lm.UTC())
}
e.SetEnabled(false)
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
defer e.Close()
if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
lm := e.LastModified()
if lm.IsZero() {
t.Fatalf("expected non-zero time, got %v", lm.UTC())
}
e.SetEnabled(true)
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
lm2 := e.LastModified()
if got, exp := lm.Equal(lm2), false; exp != got {
t.Fatalf("expected time change, got %v, exp %v", got, exp)
}
if err := e.DeleteSeriesRange([][]byte{[]byte("cpu,host=A")}, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
lm3 := e.LastModified()
if got, exp := lm2.Equal(lm3), false; exp != got {
t.Fatalf("expected time change, got %v, exp %v", got, exp)
}
})
}
}
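// Ensure that writing an empty snapshot does not return an error, even when
// snapshots and compactions are disabled.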
func TestEngine_SnapshotsDisabled(t *testing.T) {
// Generate temporary file.
dir, _ := ioutil.TempDir("", "tsm")
walPath := filepath.Join(dir, "wal")
os.MkdirAll(walPath, 0777)
defer os.RemoveAll(dir)
// Create a tsm1 engine.
db := path.Base(dir)
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex(db)
idx := tsdb.MustOpenIndex(1, db, filepath.Join(dir, "index"), opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, db, dir, walPath, opt).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
e.SetEnabled(false)
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
// Make sure Snapshots are disabled.
e.SetCompactionsEnabled(false)
e.Compactor.DisableSnapshots()
// Writing a snapshot should not fail when the snapshot is empty
// even if snapshots are disabled.
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
}
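// The benchmarks below measure CreateIterator performance for count, first, last,
// and limited raw queries at 1K, 100K, and 1M points.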
func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
benchmarkEngineCreateIteratorCount(b, 1000)
}
func BenchmarkEngine_CreateIterator_Count_100K(b *testing.B) {
benchmarkEngineCreateIteratorCount(b, 100000)
}
func BenchmarkEngine_CreateIterator_Count_1M(b *testing.B) {
benchmarkEngineCreateIteratorCount(b, 1000000)
}
func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
benchmarkIterator(b, query.IteratorOptions{
Expr: influxql.MustParseExpr("count(value)"),
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
}, pointN)
}
func BenchmarkEngine_CreateIterator_First_1K(b *testing.B) {
benchmarkEngineCreateIteratorFirst(b, 1000)
}
func BenchmarkEngine_CreateIterator_First_100K(b *testing.B) {
benchmarkEngineCreateIteratorFirst(b, 100000)
}
func BenchmarkEngine_CreateIterator_First_1M(b *testing.B) {
benchmarkEngineCreateIteratorFirst(b, 1000000)
}
func benchmarkEngineCreateIteratorFirst(b *testing.B, pointN int) {
benchmarkIterator(b, query.IteratorOptions{
Expr: influxql.MustParseExpr("first(value)"),
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
}, pointN)
}
func BenchmarkEngine_CreateIterator_Last_1K(b *testing.B) {
benchmarkEngineCreateIteratorLast(b, 1000)
}
func BenchmarkEngine_CreateIterator_Last_100K(b *testing.B) {
benchmarkEngineCreateIteratorLast(b, 100000)
}
func BenchmarkEngine_CreateIterator_Last_1M(b *testing.B) {
benchmarkEngineCreateIteratorLast(b, 1000000)
}
func benchmarkEngineCreateIteratorLast(b *testing.B, pointN int) {
benchmarkIterator(b, query.IteratorOptions{
Expr: influxql.MustParseExpr("last(value)"),
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
}, pointN)
}
func BenchmarkEngine_CreateIterator_Limit_1K(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 1000)
}
func BenchmarkEngine_CreateIterator_Limit_100K(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 100000)
}
func BenchmarkEngine_CreateIterator_Limit_1M(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 1000000)
}
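// BenchmarkEngine_WritePoints measures sequential point writes for each registered
// index type across a range of batch sizes.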
func BenchmarkEngine_WritePoints(b *testing.B) {
batchSizes := []int{10, 100, 1000, 5000, 10000}
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
pp = append(pp, p)
}
b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
err := e.WritePoints(pp)
if err != nil {
b.Fatal(err)
}
}
})
e.Close()
}
}
}
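// BenchmarkEngine_WritePoints_Parallel measures concurrent point writes, one
// goroutine per available CPU, for each registered index type across a range of
// batch sizes.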
func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
batchSizes := []int{1000, 5000, 10000, 25000, 50000, 75000, 100000, 200000}
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
cpus := runtime.GOMAXPROCS(0)
pp := make([]models.Point, 0, sz*cpus)
for i := 0; i < sz*cpus; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i))
pp = append(pp, p)
}
b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
errC := make(chan error)
for i := 0; i < cpus; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
from, to := i*sz, (i+1)*sz
err := e.WritePoints(pp[from:to])
if err != nil {
errC <- err
return
}
}(i)
}
go func() {
wg.Wait()
close(errC)
}()
for err := range errC {
if err != nil {
b.Error(err)
}
}
}
})
e.Close()
}
}
}
func benchmarkEngineCreateIteratorLimit(b *testing.B, pointN int) {
benchmarkIterator(b, query.IteratorOptions{
Expr: influxql.MustParseExpr("value"),
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Limit: 10,
}, pointN)
}
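// benchmarkIterator measures creating and draining an iterator with the given
// options against an engine pre-loaded with pointN points.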
func benchmarkIterator(b *testing.B, opt query.IteratorOptions, pointN int) {
e := MustInitDefaultBenchmarkEngine(pointN)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
itr, err := e.CreateIterator(context.Background(), "cpu", opt)
if err != nil {
b.Fatal(err)
}
query.DrainIterator(itr)
}
}
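// benchmark caches the engine and point count from the previous benchmark run so
// that MustInitDefaultBenchmarkEngine can reuse them when the parameters match.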
var benchmark struct {
Engine *Engine
PointN int
}
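// hostNames are the host tag values cycled through when generating benchmark points.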
var hostNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}
// MustInitDefaultBenchmarkEngine creates a new engine using the default index
// and fills it with points. Reuses previous engine if the same parameters
// were used.
func MustInitDefaultBenchmarkEngine(pointN int) *Engine {
// Reuse engine, if available.
if benchmark.Engine != nil {
if benchmark.PointN == pointN {
return benchmark.Engine
}
// Otherwise close and remove it.
benchmark.Engine.Close()
benchmark.Engine = nil
}
const batchSize = 1000
if pointN%batchSize != 0 {
panic(fmt.Sprintf("point count (%d) must be a multiple of batch size (%d)", pointN, batchSize))
}
e := MustOpenEngine(tsdb.DefaultIndex)
// Initialize metadata.
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
// Generate time-ascending points with jittered time & value.
rand := rand.New(rand.NewSource(0))
for i := 0; i < pointN; i += batchSize {
var buf bytes.Buffer
for j := 0; j < batchSize; j++ {
fmt.Fprintf(&buf, "cpu,host=%s value=%d %d",
hostNames[j%len(hostNames)],
100+rand.Intn(50)-25,
(time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond),
)
if j != batchSize-1 {
fmt.Fprint(&buf, "\n")
}
}
if err := e.WritePointsString(buf.String()); err != nil {
panic(err)
}
}
if err := e.WriteSnapshot(); err != nil {
panic(err)
}
// Force garbage collection.
runtime.GC()
// Save engine reference for reuse.
benchmark.Engine = e
benchmark.PointN = pointN
return e
}
// Engine is a test wrapper for tsm1.Engine.
type Engine struct {
*tsm1.Engine
root string
index tsdb.Index
}
// NewEngine returns a new instance of Engine at a temporary location.
func NewEngine(index string) *Engine {
root, err := ioutil.TempDir("", "tsm1-")
if err != nil {
panic(err)
}
db := path.Base(root)
opt := tsdb.NewEngineOptions()
opt.IndexVersion = index
if index == "inmem" {
opt.InmemIndex = inmem.NewIndex(db)
}
idx := tsdb.MustOpenIndex(1, db, filepath.Join(root, "data", "index"), opt)
return &Engine{
Engine: tsm1.NewEngine(1,
idx,
db,
filepath.Join(root, "data"),
filepath.Join(root, "wal"),
opt).(*tsm1.Engine),
root: root,
index: idx,
}
}
// MustOpenDefaultEngine returns a new, open instance of Engine using the default
// index. Useful when the index is not directly under test.
func MustOpenDefaultEngine() *Engine {
e := NewEngine(tsdb.DefaultIndex)
if err := e.Open(); err != nil {
panic(err)
}
return e
}
// MustOpenEngine returns a new, open instance of Engine.
func MustOpenEngine(index string) *Engine {
e := NewEngine(index)
if err := e.Open(); err != nil {
panic(err)
}
return e
}
// Close closes the engine and removes all underlying data.
func (e *Engine) Close() error {
if e.index != nil {
e.index.Close()
}
defer os.RemoveAll(e.root)
return e.Engine.Close()
}
// Reopen closes and reopens the engine.
func (e *Engine) Reopen() error {
if err := e.Engine.Close(); err != nil {
return err
} else if err := e.index.Close(); err != nil {
return err
}
db := path.Base(e.root)
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex(db)
e.index = tsdb.MustOpenIndex(1, db, filepath.Join(e.root, "data", "index"), opt)
e.Engine = tsm1.NewEngine(1,
e.index,
db,
filepath.Join(e.root, "data"),
filepath.Join(e.root, "wal"),
opt).(*tsm1.Engine)
if err := e.Engine.Open(); err != nil {
return err
}
return nil
}
// MustWriteSnapshot forces a snapshot of the engine. Panic on error.
func (e *Engine) MustWriteSnapshot() {
if err := e.WriteSnapshot(); err != nil {
panic(err)
}
}
// WritePointsString parses a string buffer and writes the points.
func (e *Engine) WritePointsString(buf ...string) error {
return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n")))
}
// MustParsePointsString parses points from a string. Panic on error.
func MustParsePointsString(buf string) []models.Point {
a, err := models.ParsePointsString(buf)
if err != nil {
panic(err)
}
return a
}
// MustParsePointString parses the first point from a string. Panic on error.
func MustParsePointString(buf string) models.Point { return MustParsePointsString(buf)[0] }
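// mockPlanner is a no-op compaction planner. Tests assign it to an engine's
// CompactionPlan field so that background compactions never run during the test.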
type mockPlanner struct{}
func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) FullyCompacted() bool { return false }
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
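// For example, ParseTags("host=A,region=west") is equivalent to
// query.NewTags(map[string]string{"host": "A", "region": "west"}).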
func ParseTags(s string) query.Tags {
m := make(map[string]string)
for _, kv := range strings.Split(s, ",") {
a := strings.Split(kv, "=")
m[a[0]] = a[1]
}
return query.NewTags(m)
}