feat(logging): Add startup logging for shard counts (#25378)

* feat(tsdb): Adds shard opening progress checks to startup
This PR adds a check that reports how many shards have been opened
versus how many remain, and logs the percentage completed as well.

closes influxdata/feature-requests#476
WeblWabl 2024-10-16 10:09:15 -05:00 committed by GitHub
parent 860a74f8a5
commit 3c87f524ed
5 changed files with 474 additions and 190 deletions
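For illustration: with four shards on disk, the format string used by the new StartupProgressLogger (added later in this diff) yields progress lines like the following during startup. Only the message text is shown here; zap adds its usual timestamp and level prefix.

Finished loading shard, current progress 25.0% shards (1 / 4).
Finished loading shard, current progress 50.0% shards (2 / 4).
Finished loading shard, current progress 75.0% shards (3 / 4).
Finished loading shard, current progress 100.0% shards (4 / 4).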


@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error {
s.Logger = cmd.Logger
s.CPUProfile = options.CPUProfile
s.MemProfile = options.MemProfile
sl := NewStartupProgressLogger(s.Logger)
s.SetStartupMetrics(sl)
if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err)
}


@ -65,6 +65,11 @@ type BuildInfo struct {
Time string
}
type StartupProgress interface {
AddShard()
CompletedShard()
}
// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
@ -96,6 +101,8 @@ type Server struct {
Monitor *monitor.Monitor
StartupProgressMetrics StartupProgress
// Server reporting and registration
reportingDisabled bool
@ -279,6 +286,10 @@ func (s *Server) SetLogOutput(w io.Writer) {
s.MuxLogger = tcp.MuxLogger(w)
}
func (s *Server) SetStartupMetrics(sp StartupProgress) {
s.StartupProgressMetrics = sp
}
func (s *Server) appendMonitorService() {
s.Services = append(s.Services, s.Monitor)
}
@ -465,6 +476,9 @@ func (s *Server) Open() error {
s.MetaClient.WithLogger(s.Logger)
}
s.TSDBStore.WithLogger(s.Logger)
s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics)
if s.config.Data.QueryLogEnabled {
s.QueryExecutor.WithLogger(s.Logger)
} else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries {
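Because the server only depends on the two-method StartupProgress interface, any implementation can be injected through SetStartupMetrics. As a hypothetical illustration (not part of this change), a metrics-only implementation that counts shards without logging could look like this, assuming "sync/atomic" is imported:

type countingProgress struct {
	added     atomic.Uint64
	completed atomic.Uint64
}

func (c *countingProgress) AddShard()       { c.added.Add(1) }
func (c *countingProgress) CompletedShard() { c.completed.Add(1) }

// Wired up the same way as the logger: s.SetStartupMetrics(&countingProgress{}).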


@ -0,0 +1,32 @@
package run
import (
"fmt"
"sync/atomic"
"go.uber.org/zap"
)
type StartupProgressLogger struct {
shardsCompleted atomic.Uint64
shardsTotal atomic.Uint64
logger *zap.Logger
}
func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
return &StartupProgressLogger{
logger: logger,
}
}
func (s *StartupProgressLogger) AddShard() {
s.shardsTotal.Add(1)
}
func (s *StartupProgressLogger) CompletedShard() {
shardsCompleted := s.shardsCompleted.Add(1)
totalShards := s.shardsTotal.Load()
percentShards := float64(shardsCompleted) / float64(totalShards) * 100
s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards))
}
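A minimal usage sketch of the new logger, assuming it lives alongside the code above in package run; zap.NewExample is used only to obtain a logger that writes to stdout and is not part of this change:

func ExampleStartupProgressLogger() {
	sl := NewStartupProgressLogger(zap.NewExample())

	// Register every shard up front, then mark completions. Each call to
	// CompletedShard logs the running percentage: 50.0% (1 / 2), then
	// 100.0% (2 / 2).
	sl.AddShard()
	sl.AddShard()
	sl.CompletedShard()
	sl.CompletedShard()
}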


@ -54,6 +54,12 @@ const SeriesFileDirectory = "_series"
// databaseState keeps track of the state of a database.
type databaseState struct{ indexTypes map[string]int }
// shardResponse holds the result of opening each shard in a goroutine
type shardResponse struct {
s *Shard
err error
}
// addIndexType records that the database has a shard with the given index type.
func (d *databaseState) addIndexType(indexType string) {
if d.indexTypes == nil {
@ -135,6 +141,11 @@ type Store struct {
baseLogger *zap.Logger
Logger *zap.Logger
startupProgressMetrics interface {
AddShard()
CompletedShard()
}
closing chan struct{}
wg sync.WaitGroup
opened bool
@ -167,6 +178,13 @@ func (s *Store) WithLogger(log *zap.Logger) {
}
}
func (s *Store) WithStartupMetrics(sp interface {
AddShard()
CompletedShard()
}) {
s.startupProgressMetrics = sp
}
// Statistics returns statistics for period monitoring.
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
s.mu.RLock()
@ -310,12 +328,6 @@ func (s *Store) Open() error {
}
func (s *Store) loadShards() error {
// res holds the result from opening each shard in a goroutine
type res struct {
s *Shard
err error
}
// Limit the number of concurrent TSM files to be opened to the number of cores.
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
@ -363,9 +375,8 @@ func (s *Store) loadShards() error {
log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open")
defer logEnd()
shardLoaderWg := new(sync.WaitGroup)
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
resC := make(chan *res)
var n int
// Determine how many shards we need to open by checking the store path.
dbDirs, err := os.ReadDir(s.path)
@ -373,126 +384,155 @@ func (s *Store) loadShards() error {
return err
}
for _, db := range dbDirs {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
continue
}
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
continue
}
// Load series file.
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}
// Retrieve database index.
idx, err := s.createIndexIfNotExists(db.Name())
if err != nil {
return err
}
// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return err
}
for _, rp := range rpDirs {
rpPath := filepath.Join(s.path, db.Name(), rp.Name())
if !rp.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
walkShardsAndProcess := func(fn func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
for _, db := range dbDirs {
rpDirs, err := s.getRetentionPolicyDirs(db, log)
if err != nil {
return err
} else if rpDirs == nil {
continue
}
// The .series directory is not a retention policy.
if rp.Name() == SeriesFileDirectory {
continue
}
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
continue
}
shardDirs, err := os.ReadDir(rpPath)
// Load series file.
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}
for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
// Retrieve database index.
idx, err := s.createIndexIfNotExists(db.Name())
if err != nil {
return err
}
for _, rp := range rpDirs {
shardDirs, err := s.getShards(rp, db, log)
if err != nil {
return err
} else if shardDirs == nil {
continue
}
n++
go func(db, rp, sh string) {
t.Take()
defer t.Release()
start := time.Now()
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
return
for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
continue
}
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
resC <- &res{}
return
if err := fn(sfile, idx, sh, db, rp); err != nil {
return err
}
// Copy options and assign shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}
// Existing shards should continue to use inmem index.
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
opt.IndexVersion = InmemIndexName
}
// Open engine.
shard := NewShard(shardID, path, walPath, sfile, opt)
// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)
err = s.OpenShard(shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
return
}
resC <- &res{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
}(db.Name(), rp.Name(), sh.Name())
}
}
}
return nil
}
// We use `rawShardCount` to size the buffered result channel created below.
// If startup progress metrics are not configured, the count stays 0 and the
// channel is unbuffered.
rawShardCount := 0
if s.startupProgressMetrics != nil {
err := walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
rawShardCount++
s.startupProgressMetrics.AddShard()
return nil
})
if err != nil {
return err
}
}
// Gather results of opening shards concurrently, keeping track of how
// many databases we are managing.
for i := 0; i < n; i++ {
res := <-resC
shardResC := make(chan *shardResponse, rawShardCount)
err = walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
shardLoaderWg.Add(1)
go func(db, rp, sh string) {
defer shardLoaderWg.Done()
t.Take()
defer t.Release()
start := time.Now()
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
shardResC <- &shardResponse{}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}
// Copy options and assign shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}
// Existing shards should continue to use inmem index.
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
opt.IndexVersion = InmemIndexName
}
// Open engine.
shard := NewShard(shardID, path, walPath, sfile, opt)
// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)
err = s.OpenShard(shard, false)
if err != nil {
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
return
}
shardResC <- &shardResponse{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
}(db.Name(), rp.Name(), sh.Name())
return nil
})
if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
return err
}
return nil
}
func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
go func() {
wg.Wait()
close(resC)
}()
for res := range resC {
if res.s == nil || res.err != nil {
continue
}
@ -503,7 +543,6 @@ func (s *Store) loadShards() error {
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}
close(resC)
// Check if any databases are running multiple index types.
for db, state := range s.databases {
@ -1182,12 +1221,8 @@ func byIndexType(name string) ShardPredicate {
// concurrent use. If any of the functions return an error, the first error is
// returned.
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
// struct to hold the result of opening each reader in a goroutine
type res struct {
err error
}
resC := make(chan res)
resC := make(chan shardResponse, len(shards))
var n int
for _, sh := range shards {
@ -1195,11 +1230,11 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
go func(sh *Shard) {
if err := fn(sh); err != nil {
resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)}
return
}
resC <- res{}
resC <- shardResponse{}
}(sh)
}
@ -2367,3 +2402,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
}
return nil
}
func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
return nil, nil
}
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
return nil, nil
}
// Load each retention policy within the database directory.
rpDirs, err := os.ReadDir(dbPath)
if err != nil {
return nil, err
}
return rpDirs, nil
}
func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
if !rpDir.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
return nil, nil
}
// The .series directory is not a retention policy.
if rpDir.Name() == SeriesFileDirectory {
return nil, nil
}
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
return nil, nil
}
shardDirs, err := os.ReadDir(rpPath)
if err != nil {
return nil, err
}
return shardDirs, nil
}
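The restructured loadShards walks the shard directories twice when progress metrics are configured: a first pass only counts shards (calling AddShard for each) so the result channel can be buffered to the full shard count, and a second pass fans out the opener goroutines whose results enableShards later drains. A self-contained sketch of that count-then-open shape, with hypothetical names standing in for the store internals:

package main

import (
	"fmt"
	"sync"
)

type result struct {
	name string
	err  error
}

func main() {
	shardDirs := []string{"db0/rp0/1", "db0/rp0/2", "db0/rp0/3"}

	// Pass 1: the pre-count sizes the buffer, so opener goroutines never
	// block on send even if the collector has not started draining yet.
	resC := make(chan result, len(shardDirs))

	var wg sync.WaitGroup
	// Pass 2: fan out the opens; each send stands in for s.OpenShard(...).
	for _, dir := range shardDirs {
		wg.Add(1)
		go func(dir string) {
			defer wg.Done()
			resC <- result{name: dir}
		}(dir)
	}

	// Mirrors enableShards: close the channel once every sender is done,
	// then range over it to collect the results.
	go func() {
		wg.Wait()
		close(resC)
	}()
	for res := range resC {
		fmt.Println("opened", res.name, res.err)
	}
}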


@ -5,6 +5,8 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/zap/zaptest"
"log"
"math"
"math/rand"
"os"
@ -19,7 +21,6 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/pkg/slices"
@ -36,7 +37,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -87,7 +88,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
}
// Reopen other shard and check it still exists.
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Error(err)
} else if sh := s.Shard(3); sh == nil {
t.Errorf("shard 3 does not exist")
@ -112,7 +113,7 @@ func TestStore_CreateShard(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -130,7 +131,7 @@ func TestStore_CreateShard(t *testing.T) {
}
// Reopen shard and recheck.
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard(1)")
@ -144,12 +145,102 @@ func TestStore_CreateShard(t *testing.T) {
}
}
// Ensure the store reports shard opening progress during startup.
func TestStore_StartupShardProgress(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
sh := s.Shard(1)
require.NotNil(t, sh)
// Create another shard and verify that it exists.
require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
sh = s.Shard(2)
require.NotNil(t, sh)
msl := &mockStartupLogger{}
// Reopen shard and recheck.
require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
// Equality check to make sure shards are always added prior to
// completion being called. This test opens 2 shards, both of which
// should be tracked as added and then completed.
require.Equal(t, msl.shardTracker, []string{
"shard-add",
"shard-add",
"shard-complete",
"shard-complete",
})
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
}
}
// TestStore_BadShardLoading ensures that shard loading still accounts for bad shards. We still want these to show up
// during initial shard loading even though they are in an error state.
func TestStore_BadShardLoading(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
sh := s.Shard(1)
require.NotNil(t, sh)
// Create another shard and verify that it exists.
require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
sh = s.Shard(2)
require.NotNil(t, sh)
// Create another shard and verify that it exists.
require.NoError(t, s.CreateShard("db0", "rp0", 3, true))
sh = s.Shard(3)
require.NotNil(t, sh)
s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
err2 := s.OpenShard(s.Shard(sh.ID()), false)
require.Error(t, err2, "no error opening bad shard")
msl := &mockStartupLogger{}
// Reopen shard and recheck.
require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
// Equality check to make sure shards are always added prior to
// completion being called. This test opens 3 total shards - 1 shard
// fails, but we still want to track that it was attempted to be opened.
require.Equal(t, msl.shardTracker, []string{
"shard-add",
"shard-add",
"shard-add",
"shard-complete",
"shard-complete",
"shard-complete",
})
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
}
}
func TestStore_BadShard(t *testing.T) {
const errStr = "a shard open error"
indexes := tsdb.RegisteredIndexes()
for _, idx := range indexes {
func() {
s := MustOpenStore(idx)
s := MustOpenStore(t, idx)
defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
sh := tsdb.NewTempShard(idx)
@ -175,7 +266,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
t.Parallel()
test := func(index1 string, index2 string) {
s := MustOpenStore(index1)
s := MustOpenStore(t, index1)
defer s.Close()
// Create a new shard and verify that it exists.
@ -187,7 +278,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
s.EngineOptions.IndexVersion = index2
s.index = index2
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
}
@ -199,7 +290,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
}
// Reopen shard and recheck.
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard(1)")
@ -231,7 +322,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
t.Parallel()
test := func(index1 string, index2 string) {
s := MustOpenStore(index1)
s := MustOpenStore(t, index1)
defer s.Close()
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@ -242,7 +333,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
s.EngineOptions.IndexVersion = index2
s.index = index2
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
}
@ -276,7 +367,7 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@ -340,7 +431,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
t.Parallel()
test := func(index1 string, index2 string) {
s := MustOpenStore(index1)
s := MustOpenStore(t, index1)
defer s.Close()
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@ -351,7 +442,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
s.EngineOptions.IndexVersion = index2
s.index = index2
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
t.Fatal(err)
}
@ -413,7 +504,7 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
if err := s.DeleteSeries("db0", nil, nil); err != nil {
@ -431,7 +522,7 @@ func TestStore_DeleteSeries_MultipleSources(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@ -461,7 +552,7 @@ func TestStore_DeleteShard(t *testing.T) {
t.Parallel()
test := func(index string) error {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -493,7 +584,7 @@ func TestStore_DeleteShard(t *testing.T) {
s.MustWriteToShardString(3, "cpu,serverb=b v=1")
// Reopen the store and check all shards still exist
if err := s.Reopen(); err != nil {
if err := s.Reopen(t); err != nil {
return err
}
for i := uint64(1); i <= 3; i++ {
@ -550,7 +641,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create a new shard and verify that it exists.
@ -578,7 +669,7 @@ func TestStore_Open(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore(index)
s := NewStore(t, index)
defer s.Close()
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
@ -621,9 +712,14 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore(index)
s := NewStore(t, index)
defer s.Close()
// Ensure the directory exists before creating the file.
if err := os.MkdirAll(s.Path(), 0777); err != nil {
t.Fatal(err)
}
// Create a file instead of a directory for a database.
if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
t.Fatal(err)
@ -647,7 +743,7 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore(index)
s := NewStore(t, index)
defer s.Close()
// Create an RP file instead of a directory.
@ -677,7 +773,7 @@ func TestStore_Open_InvalidShard(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore(index)
s := NewStore(t, index)
defer s.Close()
// Create a non-numeric shard file.
@ -707,7 +803,7 @@ func TestShards_CreateIterator(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -792,7 +888,7 @@ func TestStore_NewReadersBlocked(t *testing.T) {
test := func(index string) {
t.Helper()
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardInUse := func(shardID uint64) bool {
@ -865,7 +961,7 @@ func TestStore_NewReadersBlocked(t *testing.T) {
// Ensure the store can backup a shard and another store can restore it.
func TestStore_BackupRestoreShard(t *testing.T) {
test := func(index string) {
s0, s1 := MustOpenStore(index), MustOpenStore(index)
s0, s1 := MustOpenStore(t, index), MustOpenStore(t, index)
defer s0.Close()
defer s1.Close()
@ -876,7 +972,7 @@ func TestStore_BackupRestoreShard(t *testing.T) {
`cpu value=3 20`,
)
if err := s0.Reopen(); err != nil {
if err := s0.Reopen(t); err != nil {
t.Fatal(err)
}
@ -946,7 +1042,7 @@ func TestStore_Shard_SeriesN(t *testing.T) {
t.Parallel()
test := func(index string) error {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create shard with data.
@ -982,7 +1078,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
t.Parallel()
test := func(index string) {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create shard with data.
@ -1083,7 +1179,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) {
}
test := func(index string) {
store := NewStore(index)
store := NewStore(t, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -1148,7 +1244,7 @@ func TestStore_Cardinality_Unique(t *testing.T) {
}
test := func(index string) {
store := NewStore(index)
store := NewStore(t, index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -1230,7 +1326,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
}
test := func(index string) {
store := NewStore(index)
store := NewStore(t, index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -1250,7 +1346,7 @@ func TestStore_MetaQuery_Timeout(t *testing.T) {
}
test := func(index string) {
store := NewStore(index)
store := NewStore(t, index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -1442,7 +1538,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
}
test := func(index string) error {
store := NewStore(index)
store := NewStore(t, index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -1467,7 +1563,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
t.Skip("Skipping test in short, race and appveyor mode.")
}
store := NewStore("inmem")
store := NewStore(t, "inmem")
store.EngineOptions.Config.MaxSeriesPerDatabase = 100000
if err := store.Open(); err != nil {
panic(err)
@ -1581,7 +1677,7 @@ func TestStore_Sketches(t *testing.T) {
}
test := func(index string) error {
store := MustOpenStore(index)
store := MustOpenStore(t, index)
defer store.Close()
// Generate point data to write to the shards.
@ -1610,7 +1706,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Reopen the store.
if err := store.Reopen(); err != nil {
if err := store.Reopen(t); err != nil {
return err
}
@ -1643,7 +1739,7 @@ func TestStore_Sketches(t *testing.T) {
}
// Reopen the store.
if err := store.Reopen(); err != nil {
if err := store.Reopen(t); err != nil {
return err
}
@ -1779,7 +1875,7 @@ func TestStore_TagValues(t *testing.T) {
}
setup := func(index string) (*Store, []uint64) { // returns shard ids
s := MustOpenStore(index)
s := MustOpenStore(t, index)
fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
cpu1%[1]d,host=nofoo value=1 %[4]d
@ -1830,7 +1926,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
t.Parallel()
test := func(index string) error {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -1919,7 +2015,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
t.Parallel()
test := func(index string) error {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -2017,7 +2113,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
t.Parallel()
test := func(index string) error {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
// Create shard #0 with data.
@ -2148,7 +2244,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardN := 10
@ -2233,7 +2329,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardN := 10
@ -2324,7 +2420,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
s := MustOpenStore(t, index)
defer s.Close()
shardN := 10
@ -2428,7 +2524,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
for _, index := range tsdb.RegisteredIndexes() {
store := NewStore(index)
store := NewStore(b, index)
if err := store.Open(); err != nil {
panic(err)
}
@ -2459,7 +2555,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
var store *Store
setup := func(index string) error {
store := MustOpenStore(index)
store := MustOpenStore(b, index)
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
@ -2530,7 +2626,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
}
setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids
s := NewStore(index)
s := NewStore(b, index)
if err := s.Open(); err != nil {
panic(err)
}
@ -2632,23 +2728,46 @@ func BenchmarkStore_TagValues(b *testing.B) {
// Store is a test wrapper for tsdb.Store.
type Store struct {
*tsdb.Store
index string
path string
index string
walPath string
opts []StoreOption
}
type StoreOption func(s *Store) error
func WithStartupMetrics(sm *mockStartupLogger) StoreOption {
return func(s *Store) error {
s.WithStartupMetrics(sm)
return nil
}
}
// NewStore returns a new instance of Store with a temporary path.
func NewStore(index string) *Store {
path, err := os.MkdirTemp("", "influxdb-tsdb-")
if err != nil {
panic(err)
func NewStore(tb testing.TB, index string, opts ...StoreOption) *Store {
tb.Helper()
// The WAL directory must not be rooted under the data path. Otherwise reopening
// the store will generate series indices for the WAL directories.
rootPath := tb.TempDir()
path := filepath.Join(rootPath, "data")
walPath := filepath.Join(rootPath, "wal")
s := &Store{
Store: tsdb.NewStore(path),
path: path,
index: index,
walPath: walPath,
opts: opts,
}
s := &Store{Store: tsdb.NewStore(path), index: index}
s.EngineOptions.IndexVersion = index
s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
s.EngineOptions.Config.WALDir = walPath
s.EngineOptions.Config.TraceLoggingEnabled = true
s.WithLogger(zaptest.NewLogger(tb))
if testing.Verbose() {
s.WithLogger(logger.New(os.Stdout))
for _, o := range s.opts {
err := o(s)
require.NoError(tb, err)
}
return s
@ -2656,8 +2775,8 @@ func NewStore(index string) *Store {
// MustOpenStore returns a new, open Store using the specified index,
// at a temporary path.
func MustOpenStore(index string) *Store {
s := NewStore(index)
func MustOpenStore(tb testing.TB, index string, opts ...StoreOption) *Store {
s := NewStore(tb, index, opts...)
if err := s.Open(); err != nil {
panic(err)
@ -2665,27 +2784,44 @@ func MustOpenStore(index string) *Store {
return s
}
// Reopen closes and reopens the store as a new store.
func (s *Store) Reopen() error {
if err := s.Store.Close(); err != nil {
return err
func (s *Store) Reopen(tb testing.TB, newOpts ...StoreOption) error {
tb.Helper()
if s.Store != nil {
if err := s.Store.Close(); err != nil {
return err
}
}
s.Store = tsdb.NewStore(s.Path())
s.Store = tsdb.NewStore(s.path)
s.EngineOptions.IndexVersion = s.index
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
s.EngineOptions.Config.WALDir = s.walPath
s.EngineOptions.Config.TraceLoggingEnabled = true
if testing.Verbose() {
s.WithLogger(logger.New(os.Stdout))
s.WithLogger(zaptest.NewLogger(tb))
if len(newOpts) > 0 {
s.opts = newOpts
}
for _, o := range s.opts {
err := o(s)
require.NoError(tb, err)
}
return s.Store.Open()
}
// Close closes the store and removes the underlying data.
func (s *Store) Close() error {
defer os.RemoveAll(s.Path())
return s.Store.Close()
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
log.Fatal(err)
}
}(s.path)
if s.Store != nil {
return s.Store.Close()
}
return nil
}
// MustCreateShardWithData creates a shard and writes line protocol data to it.
@ -2754,3 +2890,20 @@ func dirExists(path string) bool {
}
return !os.IsNotExist(err)
}
type mockStartupLogger struct {
shardTracker []string
mu sync.Mutex
}
func (m *mockStartupLogger) AddShard() {
m.mu.Lock()
m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-add"))
m.mu.Unlock()
}
func (m *mockStartupLogger) CompletedShard() {
m.mu.Lock()
m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-complete"))
m.mu.Unlock()
}