mirror of https://github.com/milvus-io/milvus.git
Refactor retry logic with max sleep time
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/4973/head^2
parent
0c2f40f580
commit
9920877e51
|
@ -137,7 +137,10 @@ type Allocator struct {
|
|||
}
|
||||
|
||||
func (ta *Allocator) Start() error {
|
||||
err := ta.connectMaster()
|
||||
connectMasterFn := func() error {
|
||||
return ta.connectMaster()
|
||||
}
|
||||
err := Retry(10, time.Millisecond*200, connectMasterFn)
|
||||
if err != nil {
|
||||
panic("connect to master failed")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package allocator
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
|
||||
|
||||
func RetryImpl(attempts int, sleep time.Duration, fn func() error, maxSleepTime time.Duration) error {
|
||||
if err := fn(); err != nil {
|
||||
if s, ok := err.(InterruptError); ok {
|
||||
return s.error
|
||||
}
|
||||
|
||||
if attempts--; attempts > 0 {
|
||||
log.Printf("retry func error: %s. attempts #%d after %s.", err.Error(), attempts, sleep)
|
||||
time.Sleep(sleep)
|
||||
if sleep < maxSleepTime {
|
||||
return RetryImpl(attempts, 2*sleep, fn, maxSleepTime)
|
||||
}
|
||||
return RetryImpl(attempts, maxSleepTime, fn, maxSleepTime)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Retry(attempts int, sleep time.Duration, fn func() error) error {
|
||||
maxSleepTime := time.Millisecond * 1000
|
||||
return RetryImpl(attempts, sleep, fn, maxSleepTime)
|
||||
}
|
||||
|
||||
type InterruptError struct {
|
||||
error
|
||||
}
|
||||
|
||||
func NoRetryError(err error) InterruptError {
|
||||
return InterruptError{err}
|
||||
}
|
|
@ -54,34 +54,48 @@ func CreateBuilder(ctx context.Context) (*Builder, error) {
|
|||
loopCancel: cancel,
|
||||
}
|
||||
|
||||
etcdAddress := Params.EtcdAddress
|
||||
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
|
||||
connectEtcdFn := func() error {
|
||||
etcdAddress := Params.EtcdAddress
|
||||
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
|
||||
metakv, err := NewMetaTable(etcdKV)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.metaTable = metakv
|
||||
return nil
|
||||
}
|
||||
err := Retry(10, time.Millisecond*200, connectEtcdFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
|
||||
metakv, err := NewMetaTable(etcdKV)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.metaTable = metakv
|
||||
|
||||
idAllocator, err := allocator.NewIDAllocator(b.loopCtx, Params.MasterAddress)
|
||||
b.idAllocator = idAllocator
|
||||
|
||||
option := &miniokv.Option{
|
||||
Address: Params.MinIOAddress,
|
||||
AccessKeyID: Params.MinIOAccessKeyID,
|
||||
SecretAccessKeyID: Params.MinIOSecretAccessKey,
|
||||
UseSSL: Params.MinIOUseSSL,
|
||||
BucketName: Params.MinioBucketName,
|
||||
CreateBucket: true,
|
||||
connectMinIOFn := func() error {
|
||||
option := &miniokv.Option{
|
||||
Address: Params.MinIOAddress,
|
||||
AccessKeyID: Params.MinIOAccessKeyID,
|
||||
SecretAccessKeyID: Params.MinIOSecretAccessKey,
|
||||
UseSSL: Params.MinIOUseSSL,
|
||||
BucketName: Params.MinioBucketName,
|
||||
CreateBucket: true,
|
||||
}
|
||||
|
||||
b.kv, err = miniokv.NewMinIOKV(b.loopCtx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
b.kv, err = miniokv.NewMinIOKV(b.loopCtx, option)
|
||||
err = Retry(10, time.Millisecond*200, connectMinIOFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.idAllocator = idAllocator
|
||||
|
||||
b.sched, err = NewTaskScheduler(b.loopCtx, b.idAllocator, b.kv, b.metaTable)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package indexbuilder
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
|
||||
|
||||
func RetryImpl(attempts int, sleep time.Duration, fn func() error, maxSleepTime time.Duration) error {
|
||||
if err := fn(); err != nil {
|
||||
if s, ok := err.(InterruptError); ok {
|
||||
return s.error
|
||||
}
|
||||
|
||||
if attempts--; attempts > 0 {
|
||||
log.Printf("retry func error: %s. attempts #%d after %s.", err.Error(), attempts, sleep)
|
||||
time.Sleep(sleep)
|
||||
if sleep < maxSleepTime {
|
||||
return RetryImpl(attempts, 2*sleep, fn, maxSleepTime)
|
||||
}
|
||||
return RetryImpl(attempts, maxSleepTime, fn, maxSleepTime)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Retry(attempts int, sleep time.Duration, fn func() error) error {
|
||||
maxSleepTime := time.Millisecond * 1000
|
||||
return RetryImpl(attempts, sleep, fn, maxSleepTime)
|
||||
}
|
||||
|
||||
type InterruptError struct {
|
||||
error
|
||||
}
|
||||
|
||||
func NoRetryError(err error) InterruptError {
|
||||
return InterruptError{err}
|
||||
}
|
Loading…
Reference in New Issue