package tsi1_test

import (
	"compress/gzip"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"regexp"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/tsdb"
	"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// Bloom filter settings used in tests (M = filter size in bits, K = number of
// hash functions).
const M, K = 4096, 6

// Ensure index can iterate over all measurement names.
func TestIndex_ForEachMeasurementName(t *testing.T) {
	idx := MustOpenDefaultIndex(t)
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify measurements are returned.
	idx.Run(t, func(t *testing.T) {
		var names []string
		if err := idx.ForEachMeasurementName(func(name []byte) error {
			names = append(names, string(name))
			return nil
		}); err != nil {
			t.Fatal(err)
		}

		if !reflect.DeepEqual(names, []string{"cpu", "mem"}) {
			t.Fatalf("unexpected names: %#v", names)
		}
	})

	// Add more series.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("disk")},
		{Name: []byte("mem")},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify new measurements.
	idx.Run(t, func(t *testing.T) {
		var names []string
		if err := idx.ForEachMeasurementName(func(name []byte) error {
			names = append(names, string(name))
			return nil
		}); err != nil {
			t.Fatal(err)
		}

		if !reflect.DeepEqual(names, []string{"cpu", "disk", "mem"}) {
			t.Fatalf("unexpected names: %#v", names)
		}
	})
}

// Ensure index can return whether a measurement exists.
func TestIndex_MeasurementExists(t *testing.T) {
	idx := MustOpenDefaultIndex(t)
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify measurement exists.
	idx.Run(t, func(t *testing.T) {
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if !v {
			t.Fatal("expected measurement to exist")
		}
	})

	name, tags := []byte("cpu"), models.NewTags(map[string]string{"region": "east"})
	sid := idx.Index.SeriesFile().SeriesID(name, tags, nil)
	if sid == 0 {
		t.Fatalf("got 0 series id for %s/%v", name, tags)
	}

	// Delete one series.
	if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
		t.Fatal(err)
	}

	// Verify measurement still exists.
	idx.Run(t, func(t *testing.T) {
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if !v {
			t.Fatal("expected measurement to still exist")
		}
	})

	// Delete second series.
	tags.Set([]byte("region"), []byte("west"))
	sid = idx.Index.SeriesFile().SeriesID(name, tags, nil)
	if sid == 0 {
		t.Fatalf("got 0 series id for %s/%v", name, tags)
	}
	if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
		t.Fatal(err)
	}

	// Verify measurement is now deleted.
	idx.Run(t, func(t *testing.T) {
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if v {
			t.Fatal("expected measurement to be deleted")
		}
	})
}
// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByRegex(t *testing.T) {
	idx := MustOpenDefaultIndex(t)
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu")},
		{Name: []byte("disk")},
		{Name: []byte("mem")},
	}); err != nil {
		t.Fatal(err)
	}

	// Retrieve measurements by regex.
	idx.Run(t, func(t *testing.T) {
		names, err := idx.MeasurementNamesByRegex(regexp.MustCompile(`cpu|mem`))
		if err != nil {
			t.Fatal(err)
		} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
			t.Fatalf("unexpected names: %v", names)
		}
	})
}

// Ensure index can delete a measurement and all related keys, values, & series.
func TestIndex_DropMeasurement(t *testing.T) {
	idx := MustOpenDefaultIndex(t)
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Drop measurement.
	if err := idx.DropMeasurement([]byte("cpu")); err != nil {
		t.Fatal(err)
	}

	// Verify data is gone in each stage.
	idx.Run(t, func(t *testing.T) {
		// Verify measurement is gone.
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if v {
			t.Fatal("expected no measurement")
		}

		// Obtain file set to perform lower level checks.
		fs, err := idx.PartitionAt(0).RetainFileSet()
		if err != nil {
			t.Fatal(err)
		}
		defer fs.Release()

		// Verify tags & values are gone.
		if e := fs.TagKeyIterator([]byte("cpu")).Next(); e != nil && !e.Deleted() {
			t.Fatal("expected deleted tag key")
		}
		if itr := fs.TagValueIterator([]byte("cpu"), []byte("region")); itr != nil {
			t.Fatal("expected nil tag value iterator")
		}
	})
}

func TestIndex_OpenFail(t *testing.T) {
	idx := NewDefaultIndex(t)
	require.NoError(t, idx.Open())
	idx.Index.Close()

	// Mess up the index:
	tslPath := path.Join(idx.Index.Path(), "3", "L0-00000001.tsl")
	tslFile, err := os.OpenFile(tslPath, os.O_RDWR, 0666)
	require.NoError(t, err)
	require.NoError(t, tslFile.Truncate(0))
	// Write a poisonous TSL file - the first byte doesn't matter, the
	// remaining bytes are an invalid uvarint.
	_, err = tslFile.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
	require.NoError(t, err)
	require.NoError(t, tslFile.Close())

	idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
	err = idx.Index.Open()
	require.Error(t, err, "expected an error on opening the index")
	require.Contains(t, err.Error(), ".tsl\": parsing binary-encoded uint64 value failed; binary.Uvarint() returned -11")

	// Ensure each partition is closed:
	for i := 0; i < int(idx.Index.PartitionN); i++ {
		assert.Equal(t, idx.Index.PartitionAt(i).FileN(), 0)
	}
	require.NoError(t, idx.Close())
}
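// A minimal, hypothetical illustration (not part of the upstream suite) of why
// TestIndex_OpenFail expects "-11" above: a uvarint may occupy at most
// binary.MaxVarintLen64 (10) bytes, so eleven consecutive bytes with the
// continuation bit (0x80) set overflow, and binary.Uvarint reports the
// negative byte count -11. Assumes the encoding/binary import in the import
// block above.
func TestUvarintOverflowSketch(t *testing.T) {
	buf := make([]byte, 15)
	for i := range buf {
		buf[i] = 0xff // continuation bit set on every byte
	}
	v, n := binary.Uvarint(buf)
	require.Equal(t, uint64(0), v)
	require.Equal(t, -11, n) // overflow detected on reading an 11th byte
}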
func TestIndex_Open(t *testing.T) {
	t.Run("open new index", func(t *testing.T) {
		// Opening a fresh index should set the MANIFEST version to current version.
		idx := MustOpenDefaultIndex(t)
		t.Cleanup(func() {
			assert.NoError(t, idx.Close())
		})

		// Check version set appropriately.
		for i := 0; uint64(i) < tsi1.DefaultPartitionN; i++ {
			partition := idx.PartitionAt(i)
			if got, exp := partition.Manifest().Version, 1; got != exp {
				t.Fatalf("got index version %d, expected %d", got, exp)
			}
		}

		for i := 0; i < int(idx.PartitionN); i++ {
			p := idx.PartitionAt(i)
			if got, exp := p.NeedsCompaction(false), false; got != exp {
				t.Fatalf("got needs compaction %v, expected %v", got, exp)
			}
		}
	})

	// Reopening an open index should return an error.
	t.Run("reopen open index", func(t *testing.T) {
		idx := MustOpenDefaultIndex(t)
		t.Cleanup(func() {
			assert.NoError(t, idx.Close())
		})

		// Manually close the existing SeriesFile so that it won't be left
		// opened after idx.Open(), which calls another idx.SeriesFile.Open().
		//
		// This is required for t.TempDir() to be cleaned up successfully on
		// Windows.
		assert.NoError(t, idx.SeriesFile.Close())

		err := idx.Open()
		if err == nil {
			t.Fatal("didn't get an error on reopen, but expected one")
		}
	})

	// Opening an incompatible index should return an error.
	incompatibleVersions := []int{-1, 0, 2}
	for _, v := range incompatibleVersions {
		t.Run(fmt.Sprintf("incompatible index version: %d", v), func(t *testing.T) {
			idx := NewDefaultIndex(t)

			// Manually create a MANIFEST file for an incompatible index
			// version under one of the partitions.
			partitionPath := filepath.Join(idx.Path(), "2")
			os.MkdirAll(partitionPath, 0777)

			mpath := filepath.Join(partitionPath, tsi1.ManifestFileName)
			m := tsi1.NewManifest(mpath)
			m.Levels = nil
			m.Version = v // Set example MANIFEST version.
			if _, err := m.Write(); err != nil {
				t.Fatal(err)
			}

			// Log the MANIFEST file.
			data, err := os.ReadFile(mpath)
			if err != nil {
				panic(err)
			}
			t.Logf("Incompatible MANIFEST: %s", data)

			// Opening this index should return an error because the MANIFEST
			// has an incompatible version.
			err = idx.Open()
			t.Cleanup(func() {
				assert.NoError(t, idx.Close())
			})
			if !errors.Is(err, tsi1.ErrIncompatibleVersion) {
				t.Fatalf("got error %v, expected %v", err, tsi1.ErrIncompatibleVersion)
			}
		})
	}
}

func TestIndex_Manifest(t *testing.T) {
	t.Run("current MANIFEST", func(t *testing.T) {
		idx := MustOpenIndex(t, tsi1.DefaultPartitionN)

		// Check version set appropriately.
		for i := 0; uint64(i) < tsi1.DefaultPartitionN; i++ {
			partition := idx.PartitionAt(i)
			if got, exp := partition.Manifest().Version, tsi1.Version; got != exp {
				t.Fatalf("got MANIFEST version %d, expected %d", got, exp)
			}
		}
		require.NoError(t, idx.Close())
	})
}

func TestIndex_DiskSizeBytes(t *testing.T) {
	idx := MustOpenIndex(t, tsi1.DefaultPartitionN)
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
	}); err != nil {
		t.Fatal(err)
	}

	idx.RunStateAware(t, func(t *testing.T, state int) {
		// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them.
		expSize := int64(tsi1.DefaultPartitionN * 419)
		switch state {
		case Initial:
			fallthrough
		case Reopen:
			// In the log file, each series stores flag(1) + series(uvarint(2)) +
			// len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
			expSize += 4 * 9
		case PostCompaction:
			fallthrough
		case PostCompactionReopen:
			// For TSI files after a compaction, instead of 4*9, we have
			// encoded measurement names, tag names, etc., which is larger.
			expSize += 2202
		}
		if got, exp := idx.DiskSizeBytes(), expSize; got != exp {
			// We had some odd errors - if the size is unexpected, log it.
			idx.Index.LogDiskSize(t)
			t.Fatalf("got %d bytes, expected %d", got, exp)
		}
	})
}

func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
	idx1 := MustOpenDefaultIndex(t) // Uses the single series creation method CreateSeriesIfNotExists.
	defer idx1.Close()
	idx2 := MustOpenDefaultIndex(t) // Uses the batch series creation method CreateSeriesListIfNotExists.
	defer idx2.Close()

	// Add some series.
	data := []struct {
		Key  string
		Name string
		Tags map[string]string
	}{
		{"cpu,region=west,server=a", "cpu", map[string]string{"region": "west", "server": "a"}},
		{"cpu,region=west,server=b", "cpu", map[string]string{"region": "west", "server": "b"}},
		{"cpu,region=east,server=a", "cpu", map[string]string{"region": "east", "server": "a"}},
		{"cpu,region=north,server=c", "cpu", map[string]string{"region": "north", "server": "c"}},
		{"cpu,region=south,server=s", "cpu", map[string]string{"region": "south", "server": "s"}},
		{"mem,region=west,server=a", "mem", map[string]string{"region": "west", "server": "a"}},
		{"mem,region=west,server=b", "mem", map[string]string{"region": "west", "server": "b"}},
		{"mem,region=west,server=c", "mem", map[string]string{"region": "west", "server": "c"}},
		{"disk,region=east,server=a", "disk", map[string]string{"region": "east", "server": "a"}},
		{"disk,region=east,server=a", "disk", map[string]string{"region": "east", "server": "a"}},
		{"disk,region=north,server=c", "disk", map[string]string{"region": "north", "server": "c"}},
	}

	var batchKeys [][]byte
	var batchNames [][]byte
	var batchTags []models.Tags
	for _, pt := range data {
		if err := idx1.CreateSeriesIfNotExists([]byte(pt.Key), []byte(pt.Name), models.NewTags(pt.Tags)); err != nil {
			t.Fatal(err)
		}

		batchKeys = append(batchKeys, []byte(pt.Key))
		batchNames = append(batchNames, []byte(pt.Name))
		batchTags = append(batchTags, models.NewTags(pt.Tags))
	}
	if err := idx2.CreateSeriesListIfNotExists(batchKeys, batchNames, batchTags); err != nil {
		t.Fatal(err)
	}

	testTagValueSeriesIDIterator := func(t *testing.T, name, key, value string, expKeys []string) {
		for i, idx := range []*Index{idx1, idx2} {
			sitr, err := idx.TagValueSeriesIDIterator([]byte(name), []byte(key), []byte(value))
			if err != nil {
				t.Fatalf("[index %d] %v", i, err)
			} else if sitr == nil {
				t.Fatalf("[index %d] series id iterator nil", i)
			}

			// Convert series ids to series keys.
			itr := tsdb.NewSeriesIteratorAdapter(idx.SeriesFile.SeriesFile, sitr)
			if itr == nil {
				t.Fatalf("[index %d] got nil iterator", i)
			}
			defer itr.Close()

			var keys []string
			for e, err := itr.Next(); err == nil; e, err = itr.Next() {
				if e == nil {
					break
				}
				keys = append(keys, string(models.MakeKey(e.Name(), e.Tags())))
			}

			if err != nil {
				t.Fatal(err)
			}

			// Iterator was in series id order, which may not be series key order.
			sort.Strings(keys)
			if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
				t.Fatalf("[index %d] got %v, expected %v", i, got, exp)
			}
		}
	}

	// Test that correct series are initially returned.
	t.Run("initial", func(t *testing.T) {
		testTagValueSeriesIDIterator(t, "mem", "region", "west", []string{
			"mem,region=west,server=a",
			"mem,region=west,server=b",
			"mem,region=west,server=c",
		})
	})

	// The result should now be cached, and the same result should be returned.
t.Run("cached", func(t *testing.T) { testTagValueSeriesIDIterator(t, "mem", "region", "west", []string{ "mem,region=west,server=a", "mem,region=west,server=b", "mem,region=west,server=c", }) }) // Adding a new series that would be referenced by some cached bitsets (in this case // the bitsets for mem->region->west and mem->server->c) should cause the cached // bitsets to be updated. if err := idx1.CreateSeriesIfNotExists( []byte("mem,region=west,root=x,server=c"), []byte("mem"), models.NewTags(map[string]string{"region": "west", "root": "x", "server": "c"}), ); err != nil { t.Fatal(err) } if err := idx2.CreateSeriesListIfNotExists( [][]byte{[]byte("mem,region=west,root=x,server=c")}, [][]byte{[]byte("mem")}, []models.Tags{models.NewTags(map[string]string{"region": "west", "root": "x", "server": "c"})}, ); err != nil { t.Fatal(err) } t.Run("insert series", func(t *testing.T) { testTagValueSeriesIDIterator(t, "mem", "region", "west", []string{ "mem,region=west,root=x,server=c", "mem,region=west,server=a", "mem,region=west,server=b", "mem,region=west,server=c", }) }) if err := idx1.CreateSeriesIfNotExists( []byte("mem,region=west,root=x,server=c"), []byte("mem"), models.NewTags(map[string]string{"region": "west", "root": "x", "server": "c"}), ); err != nil { t.Fatal(err) } if err := idx2.CreateSeriesListIfNotExists( [][]byte{[]byte("mem,region=west,root=x,server=c")}, [][]byte{[]byte("mem")}, []models.Tags{models.NewTags(map[string]string{"region": "west", "root": "x", "server": "c"})}, ); err != nil { t.Fatal(err) } t.Run("insert same series", func(t *testing.T) { testTagValueSeriesIDIterator(t, "mem", "region", "west", []string{ "mem,region=west,root=x,server=c", "mem,region=west,server=a", "mem,region=west,server=b", "mem,region=west,server=c", }) }) t.Run("no matching series", func(t *testing.T) { testTagValueSeriesIDIterator(t, "foo", "bar", "zoo", nil) }) } // Index is a test wrapper for tsi1.Index. type Index struct { *tsi1.Index SeriesFile *SeriesFile } // NewIndex returns a new instance of Index at a temporary path. func NewIndex(tb testing.TB, partitionN uint64) *Index { idx := &Index{SeriesFile: NewSeriesFile(tb)} idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(tb.TempDir())) idx.Index.PartitionN = partitionN return idx } // NewIndex returns a new instance of Index with default number of partitions at a temporary path. func NewDefaultIndex(tb testing.TB) *Index { return NewIndex(tb, tsi1.DefaultPartitionN) } // MustOpenIndex returns a new, open index. Panic on error. func MustOpenIndex(tb testing.TB, partitionN uint64) *Index { idx := NewIndex(tb, partitionN) if err := idx.Open(); err != nil { panic(err) } return idx } // MustOpenIndex returns a new, open index with the default number of partitions. func MustOpenDefaultIndex(tb testing.TB) *Index { return MustOpenIndex(tb, tsi1.DefaultPartitionN) } // Open opens the underlying tsi1.Index and tsdb.SeriesFile func (idx Index) Open() error { if err := idx.SeriesFile.Open(); err != nil { return err } return idx.Index.Open() } // Close closes and removes the index directory. func (idx *Index) Close() error { // Series file is opened first and must be closed last if err := idx.Index.Close(); err != nil { return err } if err := idx.SeriesFile.Close(); err != nil { return err } return nil } // Reopen closes and opens the index. 
func (idx *Index) Reopen(maxLogSize int64) error {
	if err := idx.Index.Close(); err != nil {
		return err
	}

	// Reopen the series file correctly, by initialising a new underlying
	// series file using the same disk data.
	if err := idx.SeriesFile.Reopen(); err != nil {
		return err
	}

	partitionN := idx.Index.PartitionN // Remember how many partitions to use.
	idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()), tsi1.WithMaximumLogFileSize(maxLogSize))
	idx.Index.PartitionN = partitionN
	return idx.Open()
}

const (
	Initial = iota
	Reopen
	PostCompaction
	PostCompactionReopen
)

func curryState(state int, f func(t *testing.T, state int)) func(t *testing.T) {
	return func(t *testing.T) {
		f(t, state)
	}
}

// RunStateAware executes a subtest for each of several different states:
//
//   - Immediately
//   - After reopen
//   - After compaction
//   - After reopen again
//
// The index should always respond in the same fashion regardless of
// how data is stored. This helper allows the index to be easily tested
// in all major states.
func (idx *Index) RunStateAware(t *testing.T, fn func(t *testing.T, state int)) {
	// Invoke immediately.
	t.Run("state=initial", curryState(Initial, fn))

	// Reopen and invoke again.
	if err := idx.Reopen(tsdb.DefaultMaxIndexLogFileSize); err != nil {
		t.Fatalf("reopen error: %s", err)
	}
	t.Run("state=reopen", curryState(Reopen, fn))

	// Reopen requiring a full compaction of the TSL files and invoke again.
	if err := idx.Reopen(1); err != nil {
		t.Fatalf("reopen (forcing compaction) error: %s", err)
	}
	for {
		needsCompaction := false
		for i := 0; i < int(idx.PartitionN); i++ {
			needsCompaction = needsCompaction || idx.PartitionAt(i).NeedsCompaction(false)
		}
		if !needsCompaction {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Run("state=post-compaction", curryState(PostCompaction, fn))

	// Reopen and invoke again.
	if err := idx.Reopen(tsdb.DefaultMaxIndexLogFileSize); err != nil {
		t.Fatalf("post-compaction reopen error: %s", err)
	}
	t.Run("state=post-compaction-reopen", curryState(PostCompactionReopen, fn))
}

// Run is the same as RunStateAware but for tests that do not depend on
// compaction state.
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
	idx.RunStateAware(t, func(t *testing.T, _ int) {
		fn(t)
	})
}

// CreateSeriesSliceIfNotExists creates multiple series at a time.
func (idx *Index) CreateSeriesSliceIfNotExists(a []Series) error {
	keys := make([][]byte, 0, len(a))
	names := make([][]byte, 0, len(a))
	tags := make([]models.Tags, 0, len(a))
	for _, s := range a {
		keys = append(keys, models.MakeKey(s.Name, s.Tags))
		names = append(names, s.Name)
		tags = append(tags, s.Tags)
	}
	return idx.CreateSeriesListIfNotExists(keys, names, tags)
}

var tsiditr tsdb.SeriesIDIterator

// Calling TagValueSeriesIDIterator on the index involves merging several
// SeriesIDSets together, which can have a non-trivial cost. In the case of
// `tsi` files, the mmap'd sets are merged together. In the case of `tsl`
// files the sets need to be cloned and then merged.
//
// Typical results on an i7 laptop
//
// BenchmarkIndex_IndexFile_TagValueSeriesIDIterator/78888_series_TagValueSeriesIDIterator/cache-8      2000000    643 ns/op    744 B/op  13 allocs/op
// BenchmarkIndex_IndexFile_TagValueSeriesIDIterator/78888_series_TagValueSeriesIDIterator/no_cache-8     10000 130749 ns/op 124952 B/op 350 allocs/op
func BenchmarkIndex_IndexFile_TagValueSeriesIDIterator(b *testing.B) {
	runBenchMark := func(b *testing.B, cacheSize int) {
		var err error
		sfile := NewSeriesFile(b)

		// Load index.
		idx := tsi1.NewIndex(sfile.SeriesFile, "foo",
			tsi1.WithPath("testdata/index-file-index"),
			tsi1.DisableCompactions(),
			tsi1.WithSeriesIDCacheSize(cacheSize),
		)
		defer sfile.Close()

		if err = idx.Open(); err != nil {
			b.Fatal(err)
		}
		defer idx.Close()

		for i := 0; i < b.N; i++ {
			tsiditr, err = idx.TagValueSeriesIDIterator([]byte("m4"), []byte("tag0"), []byte("value4"))
			if err != nil {
				b.Fatal(err)
			} else if tsiditr == nil {
				b.Fatal("got nil iterator")
			}
		}
	}

	// This benchmark will merge eight bitsets each containing ~10,000 series IDs.
	b.Run("78888 series TagValueSeriesIDIterator", func(b *testing.B) {
		b.ReportAllocs()
		b.Run("cache", func(b *testing.B) {
			runBenchMark(b, tsdb.DefaultSeriesIDSetCacheSize)
		})

		b.Run("no cache", func(b *testing.B) {
			runBenchMark(b, 0)
		})
	})
}
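// A minimal, hypothetical sketch (not part of the upstream suite) of the set
// union that the comment above BenchmarkIndex_IndexFile_TagValueSeriesIDIterator
// describes, using tsdb.SeriesIDSet directly; the IDs are illustrative.
func TestSeriesIDSetMergeSketch(t *testing.T) {
	a := tsdb.NewSeriesIDSet(1, 2, 3) // stand-in for IDs contributed by one file
	b := tsdb.NewSeriesIDSet(3, 4)    // stand-in for IDs contributed by another
	merged := tsdb.NewSeriesIDSet()
	merged.Merge(a, b) // union of both inputs
	require.Equal(t, uint64(4), merged.Cardinality())
}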
var errResult error

// Typical results on an i7 laptop
//
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_1000/partition_1-8     1 4004452124 ns/op 2381998144 B/op 21686990 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_1000/partition_2-8     1 2625853773 ns/op 2368913968 B/op 21765385 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_1000/partition_4-8     1 2127205189 ns/op 2338013584 B/op 21908381 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_1000/partition_8-8     1 2331960889 ns/op 2332643248 B/op 22191763 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_1000/partition_16-8    1 2398489751 ns/op 2299551824 B/op 22670465 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_10000/partition_1-8    1 3404683972 ns/op 2387236504 B/op 21600671 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_10000/partition_2-8    1 2173772186 ns/op 2329237224 B/op 21631104 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_10000/partition_4-8    1 1729089575 ns/op 2299161840 B/op 21699878 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_10000/partition_8-8    1 1644295339 ns/op 2161473200 B/op 21796469 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_10000/partition_16-8   1 1683275418 ns/op 2171872432 B/op 21925974 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_100000/partition_1-8   1 3330508160 ns/op 2333250904 B/op 21574887 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_100000/partition_2-8   1 2278604285 ns/op 2292600808 B/op 21628966 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_100000/partition_4-8   1 1760098762 ns/op 2243730672 B/op 21684608 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_100000/partition_8-8   1 1693312924 ns/op 2166924112 B/op 21753079 allocs/op
// BenchmarkIndex_CreateSeriesListIfNotExists/batch_size_100000/partition_16-8  1 1663610452 ns/op 2131177160 B/op 21806209 allocs/op
func BenchmarkIndex_CreateSeriesListIfNotExists(b *testing.B) {
	// Read line-protocol and coerce into tsdb format.
	keys := make([][]byte, 0, 1e6)
	names := make([][]byte, 0, 1e6)
	tags := make([]models.Tags, 0, 1e6)

	// 1M series generated with:
	// $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
	fd, err := os.Open("../../testdata/line-protocol-1M.txt.gz")
	if err != nil {
		b.Fatal(err)
	}

	gzr, err := gzip.NewReader(fd)
	if err != nil {
		fd.Close()
		b.Fatal(err)
	}

	data, err := io.ReadAll(gzr)
	if err != nil {
		b.Fatal(err)
	}
	if err := fd.Close(); err != nil {
		b.Fatal(err)
	}

	points, err := models.ParsePoints(data)
	if err != nil {
		b.Fatal(err)
	}

	for _, pt := range points {
		keys = append(keys, pt.Key())
		names = append(names, pt.Name())
		tags = append(tags, pt.Tags())
	}

	batchSizes := []int{1000, 10000, 100000}
	partitions := []uint64{1, 2, 4, 8, 16}
	for _, sz := range batchSizes {
		b.Run(fmt.Sprintf("batch size %d", sz), func(b *testing.B) {
			for _, partition := range partitions {
				b.Run(fmt.Sprintf("partition %d", partition), func(b *testing.B) {
					idx := MustOpenIndex(b, partition)
					for j := 0; j < b.N; j++ {
						for i := 0; i < len(keys); i += sz {
							k := keys[i : i+sz]
							n := names[i : i+sz]
							t := tags[i : i+sz]
							if errResult = idx.CreateSeriesListIfNotExists(k, n, t); errResult != nil {
								b.Fatal(errResult)
							}
						}

						// Reset the index...
						b.StopTimer()
						if err := idx.Close(); err != nil {
							b.Fatal(err)
						}
						idx = MustOpenIndex(b, partition)
						b.StartTimer()
					}
				})
			}
		})
	}
}

// This benchmark concurrently writes series to the index and fetches cached
// bitsets. The idea is to emphasize the performance difference when bitset
// caching is on and off.
//
// Typical results for an i7 laptop
//
// BenchmarkIndex_ConcurrentWriteQuery/partition_1/queries_100000/cache-8      1  3836451407 ns/op  2453296232 B/op  22648482 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_4/queries_100000/cache-8      1  1836598730 ns/op  2435668224 B/op  22908705 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_8/queries_100000/cache-8      1  1714771527 ns/op  2341518456 B/op  23450621 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_16/queries_100000/cache-8     1  1810658403 ns/op  2401239408 B/op  23868079 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_1/queries_100000/no_cache-8   1  4044478305 ns/op  4414915048 B/op  27292357 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_4/queries_100000/no_cache-8   1 18663345153 ns/op 23035974472 B/op  54015704 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_8/queries_100000/no_cache-8   1 22242979152 ns/op 28178915600 B/op  80156305 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_16/queries_100000/no_cache-8  1 24817283922 ns/op 34613960984 B/op 150356327 allocs/op
func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
	// Read line-protocol and coerce into tsdb format.
	keys := make([][]byte, 0, 1e6)
	names := make([][]byte, 0, 1e6)
	tags := make([]models.Tags, 0, 1e6)

	// 1M series generated with:
	// $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
	fd, err := os.Open("testdata/line-protocol-1M.txt.gz")
	if err != nil {
		b.Fatal(err)
	}

	gzr, err := gzip.NewReader(fd)
	if err != nil {
		fd.Close()
		b.Fatal(err)
	}

	data, err := io.ReadAll(gzr)
	if err != nil {
		b.Fatal(err)
	}
	if err := fd.Close(); err != nil {
		b.Fatal(err)
	}

	points, err := models.ParsePoints(data)
	if err != nil {
		b.Fatal(err)
	}

	for _, pt := range points {
		keys = append(keys, pt.Key())
		names = append(names, pt.Name())
		tags = append(tags, pt.Tags())
	}

	runBenchmark := func(b *testing.B, queryN int, partitions uint64, cacheSize int) {
		idx := &Index{SeriesFile: NewSeriesFile(b)}
		idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(b.TempDir()), tsi1.WithSeriesIDCacheSize(cacheSize))
		idx.Index.PartitionN = partitions
		if err := idx.Open(); err != nil {
			panic(err)
		}

		var wg sync.WaitGroup

		// Run concurrent iterator...
		runIter := func(b *testing.B) {
			keys := [][]string{
				{"m0", "tag2", "value4"},
				{"m1", "tag3", "value5"},
				{"m2", "tag4", "value6"},
				{"m3", "tag0", "value8"},
				{"m4", "tag5", "value0"},
			}

			for i := 0; i < queryN/5; i++ {
				for _, key := range keys {
					itr, err := idx.TagValueSeriesIDIterator([]byte(key[0]), []byte(key[1]), []byte(key[2]))
					if err != nil {
						b.Fatal(err)
					} else if itr == nil {
						b.Fatal("got nil iterator")
					}
					if err := itr.Close(); err != nil {
						b.Fatal(err)
					}
				}
			}
		}

		wg.Add(1)
		go func() { defer wg.Done(); runIter(b) }()

		batchSize := 10000
		for j := 0; j < 1; j++ {
			for i := 0; i < len(keys); i += batchSize {
				k := keys[i : i+batchSize]
				n := names[i : i+batchSize]
				t := tags[i : i+batchSize]
				if errResult = idx.CreateSeriesListIfNotExists(k, n, t); errResult != nil {
					b.Fatal(errResult)
				}
			}

			// Wait for queries to finish.
			wg.Wait()

			// Reset the index...
			b.StopTimer()
			if err := idx.Close(); err != nil {
				b.Fatal(err)
			}

			// Re-open everything.
			idx := &Index{SeriesFile: NewSeriesFile(b)}
			idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(b.TempDir()), tsi1.WithSeriesIDCacheSize(cacheSize))
			idx.Index.PartitionN = partitions
			if err := idx.Open(); err != nil {
				b.Fatal(err)
			}

			wg.Add(1)
			go func() { defer wg.Done(); runIter(b) }()
			b.StartTimer()
		}
	}

	partitions := []uint64{1, 4, 8, 16}
	queries := []int{1e5}
	for _, partition := range partitions {
		b.Run(fmt.Sprintf("partition %d", partition), func(b *testing.B) {
			for _, queryN := range queries {
				b.Run(fmt.Sprintf("queries %d", queryN), func(b *testing.B) {
					b.Run("cache", func(b *testing.B) {
						runBenchmark(b, queryN, partition, tsdb.DefaultSeriesIDSetCacheSize)
					})

					b.Run("no cache", func(b *testing.B) {
						runBenchmark(b, queryN, partition, 0)
					})
				})
			}
		})
	}
}
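// To reproduce the "typical results" quoted above (a sketch using the standard
// Go toolchain flags; adjust the package path to wherever this file lives in
// your checkout):
//
//	go test -run=^$ -bench=BenchmarkIndex_ -benchmem ./tsdb/index/tsi1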