mirror of https://github.com/milvus-io/milvus.git
Unify liveness check in datacoord and datanode (#8480)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/8762/head
parent
7df90e0eb7
commit
d374ea71ba
|
@ -98,9 +98,9 @@ type Server struct {
|
|||
flushCh chan UniqueID
|
||||
msFactory msgstream.Factory
|
||||
|
||||
session *sessionutil.Session
|
||||
activeCh <-chan bool
|
||||
eventCh <-chan *sessionutil.SessionEvent
|
||||
session *sessionutil.Session
|
||||
liveCh <-chan bool
|
||||
eventCh <-chan *sessionutil.SessionEvent
|
||||
|
||||
dataNodeCreator DataNodeCreatorFunc
|
||||
rootCoordClientCreator RootCoordCreatorFunc
|
||||
|
@ -183,7 +183,7 @@ func (s *Server) Register() error {
|
|||
if s.session == nil {
|
||||
return errors.New("failed to initialize session")
|
||||
}
|
||||
s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
|
||||
s.liveCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
|
||||
Params.NodeID = s.session.ServerID
|
||||
return nil
|
||||
}
|
||||
|
@ -304,12 +304,14 @@ func (s *Server) initMeta() error {
|
|||
|
||||
func (s *Server) startServerLoop() {
|
||||
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
|
||||
s.serverLoopWg.Add(5)
|
||||
s.serverLoopWg.Add(4)
|
||||
go s.startStatsChannel(s.serverLoopCtx)
|
||||
go s.startDataNodeTtLoop(s.serverLoopCtx)
|
||||
go s.startWatchService(s.serverLoopCtx)
|
||||
go s.startActiveCheck(s.serverLoopCtx)
|
||||
go s.startFlushLoop(s.serverLoopCtx)
|
||||
go s.session.LivenessCheck(s.serverLoopCtx, s.liveCh, func() {
|
||||
s.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) startStatsChannel(ctx context.Context) {
|
||||
|
@ -469,26 +471,6 @@ func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.Sess
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) startActiveCheck(ctx context.Context) {
|
||||
defer logutil.LogPanic()
|
||||
defer s.serverLoopWg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-s.activeCh:
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
go func() { s.Stop() }()
|
||||
log.Debug("disconnect with etcd and shutdown data coordinator")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Debug("connection check shutdown")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) startFlushLoop(ctx context.Context) {
|
||||
defer logutil.LogPanic()
|
||||
defer s.serverLoopWg.Done()
|
||||
|
|
|
@ -95,6 +95,7 @@ type DataNode struct {
|
|||
dataCoord types.DataCoord
|
||||
|
||||
session *sessionutil.Session
|
||||
liveCh <-chan bool
|
||||
kvClient *etcdkv.EtcdKV
|
||||
|
||||
closer io.Closer
|
||||
|
@ -149,8 +150,7 @@ func (node *DataNode) SetDataCoordInterface(ds types.DataCoord) error {
|
|||
// Register registers the datanode to etcd
|
||||
func (node *DataNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
activeCh := node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
go node.etcdAliveCheck(node.ctx, activeCh)
|
||||
node.liveCh = node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
Params.NodeID = node.session.ServerID
|
||||
node.NodeID = node.session.ServerID
|
||||
// Start node watch node
|
||||
|
@ -202,26 +202,6 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// etcdAliveCheck performs alive check for etcd connection
|
||||
// will close datanode if check fails
|
||||
func (node *DataNode) etcdAliveCheck(ctx context.Context, ch <-chan bool) {
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if ok { // ok means still alive do nothing
|
||||
continue
|
||||
}
|
||||
// not ok, disconnect
|
||||
go func() { node.Stop() }()
|
||||
log.Warn("disconnected from etcd, shuting down datanode", zap.Int64("ServerID", node.NodeID))
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Warn("etcd alive check quit, due to ctx done")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleChannelEvt handles event from kv watch event
|
||||
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
|
||||
switch evt.Type {
|
||||
|
@ -368,6 +348,10 @@ func (node *DataNode) Start() error {
|
|||
|
||||
go node.BackGroundGC(node.clearSignal)
|
||||
|
||||
go node.session.LivenessCheck(node.ctx, node.liveCh, func() {
|
||||
node.Stop()
|
||||
})
|
||||
|
||||
Params.CreatedTime = time.Now()
|
||||
Params.UpdatedTime = time.Now()
|
||||
|
||||
|
|
|
@ -449,31 +449,6 @@ func TestDataNode(t *testing.T) {
|
|||
node.Stop()
|
||||
}
|
||||
|
||||
func TestDataNodeEtcdAlive(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
node := newIDLEDataNodeMock(ctx)
|
||||
node.Init()
|
||||
node.Start()
|
||||
|
||||
mockCh := make(chan bool)
|
||||
go node.etcdAliveCheck(ctx, mockCh)
|
||||
|
||||
mockCh <- true
|
||||
flag := false
|
||||
select {
|
||||
case <-node.ctx.Done():
|
||||
flag = true
|
||||
default:
|
||||
}
|
||||
assert.False(t, flag)
|
||||
|
||||
close(mockCh)
|
||||
|
||||
_, ok := <-node.ctx.Done()
|
||||
assert.False(t, ok)
|
||||
}
|
||||
|
||||
func TestWatchChannel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
node := newIDLEDataNodeMock(ctx)
|
||||
|
|
Loading…
Reference in New Issue