package tsi1_test

import (
	"context"
	"fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"reflect"
	"regexp"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/tsi1"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
)

// Bloom filter settings used in tests.
const M, K = 4096, 6

// Ensure index can iterate over all measurement names.
func TestIndex_ForEachMeasurementName(t *testing.T) {
	idx := MustOpenIndex(1, tsi1.NewConfig())
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify measurements are returned.
	idx.Run(t, func(t *testing.T) {
		var names []string
		if err := idx.ForEachMeasurementName(func(name []byte) error {
			names = append(names, string(name))
			return nil
		}); err != nil {
			t.Fatal(err)
		}

		if !reflect.DeepEqual(names, []string{"cpu", "mem"}) {
			t.Fatalf("unexpected names: %#v", names)
		}
	})

	// Add more series.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("disk")},
		{Name: []byte("mem")},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify new measurements.
	idx.Run(t, func(t *testing.T) {
		var names []string
		if err := idx.ForEachMeasurementName(func(name []byte) error {
			names = append(names, string(name))
			return nil
		}); err != nil {
			t.Fatal(err)
		}

		if !reflect.DeepEqual(names, []string{"cpu", "disk", "mem"}) {
			t.Fatalf("unexpected names: %#v", names)
		}
	})
}

// Ensure index can return whether a measurement exists.
func TestIndex_MeasurementExists(t *testing.T) {
	idx := MustOpenIndex(1, tsi1.NewConfig())
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify measurement exists.
	idx.Run(t, func(t *testing.T) {
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if !v {
			t.Fatal("expected measurement to exist")
		}
	})

	name, tags := []byte("cpu"), models.NewTags(map[string]string{"region": "east"})
	sid := idx.Index.SeriesFile().SeriesID(name, tags, nil)
	if sid.IsZero() {
		t.Fatalf("got 0 series id for %s/%v", name, tags)
	}

	// Delete one series.
	if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
		t.Fatal(err)
	}

	// Verify measurement still exists.
	idx.Run(t, func(t *testing.T) {
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if !v {
			t.Fatal("expected measurement to still exist")
		}
	})

	// Delete second series.
	tags.Set([]byte("region"), []byte("west"))
	sid = idx.Index.SeriesFile().SeriesID(name, tags, nil)
	if sid.IsZero() {
		t.Fatalf("got 0 series id for %s/%v", name, tags)
	}
	if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
		t.Fatal(err)
	}

	// Verify measurement is now deleted.
	idx.Run(t, func(t *testing.T) {
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if v {
			t.Fatal("expected measurement to be deleted")
		}
	})
}

// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByRegex(t *testing.T) {
	idx := MustOpenIndex(1, tsi1.NewConfig())
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu")},
		{Name: []byte("disk")},
		{Name: []byte("mem")},
	}); err != nil {
		t.Fatal(err)
	}

	// Retrieve measurements by regex.
	idx.Run(t, func(t *testing.T) {
		names, err := idx.MeasurementNamesByRegex(regexp.MustCompile(`cpu|mem`))
		if err != nil {
			t.Fatal(err)
		} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
			t.Fatalf("unexpected names: %v", names)
		}
	})
}

// Ensure index can delete a measurement and all related keys, values, & series.
func TestIndex_DropMeasurement(t *testing.T) {
	idx := MustOpenIndex(1, tsi1.NewConfig())
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Drop measurement.
	if err := idx.DropMeasurement([]byte("cpu")); err != nil {
		t.Fatal(err)
	}

	// Verify data is gone in each stage.
	idx.Run(t, func(t *testing.T) {
		// Verify measurement is gone.
		if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
			t.Fatal(err)
		} else if v {
			t.Fatal("expected no measurement")
		}

		// Obtain file set to perform lower level checks.
		fs, err := idx.PartitionAt(0).FileSet()
		if err != nil {
			t.Fatal(err)
		}
		defer fs.Release()

		// Verify tags & values are gone.
		if e := fs.TagKeyIterator([]byte("cpu")).Next(); e != nil && !e.Deleted() {
			t.Fatal("expected deleted tag key")
		}
		if itr := fs.TagValueIterator([]byte("cpu"), []byte("region")); itr != nil {
			t.Fatal("expected nil tag value iterator")
		}
	})
}

// Ensure a fresh index opens with the expected MANIFEST version, that opening
// an already-open index fails, and that a MANIFEST carrying an incompatible
// version is rejected with tsi1.ErrIncompatibleVersion.
func TestIndex_Open(t *testing.T) {
	// Opening a fresh index should set the MANIFEST version to current version.
	idx := NewIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
	defer idx.Close()

	t.Run("open new index", func(t *testing.T) {
		if err := idx.Open(); err != nil {
			t.Fatal(err)
		}

		// Check version set appropriately.
		for i := 0; uint64(i) < tsi1.DefaultPartitionN; i++ {
			partition := idx.PartitionAt(i)
			fs, err := partition.FileSet()
			if err != nil {
				t.Fatal(err)
			}

			// Read the version and release the file set before asserting so
			// that a failed assertion cannot leave the reference held.
			version := partition.Manifest(fs).Version
			fs.Release()

			if got, exp := version, 1; got != exp {
				t.Fatalf("got index version %d, expected %d", got, exp)
			}
		}
	})

	// Reopening an open index should return an error.
	t.Run("reopen open index", func(t *testing.T) {
		err := idx.Open()
		// The index is closed regardless of the outcome of the second Open.
		idx.Close()
		if err == nil {
			t.Fatal("didn't get an error on reopen, but expected one")
		}
	})

	// Opening an incompatible index should return an error.
	incompatibleVersions := []int{-1, 0, 2}
	for _, v := range incompatibleVersions {
		t.Run(fmt.Sprintf("incompatible index version: %d", v), func(t *testing.T) {
			// Use an index local to this subtest and always close it, so the
			// temporary directory and the series file opened by Open are
			// cleaned up whether or not the expected error is returned.
			idx := NewIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
			defer idx.Close()

			// Manually create a MANIFEST file for an incompatible index version
			// under one of the partitions.
			partitionPath := filepath.Join(idx.Path(), "2")
			if err := os.MkdirAll(partitionPath, 0777); err != nil {
				t.Fatal(err)
			}

			mpath := filepath.Join(partitionPath, tsi1.ManifestFileName)
			m := tsi1.NewManifest(mpath)
			m.Levels = nil
			m.Version = v // Set example MANIFEST version.
			if _, err := m.Write(); err != nil {
				t.Fatal(err)
			}

			// Log the MANIFEST file.
			data, err := ioutil.ReadFile(mpath)
			if err != nil {
				t.Fatal(err)
			}
			t.Logf("Incompatible MANIFEST: %s", data)

			// Opening this index should return an error because the MANIFEST has an
			// incompatible version.
			if err := idx.Open(); err != tsi1.ErrIncompatibleVersion {
				t.Fatalf("got error %v, expected %v", err, tsi1.ErrIncompatibleVersion)
			}
		})
	}
}

// Ensure every partition of a freshly opened index reports tsi1.Version in
// its MANIFEST.
func TestIndex_Manifest(t *testing.T) {
	t.Run("current MANIFEST", func(t *testing.T) {
		idx := MustOpenIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
		defer idx.Close()

		// Check version set appropriately.
		for i := 0; uint64(i) < tsi1.DefaultPartitionN; i++ {
			partition := idx.PartitionAt(i)
			fs, err := partition.FileSet()
			if err != nil {
				t.Fatal(err)
			}

			// Release before asserting so a failure cannot leak the reference.
			version := partition.Manifest(fs).Version
			fs.Release()

			if got, exp := version, tsi1.Version; got != exp {
				t.Fatalf("got MANIFEST version %d, expected %d", got, exp)
			}
		}
	})
}

// Ensure the index reports the same on-disk size in every state exercised by
// Index.Run.
func TestIndex_DiskSizeBytes(t *testing.T) {
	idx := MustOpenIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
	defer idx.Close()

	// Add series to index.
	if err := idx.CreateSeriesSliceIfNotExists([]Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
	}); err != nil {
		t.Fatal(err)
	}

	// Verify on disk size is the same in each stage.
	// Each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
	// NOTE(review): the sizes listed above sum to 10 bytes, yet the expectation
	// below uses 9 bytes per series — confirm which one reflects the current
	// log entry layout and update the other.
	expSize := int64(4 * 9)

	// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
	expSize += int64(tsi1.DefaultPartitionN * 419)

	idx.Run(t, func(t *testing.T) {
		if got, exp := idx.DiskSizeBytes(), expSize; got != exp {
			t.Fatalf("got %d bytes, expected %d", got, exp)
		}
	})
}

// Ensure index can return measurement cardinality stats.
func TestIndex_MeasurementCardinalityStats(t *testing.T) { t.Parallel() t.Run("Empty", func(t *testing.T) { idx := MustOpenIndex(1, tsi1.NewConfig()) defer idx.Close() if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{}); diff != "" { t.Fatal(diff) } }) t.Run("Simple", func(t *testing.T) { idx := MustOpenIndex(1, tsi1.NewConfig()) defer idx.Close() if err := idx.CreateSeriesSliceIfNotExists([]Series{ {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, }); err != nil { t.Fatal(err) } if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 2, "mem": 1}); diff != "" { t.Fatal(diff) } }) t.Run("SimpleWithDelete", func(t *testing.T) { idx := MustOpenIndex(1, tsi1.NewConfig()) defer idx.Close() if err := idx.CreateSeriesSliceIfNotExists([]Series{ {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, }); err != nil { t.Fatal(err) } seriesID := idx.SeriesFile.SeriesID([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}), nil) if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil { t.Fatal(err) } else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1, "mem": 1}); diff != "" { t.Fatal(diff) } seriesID = idx.SeriesFile.SeriesID([]byte("mem"), models.NewTags(map[string]string{"region": "east"}), nil) if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil { t.Fatal(err) } else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1}); diff != "" { 
t.Fatal(diff) } }) t.Run("Large", func(t *testing.T) { if testing.Short() { t.Skip("short mode, skipping") } idx := MustOpenIndex(1, tsi1.NewConfig()) defer idx.Close() for i := 0; i < 1000; i++ { a := make([]Series, 1000) for j := range a { a[j] = Series{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": fmt.Sprintf("east%04d", (i*1000)+j)})} } if err := idx.CreateSeriesSliceIfNotExists(a); err != nil { t.Fatal(err) } } if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" { t.Fatal(diff) } // Reopen and verify count. if err := idx.Reopen(); err != nil { t.Fatal(err) } else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" { t.Fatal(diff) } }) } // Ensure index keeps the correct set of series even with concurrent compactions. func TestIndex_CompactionConsistency(t *testing.T) { t.Skip("TODO: flaky test: https://github.com/influxdata/influxdb/issues/13755") t.Parallel() idx := NewIndex(tsi1.DefaultPartitionN, tsi1.NewConfig()) idx.WithLogger(zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))) if err := idx.Open(); err != nil { t.Fatal(err) } defer idx.Close() // Set up some framework to track launched goroutines. wg, done := new(sync.WaitGroup), make(chan struct{}) spawn := func(fn func()) { wg.Add(1) go func() { for { select { case <-done: wg.Done() return default: fn() } } }() } // Spawn a goroutine to constantly ask the index to compact. spawn(func() { idx.Compact() }) // Issue a number of writes and deletes for a while. 
expected, operations := make(map[string]struct{}), []string(nil) spawn(func() { var err error if len(expected) > 0 && rand.Intn(5) == 0 { for m := range expected { err = idx.DropMeasurement([]byte(m)) operations = append(operations, "delete: "+m) delete(expected, m) break } } else { m := []byte(fmt.Sprintf("m%d", rand.Int())) s := make([]Series, 100) for i := range s { s[i] = Series{Name: m, Tags: models.NewTags(map[string]string{fmt.Sprintf("t%d", i): "v"})} } err = idx.CreateSeriesSliceIfNotExists(s) operations = append(operations, "add: "+string(m)) expected[string(m)] = struct{}{} } if err != nil { t.Error(err) } }) // Let them run for a while and then wait. time.Sleep(10 * time.Second) close(done) wg.Wait() defer func() { if !t.Failed() { return } t.Log("expect", len(expected), "measurements after", len(operations), "operations") for _, op := range operations { t.Log(op) } }() for m := range expected { if v, err := idx.MeasurementExists([]byte(m)); err != nil { t.Fatal(err) } else if !v { t.Fatal("expected", m) } } miter, err := idx.MeasurementIterator() if err != nil { t.Fatal(err) } defer miter.Close() for { m, err := miter.Next() if err != nil { t.Fatal(err) } else if m == nil { break } else if _, ok := expected[string(m)]; !ok { t.Fatal("unexpected", string(m)) } } } func BenchmarkIndex_ComputeMeasurementCardinalityStats(b *testing.B) { idx := MustOpenIndex(1, tsi1.NewConfig()) defer idx.Close() const n = 10000 for i := 0; i < n; i++ { name := []byte(fmt.Sprintf("%08x", i)) a := make([]Series, 1000) for j := range a { a[j] = Series{Name: name, Tags: models.NewTags(map[string]string{"region": fmt.Sprintf("east%04d", j)})} } if err := idx.CreateSeriesSliceIfNotExists(a); err != nil { b.Fatal(err) } } b.Run("", func(b *testing.B) { for i := 0; i < b.N; i++ { if _, err := idx.ComputeMeasurementCardinalityStats(); err != nil { b.Fatal(err) } } }) } // Index is a test wrapper for tsi1.Index. 
type Index struct { *tsi1.Index Config tsi1.Config SeriesFile *SeriesFile } // NewIndex returns a new instance of Index at a temporary path. func NewIndex(partitionN uint64, c tsi1.Config) *Index { idx := &Index{ Config: c, SeriesFile: NewSeriesFile(), } idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, idx.Config, tsi1.WithPath(MustTempDir())) idx.Index.PartitionN = partitionN return idx } // MustOpenIndex returns a new, open index. Panic on error. func MustOpenIndex(partitionN uint64, c tsi1.Config) *Index { idx := NewIndex(partitionN, c) if err := idx.Open(); err != nil { panic(err) } return idx } // Open opens the underlying tsi1.Index and tsdb.SeriesFile func (idx Index) Open() error { if err := idx.SeriesFile.Open(context.Background()); err != nil { return err } return idx.Index.Open(context.Background()) } // Close closes and removes the index directory. func (idx *Index) Close() error { defer os.RemoveAll(idx.Path()) if err := idx.Index.Close(); err != nil { return err } return idx.SeriesFile.Close() } // Reopen closes and opens the index. func (idx *Index) Reopen() error { if err := idx.Index.Close(); err != nil { return err } // Reopen the series file correctly, by initialising a new underlying series // file using the same disk data. if err := idx.SeriesFile.Reopen(); err != nil { return err } partitionN := idx.Index.PartitionN // Remember how many partitions to use. idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, idx.Config, tsi1.WithPath(idx.Index.Path())) idx.Index.PartitionN = partitionN return idx.Open() } // Run executes a subtest for each of several different states: // // - Immediately // - After reopen // - After compaction // - After reopen again // // The index should always respond in the same fashion regardless of // how data is stored. This helper allows the index to be easily tested // in all major states. func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) { // Invoke immediately. 
t.Run("state=initial", fn) // Reopen and invoke again. if err := idx.Reopen(); err != nil { t.Fatalf("reopen error: %s", err) } t.Run("state=reopen", fn) // TODO: Request a compaction. // if err := idx.Compact(); err != nil { // t.Fatalf("compact error: %s", err) // } // t.Run("state=post-compaction", fn) // Reopen and invoke again. if err := idx.Reopen(); err != nil { t.Fatalf("post-compaction reopen error: %s", err) } t.Run("state=post-compaction-reopen", fn) } // CreateSeriesSliceIfNotExists creates multiple series at a time. func (idx *Index) CreateSeriesSliceIfNotExists(a []Series) error { collection := &tsdb.SeriesCollection{ Keys: make([][]byte, 0, len(a)), Names: make([][]byte, 0, len(a)), Tags: make([]models.Tags, 0, len(a)), Types: make([]models.FieldType, 0, len(a)), } for _, s := range a { collection.Keys = append(collection.Keys, models.MakeKey(s.Name, s.Tags)) collection.Names = append(collection.Names, s.Name) collection.Tags = append(collection.Tags, s.Tags) collection.Types = append(collection.Types, s.Type) } return idx.CreateSeriesListIfNotExists(collection) }