change retry (#5996)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5998/head
godchen 2021-06-23 09:24:10 +08:00 committed by GitHub
parent 6406f9e2de
commit 99be4c09a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 460 additions and 705 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -71,7 +72,7 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string
func (ia *IDAllocator) Start() error {
var err error
ia.rootCoordClient, err = rcc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints, 3*time.Second)
ia.rootCoordClient, err = rcc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints, retry.Attempts(300))
if err != nil {
panic(err)
}

View File

@ -15,6 +15,7 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
)
const retryTimes = 2
@ -42,7 +43,7 @@ func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCre
// lock acquired
func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) {
cli, err := m.dataClientCreator(m.ctx, addr)
cli, err := m.dataClientCreator(m.ctx, addr, retry.Attempts(300))
if err != nil {
return nil, err
}

View File

@ -48,8 +48,8 @@ type (
Timestamp = typeutil.Timestamp
)
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoord, error)
type dataNodeCreatorFunc func(ctx context.Context, addr string, retryOptions ...retry.Option) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error)
type Server struct {
ctx context.Context
@ -91,12 +91,12 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
return s, nil
}
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
return datanodeclient.NewClient(ctx, addr, 3*time.Second)
func defaultDataNodeCreatorFunc(ctx context.Context, addr string, retryOptions ...retry.Option) (types.DataNode, error) {
return datanodeclient.NewClient(ctx, addr, retryOptions)
}
func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoord, error) {
return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout)
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error) {
return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints, retryOptions...)
}
// Register register data service at etcd
@ -222,7 +222,7 @@ func (s *Server) initMeta() error {
}
return nil
}
return retry.Retry(connEtcdMaxRetryTime, connEtcdRetryInterval, connectEtcdFn)
return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}
func (s *Server) initFlushMsgStream() error {
@ -449,7 +449,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) {
func (s *Server) initRootCoordClient() error {
var err error
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil {
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints, retry.Attempts(300)); err != nil {
return err
}
if err = s.rootCoordClient.Init(); err != nil {

View File

@ -636,7 +636,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoord, error) {
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
@ -773,10 +773,10 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
svr, err := CreateServer(context.TODO(), factory)
assert.Nil(t, err)
svr.dataClientCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
svr.dataClientCreator = func(ctx context.Context, addr string, retryOptions ...retry.Option) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoord, error) {
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
assert.Nil(t, err)
@ -806,7 +806,7 @@ func initEtcd(etcdEndpoints []string) (*clientv3.Client, error) {
etcdCli = etcd
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(context.TODO(), connectEtcdFn, retry.Attempts(300))
if err != nil {
return nil, err
}

View File

@ -34,16 +34,16 @@ import (
)
type Client struct {
ctx context.Context
cancel context.CancelFunc
grpcClient datapb.DataCoordClient
conn *grpc.ClientConn
ctx context.Context
addr string
sess *sessionutil.Session
addr string
timeout time.Duration
recallTry int
reconnTry int
retryOptions []retry.Option
}
func getDataCoordAddress(sess *sessionutil.Session) (string, error) {
@ -61,24 +61,24 @@ func getDataCoordAddress(sess *sessionutil.Session) (string, error) {
return ms.Address, nil
}
func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) *Client {
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints)
return &Client{
ctx: context.Background(),
sess: sess,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (*Client, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("DataCoordClient NewClient failed", zap.Error(err))
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
ctx: ctx,
cancel: cancel,
sess: sess,
retryOptions: retryOptions,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
@ -90,17 +90,16 @@ func (c *Client) connect() error {
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getDataCoordAddressFn)
err = retry.Do(c.ctx, getDataCoordAddressFn, c.retryOptions...)
if err != nil {
log.Debug("DataCoordClient try reconnect getDataCoordAddressFn failed", zap.Error(err))
return err
}
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -119,7 +118,7 @@ func (c *Client) connect() error {
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("DataCoord try reconnect failed", zap.Error(err))
return err
@ -133,14 +132,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {
@ -154,6 +148,7 @@ func (c *Client) Start() error {
}
// Stop cancels the client's context and closes the gRPC connection to
// DataCoord, returning whatever error Close reports.
//
// NewClient builds the Client without a connection, so conn is still nil if
// Stop is called before a connection was established. In that case there is
// nothing to close and Stop returns nil instead of dereferencing a nil
// *grpc.ClientConn.
func (c *Client) Stop() error {
	// Cancel first so any in-flight dial/retry bound to c.ctx is released.
	c.cancel()
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

View File

@ -33,50 +33,41 @@ import (
)
type Client struct {
ctx context.Context
ctx context.Context
cancel context.CancelFunc
grpc datapb.DataNodeClient
conn *grpc.ClientConn
addr string
timeout time.Duration
reconnTry int
recallTry int
retryOptions []retry.Option
}
func NewClient(ctx context.Context, addr string, timeout time.Duration) (*Client, error) {
func NewClient(ctx context.Context, addr string, retryOptions []retry.Option) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
grpc: nil,
conn: nil,
addr: addr,
ctx: ctx,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
ctx: ctx,
cancel: cancel,
addr: addr,
retryOptions: retryOptions,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("DataNode connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -95,7 +86,7 @@ func (c *Client) connect() error {
return nil
}
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("DataNodeClient try connect failed", zap.Error(err))
return err
@ -110,14 +101,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {
@ -131,6 +117,7 @@ func (c *Client) Start() error {
}
// Stop cancels the client's context and closes the gRPC connection to the
// DataNode, returning whatever error Close reports.
//
// NewClient builds the Client without a connection, so conn is still nil if
// Stop is called before a connection was established. In that case there is
// nothing to close and Stop returns nil instead of dereferencing a nil
// *grpc.ClientConn.
func (c *Client) Stop() error {
	// Cancel first so any in-flight dial/retry bound to c.ctx is released.
	c.cancel()
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

View File

@ -18,7 +18,6 @@ import (
"net"
"strconv"
"testing"
"time"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -26,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
)
@ -111,11 +111,11 @@ func TestRun(t *testing.T) {
Params.Init()
dnServer.newRootCoordClient = func() (types.RootCoord, error) {
dnServer.newRootCoordClient = func(string, []string, ...retry.Option) (types.RootCoord, error) {
return &mockRootCoord{}, nil
}
dnServer.newDataCoordClient = func(string, []string, time.Duration) types.DataCoord {
return &mockDataCoord{}
dnServer.newDataCoordClient = func(string, []string, ...retry.Option) (types.DataCoord, error) {
return &mockDataCoord{}, nil
}
grpcPort := rand.Int()%100 + 10000

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
)
@ -54,8 +55,8 @@ type Server struct {
rootCoord types.RootCoord
dataCoord types.DataCoord
newRootCoordClient func() (types.RootCoord, error)
newDataCoordClient func(string, []string, time.Duration) types.DataCoord
newRootCoordClient func(string, []string, ...retry.Option) (types.RootCoord, error)
newDataCoordClient func(string, []string, ...retry.Option) (types.DataCoord, error)
closer io.Closer
}
@ -68,11 +69,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
cancel: cancel,
msFactory: factory,
grpcErrChan: make(chan error),
newRootCoordClient: func() (types.RootCoord, error) {
return rcc.NewClient(ctx1, dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second)
newRootCoordClient: func(etcdMetaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error) {
return rcc.NewClient(ctx1, etcdMetaRoot, etcdEndpoints, retryOptions...)
},
newDataCoordClient: func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataCoord {
return dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout)
newDataCoordClient: func(etcdMetaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (types.DataCoord, error) {
return dsc.NewClient(ctx1, etcdMetaRoot, etcdEndpoints, retryOptions...)
},
}
@ -177,7 +178,7 @@ func (s *Server) init() error {
if s.newRootCoordClient != nil {
log.Debug("RootCoord address", zap.String("address", Params.RootCoordAddress))
log.Debug("Init root coord client ...")
rootCoordClient, err := s.newRootCoordClient()
rootCoordClient, err := s.newRootCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("DataNode newRootCoordClient failed", zap.Error(err))
panic(err)
@ -205,7 +206,11 @@ func (s *Server) init() error {
if s.newDataCoordClient != nil {
log.Debug("Data service address", zap.String("address", Params.DataCoordAddress))
log.Debug("DataNode Init data service client ...")
dataCoordClient := s.newDataCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 10*time.Second)
dataCoordClient, err := s.newDataCoordClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("DataNode newDataCoordClient failed", zap.Error(err))
panic(err)
}
if err = dataCoordClient.Init(); err != nil {
log.Debug("DataNode newDataCoord failed", zap.Error(err))
panic(err)

View File

@ -34,19 +34,17 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
type UniqueID = typeutil.UniqueID
type Client struct {
ctx context.Context
ctx context.Context
cancel context.CancelFunc
grpcClient indexpb.IndexCoordClient
conn *grpc.ClientConn
addr string
sess *sessionutil.Session
timeout time.Duration
recallTry int
reconnTry int
retryOptions []retry.Option
}
func getIndexCoordAddr(sess *sessionutil.Session) (string, error) {
@ -65,24 +63,24 @@ func getIndexCoordAddr(sess *sessionutil.Session) (string, error) {
return ms.Address, nil
}
func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) *Client {
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints)
return &Client{
ctx: context.Background(),
sess: sess,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (*Client, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("RootCoordClient NewClient failed", zap.Error(err))
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
ctx: ctx,
cancel: cancel,
sess: sess,
retryOptions: retryOptions,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
@ -94,18 +92,17 @@ func (c *Client) connect() error {
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getIndexCoordaddrFn)
err = retry.Do(c.ctx, getIndexCoordaddrFn, c.retryOptions...)
if err != nil {
log.Debug("IndexCoordClient getIndexCoordAddress failed", zap.Error(err))
return err
}
log.Debug("IndexCoordClient getIndexCoordAddress success")
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -124,7 +121,7 @@ func (c *Client) connect() error {
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("IndexCoordClient try connect failed", zap.Error(err))
return err
@ -139,14 +136,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {
@ -160,7 +152,8 @@ func (c *Client) Start() error {
}
func (c *Client) Stop() error {
return nil
c.cancel()
return c.conn.Close()
}
// Register dummy

View File

@ -32,46 +32,41 @@ import (
)
type Client struct {
ctx context.Context
cancel context.CancelFunc
grpcClient indexpb.IndexNodeClient
conn *grpc.ClientConn
ctx context.Context
addr string
timeout time.Duration
reconnTry int
recallTry int
retryOptions []retry.Option
}
func NewClient(addr string, timeout time.Duration) (*Client, error) {
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
addr: addr,
ctx: context.Background(),
timeout: timeout,
recallTry: 3,
reconnTry: 10,
ctx: ctx,
cancel: cancel,
addr: addr,
retryOptions: retryOptions,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -90,7 +85,7 @@ func (c *Client) connect() error {
return nil
}
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("IndexNodeClient try connect failed", zap.Error(err))
return err
@ -105,14 +100,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {

View File

@ -19,7 +19,6 @@ import (
"net"
"strconv"
"sync"
"time"
"go.uber.org/zap"
@ -33,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc"
)
@ -136,10 +136,14 @@ func (s *Server) init() error {
return err
}
s.indexCoordClient = grpcindexcoordclient.NewClient(indexnode.Params.MetaRootPath, indexnode.Params.EtcdEndpoints, 3*time.Second)
s.indexCoordClient, err = grpcindexcoordclient.NewClient(s.loopCtx, indexnode.Params.MetaRootPath, indexnode.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("New indexCoordeClient failed", zap.Error(err))
return err
}
err = s.indexCoordClient.Init()
if err != nil {
log.Debug("IndexNode indexSerticeClient init failed", zap.Error(err))
log.Debug("IndexNode indexCoordeClient init failed", zap.Error(err))
return err
}
s.indexnode.SetIndexCoordClient(s.indexCoordClient)

View File

@ -13,6 +13,7 @@ package grpcproxyclient
import (
"context"
"fmt"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@ -30,42 +31,41 @@ import (
)
type Client struct {
ctx context.Context
cancel context.CancelFunc
grpcClient proxypb.ProxyClient
conn *grpc.ClientConn
ctx context.Context
addr string
timeout time.Duration
reconnTry int
recallTry int
addr string
retryOptions []retry.Option
}
func NewClient(addr string, timeout time.Duration) *Client {
return &Client{
addr: addr,
ctx: context.Background(),
timeout: timeout,
recallTry: 3,
reconnTry: 10,
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
ctx: ctx,
cancel: cancel,
addr: addr,
retryOptions: retryOptions,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("ProxyClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -84,7 +84,7 @@ func (c *Client) connect() error {
return nil
}
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("ProxyClient try connect failed", zap.Error(err))
return err
@ -99,14 +99,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
)
@ -169,8 +170,7 @@ func (s *Server) init() error {
rootCoordAddr := Params.RootCoordAddress
log.Debug("Proxy", zap.String("RootCoord address", rootCoordAddr))
timeout := 3 * time.Second
s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, timeout)
s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("Proxy new rootCoordClient failed ", zap.Error(err))
return err
@ -190,7 +190,11 @@ func (s *Server) init() error {
dataCoordAddr := Params.DataCoordAddress
log.Debug("Proxy", zap.String("data coordinator address", dataCoordAddr))
s.dataCoordClient = grpcdatacoordclient.NewClient(proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, timeout)
s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("Proxy new dataCoordClient failed ", zap.Error(err))
return err
}
err = s.dataCoordClient.Init()
if err != nil {
log.Debug("Proxy dataCoordClient init failed ", zap.Error(err))
@ -202,8 +206,11 @@ func (s *Server) init() error {
indexCoordAddr := Params.IndexCoordAddress
log.Debug("Proxy", zap.String("index coordinator address", indexCoordAddr))
s.indexCoordClient = grpcindexcoordclient.NewClient(proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, timeout)
s.indexCoordClient, err = grpcindexcoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("Proxy new indexCoordClient failed ", zap.Error(err))
return err
}
err = s.indexCoordClient.Init()
if err != nil {
log.Debug("Proxy indexCoordClient init failed ", zap.Error(err))
@ -215,7 +222,7 @@ func (s *Server) init() error {
queryCoordAddr := Params.QueryCoordAddress
log.Debug("Proxy", zap.String("query coordinator address", queryCoordAddr))
s.queryCooedClient, err = grpcquerycoordclient.NewClient(proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, timeout)
s.queryCooedClient, err = grpcquerycoordclient.NewClient(s.ctx, proxy.Params.MetaRootPath, proxy.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
return err
}

View File

@ -34,15 +34,16 @@ import (
)
type Client struct {
ctx context.Context
ctx context.Context
cancel context.CancelFunc
grpcClient querypb.QueryCoordClient
conn *grpc.ClientConn
addr string
timeout time.Duration
sess *sessionutil.Session
reconnTry int
recallTry int
sess *sessionutil.Session
addr string
retryOptions []retry.Option
}
func getQueryCoordAddress(sess *sessionutil.Session) (string, error) {
@ -61,27 +62,24 @@ func getQueryCoordAddress(sess *sessionutil.Session) (string, error) {
}
// NewClient creates a client for QueryService grpc call.
func NewClient(metaRootPath string, etcdEndpoints []string, timeout time.Duration) (*Client, error) {
sess := sessionutil.NewSession(context.Background(), metaRootPath, etcdEndpoints)
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (*Client, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("QueryCoordClient NewClient failed", zap.Error(err))
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
ctx: context.Background(),
grpcClient: nil,
conn: nil,
timeout: timeout,
reconnTry: 10,
recallTry: 3,
sess: sess,
ctx: ctx,
cancel: cancel,
sess: sess,
retryOptions: retryOptions,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
@ -93,17 +91,16 @@ func (c *Client) connect() error {
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getQueryCoordAddressFn)
err = retry.Do(c.ctx, getQueryCoordAddressFn, c.retryOptions...)
if err != nil {
log.Debug("QueryCoordClient getQueryCoordAddress failed", zap.Error(err))
return err
}
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -122,7 +119,7 @@ func (c *Client) connect() error {
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("QueryCoordClient try reconnect failed", zap.Error(err))
return err
@ -131,19 +128,15 @@ func (c *Client) connect() error {
c.grpcClient = querypb.NewQueryCoordClient(c.conn)
return nil
}
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
ret, err := caller()
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {
@ -157,6 +150,7 @@ func (c *Client) Start() error {
}
// Stop cancels the client's context and closes the gRPC connection to
// QueryCoord, returning whatever error Close reports.
//
// NewClient builds the Client without a connection, so conn is still nil if
// Stop is called before a connection was established. In that case there is
// nothing to close and Stop returns nil instead of dereferencing a nil
// *grpc.ClientConn.
func (c *Client) Stop() error {
	// Cancel first so any in-flight dial/retry bound to c.ctx is released.
	c.cancel()
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

View File

@ -28,6 +28,7 @@ import (
qc "github.com/milvus-io/milvus/internal/querycoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -107,7 +108,7 @@ func (s *Server) init() error {
// --- Master Server Client ---
log.Debug("QueryCoord try to new RootCoord client", zap.Any("RootCoordAddress", Params.RootCoordAddress))
rootCoord, err := rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints, 3*time.Second)
rootCoord, err := rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("QueryCoord try to new RootCoord client failed", zap.Error(err))
panic(err)
@ -138,7 +139,11 @@ func (s *Server) init() error {
// --- Data service client ---
log.Debug("QueryCoord try to new DataCoord client", zap.Any("DataCoordAddress", Params.DataCoordAddress))
dataCoord := dsc.NewClient(qc.Params.MetaRootPath, qc.Params.EtcdEndpoints, 3*time.Second)
dataCoord, err := dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("QueryService try to new DataCoord client failed", zap.Error(err))
panic(err)
}
if err = dataCoord.Init(); err != nil {
log.Debug("QueryCoord DataCoordClient Init failed", zap.Error(err))
panic(err)

View File

@ -17,7 +17,6 @@ import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
@ -33,46 +32,41 @@ import (
)
type Client struct {
ctx context.Context
ctx context.Context
cancel context.CancelFunc
grpcClient querypb.QueryNodeClient
conn *grpc.ClientConn
addr string
timeout time.Duration
reconnTry int
recallTry int
retryTimes uint
}
func NewClient(addr string, timeout time.Duration) (*Client, error) {
func NewClient(ctx context.Context, addr string, reTryTimes uint) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("addr is empty")
}
ctx, cancel := context.WithCancel(ctx)
return &Client{
ctx: context.Background(),
addr: addr,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
ctx: ctx,
cancel: cancel,
addr: addr,
retryTimes: reTryTimes,
}, nil
}
func (c *Client) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
return c.connect()
}
func (c *Client) connect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
opts := trace.GetInterceptorOpts()
log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
@ -91,7 +85,7 @@ func (c *Client) connect() error {
return nil
}
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err := retry.Do(c.ctx, connectGrpcFunc, retry.Attempts(c.retryTimes))
if err != nil {
log.Debug("QueryNodeClient try connect failed", zap.Error(err))
return err
@ -106,17 +100,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
if c.conn.GetState() == connectivity.Shutdown {
return ret, err
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {

View File

@ -103,7 +103,7 @@ func (s *Server) init() error {
}
// --- QueryCoord ---
log.Debug("QueryNode start to new QueryCoordClient", zap.Any("QueryCoordAddress", Params.QueryCoordAddress))
queryCoord, err := qcc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
queryCoord, err := qcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("QueryNode new QueryCoordClient failed", zap.Error(err))
panic(err)
@ -136,7 +136,7 @@ func (s *Server) init() error {
addr := Params.RootCoordAddress
log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr))
rootCoord, err := rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
rootCoord, err := rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("QueryNode new RootCoordClient failed", zap.Error(err))
panic(err)
@ -165,7 +165,12 @@ func (s *Server) init() error {
// --- IndexCoord ---
log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress))
indexCoord := isc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
indexCoord, err := isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err))
panic(err)
}
if err := indexCoord.Init(); err != nil {
log.Debug("QueryNode IndexCoordClient Init failed", zap.Error(err))
@ -191,7 +196,11 @@ func (s *Server) init() error {
// --- DataCoord ---
log.Debug("QueryNode start to new DataCoordClient", zap.Any("DataCoordAddress", Params.DataCoordAddress))
dataCoord := dsc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
dataCoord, err := dsc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, retry.Attempts(300))
if err != nil {
log.Debug("QueryNode new DataCoordClient failed", zap.Error(err))
panic(err)
}
if err = dataCoord.Init(); err != nil {
log.Debug("QueryNode DataCoordClient Init failed", zap.Error(err))
panic(err)
@ -230,7 +239,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
var lis net.Listener
var err error
err = retry.Retry(10, 0, func() error {
err = retry.Do(s.ctx, func() error {
addr := ":" + strconv.Itoa(grpcPort)
lis, err = net.Listen("tcp", addr)
if err == nil {
@ -240,7 +249,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
grpcPort = 0
}
return err
})
}, retry.Attempts(10))
if err != nil {
log.Error("QueryNode GrpcServer:failed to listen", zap.Error(err))
s.grpcErrChan <- err

View File

@ -13,7 +13,6 @@ package grpcrootcoordclient
import (
"context"
"errors"
"fmt"
"time"
@ -36,17 +35,16 @@ import (
// GrpcClient grpc client
type GrpcClient struct {
ctx context.Context
cancel context.CancelFunc
grpcClient rootcoordpb.RootCoordClient
conn *grpc.ClientConn
ctx context.Context
//inner member
addr string
timeout time.Duration
reconnTry int
recallTry int
sess *sessionutil.Session
addr string
retryOptions []retry.Option
}
func getRootCoordAddr(sess *sessionutil.Session) (string, error) {
@ -70,84 +68,65 @@ func getRootCoordAddr(sess *sessionutil.Session) (string, error) {
// metaRoot is the path in etcd for root coordinator registration
// etcdEndpoints are the address list for etcd end points
// retryOptions customize the retry policy used when resolving the root coordinator address and dialing it
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) {
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) (*GrpcClient, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("RootCoordClient NewClient failed", zap.Error(err))
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
return &GrpcClient{
grpcClient: nil,
conn: nil,
ctx: ctx,
timeout: timeout,
reconnTry: 300,
recallTry: 3,
sess: sess,
ctx: ctx,
cancel: cancel,
sess: sess,
retryOptions: retryOptions,
}, nil
}
func (c *GrpcClient) Init() error {
return c.connect()
}
func (c *GrpcClient) connect() error {
var err error
getRootCoordAddrFn := func() error {
ch := make(chan struct{}, 1)
var err error
go func() {
c.addr, err = getRootCoordAddr(c.sess)
ch <- struct{}{}
}()
select {
case <-c.ctx.Done():
return retry.NoRetryError(errors.New("context canceled"))
case <-ch:
}
c.addr, err = getRootCoordAddr(c.sess)
if err != nil {
return err
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getRootCoordAddrFn)
err = retry.Do(c.ctx, getRootCoordAddrFn, c.retryOptions...)
if err != nil {
log.Debug("RootCoordClient getRootCoordAddr failed", zap.Error(err))
return err
}
connectGrpcFunc := func() error {
log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr))
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
var conn *grpc.ClientConn
var err error
ch := make(chan struct{}, 1)
opts := trace.GetInterceptorOpts()
go func() {
conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
ch <- struct{}{}
}()
select {
case <-c.ctx.Done():
return retry.NoRetryError(errors.New("context canceled"))
case <-ch:
log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
if err == nil {
c.conn = conn
}
return err
c.conn = conn
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
if err != nil {
log.Debug("RootCoordClient try reconnect failed", zap.Error(err))
return err
@ -157,19 +136,12 @@ func (c *GrpcClient) connect() error {
return nil
}
func (c *GrpcClient) Init() error {
// for now, we must try many times in Init Stage
initFunc := func() error {
return c.connect()
}
err := retry.Retry(10000, 3*time.Second, initFunc)
return err
}
func (c *GrpcClient) Start() error {
return nil
}
func (c *GrpcClient) Stop() error {
c.cancel()
return c.conn.Close()
}
@ -183,14 +155,9 @@ func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, er
if err == nil {
return ret, nil
}
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
break
}
}
err = c.connect()
if err != nil {
return nil, err
return ret, err
}
ret, err = caller()
if err == nil {

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
@ -57,9 +58,9 @@ type Server struct {
indexCoord types.IndexCoord
queryCoord types.QueryCoord
newIndexCoordClient func(string, []string, time.Duration) types.IndexCoord
newDataCoordClient func(string, []string, time.Duration) types.DataCoord
newQueryCoordClient func(string, []string, time.Duration) types.QueryCoord
newIndexCoordClient func(string, []string, ...retry.Option) types.IndexCoord
newDataCoordClient func(string, []string, ...retry.Option) types.DataCoord
newQueryCoordClient func(string, []string, ...retry.Option) types.QueryCoord
closer io.Closer
}
@ -83,8 +84,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
func (s *Server) setClient() {
ctx := context.Background()
s.newDataCoordClient = func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataCoord {
dsClient := dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout)
s.newDataCoordClient = func(etcdMetaRoot string, etcdEndpoints []string, retryOptions ...retry.Option) types.DataCoord {
dsClient, err := dsc.NewClient(s.ctx, etcdMetaRoot, etcdEndpoints, retryOptions...)
if err != nil {
panic(err)
}
if err := dsClient.Init(); err != nil {
panic(err)
}
@ -96,8 +100,11 @@ func (s *Server) setClient() {
}
return dsClient
}
s.newIndexCoordClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.IndexCoord {
isClient := isc.NewClient(metaRootPath, etcdEndpoints, timeout)
s.newIndexCoordClient = func(metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) types.IndexCoord {
isClient, err := isc.NewClient(s.ctx, metaRootPath, etcdEndpoints, retryOptions...)
if err != nil {
panic(err)
}
if err := isClient.Init(); err != nil {
panic(err)
}
@ -106,8 +113,8 @@ func (s *Server) setClient() {
}
return isClient
}
s.newQueryCoordClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.QueryCoord {
qsClient, err := qsc.NewClient(metaRootPath, etcdEndpoints, timeout)
s.newQueryCoordClient = func(metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) types.QueryCoord {
qsClient, err := qsc.NewClient(s.ctx, metaRootPath, etcdEndpoints, retryOptions...)
if err != nil {
panic(err)
}
@ -160,7 +167,10 @@ func (s *Server) init() error {
log.Debug("RootCoord", zap.Any("State", internalpb.StateCode_Initializing))
s.rootCoord.SetNewProxyClient(
func(s *sessionutil.Session) (types.Proxy, error) {
cli := pnc.NewClient(s.Address, 3*time.Second)
cli, err := pnc.NewClient(ctx, s.Address, retry.Attempts(300))
if err != nil {
return nil, err
}
if err := cli.Init(); err != nil {
return nil, err
}
@ -173,7 +183,7 @@ func (s *Server) init() error {
if s.newDataCoordClient != nil {
log.Debug("RootCoord start to create DataCoord client")
dataCoord := s.newDataCoordClient(rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, 3*time.Second)
dataCoord := s.newDataCoordClient(rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, retry.Attempts(300))
if err := s.rootCoord.SetDataCoord(ctx, dataCoord); err != nil {
panic(err)
}
@ -181,7 +191,7 @@ func (s *Server) init() error {
}
if s.newIndexCoordClient != nil {
log.Debug("RootCoord start to create IndexCoord client")
indexCoord := s.newIndexCoordClient(rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, 3*time.Second)
indexCoord := s.newIndexCoordClient(rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, retry.Attempts(300))
if err := s.rootCoord.SetIndexCoord(indexCoord); err != nil {
panic(err)
}
@ -189,7 +199,7 @@ func (s *Server) init() error {
}
if s.newQueryCoordClient != nil {
log.Debug("RootCoord start to create QueryCoord client")
queryCoord := s.newQueryCoordClient(rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, 3*time.Second)
queryCoord := s.newQueryCoordClient(rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, retry.Attempts(300))
if err := s.rootCoord.SetQueryCoord(queryCoord); err != nil {
panic(err)
}

View File

@ -258,7 +258,7 @@ func TestGrpcService(t *testing.T) {
svr.rootCoord.UpdateStateCode(internalpb.StateCode_Healthy)
cli, err := rcc.NewClient(context.Background(), rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, 3*time.Second)
cli, err := rcc.NewClient(context.Background(), rootcoord.Params.MetaRootPath, rootcoord.Params.EtcdEndpoints, retry.Attempts(300))
assert.Nil(t, err)
err = cli.Init()
@ -925,13 +925,13 @@ func TestRun(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "listen tcp: address 1000000: invalid port")
svr.newDataCoordClient = func(string, []string, time.Duration) types.DataCoord {
svr.newDataCoordClient = func(string, []string, ...retry.Option) types.DataCoord {
return &mockDataCoord{}
}
svr.newIndexCoordClient = func(string, []string, time.Duration) types.IndexCoord {
svr.newIndexCoordClient = func(string, []string, ...retry.Option) types.IndexCoord {
return &mockIndex{}
}
svr.newQueryCoordClient = func(string, []string, time.Duration) types.QueryCoord {
svr.newQueryCoordClient = func(string, []string, ...retry.Option) types.QueryCoord {
return &mockQuery{}
}
@ -965,7 +965,7 @@ func initEtcd(etcdEndpoints []string) (*clientv3.Client, error) {
etcdCli = etcd
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(context.TODO(), connectEtcdFn, retry.Attempts(300))
if err != nil {
return nil, err
}

View File

@ -123,7 +123,7 @@ func (i *IndexCoord) Init() error {
return err
}
log.Debug("IndexCoord try to connect etcd")
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("IndexCoord try to connect etcd failed", zap.Error(err))
return err

View File

@ -12,10 +12,10 @@
package indexcoord
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
@ -169,7 +169,7 @@ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error {
m.indexMeta.NodeID = nodeID
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
if err2 != nil {
return err2
}
@ -207,7 +207,7 @@ func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
return err2
}
@ -234,7 +234,7 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
m.indexMeta.MarkDeleted = true
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
if err2 != nil {
return err2
}
@ -319,7 +319,7 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
m.indexMeta.Recycled = true
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
if err2 != nil {
meta.indexMeta.Recycled = false
log.Debug("IndexCoord metaTable UpdateRecycleState failed", zap.Error(err2))

View File

@ -14,7 +14,6 @@ package indexcoord
import (
"context"
"strconv"
"time"
"go.uber.org/zap"
@ -23,6 +22,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/retry"
)
func (i *IndexCoord) removeNode(nodeID UniqueID) {
@ -44,7 +44,7 @@ func (i *IndexCoord) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest)
}
nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
nodeClient, err := grpcindexnodeclient.NewClient(nodeAddress, 3*time.Second)
nodeClient, err := grpcindexnodeclient.NewClient(context.TODO(), nodeAddress, retry.Attempts(300))
if err != nil {
return err
}

View File

@ -98,7 +98,7 @@ func (i *IndexNode) Init() error {
i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
return err
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("IndexNode try connect etcd failed", zap.Error(err))
return err

View File

@ -16,7 +16,6 @@ import (
"errors"
"runtime"
"strconv"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
@ -115,7 +114,7 @@ func (it *IndexBuildTask) OnEnqueue() error {
return nil
}
func (it *IndexBuildTask) checkIndexMeta(pre bool) error {
func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
fn := func() error {
indexMeta := indexpb.IndexMeta{}
_, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath)
@ -158,7 +157,7 @@ func (it *IndexBuildTask) checkIndexMeta(pre bool) error {
return err
}
err := retry.Retry(3, time.Millisecond*200, fn)
err := retry.Do(ctx, fn, retry.Attempts(3))
log.Debug("IndexNode checkIndexMeta final", zap.Error(err))
return err
@ -166,13 +165,13 @@ func (it *IndexBuildTask) checkIndexMeta(pre bool) error {
func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
log.Debug("IndexNode IndexBuildTask preExecute...")
return it.checkIndexMeta(true)
return it.checkIndexMeta(ctx, true)
}
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
log.Debug("IndexNode IndexBuildTask PostExecute...")
return it.checkIndexMeta(false)
return it.checkIndexMeta(ctx, false)
}
func (it *IndexBuildTask) Execute(ctx context.Context) error {
@ -360,7 +359,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
}
return saveBlob(savePath, value)
}
err := retry.Retry(5, time.Millisecond*200, saveIndexFileFn)
err := retry.Do(ctx, saveIndexFileFn, retry.Attempts(5))
log.Debug("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath))
if err != nil {
return err

View File

@ -62,7 +62,7 @@ func NewMinIOKV(ctx context.Context, option *Option) (*MinIOKV, error) {
bucketExists, err = minIOClient.BucketExists(ctx, option.BucketName)
return err
}
err = retry.Retry(100000, time.Millisecond*200, checkBucketFn)
err = retry.Do(ctx, checkBucketFn, retry.Attempts(300))
if err != nil {
return nil, err
}

View File

@ -12,6 +12,7 @@
package proxy
import (
"context"
"encoding/json"
"errors"
"io/ioutil"
@ -35,7 +36,7 @@ func GetPulsarConfig(protocol, ip, port, url string) (map[string]interface{}, er
return err
}
err = retry.Retry(10, time.Second, getResp)
err = retry.Do(context.TODO(), getResp, retry.Attempts(10), retry.Sleep(time.Second))
if err != nil {
return nil, err
}

View File

@ -98,7 +98,7 @@ func (qc *QueryCoord) Init() error {
return err
}
log.Debug("query coordinator try to connect etcd")
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("query coordinator try to connect etcd failed", zap.Error(err))
return err

View File

@ -12,10 +12,10 @@
package querycoord
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
@ -41,7 +41,7 @@ type queryNode struct {
}
func newQueryNode(address string, id UniqueID, kv *etcdkv.EtcdKV) (*queryNode, error) {
client, err := nodeclient.NewClient(address, 3*time.Second)
client, err := nodeclient.NewClient(context.TODO(), address, 300)
if err != nil {
return nil, err
}

View File

@ -134,7 +134,7 @@ func (node *QueryNode) Init() error {
return err
}
log.Debug("queryNode try to connect etcd")
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(context.TODO(), connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("queryNode try to connect etcd failed", zap.Error(err))
return err

View File

@ -962,7 +962,7 @@ func (c *Core) Init() error {
return nil
}
log.Debug("RootCoord, Connect to Etcd")
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(c.ctx, connectEtcdFn, retry.Attempts(300))
if err != nil {
return
}

View File

@ -60,7 +60,7 @@ func GetLocalIP() string {
return ipv4.LocalIP()
}
func WaitForComponentStates(ctx context.Context, service types.Component, serviceName string, states []internalpb.StateCode, attempts int, sleep time.Duration) error {
func WaitForComponentStates(ctx context.Context, service types.Component, serviceName string, states []internalpb.StateCode, attempts uint, sleep time.Duration) error {
checkFunc := func() error {
resp, err := service.GetComponentStates(ctx)
if err != nil {
@ -84,18 +84,18 @@ func WaitForComponentStates(ctx context.Context, service types.Component, servic
}
return nil
}
return retry.Retry(attempts, sleep, checkFunc)
return retry.Do(ctx, checkFunc, retry.Attempts(attempts), retry.Sleep(sleep))
}
func WaitForComponentInitOrHealthy(ctx context.Context, service types.Component, serviceName string, attempts int, sleep time.Duration) error {
func WaitForComponentInitOrHealthy(ctx context.Context, service types.Component, serviceName string, attempts uint, sleep time.Duration) error {
return WaitForComponentStates(ctx, service, serviceName, []internalpb.StateCode{internalpb.StateCode_Initializing, internalpb.StateCode_Healthy}, attempts, sleep)
}
func WaitForComponentInit(ctx context.Context, service types.Component, serviceName string, attempts int, sleep time.Duration) error {
func WaitForComponentInit(ctx context.Context, service types.Component, serviceName string, attempts uint, sleep time.Duration) error {
return WaitForComponentStates(ctx, service, serviceName, []internalpb.StateCode{internalpb.StateCode_Initializing}, attempts, sleep)
}
func WaitForComponentHealthy(ctx context.Context, service types.Component, serviceName string, attempts int, sleep time.Duration) error {
func WaitForComponentHealthy(ctx context.Context, service types.Component, serviceName string, attempts uint, sleep time.Duration) error {
return WaitForComponentStates(ctx, service, serviceName, []internalpb.StateCode{internalpb.StateCode_Healthy}, attempts, sleep)
}

View File

@ -1,48 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package retry
import "time"
// Config is the retry policy consumed by Do.
type Config struct {
	// attempts is the maximum number of times the retried function is called.
	attempts uint
	// sleep is the wait applied after the first failed attempt.
	sleep time.Duration
	// maxSleepTime is the upper bound for the wait between attempts.
	maxSleepTime time.Duration
}

// NewDefaultConfig returns a freshly allocated policy of 10 attempts, an
// initial wait of 200ms and a maximum wait of 1s.
func NewDefaultConfig() *Config {
	cfg := new(Config)
	cfg.attempts = 10
	cfg.sleep = 200 * time.Millisecond
	cfg.maxSleepTime = time.Second
	return cfg
}

// Option mutates a Config; options are applied in the order they are given.
type Option func(*Config)

// Attempts returns an Option that sets the maximum number of attempts.
func Attempts(attempts uint) Option {
	return func(cfg *Config) { cfg.attempts = attempts }
}

// Sleep returns an Option that sets the initial wait between attempts.
func Sleep(sleep time.Duration) Option {
	return func(cfg *Config) { cfg.sleep = sleep }
}

// MaxSleepTime returns an Option that sets the upper bound of the wait
// between attempts.
func MaxSleepTime(maxSleepTime time.Duration) Option {
	return func(cfg *Config) { cfg.maxSleepTime = maxSleepTime }
}

View File

@ -1,71 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package retry
import (
"context"
"fmt"
"strings"
"time"
)
// Do invokes fn until it succeeds, the attempt budget is exhausted, fn asks
// for the retry loop to stop, or ctx is done.
//
// The retry policy starts from NewDefaultConfig and is then adjusted by opts.
// After a failed attempt Do waits for the current sleep interval before trying
// again; the interval doubles after every wait and is capped at the configured
// maximum. No wait is performed after the final attempt.
//
// Return values:
//   - nil as soon as fn returns nil;
//   - the wrapped error when fn returns an InterruptError (no further attempts);
//   - ctx.Err() when ctx is done while Do is waiting between attempts;
//   - an ErrorList holding one error per attempt when every attempt failed.
func Do(ctx context.Context, fn func() error, opts ...Option) error {
	c := NewDefaultConfig()
	for _, opt := range opts {
		opt(c)
	}

	el := make(ErrorList, 0, c.attempts)
	sleep := c.sleep
	for i := uint(0); i < c.attempts; i++ {
		err := fn()
		if err == nil {
			return nil
		}
		if s, ok := err.(InterruptError); ok {
			return s.error
		}
		el = append(el, err)

		// Nothing follows the final attempt, so waiting here would only delay
		// reporting the failure to the caller.
		if i+1 == c.attempts {
			break
		}

		// An explicit timer (rather than time.After) can be stopped when ctx
		// wins the race, so it does not stay pending until it fires.
		timer := time.NewTimer(sleep)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}

		sleep *= 2
		if sleep > c.maxSleepTime {
			sleep = c.maxSleepTime
		}
	}
	return el
}
// ErrorList collects the errors produced by successive attempts of a retried
// operation, in attempt order. It implements the error interface.
type ErrorList []error

// Error renders every collected error on its own line, numbered from 1, below
// a fixed header line. A nil entry is rendered as "<nil>" rather than causing
// a nil-pointer panic.
func (el ErrorList) Error() string {
	var builder strings.Builder
	builder.WriteString("All attempts results:\n")
	for index, err := range el {
		msg := "<nil>"
		if err != nil {
			msg = err.Error()
		}
		fmt.Fprintf(&builder, "attempt #%d:%s\n", index+1, msg)
	}
	return builder.String()
}
// InterruptError wraps an error that must not be retried: when the retried
// function returns one, Do stops immediately and returns the wrapped error.
type InterruptError struct {
	error
}

// NoRetryError wraps err in an InterruptError.
func NoRetryError(err error) InterruptError {
	return InterruptError{error: err}
}

View File

@ -1,115 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package retry
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestDo checks that Do keeps retrying a failing function and reports success
// once the function finally succeeds.
func TestDo(t *testing.T) {
	ctx := context.Background()

	// Fail the first three calls, then succeed.
	n := 0
	testFn := func() error {
		if n < 3 {
			n++
			return fmt.Errorf("some error")
		}
		return nil
	}

	err := Do(ctx, testFn)
	assert.Nil(t, err)
	// All three failures must have been consumed before the successful call.
	assert.Equal(t, 3, n)
}
// TestAttempts checks that the Attempts option bounds the number of calls made
// to the retried function.
func TestAttempts(t *testing.T) {
	ctx := context.Background()

	calls := 0
	testFn := func() error {
		calls++
		return fmt.Errorf("some error")
	}

	err := Do(ctx, testFn, Attempts(1))
	assert.NotNil(t, err)
	// With a budget of one attempt the function must run exactly once.
	assert.Equal(t, 1, calls)
	t.Log(err)
}
// TestMaxSleepTime runs Do with a permanently failing function and a capped
// wait between attempts, and expects an error once the attempts are used up.
func TestMaxSleepTime(t *testing.T) {
	ctx := context.Background()

	testFn := func() error {
		return fmt.Errorf("some error")
	}

	err := Do(ctx, testFn, Attempts(3), MaxSleepTime(200*time.Millisecond))
	assert.NotNil(t, err)
	// Report through the test logger so the output is attributed to this test.
	t.Log(err)
}
// TestSleep checks that the Sleep option sets the initial wait between
// attempts of a permanently failing function.
func TestSleep(t *testing.T) {
	ctx := context.Background()

	testFn := func() error {
		return fmt.Errorf("some error")
	}

	start := time.Now()
	err := Do(ctx, testFn, Attempts(3), Sleep(500*time.Millisecond))
	elapsed := time.Since(start)

	assert.NotNil(t, err)
	// Three attempts imply at least two waits: 500ms, then the doubled
	// interval capped at the default maximum of 1s.
	assert.True(t, elapsed >= 1500*time.Millisecond, "elapsed %v", elapsed)
	t.Log(err)
}
// TestAllError checks that Do reports every failed attempt when the retried
// function never succeeds.
func TestAllError(t *testing.T) {
	ctx := context.Background()

	testFn := func() error {
		return fmt.Errorf("some error")
	}

	err := Do(ctx, testFn, Attempts(3))
	assert.NotNil(t, err)

	// The failure must be an ErrorList with one entry per attempt.
	el, ok := err.(ErrorList)
	assert.True(t, ok)
	assert.Len(t, el, 3)
	t.Log(err)
}
// TestContextDeadline checks that Do stops retrying and returns the context
// error when the context deadline expires while it waits between attempts.
func TestContextDeadline(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	testFn := func() error {
		return fmt.Errorf("some error")
	}

	err := Do(ctx, testFn)
	// NotNil alone would also pass if the attempts were simply exhausted, so
	// pin the error to the deadline.
	assert.Equal(t, context.DeadlineExceeded, err)
	t.Log(err)
}
// TestContextCancel checks that Do stops retrying and returns the context
// error when the context is cancelled while it waits between attempts.
func TestContextCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	testFn := func() error {
		return fmt.Errorf("some error")
	}

	// Cancel one second into the retry loop; stop the timer on exit so it
	// cannot fire after the test has finished.
	timer := time.AfterFunc(1*time.Second, cancel)
	defer timer.Stop()

	err := Do(ctx, testFn)
	// NotNil alone would also pass if the attempts were simply exhausted, so
	// pin the error to the cancellation.
	assert.Equal(t, context.Canceled, err)
	t.Log(err)
}

View File

@ -12,36 +12,54 @@
package retry
import (
"context"
"fmt"
"strings"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
// Reference: https://blog.cyeam.com/golang/2018/08/27/retry
func Do(ctx context.Context, fn func() error, opts ...Option) error {
func Impl(attempts int, sleep time.Duration, fn func() error, maxSleepTime time.Duration) error {
if err := fn(); err != nil {
if s, ok := err.(InterruptError); ok {
return s.error
}
c := NewDefaultConfig()
if attempts--; attempts > 0 {
log.Debug("retry func error", zap.Int("attempts", attempts), zap.Duration("sleep", sleep), zap.Error(err))
time.Sleep(sleep)
if sleep < maxSleepTime {
return Impl(attempts, 2*sleep, fn, maxSleepTime)
}
return Impl(attempts, maxSleepTime, fn, maxSleepTime)
}
return err
for _, opt := range opts {
opt(c)
}
return nil
el := make(ErrorList, c.attempts)
for i := uint(0); i < c.attempts; i++ {
if err := fn(); err != nil {
if s, ok := err.(InterruptError); ok {
return s.error
}
el[i] = err
select {
case <-time.After(c.sleep):
case <-ctx.Done():
return ctx.Err()
}
c.sleep *= 2
if c.sleep > c.maxSleepTime {
c.sleep = c.maxSleepTime
}
} else {
return nil
}
}
return el
}
func Retry(attempts int, sleep time.Duration, fn func() error) error {
maxSleepTime := time.Millisecond * 1000
return Impl(attempts, sleep, fn, maxSleepTime)
type ErrorList []error
func (el ErrorList) Error() string {
var builder strings.Builder
builder.WriteString("All attempts results:\n")
for index, err := range el {
builder.WriteString(fmt.Sprintf("attempt #%d:%s\n", index+1, err.Error()))
}
return builder.String()
}
type InterruptError struct {

View File

@ -12,65 +12,104 @@
package retry
import (
"errors"
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestImpl(t *testing.T) {
attempts := 10
sleep := time.Millisecond * 1
maxSleepTime := time.Millisecond * 16
func TestDo(t *testing.T) {
ctx := context.Background()
var err error
naiveF := func() error {
n := 0
testFn := func() error {
if n < 3 {
n++
return fmt.Errorf("some error")
}
return nil
}
err = Impl(attempts, sleep, naiveF, maxSleepTime)
assert.Equal(t, err, nil)
errorF := func() error {
return errors.New("errorF")
}
err = Impl(attempts, sleep, errorF, maxSleepTime)
assert.NotEqual(t, err, nil)
begin := 0
stop := 2
interruptF := func() error {
if begin >= stop {
return NoRetryError(errors.New("interrupt here"))
}
begin++
return errors.New("interruptF")
}
err = Impl(attempts, sleep, interruptF, maxSleepTime)
assert.NotEqual(t, err, nil)
begin = 0
stop = attempts / 2
untilSucceedF := func() error {
if begin >= stop {
return nil
}
begin++
return errors.New("untilSucceedF")
}
err = Impl(attempts, sleep, untilSucceedF, maxSleepTime)
assert.Equal(t, err, nil)
begin = 0
stop = attempts * 2
noRetryF := func() error {
if begin >= stop {
return nil
}
begin++
return errors.New("noRetryF")
}
err = Impl(attempts, sleep, noRetryF, maxSleepTime)
assert.NotEqual(t, err, nil)
err := Do(ctx, testFn)
assert.Nil(t, err)
}
func TestAttempts(t *testing.T) {
ctx := context.Background()
testFn := func() error {
return fmt.Errorf("some error")
}
err := Do(ctx, testFn, Attempts(1))
assert.NotNil(t, err)
fmt.Println(err)
}
func TestMaxSleepTime(t *testing.T) {
ctx := context.Background()
testFn := func() error {
return fmt.Errorf("some error")
}
err := Do(ctx, testFn, Attempts(3), MaxSleepTime(200*time.Millisecond))
assert.NotNil(t, err)
fmt.Println(err)
}
func TestSleep(t *testing.T) {
ctx := context.Background()
testFn := func() error {
return fmt.Errorf("some error")
}
err := Do(ctx, testFn, Attempts(3), Sleep(500*time.Millisecond))
assert.NotNil(t, err)
fmt.Println(err)
}
func TestAllError(t *testing.T) {
ctx := context.Background()
testFn := func() error {
return fmt.Errorf("some error")
}
err := Do(ctx, testFn, Attempts(3))
assert.NotNil(t, err)
fmt.Println(err)
}
func TestContextDeadline(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
testFn := func() error {
return fmt.Errorf("some error")
}
err := Do(ctx, testFn)
assert.NotNil(t, err)
fmt.Println(err)
}
func TestContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
testFn := func() error {
return fmt.Errorf("some error")
}
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
err := Do(ctx, testFn)
assert.NotNil(t, err)
fmt.Println(err)
}

View File

@ -71,7 +71,7 @@ func NewSession(ctx context.Context, metaRoot string, etcdEndpoints []string) *S
session.etcdCli = etcdCli
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
err := retry.Do(ctx, connectEtcdFn, retry.Attempts(300))
if err != nil {
return nil
}
@ -113,7 +113,7 @@ func (s *Session) checkIDExist() {
}
func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error) {
func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error) {
res := int64(0)
getServerIDWithKeyFn := func() error {
getResp, err := s.etcdCli.Get(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, key))
@ -146,7 +146,7 @@ func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error)
return nil
}
err := retry.Retry(retryTimes, time.Millisecond*500, getServerIDWithKeyFn)
err := retry.Do(s.ctx, getServerIDWithKeyFn, retry.Attempts(retryTimes), retry.Sleep(500*time.Millisecond))
return res, err
}
@ -205,7 +205,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
}
return nil
}
err := retry.Retry(DefaultRetryTimes, time.Millisecond*500, registerFn)
err := retry.Do(s.ctx, registerFn, retry.Attempts(DefaultRetryTimes), retry.Sleep(500*time.Millisecond))
if err != nil {
return nil, err
}
@ -310,20 +310,3 @@ func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-c
}()
return eventCh
}
func initEtcd(etcdEndpoints []string) (*clientv3.Client, error) {
var etcdCli *clientv3.Client
connectEtcdFn := func() error {
etcd, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
etcdCli = etcd
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return nil, err
}
return etcdCli, nil
}