diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 092bc4dc07..04e1f5d548 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -30,6 +30,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -55,8 +56,8 @@ func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *sessi } } -func (b *BalanceChecker) ID() CheckerType { - return balanceChecker +func (b *BalanceChecker) ID() utils.CheckerType { + return utils.BalanceChecker } func (b *BalanceChecker) Description() string { diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 9584d718bb..4578aa3751 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -56,8 +56,8 @@ func NewChannelChecker( } } -func (c *ChannelChecker) ID() CheckerType { - return channelChecker +func (c *ChannelChecker) ID() utils.CheckerType { + return utils.ChannelChecker } func (c *ChannelChecker) Description() string { diff --git a/internal/querycoordv2/checkers/checker.go b/internal/querycoordv2/checkers/checker.go index 33a463b90c..8355ef6d25 100644 --- a/internal/querycoordv2/checkers/checker.go +++ b/internal/querycoordv2/checkers/checker.go @@ -21,10 +21,11 @@ import ( "sync/atomic" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" ) type Checker interface { - ID() CheckerType + ID() utils.CheckerType Description() string Check(ctx context.Context) []task.Task IsActive() bool diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 1a4b541730..4f742b17f0 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -29,44 +29,15 @@ import ( . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" ) -const ( - segmentCheckerName = "segment_checker" - channelCheckerName = "channel_checker" - balanceCheckerName = "balance_checker" - indexCheckerName = "index_checker" -) - -type CheckerType int32 - -const ( - channelChecker CheckerType = iota + 1 - segmentChecker - balanceChecker - indexChecker -) - -var ( - checkRoundTaskNumLimit = 256 - checkerOrder = []string{channelCheckerName, segmentCheckerName, balanceCheckerName, indexCheckerName} - checkerNames = map[CheckerType]string{ - segmentChecker: segmentCheckerName, - channelChecker: channelCheckerName, - balanceChecker: balanceCheckerName, - indexChecker: indexCheckerName, - } - errTypeNotFound = errors.New("checker type not found") -) - -func (s CheckerType) String() string { - return checkerNames[s] -} +var errTypeNotFound = errors.New("checker type not found") type CheckerController struct { cancel context.CancelFunc - manualCheckChs map[CheckerType]chan struct{} + manualCheckChs map[utils.CheckerType]chan struct{} meta *meta.Meta dist *meta.DistributionManager targetMgr *meta.TargetManager @@ -75,7 +46,7 @@ type CheckerController struct { balancer balance.Balance scheduler task.Scheduler - checkers map[CheckerType]Checker + checkers map[utils.CheckerType]Checker stopOnce sync.Once } @@ -91,17 +62,18 @@ func NewCheckerController( ) *CheckerController { // CheckerController runs checkers with the order, // the former checker has higher priority - checkers := map[CheckerType]Checker{ - channelChecker: NewChannelChecker(meta, dist, targetMgr, balancer), - segmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), - balanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler), - indexChecker: NewIndexChecker(meta, dist, broker, nodeMgr), + checkers := map[utils.CheckerType]Checker{ + utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer), + utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), + utils.BalanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler), + utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr), + utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr), } - manualCheckChs := map[CheckerType]chan struct{}{ - channelChecker: make(chan struct{}, 1), - segmentChecker: make(chan struct{}, 1), - balanceChecker: make(chan struct{}, 1), + manualCheckChs := map[utils.CheckerType]chan struct{}{ + utils.ChannelChecker: make(chan struct{}, 1), + utils.SegmentChecker: make(chan struct{}, 1), + utils.BalanceChecker: make(chan struct{}, 1), } return &CheckerController{ @@ -124,22 +96,24 @@ func (controller *CheckerController) Start() { } } -func getCheckerInterval(checker CheckerType) time.Duration { +func getCheckerInterval(checker utils.CheckerType) time.Duration { switch checker { - case segmentChecker: + case utils.SegmentChecker: return Params.QueryCoordCfg.SegmentCheckInterval.GetAsDuration(time.Millisecond) - case channelChecker: + case utils.ChannelChecker: return Params.QueryCoordCfg.ChannelCheckInterval.GetAsDuration(time.Millisecond) - case balanceChecker: + case utils.BalanceChecker: return Params.QueryCoordCfg.BalanceCheckInterval.GetAsDuration(time.Millisecond) - case indexChecker: + case utils.IndexChecker: return 
Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond) + case utils.LeaderChecker: + return Params.QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Millisecond) default: return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond) } } -func (controller *CheckerController) startChecker(ctx context.Context, checker CheckerType) { +func (controller *CheckerController) startChecker(ctx context.Context, checker utils.CheckerType) { interval := getCheckerInterval(checker) ticker := time.NewTicker(interval) defer ticker.Stop() @@ -180,7 +154,7 @@ func (controller *CheckerController) Check() { } // check is the real implementation of Check -func (controller *CheckerController) check(ctx context.Context, checkType CheckerType) { +func (controller *CheckerController) check(ctx context.Context, checkType utils.CheckerType) { checker := controller.checkers[checkType] tasks := checker.Check(ctx) @@ -193,7 +167,7 @@ func (controller *CheckerController) check(ctx context.Context, checkType Checke } } -func (controller *CheckerController) Deactivate(typ CheckerType) error { +func (controller *CheckerController) Deactivate(typ utils.CheckerType) error { for _, checker := range controller.checkers { if checker.ID() == typ { checker.Deactivate() @@ -203,7 +177,7 @@ func (controller *CheckerController) Deactivate(typ CheckerType) error { return errTypeNotFound } -func (controller *CheckerController) Activate(typ CheckerType) error { +func (controller *CheckerController) Activate(typ utils.CheckerType) error { for _, checker := range controller.checkers { if checker.ID() == typ { checker.Activate() @@ -213,7 +187,7 @@ func (controller *CheckerController) Activate(typ CheckerType) error { return errTypeNotFound } -func (controller *CheckerController) IsActive(typ CheckerType) (bool, error) { +func (controller *CheckerController) IsActive(typ utils.CheckerType) (bool, error) { for _, checker := range controller.checkers { if checker.ID() == typ { return checker.IsActive(), nil diff --git a/internal/querycoordv2/checkers/controller_base_test.go b/internal/querycoordv2/checkers/controller_base_test.go index cfb1e202ef..9f5b233def 100644 --- a/internal/querycoordv2/checkers/controller_base_test.go +++ b/internal/querycoordv2/checkers/controller_base_test.go @@ -29,6 +29,7 @@ import ( . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -80,28 +81,28 @@ func (suite *ControllerBaseTestSuite) SetupTest() { } func (s *ControllerBaseTestSuite) TestActivation() { - active, err := s.controller.IsActive(segmentChecker) + active, err := s.controller.IsActive(utils.SegmentChecker) s.NoError(err) s.True(active) - err = s.controller.Deactivate(segmentChecker) + err = s.controller.Deactivate(utils.SegmentChecker) s.NoError(err) - active, err = s.controller.IsActive(segmentChecker) + active, err = s.controller.IsActive(utils.SegmentChecker) s.NoError(err) s.False(active) - err = s.controller.Activate(segmentChecker) + err = s.controller.Activate(utils.SegmentChecker) s.NoError(err) - active, err = s.controller.IsActive(segmentChecker) + active, err = s.controller.IsActive(utils.SegmentChecker) s.NoError(err) s.True(active) invalidTyp := -1 - _, err = s.controller.IsActive(CheckerType(invalidTyp)) + _, err = s.controller.IsActive(utils.CheckerType(invalidTyp)) s.Equal(errTypeNotFound, err) } func (s *ControllerBaseTestSuite) TestListCheckers() { checkers := s.controller.Checkers() - s.Equal(4, len(checkers)) + s.Equal(5, len(checkers)) } func TestControllerBaseTestSuite(t *testing.T) { diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 421bf210b9..0ba2cadd62 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -58,8 +59,8 @@ func NewIndexChecker( } } -func (c *IndexChecker) ID() CheckerType { - return indexChecker +func (c *IndexChecker) ID() utils.CheckerType { + return utils.IndexChecker } func (c *IndexChecker) Description() string { diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go new file mode 100644 index 0000000000..285592d062 --- /dev/null +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -0,0 +1,205 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package checkers
+
+import (
+	"context"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/params"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+var _ Checker = (*LeaderChecker)(nil)
+
+// LeaderChecker checks the leader view of each shard leader and generates
+// tasks to correct it when it diverges from the actual segment distribution.
+type LeaderChecker struct {
+	*checkerActivation
+	meta    *meta.Meta
+	dist    *meta.DistributionManager
+	target  *meta.TargetManager
+	nodeMgr *session.NodeManager
+}
+
+func NewLeaderChecker(
+	meta *meta.Meta,
+	dist *meta.DistributionManager,
+	target *meta.TargetManager,
+	nodeMgr *session.NodeManager,
+) *LeaderChecker {
+	return &LeaderChecker{
+		checkerActivation: newCheckerActivation(),
+		meta:              meta,
+		dist:              dist,
+		target:            target,
+		nodeMgr:           nodeMgr,
+	}
+}
+
+func (c *LeaderChecker) ID() utils.CheckerType {
+	return utils.LeaderChecker
+}
+
+func (c *LeaderChecker) Description() string {
+	return "LeaderChecker checks the difference between the leader view and the segment distribution, and tries to correct it"
+}
+
+func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
+	if !c.IsActive() {
+		return nil
+	}
+
+	collectionIDs := c.meta.CollectionManager.GetAll()
+	tasks := make([]task.Task, 0)
+
+	for _, collectionID := range collectionIDs {
+		collection := c.meta.CollectionManager.GetCollection(collectionID)
+		if collection == nil {
+			log.Warn("collection released during check leader", zap.Int64("collection", collectionID))
+			continue
+		}
+
+		replicas := c.meta.ReplicaManager.GetByCollection(collectionID)
+		for _, replica := range replicas {
+			for _, node := range replica.GetNodes() {
+				if ok, _ := c.nodeMgr.IsStoppingNode(node); ok {
+					// no need to correct the leader view loaded on a stopping node
+					continue
+				}
+
+				leaderViews := c.dist.LeaderViewManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
+				for ch, leaderView := range leaderViews {
+					dist := c.dist.SegmentDistManager.GetByShardWithReplica(ch, replica)
+					tasks = append(tasks, c.findNeedLoadedSegments(ctx, replica.ID, leaderView, dist)...)
+					tasks = append(tasks, c.findNeedRemovedSegments(ctx, replica.ID, leaderView, dist)...)
+				}
+			}
+		}
+	}
+
+	return tasks
+}
+
+func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int64, leaderView *meta.LeaderView, dist []*meta.Segment) []task.Task {
+	log := log.Ctx(ctx).With(
+		zap.Int64("collectionID", leaderView.CollectionID),
+		zap.Int64("replica", replica),
+		zap.String("channel", leaderView.Channel),
+		zap.Int64("leaderViewID", leaderView.ID),
+	)
+	ret := make([]task.Task, 0)
+	dist = utils.FindMaxVersionSegments(dist)
+	for _, s := range dist {
+		version, ok := leaderView.Segments[s.GetID()]
+		currentTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.CurrentTarget)
+		existInCurrentTarget := currentTarget != nil
+		existInNextTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
+
+		if !existInCurrentTarget && !existInNextTarget {
+			continue
+		}
+
+		leaderWithOldVersion := version.GetVersion() < s.Version
+		// the leader view holds a newer version, but the querynode that loaded it has been shut down
+		leaderWithDirtyVersion := version.GetVersion() > s.Version && c.nodeMgr.Get(version.GetNodeID()) == nil
+
+		if !ok || leaderWithOldVersion || leaderWithDirtyVersion {
+			log.Debug("leader checker append a segment to set",
+				zap.Int64("segmentID", s.GetID()),
+				zap.Int64("nodeID", s.Node))
+			action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), querypb.DataScope_Historical)
+			t, err := task.NewSegmentTask(
+				ctx,
+				params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
+				c.ID(),
+				s.GetCollectionID(),
+				replica,
+				action,
+			)
+			if err != nil {
+				log.Warn("create segment update task failed",
+					zap.Int64("segmentID", s.GetID()),
+					zap.Int64("node", s.Node),
+					zap.Error(err),
+				)
+				continue
+			}
+			// leader view correction tasks are executed with high priority
+			t.SetPriority(task.TaskPriorityHigh)
+			t.SetReason("add segment to leader view")
+			ret = append(ret, t)
+		}
+	}
+	return ret
+}
+
+func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int64, leaderView *meta.LeaderView, dists []*meta.Segment) []task.Task {
+	log := log.Ctx(ctx).With(
+		zap.Int64("collectionID", leaderView.CollectionID),
+		zap.Int64("replica", replica),
+		zap.String("channel", leaderView.Channel),
+		zap.Int64("leaderViewID", leaderView.ID),
+	)
+
+	ret := make([]task.Task, 0)
+	distMap := make(map[int64]struct{})
+	for _, s := range dists {
+		distMap[s.GetID()] = struct{}{}
+	}
+
+	for sid, s := range leaderView.Segments {
+		_, ok := distMap[sid]
+		existInCurrentTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
+		existInNextTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
+		if ok || existInCurrentTarget || existInNextTarget {
+			continue
+		}
+		log.Debug("leader checker append a segment to remove",
+			zap.Int64("segmentID", sid),
+			zap.Int64("nodeID", s.NodeID))
+
+		action := task.NewSegmentActionWithScope(leaderView.ID, task.ActionTypeReduce, leaderView.Channel, sid, querypb.DataScope_Historical)
+		t, err := task.NewSegmentTask(
+			ctx,
+			paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
+			c.ID(),
+			leaderView.CollectionID,
+			replica,
+			action,
+		)
+		if err != nil {
+			log.Warn("create segment reduce task failed",
+				zap.Int64("segmentID", sid),
+				zap.Int64("nodeID", s.NodeID),
+				zap.Error(err))
+			continue
+		}
+
+		t.SetPriority(task.TaskPriorityHigh)
+		t.SetReason("remove segment from leader view")
+		ret = append(ret,
t) + } + return ret +} diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go new file mode 100644 index 0000000000..bfdf7a0f37 --- /dev/null +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -0,0 +1,410 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkers + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + . "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type LeaderCheckerTestSuite struct { + suite.Suite + checker *LeaderChecker + kv kv.MetaKv + + meta *meta.Meta + broker *meta.MockBroker + nodeMgr *session.NodeManager +} + +func (suite *LeaderCheckerTestSuite) SetupSuite() { + paramtable.Init() +} + +func (suite *LeaderCheckerTestSuite) SetupTest() { + var err error + config := GenerateEtcdConfig() + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + suite.Require().NoError(err) + suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + // meta + store := querycoord.NewCatalog(suite.kv) + idAllocator := RandomIncrementIDAllocator() + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) + suite.broker = meta.NewMockBroker(suite.T()) + + distManager := meta.NewDistributionManager() + targetManager := meta.NewTargetManager(suite.broker, suite.meta) + suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr) +} + +func (suite *LeaderCheckerTestSuite) TearDownTest() { + suite.kv.Close() +} + +func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + segments := []*datapb.SegmentInfo{ + { + ID: 
1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) + suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func (suite *LeaderCheckerTestSuite) TestActivation() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + + suite.checker.Deactivate() + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 0) + suite.checker.Activate() + tasks = suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) + suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func (suite *LeaderCheckerTestSuite) TestStoppingNode() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + 
ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Stopping(2) + + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 0) +} + +func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"), + utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) + suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func (suite *LeaderCheckerTestSuite) TestIgnoreBalancedSegment() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) + 
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + + // dist with older version and leader view with newer version + leaderView := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + leaderView.Segments[1] = &querypb.SegmentDist{ + NodeID: 2, + Version: 2, + } + leaderView.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, leaderView) + + // test querynode-1 and querynode-2 exist + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 0) + + // test querynode-2 crash + suite.nodeMgr.Remove(2) + tasks = suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) + suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(2, 1, []int64{3, 4})) + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 0, "test-insert-channel")) + observer.dist.SegmentDistManager.Update(4, utils.CreateTestSegment(1, 1, 1, 4, 0, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(4, utils.CreateTestChannel(1, 4, 2, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(4, view2) + + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Equal(tasks[0].ReplicaID(), int64(1)) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) + suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { + 
observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, nil, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Equal(tasks[0].ReplicaID(), int64(1)) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce) + suite.Equal(tasks[0].Actions()[0].Node(), int64(2)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(3)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() { + observer := suite.checker + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + + segments := []*datapb.SegmentInfo{ + { + ID: 2, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{})) + + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Equal(tasks[0].Source(), utils.LeaderChecker) + suite.Equal(tasks[0].ReplicaID(), int64(1)) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce) + suite.Equal(tasks[0].Actions()[0].Node(), int64(2)) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(3)) + suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) +} + +func TestLeaderCheckerSuite(t *testing.T) { + suite.Run(t, new(LeaderCheckerTestSuite)) +} diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 603bdb167b..91941f6f1b 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -61,8 +61,8 @@ func NewSegmentChecker( } } -func (c *SegmentChecker) ID() CheckerType { - return segmentChecker +func (c *SegmentChecker) ID() utils.CheckerType { + return utils.SegmentChecker } func (c *SegmentChecker) Description() string { diff --git a/internal/querycoordv2/observers/collection_observer.go 
b/internal/querycoordv2/observers/collection_observer.go index 437ff5338e..6e95245bbf 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -41,7 +41,6 @@ type CollectionObserver struct { meta *meta.Meta targetMgr *meta.TargetManager targetObserver *TargetObserver - leaderObserver *LeaderObserver checkerController *checkers.CheckerController partitionLoadedCount map[int64]int @@ -53,7 +52,6 @@ func NewCollectionObserver( meta *meta.Meta, targetMgr *meta.TargetManager, targetObserver *TargetObserver, - leaderObserver *LeaderObserver, checherController *checkers.CheckerController, ) *CollectionObserver { return &CollectionObserver{ @@ -61,7 +59,6 @@ func NewCollectionObserver( meta: meta, targetMgr: targetMgr, targetObserver: targetObserver, - leaderObserver: leaderObserver, checkerController: checherController, partitionLoadedCount: make(map[int64]int), } diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 653f2b3570..0d2e8e401a 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -66,7 +66,6 @@ type CollectionObserverSuite struct { meta *meta.Meta targetMgr *meta.TargetManager targetObserver *TargetObserver - leaderObserver *LeaderObserver checkerController *checkers.CheckerController // Test object @@ -200,7 +199,6 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.checkerController = &checkers.CheckerController{} mockCluster := session.NewMockCluster(suite.T()) - suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster, nodeMgr) mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() // Test object @@ -209,7 +207,6 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.meta, suite.targetMgr, suite.targetObserver, - suite.leaderObserver, suite.checkerController, ) @@ -217,7 +214,6 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() } suite.targetObserver.Start() - suite.leaderObserver.Start() suite.ob.Start() suite.loadAll() } diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go deleted file mode 100644 index b6699a7224..0000000000 --- a/internal/querycoordv2/observers/leader_observer.go +++ /dev/null @@ -1,290 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package observers - -import ( - "context" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querycoordv2/meta" - "github.com/milvus-io/milvus/internal/querycoordv2/session" - "github.com/milvus-io/milvus/internal/querycoordv2/utils" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -// LeaderObserver is to sync the distribution with leader -type LeaderObserver struct { - wg sync.WaitGroup - cancel context.CancelFunc - dist *meta.DistributionManager - meta *meta.Meta - target *meta.TargetManager - broker meta.Broker - cluster session.Cluster - nodeMgr *session.NodeManager - - dispatcher *taskDispatcher[int64] - - stopOnce sync.Once -} - -func (o *LeaderObserver) Start() { - ctx, cancel := context.WithCancel(context.Background()) - o.cancel = cancel - - o.dispatcher.Start() - - o.wg.Add(1) - go func() { - defer o.wg.Done() - o.schedule(ctx) - }() -} - -func (o *LeaderObserver) Stop() { - o.stopOnce.Do(func() { - if o.cancel != nil { - o.cancel() - } - o.wg.Wait() - - o.dispatcher.Stop() - }) -} - -func (o *LeaderObserver) schedule(ctx context.Context) { - ticker := time.NewTicker(paramtable.Get().QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Second)) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("stop leader observer") - return - - case <-ticker.C: - o.observe(ctx) - } - } -} - -func (o *LeaderObserver) observe(ctx context.Context) { - o.observeSegmentsDist(ctx) -} - -func (o *LeaderObserver) readyToObserve(collectionID int64) bool { - metaExist := (o.meta.GetCollection(collectionID) != nil) - targetExist := o.target.IsNextTargetExist(collectionID) || o.target.IsCurrentTargetExist(collectionID) - - return metaExist && targetExist -} - -func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) { - collectionIDs := o.meta.CollectionManager.GetAll() - for _, cid := range collectionIDs { - if o.readyToObserve(cid) { - o.dispatcher.AddTask(cid) - } - } -} - -func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) { - replicas := o.meta.ReplicaManager.GetByCollection(collection) - for _, replica := range replicas { - leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica) - for ch, leaderID := range leaders { - if ok, _ := o.nodeMgr.IsStoppingNode(leaderID); ok { - // no need to correct leader's view which is loaded on stopping node - continue - } - - leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch) - if leaderView == nil { - continue - } - dists := o.dist.SegmentDistManager.GetByShardWithReplica(ch, replica) - - actions := o.findNeedLoadedSegments(leaderView, dists) - actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...) 
- o.sync(ctx, replica.GetID(), leaderView, actions) - } - } -} - -func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction { - ret := make([]*querypb.SyncAction, 0) - dists = utils.FindMaxVersionSegments(dists) - for _, s := range dists { - version, ok := leaderView.Segments[s.GetID()] - currentTarget := o.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) - existInCurrentTarget := currentTarget != nil - existInNextTarget := o.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil - - if !existInCurrentTarget && !existInNextTarget { - continue - } - - if !ok || version.GetVersion() < s.Version { // Leader misses this segment - ctx := context.Background() - resp, err := o.broker.GetSegmentInfo(ctx, s.GetID()) - if err != nil || len(resp.GetInfos()) == 0 { - log.Warn("failed to get segment info from DataCoord", zap.Error(err)) - continue - } - - if channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTargetFirst); channel != nil { - loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), nil) - - log.Debug("leader observer append a segment to set", - zap.Int64("collectionID", leaderView.CollectionID), - zap.String("channel", leaderView.Channel), - zap.Int64("leaderViewID", leaderView.ID), - zap.Int64("segmentID", s.GetID()), - zap.Int64("nodeID", s.Node)) - ret = append(ret, &querypb.SyncAction{ - Type: querypb.SyncType_Set, - PartitionID: s.GetPartitionID(), - SegmentID: s.GetID(), - NodeID: s.Node, - Version: s.Version, - Info: loadInfo, - }) - } - } - } - return ret -} - -func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction { - ret := make([]*querypb.SyncAction, 0) - distMap := make(map[int64]struct{}) - for _, s := range dists { - distMap[s.GetID()] = struct{}{} - } - for sid, s := range leaderView.Segments { - _, ok := distMap[sid] - existInCurrentTarget := o.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil - existInNextTarget := o.target.GetSealedSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil - if ok || existInCurrentTarget || existInNextTarget { - continue - } - log.Debug("leader observer append a segment to remove", - zap.Int64("collectionID", leaderView.CollectionID), - zap.String("channel", leaderView.Channel), - zap.Int64("leaderViewID", leaderView.ID), - zap.Int64("segmentID", sid), - zap.Int64("nodeID", s.NodeID)) - ret = append(ret, &querypb.SyncAction{ - Type: querypb.SyncType_Remove, - SegmentID: sid, - NodeID: s.NodeID, - }) - } - return ret -} - -func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool { - if len(diffs) == 0 { - return true - } - - log := log.With( - zap.Int64("leaderID", leaderView.ID), - zap.Int64("collectionID", leaderView.CollectionID), - zap.String("channel", leaderView.Channel), - ) - - collectionInfo, err := o.broker.DescribeCollection(ctx, leaderView.CollectionID) - if err != nil { - log.Warn("failed to get collection info", zap.Error(err)) - return false - } - - // Get collection index info - indexInfo, err := o.broker.DescribeIndex(ctx, collectionInfo.CollectionID) - if err != nil { - log.Warn("fail to get index info of collection", zap.Error(err)) - return false - } - - partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID) - if err != nil { - 
log.Warn("failed to get partitions", zap.Error(err)) - return false - } - - req := &querypb.SyncDistributionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution), - ), - CollectionID: leaderView.CollectionID, - ReplicaID: replicaID, - Channel: leaderView.Channel, - Actions: diffs, - Schema: collectionInfo.GetSchema(), - LoadMeta: &querypb.LoadMetaInfo{ - LoadType: o.meta.GetLoadType(leaderView.CollectionID), - CollectionID: leaderView.CollectionID, - PartitionIDs: partitions, - }, - Version: time.Now().UnixNano(), - IndexInfoList: indexInfo, - } - ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond)) - defer cancel() - resp, err := o.cluster.SyncDistribution(ctx, leaderView.ID, req) - if err != nil { - log.Warn("failed to sync distribution", zap.Error(err)) - return false - } - - if resp.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason())) - return false - } - - return true -} - -func NewLeaderObserver( - dist *meta.DistributionManager, - meta *meta.Meta, - targetMgr *meta.TargetManager, - broker meta.Broker, - cluster session.Cluster, - nodeMgr *session.NodeManager, -) *LeaderObserver { - ob := &LeaderObserver{ - dist: dist, - meta: meta, - target: targetMgr, - broker: broker, - cluster: cluster, - nodeMgr: nodeMgr, - } - - dispatcher := newTaskDispatcher[int64](ob.observeCollection) - ob.dispatcher = dispatcher - - return ob -} diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go deleted file mode 100644 index 8077b88a19..0000000000 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ /dev/null @@ -1,558 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package observers - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - "go.uber.org/atomic" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/kv" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querycoordv2/meta" - . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" - "github.com/milvus-io/milvus/internal/querycoordv2/session" - "github.com/milvus-io/milvus/internal/querycoordv2/utils" - "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type LeaderObserverTestSuite struct { - suite.Suite - observer *LeaderObserver - kv kv.MetaKv - mockCluster *session.MockCluster - - meta *meta.Meta - broker *meta.MockBroker -} - -func (suite *LeaderObserverTestSuite) SetupSuite() { - paramtable.Init() -} - -func (suite *LeaderObserverTestSuite) SetupTest() { - var err error - config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient( - config.UseEmbedEtcd.GetAsBool(), - config.EtcdUseSSL.GetAsBool(), - config.Endpoints.GetAsStrings(), - config.EtcdTLSCert.GetValue(), - config.EtcdTLSKey.GetValue(), - config.EtcdTLSCACert.GetValue(), - config.EtcdTLSMinVersion.GetValue()) - suite.Require().NoError(err) - suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) - - // meta - store := querycoord.NewCatalog(suite.kv) - idAllocator := RandomIncrementIDAllocator() - nodeMgr := session.NewNodeManager() - suite.meta = meta.NewMeta(idAllocator, store, nodeMgr) - suite.broker = meta.NewMockBroker(suite.T()) - - suite.mockCluster = session.NewMockCluster(suite.T()) - // suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(&commonpb.Status{ - // ErrorCode: commonpb.ErrorCode_Success, - // }, nil).Maybe() - distManager := meta.NewDistributionManager() - targetManager := meta.NewTargetManager(suite.broker, suite.meta) - suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster, nodeMgr) -} - -func (suite *LeaderObserverTestSuite) TearDownTest() { - suite.observer.Stop() - suite.kv.Close() -} - -func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() { - observer := suite.observer - observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) - observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) - observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - }, - } - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - info := &datapb.SegmentInfo{ - ID: 1, - CollectionID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - } - schema := utils.CreateTestSchema() - suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil) - suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return( - &datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil) - // will cause sync failed once - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once() - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ - {IndexName: "test"}, - }, nil) - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - observer.target.UpdateCollectionNextTarget(int64(1)) - observer.target.UpdateCollectionCurrentTarget(1) - observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) - observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - view := 
utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
-	view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
-	observer.dist.LeaderViewManager.Update(2, view)
-	loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
-
-	expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
-		return &querypb.SyncDistributionRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_SyncDistribution,
-			},
-			CollectionID: 1,
-			ReplicaID:    1,
-			Channel:      "test-insert-channel",
-			Actions: []*querypb.SyncAction{
-				{
-					Type:        querypb.SyncType_Set,
-					PartitionID: 1,
-					SegmentID:   1,
-					NodeID:      1,
-					Version:     1,
-					Info:        loadInfo,
-				},
-			},
-			Schema: schema,
-			LoadMeta: &querypb.LoadMetaInfo{
-				CollectionID: 1,
-				PartitionIDs: []int64{1},
-			},
-			Version:       version,
-			IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
-		}
-	}
-
-	called := atomic.NewBool(false)
-	suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2),
-		mock.AnythingOfType("*querypb.SyncDistributionRequest")).
-		Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
-			assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
-				[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
-			called.Store(true)
-		}).
-		Return(&commonpb.Status{}, nil)
-
-	observer.Start()
-
-	suite.Eventually(
-		func() bool {
-			return called.Load()
-		},
-		10*time.Second,
-		500*time.Millisecond,
-	)
-}
-
-func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
-	observer := suite.observer
-	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
-	observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            1,
-			PartitionID:   1,
-			InsertChannel: "test-insert-channel",
-		},
-	}
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-	schema := utils.CreateTestSchema()
-	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
-	info := &datapb.SegmentInfo{
-		ID:            1,
-		CollectionID:  1,
-		PartitionID:   1,
-		InsertChannel: "test-insert-channel",
-	}
-	suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
-		&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, segments, nil)
-	suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
-		{IndexName: "test"},
-	}, nil)
-	observer.target.UpdateCollectionNextTarget(int64(1))
-	observer.target.UpdateCollectionCurrentTarget(1)
-	observer.target.UpdateCollectionNextTarget(int64(1))
-	observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"),
-		utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel"))
-	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-	view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
-	view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
-	observer.dist.LeaderViewManager.Update(2, view)
-	loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
-
-	expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
-		return &querypb.SyncDistributionRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_SyncDistribution,
-			},
-			CollectionID: 1,
-			ReplicaID:    1,
-			Channel:      "test-insert-channel",
-			Actions: []*querypb.SyncAction{
-				{
-					Type:        querypb.SyncType_Set,
-					PartitionID: 1,
-					SegmentID:   1,
-					NodeID:      1,
-					Version:     1,
-					Info:        loadInfo,
-				},
-			},
-			Schema: schema,
-			LoadMeta: &querypb.LoadMetaInfo{
-				CollectionID: 1,
-				PartitionIDs: []int64{1},
-			},
-			Version:       version,
-			IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
-		}
-	}
-	called := atomic.NewBool(false)
-	suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2), mock.AnythingOfType("*querypb.SyncDistributionRequest")).
-		Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
-			assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
-				[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
-			called.Store(true)
-		}).
-		Return(&commonpb.Status{}, nil)
-
-	observer.Start()
-
-	suite.Eventually(
-		func() bool {
-			return called.Load()
-		},
-		10*time.Second,
-		500*time.Millisecond,
-	)
-}
-
-func (suite *LeaderObserverTestSuite) TestIgnoreBalancedSegment() {
-	observer := suite.observer
-	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
-	observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            1,
-			PartitionID:   1,
-			InsertChannel: "test-insert-channel",
-		},
-	}
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, segments, nil)
-	observer.target.UpdateCollectionNextTarget(int64(1))
-	observer.target.UpdateCollectionCurrentTarget(1)
-	observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
-	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-
-	// The leader view saw the segment on new node,
-	// but another nodes not yet
-	leaderView := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
-	leaderView.Segments[1] = &querypb.SegmentDist{
-		NodeID:  2,
-		Version: 2,
-	}
-	leaderView.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
-	observer.dist.LeaderViewManager.Update(2, leaderView)
-	observer.Start()
-
-	// Nothing should happen
-	time.Sleep(2 * time.Second)
-}
-
-func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
-	observer := suite.observer
-	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
-	observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(2, 1, []int64{3, 4}))
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            1,
-			PartitionID:   1,
-			InsertChannel: "test-insert-channel",
-		},
-	}
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-	info := &datapb.SegmentInfo{
-		ID:            1,
-		CollectionID:  1,
-		PartitionID:   1,
-		InsertChannel: "test-insert-channel",
-	}
-	schema := utils.CreateTestSchema()
-	suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
-		&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, segments, nil)
-	suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil)
-	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
-	observer.target.UpdateCollectionNextTarget(int64(1))
-	observer.target.UpdateCollectionCurrentTarget(1)
-	observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
-	observer.dist.SegmentDistManager.Update(4, utils.CreateTestSegment(1, 1, 1, 4, 2, "test-insert-channel"))
-	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-	view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
-	view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
-	observer.dist.LeaderViewManager.Update(2, view)
-	view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
-	view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
-	observer.dist.LeaderViewManager.Update(4, view2)
-	loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
-
-	expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
-		return &querypb.SyncDistributionRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_SyncDistribution,
-			},
-			CollectionID: 1,
-			ReplicaID:    1,
-			Channel:      "test-insert-channel",
-			Actions: []*querypb.SyncAction{
-				{
-					Type:        querypb.SyncType_Set,
-					PartitionID: 1,
-					SegmentID:   1,
-					NodeID:      1,
-					Version:     1,
-					Info:        loadInfo,
-				},
-			},
-			Schema: schema,
-			LoadMeta: &querypb.LoadMetaInfo{
-				CollectionID: 1,
-				PartitionIDs: []int64{1},
-			},
-			Version:       version,
-			IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
-		}
-	}
-	called := atomic.NewBool(false)
-	suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2),
-		mock.AnythingOfType("*querypb.SyncDistributionRequest")).
-		Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
-			assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
-				[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
-			called.Store(true)
-		}).
-		Return(&commonpb.Status{}, nil)
-
-	observer.Start()
-
-	suite.Eventually(
-		func() bool {
-			return called.Load()
-		},
-		10*time.Second,
-		500*time.Millisecond,
-	)
-}
-
-func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
-	observer := suite.observer
-	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
-	observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-
-	schema := utils.CreateTestSchema()
-	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
-	suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
-		{IndexName: "test"},
-	}, nil)
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, nil, nil)
-	observer.target.UpdateCollectionNextTarget(int64(1))
-	observer.target.UpdateCollectionCurrentTarget(1)
-
-	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-	view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{})
-	view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
-	observer.dist.LeaderViewManager.Update(2, view)
-
-	expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
-		return &querypb.SyncDistributionRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_SyncDistribution,
-			},
-			CollectionID: 1,
-			ReplicaID:    1,
-			Channel:      "test-insert-channel",
-			Actions: []*querypb.SyncAction{
-				{
-					Type:      querypb.SyncType_Remove,
-					SegmentID: 3,
-					NodeID:    2,
-				},
-			},
-			Schema: schema,
-			LoadMeta: &querypb.LoadMetaInfo{
-				CollectionID: 1,
-				PartitionIDs: []int64{1},
-			},
-			Version:       version,
-			IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
-		}
-	}
-	ch := make(chan struct{})
-	suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2),
-		mock.AnythingOfType("*querypb.SyncDistributionRequest")).
-		Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
-			assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
-				[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
-			close(ch)
-		}).
-		Return(&commonpb.Status{}, nil)
-
-	observer.Start()
-
-	select {
-	case <-ch:
-	case <-time.After(2 * time.Second):
-	}
-}
-
-func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
-	observer := suite.observer
-	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
-	observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            2,
-			PartitionID:   1,
-			InsertChannel: "test-insert-channel",
-		},
-	}
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-	schema := utils.CreateTestSchema()
-	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, segments, nil)
-	suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
-		{IndexName: "test"},
-	}, nil)
-	observer.target.UpdateCollectionNextTarget(int64(1))
-
-	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-	observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{}))
-
-	expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
-		return &querypb.SyncDistributionRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_SyncDistribution,
-			},
-			CollectionID: 1,
-			ReplicaID:    1,
-			Channel:      "test-insert-channel",
-			Actions: []*querypb.SyncAction{
-				{
-					Type:      querypb.SyncType_Remove,
-					SegmentID: 3,
-					NodeID:    2,
-				},
-			},
-			Schema: schema,
-			LoadMeta: &querypb.LoadMetaInfo{
-				CollectionID: 1,
-				PartitionIDs: []int64{1},
-			},
-			Version:       version,
-			IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
-		}
-	}
-	called := atomic.NewBool(false)
-	suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2), mock.AnythingOfType("*querypb.SyncDistributionRequest")).
-		Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
-			assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
-				[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
-			called.Store(true)
-		}).
-		Return(&commonpb.Status{}, nil)
-
-	observer.Start()
-	suite.Eventually(func() bool {
-		return called.Load()
-	},
-		10*time.Second,
-		500*time.Millisecond,
-	)
-}
-
-func TestLeaderObserverSuite(t *testing.T) {
-	suite.Run(t, new(LeaderObserverTestSuite))
-}
diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go
index ce5a8ce44b..d3fe4ddd73 100644
--- a/internal/querycoordv2/ops_services.go
+++ b/internal/querycoordv2/ops_services.go
@@ -23,7 +23,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
-	"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -73,7 +73,7 @@ func (s *Server) ActivateChecker(ctx context.Context, req *querypb.ActivateCheck
 		log.Warn("failed to activate checker", zap.Error(err))
 		return merr.Status(err), nil
 	}
-	if err := s.checkerController.Activate(checkers.CheckerType(req.CheckerID)); err != nil {
+	if err := s.checkerController.Activate(utils.CheckerType(req.CheckerID)); err != nil {
 		log.Warn("failed to activate checker", zap.Error(err))
 		return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
 	}
@@ -87,7 +87,7 @@ func (s *Server) DeactivateChecker(ctx context.Context, req *querypb.DeactivateC
 		log.Warn("failed to deactivate checker", zap.Error(err))
 		return merr.Status(err), nil
 	}
-	if err := s.checkerController.Deactivate(checkers.CheckerType(req.CheckerID)); err != nil {
+	if err := s.checkerController.Deactivate(utils.CheckerType(req.CheckerID)); err != nil {
 		log.Warn("failed to deactivate checker", zap.Error(err))
 		return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
 	}
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index caa2285a6d..e55fc9255a 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -110,7 +110,6 @@ type Server struct {
 
 	// Observers
 	collectionObserver *observers.CollectionObserver
-	leaderObserver     *observers.LeaderObserver
 	targetObserver     *observers.TargetObserver
 	replicaObserver    *observers.ReplicaObserver
 	resourceObserver   *observers.ResourceObserver
@@ -363,14 +362,6 @@ func (s *Server) initMeta() error {
 
 func (s *Server) initObserver() {
 	log.Info("init observers")
-	s.leaderObserver = observers.NewLeaderObserver(
-		s.dist,
-		s.meta,
-		s.targetMgr,
-		s.broker,
-		s.cluster,
-		s.nodeMgr,
-	)
 	s.targetObserver = observers.NewTargetObserver(
 		s.meta,
 		s.targetMgr,
@@ -383,7 +374,6 @@
 		s.meta,
 		s.targetMgr,
 		s.targetObserver,
-		s.leaderObserver,
 		s.checkerController,
 	)
@@ -451,7 +441,6 @@ func (s *Server) startServerLoop() {
 	log.Info("start observers...")
 	s.collectionObserver.Start()
-	s.leaderObserver.Start()
 	s.targetObserver.Start()
 	s.replicaObserver.Start()
 	s.resourceObserver.Start()
@@ -495,9 +484,6 @@ func (s *Server) Stop() error {
 	if s.collectionObserver != nil {
 		s.collectionObserver.Stop()
 	}
-	if s.leaderObserver != nil {
-		s.leaderObserver.Stop()
-	}
 	if s.targetObserver != nil {
 		s.targetObserver.Stop()
 	}
diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go
index ed7a979d4b..6372e1d4af 100644
--- a/internal/querycoordv2/server_test.go
+++ b/internal/querycoordv2/server_test.go
@@ -582,7 +582,6 @@ func (suite *ServerSuite) hackServer() {
 		suite.server.meta,
 		suite.server.targetMgr,
 		suite.server.targetObserver,
-		suite.server.leaderObserver,
 		suite.server.checkerController,
 	)
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index 8d2e350ab5..e5defb3f58 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -196,7 +196,6 @@ func (suite *ServiceSuite) SetupTest() {
 		suite.server.meta,
 		suite.server.targetMgr,
 		suite.targetObserver,
-		suite.server.leaderObserver,
 		&checkers.CheckerController{},
 	)
diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go
index 7ca95aae17..84a8f4f3bc 100644
--- a/internal/querycoordv2/task/utils.go
+++ b/internal/querycoordv2/task/utils.go
@@ -30,6 +30,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
@@ -112,6 +113,9 @@ func packLoadSegmentRequest(
 		loadScope = querypb.LoadScope_Index
 	}
 
+	if task.Source() == utils.LeaderChecker {
+		loadScope = querypb.LoadScope_Delta
+	}
 	// field mmap enabled if collection-level mmap enabled or the field mmap enabled
 	collectionMmapEnabled := common.IsMmapEnabled(collectionProperties...)
 	for _, field := range schema.GetFields() {
diff --git a/internal/querycoordv2/utils/checker.go b/internal/querycoordv2/utils/checker.go
index 6dffd3d166..3c3bdeb31f 100644
--- a/internal/querycoordv2/utils/checker.go
+++ b/internal/querycoordv2/utils/checker.go
@@ -21,6 +21,36 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
+const (
+	SegmentCheckerName = "segment_checker"
+	ChannelCheckerName = "channel_checker"
+	BalanceCheckerName = "balance_checker"
+	IndexCheckerName   = "index_checker"
+	LeaderCheckerName  = "leader_checker"
+)
+
+type CheckerType int32
+
+const (
+	ChannelChecker CheckerType = iota + 1
+	SegmentChecker
+	BalanceChecker
+	IndexChecker
+	LeaderChecker
+)
+
+var checkerNames = map[CheckerType]string{
+	SegmentChecker: SegmentCheckerName,
+	ChannelChecker: ChannelCheckerName,
+	BalanceChecker: BalanceCheckerName,
+	IndexChecker:   IndexCheckerName,
+	LeaderChecker:  LeaderCheckerName,
+}
+
+func (s CheckerType) String() string {
+	return checkerNames[s]
+}
+
 func FilterReleased[E interface{ GetCollectionID() int64 }](elems []E, collections []int64) []E {
 	collectionSet := typeutil.NewUniqueSet(collections...)
 	ret := make([]E, 0, len(elems))