Remove per shard monitor goroutine
The monitor goroutine ran for each shard, updating disk stats and logging cardinality warnings. It has been removed by making the disk stats more lightweight and callable directly from Statistics, and by moving the logging to the tsdb.Store. The latter allows one goroutine to handle all shards.
parent 684f5d884a
commit 88848a9426
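For context, here is a minimal sketch of the shape this change moves toward: one monitor goroutine owned by the store, walking every shard on a ticker, instead of each shard starting its own goroutine in Open. The store, shard, and check names below are hypothetical stand-ins, not the real tsdb types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// shard is a stand-in for tsdb.Shard; the actual disk-size and
// cardinality checks are elided to keep the sketch small.
type shard struct{ id uint64 }

func (s *shard) check() { fmt.Printf("checked shard %d\n", s.id) }

// store is a stand-in for tsdb.Store. One goroutine services every
// shard, rather than each shard spawning its own monitor.
type store struct {
	mu      sync.RWMutex
	shards  map[uint64]*shard
	closing chan struct{}
	wg      sync.WaitGroup
}

func (st *store) open() {
	st.wg.Add(1)
	go st.monitorShards() // single monitor loop for all shards
}

func (st *store) monitorShards() {
	defer st.wg.Done()
	t := time.NewTicker(time.Minute)
	defer t.Stop()
	for {
		select {
		case <-st.closing:
			return
		case <-t.C:
			st.mu.RLock()
			for _, sh := range st.shards {
				sh.check()
			}
			st.mu.RUnlock()
		}
	}
}

func (st *store) close() {
	close(st.closing)
	st.wg.Wait()
}

func main() {
	st := &store{shards: map[uint64]*shard{1: {id: 1}, 2: {id: 2}}, closing: make(chan struct{})}
	st.open()
	// Nothing ticks in this short run; the point is the single loop and a clean shutdown.
	st.close()
}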
@@ -74,6 +74,7 @@ type Engine interface {
	// Statistics will return statistics relevant to this engine.
	Statistics(tags map[string]string) []models.Statistic
	LastModified() time.Time
+	DiskSize() int64
	IsIdle() bool

	io.WriterTo
@@ -430,6 +430,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
	return statistics
}

+// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
+func (e *Engine) DiskSize() int64 {
+	return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes()
+}
+
// Open opens and initializes the engine.
func (e *Engine) Open() error {
	if err := os.MkdirAll(e.path, 0777); err != nil {
@@ -409,6 +409,9 @@ func (f *FileStore) Open() error {
		f.files = append(f.files, res.r)
		// Accumulate file store size stats
		atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
		for _, ts := range res.r.TombstoneFiles() {
			atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
		}

		// Re-initialize the lastModified time for the file store
		if res.r.LastModified() > lm {
@@ -439,6 +442,10 @@ func (f *FileStore) Close() error {
	return nil
}

+func (f *FileStore) DiskSizeBytes() int64 {
+	return atomic.LoadInt64(&f.stats.DiskBytes)
+}
+
// Read returns the slice of values for the given key and the given timestamp,
// if any file matches those constraints.
func (f *FileStore) Read(key string, t int64) ([]Value, error) {
@@ -628,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
	var totalSize int64
	for _, file := range f.files {
		totalSize += int64(file.Size())
		for _, ts := range file.TombstoneFiles() {
			totalSize += int64(ts.Size)
		}
	}
	atomic.StoreInt64(&f.stats.DiskBytes, totalSize)
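The FileStore hunks above all serve one idea: keep a running DiskBytes counter that is bumped as files are opened and recomputed when the file set is replaced, so reading the size is a single atomic load rather than a filesystem walk. A rough, self-contained sketch of that pattern, using a hypothetical fileStore type rather than the real one:

package main

import (
	"fmt"
	"sync/atomic"
)

type tsmFile struct{ size int64 }

type fileStore struct {
	files     []tsmFile
	diskBytes int64 // updated atomically; read by DiskSizeBytes
}

// add accumulates the size stat as files are opened, mirroring the
// atomic.AddInt64 calls in FileStore.Open.
func (f *fileStore) add(file tsmFile) {
	f.files = append(f.files, file)
	atomic.AddInt64(&f.diskBytes, file.size)
}

// replace recomputes the total from the current file set and stores it,
// mirroring the atomic.StoreInt64 in FileStore.Replace.
func (f *fileStore) replace(files []tsmFile) {
	f.files = files
	var total int64
	for _, file := range files {
		total += file.size
	}
	atomic.StoreInt64(&f.diskBytes, total)
}

// DiskSizeBytes is an O(1) read; no directory walk required.
func (f *fileStore) DiskSizeBytes() int64 {
	return atomic.LoadInt64(&f.diskBytes)
}

func main() {
	fs := &fileStore{}
	fs.add(tsmFile{size: 1 << 20})
	fs.add(tsmFile{size: 2 << 20})
	fmt.Println(fs.DiskSizeBytes()) // 3145728
	fs.replace([]tsmFile{{size: 4 << 20}})
	fmt.Println(fs.DiskSizeBytes()) // 4194304
}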
@@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time {
	return l.lastWriteTime
}

+func (l *WAL) DiskSizeBytes() int64 {
+	return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes)
+}
+
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
	// limit how many concurrent encodings can be in flight. Since we can only
	// write one at a time to disk, a slow disk can cause the allocations below
tsdb/shard.go
@@ -6,7 +6,6 @@ import (
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"regexp"
	"sort"
@@ -207,6 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
		return nil
	}

+	// Refresh our disk size stat
+	_, _ = s.DiskSize()
+
	// TODO(edd): Should statSeriesCreate be the current number of series in the
	// shard, or the total number of series ever created?
	sSketch, tSketch, err := s.engine.SeriesSketches()
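With the background ticker gone, the disk gauge is refreshed on demand: Statistics calls DiskSize, which reads the engine's cheap size and records it in the shard's stats. A minimal sketch of that pull-style refresh, using hypothetical types and a stand-in for engine.DiskSize:

package main

import (
	"fmt"
	"sync/atomic"
)

type shardStats struct {
	DiskBytes int64
}

type shard struct {
	stats    shardStats
	diskSize func() int64 // stand-in for engine.DiskSize
}

// DiskSize reads the engine's cached size and stores it as a stat,
// as the rewritten Shard.DiskSize does.
func (s *shard) DiskSize() (int64, error) {
	size := s.diskSize()
	atomic.StoreInt64(&s.stats.DiskBytes, size)
	return size, nil
}

// Statistics refreshes the disk stat on demand before reporting it.
func (s *shard) Statistics() map[string]int64 {
	_, _ = s.DiskSize()
	return map[string]int64{"diskBytes": atomic.LoadInt64(&s.stats.DiskBytes)}
}

func main() {
	s := &shard{diskSize: func() int64 { return 42 << 20 }}
	fmt.Println(s.Statistics()["diskBytes"]) // 44040192
}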
@@ -289,9 +291,6 @@ func (s *Shard) Open() error {
		}
		s.engine = e

-		s.wg.Add(1)
-		go s.monitor()
-
		return nil
	}(); err != nil {
		s.close(true)
@@ -355,6 +354,12 @@ func (s *Shard) close(clean bool) error {
	return err
}

+func (s *Shard) IndexType() string {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.index.Type()
+}
+
// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
func (s *Shard) ready() error {
@@ -402,33 +407,9 @@ func (s *Shard) SetCompactionsEnabled(enabled bool) {
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
-	var size int64
-	err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-
-		if !fi.IsDir() {
-			size += fi.Size()
-		}
-		return err
-	})
-	if err != nil {
-		return 0, err
-	}
-
-	err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-
-		if !fi.IsDir() {
-			size += fi.Size()
-		}
-		return err
-	})
-
-	return size, err
+	size := s.engine.DiskSize()
+	atomic.StoreInt64(&s.stats.DiskBytes, size)
+	return size, nil
}

// FieldCreate holds information for a field to create on a measurement.
@@ -984,63 +965,12 @@ func (s *Shard) CreateSnapshot() (string, error) {
	return s.engine.CreateSnapshot()
}

-func (s *Shard) monitor() {
-	defer s.wg.Done()
+func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
+	return s.engine.ForEachMeasurementTagKey(name, fn)
+}

-	t := time.NewTicker(monitorStatInterval)
-	defer t.Stop()
-	t2 := time.NewTicker(time.Minute)
-	defer t2.Stop()
-	var changed time.Time
-
-	for {
-		select {
-		case <-s.closing:
-			return
-		case <-t.C:
-			// Checking DiskSize can be expensive with a lot of shards and TSM files, only
-			// check if something has changed.
-			lm := s.LastModified()
-			if lm.Equal(changed) {
-				continue
-			}
-
-			size, err := s.DiskSize()
-			if err != nil {
-				s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
-				continue
-			}
-			atomic.StoreInt64(&s.stats.DiskBytes, size)
-			changed = lm
-		case <-t2.C:
-			if s.options.Config.MaxValuesPerTag == 0 {
-				continue
-			}
-
-			names, err := s.MeasurementNamesByExpr(nil)
-			if err != nil {
-				s.logger.Warn("cannot retrieve measurement names", zap.Error(err))
-				continue
-			}
-
-			for _, name := range names {
-				s.engine.ForEachMeasurementTagKey(name, func(k []byte) error {
-					n := s.engine.TagKeyCardinality(name, k)
-					perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
-					if perc > 100 {
-						perc = 100
-					}
-
-					// Log at 80, 85, 90-100% levels
-					if perc == 80 || perc == 85 || perc >= 90 {
-						s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
-							perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k))
-					}
-					return nil
-				})
-			}
-		}
-	}
-}
+func (s *Shard) TagKeyCardinality(name, key []byte) int {
+	return s.engine.TagKeyCardinality(name, key)
+}

type ShardGroup interface {
@@ -1055,6 +1055,8 @@ func (s *Store) monitorShards() {
	defer s.wg.Done()
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
+	t2 := time.NewTicker(time.Minute)
+	defer t2.Stop()
	for {
		select {
		case <-s.closing:
@@ -1069,6 +1071,45 @@ func (s *Store) monitorShards() {
				}
			}
			s.mu.RUnlock()
+		case <-t2.C:
+			if s.EngineOptions.Config.MaxValuesPerTag == 0 {
+				continue
+			}
+
+			s.mu.RLock()
+			shards := s.filterShards(func(sh *Shard) bool {
+				return sh.IndexType() == "inmem"
+			})
+			s.mu.RUnlock()
+
+			s.walkShards(shards, func(sh *Shard) error {
+				db := sh.database
+				id := sh.id
+
+				names, err := sh.MeasurementNamesByExpr(nil)
+				if err != nil {
+					s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
+					return nil
+				}
+
+				for _, name := range names {
+					sh.ForEachMeasurementTagKey(name, func(k []byte) error {
+						n := sh.TagKeyCardinality(name, k)
+						perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
+						if perc > 100 {
+							perc = 100
+						}
+
+						// Log at 80, 85, 90-100% levels
+						if perc == 80 || perc == 85 || perc >= 90 {
+							s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
+								perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k))
+						}
+						return nil
+					})
+				}
+				return nil
+			})
		}
	}
}
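The warning gate itself is unchanged by the move: tag-value cardinality is converted to a percentage of max-values-per-tag, capped at 100, and only 80, 85, and 90 and above trigger a log line, which keeps the check from spamming the log on every tick. A small standalone illustration of that arithmetic, where shouldWarn is a hypothetical helper rather than part of the codebase:

package main

import "fmt"

// shouldWarn mirrors the percentage gate used in monitorShards:
// warn at exactly 80%, exactly 85%, and anywhere from 90% upward.
func shouldWarn(n, max int) (perc int, warn bool) {
	perc = int(float64(n) / float64(max) * 100)
	if perc > 100 {
		perc = 100
	}
	return perc, perc == 80 || perc == 85 || perc >= 90
}

func main() {
	for _, n := range []int{70000, 80000, 87000, 95000, 120000} {
		perc, warn := shouldWarn(n, 100000)
		fmt.Printf("n=%d perc=%d warn=%v\n", n, perc, warn)
	}
	// n=70000 perc=70 warn=false
	// n=80000 perc=80 warn=true
	// n=87000 perc=87 warn=false
	// n=95000 perc=95 warn=true
	// n=120000 perc=100 warn=true
}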