Init query node once (#8086)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/8523/head
bigsheeper 2021-09-24 21:03:56 +08:00 committed by GitHub
parent aec4c98363
commit e61bafc6f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 28 deletions

View File

@ -28,6 +28,7 @@ import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"
"unsafe"
@ -50,6 +51,8 @@ type QueryNode struct {
stateCode atomic.Value
//call once
initOnce sync.Once
// liveness channel with etcd
liveCh <-chan bool
@ -116,41 +119,51 @@ func (node *QueryNode) InitSegcore() {
}
func (node *QueryNode) Init() error {
//ctx := context.Background()
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil {
var initError error = nil
node.initOnce.Do(func() {
//ctx := context.Background()
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil {
return err
}
node.etcdKV = etcdKV
return err
}
node.etcdKV = etcdKV
return err
}
log.Debug("queryNode try to connect etcd")
err := retry.Do(node.queryNodeLoopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("queryNode try to connect etcd failed", zap.Error(err))
return err
}
log.Debug("queryNode try to connect etcd success")
log.Debug("queryNode try to connect etcd",
zap.Any("EtcdEndpoints", Params.EtcdEndpoints),
zap.Any("MetaRootPath", Params.MetaRootPath),
)
err := retry.Do(node.queryNodeLoopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("queryNode try to connect etcd failed", zap.Error(err))
initError = err
return
}
log.Debug("queryNode try to connect etcd success",
zap.Any("EtcdEndpoints", Params.EtcdEndpoints),
zap.Any("MetaRootPath", Params.MetaRootPath),
)
node.historical = newHistorical(node.queryNodeLoopCtx,
node.rootCoord,
node.indexCoord,
node.msFactory,
node.etcdKV)
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)
node.historical = newHistorical(node.queryNodeLoopCtx,
node.rootCoord,
node.indexCoord,
node.msFactory,
node.etcdKV)
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)
node.InitSegcore()
node.InitSegcore()
if node.rootCoord == nil {
log.Error("null root coordinator detected")
}
if node.rootCoord == nil {
log.Error("null root coordinator detected")
}
if node.indexCoord == nil {
log.Error("null index coordinator detected")
}
if node.indexCoord == nil {
log.Error("null index coordinator detected")
}
})
return nil
return initError
}
func (node *QueryNode) Start() error {