From 62668b891c31c40adc8e78a8043e77e8ddc98062 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 20 Dec 2022 16:33:26 +0800 Subject: [PATCH] fix leader observer sync logic (#20478) (#21315) Signed-off-by: Wei Liu --- .../querycoordv2/checkers/segment_checker.go | 1 + .../checkers/segment_checker_test.go | 6 + .../querycoordv2/observers/leader_observer.go | 9 +- .../observers/leader_observer_test.go | 109 ++++++++++++++++++ internal/querycoordv2/task/task.go | 6 +- 5 files changed, 125 insertions(+), 6 deletions(-) diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 3f3861e239..ab43ad979f 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -71,6 +71,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task { segments := c.dist.SegmentDistManager.GetAll() released := utils.FilterReleased(segments, collectionIDs) tasks = append(tasks, c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_All)...) + task.SetPriority(task.TaskPriorityNormal, tasks...) return tasks } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 2c36834184..3324421905 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -124,6 +124,7 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { suite.EqualValues(1, tasks[0].ReplicaID()) suite.Equal(task.ActionTypeGrow, action.Type()) suite.EqualValues(1, action.SegmentID()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } @@ -146,6 +147,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseSegments() { suite.EqualValues(1, tasks[0].ReplicaID()) suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(2, action.SegmentID()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() { @@ -180,6 +182,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() { suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(1, action.SegmentID()) suite.EqualValues(1, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) // test less version exist on leader checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{})) @@ -238,6 +241,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() { suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(2, action.SegmentID()) suite.EqualValues(2, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) suite.Len(tasks[1].Actions(), 1) action, ok = tasks[1].Actions()[0].(*task.SegmentAction) @@ -246,6 +250,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() { suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(3, action.SegmentID()) suite.EqualValues(2, action.Node()) + suite.Equal(tasks[1].Priority(), task.TaskPriorityNormal) } func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() { @@ -260,6 +265,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() { suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(1, action.SegmentID()) suite.EqualValues(1, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } func TestSegmentCheckerSuite(t *testing.T) { diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index c0a0ad05a9..eea8278894 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -105,8 +105,9 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis dists = utils.FindMaxVersionSegments(dists) for _, s := range dists { version, ok := leaderView.Segments[s.GetID()] - if ok && version.GetVersion() >= s.Version || - o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) == nil { + existInCurrentTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) != nil + existInNextTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil + if ok && version.GetVersion() >= s.Version || (!existInCurrentTarget && !existInNextTarget) { continue } ret = append(ret, &querypb.SyncAction{ @@ -128,7 +129,9 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di } for sid := range leaderView.Segments { _, ok := distMap[sid] - if ok || o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil { + existInCurrentTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil + existInNextTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil + if ok || existInCurrentTarget || existInNextTarget { continue } ret = append(ret, &querypb.SyncAction{ diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index 80d419b081..651f73cf0d 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -136,6 +136,63 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() { ) } +func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() { + observer := suite.observer + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + segments := []*datapb.SegmentBinlogs{ + { + SegmentID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"), + utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) + + expectReq := &querypb.SyncDistributionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SyncDistribution, + }, + CollectionID: 1, + Channel: "test-insert-channel", + Actions: []*querypb.SyncAction{ + { + Type: querypb.SyncType_Set, + PartitionID: 1, + SegmentID: 1, + NodeID: 1, + Version: 1, + }, + }, + } + called := atomic.NewBool(false) + suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once(). + Run(func(args mock.Arguments) { called.Store(true) }). + Return(&commonpb.Status{}, nil) + + observer.Start(context.TODO()) + + suite.Eventually( + func() bool { + return called.Load() + }, + 10*time.Second, + 500*time.Millisecond, + ) +} + func (suite *LeaderObserverTestSuite) TestIgnoreBalancedSegment() { observer := suite.observer observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) @@ -265,6 +322,58 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() { } } +func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() { + + observer := suite.observer + observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + + segments := []*datapb.SegmentBinlogs{ + { + SegmentID: 2, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1)) + + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{})) + + expectReq := &querypb.SyncDistributionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SyncDistribution, + }, + CollectionID: 1, + Channel: "test-insert-channel", + Actions: []*querypb.SyncAction{ + { + Type: querypb.SyncType_Remove, + SegmentID: 3, + }, + }, + } + called := atomic.NewBool(false) + suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once(). + Run(func(args mock.Arguments) { called.Store(true) }). + Return(&commonpb.Status{}, nil) + + observer.Start(context.TODO()) + suite.Eventually(func() bool { + return called.Load() + }, + 10*time.Second, + 500*time.Millisecond, + ) +} + func TestLeaderObserverSuite(t *testing.T) { suite.Run(t, new(LeaderObserverTestSuite)) } diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index 5c6cae6dcc..ea164a91ee 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -40,9 +40,9 @@ const ( ) const ( - TaskPriorityLow = iota - TaskPriorityNormal - TaskPriorityHigh + TaskPriorityLow int32 = iota // for balance checker + TaskPriorityNormal // for segment checker + TaskPriorityHigh // for channel checker ) var (