188 lines
5.8 KiB
Go
188 lines
5.8 KiB
Go
package inmem_test
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"testing"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
"github.com/influxdata/influxdb/tsdb/index/inmem"
|
|
)
|
|
|
|
// notrack is the stats tracker handed to every index write in this file.
// NOTE(review): presumably a no-op tracker, going by the constructor name;
// confirm against package tsdb.
var notrack = tsdb.NoopStatsTracker()
|
|
|
|
func createData(lo, hi int) (keys, names [][]byte, tags []models.Tags) {
|
|
for i := lo; i < hi; i++ {
|
|
keys = append(keys, []byte(fmt.Sprintf("m0,tag0=t%d", i)))
|
|
names = append(names, []byte("m0"))
|
|
var t models.Tags
|
|
t.Set([]byte("tag0"), []byte(fmt.Sprintf("%d", i)))
|
|
tags = append(tags, t)
|
|
}
|
|
return
|
|
}
|
|
|
|
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesExceeded(b *testing.B) {
|
|
sfile := mustOpenSeriesFile()
|
|
defer sfile.Close()
|
|
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
|
opt.Config.MaxValuesPerTag = 10
|
|
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
|
si.Open()
|
|
keys, names, tags := createData(0, 10)
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
b.ReportAllocs()
|
|
b.ResetTimer()
|
|
|
|
keys, names, tags = createData(9, 5010)
|
|
for i := 0; i < b.N; i++ {
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
}
|
|
}
|
|
|
|
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesNotExceeded(b *testing.B) {
|
|
sfile := mustOpenSeriesFile()
|
|
defer sfile.Close()
|
|
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
|
opt.Config.MaxValuesPerTag = 100000
|
|
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
|
si.Open()
|
|
keys, names, tags := createData(0, 10)
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
b.ReportAllocs()
|
|
b.ResetTimer()
|
|
|
|
keys, names, tags = createData(9, 5010)
|
|
for i := 0; i < b.N; i++ {
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
}
|
|
}
|
|
|
|
func BenchmarkShardIndex_CreateSeriesListIfNotExists_NoMaxValues(b *testing.B) {
|
|
sfile := mustOpenSeriesFile()
|
|
defer sfile.Close()
|
|
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
|
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
|
si.Open()
|
|
keys, names, tags := createData(0, 10)
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
b.ReportAllocs()
|
|
b.ResetTimer()
|
|
|
|
keys, names, tags = createData(9, 5010)
|
|
for i := 0; i < b.N; i++ {
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
}
|
|
}
|
|
|
|
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxSeriesExceeded(b *testing.B) {
|
|
sfile := mustOpenSeriesFile()
|
|
defer sfile.Close()
|
|
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
|
opt.Config.MaxValuesPerTag = 0
|
|
opt.Config.MaxSeriesPerDatabase = 10
|
|
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
|
si.Open()
|
|
keys, names, tags := createData(0, 10)
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
b.ReportAllocs()
|
|
b.ResetTimer()
|
|
|
|
keys, names, tags = createData(9, 5010)
|
|
for i := 0; i < b.N; i++ {
|
|
si.CreateSeriesListIfNotExists(keys, names, tags, notrack)
|
|
}
|
|
}
|
|
|
|
func TestIndex_Bytes(t *testing.T) {
|
|
sfile := mustOpenSeriesFile()
|
|
defer sfile.Close()
|
|
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
|
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
|
|
|
indexBaseBytes := si.Bytes()
|
|
|
|
name := []byte("name")
|
|
err := si.CreateSeriesIfNotExists(name, name, models.Tags{}, notrack)
|
|
if err != nil {
|
|
t.Error(err)
|
|
t.FailNow()
|
|
}
|
|
|
|
indexNewBytes := si.Bytes()
|
|
if indexBaseBytes >= indexNewBytes {
|
|
t.Errorf("index Bytes(): want >%d, got %d", indexBaseBytes, indexNewBytes)
|
|
}
|
|
}
|
|
|
|
func TestIndex_MeasurementTracking(t *testing.T) {
|
|
sfile := mustOpenSeriesFile()
|
|
defer sfile.Close()
|
|
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
|
s1 := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
|
s2 := inmem.NewShardIndex(2, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
|
b := func(s string) []byte { return []byte(s) }
|
|
mt := func(k, v string) models.Tag { return models.Tag{Key: b(k), Value: b(v)} }
|
|
|
|
s1.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")}, notrack)
|
|
s1.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")}, notrack)
|
|
s2.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")}, notrack)
|
|
s2.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")}, notrack)
|
|
series1, _ := s1.Series(b("m,t=t1"))
|
|
series2, _ := s1.Series(b("m,t=t2"))
|
|
|
|
if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
|
t.Fatal("invalid drop")
|
|
}
|
|
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
|
t.Fatal("invalid drop")
|
|
}
|
|
|
|
s1.DropSeries(series1.ID, b(series1.Key), false)
|
|
s1.DropSeries(series2.ID, b(series2.Key), false)
|
|
|
|
if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
|
|
t.Fatal("invalid drop")
|
|
}
|
|
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
|
t.Fatal("invalid drop")
|
|
}
|
|
|
|
s2.DropSeries(series1.ID, b(series1.Key), false)
|
|
s2.DropSeries(series2.ID, b(series2.Key), false)
|
|
|
|
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
|
|
t.Fatal("invalid drop")
|
|
}
|
|
}
|
|
|
|
// seriesFileWrapper is a test wrapper for tsdb.seriesFileWrapper.
|
|
type seriesFileWrapper struct {
|
|
*tsdb.SeriesFile
|
|
}
|
|
|
|
// newSeriesFileWrapper returns a new instance of seriesFileWrapper with a temporary file path.
|
|
func newSeriesFileWrapper() *seriesFileWrapper {
|
|
dir, err := os.MkdirTemp("", "tsdb-series-file-")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return &seriesFileWrapper{SeriesFile: tsdb.NewSeriesFile(dir)}
|
|
}
|
|
|
|
// mustOpenSeriesFile returns a new, open instance of seriesFileWrapper. Panic on error.
|
|
func mustOpenSeriesFile() *seriesFileWrapper {
|
|
f := newSeriesFileWrapper()
|
|
if err := f.Open(); err != nil {
|
|
panic(err)
|
|
}
|
|
return f
|
|
}
|
|
|
|
// Close closes the log file and removes it from disk.
|
|
func (f *seriesFileWrapper) Close() error {
|
|
defer os.RemoveAll(f.Path())
|
|
return f.SeriesFile.Close()
|
|
}
|