fix: discard excessive errors (#22379) (#22391)

The tsmBatchKeyIterator discards excessive errors to avoid
out-of-memory crashes when compacting very corrupt files.
Any error beyond DefaultMaxSavedErrors (100) will be
discarded instead of appended to the error slice.

closes https://github.com/influxdata/influxdb/issues/22328

(cherry picked from commit e53f75e06d)

closes https://github.com/influxdata/influxdb/issues/22381
pull/21783/merge
davidby-influx 2021-09-03 14:57:36 -07:00 committed by GitHub
parent e30eb3cc46
commit 7ad612b0d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 121 additions and 5 deletions

View File

@ -40,6 +40,10 @@ const (
// TSMFileExtension is the extension used for TSM files.
TSMFileExtension = "tsm"
// DefaultMaxSavedErrors is the number of errors that are stored by a TSMBatchKeyReader before
// subsequent errors are discarded
DefaultMaxSavedErrors = 100
)
var (
@ -954,7 +958,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([
return nil, nil
}
tsm, err := NewTSMBatchKeyIterator(size, fast, intC, tsmFiles, trs...)
tsm, err := NewTSMBatchKeyIterator(size, fast, DefaultMaxSavedErrors, intC, tsmFiles, trs...)
if err != nil {
return nil, err
}
@ -1660,15 +1664,28 @@ type tsmBatchKeyIterator struct {
// without decode
merged blocks
interrupt chan struct{}
// maxErrors is the maximum number of errors to store before discarding.
maxErrors int
// overflowErrors is the number of errors we have ignored.
overflowErrors int
}
func (t *tsmBatchKeyIterator) AppendError(err error) {
t.errs = append(t.errs, err)
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
if t.maxErrors > len(t.errs) {
t.errs = append(t.errs, err)
// Was the error stored?
return true
} else {
// Was the error dropped
t.overflowErrors++
return false
}
}
// NewTSMBatchKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) {
func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
@ -1689,6 +1706,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFil
mergedBooleanValues: &tsdb.BooleanArray{},
mergedStringValues: &tsdb.StringArray{},
interrupt: interrupt,
maxErrors: maxErrors,
}, nil
}
@ -1916,7 +1934,12 @@ func (k *tsmBatchKeyIterator) Err() error {
if len(k.errs) == 0 {
return nil
}
return k.errs
// Copy the errors before appending the dropped error count
var errs TSMErrors
errs = make([]error, 0, len(k.errs)+1)
errs = append(errs, k.errs...)
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
return errs
}
type cacheKeyIterator struct {

View File

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"testing"
"github.com/stretchr/testify/require"
@ -1864,6 +1865,98 @@ func TestTSMReader_References(t *testing.T) {
}
}
func TestBatchKeyIterator_Errors(t *testing.T) {
const MaxErrors = 10
dir, name := createTestTSM(t)
defer os.RemoveAll(dir)
fr, err := os.Open(name)
if err != nil {
t.Fatalf("unexpected error opening file %s: %v", name, err)
}
r, err := NewTSMReader(fr)
if err != nil {
// Only have a deferred close if we could not create the TSMReader
defer func() {
if e := fr.Close(); e != nil {
t.Fatalf("unexpected error closing %s: %v", name, e)
}
}()
t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
}
defer func() {
if e := r.Close(); e != nil {
t.Fatalf("error closing TSMReader for %s: %v", name, e)
}
}()
interrupts := make(chan struct{})
var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
}
var i int
for i = 0; i < MaxErrors*2; i++ {
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i))
if i < MaxErrors && !saved {
t.Fatalf("error unexpectedly not saved: %d", i)
}
if i >= MaxErrors && saved {
t.Fatalf("error unexpectedly saved: %d", i)
}
}
errs := iter.Err()
if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) {
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt)
}
expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors)
if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 {
t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error())
}
}
func createTestTSM(t *testing.T) (dir string, name string) {
dir = MustTempDir()
f := mustTempFile(dir)
name = f.Name()
w, err := NewTSMWriter(f)
if err != nil {
f.Close()
t.Fatalf("unexpected error creating writer for %s: %v", name, err)
}
defer func() {
if e := w.Close(); e != nil {
t.Fatalf("write TSM close of %s: %v", name, err)
}
}()
var data = map[string][]Value{
"float": []Value{NewValue(1, 1.0)},
"int": []Value{NewValue(1, int64(1))},
"uint": []Value{NewValue(1, ^uint64(0))},
"bool": []Value{NewValue(1, true)},
"string": []Value{NewValue(1, "foo")},
}
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("write TSM value: %v", err)
}
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("write TSM index: %v", err)
}
return dir, name
}
func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
index := NewIndexWriter()
for i := 0; i < 100000; i++ {