mirror of https://github.com/milvus-io/milvus.git
fix remove offline nodes in watchDmChannel (#18906)
Signed-off-by: Wei Liu <wei.liu@zilliz.com> Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/18919/head
parent
e1227b343d
commit
0733aece2d
|
@ -305,7 +305,7 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
|
|||
old, ok := c.clusterMeta.getDmChannel(info.ChannelName)
|
||||
if ok {
|
||||
nodes = append(nodes, old.NodeIds...)
|
||||
removeFromSlice(nodes, in.OfflineNodeID)
|
||||
nodes = removeFromSlice(nodes, in.OfflineNodeID)
|
||||
log.Debug("Remove offline node from dmChannel",
|
||||
zap.String("channel", info.ChannelName),
|
||||
zap.Int64("removedNode", in.OfflineNodeID),
|
||||
|
|
|
@ -151,6 +151,23 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
|
|||
// init collection meta
|
||||
coll := w.node.metaReplica.addCollection(collectionID, w.req.Schema)
|
||||
|
||||
loadedChannelCounter := 0
|
||||
for _, toLoadChannel := range vChannels {
|
||||
for _, loadedChannel := range coll.vChannels {
|
||||
if toLoadChannel == loadedChannel {
|
||||
loadedChannelCounter += 1
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if all channels has been loaded, if YES, should do nothing and return
|
||||
// in case of query coord trigger same watchDmChannelTask on multi
|
||||
if len(vChannels) == loadedChannelCounter {
|
||||
log.Warn("All channel has been loaded, skip this watchDmChannelsTask")
|
||||
return nil
|
||||
}
|
||||
|
||||
//add shard cluster
|
||||
for _, vchannel := range vChannels {
|
||||
w.node.ShardClusterService.addShardCluster(w.req.GetCollectionID(), w.req.GetReplicaID(), vchannel)
|
||||
|
|
|
@ -97,6 +97,30 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("test execute repeated watchDmChannelTask", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := watchDmChannelsTask{
|
||||
req: genWatchDMChannelsRequest(),
|
||||
node: node,
|
||||
}
|
||||
|
||||
task.req.Infos = []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: defaultCollectionID,
|
||||
ChannelName: defaultDMLChannel,
|
||||
},
|
||||
}
|
||||
task.req.PartitionIDs = []UniqueID{0}
|
||||
|
||||
err = task.Execute(ctx)
|
||||
assert.NoError(t, err)
|
||||
// query coord may submit same watchDmChannelTask
|
||||
err = task.Execute(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("test execute loadPartition", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
|
Loading…
Reference in New Issue