mirror of https://github.com/milvus-io/milvus.git
Use errors.Wrap and Combine so retry error could check `errors.Is` (#22520)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/22532/head
parent
bdd6bc7695
commit
51cb8590a1
|
@ -668,7 +668,7 @@ func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) {
|
|||
|
||||
_, err := pc.Subscribe(mqwrapper.ConsumerOptions{Topic: "test_topic_name"})
|
||||
assert.Error(t, err)
|
||||
assert.True(t, retry.IsUnRecoverable(err))
|
||||
assert.False(t, retry.IsRecoverable(err))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ func (s *stepStack) Execute(ctx context.Context) *stepStack {
|
|||
_, isConfirmGCStep := todo.(*confirmGCStep)
|
||||
skipLog := isWaitForTsSyncedStep || isConfirmGCStep
|
||||
|
||||
if retry.IsUnRecoverable(err) {
|
||||
if !retry.IsRecoverable(err) {
|
||||
if !skipLog {
|
||||
log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc()))
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package errutil
|
||||
|
||||
import "github.com/cockroachdb/errors"
|
||||
import (
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type multiErrors struct {
|
||||
errs []error
|
||||
|
@ -24,10 +27,16 @@ func (e multiErrors) Error() string {
|
|||
}
|
||||
|
||||
func (e multiErrors) Is(err error) bool {
|
||||
return errors.IsAny(err, e.errs...)
|
||||
for _, item := range e.errs {
|
||||
if errors.Is(item, err) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func Combine(errs ...error) error {
|
||||
errs = lo.Filter(errs, func(err error, _ int) bool { return err != nil })
|
||||
if len(errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -26,6 +26,18 @@ func (s *ErrSuite) TestCombine() {
|
|||
s.Equal("first: second", err.Error())
|
||||
}
|
||||
|
||||
func (s *ErrSuite) TestCombineWithNil() {
|
||||
err := errors.New("non-nil")
|
||||
|
||||
err = Combine(nil, err)
|
||||
s.NotNil(err)
|
||||
}
|
||||
|
||||
func (s *ErrSuite) TestCombineOnlyNil() {
|
||||
err := Combine(nil, nil)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func TestErrors(t *testing.T) {
|
||||
suite.Run(t, new(ErrSuite))
|
||||
}
|
||||
|
|
|
@ -17,21 +17,24 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/errorutil"
|
||||
"github.com/milvus-io/milvus/internal/util/errutil"
|
||||
)
|
||||
|
||||
// Do will run function with retry mechanism.
|
||||
// fn is the func to run.
|
||||
// Option can control the retry times and timeout.
|
||||
func Do(ctx context.Context, fn func() error, opts ...Option) error {
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
c := newDefaultConfig()
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
var el errorutil.ErrorList
|
||||
|
||||
var el error
|
||||
|
||||
for i := uint(0); i < c.attempts; i++ {
|
||||
if err := fn(); err != nil {
|
||||
|
@ -39,16 +42,17 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
|
|||
log.Error("retry func failed", zap.Uint("retry time", i), zap.Error(err))
|
||||
}
|
||||
|
||||
el = append(el, err)
|
||||
err = errors.Wrapf(err, "attempt #%d", i)
|
||||
el = errutil.Combine(el, err)
|
||||
|
||||
if ok := IsUnRecoverable(err); ok {
|
||||
if !IsRecoverable(err) {
|
||||
return el
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(c.sleep):
|
||||
case <-ctx.Done():
|
||||
el = append(el, ctx.Err())
|
||||
el = errutil.Combine(el, errors.Wrapf(ctx.Err(), "context done during sleep after run#%d", i))
|
||||
return el
|
||||
}
|
||||
|
||||
|
@ -63,18 +67,16 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
|
|||
return el
|
||||
}
|
||||
|
||||
type unrecoverableError struct {
|
||||
error
|
||||
}
|
||||
// errUnrecoverable is error instance for unrecoverable.
|
||||
var errUnrecoverable = errors.New("unrecoverable error")
|
||||
|
||||
// Unrecoverable method wrap an error to unrecoverableError. This will make retry
|
||||
// quick return.
|
||||
func Unrecoverable(err error) error {
|
||||
return unrecoverableError{err}
|
||||
return errutil.Combine(err, errUnrecoverable)
|
||||
}
|
||||
|
||||
// IsUnRecoverable is used to judge whether the error is wrapped by unrecoverableError.
|
||||
func IsUnRecoverable(err error) bool {
|
||||
_, isUnrecoverable := err.(unrecoverableError)
|
||||
return isUnrecoverable
|
||||
// IsRecoverable is used to judge whether the error is wrapped by unrecoverableError.
|
||||
func IsRecoverable(err error) bool {
|
||||
return !errors.Is(err, errUnrecoverable)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/lingdor/stackerror"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -28,7 +29,7 @@ func TestDo(t *testing.T) {
|
|||
testFn := func() error {
|
||||
if n < 3 {
|
||||
n++
|
||||
return fmt.Errorf("some error")
|
||||
return errors.New("some error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -41,7 +42,8 @@ func TestAttempts(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
testFn := func() error {
|
||||
return fmt.Errorf("some error")
|
||||
t.Log("executed")
|
||||
return errors.New("some error")
|
||||
}
|
||||
|
||||
err := Do(ctx, testFn, Attempts(1))
|
||||
|
@ -89,14 +91,16 @@ func TestUnRecoveryError(t *testing.T) {
|
|||
attempts := 0
|
||||
ctx := context.Background()
|
||||
|
||||
mockErr := errors.New("some error")
|
||||
testFn := func() error {
|
||||
attempts++
|
||||
return Unrecoverable(fmt.Errorf("some error"))
|
||||
return Unrecoverable(mockErr)
|
||||
}
|
||||
|
||||
err := Do(ctx, testFn, Attempts(3))
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, attempts, 1)
|
||||
assert.True(t, errors.Is(err, mockErr))
|
||||
}
|
||||
|
||||
func TestContextDeadline(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue