filter delEvent when querynode register fail (#6150)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/6152/head
xige-16 2021-06-27 12:16:09 +08:00 committed by GitHub
parent ca4cba152c
commit f973456c10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 34 additions and 32 deletions

View File

@ -239,39 +239,41 @@ func (qc *QueryCoord) watchNodeLoop() {
}()
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Debug("query coordinator", zap.Any("The QueryNode crashed with ID", serverID))
qc.cluster.nodes[serverID].setNodeState(false)
qc.cluster.nodes[serverID].client.Stop()
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
SourceID: qc.session.ServerID,
},
SourceNodeIDs: []int64{serverID},
BalanceReason: querypb.TriggerCondition_nodeDown,
}
loadBalanceTask := &LoadBalanceTask{
BaseTask: BaseTask{
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_nodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
rootCoord: qc.rootCoordClient,
dataCoord: qc.dataCoordClient,
cluster: qc.cluster,
meta: qc.meta,
}
qc.scheduler.Enqueue([]task{loadBalanceTask})
go func() {
err := loadBalanceTask.WaitToFinish()
if err != nil {
log.Error(err.Error())
if _, ok := qc.cluster.nodes[serverID]; ok {
log.Debug("query coordinator", zap.Any("The QueryNode crashed with ID", serverID))
qc.cluster.nodes[serverID].setNodeState(false)
qc.cluster.nodes[serverID].client.Stop()
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
SourceID: qc.session.ServerID,
},
SourceNodeIDs: []int64{serverID},
BalanceReason: querypb.TriggerCondition_nodeDown,
}
log.Debug("load balance done after queryNode down", zap.Int64s("nodeIDs", loadBalanceTask.SourceNodeIDs))
//TODO::remove nodeInfo and clear etcd
}()
loadBalanceTask := &LoadBalanceTask{
BaseTask: BaseTask{
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_nodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
rootCoord: qc.rootCoordClient,
dataCoord: qc.dataCoordClient,
cluster: qc.cluster,
meta: qc.meta,
}
qc.scheduler.Enqueue([]task{loadBalanceTask})
go func() {
err := loadBalanceTask.WaitToFinish()
if err != nil {
log.Error(err.Error())
}
log.Debug("load balance done after queryNode down", zap.Int64s("nodeIDs", loadBalanceTask.SourceNodeIDs))
//TODO::remove nodeInfo and clear etcd
}()
}
}
}
}