chore: add logging to Filestore.purger (#26089)

Also fixes error type checks in
TestCompactor_CompactFull_InProgress
pull/26143/head
davidby-influx 2025-03-05 11:46:07 -08:00 committed by GitHub
parent 1efb8dad43
commit 2ab5aad52e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 47 additions and 30 deletions

View File

@ -1088,7 +1088,11 @@ func (c *Compactor) RemoveTmpFiles(files []string) error {
func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error {
removeErr := c.RemoveTmpFiles(files)
if removeErr == nil {
return errors.Join(originalErrs...)
if len(originalErrs) == 1 {
return originalErrs[0]
} else {
return errors.Join(originalErrs...)
}
} else if errJoin, ok := removeErr.(interface{ Unwrap() []error }); ok {
return errors.Join(append(originalErrs, errJoin.Unwrap()...)...)
} else {

View File

@ -299,7 +299,7 @@ func TestCompactor_DecodeError(t *testing.T) {
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error {
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string) error {
require.Equal(t, 1, len(strings))
require.Equal(t, strings[0], f3)
return nil
@ -1137,11 +1137,10 @@ func TestCompactor_CompactFull_InProgress(t *testing.T) {
}()
_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop())
assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name)
e := errors.Unwrap(err)
assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress")
assert.Truef(t, errors.Is(e, fs.ErrExist), "error did not indicate file existence: %v", e)
assert.ErrorContainsf(t, err, "file exists", "unexpected error writing snapshot for %s", f2Name)
assert.Truef(t, errors.Is(err, fs.ErrExist), "error did not indicate file existence: %v", err)
pathErr := &os.PathError{}
assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e)
assert.Truef(t, errors.As(err, &pathErr), "expected path error, got %v", err)
assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr)
}

View File

@ -2337,15 +2337,15 @@ func (s *compactionStrategy) compactGroup() {
log.Warn("Error compacting TSM files", zap.Error(err))
MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback)
MoveTsmOnReadErr(err, log, s.fileStore.Replace)
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
log.Info("Error replacing new TSM files", zap.Error(err))
if err := s.fileStore.Replace(group, files); err != nil {
log.Error("Error replacing new TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
@ -2366,13 +2366,13 @@ func (s *compactionStrategy) compactGroup() {
atomic.AddInt64(s.successStat, 1)
}
func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) {
func MoveTsmOnReadErr(err error, log *zap.Logger, replaceFn func([]string, []string) error) {
var blockReadErr errBlockRead
// We hit a bad TSM file - rename so the next compaction can proceed.
if ok := errors.As(err, &blockReadErr); ok {
path := blockReadErr.file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil {
if err := replaceFn([]string{path}, nil); err != nil {
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))

View File

@ -18,6 +18,7 @@ import (
"syscall"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/file"
"github.com/influxdata/influxdb/pkg/limiter"
@ -822,11 +823,6 @@ func (f *FileStore) Stats() []FileStat {
return f.lastFileStats
}
// ReplaceWithCallback replaces oldFiles with newFiles and calls updatedFn with the files to be added the FileStore.
func (f *FileStore) ReplaceWithCallback(oldFiles, newFiles []string, updatedFn func(r []TSMFile)) error {
return f.replace(oldFiles, newFiles, updatedFn)
}
// Replace replaces oldFiles with newFiles.
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
return f.replace(oldFiles, newFiles, nil)
@ -861,18 +857,19 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
// The new TSM files have a tmp extension. First rename them.
newName = file[:len(file)-4]
if err := os.Rename(oldName, newName); err != nil {
return err
return fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err)
}
}
// Any error after this point should result in the file being bein named
// Any error after this point should result in the file being named
// back to the original name. The caller then has the opportunity to
// remove it.
fd, err := os.Open(newName)
if err != nil {
err = fmt.Errorf("failed opening %s: %w", newName, err)
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
return err1
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
}
}
return err
@ -887,9 +884,10 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
tsm, err := NewTSMReader(fd, f.readerOptions...)
if err != nil {
err = fmt.Errorf("failed creating TSMReader for %s: %w", newName, err)
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
return err1
return errors.Join(err, fmt.Errorf("failed renaming %s to %s: %w", oldName, newName, err1))
}
}
return err
@ -952,14 +950,14 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
// Rename the TSM file used by this reader
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
if err := file.Rename(tempPath); err != nil {
return err
return fmt.Errorf("failed renaming open TSM file to %s: %w", tempPath, err)
}
// Remove the old file and tombstones. We can't use the normal TSMReader.Remove()
// because it now refers to our temp file which we can't remove.
for _, f := range deletes {
if err := os.Remove(f); err != nil {
return err
return fmt.Errorf("failed removing old file %s: %w", f, err)
}
}
@ -968,11 +966,11 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
if err := file.Close(); err != nil {
return err
return fmt.Errorf("failed to close TSM file %s: %w", file.Path(), err)
}
if err := file.Remove(); err != nil {
return err
return fmt.Errorf("failed to remove TSM file %s: %w", file.Path(), err)
}
break
}
@ -984,7 +982,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
if err := file.SyncDir(f.dir); err != nil {
return err
return fmt.Errorf("failed to sync directory %s: %w", f.dir, err)
}
// Tell the purger about our in-use files we need to remove
@ -1570,24 +1568,38 @@ type purger struct {
}
func (p *purger) add(files []TSMFile) {
var fileNames []string
p.mu.Lock()
for _, f := range files {
p.files[f.Path()] = f
fileName := f.Path()
fileNames = append(fileNames, fileName)
p.files[fileName] = f
}
p.mu.Unlock()
p.purge()
p.purge(fileNames)
}
func (p *purger) purge() {
func (p *purger) purge(fileNames []string) {
logger, logEndOp := logger.NewOperation(p.logger, "Purge held files", "filestore_purger")
logger.Info("added", zap.Int("count", len(fileNames)))
logger.Debug("purging", zap.Strings("files", fileNames))
p.mu.Lock()
if p.running {
p.mu.Unlock()
logger.Info("already running, files added to previous operation")
logEndOp()
return
}
p.running = true
p.mu.Unlock()
go func() {
var purgeCount int
defer func() {
logger.Info("removed", zap.Int("files", purgeCount))
logEndOp()
}()
for {
p.mu.Lock()
for k, v := range p.files {
@ -1598,15 +1610,17 @@ func (p *purger) purge() {
// we allow calls to Ref and Unref under the read lock and no lock at all respectively.
if !v.InUse() {
if err := v.Close(); err != nil {
p.logger.Info("Purge: close file", zap.Error(err))
logger.Error("close file failed", zap.String("file", k), zap.Error(err))
continue
}
if err := v.Remove(); err != nil {
p.logger.Info("Purge: remove file", zap.Error(err))
logger.Error("remove file failed", zap.String("file", k), zap.Error(err))
continue
}
logger.Debug("successfully removed", zap.String("file", k))
delete(p.files, k)
purgeCount++
}
}