From 2ab5aad52e9e7a38fa6cfef49156c2055d51eefc Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Wed, 5 Mar 2025 11:46:07 -0800 Subject: [PATCH] chore: add logging to Filestore.purger (#26089) Also fixes error type checks in TestCompactor_CompactFull_InProgress --- tsdb/engine/tsm1/compact.go | 6 +++- tsdb/engine/tsm1/compact_test.go | 9 +++--- tsdb/engine/tsm1/engine.go | 10 +++--- tsdb/engine/tsm1/file_store.go | 52 ++++++++++++++++++++------------ 4 files changed, 47 insertions(+), 30 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index bcb0359811..ed4d880642 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1088,7 +1088,11 @@ func (c *Compactor) RemoveTmpFiles(files []string) error { func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error { removeErr := c.RemoveTmpFiles(files) if removeErr == nil { - return errors.Join(originalErrs...) + if len(originalErrs) == 1 { + return originalErrs[0] + } else { + return errors.Join(originalErrs...) + } } else if errJoin, ok := removeErr.(interface{ Unwrap() []error }); ok { return errors.Join(append(originalErrs, errJoin.Unwrap()...)...) } else { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 578a63c8a4..572e3e9761 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -299,7 +299,7 @@ func TestCompactor_DecodeError(t *testing.T) { _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") - tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error { + tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string) error { require.Equal(t, 1, len(strings)) require.Equal(t, strings[0], f3) return nil @@ -1137,11 +1137,10 @@ func TestCompactor_CompactFull_InProgress(t *testing.T) { }() _, err = compactor.CompactFull([]string{f2Name}, zap.NewNop()) assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name) - e := errors.Unwrap(err) - assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress") - assert.Truef(t, errors.Is(e, fs.ErrExist), "error did not indicate file existence: %v", e) + assert.ErrorContainsf(t, err, "file exists", "unexpected error writing snapshot for %s", f2Name) + assert.Truef(t, errors.Is(err, fs.ErrExist), "error did not indicate file existence: %v", err) pathErr := &os.PathError{} - assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e) + assert.Truef(t, errors.As(err, &pathErr), "expected path error, got %v", err) assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr) } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cb07db1a59..f93fabd70b 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2337,15 +2337,15 @@ func (s *compactionStrategy) compactGroup() { log.Warn("Error compacting TSM files", zap.Error(err)) - MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback) + MoveTsmOnReadErr(err, log, s.fileStore.Replace) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) return } - if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil { - log.Info("Error replacing new TSM files", zap.Error(err)) + if err := s.fileStore.Replace(group, files); err != nil { + log.Error("Error replacing new TSM files", zap.Error(err)) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) @@ -2366,13 +2366,13 @@ func (s *compactionStrategy) compactGroup() { atomic.AddInt64(s.successStat, 1) } -func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) { +func MoveTsmOnReadErr(err error, log *zap.Logger, replaceFn func([]string, []string) error) { var blockReadErr errBlockRead // We hit a bad TSM file - rename so the next compaction can proceed. if ok := errors.As(err, &blockReadErr); ok { path := blockReadErr.file log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err)) - if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil { + if err := replaceFn([]string{path}, nil); err != nil { log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err)) } else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil { log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err)) diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 044cbfb352..9f17be49dc 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -18,6 +18,7 @@ import ( "syscall" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/file" "github.com/influxdata/influxdb/pkg/limiter" @@ -822,11 +823,6 @@ func (f *FileStore) Stats() []FileStat { return f.lastFileStats } -// ReplaceWithCallback replaces oldFiles with newFiles and calls updatedFn with the files to be added the FileStore. -func (f *FileStore) ReplaceWithCallback(oldFiles, newFiles []string, updatedFn func(r []TSMFile)) error { - return f.replace(oldFiles, newFiles, updatedFn) -} - // Replace replaces oldFiles with newFiles. func (f *FileStore) Replace(oldFiles, newFiles []string) error { return f.replace(oldFiles, newFiles, nil) @@ -861,18 +857,19 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF // The new TSM files have a tmp extension. First rename them. newName = file[:len(file)-4] if err := os.Rename(oldName, newName); err != nil { - return err + return fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err) } } - // Any error after this point should result in the file being bein named + // Any error after this point should result in the file being named // back to the original name. The caller then has the opportunity to // remove it. fd, err := os.Open(newName) if err != nil { + err = fmt.Errorf("failed opening %s: %w", newName, err) if newName != oldName { if err1 := os.Rename(newName, oldName); err1 != nil { - return err1 + return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1)) } } return err @@ -887,9 +884,10 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF tsm, err := NewTSMReader(fd, f.readerOptions...) if err != nil { + err = fmt.Errorf("failed creating TSMReader for %s: %w", newName, err) if newName != oldName { if err1 := os.Rename(newName, oldName); err1 != nil { - return err1 + return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1)) } } return err @@ -952,14 +950,14 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF // Rename the TSM file used by this reader tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension) if err := file.Rename(tempPath); err != nil { - return err + return fmt.Errorf("failed renaming open TSM file to %s: %w", tempPath, err) } // Remove the old file and tombstones. We can't use the normal TSMReader.Remove() // because it now refers to our temp file which we can't remove. for _, f := range deletes { if err := os.Remove(f); err != nil { - return err + return fmt.Errorf("failed removing old file %s: %w", f, err) } } @@ -968,11 +966,11 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF } if err := file.Close(); err != nil { - return err + return fmt.Errorf("failed to close TSM file %s: %w", file.Path(), err) } if err := file.Remove(); err != nil { - return err + return fmt.Errorf("failed to remove TSM file %s: %w", file.Path(), err) } break } @@ -984,7 +982,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF } if err := file.SyncDir(f.dir); err != nil { - return err + return fmt.Errorf("failed to sync directory %s: %w", f.dir, err) } // Tell the purger about our in-use files we need to remove @@ -1570,24 +1568,38 @@ type purger struct { } func (p *purger) add(files []TSMFile) { + var fileNames []string p.mu.Lock() for _, f := range files { - p.files[f.Path()] = f + fileName := f.Path() + fileNames = append(fileNames, fileName) + p.files[fileName] = f } p.mu.Unlock() - p.purge() + p.purge(fileNames) } -func (p *purger) purge() { +func (p *purger) purge(fileNames []string) { + logger, logEndOp := logger.NewOperation(p.logger, "Purge held files", "filestore_purger") + + logger.Info("added", zap.Int("count", len(fileNames))) + logger.Debug("purging", zap.Strings("files", fileNames)) p.mu.Lock() if p.running { p.mu.Unlock() + logger.Info("already running, files added to previous operation") + logEndOp() return } p.running = true p.mu.Unlock() go func() { + var purgeCount int + defer func() { + logger.Info("removed", zap.Int("files", purgeCount)) + logEndOp() + }() for { p.mu.Lock() for k, v := range p.files { @@ -1598,15 +1610,17 @@ func (p *purger) purge() { // we allow calls to Ref and Unref under the read lock and no lock at all respectively. if !v.InUse() { if err := v.Close(); err != nil { - p.logger.Info("Purge: close file", zap.Error(err)) + logger.Error("close file failed", zap.String("file", k), zap.Error(err)) continue } if err := v.Remove(); err != nil { - p.logger.Info("Purge: remove file", zap.Error(err)) + logger.Error("remove file failed", zap.String("file", k), zap.Error(err)) continue } + logger.Debug("successfully removed", zap.String("file", k)) delete(p.files, k) + purgeCount++ } }