diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 9334dd28a8..325e80578c 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -170,7 +170,9 @@ func (c *cluster) register(n *datapb.DataNodeInfo) { c.dataManager.register(n) cNodes, chanBuffer := c.dataManager.getDataNodes(true) var rets []*datapb.DataNodeInfo + log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer)) rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer) + log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer)) c.dataManager.updateDataNodes(rets, chanBuffer) rets = c.watch(rets) c.dataManager.updateDataNodes(rets, chanBuffer) @@ -179,9 +181,14 @@ func (c *cluster) register(n *datapb.DataNodeInfo) { func (c *cluster) unregister(n *datapb.DataNodeInfo) { c.mu.Lock() defer c.mu.Unlock() + c.sessionManager.releaseSession(n.Address) - c.dataManager.unregister(n) + oldNode := c.dataManager.unregister(n) + if oldNode != nil { + n = oldNode + } cNodes, chanBuffer := c.dataManager.getDataNodes(true) + log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer)) var rets []*datapb.DataNodeInfo if len(cNodes) == 0 { for _, chStat := range n.Channels { @@ -191,6 +198,7 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) { } else { rets = c.unregisterPolicy.apply(cNodes, n) } + log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer)) c.dataManager.updateDataNodes(rets, chanBuffer) rets = c.watch(rets) c.dataManager.updateDataNodes(rets, chanBuffer) diff --git a/internal/dataservice/cluster_data_manager.go b/internal/dataservice/cluster_data_manager.go index 8e34caef65..5a641e836d 100644 --- a/internal/dataservice/cluster_data_manager.go +++ b/internal/dataservice/cluster_data_manager.go @@ -158,13 +158,14 @@ func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) { c.updateMetrics() } -func (c *clusterNodeManager) unregister(n *datapb.DataNodeInfo) { +func (c *clusterNodeManager) unregister(n *datapb.DataNodeInfo) *datapb.DataNodeInfo { node, ok := c.dataNodes[n.Address] if !ok { - return + return nil } node.status = offline c.updateMetrics() + return node.info } func (c *clusterNodeManager) updateMetrics() {