fix: rename TSM files on errBlockRead

pull/25839/head
davidby-influx 2025-01-14 16:51:15 -08:00
parent f04105bede
commit ee2cfaea7a
4 changed files with 65 additions and 41 deletions

View File

@ -85,6 +85,15 @@ type errBlockRead struct {
err error err error
} }
// Unwrap returns the underlying error stored in e.err (nil when none was
// recorded), which lets errors.Is and errors.As look through an errBlockRead
// to the error it wraps.
func (e errBlockRead) Unwrap() error {
return e.err
}
// Is reports whether target is an errBlockRead value. The match is on the
// dynamic type alone: the file and err fields of target are not compared, so
// errors.Is(err, errBlockRead{}) succeeds for any errBlockRead in err's chain.
func (e errBlockRead) Is(target error) bool {
	switch target.(type) {
	case errBlockRead:
		return true
	default:
		// Covers a nil target and every other error type.
		return false
	}
}
func (e errBlockRead) Error() string { func (e errBlockRead) Error() string {
if e.err != nil { if e.err != nil {
return fmt.Sprintf("block read error on %s: %s", e.file, e.err) return fmt.Sprintf("block read error on %s: %s", e.file, e.err)
@ -1684,7 +1693,7 @@ func (k *tsmBatchKeyIterator) Err() error {
errs = make([]error, 0, len(k.errs)+1) errs = make([]error, 0, len(k.errs)+1)
errs = append(errs, k.errs...) errs = append(errs, k.errs...)
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors)) errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
return errs return errors.Join(errs...)
} }
type cacheKeyIterator struct { type cacheKeyIterator struct {

View File

@ -9,7 +9,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"strings"
"testing" "testing"
"time" "time"
@ -292,19 +291,19 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.FileStore = ffs compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil { require.Error(t, err, "expected error writing snapshot")
t.Fatalf("expected error writing snapshot: %v", err) require.Zero(t, len(files), "no files should be compacted")
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))
}
compactor.Open() compactor.Open()
if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") { _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
t.Fatalf("expected error writing snapshot: %v", err)
} require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error {
require.Equal(t, 1, len(strings))
require.Equal(t, strings[0], f3)
return nil
})
} }
// Ensures that a compaction will properly merge multiple TSM files // Ensures that a compaction will properly merge multiple TSM files

View File

@ -2332,16 +2332,7 @@ func (s *compactionStrategy) compactGroup() {
log.Warn("Error compacting TSM files", zap.Error(err)) log.Warn("Error compacting TSM files", zap.Error(err))
// We hit a bad TSM file - rename so the next compaction can proceed. MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback)
if _, ok := err.(errBlockRead); ok {
path := err.(errBlockRead).file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err))
if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.Error((err)))
}
}
atomic.AddInt64(s.errorStat, 1) atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second) time.Sleep(time.Second)
@ -2370,6 +2361,20 @@ func (s *compactionStrategy) compactGroup() {
atomic.AddInt64(s.successStat, 1) atomic.AddInt64(s.successStat, 1)
} }
// MoveTsmOnReadErr sets aside a TSM file that could not be read so that the
// next compaction can proceed without it.
//
// If err is, or wraps, an errBlockRead (the first one found by errors.As), the
// file named by that error is passed to ReplaceWithCallback as the only entry
// of its first argument (presumably the files to drop from the file store —
// confirm against the FileStore implementation). If that call succeeds, the
// file is renamed on disk by appending "." + BadTSMFileExtension. Any err that
// does not contain an errBlockRead is ignored.
//
// Failures of either step are logged, never returned.
func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) {
	var blockReadErr errBlockRead
	if !errors.As(err, &blockReadErr) {
		return
	}

	// We hit a bad TSM file - rename so the next compaction can proceed.
	path := blockReadErr.file
	log.Error("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))

	if replaceErr := ReplaceWithCallback([]string{path}, nil, nil); replaceErr != nil {
		// Do not rename a file that could not be released first.
		log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(replaceErr))
		return
	}

	if renameErr := os.Rename(path, path+"."+BadTSMFileExtension); renameErr != nil {
		// Log the rename failure itself. Previously this branch logged the
		// (always nil) ReplaceWithCallback error, hiding the real cause.
		log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(renameErr))
	}
}
// levelCompactionStrategy returns a compactionStrategy for the given level. // levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact. // It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy { func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {

View File

@ -3,12 +3,12 @@ package tsm1
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"strings"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -1867,8 +1867,16 @@ func TestTSMReader_References(t *testing.T) {
} }
func TestBatchKeyIterator_Errors(t *testing.T) { func TestBatchKeyIterator_Errors(t *testing.T) {
const MaxErrors = 10 const testFile = "testFile.tsm"
errorCases := []error{
fmt.Errorf("test error 0"),
errBlockRead{
file: testFile,
err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v",
"string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))},
fmt.Errorf("test error 2"),
fmt.Errorf("test error 3"),
}
dir, name := createTestTSM(t) dir, name := createTestTSM(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
fr, err := os.Open(name) fr, err := os.Open(name)
@ -1893,27 +1901,30 @@ func TestBatchKeyIterator_Errors(t *testing.T) {
}() }()
interrupts := make(chan struct{}) interrupts := make(chan struct{})
var iter KeyIterator var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil { if iter, err = NewTSMBatchKeyIterator(3, false, len(errorCases), interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err) t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
} }
var i int var i int
for i = 0; i < MaxErrors*2; i++ { for i = 0; i < 2; i++ {
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i)) for j, e := range errorCases {
if i < MaxErrors && !saved { saved := iter.(*tsmBatchKeyIterator).AppendError(e)
t.Fatalf("error unexpectedly not saved: %d", i) index := (1 + j) + (len(errorCases) * i)
} if index < len(errorCases) && !saved {
if i >= MaxErrors && saved { t.Fatalf("error unexpectedly not saved: %d", index)
t.Fatalf("error unexpectedly saved: %d", i) } else if index > len(errorCases) && saved {
t.Fatalf("error unexpectedly saved: %d", index)
}
} }
} }
errs := iter.Err() var blockReadError errBlockRead
if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) { iterErr := iter.Err()
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt) joinErr, ok := iterErr.(interface{ Unwrap() []error })
} require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr)
expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors) require.Equal(t, 1+len(errorCases), len(joinErr.Unwrap()), "saved wrong number of errors")
if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 { require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err)
t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error()) require.Equal(t, testFile, blockReadError.file, "unexpected file name in error")
} expected := fmt.Sprintf("additional errors dropped: %d", len(errorCases))
require.Equal(t, expected, joinErr.Unwrap()[len(errorCases)].Error(), "unexpected error message for dropped errors")
} }
func createTestTSM(t *testing.T) (dir string, name string) { func createTestTSM(t *testing.T) (dir string, name string) {