Fix Query Coord UT and cleanup logic (#13618)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/13671/head
Xiaofan 2021-12-17 21:30:42 +08:00 committed by GitHub
parent 6099d4c55f
commit b8d3808052
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 30 additions and 27 deletions

View File

@ -219,7 +219,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
} }
err = c.nodes[nodeID].setCollectionInfo(collectionInfo) err = c.nodes[nodeID].setCollectionInfo(collectionInfo)
if err != nil { if err != nil {
log.Debug("reloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error())) log.Warn("reloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err return err
} }
log.Debug("reloadFromKV: reload collection info from etcd", zap.Any("info", collectionInfo)) log.Debug("reloadFromKV: reload collection info from etcd", zap.Any("info", collectionInfo))
@ -647,7 +647,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
go node.start() go node.start()
} }
c.nodes[id] = node c.nodes[id] = node
log.Debug("registerNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address)) log.Debug("registerNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state))
return nil return nil
} }
return fmt.Errorf("registerNode: node %d alredy exists in cluster", id) return fmt.Errorf("registerNode: node %d alredy exists in cluster", id)

View File

@ -119,8 +119,8 @@ func (qc *QueryCoord) initSession() error {
// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler // Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
func (qc *QueryCoord) Init() error { func (qc *QueryCoord) Init() error {
log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address)) log.Debug("query coordinator start init, session info", zap.String("metaPath", Params.MetaRootPath),
log.Debug("query coordinator start init") zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
//connect etcd //connect etcd
connectEtcdFn := func() error { connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
@ -134,11 +134,11 @@ func (qc *QueryCoord) Init() error {
qc.initOnce.Do(func() { qc.initOnce.Do(func() {
err := qc.initSession() err := qc.initSession()
if err != nil { if err != nil {
log.Error("QueryCoord init session failed", zap.Error(err)) log.Error("queryCoord init session failed", zap.Error(err))
initError = err initError = err
return return
} }
log.Debug("QueryCoord try to connect etcd") log.Debug("queryCoord try to connect etcd")
initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300)) initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
if initError != nil { if initError != nil {
log.Debug("query coordinator try to connect etcd failed", zap.Error(initError)) log.Debug("query coordinator try to connect etcd failed", zap.Error(initError))
@ -150,6 +150,7 @@ func (qc *QueryCoord) Init() error {
var idAllocatorKV *etcdkv.EtcdKV var idAllocatorKV *etcdkv.EtcdKV
idAllocatorKV, initError = tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "queryCoordTaskID") idAllocatorKV, initError = tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "queryCoordTaskID")
if initError != nil { if initError != nil {
log.Debug("query coordinator idAllocatorKV initialize failed", zap.Error(initError))
return return
} }
idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV) idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)

View File

@ -30,10 +30,12 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
"go.uber.org/zap"
) )
func setup() { func setup() {
@ -176,10 +178,18 @@ func TestWatchNodeLoop(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
for { for {
_, err = queryCoord.cluster.offlineNodes() offlineNodes, err := queryCoord.cluster.offlineNodes()
if err == nil { if err == nil {
log.Warn("find offline Nodes", zap.Any("node map", offlineNodes))
break break
} }
// if the session ID no longer exists, querycoord has already handled the offline node and removed it
_, err = kv.Load(nodeKey)
if err != nil {
log.Warn("already handled by querycoord", zap.Error(err))
break
}
time.Sleep(time.Duration(1) * time.Second)
} }
queryCoord.Stop() queryCoord.Stop()

View File

@ -386,14 +386,9 @@ func (qn *queryNode) removeQueryChannelInfo(collectionID UniqueID) {
func (qn *queryNode) clearNodeInfo() error { func (qn *queryNode) clearNodeInfo() error {
qn.RLock() qn.RLock()
defer qn.RUnlock() defer qn.RUnlock()
for collectionID := range qn.collectionInfos { // delete query node meta and all the collection info
err := removeNodeCollectionInfo(collectionID, qn.id, qn.kvClient) key := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, qn.id)
if err != nil { return qn.kvClient.RemoveWithPrefix(key)
return err
}
}
return nil
} }
func (qn *queryNode) setState(state nodeState) { func (qn *queryNode) setState(state nodeState) {

View File

@ -130,36 +130,33 @@ func (s *Session) Register() {
} }
func (s *Session) getServerID() (int64, error) { func (s *Session) getServerID() (int64, error) {
return s.getServerIDWithKey(DefaultIDKey, DefaultRetryTimes) return s.getServerIDWithKey(DefaultIDKey)
} }
func (s *Session) checkIDExist() { func (s *Session) checkIDExist() {
log.Debug("Session checkIDExist Begin")
s.etcdCli.Txn(s.ctx).If( s.etcdCli.Txn(s.ctx).If(
clientv3.Compare( clientv3.Compare(
clientv3.Version(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)), clientv3.Version(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)),
"=", "=",
0)). 0)).
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), "1")).Commit() Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), "1")).Commit()
log.Debug("Session checkIDExist End")
} }
func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error) { func (s *Session) getServerIDWithKey(key string) (int64, error) {
for { for {
log.Debug("Session try to get serverID")
getResp, err := s.etcdCli.Get(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, key)) getResp, err := s.etcdCli.Get(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, key))
if err != nil { if err != nil {
log.Debug("Session get etcd key error", zap.String("key", key), zap.Error(err)) log.Warn("Session get etcd key error", zap.String("key", key), zap.Error(err))
return -1, err return -1, err
} }
if getResp.Count <= 0 { if getResp.Count <= 0 {
log.Debug("Session there is no value", zap.String("key", key)) log.Warn("Session there is no value", zap.String("key", key))
continue continue
} }
value := string(getResp.Kvs[0].Value) value := string(getResp.Kvs[0].Value)
valueInt, err := strconv.ParseInt(value, 10, 64) valueInt, err := strconv.ParseInt(value, 10, 64)
if err != nil { if err != nil {
log.Debug("Session ParseInt error", zap.String("value", value), zap.Error(err)) log.Warn("Session ParseInt error", zap.String("value", value), zap.Error(err))
continue continue
} }
txnResp, err := s.etcdCli.Txn(s.ctx).If( txnResp, err := s.etcdCli.Txn(s.ctx).If(
@ -169,15 +166,15 @@ func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error)
value)). value)).
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, key), strconv.FormatInt(valueInt+1, 10))).Commit() Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, key), strconv.FormatInt(valueInt+1, 10))).Commit()
if err != nil { if err != nil {
log.Debug("Session Txn failed", zap.String("key", key), zap.Error(err)) log.Warn("Session Txn failed", zap.String("key", key), zap.Error(err))
return -1, err return -1, err
} }
if !txnResp.Succeeded { if !txnResp.Succeeded {
log.Debug("Session Txn unsuccessful", zap.String("key", key)) log.Warn("Session Txn unsuccessful", zap.String("key", key))
continue continue
} }
log.Debug("Session get serverID success") log.Debug("Session get serverID success", zap.String("key", key), zap.Int64("ServerId", valueInt))
return valueInt, nil return valueInt, nil
} }
} }
@ -238,7 +235,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
fmt.Printf("keep alive error %s\n", err) fmt.Printf("keep alive error %s\n", err)
return err return err
} }
log.Debug("Session Register End", zap.Int64("ServerID", s.ServerID)) log.Debug("Session register successfully", zap.Int64("ServerID", s.ServerID))
return nil return nil
} }
err := retry.Do(s.ctx, registerFn, retry.Attempts(DefaultRetryTimes), retry.Sleep(500*time.Millisecond)) err := retry.Do(s.ctx, registerFn, retry.Attempts(DefaultRetryTimes), retry.Sleep(500*time.Millisecond))