fix remove offline nodes in watchDmChannel (#18921)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/18954/head
wei liu 2022-08-31 19:56:57 +08:00 committed by GitHub
parent 4dc93857c2
commit d08581d9b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 2 deletions

View File

@ -301,7 +301,7 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
old, ok := c.clusterMeta.getDmChannel(info.ChannelName)
if ok {
nodes = append(nodes, old.NodeIds...)
removeFromSlice(nodes, in.OfflineNodeID)
nodes = removeFromSlice(nodes, in.OfflineNodeID)
log.Debug("Remove offline node from dmChannel",
zap.String("channel", info.ChannelName),
zap.Int64("removedNode", in.OfflineNodeID),

View File

@ -24,6 +24,8 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -31,7 +33,6 @@ import (
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"golang.org/x/sync/errgroup"
)
type task interface {
@ -151,6 +152,23 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) {
// init collection meta
coll := w.node.metaReplica.addCollection(collectionID, w.req.Schema)
loadedChannelCounter := 0
for _, toLoadChannel := range vChannels {
for _, loadedChannel := range coll.vChannels {
if toLoadChannel == loadedChannel {
loadedChannelCounter++
break
}
}
}
// check if all channels has been loaded, if YES, should do nothing and return
// in case of query coord trigger same watchDmChannelTask on multi
if len(vChannels) == loadedChannelCounter {
log.Warn("All channel has been loaded, skip this watchDmChannelsTask")
return nil
}
//add shard cluster
for _, vchannel := range vChannels {
w.node.ShardClusterService.addShardCluster(w.req.GetCollectionID(), w.req.GetReplicaID(), vchannel)

View File

@ -97,6 +97,30 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
assert.NoError(t, err)
})
t.Run("test execute repeated watchDmChannelTask", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
node: node,
}
task.req.Infos = []*datapb.VchannelInfo{
{
CollectionID: defaultCollectionID,
ChannelName: defaultDMLChannel,
},
}
task.req.PartitionIDs = []UniqueID{0}
err = task.Execute(ctx)
assert.NoError(t, err)
// query coord may submit same watchDmChannelTask
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute loadPartition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)