feat(logging): Add startup logging for shard counts (#25378)
* feat(tsdb): Add shard opening progress checks to startup. This PR adds a check of how many shards have been opened versus how many remain, and logs the percentage completed. Closes influxdata/feature-requests#476. (branch: pull/25472/head)
parent 860a74f8a5
commit 3c87f524ed
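For reference, the progress percentage this commit logs is a plain completed/total ratio. A minimal, self-contained sketch of the calculation (the standalone main() is illustrative only; the message format mirrors the new logger in the diff below):

package main

import "fmt"

func main() {
	completed, total := uint64(2), uint64(8)
	percent := float64(completed) / float64(total) * 100
	fmt.Printf("Finished loading shard, current progress %.1f%% shards (%d / %d).\n", percent, completed, total)
	// Prints: Finished loading shard, current progress 25.0% shards (2 / 8).
}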
@@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error {
 	s.Logger = cmd.Logger
 	s.CPUProfile = options.CPUProfile
 	s.MemProfile = options.MemProfile
+
+	sl := NewStartupProgressLogger(s.Logger)
+	s.SetStartupMetrics(sl)
+
 	if err := s.Open(); err != nil {
 		return fmt.Errorf("open server: %s", err)
 	}
@@ -65,6 +65,11 @@ type BuildInfo struct {
 	Time string
 }
 
+type StartupProgress interface {
+	AddShard()
+	CompletedShard()
+}
+
 // Server represents a container for the metadata and storage data and services.
 // It is built using a Config and it manages the startup and shutdown of all
 // services in the proper order.
@@ -96,6 +101,8 @@ type Server struct {
 
 	Monitor *monitor.Monitor
 
+	StartupProgressMetrics StartupProgress
+
 	// Server reporting and registration
 	reportingDisabled bool
 
@@ -279,6 +286,10 @@ func (s *Server) SetLogOutput(w io.Writer) {
 	s.MuxLogger = tcp.MuxLogger(w)
 }
 
+func (s *Server) SetStartupMetrics(sp StartupProgress) {
+	s.StartupProgressMetrics = sp
+}
+
 func (s *Server) appendMonitorService() {
 	s.Services = append(s.Services, s.Monitor)
 }
@@ -465,6 +476,9 @@ func (s *Server) Open() error {
 		s.MetaClient.WithLogger(s.Logger)
 	}
 	s.TSDBStore.WithLogger(s.Logger)
+
+	s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics)
+
 	if s.config.Data.QueryLogEnabled {
 		s.QueryExecutor.WithLogger(s.Logger)
 	} else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries {
@@ -0,0 +1,32 @@
+package run
+
+import (
+	"fmt"
+	"sync/atomic"
+
+	"go.uber.org/zap"
+)
+
+type StartupProgressLogger struct {
+	shardsCompleted atomic.Uint64
+	shardsTotal     atomic.Uint64
+	logger          *zap.Logger
+}
+
+func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
+	return &StartupProgressLogger{
+		logger: logger,
+	}
+}
+
+func (s *StartupProgressLogger) AddShard() {
+	s.shardsTotal.Add(1)
+}
+
+func (s *StartupProgressLogger) CompletedShard() {
+	shardsCompleted := s.shardsCompleted.Add(1)
+	totalShards := s.shardsTotal.Load()
+
+	percentShards := float64(shardsCompleted) / float64(totalShards) * 100
+	s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards))
+}
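The counters in the new file above use sync/atomic because CompletedShard is called concurrently from the shard-opening goroutines (see the tsdb/store.go changes below). A hedged usage sketch reusing the StartupProgressLogger type from this file; the exampleProgress helper is hypothetical and assumes the "sync" and "go.uber.org/zap" imports:

func exampleProgress(logger *zap.Logger) {
	sl := NewStartupProgressLogger(logger)

	const nShards = 4
	for i := 0; i < nShards; i++ {
		sl.AddShard() // register every shard before any open completes
	}

	var wg sync.WaitGroup
	for i := 0; i < nShards; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sl.CompletedShard() // logs 25.0%, 50.0%, 75.0%, 100.0% in completion order
		}()
	}
	wg.Wait()
}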
tsdb/store.go
@@ -54,6 +54,12 @@ const SeriesFileDirectory = "_series"
 // databaseState keeps track of the state of a database.
 type databaseState struct{ indexTypes map[string]int }
 
+// shardResponse holds the result of opening each shard in a goroutine.
+type shardResponse struct {
+	s   *Shard
+	err error
+}
+
 // addIndexType records that the database has a shard with the given index type.
 func (d *databaseState) addIndexType(indexType string) {
 	if d.indexTypes == nil {
@@ -135,6 +141,11 @@ type Store struct {
 	baseLogger *zap.Logger
 	Logger     *zap.Logger
 
+	startupProgressMetrics interface {
+		AddShard()
+		CompletedShard()
+	}
+
 	closing chan struct{}
 	wg      sync.WaitGroup
 	opened  bool
@@ -167,6 +178,13 @@ func (s *Store) WithLogger(log *zap.Logger) {
 		}
 	}
 }
 
+func (s *Store) WithStartupMetrics(sp interface {
+	AddShard()
+	CompletedShard()
+}) {
+	s.startupProgressMetrics = sp
+}
+
 // Statistics returns statistics for period monitoring.
 func (s *Store) Statistics(tags map[string]string) []models.Statistic {
 	s.mu.RLock()
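WithStartupMetrics accepts any value with AddShard/CompletedShard methods, so callers other than the server can plug in their own sink. A hedged sketch; countingProgress is hypothetical and exists only for illustration (it assumes the "sync/atomic" import):

type countingProgress struct {
	added, completed atomic.Uint64
}

func (c *countingProgress) AddShard()       { c.added.Add(1) }
func (c *countingProgress) CompletedShard() { c.completed.Add(1) }

// Usage, assuming an existing *tsdb.Store:
//   store.WithStartupMetrics(&countingProgress{})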
@@ -310,12 +328,6 @@ func (s *Store) Open() error {
 }
 
 func (s *Store) loadShards() error {
-	// res holds the result from opening each shard in a goroutine
-	type res struct {
-		s   *Shard
-		err error
-	}
-
 	// Limit the number of concurrent TSM files to be opened to the number of cores.
 	s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
 
@@ -363,9 +375,8 @@ func (s *Store) loadShards() error {
 	log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open")
 	defer logEnd()
 
+	shardLoaderWg := new(sync.WaitGroup)
 	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
-	resC := make(chan *res)
-	var n int
 
 	// Determine how many shards we need to open by checking the store path.
 	dbDirs, err := os.ReadDir(s.path)
@@ -373,126 +384,155 @@ func (s *Store) loadShards() error {
 		return err
 	}
 
-	for _, db := range dbDirs {
-		dbPath := filepath.Join(s.path, db.Name())
-		if !db.IsDir() {
-			log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
-			continue
-		}
-
-		if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
-			log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
-			continue
-		}
-
-		// Load series file.
-		sfile, err := s.openSeriesFile(db.Name())
-		if err != nil {
-			return err
-		}
-
-		// Retrieve database index.
-		idx, err := s.createIndexIfNotExists(db.Name())
-		if err != nil {
-			return err
-		}
-
-		// Load each retention policy within the database directory.
-		rpDirs, err := os.ReadDir(dbPath)
-		if err != nil {
-			return err
-		}
-
-		for _, rp := range rpDirs {
-			rpPath := filepath.Join(s.path, db.Name(), rp.Name())
-			if !rp.IsDir() {
-				log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
-				continue
-			}
-
-			// The .series directory is not a retention policy.
-			if rp.Name() == SeriesFileDirectory {
-				continue
-			}
-
-			if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
-				log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
-				continue
-			}
-
-			shardDirs, err := os.ReadDir(rpPath)
-			if err != nil {
-				return err
-			}
-
-			for _, sh := range shardDirs {
-				// Series file should not be in a retention policy but skip just in case.
-				if sh.Name() == SeriesFileDirectory {
-					log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
-					continue
-				}
-
-				n++
-				go func(db, rp, sh string) {
-					t.Take()
-					defer t.Release()
-
-					start := time.Now()
-					path := filepath.Join(s.path, db, rp, sh)
-					walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
-
-					// Shard file names are numeric shardIDs
-					shardID, err := strconv.ParseUint(sh, 10, 64)
-					if err != nil {
-						log.Info("invalid shard ID found at path", zap.String("path", path))
-						resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
-						return
-					}
-
-					if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
-						log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
-						resC <- &res{}
-						return
-					}
-
-					// Copy options and assign shared index.
-					opt := s.EngineOptions
-					opt.InmemIndex = idx
-
-					// Provide an implementation of the ShardIDSets
-					opt.SeriesIDSets = shardSet{store: s, db: db}
-
-					// Existing shards should continue to use inmem index.
-					if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
-						opt.IndexVersion = InmemIndexName
-					}
-
-					// Open engine.
-					shard := NewShard(shardID, path, walPath, sfile, opt)
-
-					// Disable compactions, writes and queries until all shards are loaded
-					shard.EnableOnOpen = false
-					shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
-					shard.WithLogger(s.baseLogger)
-
-					err = s.OpenShard(shard, false)
-					if err != nil {
-						log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
-						resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
-						return
-					}
-
-					resC <- &res{s: shard}
-					log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
-				}(db.Name(), rp.Name(), sh.Name())
-			}
-		}
-	}
-
-	// Gather results of opening shards concurrently, keeping track of how
-	// many databases we are managing.
-	for i := 0; i < n; i++ {
-		res := <-resC
+	walkShardsAndProcess := func(fn func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
+		for _, db := range dbDirs {
+			rpDirs, err := s.getRetentionPolicyDirs(db, log)
+			if err != nil {
+				return err
+			} else if rpDirs == nil {
+				continue
+			}
+
+			// Load series file.
+			sfile, err := s.openSeriesFile(db.Name())
+			if err != nil {
+				return err
+			}
+
+			// Retrieve database index.
+			idx, err := s.createIndexIfNotExists(db.Name())
+			if err != nil {
+				return err
+			}
+
+			for _, rp := range rpDirs {
+				shardDirs, err := s.getShards(rp, db, log)
+				if err != nil {
+					return err
+				} else if shardDirs == nil {
+					continue
+				}
+
+				for _, sh := range shardDirs {
+					// Series file should not be in a retention policy but skip just in case.
+					if sh.Name() == SeriesFileDirectory {
+						log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
+						continue
+					}
+
+					if err := fn(sfile, idx, sh, db, rp); err != nil {
+						return err
+					}
+				}
+			}
+		}
+
+		return nil
+	}
+
+	// We use `rawShardCount` as a buffer size for channel creation below.
+	// If there is no startupProgressMetrics count then this will be 0 creating a
+	// zero buffer channel.
+	rawShardCount := 0
+	if s.startupProgressMetrics != nil {
+		err := walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
+			rawShardCount++
+			s.startupProgressMetrics.AddShard()
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	shardResC := make(chan *shardResponse, rawShardCount)
+	err = walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
+		shardLoaderWg.Add(1)
+
+		go func(db, rp, sh string) {
+			defer shardLoaderWg.Done()
+
+			t.Take()
+			defer t.Release()
+
+			start := time.Now()
+			path := filepath.Join(s.path, db, rp, sh)
+			walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
+
+			// Shard file names are numeric shardIDs
+			shardID, err := strconv.ParseUint(sh, 10, 64)
+			if err != nil {
+				log.Info("invalid shard ID found at path", zap.String("path", path))
+				shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
+				if s.startupProgressMetrics != nil {
+					s.startupProgressMetrics.CompletedShard()
+				}
+				return
+			}
+
+			if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
+				log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
+				shardResC <- &shardResponse{}
+				if s.startupProgressMetrics != nil {
+					s.startupProgressMetrics.CompletedShard()
+				}
+				return
+			}
+
+			// Copy options and assign shared index.
+			opt := s.EngineOptions
+			opt.InmemIndex = idx
+
+			// Provide an implementation of the ShardIDSets
+			opt.SeriesIDSets = shardSet{store: s, db: db}
+
+			// Existing shards should continue to use inmem index.
+			if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
+				opt.IndexVersion = InmemIndexName
+			}
+
+			// Open engine.
+			shard := NewShard(shardID, path, walPath, sfile, opt)
+
+			// Disable compactions, writes and queries until all shards are loaded
+			shard.EnableOnOpen = false
+			shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
+			shard.WithLogger(s.baseLogger)
+
+			err = s.OpenShard(shard, false)
+			if err != nil {
+				log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
+				shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
+				if s.startupProgressMetrics != nil {
+					s.startupProgressMetrics.CompletedShard()
+				}
+				return
+			}
+
+			shardResC <- &shardResponse{s: shard}
+			log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
+			if s.startupProgressMetrics != nil {
+				s.startupProgressMetrics.CompletedShard()
+			}
+		}(db.Name(), rp.Name(), sh.Name())
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
+	go func() {
+		wg.Wait()
+		close(resC)
+	}()
+
+	for res := range resC {
 		if res.s == nil || res.err != nil {
 			continue
 		}
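The rewrite above walks the database/retention-policy/shard directory tree twice with the same closure: a first pass counts shards and registers them with the metrics sink, and a second pass actually opens them, so the total is known before the first progress line is logged and the results channel can be fully buffered. A minimal, self-contained sketch of that two-pass pattern (names are illustrative, not from the commit):

package main

import "fmt"

func main() {
	shards := []string{"1", "2", "3"}

	// walk applies fn to every shard; the real walker also handles
	// filters and skips non-shard directories.
	walk := func(fn func(sh string) error) error {
		for _, sh := range shards {
			if err := fn(sh); err != nil {
				return err
			}
		}
		return nil
	}

	// Pass 1: count, so totals are known before anything opens.
	n := 0
	_ = walk(func(string) error { n++; return nil })

	// Pass 2: open, with a results channel buffered from pass 1 so
	// senders never block.
	resC := make(chan string, n)
	_ = walk(func(sh string) error { resC <- "opened shard " + sh; return nil })
	close(resC)

	for r := range resC {
		fmt.Println(r)
	}
}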
@@ -503,7 +543,6 @@ func (s *Store) loadShards() error {
 		}
 		s.databases[res.s.database].addIndexType(res.s.IndexType())
 	}
-	close(resC)
 
 	// Check if any databases are running multiple index types.
 	for db, state := range s.databases {
@@ -1182,12 +1221,8 @@ func byIndexType(name string) ShardPredicate {
 // concurrent use. If any of the functions return an error, the first error is
 // returned.
 func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
-	// struct to hold the result of opening each reader in a goroutine
-	type res struct {
-		err error
-	}
-
-	resC := make(chan res)
+	resC := make(chan shardResponse, len(shards))
 	var n int
 
 	for _, sh := range shards {
@@ -1195,11 +1230,11 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
 
 		go func(sh *Shard) {
 			if err := fn(sh); err != nil {
-				resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
+				resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)}
 				return
 			}
 
-			resC <- res{}
+			resC <- shardResponse{}
 		}(sh)
 	}
 
@@ -2367,3 +2402,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
 	}
 	return nil
 }
+
+func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
+	dbPath := filepath.Join(s.path, db.Name())
+	if !db.IsDir() {
+		log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
+		return nil, nil
+	}
+
+	if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
+		log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
+		return nil, nil
+	}
+
+	// Load each retention policy within the database directory.
+	rpDirs, err := os.ReadDir(dbPath)
+	if err != nil {
+		return nil, err
+	}
+
+	return rpDirs, nil
+}
+
+func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
+	rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
+	if !rpDir.IsDir() {
+		log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
+		return nil, nil
+	}
+
+	// The .series directory is not a retention policy.
+	if rpDir.Name() == SeriesFileDirectory {
+		return nil, nil
+	}
+
+	if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
+		log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
+		return nil, nil
+	}
+
+	shardDirs, err := os.ReadDir(rpPath)
+	if err != nil {
+		return nil, err
+	}
+
+	return shardDirs, nil
+}
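Note the convention both helpers above share: a (nil, nil) return means "skip this directory" (filtered out, not a directory, or the series-file dir), while a non-nil error aborts the whole walk. The caller in loadShards distinguishes the two, as in this excerpt-style sketch mirroring the diff:

	rpDirs, err := s.getRetentionPolicyDirs(db, log)
	if err != nil {
		return err // real failure: abort the walk
	} else if rpDirs == nil {
		continue // filtered or not a directory: skip silently
	}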
tsdb/store_test.go

@@ -5,6 +5,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"go.uber.org/zap/zaptest"
+	"log"
 	"math"
 	"math/rand"
 	"os"
@@ -19,7 +21,6 @@ import (
 
 	"github.com/davecgh/go-spew/spew"
 	"github.com/influxdata/influxdb/internal"
-	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/pkg/deep"
 	"github.com/influxdata/influxdb/pkg/slices"
@@ -36,7 +37,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create a new shard and verify that it exists.
@@ -87,7 +88,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) {
 	}
 
 	// Reopen other shard and check it still exists.
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Error(err)
 	} else if sh := s.Shard(3); sh == nil {
 		t.Errorf("shard 3 does not exist")
@@ -112,7 +113,7 @@ func TestStore_CreateShard(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create a new shard and verify that it exists.
@@ -130,7 +131,7 @@ func TestStore_CreateShard(t *testing.T) {
 	}
 
 	// Reopen shard and recheck.
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Fatal(err)
 	} else if sh := s.Shard(1); sh == nil {
 		t.Fatalf("expected shard(1)")
@@ -144,12 +145,102 @@ func TestStore_CreateShard(t *testing.T) {
 	}
 }
 
+// Ensure the store reports shard-loading progress at startup.
+func TestStore_StartupShardProgress(t *testing.T) {
+	t.Parallel()
+
+	test := func(index string) {
+		s := MustOpenStore(t, index)
+		defer s.Close()
+
+		// Create a new shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
+		sh := s.Shard(1)
+		require.NotNil(t, sh)
+
+		// Create another shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
+		sh = s.Shard(2)
+		require.NotNil(t, sh)
+
+		msl := &mockStartupLogger{}
+
+		// Reopen the store and recheck.
+		require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
+
+		// Equality check to make sure shards are always added prior to
+		// completion being called.
+		require.Equal(t, msl.shardTracker, []string{
+			"shard-add",
+			"shard-add",
+			"shard-complete",
+			"shard-complete",
+		})
+	}
+
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
+	}
+}
+
+// Ensure that shard loading still accounts for bad shards. We still want these
+// to show up during the initial shard loading even though they are in an error state.
+func TestStore_BadShardLoading(t *testing.T) {
+	t.Parallel()
+
+	test := func(index string) {
+		s := MustOpenStore(t, index)
+		defer s.Close()
+
+		// Create a new shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
+		sh := s.Shard(1)
+		require.NotNil(t, sh)
+
+		// Create another shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
+		sh = s.Shard(2)
+		require.NotNil(t, sh)
+
+		// Create another shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 3, true))
+		sh = s.Shard(3)
+		require.NotNil(t, sh)
+
+		s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
+		err2 := s.OpenShard(s.Shard(sh.ID()), false)
+		require.Error(t, err2, "no error opening bad shard")
+
+		msl := &mockStartupLogger{}
+
+		// Reopen the store and recheck.
+		require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
+
+		// Equality check to make sure shards are always added prior to
+		// completion being called. This test opens 3 shards in total; 1 shard
+		// fails, but we still want to track that an open was attempted.
+		require.Equal(t, msl.shardTracker, []string{
+			"shard-add",
+			"shard-add",
+			"shard-add",
+			"shard-complete",
+			"shard-complete",
+			"shard-complete",
+		})
+	}
+
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
+	}
+}
+
 func TestStore_BadShard(t *testing.T) {
 	const errStr = "a shard open error"
 	indexes := tsdb.RegisteredIndexes()
 	for _, idx := range indexes {
 		func() {
-			s := MustOpenStore(idx)
+			s := MustOpenStore(t, idx)
 			defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
 
 			sh := tsdb.NewTempShard(idx)
@@ -175,7 +266,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
 	t.Parallel()
 
 	test := func(index1 string, index2 string) {
-		s := MustOpenStore(index1)
+		s := MustOpenStore(t, index1)
 		defer s.Close()
 
 		// Create a new shard and verify that it exists.
@@ -187,7 +278,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
 
 	s.EngineOptions.IndexVersion = index2
 	s.index = index2
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Fatal(err)
 	}
 
@@ -199,7 +290,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
 	}
 
 	// Reopen shard and recheck.
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Fatal(err)
 	} else if sh := s.Shard(1); sh == nil {
 		t.Fatalf("expected shard(1)")
@@ -231,7 +322,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
 	t.Parallel()
 
 	test := func(index1 string, index2 string) {
-		s := MustOpenStore(index1)
+		s := MustOpenStore(t, index1)
 		defer s.Close()
 
 		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@@ -242,7 +333,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
 
 	s.EngineOptions.IndexVersion = index2
 	s.index = index2
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Fatal(err)
 	}
 
@@ -276,7 +367,7 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@@ -340,7 +431,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
 	t.Parallel()
 
 	test := func(index1 string, index2 string) {
-		s := MustOpenStore(index1)
+		s := MustOpenStore(t, index1)
 		defer s.Close()
 
 		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@@ -351,7 +442,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
 
 	s.EngineOptions.IndexVersion = index2
 	s.index = index2
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Fatal(err)
 	}
 
@@ -413,7 +504,7 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		if err := s.DeleteSeries("db0", nil, nil); err != nil {
@@ -431,7 +522,7 @@ func TestStore_DeleteSeries_MultipleSources(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
@@ -461,7 +552,7 @@ func TestStore_DeleteShard(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) error {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create a new shard and verify that it exists.
@@ -493,7 +584,7 @@ func TestStore_DeleteShard(t *testing.T) {
 	s.MustWriteToShardString(3, "cpu,serverb=b v=1")
 
 	// Reopen the store and check all shards still exist
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		return err
 	}
 	for i := uint64(1); i <= 3; i++ {
@@ -550,7 +641,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create a new shard and verify that it exists.
@@ -578,7 +669,7 @@ func TestStore_Open(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := NewStore(index)
+		s := NewStore(t, index)
 		defer s.Close()
 
 		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
@@ -621,9 +712,14 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := NewStore(index)
+		s := NewStore(t, index)
 		defer s.Close()
 
+		// Ensure the directory exists before creating the file.
+		if err := os.MkdirAll(s.Path(), 0777); err != nil {
+			t.Fatal(err)
+		}
+
 		// Create a file instead of a directory for a database.
 		if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
 			t.Fatal(err)
@@ -647,7 +743,7 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := NewStore(index)
+		s := NewStore(t, index)
 		defer s.Close()
 
 		// Create an RP file instead of a directory.
@@ -677,7 +773,7 @@ func TestStore_Open_InvalidShard(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := NewStore(index)
+		s := NewStore(t, index)
 		defer s.Close()
 
 		// Create a non-numeric shard file.
@@ -707,7 +803,7 @@ func TestShards_CreateIterator(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create shard #0 with data.
@@ -792,7 +888,7 @@ func TestStore_NewReadersBlocked(t *testing.T) {
 
 	test := func(index string) {
 		t.Helper()
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		shardInUse := func(shardID uint64) bool {
@@ -865,7 +961,7 @@ func TestStore_NewReadersBlocked(t *testing.T) {
 // Ensure the store can backup a shard and another store can restore it.
 func TestStore_BackupRestoreShard(t *testing.T) {
 	test := func(index string) {
-		s0, s1 := MustOpenStore(index), MustOpenStore(index)
+		s0, s1 := MustOpenStore(t, index), MustOpenStore(t, index)
 		defer s0.Close()
 		defer s1.Close()
 
@@ -876,7 +972,7 @@ func TestStore_BackupRestoreShard(t *testing.T) {
 		`cpu value=3 20`,
 	)
 
-	if err := s0.Reopen(); err != nil {
+	if err := s0.Reopen(t); err != nil {
 		t.Fatal(err)
 	}
 
@@ -946,7 +1042,7 @@ func TestStore_Shard_SeriesN(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) error {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create shard with data.
@@ -982,7 +1078,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create shard with data.
@@ -1083,7 +1179,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) {
 	}
 
 	test := func(index string) {
-		store := NewStore(index)
+		store := NewStore(t, index)
 		if err := store.Open(); err != nil {
 			panic(err)
 		}
@@ -1148,7 +1244,7 @@ func TestStore_Cardinality_Unique(t *testing.T) {
 	}
 
 	test := func(index string) {
-		store := NewStore(index)
+		store := NewStore(t, index)
 		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
 		if err := store.Open(); err != nil {
 			panic(err)
@@ -1230,7 +1326,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
 	}
 
 	test := func(index string) {
-		store := NewStore(index)
+		store := NewStore(t, index)
 		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
 		if err := store.Open(); err != nil {
 			panic(err)
@@ -1250,7 +1346,7 @@ func TestStore_MetaQuery_Timeout(t *testing.T) {
 	}
 
 	test := func(index string) {
-		store := NewStore(index)
+		store := NewStore(t, index)
 		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
 		if err := store.Open(); err != nil {
 			panic(err)
@@ -1442,7 +1538,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
 	}
 
 	test := func(index string) error {
-		store := NewStore(index)
+		store := NewStore(t, index)
 		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
 		if err := store.Open(); err != nil {
 			panic(err)
@@ -1467,7 +1563,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
 		t.Skip("Skipping test in short, race and appveyor mode.")
 	}
 
-	store := NewStore("inmem")
+	store := NewStore(t, "inmem")
 	store.EngineOptions.Config.MaxSeriesPerDatabase = 100000
 	if err := store.Open(); err != nil {
 		panic(err)
@@ -1581,7 +1677,7 @@ func TestStore_Sketches(t *testing.T) {
 	}
 
 	test := func(index string) error {
-		store := MustOpenStore(index)
+		store := MustOpenStore(t, index)
 		defer store.Close()
 
 		// Generate point data to write to the shards.
@@ -1610,7 +1706,7 @@ func TestStore_Sketches(t *testing.T) {
 	}
 
 	// Reopen the store.
-	if err := store.Reopen(); err != nil {
+	if err := store.Reopen(t); err != nil {
 		return err
 	}
 
@@ -1643,7 +1739,7 @@ func TestStore_Sketches(t *testing.T) {
 	}
 
 	// Reopen the store.
-	if err := store.Reopen(); err != nil {
+	if err := store.Reopen(t); err != nil {
 		return err
 	}
 
@@ -1779,7 +1875,7 @@ func TestStore_TagValues(t *testing.T) {
 	}
 
 	setup := func(index string) (*Store, []uint64) { // returns shard ids
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 
 		fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
 cpu1%[1]d,host=nofoo value=1 %[4]d
@@ -1830,7 +1926,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) error {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create shard #0 with data.
@@ -1919,7 +2015,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) error {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create shard #0 with data.
@@ -2017,7 +2113,7 @@ func TestStore_TagValues_Auth(t *testing.T) {
 	t.Parallel()
 
 	test := func(index string) error {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		// Create shard #0 with data.
@@ -2148,7 +2244,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
 
 func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
 	for _, index := range tsdb.RegisteredIndexes() {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		shardN := 10
@@ -2233,7 +2329,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
 
 func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
 	for _, index := range tsdb.RegisteredIndexes() {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		shardN := 10
@@ -2324,7 +2420,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
 
 func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
 	for _, index := range tsdb.RegisteredIndexes() {
-		s := MustOpenStore(index)
+		s := MustOpenStore(t, index)
 		defer s.Close()
 
 		shardN := 10
@@ -2428,7 +2524,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
 
 func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
 	for _, index := range tsdb.RegisteredIndexes() {
-		store := NewStore(index)
+		store := NewStore(b, index)
 		if err := store.Open(); err != nil {
 			panic(err)
 		}
@@ -2459,7 +2555,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(
 func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
 	var store *Store
 	setup := func(index string) error {
-		store := MustOpenStore(index)
+		store := MustOpenStore(b, index)
 
 		// Generate test series (measurements + unique tag sets).
 		series := genTestSeries(mCnt, tkCnt, tvCnt)
@@ -2530,7 +2626,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
 	}
 
 	setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids
-		s := NewStore(index)
+		s := NewStore(b, index)
 		if err := s.Open(); err != nil {
 			panic(err)
 		}
@@ -2632,23 +2728,46 @@ func BenchmarkStore_TagValues(b *testing.B) {
 // Store is a test wrapper for tsdb.Store.
 type Store struct {
 	*tsdb.Store
-	index string
+	path    string
+	index   string
+	walPath string
+	opts    []StoreOption
+}
+
+type StoreOption func(s *Store) error
+
+func WithStartupMetrics(sm *mockStartupLogger) StoreOption {
+	return func(s *Store) error {
+		s.WithStartupMetrics(sm)
+		return nil
+	}
 }
 
 // NewStore returns a new instance of Store with a temporary path.
-func NewStore(index string) *Store {
-	path, err := os.MkdirTemp("", "influxdb-tsdb-")
-	if err != nil {
-		panic(err)
-	}
+func NewStore(tb testing.TB, index string, opts ...StoreOption) *Store {
+	tb.Helper()
+
+	// The WAL directory must not be rooted under the data path. Otherwise reopening
+	// the store will generate series indices for the WAL directories.
+	rootPath := tb.TempDir()
+	path := filepath.Join(rootPath, "data")
+	walPath := filepath.Join(rootPath, "wal")
+
+	s := &Store{
+		Store:   tsdb.NewStore(path),
+		path:    path,
+		index:   index,
+		walPath: walPath,
+		opts:    opts,
+	}
 
-	s := &Store{Store: tsdb.NewStore(path), index: index}
 	s.EngineOptions.IndexVersion = index
-	s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
+	s.EngineOptions.Config.WALDir = walPath
 	s.EngineOptions.Config.TraceLoggingEnabled = true
+	s.WithLogger(zaptest.NewLogger(tb))
 
-	if testing.Verbose() {
-		s.WithLogger(logger.New(os.Stdout))
+	for _, o := range s.opts {
+		err := o(s)
+		require.NoError(tb, err)
 	}
 
 	return s
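The StoreOption pattern above lets individual tests inject hooks without widening every constructor call site; options are re-applied on Reopen. A short usage sketch mirroring the new tests in this commit:

	msl := &mockStartupLogger{}
	s := MustOpenStore(t, "tsi1", WithStartupMetrics(msl))
	defer s.Close()
	// ...or attach metrics only when reopening:
	// require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))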
@@ -2656,8 +2775,8 @@ func NewStore(index string) *Store {
 
 // MustOpenStore returns a new, open Store using the specified index,
 // at a temporary path.
-func MustOpenStore(index string) *Store {
-	s := NewStore(index)
+func MustOpenStore(tb testing.TB, index string, opts ...StoreOption) *Store {
+	s := NewStore(tb, index, opts...)
 
 	if err := s.Open(); err != nil {
 		panic(err)
@@ -2665,27 +2784,44 @@ func MustOpenStore(index string) *Store {
 	return s
 }
 
 // Reopen closes and reopens the store as a new store.
-func (s *Store) Reopen() error {
-	if err := s.Store.Close(); err != nil {
-		return err
+func (s *Store) Reopen(tb testing.TB, newOpts ...StoreOption) error {
+	tb.Helper()
+
+	if s.Store != nil {
+		if err := s.Store.Close(); err != nil {
+			return err
+		}
 	}
 
-	s.Store = tsdb.NewStore(s.Path())
+	s.Store = tsdb.NewStore(s.path)
 	s.EngineOptions.IndexVersion = s.index
-	s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
+	s.EngineOptions.Config.WALDir = s.walPath
 	s.EngineOptions.Config.TraceLoggingEnabled = true
+	s.WithLogger(zaptest.NewLogger(tb))
 
-	if testing.Verbose() {
-		s.WithLogger(logger.New(os.Stdout))
+	if len(newOpts) > 0 {
+		s.opts = newOpts
 	}
 
+	for _, o := range s.opts {
+		err := o(s)
+		require.NoError(tb, err)
+	}
+
 	return s.Store.Open()
 }
 
 // Close closes the store and removes the underlying data.
 func (s *Store) Close() error {
-	defer os.RemoveAll(s.Path())
-	return s.Store.Close()
+	defer func(path string) {
+		err := os.RemoveAll(path)
+		if err != nil {
+			log.Fatal(err)
+		}
+	}(s.path)
+	if s.Store != nil {
+		return s.Store.Close()
+	}
+	return nil
 }
@@ -2754,3 +2890,20 @@ func dirExists(path string) bool {
 	}
 	return !os.IsNotExist(err)
 }
+
+type mockStartupLogger struct {
+	shardTracker []string
+	mu           sync.Mutex
+}
+
+func (m *mockStartupLogger) AddShard() {
+	m.mu.Lock()
+	m.shardTracker = append(m.shardTracker, "shard-add")
+	m.mu.Unlock()
+}
+
+func (m *mockStartupLogger) CompletedShard() {
+	m.mu.Lock()
+	m.shardTracker = append(m.shardTracker, "shard-complete")
+	m.mu.Unlock()
+}