chore: add logging to compaction (#21707) (#21900)

Compaction logging now generates intermediate information on the
volume of data written and the output files created, and improves
some of the anti-entropy messages related to compaction.

Closes https://github.com/influxdata/influxdb/issues/21704

(cherry picked from commit 73bdb2860e)

Closes https://github.com/influxdata/influxdb/issues/21706
davidby-influx 2021-07-21 09:43:21 -07:00 committed by GitHub
parent 3b86c32468
commit a78729b2ff
4 changed files with 62 additions and 48 deletions
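
For context on the API changes in the diffs below: the Compactor entry points (WriteSnapshot, CompactFull, CompactFast) now take an explicit *zap.Logger, and CompactionPlanner.FullyCompacted() additionally returns a reason string. A minimal caller sketch, assuming a hypothetical helper function and file names; only the method signatures come from this change:

package compactionexample

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
	"go.uber.org/zap"
)

// compactExample is a hypothetical caller; the compactor is assumed to be
// opened and configured elsewhere.
func compactExample(compactor *tsm1.Compactor, planner tsm1.CompactionPlanner) ([]string, error) {
	logger := zap.NewExample() // any *zap.Logger; the tests in this change use zap.NewNop()

	// FullyCompacted now also reports why the shard is not fully compacted.
	if done, reason := planner.FullyCompacted(); !done {
		fmt.Println("not fully compacted:", reason)
	}

	// CompactFull (like CompactFast and WriteSnapshot) now takes the logger so
	// compaction progress can be emitted at debug level.
	return compactor.CompactFull([]string{"000001-01.tsm", "000002-01.tsm"}, logger)
}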

View File

@ -28,9 +28,11 @@ import (
"github.com/influxdata/influxdb/v2/pkg/limiter"
"github.com/influxdata/influxdb/v2/tsdb"
"go.uber.org/zap"
)
const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
const logEvery = 2 * DefaultSegmentSize
const (
// CompactionTempExtension is the extension used for temporary files created during compaction.
@ -91,7 +93,7 @@ type CompactionPlanner interface {
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Release(group []CompactionGroup)
FullyCompacted() bool
FullyCompacted() (bool, string)
// ForceFull causes the planner to return a full compaction plan the next
// time Plan() is called if there are files that could be compacted.
@ -211,9 +213,15 @@ func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
}
// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() bool {
func (c *DefaultPlanner) FullyCompacted() (bool, string) {
gens := c.findGenerations(false)
return len(gens) <= 1 && !gens.hasTombstones()
if len(gens) > 1 {
return false, "not fully compacted and not idle because of more than one generation"
} else if gens.hasTombstones() {
return false, "not fully compacted and not idle because of tombstones"
} else {
return true, ""
}
}
// ForceFull causes the planner to return a full compaction plan the next time
@ -254,7 +262,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
for i := 0; i < len(generations); i++ {
cur := generations[i]
// See if this generation is orphan'd which would prevent it from being further
// See if this generation is orphaned which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
if cur.level() < generations[i+1].level() {
@ -818,7 +826,7 @@ func (c *Compactor) EnableCompactions() {
}
// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
intC := c.snapshotsInterrupt
@ -857,7 +865,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
for i := 0; i < concurrency; i++ {
go func(sp *Cache) {
iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle, logger)
resC <- res{files: files, err: err}
}(splits[i])
@ -891,7 +899,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
}
// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
@ -942,6 +950,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
}
if len(trs) == 0 {
logger.Debug("No input files")
return nil, nil
}
@ -950,11 +959,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, err
}
return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true)
return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true, logger)
}
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger) ([]string, error) {
c.mu.RLock()
enabled := c.compactionsEnabled
c.mu.RUnlock()
@ -968,7 +977,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
}
defer c.remove(tsmFiles)
files, err := c.compact(false, tsmFiles)
files, err := c.compact(false, tsmFiles, logger)
// See if we were disabled while writing a snapshot
c.mu.RLock()
@ -986,7 +995,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
}
// CompactFast writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger) ([]string, error) {
c.mu.RLock()
enabled := c.compactionsEnabled
c.mu.RUnlock()
@ -1000,7 +1009,7 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
}
defer c.remove(tsmFiles)
files, err := c.compact(true, tsmFiles)
files, err := c.compact(true, tsmFiles, logger)
// See if we were disabled while writing a snapshot
c.mu.RLock()
@ -1031,7 +1040,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
// These are the new TSM files written
var files []string
@ -1040,16 +1049,19 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
// New TSM files are written to a temp file and renamed when fully completed.
fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)
logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))
// Write as much as possible to this file
err := c.write(fileName, iter, throttle)
err := c.write(fileName, iter, throttle, logger)
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
files = append(files, fileName)
logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
continue
} else if err == ErrNoValues {
logger.Debug("Dropping empty file", zap.String("output_file", fileName))
// If the file only contained tombstoned entries, then it would be a 0 length
// file that we can drop.
if err := os.RemoveAll(fileName); err != nil {
@ -1061,16 +1073,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
// planner keeps track of which files are assigned to compaction plans now.
return nil, err
} else if err != nil {
// We hit an error and didn't finish the compaction. Abort.
// Remove any tmp files we already completed
// discard later errors to return the first one from the write() call
for _, f := range files {
if err := os.RemoveAll(f); err != nil {
return nil, err
}
}
// We hit an error and didn't finish the compaction. Remove the temp file and abort.
if err := os.RemoveAll(fileName); err != nil {
return nil, err
_ = os.RemoveAll(f)
}
// Remove the temp file
// discard later errors to return the first one from the write() call
_ = os.RemoveAll(fileName)
return nil, err
}
@ -1081,7 +1092,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
return files, nil
}
func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
@ -1134,10 +1145,11 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err err
}
if err != nil {
w.Remove()
_ = w.Remove()
}
}()
lastLogSize := w.Size()
for iter.Next() {
c.mu.RLock()
enabled := c.snapshotsEnabled || c.compactionsEnabled
@ -1176,6 +1188,9 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err err
}
return errMaxFileExceeded
} else if (w.Size() - lastLogSize) > logEvery {
logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
lastLogSize = w.Size()
}
}
@ -1188,6 +1203,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err err
if err := w.WriteIndex(); err != nil {
return err
}
logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
return nil
}
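
The progress logging added to write() above is driven by a simple byte threshold: logEvery is 2 * DefaultSegmentSize, and lastLogSize holds the output size at the previous log line. A stripped-down sketch of that pattern, with a hypothetical sizedWriter standing in for the TSM writer and DefaultSegmentSize assumed to be 10MB:

package compactionexample

import "go.uber.org/zap"

// sizedWriter is a hypothetical stand-in for the TSM writer; only Size() matters here.
type sizedWriter interface {
	Size() uint32
}

const logEvery = 2 * 10 * 1024 * 1024 // assumes DefaultSegmentSize is 10MB

// logProgress mirrors the threshold check in Compactor.write: emit a debug line
// whenever the output file has grown by more than logEvery bytes since the last
// log, and return the new baseline for the next check.
func logProgress(w sizedWriter, lastLogSize uint32, path string, logger *zap.Logger) uint32 {
	if (w.Size() - lastLogSize) > logEvery {
		logger.Debug("Compaction progress",
			zap.String("output_file", path),
			zap.Uint32("size", w.Size()))
		return w.Size()
	}
	return lastLogSize
}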

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"go.uber.org/zap"
)
// Tests compacting a Cache snapshot into a single TSM file
@ -39,7 +40,7 @@ func TestCompactor_Snapshot(t *testing.T) {
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}
files, err := compactor.WriteSnapshot(c)
files, err := compactor.WriteSnapshot(c, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
@ -50,7 +51,7 @@ func TestCompactor_Snapshot(t *testing.T) {
compactor.Open()
files, err = compactor.WriteSnapshot(c)
files, err = compactor.WriteSnapshot(c, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -119,7 +120,7 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
compactor.FileStore = fs
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2})
files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %#v", err)
}
@ -175,7 +176,7 @@ func TestCompactor_CompactFull(t *testing.T) {
compactor.Dir = dir
compactor.FileStore = fs
files, err := compactor.CompactFull([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
@ -186,7 +187,7 @@ func TestCompactor_CompactFull(t *testing.T) {
compactor.Open()
files, err = compactor.CompactFull([]string{f1, f2, f3})
files, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -285,7 +286,7 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.Dir = dir
compactor.FileStore = fs
files, err := compactor.CompactFull([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
@ -296,9 +297,7 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.Open()
_, err = compactor.CompactFull([]string{f1, f2, f3})
if err == nil ||
!strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
t.Fatalf("expected error writing snapshot: %v", err)
}
}
@ -336,7 +335,7 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f3})
files, err := compactor.CompactFast([]string{f1, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -416,7 +415,7 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2, f3})
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -484,7 +483,7 @@ func TestCompactor_Compact_UnsortedBlocks(t *testing.T) {
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2})
files, err := compactor.CompactFast([]string{f1, f2}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -558,7 +557,7 @@ func TestCompactor_Compact_UnsortedBlocksOverlapping(t *testing.T) {
compactor.Open()
files, err := compactor.CompactFast([]string{f1, f2, f3})
files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -629,7 +628,7 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -731,7 +730,7 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -834,7 +833,7 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -942,7 +941,7 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
compactor.Size = 2
compactor.Open()
files, err := compactor.CompactFull([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -1058,7 +1057,7 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
compactor.Open()
// Compact both files, should get 2 files back
files, err := compactor.CompactFull([]string{f1Name, f2Name})
files, err := compactor.CompactFull([]string{f1Name, f2Name}, zap.NewNop())
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}

View File

@ -906,8 +906,8 @@ func (e *Engine) IsIdle() (state bool, reason string) {
if cacheSize := e.Cache.Size(); cacheSize > 0 {
return false, "not idle because cache size is nonzero"
} else if !e.CompactionPlan.FullyCompacted() {
return false, "not idle because shard is not fully compacted"
} else if c, r := e.CompactionPlan.FullyCompacted(); !c {
return false, r
} else {
return true, ""
}
@ -1916,7 +1916,7 @@ func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, s
}()
// write the new snapshot files
newFiles, err := e.Compactor.WriteSnapshot(snapshot)
newFiles, err := e.Compactor.WriteSnapshot(snapshot, e.logger)
if err != nil {
log.Info("Error writing snapshot from compactor", zap.Error(err))
return err
@ -2161,11 +2161,10 @@ func (s *compactionStrategy) compactGroup() {
err error
files []string
)
if s.fast {
files, err = s.compactor.CompactFast(group)
files, err = s.compactor.CompactFast(group, log)
} else {
files, err = s.compactor.CompactFull(group)
files, err = s.compactor.CompactFull(group, log)
}
if err != nil {

View File

@ -2735,7 +2735,7 @@ func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
func (m *mockPlanner) FullyCompacted() bool { return false }
func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" }
func (m *mockPlanner) ForceFull() {}
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}