diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 31eecbd0cd..9c2b90244a 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1381,6 +1381,9 @@ type tsmBatchKeyIterator struct { // errs is any error we received while iterating values. errs TSMErrors + // errSet is the error strings we have seen before + errSet map[string]struct{} + // indicates whether the iterator should choose a faster merging strategy over a more // optimally compressed one. If fast is true, multiple blocks will just be added as is // and not combined. In some cases, a slower path will need to be utilized even when @@ -1426,13 +1429,18 @@ type tsmBatchKeyIterator struct { overflowErrors int } +// AppendError - store unique errors in the order of first appearance, +// up to a limit of maxErrors. If the error is unique and stored, return true. func (t *tsmBatchKeyIterator) AppendError(err error) bool { - if t.maxErrors > len(t.errs) { + s := err.Error() + if _, ok := t.errSet[s]; ok { + return true + } else if t.maxErrors > len(t.errs) { t.errs = append(t.errs, err) - // Was the error stored? + t.errSet[s] = struct{}{} return true } else { - // Was the error dropped + // Was the error dropped? t.overflowErrors++ return false } @@ -1450,6 +1458,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan s readers: readers, values: map[string][]Value{}, pos: make([]int, len(readers)), + errSet: map[string]struct{}{}, size: size, iterators: iter, fast: fast, @@ -1680,6 +1689,8 @@ func (k *tsmBatchKeyIterator) Close() error { for _, r := range k.readers { errSlice = append(errSlice, r.Close()) } + clear(k.errSet) + k.errs = nil return errors.Join(errSlice...) } @@ -1689,8 +1700,7 @@ func (k *tsmBatchKeyIterator) Err() error { return nil } // Copy the errors before appending the dropped error count - var errs TSMErrors - errs = make([]error, 0, len(k.errs)+1) + errs := make([]error, 0, len(k.errs)+1) errs = append(errs, k.errs...) errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors)) return errors.Join(errs...) diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index f894897354..c7003a2897 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -1875,8 +1875,14 @@ func TestBatchKeyIterator_Errors(t *testing.T) { err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", "string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))}, fmt.Errorf("test error 2"), + // Duplicate error - should be stored once, but not counted as dropped. + fmt.Errorf("test error 2"), fmt.Errorf("test error 3"), + fmt.Errorf("test error 4"), } + + // Store all but the last error + MaxErrors := len(errorCases) - 2 dir, name := createTestTSM(t) defer os.RemoveAll(dir) fr, err := os.Open(name) @@ -1887,44 +1893,33 @@ func TestBatchKeyIterator_Errors(t *testing.T) { if err != nil { // Only have a deferred close if we could not create the TSMReader defer func() { - if e := fr.Close(); e != nil { - t.Fatalf("unexpected error closing %s: %v", name, e) - } + require.NoError(t, fr.Close(), "unexpected error closing %s", name) + }() + } else { + defer func() { + require.NoError(t, r.Close(), "unexpected error closing TSMReader for %s", name) }() - - t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err) } - defer func() { - if e := r.Close(); e != nil { - t.Fatalf("error closing TSMReader for %s: %v", name, e) - } - }() + require.NoError(t, err, "unexpected error creating TSMReader for %s", name) interrupts := make(chan struct{}) var iter KeyIterator - if iter, err = NewTSMBatchKeyIterator(3, false, len(errorCases), interrupts, []string{name}, r); err != nil { - t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err) - } - var i int - for i = 0; i < 2; i++ { - for j, e := range errorCases { + iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r) + require.NoError(t, err, "unexpected error creating tsmBatchKeyIterator") + + for i := 0; i < 2; i++ { + for _, e := range errorCases { saved := iter.(*tsmBatchKeyIterator).AppendError(e) - index := (1 + j) + (len(errorCases) * i) - if index < len(errorCases) && !saved { - t.Fatalf("error unexpectedly not saved: %d", index) - } else if index > len(errorCases) && saved { - t.Fatalf("error unexpectedly saved: %d", index) - } + savedErrs := iter.(*tsmBatchKeyIterator).Err().(interface{ Unwrap() []error }).Unwrap() + require.False(t, len(savedErrs) < MaxErrors && !saved, "error not saved when it should have been: %v", e) } } var blockReadError errBlockRead iterErr := iter.Err() joinErr, ok := iterErr.(interface{ Unwrap() []error }) require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr) - require.Equal(t, 1+len(errorCases), len(joinErr.Unwrap()), "saved wrong number of errors") + require.Equal(t, 1+MaxErrors, len(joinErr.Unwrap()), "saved wrong number of errors") require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err) require.Equal(t, testFile, blockReadError.file, "unexpected file name in error") - expected := fmt.Sprintf("additional errors dropped: %d", len(errorCases)) - require.Equal(t, expected, joinErr.Unwrap()[len(errorCases)].Error(), "unexpected error message for dropped errors") } func createTestTSM(t *testing.T) (dir string, name string) {