chore(ae): add more logging (#21381)

tsdb.Engine.IsIdle and tsdb.Engine.Digest now return a reason string for why the engine & shard are not idle.
Callers can then use this string for logging, if desired. The returned reason string does not allocate memory,
so callers may wish to append the shard ID and path for more informative log messages. This is intended for use
by calls from the anti-entropy service in Enterprise.
pull/21518/head
davidby-influx 2021-05-07 12:55:58 -07:00 committed by GitHub
parent 7490053b7a
commit bf45841359
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 21 deletions

View File

@ -2,6 +2,7 @@ v1.8.6 [unreleased]
-------------------
- [#21290](https://github.com/influxdata/influxdb/pull/21290): fix: Anti-Entropy loops endlessly with empty shard
- [#21381](https://github.com/influxdata/influxdb/pull/21381): chore(ae): add more logging
v1.8.5 [2021-04-19]
-------------------

View File

@ -3,6 +3,7 @@ package control_test
import (
"context"
"testing"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/memory"
@ -23,6 +24,9 @@ func TestController_Query(t *testing.T) {
compiler := &mock.Compiler{
Type: "mock",
CompileFn: func(ctx context.Context) (flux.Program, error) {
// On fast machines, compilation can be faster than clock granularity
// causing the compile duration test below to fail
time.Sleep(time.Second)
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (*mock.Query, error) {
ch := make(chan flux.Result)

View File

@ -78,7 +78,7 @@ type Engine interface {
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
IsIdle() (bool, string)
Free() error
io.WriterTo

View File

@ -888,17 +888,33 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
// IsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted.
func (e *Engine) IsIdle() bool {
cacheEmpty := e.Cache.Size() == 0
func (e *Engine) IsIdle() (state bool, reason string) {
c := []struct {
ActiveCompactions *int64
LogMessage string
}{
{&e.stats.CacheCompactionsActive, "not idle because of active Cache compactions"},
{&e.stats.TSMCompactionsActive[0], "not idle because of active Level Zero compactions"},
{&e.stats.TSMCompactionsActive[1], "not idle because of active Level One compactions"},
{&e.stats.TSMCompactionsActive[2], "not idle because of active Level Two compactions"},
{&e.stats.TSMFullCompactionsActive, "not idle because of active Full compactions"},
{&e.stats.TSMOptimizeCompactionsActive, "not idle because of active TSM Optimization compactions"},
}
runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive)
for _, compactionState := range c {
count := atomic.LoadInt64(compactionState.ActiveCompactions)
if count > 0 {
return false, compactionState.LogMessage
}
}
return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted()
if cacheSize := e.Cache.Size(); cacheSize > 0 {
return false, "not idle because cache size is nonzero"
} else if !e.CompactionPlan.FullyCompacted() {
return false, "not idle because shard is not fully compacted"
} else {
return true, ""
}
}
// Free releases any resources held by the engine to free up memory or CPU.

View File

@ -447,10 +447,10 @@ func (s *Shard) SeriesFile() (*SeriesFile, error) {
}
// IsIdle return true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
func (s *Shard) IsIdle() (state bool, reason string) {
engine, err := s.Engine()
if err != nil {
return true
return true, ""
}
return engine.IsIdle()
}
@ -1200,19 +1200,20 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int {
}
// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, int64, error) {
func (s *Shard) Digest() (io.ReadCloser, int64, error, string) {
engine, err := s.Engine()
if err != nil {
return nil, 0, err
return nil, 0, err, ""
}
// Make sure the shard is idle/cold. (No use creating a digest of a
// hot shard that is rapidly changing.)
if !engine.IsIdle() {
return nil, 0, ErrShardNotIdle
if isIdle, reason := engine.IsIdle(); !isIdle {
return nil, 0, ErrShardNotIdle, reason
}
return engine.Digest()
readCloser, size, err := engine.Digest()
return readCloser, size, err, ""
}
// engine safely (under an RLock) returns a reference to the shard's Engine, or

View File

@ -445,7 +445,7 @@ func (s *Store) loadShards() error {
// Enable all shards
for _, sh := range s.shards {
sh.SetEnabled(true)
if sh.IsIdle() {
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
return err
}
@ -593,7 +593,8 @@ func (s *Store) ShardDigest(id uint64) (io.ReadCloser, int64, error) {
return nil, 0, ErrShardNotFound
}
return sh.Digest()
readCloser, size, err, _ := sh.Digest()
return readCloser, size, err
}
// CreateShard creates a shard with the given id and retention policy on a database.
@ -1445,7 +1446,7 @@ func (s *Store) WriteToShardWithContext(ctx context.Context, shardID uint64, poi
// Ensure snapshot compactions are enabled since the shard might have been cold
// and disabled by the monitor.
if sh.IsIdle() {
if isIdle, _ := sh.IsIdle(); isIdle {
sh.SetCompactionsEnabled(true)
}
@ -1986,7 +1987,7 @@ func (s *Store) monitorShards() {
case <-t.C:
s.mu.RLock()
for _, sh := range s.shards {
if sh.IsIdle() {
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
s.Logger.Warn("Error while freeing cold shard resources",
zap.Error(err),