fix: move aside TSM file on errBlockRead (#25899)

The error type check for errBlockRead was incorrect,
and bad TSM files were not being moved aside when
that error was encountered. Use errors.Join,
errors.Is, and errors.As to correctly unwrap multiple
errors.

Closes https://github.com/influxdata/influxdb/issues/25838

(cherry picked from commit 800970490a)

Closes https://github.com/influxdata/influxdb/issues/25840
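
For context, the fix leans on standard library error wrapping: a typed error exposes `Unwrap`/`Is`, causes are wrapped with `%w`, and multiple errors are combined with `errors.Join` so that `errors.Is`/`errors.As` can still find the typed error inside the joined chain. A minimal standalone sketch of that behavior (using an illustrative `blockReadErr` type, not the actual `errBlockRead` from the diff below):

```go
package main

import (
	"errors"
	"fmt"
)

// blockReadErr is an illustrative stand-in for a typed error such as errBlockRead.
type blockReadErr struct {
	file string
	err  error
}

func (e blockReadErr) Error() string {
	return fmt.Sprintf("block read error on %s: %s", e.file, e.err)
}

// Unwrap lets errors.Is/errors.As walk into the wrapped cause.
func (e blockReadErr) Unwrap() error { return e.err }

// Is makes any blockReadErr match any other blockReadErr, regardless of file or cause.
func (e blockReadErr) Is(target error) bool {
	_, ok := target.(blockReadErr)
	return ok
}

func main() {
	// Wrap a low-level cause with %w so it stays discoverable.
	cause := errors.New("not enough data for timestamp")
	bad := blockReadErr{file: "000001.tsm", err: fmt.Errorf("decode error: %w", cause)}

	// Join several errors into one, as the iterator's Err() does for its accumulated slice.
	joined := errors.Join(errors.New("unrelated error"), bad)

	// errors.Is and errors.As unwrap through both the join and the %w chain.
	fmt.Println(errors.Is(joined, blockReadErr{})) // true

	var bre blockReadErr
	if errors.As(joined, &bre) {
		fmt.Println("corrupt file:", bre.file) // corrupt file: 000001.tsm
	}
	fmt.Println(errors.Is(joined, cause)) // true
}
```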
davidby-influx 2025-01-22 14:10:14 -08:00 committed by GitHub
parent c82d4f86ee
commit dd7b4ce351
4 changed files with 90 additions and 60 deletions

View File

@@ -86,6 +86,15 @@ type errBlockRead struct {
err error
}
func (e errBlockRead) Unwrap() error {
return e.err
}
func (e errBlockRead) Is(target error) bool {
_, ok := target.(errBlockRead)
return ok
}
func (e errBlockRead) Error() string {
if e.err != nil {
return fmt.Sprintf("block read error on %s: %s", e.file, e.err)
@@ -1348,6 +1357,9 @@ type tsmBatchKeyIterator struct {
// errs is any error we received while iterating values.
errs TSMErrors
// errSet is the error strings we have seen before
errSet map[string]struct{}
// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
@@ -1393,13 +1405,18 @@ type tsmBatchKeyIterator struct {
overflowErrors int
}
// AppendError - store unique errors in the order of first appearance,
// up to a limit of maxErrors. If the error is unique and stored, return true.
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
if t.maxErrors > len(t.errs) {
s := err.Error()
if _, ok := t.errSet[s]; ok {
return true
} else if t.maxErrors > len(t.errs) {
t.errs = append(t.errs, err)
// Was the error stored?
t.errSet[s] = struct{}{}
return true
} else {
// Was the error dropped
// Was the error dropped?
t.overflowErrors++
return false
}
@@ -1417,6 +1434,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan s
readers: readers,
values: map[string][]Value{},
pos: make([]int, len(readers)),
errSet: map[string]struct{}{},
size: size,
iterators: iter,
fast: fast,
@@ -1616,11 +1634,11 @@ func (k *tsmBatchKeyIterator) merge() {
}
func (k *tsmBatchKeyIterator) handleEncodeError(err error, typ string) {
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %v", typ, k.key, err)})
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %w", typ, k.key, err)})
}
func (k *tsmBatchKeyIterator) handleDecodeError(err error, typ string) {
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", typ, k.key, err)})
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %w", typ, k.key, err)})
}
func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
@@ -1647,6 +1665,8 @@ func (k *tsmBatchKeyIterator) Close() error {
for _, r := range k.readers {
errSlice = append(errSlice, r.Close())
}
clear(k.errSet)
k.errs = nil
return errors.Join(errSlice...)
}
@@ -1656,11 +1676,10 @@ func (k *tsmBatchKeyIterator) Err() error {
return nil
}
// Copy the errors before appending the dropped error count
var errs TSMErrors
errs = make([]error, 0, len(k.errs)+1)
errs := make([]error, 0, len(k.errs)+1)
errs = append(errs, k.errs...)
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
return errs
return errors.Join(errs...)
}
type cacheKeyIterator struct {

View File

@@ -8,13 +8,13 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"testing"
"time"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
@@ -294,19 +294,19 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.FileStore = ffs
files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))
}
require.Error(t, err, "expected error writing snapshot")
require.Zero(t, len(files), "no files should be compacted")
compactor.Open()
if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
t.Fatalf("expected error writing snapshot: %v", err)
}
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error {
require.Equal(t, 1, len(strings))
require.Equal(t, strings[0], f3)
return nil
})
}
// Ensures that a compaction will properly merge multiple TSM files

View File

@@ -2236,16 +2236,7 @@ func (s *compactionStrategy) compactGroup() {
log.Warn("Error compacting TSM files", zap.Error(err))
// We hit a bad TSM file - rename so the next compaction can proceed.
if _, ok := err.(errBlockRead); ok {
path := err.(errBlockRead).file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err))
if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.Error((err)))
}
}
MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback)
s.errorStat.Inc()
time.Sleep(time.Second)
@@ -2273,6 +2264,20 @@ func (s *compactionStrategy) compactGroup() {
zap.Int("tsm1_files_n", len(files)))
}
func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) {
var blockReadErr errBlockRead
// We hit a bad TSM file - rename so the next compaction can proceed.
if ok := errors.As(err, &blockReadErr); ok {
path := blockReadErr.file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
}
}
}
// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {

View File

@@ -1,12 +1,12 @@
package tsm1
import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strings"
"testing"
"github.com/stretchr/testify/require"
@@ -1841,8 +1841,22 @@ func TestTSMReader_References(t *testing.T) {
}
func TestBatchKeyIterator_Errors(t *testing.T) {
const MaxErrors = 10
const testFile = "testFile.tsm"
errorCases := []error{
fmt.Errorf("test error 0"),
errBlockRead{
file: testFile,
err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v",
"string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))},
fmt.Errorf("test error 2"),
// Duplicate error - should be stored once, but not counted as dropped.
fmt.Errorf("test error 2"),
fmt.Errorf("test error 3"),
fmt.Errorf("test error 4"),
}
// Store all but the last error
MaxErrors := len(errorCases) - 2
dir, name := createTestTSM(t)
defer os.RemoveAll(dir)
fr, err := os.Open(name)
@@ -1853,41 +1867,33 @@ func TestBatchKeyIterator_Errors(t *testing.T) {
if err != nil {
// Only have a deferred close if we could not create the TSMReader
defer func() {
if e := fr.Close(); e != nil {
t.Fatalf("unexpected error closing %s: %v", name, e)
}
require.NoError(t, fr.Close(), "unexpected error closing %s", name)
}()
} else {
defer func() {
require.NoError(t, r.Close(), "unexpected error closing TSMReader for %s", name)
}()
t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
}
defer func() {
if e := r.Close(); e != nil {
t.Fatalf("error closing TSMReader for %s: %v", name, e)
}
}()
require.NoError(t, err, "unexpected error creating TSMReader for %s", name)
interrupts := make(chan struct{})
var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
}
var i int
for i = 0; i < MaxErrors*2; i++ {
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i))
if i < MaxErrors && !saved {
t.Fatalf("error unexpectedly not saved: %d", i)
}
if i >= MaxErrors && saved {
t.Fatalf("error unexpectedly saved: %d", i)
iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r)
require.NoError(t, err, "unexpected error creating tsmBatchKeyIterator")
for i := 0; i < 2; i++ {
for _, e := range errorCases {
saved := iter.(*tsmBatchKeyIterator).AppendError(e)
savedErrs := iter.(*tsmBatchKeyIterator).Err().(interface{ Unwrap() []error }).Unwrap()
require.False(t, len(savedErrs) < MaxErrors && !saved, "error not saved when it should have been: %v", e)
}
}
errs := iter.Err()
if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) {
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt)
}
expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors)
if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 {
t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error())
}
var blockReadError errBlockRead
iterErr := iter.Err()
joinErr, ok := iterErr.(interface{ Unwrap() []error })
require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr)
require.Equal(t, 1+MaxErrors, len(joinErr.Unwrap()), "saved wrong number of errors")
require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err)
require.Equal(t, testFile, blockReadError.file, "unexpected file name in error")
}
func createTestTSM(t *testing.T) (dir string, name string) {