Co-authored-by: davidby-influx <72418212+davidby-influx@users.noreply.github.com>pull/26631/merge
parent
6f526cd15e
commit
c6453d9f44
|
@ -69,6 +69,11 @@ func (e errCompactionInProgress) Unwrap() error {
|
|||
return e.err
|
||||
}
|
||||
|
||||
func (e errCompactionInProgress) Is(target error) bool {
|
||||
_, ok := target.(errCompactionInProgress)
|
||||
return ok
|
||||
}
|
||||
|
||||
type errCompactionAborted struct {
|
||||
err error
|
||||
}
|
||||
|
@ -80,6 +85,15 @@ func (e errCompactionAborted) Error() string {
|
|||
return "compaction aborted"
|
||||
}
|
||||
|
||||
func (e errCompactionAborted) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
func (e errCompactionAborted) Is(target error) bool {
|
||||
_, ok := target.(errCompactionAborted)
|
||||
return ok
|
||||
}
|
||||
|
||||
type errBlockRead struct {
|
||||
file string
|
||||
err error
|
||||
|
@ -1053,7 +1067,7 @@ func (c *Compactor) CompactFull(tsmFiles []string, logger *zap.Logger, pointsPer
|
|||
c.mu.RUnlock()
|
||||
|
||||
if !enabled {
|
||||
if err := c.RemoveTmpFiles(files); err != nil {
|
||||
if err := removeTmpFiles(files); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, errCompactionsDisabled
|
||||
|
@ -1085,7 +1099,7 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPer
|
|||
c.mu.RUnlock()
|
||||
|
||||
if !enabled {
|
||||
if err := c.RemoveTmpFiles(files); err != nil {
|
||||
if err := removeTmpFiles(files); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, errCompactionsDisabled
|
||||
|
@ -1095,9 +1109,9 @@ func (c *Compactor) CompactFast(tsmFiles []string, logger *zap.Logger, pointsPer
|
|||
|
||||
}
|
||||
|
||||
// RemoveTmpFiles is responsible for cleaning up a compaction that
|
||||
// removeTmpFiles is responsible for cleaning up a compaction that
|
||||
// was started, but then abandoned before the temporary files were dealt with.
|
||||
func (c *Compactor) RemoveTmpFiles(files []string) error {
|
||||
func removeTmpFiles(files []string) error {
|
||||
var errs []error
|
||||
for _, f := range files {
|
||||
if err := os.Remove(f); err != nil {
|
||||
|
@ -1107,8 +1121,8 @@ func (c *Compactor) RemoveTmpFiles(files []string) error {
|
|||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error {
|
||||
removeErr := c.RemoveTmpFiles(files)
|
||||
func removeTmpFilesOnErr(files []string, originalErrs ...error) error {
|
||||
removeErr := removeTmpFiles(files)
|
||||
if removeErr == nil {
|
||||
if len(originalErrs) == 1 {
|
||||
return originalErrs[0]
|
||||
|
@ -1151,7 +1165,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
// file that we can drop.
|
||||
if err := os.RemoveAll(fileName); err != nil {
|
||||
// Only return an error if we couldn't remove the temp files
|
||||
return nil, c.RemoveTmpFilesOnErr(files, err)
|
||||
return nil, removeTmpFilesOnErr(files, err)
|
||||
}
|
||||
break
|
||||
} else if errors.As(err, &eInProgress) {
|
||||
|
@ -1164,12 +1178,12 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
|
|||
}
|
||||
// We hit an error and didn't finish the compaction. Abort.
|
||||
// Remove any tmp files we already completed
|
||||
return nil, c.RemoveTmpFilesOnErr(files, err)
|
||||
return nil, removeTmpFilesOnErr(files, err)
|
||||
} else if err != nil {
|
||||
// We hit an error and didn't finish the compaction. Abort.
|
||||
// Remove any tmp files we already completed, as well as the current
|
||||
// file we were writing to.
|
||||
return nil, c.RemoveTmpFilesOnErr(files, err, os.RemoveAll(fileName))
|
||||
return nil, removeTmpFilesOnErr(files, err, os.RemoveAll(fileName))
|
||||
}
|
||||
|
||||
files = append(files, fileName)
|
||||
|
|
|
@ -2646,21 +2646,21 @@ func (s *compactionStrategy) compactGroup() {
|
|||
|
||||
if err != nil {
|
||||
defer func(fs []string) {
|
||||
if removeErr := s.compactor.RemoveTmpFiles(fs); removeErr != nil {
|
||||
log.Warn("Unable to remove temporary file(s)", zap.Error(removeErr))
|
||||
if removeErr := removeTmpFiles(fs); removeErr != nil {
|
||||
log.Error("Unable to remove temporary file(s)", zap.Error(removeErr), zap.Strings("files", fs))
|
||||
}
|
||||
}(files)
|
||||
_, inProgress := err.(errCompactionInProgress)
|
||||
if err == errCompactionsDisabled || inProgress {
|
||||
inProgress := errors.Is(err, errCompactionInProgress{})
|
||||
if errors.Is(err, errCompactionsDisabled) || inProgress {
|
||||
log.Info("Aborted compaction", zap.Error(err))
|
||||
|
||||
if _, ok := err.(errCompactionInProgress); ok {
|
||||
if inProgress {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
log.Warn("Error compacting TSM files", zap.Error(err))
|
||||
log.Error("Error compacting TSM files", zap.Error(err))
|
||||
|
||||
MoveTsmOnReadErr(err, log, s.fileStore.Replace)
|
||||
|
||||
|
@ -2683,11 +2683,7 @@ func (s *compactionStrategy) compactGroup() {
|
|||
return
|
||||
}
|
||||
|
||||
for i, f := range files {
|
||||
log.Info("Compacted file", zap.Int("tsm1_index", i), zap.String("tsm1_file", f))
|
||||
}
|
||||
log.Info("Finished compacting files",
|
||||
zap.Int("tsm1_files_n", len(files)))
|
||||
log.Info("Finished compacting and renaming files", zap.Int("count", len(files)), zap.Strings("files", files))
|
||||
atomic.AddInt64(s.successStat, 1)
|
||||
}
|
||||
|
||||
|
@ -2698,9 +2694,9 @@ func MoveTsmOnReadErr(err error, log *zap.Logger, replaceFn func([]string, []str
|
|||
path := blockReadErr.file
|
||||
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
|
||||
if err := replaceFn([]string{path}, nil); err != nil {
|
||||
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
|
||||
log.Error("Failed removing bad TSM file", zap.String("file", path), zap.Error(err))
|
||||
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
|
||||
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
|
||||
log.Error("Failed renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -875,12 +875,13 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
for _, file := range newFiles {
|
||||
if !strings.HasSuffix(file, tsmTmpExt) && !strings.HasSuffix(file, TSMFileExtension) {
|
||||
// This isn't a .tsm or .tsm.tmp file.
|
||||
f.logger.Debug("wrong file type for rename: not a TSM file", zap.String("file", file))
|
||||
continue
|
||||
}
|
||||
|
||||
// give the observer a chance to process the file first.
|
||||
if err := f.obs.FileFinishing(file); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("error from observer on file rename of %s: %w", file, err)
|
||||
}
|
||||
|
||||
var oldName, newName = file, file
|
||||
|
@ -900,7 +901,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
err = fmt.Errorf("failed opening %s: %w", newName, err)
|
||||
if newName != oldName {
|
||||
if err1 := os.Rename(newName, oldName); err1 != nil {
|
||||
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
|
||||
return errors.Join(err, fmt.Errorf("failed renaming %s back to %s: %w", newName, oldName, err1))
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
@ -918,7 +919,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
err = fmt.Errorf("failed creating TSMReader for %s: %w", newName, err)
|
||||
if newName != oldName {
|
||||
if err1 := os.Rename(newName, oldName); err1 != nil {
|
||||
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
|
||||
return errors.Join(err, fmt.Errorf("failed renaming %s back to %s: %w", newName, oldName, err1))
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
@ -952,12 +953,12 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
|
||||
// give the observer a chance to process the file first.
|
||||
if err := f.obs.FileUnlinking(file.Path()); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("error from observer on file unlinking %s: %w", file.Path(), err)
|
||||
}
|
||||
|
||||
if ts := file.TombstoneStats(); ts.TombstoneExists {
|
||||
if err := f.obs.FileUnlinking(ts.Path); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("error from observer on tombstone file unlinking %s: %w", ts.Path, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -981,7 +982,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
|
|||
// Rename the TSM file used by this reader
|
||||
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
|
||||
if err := file.Rename(tempPath); err != nil {
|
||||
return fmt.Errorf("failed renaming open TSM file to %s: %w", tempPath, err)
|
||||
return fmt.Errorf("failed renaming open TSM file %s to %s: %w", file.Path(), tempPath, err)
|
||||
}
|
||||
|
||||
// Remove the old file and tombstones. We can't use the normal TSMReader.Remove()
|
||||
|
|
Loading…
Reference in New Issue