diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 048d46631e..fc0cd2a40a 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -69,6 +69,11 @@ func (e errCompactionInProgress) Unwrap() error { return e.err } +func (e errCompactionInProgress) Is(target error) bool { + _, ok := target.(errCompactionInProgress) + return ok +} + type errCompactionAborted struct { err error } @@ -80,6 +85,15 @@ func (e errCompactionAborted) Error() string { return "compaction aborted" } +func (e errCompactionAborted) Unwrap() error { + return e.err +} + +func (e errCompactionAborted) Is(target error) bool { + _, ok := target.(errCompactionAborted) + return ok +} + type errBlockRead struct { file string err error @@ -1053,7 +1067,7 @@ func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger, pointsPer c.mu.RUnlock() if !enabled { - if err := c.RemoveTmpFiles(files); err != nil { + if err := removeTmpFiles(files); err != nil { return nil, err } return nil, errCompactionsDisabled @@ -1085,7 +1099,7 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPer c.mu.RUnlock() if !enabled { - if err := c.RemoveTmpFiles(files); err != nil { + if err := removeTmpFiles(files); err != nil { return nil, err } return nil, errCompactionsDisabled @@ -1095,9 +1109,9 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPer } -// RemoveTmpFiles is responsible for cleaning up a compaction that +// removeTmpFiles is responsible for cleaning up a compaction that // was started, but then abandoned before the temporary files were dealt with. -func (c *Compactor) RemoveTmpFiles(files []string) error { +func removeTmpFiles(files []string) error { var errs []error for _, f := range files { if err := os.Remove(f); err != nil { @@ -1107,8 +1121,8 @@ func (c *Compactor) RemoveTmpFiles(files []string) error { return errors.Join(errs...) } -func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error { - removeErr := c.RemoveTmpFiles(files) +func removeTmpFilesOnErr(files []string, originalErrs ...error) error { + removeErr := removeTmpFiles(files) if removeErr == nil { if len(originalErrs) == 1 { return originalErrs[0] @@ -1151,7 +1165,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K // file that we can drop. if err := os.RemoveAll(fileName); err != nil { // Only return an error if we couldn't remove the temp files - return nil, c.RemoveTmpFilesOnErr(files, err) + return nil, removeTmpFilesOnErr(files, err) } break } else if errors.As(err, &eInProgress) { @@ -1164,12 +1178,12 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K } // We hit an error and didn't finish the compaction. Abort. // Remove any tmp files we already completed - return nil, c.RemoveTmpFilesOnErr(files, err) + return nil, removeTmpFilesOnErr(files, err) } else if err != nil { // We hit an error and didn't finish the compaction. Abort. // Remove any tmp files we already completed, as well as the current // file we were writing to. - return nil, c.RemoveTmpFilesOnErr(files, err, os.RemoveAll(fileName)) + return nil, removeTmpFilesOnErr(files, err, os.RemoveAll(fileName)) } files = append(files, fileName) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 84bc642bf1..3ad3b36a8f 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2646,21 +2646,21 @@ func (s *compactionStrategy) compactGroup() { if err != nil { defer func(fs []string) { - if removeErr := s.compactor.RemoveTmpFiles(fs); removeErr != nil { - log.Warn("Unable to remove temporary file(s)", zap.Error(removeErr)) + if removeErr := removeTmpFiles(fs); removeErr != nil { + log.Error("Unable to remove temporary file(s)", zap.Error(removeErr), zap.Strings("files", fs)) } }(files) - _, inProgress := err.(errCompactionInProgress) - if err == errCompactionsDisabled || inProgress { + inProgress := errors.Is(err, errCompactionInProgress{}) + if errors.Is(err, errCompactionsDisabled) || inProgress { log.Info("Aborted compaction", zap.Error(err)) - if _, ok := err.(errCompactionInProgress); ok { + if inProgress { time.Sleep(time.Second) } return } - log.Warn("Error compacting TSM files", zap.Error(err)) + log.Error("Error compacting TSM files", zap.Error(err)) MoveTsmOnReadErr(err, log, s.fileStore.Replace) @@ -2683,11 +2683,7 @@ func (s *compactionStrategy) compactGroup() { return } - for i, f := range files { - log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f)) - } - log.Info("Finished compacting files", - zap.Int("tsm1_files_n", len(files))) + log.Info("Finished compacting and renaming files", zap.Int("count", len(files)), zap.Strings("files", files)) atomic.AddInt64(s.successStat, 1) } @@ -2698,9 +2694,9 @@ func MoveTsmOnReadErr(err error, log *zap.Logger, replaceFn func([]string, []str path := blockReadErr.file log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err)) if err := replaceFn([]string{path}, nil); err != nil { - log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err)) + log.Error("Failed removing bad TSM file", zap.String("file", path), zap.Error(err)) } else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil { - log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err)) + log.Error("Failed renaming corrupt TSM file", zap.String("file", path), zap.Error(err)) } } } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 3f3f1e5d16..e34d13b8b2 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -875,12 +875,13 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF for _, file := range newFiles { if !strings.HasSuffix(file, tsmTmpExt) && !strings.HasSuffix(file, TSMFileExtension) { // This isn't a .tsm or .tsm.tmp file. + f.logger.Debug("wrong file type for rename: not a TSM file", zap.String("file", file)) continue } // give the observer a chance to process the file first. if err := f.obs.FileFinishing(file); err != nil { - return err + return fmt.Errorf("error from observer on file rename of %s: %w", file, err) } var oldName, newName = file, file @@ -900,7 +901,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF err = fmt.Errorf("failed opening %s: %w", newName, err) if newName != oldName { if err1 := os.Rename(newName, oldName); err1 != nil { - return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1)) + return errors.Join(err, fmt.Errorf("failed renaming %s back to %s: %w", newName, oldName, err1)) } } return err @@ -918,7 +919,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF err = fmt.Errorf("failed creating TSMReader for %s: %w", newName, err) if newName != oldName { if err1 := os.Rename(newName, oldName); err1 != nil { - return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1)) + return errors.Join(err, fmt.Errorf("failed renaming %s back to %s: %w", newName, oldName, err1)) } } return err @@ -952,12 +953,12 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF // give the observer a chance to process the file first. if err := f.obs.FileUnlinking(file.Path()); err != nil { - return err + return fmt.Errorf("error from observer on file unlinking %s: %w", file.Path(), err) } if ts := file.TombstoneStats(); ts.TombstoneExists { if err := f.obs.FileUnlinking(ts.Path); err != nil { - return err + return fmt.Errorf("error from observer on tombstone file unlinking %s: %w", ts.Path, err) } } @@ -981,7 +982,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF // Rename the TSM file used by this reader tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension) if err := file.Rename(tempPath); err != nil { - return fmt.Errorf("failed renaming open TSM file to %s: %w", tempPath, err) + return fmt.Errorf("failed renaming open TSM file %s to %s: %w", file.Path(), tempPath, err) } // Remove the old file and tombstones. We can't use the normal TSMReader.Remove()