From f84d9a589adb56d71e8d1e7bd2955b6819a72e32 Mon Sep 17 00:00:00 2001
From: aoiasd <45024769+aoiasd@users.noreply.github.com>
Date: Fri, 26 Jan 2024 10:59:00 +0800
Subject: [PATCH] fix: channel checker reduce balancing channels. (#30087)

Ignore channels whose shard leader is unavailable when the channel checker
judges repeated channels, so the checker does not reduce channels that are
still being balanced.

relate: https://github.com/milvus-io/milvus/issues/29841
https://github.com/milvus-io/milvus/issues/29838

Signed-off-by: aoiasd
---
 .../querycoordv2/checkers/channel_checker.go  |  27 +++-
 .../checkers/channel_checker_test.go          |  20 ++-
 internal/querycoordv2/checkers/controller.go  |   2 +-
 internal/querycoordv2/checkers/util.go        |  78 +++++++++++
 internal/querycoordv2/checkers/util_test.go   | 123 ++++++++++++++++++
 internal/querycoordv2/handlers.go             |   7 -
 internal/querycoordv2/services.go             |  53 +-------
 7 files changed, 250 insertions(+), 60 deletions(-)
 create mode 100644 internal/querycoordv2/checkers/util.go
 create mode 100644 internal/querycoordv2/checkers/util_test.go
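Note on the mechanism: the heart of this patch is a guard at the top of the
channel checker's dedup loop in findRepeatedChannels. A channel copy is only
considered for reduction once its shard leader view exists and passes the
availability check, so a copy that is mid-balance (its new leader is not
serving yet) is never mistaken for a redundant one. A standalone sketch of
that shape follows, using simplified stand-in types; all names here are
illustrative, not the Milvus API:

package main

import "fmt"

// LeaderView and DmChannel are stripped-down stand-ins for the meta types.
type LeaderView struct {
    ID      int64
    Channel string
}

type DmChannel struct {
    Node    int64
    Channel string
}

// checkLeaderAvailable stands in for checkers.CheckLeaderAvaliable.
func checkLeaderAvailable(online map[int64]bool, lv *LeaderView) error {
    if !online[lv.ID] {
        return fmt.Errorf("leader %d offline", lv.ID)
    }
    return nil
}

// findRepeated counts a channel as a repeated copy only when its leader view
// is ready and available; unready copies are skipped, not reduced.
func findRepeated(dist []DmChannel, views map[int64]*LeaderView, online map[int64]bool) []DmChannel {
    seen := make(map[string]bool)
    var repeated []DmChannel
    for _, ch := range dist {
        lv, ok := views[ch.Node]
        if !ok || lv.Channel != ch.Channel {
            continue // leader view not ready yet: skip instead of reducing
        }
        if err := checkLeaderAvailable(online, lv); err != nil {
            continue // leader unavailable: likewise skip
        }
        if seen[ch.Channel] {
            repeated = append(repeated, ch)
        }
        seen[ch.Channel] = true
    }
    return repeated
}

func main() {
    // Two copies of "ch", but node 2 has no leader view yet (mid-balance).
    dist := []DmChannel{{Node: 1, Channel: "ch"}, {Node: 2, Channel: "ch"}}
    views := map[int64]*LeaderView{1: {ID: 1, Channel: "ch"}}
    fmt.Println(findRepeated(dist, views, map[int64]bool{1: true})) // [] -- nothing reduced
}

Once node 2's leader view is registered and node 2 is online, the same input
yields one repeated copy, which matches the behavior the updated tests assert.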
diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go
index 1ec8158ccb..cf0bc86b11 100644
--- a/internal/querycoordv2/checkers/channel_checker.go
+++ b/internal/querycoordv2/checkers/channel_checker.go
@@ -27,6 +27,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/balance"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -39,6 +40,7 @@ type ChannelChecker struct {
 	meta      *meta.Meta
 	dist      *meta.DistributionManager
 	targetMgr *meta.TargetManager
+	nodeMgr   *session.NodeManager
 	balancer  balance.Balance
 }

@@ -47,6 +49,7 @@ func NewChannelChecker(
 	dist *meta.DistributionManager,
 	targetMgr *meta.TargetManager,
 	balancer balance.Balance,
+	nodeMgr *session.NodeManager,
 ) *ChannelChecker {
 	return &ChannelChecker{
 		checkerActivation: newCheckerActivation(),
@@ -54,6 +57,7 @@ func NewChannelChecker(
 		dist:              dist,
 		targetMgr:         targetMgr,
 		balancer:          balancer,
+		nodeMgr:           nodeMgr,
 	}
 }
@@ -109,7 +113,7 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	repeated := c.findRepeatedChannels(replica.GetID())
 	tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica.GetID())
-	task.SetReason("redundancies of channel")
+	task.SetReason("redundancies of channel", tasks...)
 	ret = append(ret, tasks...)

 	// All channel related tasks should be with high priority
@@ -174,8 +178,29 @@ func (c *ChannelChecker) findRepeatedChannels(replicaID int64) []*meta.DmChannel
 	}

 	dist := c.getChannelDist(replica)
+	targets := c.targetMgr.GetSealedSegmentsByCollection(replica.GetCollectionID(), meta.CurrentTarget)
 	versionsMap := make(map[string]*meta.DmChannel)
 	for _, ch := range dist {
+		leaderView := c.dist.LeaderViewManager.GetLeaderShardView(ch.Node, ch.GetChannelName())
+		if leaderView == nil {
+			log.Info("shard leadview is not ready, skip",
+				zap.Int64("collectionID", replica.GetCollectionID()),
+				zap.Int64("replicaID", replicaID),
+				zap.Int64("leaderID", ch.Node),
+				zap.String("channel", ch.GetChannelName()))
+			continue
+		}
+
+		if err := CheckLeaderAvaliable(c.nodeMgr, leaderView, targets); err != nil {
+			log.Info("replica has unavailable shard leader",
+				zap.Int64("collectionID", replica.GetCollectionID()),
+				zap.Int64("replicaID", replicaID),
+				zap.Int64("leaderID", ch.Node),
+				zap.String("channel", ch.GetChannelName()),
+				zap.Error(err))
+			continue
+		}
+
 		maxVer, ok := versionsMap[ch.GetChannelName()]
 		if !ok {
 			versionsMap[ch.GetChannelName()] = ch
diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go
index 4223a0e120..c067155db6 100644
--- a/internal/querycoordv2/checkers/channel_checker_test.go
+++ b/internal/querycoordv2/checkers/channel_checker_test.go
@@ -19,6 +19,7 @@ package checkers
 import (
 	"context"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
@@ -76,7 +77,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() {
 	distManager := meta.NewDistributionManager()

 	balancer := suite.createMockBalancer()
-	suite.checker = NewChannelChecker(suite.meta, distManager, targetManager, balancer)
+	suite.checker = NewChannelChecker(suite.meta, distManager, targetManager, balancer, suite.nodeMgr)

 	suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
 }
@@ -85,6 +86,14 @@ func (suite *ChannelCheckerTestSuite) TearDownTest() {
 	suite.kv.Close()
 }

+func (suite *ChannelCheckerTestSuite) setNodeAvailable(nodes ...int64) {
+	for _, node := range nodes {
+		nodeInfo := session.NewNodeInfo(node, "")
+		nodeInfo.SetLastHeartbeat(time.Now())
+		suite.nodeMgr.Add(nodeInfo)
+	}
+}
+
 func (suite *ChannelCheckerTestSuite) createMockBalancer() balance.Balance {
 	balancer := balance.NewMockBalancer(suite.T())
 	balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything).Maybe().Return(func(channels []*meta.DmChannel, nodes []int64) []balance.ChannelAssignPlan {
@@ -151,7 +160,10 @@ func (suite *ChannelCheckerTestSuite) TestReduceChannel() {
 	checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))

 	checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel1"))
+	checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel1"})
 	checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2"))
+	checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel2"})
+	suite.setNodeAvailable(1)
 	tasks := checker.Check(context.TODO())
 	suite.Len(tasks, 1)
 	suite.EqualValues(1, tasks[0].ReplicaID())
@@ -191,6 +203,12 @@ func (suite *ChannelCheckerTestSuite) TestRepeatedChannels() {
 	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))

 	tasks := checker.Check(context.TODO())
+	suite.Len(tasks, 0)
+
+	suite.setNodeAvailable(1, 2)
+	checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel"})
+	checker.dist.LeaderViewManager.Update(2, &meta.LeaderView{ID: 2, Channel: "test-insert-channel"})
+	tasks = checker.Check(context.TODO())
 	suite.Len(tasks, 1)
 	suite.EqualValues(1, tasks[0].ReplicaID())
 	suite.Len(tasks[0].Actions(), 1)
diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go
index d96372d07a..c95cc27b4a 100644
--- a/internal/querycoordv2/checkers/controller.go
+++ b/internal/querycoordv2/checkers/controller.go
@@ -63,7 +63,7 @@ func NewCheckerController(
 	// CheckerController runs checkers with the order,
 	// the former checker has higher priority
 	checkers := map[utils.CheckerType]Checker{
-		utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer),
+		utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer, nodeMgr),
 		utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
 		utils.BalanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
 		utils.IndexChecker:   NewIndexChecker(meta, dist, broker, nodeMgr),
diff --git a/internal/querycoordv2/checkers/util.go b/internal/querycoordv2/checkers/util.go
new file mode 100644
index 0000000000..23d446839c
--- /dev/null
+++ b/internal/querycoordv2/checkers/util.go
@@ -0,0 +1,78 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkers
+
+import (
+	"fmt"
+
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+)
+
+func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
+	if info == nil {
+		return merr.WrapErrNodeOffline(nodeID)
+	}
+	return nil
+}
+
+// In a replica, a shard is available if and only if:
+// 1. The leader is online
+// 2. All QueryNodes in the distribution are online
+// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes (including the leader) in the distribution
+// 4. All segments of the shard in the target should be in the distribution
+func CheckLeaderAvaliable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error {
+	log := log.With(zap.Int64("leaderID", leader.ID))
+	info := nodeMgr.Get(leader.ID)
+
+	// Check whether the leader is online
+	err := CheckNodeAvailable(leader.ID, info)
+	if err != nil {
+		log.Info("leader is not available", zap.Error(err))
+		return fmt.Errorf("leader not available: %w", err)
+	}
+
+	for id, version := range leader.Segments {
+		info := nodeMgr.Get(version.GetNodeID())
+		err = CheckNodeAvailable(version.GetNodeID(), info)
+		if err != nil {
+			log.Info("leader is not available due to QueryNode unavailable",
+				zap.Int64("segmentID", id),
+				zap.Error(err))
+			return err
+		}
+	}
+
+	// Check whether segments are fully loaded
+	for segmentID, info := range currentTargets {
+		if info.GetInsertChannel() != leader.Channel {
+			continue
+		}
+
+		_, exist := leader.Segments[segmentID]
+		if !exist {
+			log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
+			return merr.WrapErrSegmentLack(segmentID)
+		}
+	}
+	return nil
+}
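For orientation before the new tests: CheckLeaderAvaliable declares a leader
view healthy only when the leader node has a live NodeInfo entry, every node
hosting one of the view's segments does too, and every current-target segment
on the leader's channel appears in the view. The happy path, condensed from
the suite below (it assumes nodeMgr already holds fresh heartbeats for nodes
1 and 2, as the tests arrange via setNodeAvailable):

leadview := &meta.LeaderView{
    ID:       1,
    Channel:  "test",
    Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
}
targets := map[int64]*datapb.SegmentInfo{2: {ID: 2, InsertChannel: "test"}}
err := CheckLeaderAvaliable(nodeMgr, leadview, targets) // nil only when all conditions above hold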
diff --git a/internal/querycoordv2/checkers/util_test.go b/internal/querycoordv2/checkers/util_test.go
new file mode 100644
index 0000000000..2e7147c819
--- /dev/null
+++ b/internal/querycoordv2/checkers/util_test.go
@@ -0,0 +1,123 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkers
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+)
+
+type UtilTestSuite struct {
+	suite.Suite
+	nodeMgr *session.NodeManager
+}
+
+func (suite *UtilTestSuite) SetupTest() {
+	suite.nodeMgr = session.NewNodeManager()
+}
+
+func (suite *UtilTestSuite) setNodeAvailable(nodes ...int64) {
+	for _, node := range nodes {
+		nodeInfo := session.NewNodeInfo(node, "")
+		nodeInfo.SetLastHeartbeat(time.Now())
+		suite.nodeMgr.Add(nodeInfo)
+	}
+}
+
+func (suite *UtilTestSuite) TestCheckLeaderAvaliable() {
+	leadview := &meta.LeaderView{
+		ID:       1,
+		Channel:  "test",
+		Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+	}
+
+	suite.setNodeAvailable(1, 2)
+	err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+		2: {
+			ID:            2,
+			InsertChannel: "test",
+		},
+	})
+	suite.NoError(err)
+}
+
+func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
+	suite.Run("leader not available", func() {
+		leadview := &meta.LeaderView{
+			ID:       1,
+			Channel:  "test",
+			Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+		}
+		// leader nodeID=1 not available
+		suite.setNodeAvailable(2)
+		err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+			2: {
+				ID:            2,
+				InsertChannel: "test",
+			},
+		})
+		suite.Error(err)
+		suite.nodeMgr = session.NewNodeManager()
+	})
+
+	suite.Run("shard worker not available", func() {
+		leadview := &meta.LeaderView{
+			ID:       1,
+			Channel:  "test",
+			Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+		}
+		// shard worker nodeID=2 not available
+		suite.setNodeAvailable(1)
+		err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+			2: {
+				ID:            2,
+				InsertChannel: "test",
+			},
+		})
+		suite.Error(err)
+		suite.nodeMgr = session.NewNodeManager()
+	})
+
+	suite.Run("segment lacks", func() {
+		leadview := &meta.LeaderView{
+			ID:       1,
+			Channel:  "test",
+			Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+		}
+		suite.setNodeAvailable(1, 2)
+		err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+			// target segmentID=1 not in leadView
+			1: {
+				ID:            1,
+				InsertChannel: "test",
+			},
+		})
+		suite.Error(err)
+		suite.nodeMgr = session.NewNodeManager()
+	})
+}
+
+func TestUtilSuite(t *testing.T) {
+	suite.Run(t, new(UtilTestSuite))
+}
diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go
index 2bf5412968..f756adc333 100644
--- a/internal/querycoordv2/handlers.go
+++ b/internal/querycoordv2/handlers.go
@@ -356,13 +356,6 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
 	return info, nil
 }

-func checkNodeAvailable(nodeID int64, info *session.NodeInfo) error {
-	if info == nil {
-		return merr.WrapErrNodeOffline(nodeID)
-	}
-	return nil
-}
-
 func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
 	type leaderID struct {
 		ReplicaID int64
"github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/utils" @@ -889,57 +890,9 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade channelErr = merr.WrapErrChannelLack("channel not subscribed") } - // In a replica, a shard is available, if and only if: - // 1. The leader is online - // 2. All QueryNodes in the distribution are online - // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution - // 4. All segments of the shard in target should be in the distribution for _, leader := range leaders { - log := log.With(zap.Int64("leaderID", leader.ID)) - info := s.nodeMgr.Get(leader.ID) - - // Check whether leader is online - err := checkNodeAvailable(leader.ID, info) - if err != nil { - log.Info("leader is not available", zap.Error(err)) - multierr.AppendInto(&channelErr, fmt.Errorf("leader not available: %w", err)) - continue - } - // Check whether QueryNodes are online and available - isAvailable := true - for id, version := range leader.Segments { - info := s.nodeMgr.Get(version.GetNodeID()) - err = checkNodeAvailable(version.GetNodeID(), info) - if err != nil { - log.Info("leader is not available due to QueryNode unavailable", - zap.Int64("segmentID", id), - zap.Error(err)) - isAvailable = false - multierr.AppendInto(&channelErr, err) - break - } - } - - // Avoid iterating all segments if any QueryNode unavailable - if !isAvailable { - continue - } - - // Check whether segments are fully loaded - for segmentID, info := range currentTargets { - if info.GetInsertChannel() != leader.Channel { - continue - } - - _, exist := leader.Segments[segmentID] - if !exist { - log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) - multierr.AppendInto(&channelErr, merr.WrapErrSegmentLack(segmentID)) - isAvailable = false - break - } - } - if !isAvailable { + if err := checkers.CheckLeaderAvaliable(s.nodeMgr, leader, currentTargets); err != nil { + multierr.AppendInto(&channelErr, err) continue }