Fix Msgstream exponential retry (#6807)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/6842/head
Xiaofan 2021-08-03 10:39:24 +08:00 committed by GitHub
parent d2767f920c
commit aba234fa3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 54 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -100,7 +101,7 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
ms.producerLock.Unlock()
return nil
}
err := Retry(20, time.Millisecond*200, fn)
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
if err != nil {
errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
panic(errMsg)
@ -135,7 +136,7 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
ms.consumerLock.Unlock()
return nil
}
err := Retry(20, time.Millisecond*200, fn)
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
if err != nil {
errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
panic(errMsg)
@ -497,7 +498,7 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
ms.consumerLock.Unlock()
return nil
}
err := Retry(10, time.Millisecond*200, fn)
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
if err != nil {
errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
panic(errMsg)
@ -733,7 +734,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if len(mp.MsgID) == 0 {
return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface")
}
if err = Retry(20, time.Millisecond*200, fn); err != nil {
if err = retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200)); err != nil {
return fmt.Errorf("Failed to seek, error %s", err.Error())
}
ms.addConsumer(consumer, mp.ChannelName)

View File

@ -1,45 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package msgstream
import (
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
func Retry(attempts int, sleep time.Duration, fn func() error) error {
if err := fn(); err != nil {
if s, ok := err.(InterruptError); ok {
return s.error
}
if attempts--; attempts > 0 {
log.Debug("retry func error", zap.Int("attempts", attempts), zap.Duration("sleep", sleep), zap.Error(err))
time.Sleep(sleep)
return Retry(attempts, 2*sleep, fn)
}
return err
}
return nil
}
type InterruptError struct {
error
}
func NoRetryError(err error) InterruptError {
return InterruptError{err}
}

View File

@ -26,13 +26,13 @@ import (
"github.com/milvus-io/milvus/internal/kv"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
)
type indexParam = map[string]string
@ -107,7 +107,10 @@ func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
}
return nil
}
err = msgstream.Retry(5, time.Millisecond*200, fn)
//TODO retry should be set by config
err = retry.Do(context.TODO(), fn, retry.Attempts(10),
retry.Sleep(time.Second*1), retry.MaxSleepTime(time.Second*10))
if err != nil {
return err
}

View File

@ -23,7 +23,7 @@ func NewDefaultConfig() *Config {
return &Config{
attempts: uint(10),
sleep: 200 * time.Millisecond,
maxSleepTime: 1 * time.Second,
maxSleepTime: 3 * time.Second,
}
}
@ -38,11 +38,20 @@ func Attempts(attempts uint) Option {
func Sleep(sleep time.Duration) Option {
return func(c *Config) {
c.sleep = sleep
// ensure max retry interval is always larger than retry interval
if c.sleep*2 > c.maxSleepTime {
c.maxSleepTime = 2 * c.sleep
}
}
}
func MaxSleepTime(maxSleepTime time.Duration) Option {
return func(c *Config) {
c.maxSleepTime = maxSleepTime
// ensure max retry interval is always larger than retry interval
if c.sleep*2 > maxSleepTime {
c.maxSleepTime = 2 * c.sleep
} else {
c.maxSleepTime = maxSleepTime
}
}
}

View File

@ -32,6 +32,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
if s, ok := err.(InterruptError); ok {
return s.error
}
// TODO early termination if this is unretriable error?
el[i] = err
select {
@ -53,10 +54,15 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
type ErrorList []error
// TODO shouldn't print all retries, might be too much
func (el ErrorList) Error() string {
var builder strings.Builder
builder.WriteString("All attempts results:\n")
for index, err := range el {
// if early termination happens
if err == nil {
break
}
builder.WriteString(fmt.Sprintf("attempt #%d:%s\n", index+1, err.Error()))
}
return builder.String()

View File

@ -17,6 +17,7 @@ import (
"testing"
"time"
"github.com/lingdor/stackerror"
"github.com/stretchr/testify/assert"
)
@ -76,7 +77,7 @@ func TestAllError(t *testing.T) {
ctx := context.Background()
testFn := func() error {
return fmt.Errorf("some error")
return stackerror.New("some error")
}
err := Do(ctx, testFn, Attempts(3))