Init once for indexnode (#8027)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/8436/head
cai.zhang 2021-09-23 21:27:54 +08:00 committed by GitHub
parent f2da1e3269
commit 1d16930a03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 30 deletions

View File

@ -28,6 +28,7 @@ import (
"io"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"
@ -73,6 +74,8 @@ type IndexNode struct {
finishedTasks map[UniqueID]commonpb.IndexState
closer io.Closer
initOnce sync.Once
}
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
@ -114,39 +117,47 @@ func (i *IndexNode) initKnowhere() {
}
func (i *IndexNode) Init() error {
i.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
i.etcdKV = etcdKV
return err
}
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("IndexNode try connect etcd failed", zap.Error(err))
return err
}
log.Debug("IndexNode try connect etcd success")
var initErr error = nil
i.initOnce.Do(func() {
Params.Init()
i.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
i.etcdKV = etcdKV
return err
}
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("IndexNode try connect etcd failed", zap.Error(err))
initErr = err
return
}
log.Debug("IndexNode try connect etcd success")
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Debug("IndexNode NewMinIOKV failed", zap.Error(err))
return err
}
log.Debug("IndexNode NewMinIOKV success")
i.closer = trace.InitTracing("index_node")
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Debug("IndexNode NewMinIOKV failed", zap.Error(err))
initErr = err
return
}
log.Debug("IndexNode NewMinIOKV success")
i.closer = trace.InitTracing("index_node")
i.initKnowhere()
i.initKnowhere()
})
return nil
log.Debug("Init IndexNode finished", zap.Error(initErr))
return initErr
}
func (i *IndexNode) Start() error {