influxdb/tsdb/index/inmem/inmem_test.go

188 lines
5.8 KiB
Go

package inmem_test
import (
"fmt"
"os"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
)
// notrack is the stats tracker handed to every series-creation call in this
// file; it is whatever tsdb.NoopStatsTracker() returns.
var notrack = tsdb.NoopStatsTracker()
// createData builds three parallel slices describing one series for each
// integer n in the half-open range [lo, hi):
//
//   - keys[n-lo]  is the series key "m0,tag0=t<n>"
//   - names[n-lo] is the measurement name "m0"
//   - tags[n-lo]  is a tag set holding the single tag tag0=<n>
//
// Note that the key embeds the value with a leading "t" while the tag value is
// the bare decimal number. All three results are nil when hi <= lo.
func createData(lo, hi int) (keys, names [][]byte, tags []models.Tags) {
	for n := lo; n < hi; n++ {
		seriesKey := fmt.Sprintf("m0,tag0=t%d", n)
		tagValue := fmt.Sprintf("%d", n)

		var tagSet models.Tags
		tagSet.Set([]byte("tag0"), []byte(tagValue))

		keys = append(keys, []byte(seriesKey))
		names = append(names, []byte("m0"))
		tags = append(tags, tagSet)
	}
	return keys, names, tags
}
// BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesExceeded measures
// CreateSeriesListIfNotExists on a shard index configured with
// MaxValuesPerTag = 10.
//
// The index is seeded with the ten series from createData(0, 10); the timed
// loop then repeatedly submits the 5001-series batch from createData(9, 5010),
// which carries far more distinct tag0 values than the configured limit.
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesExceeded(b *testing.B) {
	sfile := mustOpenSeriesFile()
	defer sfile.Close()

	opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
	opt.Config.MaxValuesPerTag = 10
	si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
	if err := si.Open(); err != nil {
		// A benchmark against an index that failed to open is meaningless.
		b.Fatal(err)
	}

	// Seed the index. Any result is discarded, exactly as in the timed loop.
	keys, names, tags := createData(0, 10)
	si.CreateSeriesListIfNotExists(keys, names, tags, notrack)

	// Build the benchmark batch BEFORE resetting the timer so that formatting
	// thousands of keys is not charged to the operation under measurement.
	keys, names, tags = createData(9, 5010)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
	}
}
// BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesNotExceeded measures
// CreateSeriesListIfNotExists on a shard index configured with
// MaxValuesPerTag = 100000, a limit well above the number of distinct tag0
// values the benchmark data contains.
//
// The index is seeded with the ten series from createData(0, 10); the timed
// loop then repeatedly submits the 5001-series batch from createData(9, 5010).
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesNotExceeded(b *testing.B) {
	sfile := mustOpenSeriesFile()
	defer sfile.Close()

	opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
	opt.Config.MaxValuesPerTag = 100000
	si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
	if err := si.Open(); err != nil {
		// A benchmark against an index that failed to open is meaningless.
		b.Fatal(err)
	}

	// Seed the index. Any result is discarded, exactly as in the timed loop.
	keys, names, tags := createData(0, 10)
	si.CreateSeriesListIfNotExists(keys, names, tags, notrack)

	// Build the benchmark batch BEFORE resetting the timer so that formatting
	// thousands of keys is not charged to the operation under measurement.
	keys, names, tags = createData(9, 5010)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
	}
}
// BenchmarkShardIndex_CreateSeriesListIfNotExists_NoMaxValues measures
// CreateSeriesListIfNotExists on a shard index whose engine options set only
// InmemIndex, leaving opt.Config untouched (no MaxValuesPerTag is assigned).
//
// The index is seeded with the ten series from createData(0, 10); the timed
// loop then repeatedly submits the 5001-series batch from createData(9, 5010).
func BenchmarkShardIndex_CreateSeriesListIfNotExists_NoMaxValues(b *testing.B) {
	sfile := mustOpenSeriesFile()
	defer sfile.Close()

	opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
	si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
	if err := si.Open(); err != nil {
		// A benchmark against an index that failed to open is meaningless.
		b.Fatal(err)
	}

	// Seed the index. Any result is discarded, exactly as in the timed loop.
	keys, names, tags := createData(0, 10)
	si.CreateSeriesListIfNotExists(keys, names, tags, notrack)

	// Build the benchmark batch BEFORE resetting the timer so that formatting
	// thousands of keys is not charged to the operation under measurement.
	keys, names, tags = createData(9, 5010)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
	}
}
// BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxSeriesExceeded measures
// CreateSeriesListIfNotExists on a shard index configured with
// MaxValuesPerTag = 0 and MaxSeriesPerDatabase = 10.
//
// The index is seeded with the ten series from createData(0, 10); the timed
// loop then repeatedly submits the 5001-series batch from createData(9, 5010),
// which is far larger than the configured series limit.
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxSeriesExceeded(b *testing.B) {
	sfile := mustOpenSeriesFile()
	defer sfile.Close()

	opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
	opt.Config.MaxValuesPerTag = 0
	opt.Config.MaxSeriesPerDatabase = 10
	si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
	if err := si.Open(); err != nil {
		// A benchmark against an index that failed to open is meaningless.
		b.Fatal(err)
	}

	// Seed the index. Any result is discarded, exactly as in the timed loop.
	keys, names, tags := createData(0, 10)
	si.CreateSeriesListIfNotExists(keys, names, tags, notrack)

	// Build the benchmark batch BEFORE resetting the timer so that formatting
	// thousands of keys is not charged to the operation under measurement.
	keys, names, tags = createData(9, 5010)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
	}
}
// TestIndex_Bytes verifies that the value reported by ShardIndex.Bytes grows
// strictly after a series has been created in the index.
func TestIndex_Bytes(t *testing.T) {
	sfile := mustOpenSeriesFile()
	defer sfile.Close()

	opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
	si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)

	// Size of the index before anything has been added.
	before := si.Bytes()

	// The same byte slice serves as both series key and measurement name.
	name := []byte("name")
	if err := si.CreateSeriesIfNotExists(name, name, models.Tags{}, notrack); err != nil {
		t.Fatal(err)
	}

	if after := si.Bytes(); after <= before {
		t.Errorf("index Bytes(): want >%d, got %d", before, after)
	}
}
// TestIndex_MeasurementTracking exercises DropMeasurementIfSeriesNotExist on
// two shard indexes (IDs 1 and 2) built from the same EngineOptions, and hence
// the same inmem index.
//
// Both shards create the series "m,t=t1" and "m,t=t2". The test then asserts:
//
//  1. while a shard still holds series of "m", dropping the measurement on
//     that shard reports ok == false;
//  2. after both series are dropped from shard 1, shard 1 reports ok == true
//     while shard 2 still reports ok == false;
//  3. after both series are dropped from shard 2 as well, shard 2 reports
//     ok == true.
func TestIndex_MeasurementTracking(t *testing.T) {
	sfile := mustOpenSeriesFile()
	defer sfile.Close()

	opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
	s1 := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
	s2 := inmem.NewShardIndex(2, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)

	b := func(s string) []byte { return []byte(s) }
	mt := func(k, v string) models.Tag { return models.Tag{Key: b(k), Value: b(v)} }

	// Create m,t=t1 and m,t=t2 on shard 1, then on shard 2. A creation failure
	// aborts the test here instead of surfacing later as a misleading
	// "invalid drop" or a nil dereference on the looked-up series.
	for _, s := range []*inmem.ShardIndex{s1, s2} {
		for _, v := range []string{"t1", "t2"} {
			if err := s.CreateSeriesIfNotExists(b("m,t="+v), b("m"), models.Tags{mt("t", v)}, notrack); err != nil {
				t.Fatal(err)
			}
		}
	}

	// NOTE(review): the second result of Series and any result of DropSeries
	// are discarded below, as before; their types are not visible from here.
	series1, _ := s1.Series(b("m,t=t1"))
	series2, _ := s1.Series(b("m,t=t2"))

	// Both shards still hold series for "m": neither may drop it.
	if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
		t.Fatal("invalid drop")
	}
	if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
		t.Fatal("invalid drop")
	}

	// Empty shard 1: it may now drop "m", shard 2 still may not.
	s1.DropSeries(series1.ID, b(series1.Key), false)
	s1.DropSeries(series2.ID, b(series2.Key), false)

	if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
		t.Fatal("invalid drop")
	}
	if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
		t.Fatal("invalid drop")
	}

	// Empty shard 2 as well: it may now drop "m" too.
	s2.DropSeries(series1.ID, b(series1.Key), false)
	s2.DropSeries(series2.ID, b(series2.Key), false)

	if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
		t.Fatal("invalid drop")
	}
}
// seriesFileWrapper is a test wrapper for tsdb.SeriesFile. It embeds
// *tsdb.SeriesFile, so the embedded type's methods are promoted onto the
// wrapper.
type seriesFileWrapper struct {
*tsdb.SeriesFile
}
// newSeriesFileWrapper creates a fresh temporary directory (prefix
// "tsdb-series-file-") and returns a seriesFileWrapper around a
// tsdb.SeriesFile constructed for that directory. The series file is not
// opened here. It panics if the directory cannot be created.
func newSeriesFileWrapper() *seriesFileWrapper {
	tmpDir, mkErr := os.MkdirTemp("", "tsdb-series-file-")
	if mkErr != nil {
		panic(mkErr)
	}

	wrapper := &seriesFileWrapper{}
	wrapper.SeriesFile = tsdb.NewSeriesFile(tmpDir)
	return wrapper
}
// mustOpenSeriesFile builds a seriesFileWrapper via newSeriesFileWrapper,
// opens it, and returns it. It panics if Open reports an error.
func mustOpenSeriesFile() *seriesFileWrapper {
	wrapper := newSeriesFileWrapper()
	openErr := wrapper.Open()
	if openErr != nil {
		panic(openErr)
	}
	return wrapper
}
// Close closes the embedded series file and then removes whatever lives at the
// path reported by Path(). The path is captured before the file is closed. The
// error returned is the one from SeriesFile.Close; the result of the removal
// is not reported.
func (f *seriesFileWrapper) Close() error {
	path := f.Path()
	// Deferred so the removal runs after the underlying Close has finished.
	defer os.RemoveAll(path)
	return f.SeriesFile.Close()
}