diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index d170c65688..38caecdb36 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -28,9 +28,11 @@ import (
 
 	"github.com/influxdata/influxdb/v2/pkg/limiter"
 	"github.com/influxdata/influxdb/v2/tsdb"
+	"go.uber.org/zap"
 )
 
 const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
+const logEvery = 2 * DefaultSegmentSize
 
 const (
 	// CompactionTempExtension is the extension used for temporary files created during compaction.
@@ -91,7 +93,7 @@ type CompactionPlanner interface {
 	PlanLevel(level int) []CompactionGroup
 	PlanOptimize() []CompactionGroup
 	Release(group []CompactionGroup)
-	FullyCompacted() bool
+	FullyCompacted() (bool, string)
 
 	// ForceFull causes the planner to return a full compaction plan the next
 	// time Plan() is called if there are files that could be compacted.
@@ -211,9 +213,15 @@ func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
 }
 
 // FullyCompacted returns true if the shard is fully compacted.
-func (c *DefaultPlanner) FullyCompacted() bool {
+func (c *DefaultPlanner) FullyCompacted() (bool, string) {
 	gens := c.findGenerations(false)
-	return len(gens) <= 1 && !gens.hasTombstones()
+	if len(gens) > 1 {
+		return false, "not fully compacted and not idle because of more than one generation"
+	} else if gens.hasTombstones() {
+		return false, "not fully compacted and not idle because of tombstones"
+	} else {
+		return true, ""
+	}
 }
 
 // ForceFull causes the planner to return a full compaction plan the next time
@@ -254,7 +262,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 	for i := 0; i < len(generations); i++ {
 		cur := generations[i]
 
-		// See if this generation is orphan'd which would prevent it from being further
+		// See if this generation is orphaned which would prevent it from being further
 		// compacted until a final full compaction runs.
 		if i < len(generations)-1 {
 			if cur.level() < generations[i+1].level() {
@@ -818,7 +826,7 @@ func (c *Compactor) EnableCompactions() {
 }
 
 // WriteSnapshot writes a Cache snapshot to one or more new TSM files.
-func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
+func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.snapshotsEnabled
 	intC := c.snapshotsInterrupt
@@ -857,7 +865,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	for i := 0; i < concurrency; i++ {
 		go func(sp *Cache) {
 			iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
-			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
+			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle, logger)
			resC <- res{files: files, err: err}
 
 		}(splits[i])
@@ -891,7 +899,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 }
 
 // compact writes multiple smaller TSM files into 1 or more larger files.
-func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
+func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
 	size := c.Size
 	if size <= 0 {
 		size = tsdb.DefaultMaxPointsPerBlock
@@ -942,6 +950,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 	}
 
 	if len(trs) == 0 {
+		logger.Debug("No input files")
 		return nil, nil
 	}
 
@@ -950,11 +959,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 		return nil, err
 	}
 
-	return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true)
+	return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true, logger)
 }
 
 // CompactFull writes multiple smaller TSM files into 1 or more larger files.
-func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
+func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.compactionsEnabled
 	c.mu.RUnlock()
@@ -968,7 +977,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
 	}
 	defer c.remove(tsmFiles)
 
-	files, err := c.compact(false, tsmFiles)
+	files, err := c.compact(false, tsmFiles, logger)
 
 	// See if we were disabled while writing a snapshot
 	c.mu.RLock()
@@ -986,7 +995,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
 }
 
 // CompactFast writes multiple smaller TSM files into 1 or more larger files.
-func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
+func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.compactionsEnabled
 	c.mu.RUnlock()
@@ -1000,7 +1009,7 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
 	}
 	defer c.remove(tsmFiles)
 
-	files, err := c.compact(true, tsmFiles)
+	files, err := c.compact(true, tsmFiles, logger)
 
 	// See if we were disabled while writing a snapshot
 	c.mu.RLock()
@@ -1031,7 +1040,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
 
 // writeNewFiles writes from the iterator into new TSM files, rotating
 // to a new file once it has reached the max TSM file size.
-func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
+func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
 	// These are the new TSM files written
 	var files []string
 
@@ -1040,16 +1049,19 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 		// New TSM files are written to a temp file and renamed when fully completed.
 		fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)
+		logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))
 
 		// Write as much as possible to this file
-		err := c.write(fileName, iter, throttle)
+		err := c.write(fileName, iter, throttle, logger)
 
 		// We've hit the max file limit and there is more to write. Create a new file
 		// and continue.
 		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
 			files = append(files, fileName)
+			logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
 			continue
 		} else if err == ErrNoValues {
+			logger.Debug("Dropping empty file", zap.String("output_file", fileName))
 			// If the file only contained tombstoned entries, then it would be a 0 length
 			// file that we can drop.
 			if err := os.RemoveAll(fileName); err != nil {
@@ -1061,16 +1073,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 			// planner keeps track of which files are assigned to compaction plans now.
 			return nil, err
 		} else if err != nil {
+			// We hit an error and didn't finish the compaction. Abort.
 			// Remove any tmp files we already completed
+			// discard later errors to return the first one from the write() call
 			for _, f := range files {
-				if err := os.RemoveAll(f); err != nil {
-					return nil, err
-				}
-			}
-			// We hit an error and didn't finish the compaction. Remove the temp file and abort.
-			if err := os.RemoveAll(fileName); err != nil {
-				return nil, err
+				_ = os.RemoveAll(f)
 			}
+			// Remove the temp file
+			// discard later errors to return the first one from the write() call
+			_ = os.RemoveAll(fileName)
 			return nil, err
 		}
 
@@ -1081,7 +1092,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 	return files, nil
 }
-func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
+func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
 	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 	if err != nil {
 		return errCompactionInProgress{err: err}
 	}
@@ -1134,10 +1145,11 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err err
 		}
 
 		if err != nil {
-			w.Remove()
+			_ = w.Remove()
 		}
 	}()
 
+	lastLogSize := w.Size()
 	for iter.Next() {
 		c.mu.RLock()
 		enabled := c.snapshotsEnabled || c.compactionsEnabled
@@ -1176,6 +1188,9 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err err
 			}
 
 			return errMaxFileExceeded
+		} else if (w.Size() - lastLogSize) > logEvery {
+			logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
+			lastLogSize = w.Size()
 		}
 	}
 
@@ -1188,6 +1203,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err err
 	if err := w.WriteIndex(); err != nil {
 		return err
 	}
+	logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
 
 	return nil
 }
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index 6f65374e00..bd02ea8684 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/influxdata/influxdb/v2/tsdb"
 	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
+	"go.uber.org/zap"
 )
 
 // Tests compacting a Cache snapshot into a single TSM file
@@ -39,7 +40,7 @@ func TestCompactor_Snapshot(t *testing.T) {
 	compactor.Dir = dir
 	compactor.FileStore = &fakeFileStore{}
 
-	files, err := compactor.WriteSnapshot(c)
+	files, err := compactor.WriteSnapshot(c, zap.NewNop())
 	if err == nil {
 		t.Fatalf("expected error writing snapshot: %v", err)
 	}
@@ -50,7 +51,7 @@ func TestCompactor_Snapshot(t *testing.T) {
 
 	compactor.Open()
 
-	files, err = compactor.WriteSnapshot(c)
+	files, err = compactor.WriteSnapshot(c, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -119,7 +120,7 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
 	compactor.FileStore = fs
 	compactor.Open()
 
-	files, err := compactor.CompactFull([]string{f1, f2})
+	files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %#v", err)
 	}
@@ -175,7 +176,7 @@ func TestCompactor_CompactFull(t *testing.T) {
 	compactor.Dir = dir
 	compactor.FileStore = fs
 
-	files, err := compactor.CompactFull([]string{f1, f2, f3})
+	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err == nil {
 		t.Fatalf("expected error writing snapshot: %v", err)
 	}
@@ -186,7 +187,7 @@ func TestCompactor_CompactFull(t *testing.T) {
 
 	compactor.Open()
 
-	files, err = compactor.CompactFull([]string{f1, f2, f3})
+	files, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -285,7 +286,7 @@ func TestCompactor_DecodeError(t *testing.T) {
 	compactor.Dir = dir
 	compactor.FileStore = fs
 
-	files, err := compactor.CompactFull([]string{f1, f2, f3})
+	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err == nil {
 		t.Fatalf("expected error writing snapshot: %v", err)
 	}
@@ -296,9 +297,7 @@ func TestCompactor_DecodeError(t *testing.T) {
 
 	compactor.Open()
 
-	_, err = compactor.CompactFull([]string{f1, f2, f3})
-	if err == nil ||
-		!strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
+	if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
 		t.Fatalf("expected error writing snapshot: %v", err)
 	}
 }
@@ -336,7 +335,7 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
 
 	compactor.Open()
 
-	files, err := compactor.CompactFast([]string{f1, f3})
+	files, err := compactor.CompactFast([]string{f1, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -416,7 +415,7 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
 
 	compactor.Open()
 
-	files, err := compactor.CompactFast([]string{f1, f2, f3})
+	files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -484,7 +483,7 @@ func TestCompactor_Compact_UnsortedBlocks(t *testing.T) {
 
 	compactor.Open()
 
-	files, err := compactor.CompactFast([]string{f1, f2})
+	files, err := compactor.CompactFast([]string{f1, f2}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -558,7 +557,7 @@ func TestCompactor_Compact_UnsortedBlocksOverlapping(t *testing.T) {
 
 	compactor.Open()
 
-	files, err := compactor.CompactFast([]string{f1, f2, f3})
+	files, err := compactor.CompactFast([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -629,7 +628,7 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
 	compactor.Size = 2
 	compactor.Open()
 
-	files, err := compactor.CompactFull([]string{f1, f2, f3})
+	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -731,7 +730,7 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
 	compactor.Size = 2
 	compactor.Open()
 
-	files, err := compactor.CompactFull([]string{f1, f2, f3})
+	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -834,7 +833,7 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
 	compactor.Size = 2
 	compactor.Open()
 
-	files, err := compactor.CompactFull([]string{f1, f2, f3})
+	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -942,7 +941,7 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
 	compactor.Size = 2
 	compactor.Open()
 
-	files, err := compactor.CompactFull([]string{f1, f2, f3})
+	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
@@ -1058,7 +1057,7 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
 	compactor.Open()
 
 	// Compact both files, should get 2 files back
-	files, err := compactor.CompactFull([]string{f1Name, f2Name})
+	files, err := compactor.CompactFull([]string{f1Name, f2Name}, zap.NewNop())
 	if err != nil {
 		t.Fatalf("unexpected error writing snapshot: %v", err)
 	}
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index ba214da5cf..d851e1f228 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -906,8 +906,8 @@ func (e *Engine) IsIdle() (state bool, reason string) {
 	if cacheSize := e.Cache.Size(); cacheSize > 0 {
 		return false, "not idle because cache size is nonzero"
-	} else if !e.CompactionPlan.FullyCompacted() {
-		return false, "not idle because shard is not fully compacted"
+	} else if c, r := e.CompactionPlan.FullyCompacted(); !c {
+		return false, r
 	} else {
 		return true, ""
 	}
@@ -1916,7 +1916,7 @@ func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, s
 	}()
 
 	// write the new snapshot files
-	newFiles, err := e.Compactor.WriteSnapshot(snapshot)
+	newFiles, err := e.Compactor.WriteSnapshot(snapshot, e.logger)
 	if err != nil {
 		log.Info("Error writing snapshot from compactor", zap.Error(err))
 		return err
@@ -2161,11 +2161,10 @@ func (s *compactionStrategy) compactGroup() {
 		err   error
 		files []string
 	)
-
 	if s.fast {
-		files, err = s.compactor.CompactFast(group)
+		files, err = s.compactor.CompactFast(group, log)
 	} else {
-		files, err = s.compactor.CompactFull(group)
+		files, err = s.compactor.CompactFull(group, log)
 	}
 
 	if err != nil {
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index abc5e50065..58b9594251 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -2735,7 +2735,7 @@ func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return
 func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
 func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }
 func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
-func (m *mockPlanner) FullyCompacted() bool { return false }
+func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" }
 func (m *mockPlanner) ForceFull() {}
 func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}
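
Illustrative sketch, not part of the patch: it shows how a caller adapts to the two API changes above, namely that FullyCompacted now returns a (bool, string) pair carrying the reason a shard is not fully compacted, and that the Compactor entry points take a *zap.Logger (the tests above pass zap.NewNop()). The package name tsm1util and the helper runFullCompaction are hypothetical; only the tsm1 identifiers that appear in the diff are assumed.

package tsm1util // hypothetical helper package for this sketch

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
	"go.uber.org/zap"
)

// runFullCompaction consults the planner's new (bool, string) FullyCompacted
// result and, if work remains, runs a full compaction with the logger that
// Compactor.CompactFull now requires.
func runFullCompaction(planner tsm1.CompactionPlanner, compactor *tsm1.Compactor, group []string, logger *zap.Logger) ([]string, error) {
	ok, reason := planner.FullyCompacted()
	if ok {
		logger.Debug("shard already fully compacted, nothing to do")
		return nil, nil
	}
	logger.Debug("running full compaction", zap.String("reason", reason))

	files, err := compactor.CompactFull(group, logger)
	if err != nil {
		return nil, fmt.Errorf("full compaction: %w", err)
	}
	return files, nil
}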