mirror of https://github.com/milvus-io/milvus.git
Add datanode etcd alive check (#8363)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/8390/head
parent
b897ca5703
commit
6469d83072
|
@ -149,7 +149,8 @@ func (node *DataNode) SetDataCoordInterface(ds types.DataCoord) error {
|
|||
// Register registers the datanode to etcd
|
||||
func (node *DataNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
activeCh := node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
go node.etcdAliveCheck(node.ctx, activeCh)
|
||||
Params.NodeID = node.session.ServerID
|
||||
node.NodeID = node.session.ServerID
|
||||
// Start node watch node
|
||||
|
@ -199,6 +200,26 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// etcdAliveCheck performs alive check for etcd connection
|
||||
// will close datanode if check fails
|
||||
func (node *DataNode) etcdAliveCheck(ctx context.Context, ch <-chan bool) {
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if ok { // ok means still alive do nothing
|
||||
continue
|
||||
}
|
||||
// not ok, disconnect
|
||||
go func() { node.Stop() }()
|
||||
log.Warn("disconnected from etcd, shuting down datanode", zap.Int64("ServerID", node.NodeID))
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Warn("etcd alive check quit, due to ctx done")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleChannelEvt handles an event from the kv watch
|
||||
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
|
||||
switch evt.Type {
|
||||
|
|
|
@ -414,6 +414,31 @@ func TestDataNode(t *testing.T) {
|
|||
node.Stop()
|
||||
}
|
||||
|
||||
func TestDataNodeEtcdAlive(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
node := newIDLEDataNodeMock(ctx)
|
||||
node.Init()
|
||||
node.Start()
|
||||
|
||||
mockCh := make(chan bool)
|
||||
go node.etcdAliveCheck(ctx, mockCh)
|
||||
|
||||
mockCh <- true
|
||||
flag := false
|
||||
select {
|
||||
case <-node.ctx.Done():
|
||||
flag = true
|
||||
default:
|
||||
}
|
||||
assert.False(t, flag)
|
||||
|
||||
close(mockCh)
|
||||
|
||||
_, ok := <-node.ctx.Done()
|
||||
assert.False(t, ok)
|
||||
}
|
||||
|
||||
func TestWatchChannel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
node := newIDLEDataNodeMock(ctx)
|
||||
|
|
Loading…
Reference in New Issue