Change retry times and add retry logic

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/4973/head^2
cai.zhang 2021-02-26 15:17:47 +08:00 committed by yefu.chen
parent 4ea27335fd
commit f5977a1302
11 changed files with 107 additions and 113 deletions

View File

@ -2,11 +2,13 @@ package datanode
import (
"context"
"time"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
@ -57,13 +59,21 @@ func (dsService *dataSyncService) close() {
func (dsService *dataSyncService) initNodes() {
// TODO: add delete pipeline support
// New metaTable
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
panic(err)
}
var mt *metaTable
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
return err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
mt, err := NewMetaTable(etcdKV)
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
mt, err = NewMetaTable(etcdKV)
if err != nil {
return err
}
return nil
}
err := retry.Retry(200, time.Millisecond*200, connectEtcdFn)
if err != nil {
panic(err)
}

View File

@ -9,28 +9,21 @@ import (
"sync/atomic"
"time"
grpcdatanodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/datanode/client"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/timesync"
grpcdatanodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/datanode/client"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/timesync"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)
const role = "dataservice"
@ -172,13 +165,20 @@ func (s *Server) checkStateIsHealthy() bool {
}
func (s *Server) initMeta() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
return err
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
return err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
s.client = etcdKV
s.meta, err = newMeta(etcdKV)
if err != nil {
return err
}
return nil
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
s.client = etcdKV
s.meta, err = newMeta(etcdKV)
err := retry.Retry(200, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}

View File

@ -18,7 +18,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -106,24 +105,15 @@ func (i *NodeImpl) Init() error {
opentracing.SetGlobalTracer(tracer)
i.closer = closer
connectMinIOFn := func() error {
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
var err error
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
return err
}
return nil
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
err = retry.Retry(10, time.Millisecond*200, connectMinIOFn)
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
return err
}

View File

@ -27,9 +27,6 @@ type ParamTable struct {
MasterAddress string
EtcdAddress string
MetaRootPath string
MinIOAddress string
MinIOAccessKeyID string
MinIOSecretAccessKey string
@ -48,8 +45,6 @@ func (pt *ParamTable) Init() {
}
func (pt *ParamTable) initParams() {
pt.initEtcdAddress()
pt.initMetaRootPath()
pt.initMinIOAddress()
pt.initMinIOAccessKeyID()
pt.initMinIOSecretAccessKey()
@ -106,26 +101,6 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
return nil
}
func (pt *ParamTable) initEtcdAddress() {
addr, err := pt.Load("_EtcdAddress")
if err != nil {
panic(err)
}
pt.EtcdAddress = addr
}
func (pt *ParamTable) initMetaRootPath() {
rootPath, err := pt.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := pt.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
pt.MetaRootPath = rootPath + "/" + subPath
}
func (pt *ParamTable) initMinIOAddress() {
ret, err := pt.Load("_MinioAddress")
if err != nil {

View File

@ -16,11 +16,6 @@ func TestParamTable_Address(t *testing.T) {
fmt.Println(address)
}
func TestParamTable_MetaRootPath(t *testing.T) {
path := Params.MetaRootPath
assert.Equal(t, path, "by-dev/meta")
}
func TestParamTable_MinIOAddress(t *testing.T) {
address := Params.MinIOAddress
fmt.Println(address)

View File

@ -84,7 +84,7 @@ func (i *ServiceImpl) Init() error {
i.metaTable = metakv
return nil
}
err := retry.Retry(10, time.Millisecond*200, connectEtcdFn)
err := retry.Retry(200, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
@ -101,23 +101,16 @@ func (i *ServiceImpl) Init() error {
return err
}
connectMinIOFn := func() error {
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
return err
}
return nil
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
err = retry.Retry(10, time.Millisecond*200, connectMinIOFn)
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
return err
}

View File

@ -3,6 +3,7 @@ package miniokv
import (
"context"
"fmt"
"time"
"io"
"log"
@ -11,6 +12,7 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
)
type MinIOKV struct {
@ -29,10 +31,20 @@ type Option struct {
}
func NewMinIOKV(ctx context.Context, option *Option) (*MinIOKV, error) {
minIOClient, err := minio.New(option.Address, &minio.Options{
Creds: credentials.NewStaticV4(option.AccessKeyID, option.SecretAccessKeyID, ""),
Secure: option.UseSSL,
})
var minIOClient *minio.Client
connectMinIOFn := func() error {
var err error
minIOClient, err = minio.New(option.Address, &minio.Options{
Creds: credentials.NewStaticV4(option.AccessKeyID, option.SecretAccessKeyID, ""),
Secure: option.UseSSL,
})
if err != nil {
return err
}
return nil
}
err := retry.Retry(200, time.Millisecond*200, connectMinIOFn)
if err != nil {
return nil, err
}

View File

@ -9,8 +9,6 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/tso"
"github.com/zilliztech/milvus-distributed/internal/errors"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/log"
@ -23,6 +21,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/tso"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
@ -768,15 +768,21 @@ func (c *Core) SetQueryService(s QueryServiceInterface) error {
func (c *Core) Init() error {
var initError error = nil
c.initOnce.Do(func() {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil {
connectEtcdFn := func() error {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil {
return initError
}
c.metaKV = etcdkv.NewEtcdKV(c.etcdCli, Params.MetaRootPath)
if c.MetaTable, initError = NewMetaTable(c.metaKV); initError != nil {
return initError
}
c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath)
return nil
}
err := retry.Retry(200, time.Millisecond*200, connectEtcdFn)
if err != nil {
return
}
c.metaKV = etcdkv.NewEtcdKV(c.etcdCli, Params.MetaRootPath)
if c.MetaTable, initError = NewMetaTable(c.metaKV); initError != nil {
return
}
c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath)
c.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
if initError = c.idAllocator.Initialize(); initError != nil {

View File

@ -113,7 +113,7 @@ func (ms *PulsarMsgStream) AsProducer(channels []string) {
ms.producers = append(ms.producers, pp)
return nil
}
err := util.Retry(10, time.Millisecond*200, fn)
err := util.Retry(20, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to create producer " + channels[i] + ", error = " + err.Error()
panic(errMsg)
@ -150,7 +150,7 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string,
go ms.receiveMsg(pc)
return nil
}
err := util.Retry(10, time.Millisecond*200, fn)
err := util.Retry(20, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to create consumer " + channels[i] + ", error = " + err.Error()
panic(errMsg)

View File

@ -96,7 +96,7 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
return nil
}
// wait for 10 seconds
err := retry.Retry(10, time.Millisecond*200, checkFunc)
err := retry.Retry(200, time.Millisecond*200, checkFunc)
if err != nil {
errMsg := fmt.Sprintf("ProxyNode wait for %s ready failed", serviceName)
return errors.New(errMsg)

View File

@ -13,6 +13,7 @@ import (
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"go.etcd.io/etcd/clientv3"
)
@ -30,11 +31,23 @@ type metaService struct {
func newMetaService(ctx context.Context, replica collectionReplica) *metaService {
ETCDAddr := Params.ETCDAddress
MetaRootPath := Params.MetaRootPath
var cli *clientv3.Client
var err error
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr},
DialTimeout: 5 * time.Second,
})
connectEtcdFn := func() error {
cli, err = clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr},
DialTimeout: 5 * time.Second,
})
if err != nil {
return err
}
return nil
}
err = retry.Retry(200, time.Millisecond*200, connectEtcdFn)
if err != nil {
panic(err)
}
return &metaService{
ctx: ctx,