feat: store unique tsmBatchKeyIterator errors in order to a limit

DSB_rename_bad_tsm
davidby-influx 2025-01-15 18:58:41 -08:00
parent 06d2cd86aa
commit 1cc527a617
2 changed files with 35 additions and 30 deletions

View File

@ -1381,6 +1381,9 @@ type tsmBatchKeyIterator struct {
// errs is any error we received while iterating values.
errs TSMErrors
// errSet is the error strings we have seen before
errSet map[string]struct{}
// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
@ -1426,13 +1429,18 @@ type tsmBatchKeyIterator struct {
overflowErrors int
}
// AppendError - store unique errors in the order of first appearance,
// up to a limit of maxErrors. If the error is unique and stored, return true.
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
if t.maxErrors > len(t.errs) {
s := err.Error()
if _, ok := t.errSet[s]; ok {
return true
} else if t.maxErrors > len(t.errs) {
t.errs = append(t.errs, err)
// Was the error stored?
t.errSet[s] = struct{}{}
return true
} else {
// Was the error dropped
// Was the error dropped?
t.overflowErrors++
return false
}
@ -1450,6 +1458,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan s
readers: readers,
values: map[string][]Value{},
pos: make([]int, len(readers)),
errSet: map[string]struct{}{},
size: size,
iterators: iter,
fast: fast,
@ -1680,6 +1689,8 @@ func (k *tsmBatchKeyIterator) Close() error {
for _, r := range k.readers {
errSlice = append(errSlice, r.Close())
}
clear(k.errSet)
k.errs = nil
return errors.Join(errSlice...)
}
@ -1689,8 +1700,7 @@ func (k *tsmBatchKeyIterator) Err() error {
return nil
}
// Copy the errors before appending the dropped error count
var errs TSMErrors
errs = make([]error, 0, len(k.errs)+1)
errs := make([]error, 0, len(k.errs)+1)
errs = append(errs, k.errs...)
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
return errors.Join(errs...)

View File

@ -1875,8 +1875,14 @@ func TestBatchKeyIterator_Errors(t *testing.T) {
err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v",
"string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))},
fmt.Errorf("test error 2"),
// Duplicate error - should be stored once, but not counted as dropped.
fmt.Errorf("test error 2"),
fmt.Errorf("test error 3"),
fmt.Errorf("test error 4"),
}
// Store all but the last error
MaxErrors := len(errorCases) - 2
dir, name := createTestTSM(t)
defer os.RemoveAll(dir)
fr, err := os.Open(name)
@ -1887,44 +1893,33 @@ func TestBatchKeyIterator_Errors(t *testing.T) {
if err != nil {
// Only have a deferred close if we could not create the TSMReader
defer func() {
if e := fr.Close(); e != nil {
t.Fatalf("unexpected error closing %s: %v", name, e)
}
require.NoError(t, fr.Close(), "unexpected error closing %s", name)
}()
} else {
defer func() {
require.NoError(t, r.Close(), "unexpected error closing TSMReader for %s", name)
}()
t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
}
defer func() {
if e := r.Close(); e != nil {
t.Fatalf("error closing TSMReader for %s: %v", name, e)
}
}()
require.NoError(t, err, "unexpected error creating TSMReader for %s", name)
interrupts := make(chan struct{})
var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, len(errorCases), interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
}
var i int
for i = 0; i < 2; i++ {
for j, e := range errorCases {
iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r)
require.NoError(t, err, "unexpected error creating tsmBatchKeyIterator")
for i := 0; i < 2; i++ {
for _, e := range errorCases {
saved := iter.(*tsmBatchKeyIterator).AppendError(e)
index := (1 + j) + (len(errorCases) * i)
if index < len(errorCases) && !saved {
t.Fatalf("error unexpectedly not saved: %d", index)
} else if index > len(errorCases) && saved {
t.Fatalf("error unexpectedly saved: %d", index)
}
savedErrs := iter.(*tsmBatchKeyIterator).Err().(interface{ Unwrap() []error }).Unwrap()
require.False(t, len(savedErrs) < MaxErrors && !saved, "error not saved when it should have been: %v", e)
}
}
var blockReadError errBlockRead
iterErr := iter.Err()
joinErr, ok := iterErr.(interface{ Unwrap() []error })
require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr)
require.Equal(t, 1+len(errorCases), len(joinErr.Unwrap()), "saved wrong number of errors")
require.Equal(t, 1+MaxErrors, len(joinErr.Unwrap()), "saved wrong number of errors")
require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err)
require.Equal(t, testFile, blockReadError.file, "unexpected file name in error")
expected := fmt.Sprintf("additional errors dropped: %d", len(errorCases))
require.Equal(t, expected, joinErr.Unwrap()[len(errorCases)].Error(), "unexpected error message for dropped errors")
}
func createTestTSM(t *testing.T) (dir string, name string) {