Fix Session checker for proxy (#10737)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/10723/head
Xiaofan 2021-10-27 21:58:33 +08:00 committed by GitHub
parent 896f43504b
commit 6fddcb3925
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 12 additions and 19 deletions

View File

@ -317,9 +317,7 @@ func (s *Server) startServerLoop() {
go s.startWatchService(s.serverLoopCtx)
go s.startFlushLoop(s.serverLoopCtx)
go s.session.LivenessCheck(s.serverLoopCtx, func() {
if err := s.Stop(); err != nil {
log.Error("failed to stop server", zap.Error(err))
}
log.Fatal("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
})
}

View File

@ -173,10 +173,7 @@ func (node *DataNode) Register() error {
go node.StartWatchChannels(node.ctx)
// Start liveness check
go node.session.LivenessCheck(node.ctx, func() {
err := node.Stop()
if err != nil {
log.Warn("node stop failed", zap.Error(err))
}
log.Fatal("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
})
Params.initMsgChannelSubName()

View File

@ -253,7 +253,7 @@ func (i *IndexCoord) Start() error {
go i.watchMetaLoop()
go i.session.LivenessCheck(i.loopCtx, func() {
i.Stop()
log.Fatal("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
})
startErr = i.sched.Start()

View File

@ -185,7 +185,7 @@ func (i *IndexNode) Start() error {
//start liveness check
go i.session.LivenessCheck(i.loopCtx, func() {
i.Stop()
log.Fatal("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
})
i.UpdateStateCode(internalpb.StateCode_Healthy)

View File

@ -105,6 +105,9 @@ func (node *Proxy) Register() error {
Params.ProxyID = node.session.ServerID
Params.SetLogger(Params.ProxyID)
Params.initProxySubName()
go node.session.LivenessCheck(node.ctx, func() {
log.Fatal("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
})
// TODO Reset the logger
//Params.initLogCfg()
return nil

View File

@ -183,7 +183,7 @@ func (qc *QueryCoord) Start() error {
go qc.watchHandoffSegmentLoop()
go qc.session.LivenessCheck(qc.loopCtx, func() {
qc.Stop()
log.Fatal("Query Coord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID))
})
return nil

View File

@ -118,7 +118,7 @@ func (node *QueryNode) Register() error {
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
// start liveness check
go node.session.LivenessCheck(node.queryNodeLoopCtx, func() {
node.Stop()
log.Fatal("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
})
Params.QueryNodeID = node.session.ServerID

View File

@ -16,7 +16,6 @@ import (
"encoding/json"
"fmt"
"math/rand"
"os"
"strconv"
"sync"
"sync/atomic"
@ -1103,11 +1102,7 @@ func (c *Core) Start() error {
go c.checkFlushedSegmentsLoop()
go c.session.LivenessCheck(c.ctx, func() {
log.Error("rootcoord disconnected from etcd, process will exit in 1 second")
go func() {
time.Sleep(time.Second)
os.Exit(-1)
}()
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
})
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()

View File

@ -241,12 +241,12 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
return
case resp, ok := <-ch:
if !ok {
log.Debug("session keepalive channel closed")
log.Warn("session keepalive channel closed")
close(failCh)
return
}
if resp == nil {
log.Debug("session keepalive response failed")
log.Warn("session keepalive response failed")
close(failCh)
return
}