feat(logging): Add startup logging for shard counts (#25378)
* feat(tsdb): Adds shard opening progress checks to startup This PR adds a check to see how many shards are remaining vs how many shards are opened. This change displays the percent completed too. closes influxdata/feature-requests#476pull/25472/head
parent
860a74f8a5
commit
3c87f524ed
|
@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error {
|
|||
s.Logger = cmd.Logger
|
||||
s.CPUProfile = options.CPUProfile
|
||||
s.MemProfile = options.MemProfile
|
||||
|
||||
sl := NewStartupProgressLogger(s.Logger)
|
||||
s.SetStartupMetrics(sl)
|
||||
|
||||
if err := s.Open(); err != nil {
|
||||
return fmt.Errorf("open server: %s", err)
|
||||
}
|
||||
|
|
|
@ -65,6 +65,11 @@ type BuildInfo struct {
|
|||
Time string
|
||||
}
|
||||
|
||||
type StartupProgress interface {
|
||||
AddShard()
|
||||
CompletedShard()
|
||||
}
|
||||
|
||||
// Server represents a container for the metadata and storage data and services.
|
||||
// It is built using a Config and it manages the startup and shutdown of all
|
||||
// services in the proper order.
|
||||
|
@ -96,6 +101,8 @@ type Server struct {
|
|||
|
||||
Monitor *monitor.Monitor
|
||||
|
||||
StartupProgressMetrics StartupProgress
|
||||
|
||||
// Server reporting and registration
|
||||
reportingDisabled bool
|
||||
|
||||
|
@ -279,6 +286,10 @@ func (s *Server) SetLogOutput(w io.Writer) {
|
|||
s.MuxLogger = tcp.MuxLogger(w)
|
||||
}
|
||||
|
||||
func (s *Server) SetStartupMetrics(sp StartupProgress) {
|
||||
s.StartupProgressMetrics = sp
|
||||
}
|
||||
|
||||
func (s *Server) appendMonitorService() {
|
||||
s.Services = append(s.Services, s.Monitor)
|
||||
}
|
||||
|
@ -465,6 +476,9 @@ func (s *Server) Open() error {
|
|||
s.MetaClient.WithLogger(s.Logger)
|
||||
}
|
||||
s.TSDBStore.WithLogger(s.Logger)
|
||||
|
||||
s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics)
|
||||
|
||||
if s.config.Data.QueryLogEnabled {
|
||||
s.QueryExecutor.WithLogger(s.Logger)
|
||||
} else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries {
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
package run
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type StartupProgressLogger struct {
|
||||
shardsCompleted atomic.Uint64
|
||||
shardsTotal atomic.Uint64
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
|
||||
return &StartupProgressLogger{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StartupProgressLogger) AddShard() {
|
||||
s.shardsTotal.Add(1)
|
||||
}
|
||||
|
||||
func (s *StartupProgressLogger) CompletedShard() {
|
||||
shardsCompleted := s.shardsCompleted.Add(1)
|
||||
totalShards := s.shardsTotal.Load()
|
||||
|
||||
percentShards := float64(shardsCompleted) / float64(totalShards) * 100
|
||||
s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards))
|
||||
}
|
319
tsdb/store.go
319
tsdb/store.go
|
@ -54,6 +54,12 @@ const SeriesFileDirectory = "_series"
|
|||
// databaseState keeps track of the state of a database.
|
||||
type databaseState struct{ indexTypes map[string]int }
|
||||
|
||||
// struct to hold the result of opening each reader in a goroutine
|
||||
type shardResponse struct {
|
||||
s *Shard
|
||||
err error
|
||||
}
|
||||
|
||||
// addIndexType records that the database has a shard with the given index type.
|
||||
func (d *databaseState) addIndexType(indexType string) {
|
||||
if d.indexTypes == nil {
|
||||
|
@ -135,6 +141,11 @@ type Store struct {
|
|||
baseLogger *zap.Logger
|
||||
Logger *zap.Logger
|
||||
|
||||
startupProgressMetrics interface {
|
||||
AddShard()
|
||||
CompletedShard()
|
||||
}
|
||||
|
||||
closing chan struct{}
|
||||
wg sync.WaitGroup
|
||||
opened bool
|
||||
|
@ -167,6 +178,13 @@ func (s *Store) WithLogger(log *zap.Logger) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Store) WithStartupMetrics(sp interface {
|
||||
AddShard()
|
||||
CompletedShard()
|
||||
}) {
|
||||
s.startupProgressMetrics = sp
|
||||
}
|
||||
|
||||
// Statistics returns statistics for period monitoring.
|
||||
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
|
||||
s.mu.RLock()
|
||||
|
@ -310,12 +328,6 @@ func (s *Store) Open() error {
|
|||
}
|
||||
|
||||
func (s *Store) loadShards() error {
|
||||
// res holds the result from opening each shard in a goroutine
|
||||
type res struct {
|
||||
s *Shard
|
||||
err error
|
||||
}
|
||||
|
||||
// Limit the number of concurrent TSM files to be opened to the number of cores.
|
||||
s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
|
||||
|
||||
|
@ -363,9 +375,8 @@ func (s *Store) loadShards() error {
|
|||
log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open")
|
||||
defer logEnd()
|
||||
|
||||
shardLoaderWg := new(sync.WaitGroup)
|
||||
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
|
||||
resC := make(chan *res)
|
||||
var n int
|
||||
|
||||
// Determine how many shards we need to open by checking the store path.
|
||||
dbDirs, err := os.ReadDir(s.path)
|
||||
|
@ -373,126 +384,155 @@ func (s *Store) loadShards() error {
|
|||
return err
|
||||
}
|
||||
|
||||
for _, db := range dbDirs {
|
||||
dbPath := filepath.Join(s.path, db.Name())
|
||||
if !db.IsDir() {
|
||||
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
|
||||
continue
|
||||
}
|
||||
|
||||
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
|
||||
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
|
||||
continue
|
||||
}
|
||||
|
||||
// Load series file.
|
||||
sfile, err := s.openSeriesFile(db.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Retrieve database index.
|
||||
idx, err := s.createIndexIfNotExists(db.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Load each retention policy within the database directory.
|
||||
rpDirs, err := os.ReadDir(dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, rp := range rpDirs {
|
||||
rpPath := filepath.Join(s.path, db.Name(), rp.Name())
|
||||
if !rp.IsDir() {
|
||||
log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
|
||||
walkShardsAndProcess := func(fn func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
|
||||
for _, db := range dbDirs {
|
||||
rpDirs, err := s.getRetentionPolicyDirs(db, log)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if rpDirs == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// The .series directory is not a retention policy.
|
||||
if rp.Name() == SeriesFileDirectory {
|
||||
continue
|
||||
}
|
||||
|
||||
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
|
||||
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
|
||||
continue
|
||||
}
|
||||
|
||||
shardDirs, err := os.ReadDir(rpPath)
|
||||
// Load series file.
|
||||
sfile, err := s.openSeriesFile(db.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, sh := range shardDirs {
|
||||
// Series file should not be in a retention policy but skip just in case.
|
||||
if sh.Name() == SeriesFileDirectory {
|
||||
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
|
||||
// Retrieve database index.
|
||||
idx, err := s.createIndexIfNotExists(db.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, rp := range rpDirs {
|
||||
shardDirs, err := s.getShards(rp, db, log)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if shardDirs == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
n++
|
||||
go func(db, rp, sh string) {
|
||||
t.Take()
|
||||
defer t.Release()
|
||||
|
||||
start := time.Now()
|
||||
path := filepath.Join(s.path, db, rp, sh)
|
||||
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
|
||||
|
||||
// Shard file names are numeric shardIDs
|
||||
shardID, err := strconv.ParseUint(sh, 10, 64)
|
||||
if err != nil {
|
||||
log.Info("invalid shard ID found at path", zap.String("path", path))
|
||||
resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
|
||||
return
|
||||
for _, sh := range shardDirs {
|
||||
// Series file should not be in a retention policy but skip just in case.
|
||||
if sh.Name() == SeriesFileDirectory {
|
||||
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
|
||||
continue
|
||||
}
|
||||
|
||||
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
|
||||
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
|
||||
resC <- &res{}
|
||||
return
|
||||
if err := fn(sfile, idx, sh, db, rp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy options and assign shared index.
|
||||
opt := s.EngineOptions
|
||||
opt.InmemIndex = idx
|
||||
|
||||
// Provide an implementation of the ShardIDSets
|
||||
opt.SeriesIDSets = shardSet{store: s, db: db}
|
||||
|
||||
// Existing shards should continue to use inmem index.
|
||||
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
|
||||
opt.IndexVersion = InmemIndexName
|
||||
}
|
||||
|
||||
// Open engine.
|
||||
shard := NewShard(shardID, path, walPath, sfile, opt)
|
||||
|
||||
// Disable compactions, writes and queries until all shards are loaded
|
||||
shard.EnableOnOpen = false
|
||||
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
|
||||
shard.WithLogger(s.baseLogger)
|
||||
|
||||
err = s.OpenShard(shard, false)
|
||||
if err != nil {
|
||||
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
|
||||
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
|
||||
return
|
||||
}
|
||||
|
||||
resC <- &res{s: shard}
|
||||
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
|
||||
}(db.Name(), rp.Name(), sh.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// We use `rawShardCount` as a buffer size for channel creation below.
|
||||
// If there is no startupProgressMetrics count then this will be 0 creating a
|
||||
// zero buffer channel.
|
||||
rawShardCount := 0
|
||||
if s.startupProgressMetrics != nil {
|
||||
err := walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
|
||||
rawShardCount++
|
||||
s.startupProgressMetrics.AddShard()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Gather results of opening shards concurrently, keeping track of how
|
||||
// many databases we are managing.
|
||||
for i := 0; i < n; i++ {
|
||||
res := <-resC
|
||||
shardResC := make(chan *shardResponse, rawShardCount)
|
||||
err = walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
|
||||
shardLoaderWg.Add(1)
|
||||
|
||||
go func(db, rp, sh string) {
|
||||
defer shardLoaderWg.Done()
|
||||
|
||||
t.Take()
|
||||
defer t.Release()
|
||||
|
||||
start := time.Now()
|
||||
path := filepath.Join(s.path, db, rp, sh)
|
||||
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
|
||||
|
||||
// Shard file names are numeric shardIDs
|
||||
shardID, err := strconv.ParseUint(sh, 10, 64)
|
||||
if err != nil {
|
||||
log.Info("invalid shard ID found at path", zap.String("path", path))
|
||||
shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
|
||||
if s.startupProgressMetrics != nil {
|
||||
s.startupProgressMetrics.CompletedShard()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
|
||||
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
|
||||
shardResC <- &shardResponse{}
|
||||
if s.startupProgressMetrics != nil {
|
||||
s.startupProgressMetrics.CompletedShard()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Copy options and assign shared index.
|
||||
opt := s.EngineOptions
|
||||
opt.InmemIndex = idx
|
||||
|
||||
// Provide an implementation of the ShardIDSets
|
||||
opt.SeriesIDSets = shardSet{store: s, db: db}
|
||||
|
||||
// Existing shards should continue to use inmem index.
|
||||
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
|
||||
opt.IndexVersion = InmemIndexName
|
||||
}
|
||||
|
||||
// Open engine.
|
||||
shard := NewShard(shardID, path, walPath, sfile, opt)
|
||||
|
||||
// Disable compactions, writes and queries until all shards are loaded
|
||||
shard.EnableOnOpen = false
|
||||
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
|
||||
shard.WithLogger(s.baseLogger)
|
||||
|
||||
err = s.OpenShard(shard, false)
|
||||
if err != nil {
|
||||
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
|
||||
shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
|
||||
if s.startupProgressMetrics != nil {
|
||||
s.startupProgressMetrics.CompletedShard()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
shardResC <- &shardResponse{s: shard}
|
||||
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
|
||||
if s.startupProgressMetrics != nil {
|
||||
s.startupProgressMetrics.CompletedShard()
|
||||
}
|
||||
}(db.Name(), rp.Name(), sh.Name())
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(resC)
|
||||
}()
|
||||
|
||||
for res := range resC {
|
||||
if res.s == nil || res.err != nil {
|
||||
continue
|
||||
}
|
||||
|
@ -503,7 +543,6 @@ func (s *Store) loadShards() error {
|
|||
}
|
||||
s.databases[res.s.database].addIndexType(res.s.IndexType())
|
||||
}
|
||||
close(resC)
|
||||
|
||||
// Check if any databases are running multiple index types.
|
||||
for db, state := range s.databases {
|
||||
|
@ -1182,12 +1221,8 @@ func byIndexType(name string) ShardPredicate {
|
|||
// concurrent use. If any of the functions return an error, the first error is
|
||||
// returned.
|
||||
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
|
||||
// struct to hold the result of opening each reader in a goroutine
|
||||
type res struct {
|
||||
err error
|
||||
}
|
||||
|
||||
resC := make(chan res)
|
||||
resC := make(chan shardResponse, len(shards))
|
||||
var n int
|
||||
|
||||
for _, sh := range shards {
|
||||
|
@ -1195,11 +1230,11 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
|
|||
|
||||
go func(sh *Shard) {
|
||||
if err := fn(sh); err != nil {
|
||||
resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
|
||||
resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)}
|
||||
return
|
||||
}
|
||||
|
||||
resC <- res{}
|
||||
resC <- shardResponse{}
|
||||
}(sh)
|
||||
}
|
||||
|
||||
|
@ -2367,3 +2402,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
|
||||
dbPath := filepath.Join(s.path, db.Name())
|
||||
if !db.IsDir() {
|
||||
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
|
||||
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Load each retention policy within the database directory.
|
||||
rpDirs, err := os.ReadDir(dbPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rpDirs, nil
|
||||
}
|
||||
|
||||
func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
|
||||
rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
|
||||
if !rpDir.IsDir() {
|
||||
log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// The .series directory is not a retention policy.
|
||||
if rpDir.Name() == SeriesFileDirectory {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
|
||||
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
shardDirs, err := os.ReadDir(rpPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return shardDirs, nil
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -19,7 +21,6 @@ import (
|
|||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdata/influxdb/internal"
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/deep"
|
||||
"github.com/influxdata/influxdb/pkg/slices"
|
||||
|
@ -36,7 +37,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
|
@ -87,7 +88,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
|
|||
}
|
||||
|
||||
// Reopen other shard and check it still exists.
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
t.Error(err)
|
||||
} else if sh := s.Shard(3); sh == nil {
|
||||
t.Errorf("shard 3 does not exist")
|
||||
|
@ -112,7 +113,7 @@ func TestStore_CreateShard(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
|
@ -130,7 +131,7 @@ func TestStore_CreateShard(t *testing.T) {
|
|||
}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard(1)")
|
||||
|
@ -144,12 +145,102 @@ func TestStore_CreateShard(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the store can create a new shard.
|
||||
func TestStore_StartupShardProgress(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
|
||||
sh := s.Shard(1)
|
||||
require.NotNil(t, sh)
|
||||
|
||||
// Create another shard and verify that it exists.
|
||||
require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
|
||||
sh = s.Shard(2)
|
||||
require.NotNil(t, sh)
|
||||
|
||||
msl := &mockStartupLogger{}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
|
||||
|
||||
// Equality check to make sure shards are always added prior to
|
||||
// completion being called. This test opens 3 total shards - 1 shard
|
||||
// fails, but we still want to track that it was attempted to be opened.
|
||||
require.Equal(t, msl.shardTracker, []string{
|
||||
"shard-add",
|
||||
"shard-add",
|
||||
"shard-complete",
|
||||
"shard-complete",
|
||||
})
|
||||
}
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
// Introduces a test to ensure that shard loading still accounts for bad shards. We still want these to show up
|
||||
// during the initial shard loading even though its in a error state.
|
||||
func TestStore_BadShardLoading(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
|
||||
sh := s.Shard(1)
|
||||
require.NotNil(t, sh)
|
||||
|
||||
// Create another shard and verify that it exists.
|
||||
require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
|
||||
sh = s.Shard(2)
|
||||
require.NotNil(t, sh)
|
||||
|
||||
// Create another shard and verify that it exists.
|
||||
require.NoError(t, s.CreateShard("db0", "rp0", 3, true))
|
||||
sh = s.Shard(3)
|
||||
require.NotNil(t, sh)
|
||||
|
||||
s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
|
||||
err2 := s.OpenShard(s.Shard(sh.ID()), false)
|
||||
require.Error(t, err2, "no error opening bad shard")
|
||||
|
||||
msl := &mockStartupLogger{}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
|
||||
|
||||
// Equality check to make sure shards are always added prior to
|
||||
// completion being called. This test opens 3 total shards - 1 shard
|
||||
// fails, but we still want to track that it was attempted to be opened.
|
||||
require.Equal(t, msl.shardTracker, []string{
|
||||
"shard-add",
|
||||
"shard-add",
|
||||
"shard-add",
|
||||
"shard-complete",
|
||||
"shard-complete",
|
||||
"shard-complete",
|
||||
})
|
||||
}
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) { test(index) })
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_BadShard(t *testing.T) {
|
||||
const errStr = "a shard open error"
|
||||
indexes := tsdb.RegisteredIndexes()
|
||||
for _, idx := range indexes {
|
||||
func() {
|
||||
s := MustOpenStore(idx)
|
||||
s := MustOpenStore(t, idx)
|
||||
defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
|
||||
|
||||
sh := tsdb.NewTempShard(idx)
|
||||
|
@ -175,7 +266,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index1 string, index2 string) {
|
||||
s := MustOpenStore(index1)
|
||||
s := MustOpenStore(t, index1)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
|
@ -187,7 +278,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
|
|||
|
||||
s.EngineOptions.IndexVersion = index2
|
||||
s.index = index2
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -199,7 +290,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
|
|||
}
|
||||
|
||||
// Reopen shard and recheck.
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sh := s.Shard(1); sh == nil {
|
||||
t.Fatalf("expected shard(1)")
|
||||
|
@ -231,7 +322,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index1 string, index2 string) {
|
||||
s := MustOpenStore(index1)
|
||||
s := MustOpenStore(t, index1)
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
|
@ -242,7 +333,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
|
|||
|
||||
s.EngineOptions.IndexVersion = index2
|
||||
s.index = index2
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -276,7 +367,7 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
|
@ -340,7 +431,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index1 string, index2 string) {
|
||||
s := MustOpenStore(index1)
|
||||
s := MustOpenStore(t, index1)
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
|
@ -351,7 +442,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
|
|||
|
||||
s.EngineOptions.IndexVersion = index2
|
||||
s.index = index2
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -413,7 +504,7 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
if err := s.DeleteSeries("db0", nil, nil); err != nil {
|
||||
|
@ -431,7 +522,7 @@ func TestStore_DeleteSeries_MultipleSources(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
|
@ -461,7 +552,7 @@ func TestStore_DeleteShard(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) error {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
|
@ -493,7 +584,7 @@ func TestStore_DeleteShard(t *testing.T) {
|
|||
s.MustWriteToShardString(3, "cpu,serverb=b v=1")
|
||||
|
||||
// Reopen the store and check all shards still exist
|
||||
if err := s.Reopen(); err != nil {
|
||||
if err := s.Reopen(t); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := uint64(1); i <= 3; i++ {
|
||||
|
@ -550,7 +641,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a new shard and verify that it exists.
|
||||
|
@ -578,7 +669,7 @@ func TestStore_Open(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := NewStore(index)
|
||||
s := NewStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
|
||||
|
@ -621,9 +712,14 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := NewStore(index)
|
||||
s := NewStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Ensure the directory exists before creating the file.
|
||||
if err := os.MkdirAll(s.Path(), 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create a file instead of a directory for a database.
|
||||
if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -647,7 +743,7 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := NewStore(index)
|
||||
s := NewStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create an RP file instead of a directory.
|
||||
|
@ -677,7 +773,7 @@ func TestStore_Open_InvalidShard(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := NewStore(index)
|
||||
s := NewStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create a non-numeric shard file.
|
||||
|
@ -707,7 +803,7 @@ func TestShards_CreateIterator(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard #0 with data.
|
||||
|
@ -792,7 +888,7 @@ func TestStore_NewReadersBlocked(t *testing.T) {
|
|||
|
||||
test := func(index string) {
|
||||
t.Helper()
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
shardInUse := func(shardID uint64) bool {
|
||||
|
@ -865,7 +961,7 @@ func TestStore_NewReadersBlocked(t *testing.T) {
|
|||
// Ensure the store can backup a shard and another store can restore it.
|
||||
func TestStore_BackupRestoreShard(t *testing.T) {
|
||||
test := func(index string) {
|
||||
s0, s1 := MustOpenStore(index), MustOpenStore(index)
|
||||
s0, s1 := MustOpenStore(t, index), MustOpenStore(t, index)
|
||||
defer s0.Close()
|
||||
defer s1.Close()
|
||||
|
||||
|
@ -876,7 +972,7 @@ func TestStore_BackupRestoreShard(t *testing.T) {
|
|||
`cpu value=3 20`,
|
||||
)
|
||||
|
||||
if err := s0.Reopen(); err != nil {
|
||||
if err := s0.Reopen(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -946,7 +1042,7 @@ func TestStore_Shard_SeriesN(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) error {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard with data.
|
||||
|
@ -982,7 +1078,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard with data.
|
||||
|
@ -1083,7 +1179,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) {
|
|||
}
|
||||
|
||||
test := func(index string) {
|
||||
store := NewStore(index)
|
||||
store := NewStore(t, index)
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -1148,7 +1244,7 @@ func TestStore_Cardinality_Unique(t *testing.T) {
|
|||
}
|
||||
|
||||
test := func(index string) {
|
||||
store := NewStore(index)
|
||||
store := NewStore(t, index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
|
@ -1230,7 +1326,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
|
|||
}
|
||||
|
||||
test := func(index string) {
|
||||
store := NewStore(index)
|
||||
store := NewStore(t, index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
|
@ -1250,7 +1346,7 @@ func TestStore_MetaQuery_Timeout(t *testing.T) {
|
|||
}
|
||||
|
||||
test := func(index string) {
|
||||
store := NewStore(index)
|
||||
store := NewStore(t, index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
|
@ -1442,7 +1538,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
|
|||
}
|
||||
|
||||
test := func(index string) error {
|
||||
store := NewStore(index)
|
||||
store := NewStore(t, index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
|
@ -1467,7 +1563,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
|
|||
t.Skip("Skipping test in short, race and appveyor mode.")
|
||||
}
|
||||
|
||||
store := NewStore("inmem")
|
||||
store := NewStore(t, "inmem")
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 100000
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
|
@ -1581,7 +1677,7 @@ func TestStore_Sketches(t *testing.T) {
|
|||
}
|
||||
|
||||
test := func(index string) error {
|
||||
store := MustOpenStore(index)
|
||||
store := MustOpenStore(t, index)
|
||||
defer store.Close()
|
||||
|
||||
// Generate point data to write to the shards.
|
||||
|
@ -1610,7 +1706,7 @@ func TestStore_Sketches(t *testing.T) {
|
|||
}
|
||||
|
||||
// Reopen the store.
|
||||
if err := store.Reopen(); err != nil {
|
||||
if err := store.Reopen(t); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1643,7 +1739,7 @@ func TestStore_Sketches(t *testing.T) {
|
|||
}
|
||||
|
||||
// Reopen the store.
|
||||
if err := store.Reopen(); err != nil {
|
||||
if err := store.Reopen(t); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1779,7 +1875,7 @@ func TestStore_TagValues(t *testing.T) {
|
|||
}
|
||||
|
||||
setup := func(index string) (*Store, []uint64) { // returns shard ids
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
|
||||
fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
|
||||
cpu1%[1]d,host=nofoo value=1 %[4]d
|
||||
|
@ -1830,7 +1926,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) error {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard #0 with data.
|
||||
|
@ -1919,7 +2015,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) error {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard #0 with data.
|
||||
|
@ -2017,7 +2113,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
test := func(index string) error {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard #0 with data.
|
||||
|
@ -2148,7 +2244,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
|
|||
|
||||
func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
shardN := 10
|
||||
|
@ -2233,7 +2329,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
|
|||
|
||||
func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
shardN := 10
|
||||
|
@ -2324,7 +2420,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
|
|||
|
||||
func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
s := MustOpenStore(index)
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
shardN := 10
|
||||
|
@ -2428,7 +2524,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
|
|||
|
||||
func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
store := NewStore(index)
|
||||
store := NewStore(b, index)
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -2459,7 +2555,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(
|
|||
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
|
||||
var store *Store
|
||||
setup := func(index string) error {
|
||||
store := MustOpenStore(index)
|
||||
store := MustOpenStore(b, index)
|
||||
|
||||
// Generate test series (measurements + unique tag sets).
|
||||
series := genTestSeries(mCnt, tkCnt, tvCnt)
|
||||
|
@ -2530,7 +2626,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
|
|||
}
|
||||
|
||||
setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids
|
||||
s := NewStore(index)
|
||||
s := NewStore(b, index)
|
||||
if err := s.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -2632,23 +2728,46 @@ func BenchmarkStore_TagValues(b *testing.B) {
|
|||
// Store is a test wrapper for tsdb.Store.
|
||||
type Store struct {
|
||||
*tsdb.Store
|
||||
index string
|
||||
path string
|
||||
index string
|
||||
walPath string
|
||||
opts []StoreOption
|
||||
}
|
||||
|
||||
type StoreOption func(s *Store) error
|
||||
|
||||
func WithStartupMetrics(sm *mockStartupLogger) StoreOption {
|
||||
return func(s *Store) error {
|
||||
s.WithStartupMetrics(sm)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewStore returns a new instance of Store with a temporary path.
|
||||
func NewStore(index string) *Store {
|
||||
path, err := os.MkdirTemp("", "influxdb-tsdb-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
func NewStore(tb testing.TB, index string, opts ...StoreOption) *Store {
|
||||
tb.Helper()
|
||||
|
||||
// The WAL directory must not be rooted under the data path. Otherwise reopening
|
||||
// the store will generate series indices for the WAL directories.
|
||||
rootPath := tb.TempDir()
|
||||
path := filepath.Join(rootPath, "data")
|
||||
walPath := filepath.Join(rootPath, "wal")
|
||||
|
||||
s := &Store{
|
||||
Store: tsdb.NewStore(path),
|
||||
path: path,
|
||||
index: index,
|
||||
walPath: walPath,
|
||||
opts: opts,
|
||||
}
|
||||
|
||||
s := &Store{Store: tsdb.NewStore(path), index: index}
|
||||
s.EngineOptions.IndexVersion = index
|
||||
s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
|
||||
s.EngineOptions.Config.WALDir = walPath
|
||||
s.EngineOptions.Config.TraceLoggingEnabled = true
|
||||
s.WithLogger(zaptest.NewLogger(tb))
|
||||
|
||||
if testing.Verbose() {
|
||||
s.WithLogger(logger.New(os.Stdout))
|
||||
for _, o := range s.opts {
|
||||
err := o(s)
|
||||
require.NoError(tb, err)
|
||||
}
|
||||
|
||||
return s
|
||||
|
@ -2656,8 +2775,8 @@ func NewStore(index string) *Store {
|
|||
|
||||
// MustOpenStore returns a new, open Store using the specified index,
|
||||
// at a temporary path.
|
||||
func MustOpenStore(index string) *Store {
|
||||
s := NewStore(index)
|
||||
func MustOpenStore(tb testing.TB, index string, opts ...StoreOption) *Store {
|
||||
s := NewStore(tb, index, opts...)
|
||||
|
||||
if err := s.Open(); err != nil {
|
||||
panic(err)
|
||||
|
@ -2665,27 +2784,44 @@ func MustOpenStore(index string) *Store {
|
|||
return s
|
||||
}
|
||||
|
||||
// Reopen closes and reopens the store as a new store.
|
||||
func (s *Store) Reopen() error {
|
||||
if err := s.Store.Close(); err != nil {
|
||||
return err
|
||||
func (s *Store) Reopen(tb testing.TB, newOpts ...StoreOption) error {
|
||||
tb.Helper()
|
||||
|
||||
if s.Store != nil {
|
||||
if err := s.Store.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.Store = tsdb.NewStore(s.Path())
|
||||
s.Store = tsdb.NewStore(s.path)
|
||||
s.EngineOptions.IndexVersion = s.index
|
||||
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
|
||||
s.EngineOptions.Config.WALDir = s.walPath
|
||||
s.EngineOptions.Config.TraceLoggingEnabled = true
|
||||
|
||||
if testing.Verbose() {
|
||||
s.WithLogger(logger.New(os.Stdout))
|
||||
s.WithLogger(zaptest.NewLogger(tb))
|
||||
if len(newOpts) > 0 {
|
||||
s.opts = newOpts
|
||||
}
|
||||
|
||||
for _, o := range s.opts {
|
||||
err := o(s)
|
||||
require.NoError(tb, err)
|
||||
}
|
||||
|
||||
return s.Store.Open()
|
||||
}
|
||||
|
||||
// Close closes the store and removes the underlying data.
|
||||
func (s *Store) Close() error {
|
||||
defer os.RemoveAll(s.Path())
|
||||
return s.Store.Close()
|
||||
defer func(path string) {
|
||||
err := os.RemoveAll(path)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}(s.path)
|
||||
if s.Store != nil {
|
||||
return s.Store.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MustCreateShardWithData creates a shard and writes line protocol data to it.
|
||||
|
@ -2754,3 +2890,20 @@ func dirExists(path string) bool {
|
|||
}
|
||||
return !os.IsNotExist(err)
|
||||
}
|
||||
|
||||
type mockStartupLogger struct {
|
||||
shardTracker []string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (m *mockStartupLogger) AddShard() {
|
||||
m.mu.Lock()
|
||||
m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-add"))
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *mockStartupLogger) CompletedShard() {
|
||||
m.mu.Lock()
|
||||
m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-complete"))
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue