chore: improve logging on compaction failures (#26545) (#26630)

Co-authored-by: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
pull/26631/merge
WeblWabl 2025-07-25 13:21:28 -05:00 committed by GitHub
parent 6f526cd15e
commit c6453d9f44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 39 additions and 28 deletions

View File

@ -69,6 +69,11 @@ func (e errCompactionInProgress) Unwrap() error {
return e.err
}
func (e errCompactionInProgress) Is(target error) bool {
_, ok := target.(errCompactionInProgress)
return ok
}
type errCompactionAborted struct {
err error
}
@ -80,6 +85,15 @@ func (e errCompactionAborted) Error() string {
return "compaction aborted"
}
func (e errCompactionAborted) Unwrap() error {
return e.err
}
func (e errCompactionAborted) Is(target error) bool {
_, ok := target.(errCompactionAborted)
return ok
}
type errBlockRead struct {
file string
err error
@ -1053,7 +1067,7 @@ func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger, pointsPer
c.mu.RUnlock()
if !enabled {
if err := c.RemoveTmpFiles(files); err != nil {
if err := removeTmpFiles(files); err != nil {
return nil, err
}
return nil, errCompactionsDisabled
@ -1085,7 +1099,7 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPer
c.mu.RUnlock()
if !enabled {
if err := c.RemoveTmpFiles(files); err != nil {
if err := removeTmpFiles(files); err != nil {
return nil, err
}
return nil, errCompactionsDisabled
@ -1095,9 +1109,9 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPer
}
// RemoveTmpFiles is responsible for cleaning up a compaction that
// removeTmpFiles is responsible for cleaning up a compaction that
// was started, but then abandoned before the temporary files were dealt with.
func (c *Compactor) RemoveTmpFiles(files []string) error {
func removeTmpFiles(files []string) error {
var errs []error
for _, f := range files {
if err := os.Remove(f); err != nil {
@ -1107,8 +1121,8 @@ func (c *Compactor) RemoveTmpFiles(files []string) error {
return errors.Join(errs...)
}
func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error {
removeErr := c.RemoveTmpFiles(files)
func removeTmpFilesOnErr(files []string, originalErrs ...error) error {
removeErr := removeTmpFiles(files)
if removeErr == nil {
if len(originalErrs) == 1 {
return originalErrs[0]
@ -1151,7 +1165,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
// file that we can drop.
if err := os.RemoveAll(fileName); err != nil {
// Only return an error if we couldn't remove the temp files
return nil, c.RemoveTmpFilesOnErr(files, err)
return nil, removeTmpFilesOnErr(files, err)
}
break
} else if errors.As(err, &eInProgress) {
@ -1164,12 +1178,12 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
}
// We hit an error and didn't finish the compaction. Abort.
// Remove any tmp files we already completed
return nil, c.RemoveTmpFilesOnErr(files, err)
return nil, removeTmpFilesOnErr(files, err)
} else if err != nil {
// We hit an error and didn't finish the compaction. Abort.
// Remove any tmp files we already completed, as well as the current
// file we were writing to.
return nil, c.RemoveTmpFilesOnErr(files, err, os.RemoveAll(fileName))
return nil, removeTmpFilesOnErr(files, err, os.RemoveAll(fileName))
}
files = append(files, fileName)

View File

@ -2646,21 +2646,21 @@ func (s *compactionStrategy) compactGroup() {
if err != nil {
defer func(fs []string) {
if removeErr := s.compactor.RemoveTmpFiles(fs); removeErr != nil {
log.Warn("Unable to remove temporary file(s)", zap.Error(removeErr))
if removeErr := removeTmpFiles(fs); removeErr != nil {
log.Error("Unable to remove temporary file(s)", zap.Error(removeErr), zap.Strings("files", fs))
}
}(files)
_, inProgress := err.(errCompactionInProgress)
if err == errCompactionsDisabled || inProgress {
inProgress := errors.Is(err, errCompactionInProgress{})
if errors.Is(err, errCompactionsDisabled) || inProgress {
log.Info("Aborted compaction", zap.Error(err))
if _, ok := err.(errCompactionInProgress); ok {
if inProgress {
time.Sleep(time.Second)
}
return
}
log.Warn("Error compacting TSM files", zap.Error(err))
log.Error("Error compacting TSM files", zap.Error(err))
MoveTsmOnReadErr(err, log, s.fileStore.Replace)
@ -2683,11 +2683,7 @@ func (s *compactionStrategy) compactGroup() {
return
}
for i, f := range files {
log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
}
log.Info("Finished compacting files",
zap.Int("tsm1_files_n", len(files)))
log.Info("Finished compacting and renaming files", zap.Int("count", len(files)), zap.Strings("files", files))
atomic.AddInt64(s.successStat, 1)
}
@ -2698,9 +2694,9 @@ func MoveTsmOnReadErr(err error, log *zap.Logger, replaceFn func([]string, []str
path := blockReadErr.file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
if err := replaceFn([]string{path}, nil); err != nil {
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
log.Error("Failed removing bad TSM file", zap.String("file", path), zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
log.Error("Failed renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
}
}
}

View File

@ -875,12 +875,13 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
for _, file := range newFiles {
if !strings.HasSuffix(file, tsmTmpExt) && !strings.HasSuffix(file, TSMFileExtension) {
// This isn't a .tsm or .tsm.tmp file.
f.logger.Debug("wrong file type for rename: not a TSM file", zap.String("file", file))
continue
}
// give the observer a chance to process the file first.
if err := f.obs.FileFinishing(file); err != nil {
return err
return fmt.Errorf("error from observer on file rename of %s: %w", file, err)
}
var oldName, newName = file, file
@ -900,7 +901,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
err = fmt.Errorf("failed opening %s: %w", newName, err)
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
return errors.Join(err, fmt.Errorf("failed renaming %s back to %s: %w", newName, oldName, err1))
}
}
return err
@ -918,7 +919,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
err = fmt.Errorf("failed creating TSMReader for %s: %w", newName, err)
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
return errors.Join(err, fmt.Errorf("failed renaming %s back to %s: %w", newName, oldName, err1))
}
}
return err
@ -952,12 +953,12 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
// give the observer a chance to process the file first.
if err := f.obs.FileUnlinking(file.Path()); err != nil {
return err
return fmt.Errorf("error from observer on file unlinking %s: %w", file.Path(), err)
}
if ts := file.TombstoneStats(); ts.TombstoneExists {
if err := f.obs.FileUnlinking(ts.Path); err != nil {
return err
return fmt.Errorf("error from observer on tombstone file unlinking %s: %w", ts.Path, err)
}
}
@ -981,7 +982,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
// Rename the TSM file used by this reader
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
if err := file.Rename(tempPath); err != nil {
return fmt.Errorf("failed renaming open TSM file to %s: %w", tempPath, err)
return fmt.Errorf("failed renaming open TSM file %s to %s: %w", file.Path(), tempPath, err)
}
// Remove the old file and tombstones. We can't use the normal TSMReader.Remove()