Fix restart node logic (#6265)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/6284/head
congqixia 2021-07-02 16:48:30 +08:00 committed by GitHub
parent 7e9a06f78d
commit 76fd9ad339
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 19 additions and 6 deletions

View File

@ -141,25 +141,38 @@ func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
}
}
// 2. restart nodes try to watch
// 2. restart nodes, disable node&session, execute unregister policy and put node into candidate list
restartNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.restarts))
for _, node := range deltaChange.restarts {
info, ok := c.dataManager.dataNodes[node]
if ok {
restartNodes = append(restartNodes, info.info)
for _, cs := range info.info.Channels {
cs.State = datapb.ChannelWatchState_Uncomplete
}
c.dataManager.unregister(node) // remove from cluster
c.sessionManager.releaseSession(node)
} else {
log.Warn("Restart node not in node manager", zap.String("restart_node", node))
}
}
if len(restartNodes) > 0 {
_, buffer := c.dataManager.getDataNodes(true)
c.updateNodeWatch(restartNodes, buffer)
for _, node := range restartNodes {
cluster, buffer := c.dataManager.getDataNodes(true)
if len(cluster) > 0 {
ret := c.unregisterPolicy.apply(cluster, node)
c.updateNodeWatch(ret, buffer)
} else {
// no online node, put all watched channels to buffer
buffer = append(buffer, node.Channels...)
c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
}
node.Channels = node.Channels[:0] // clear channels
c.candidateManager.add(node) // put node into candidate list
}
}
// 3. offline do unregister
unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // possible nodes info to unregister
for _, node := range deltaChange.offlines {
c.sessionManager.releaseSession(node)
info := c.dataManager.unregister(node)
if info != nil {
unregisterNodes = append(unregisterNodes, info)