fix leader observer sync logic (#20478) (#21315)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/21326/head
wei liu 2022-12-20 16:33:26 +08:00 committed by GitHub
parent 01e609daff
commit 62668b891c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 6 deletions

View File

@ -71,6 +71,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
segments := c.dist.SegmentDistManager.GetAll() segments := c.dist.SegmentDistManager.GetAll()
released := utils.FilterReleased(segments, collectionIDs) released := utils.FilterReleased(segments, collectionIDs)
tasks = append(tasks, c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_All)...) tasks = append(tasks, c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_All)...)
task.SetPriority(task.TaskPriorityNormal, tasks...)
return tasks return tasks
} }

View File

@ -124,6 +124,7 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
suite.EqualValues(1, tasks[0].ReplicaID()) suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type()) suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.SegmentID()) suite.EqualValues(1, action.SegmentID())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
} }
@ -146,6 +147,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
suite.EqualValues(1, tasks[0].ReplicaID()) suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeReduce, action.Type()) suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(2, action.SegmentID()) suite.EqualValues(2, action.SegmentID())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
} }
func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() { func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
@ -180,6 +182,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
suite.Equal(task.ActionTypeReduce, action.Type()) suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID()) suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node()) suite.EqualValues(1, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
// test less version exist on leader // test less version exist on leader
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{})) checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{}))
@ -238,6 +241,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
suite.Equal(task.ActionTypeReduce, action.Type()) suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(2, action.SegmentID()) suite.EqualValues(2, action.SegmentID())
suite.EqualValues(2, action.Node()) suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
suite.Len(tasks[1].Actions(), 1) suite.Len(tasks[1].Actions(), 1)
action, ok = tasks[1].Actions()[0].(*task.SegmentAction) action, ok = tasks[1].Actions()[0].(*task.SegmentAction)
@ -246,6 +250,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
suite.Equal(task.ActionTypeReduce, action.Type()) suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(3, action.SegmentID()) suite.EqualValues(3, action.SegmentID())
suite.EqualValues(2, action.Node()) suite.EqualValues(2, action.Node())
suite.Equal(tasks[1].Priority(), task.TaskPriorityNormal)
} }
func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() { func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() {
@ -260,6 +265,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() {
suite.Equal(task.ActionTypeReduce, action.Type()) suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID()) suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node()) suite.EqualValues(1, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
} }
func TestSegmentCheckerSuite(t *testing.T) { func TestSegmentCheckerSuite(t *testing.T) {

View File

@ -105,8 +105,9 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
dists = utils.FindMaxVersionSegments(dists) dists = utils.FindMaxVersionSegments(dists)
for _, s := range dists { for _, s := range dists {
version, ok := leaderView.Segments[s.GetID()] version, ok := leaderView.Segments[s.GetID()]
if ok && version.GetVersion() >= s.Version || existInCurrentTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) != nil
o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) == nil { existInNextTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
if ok && version.GetVersion() >= s.Version || (!existInCurrentTarget && !existInNextTarget) {
continue continue
} }
ret = append(ret, &querypb.SyncAction{ ret = append(ret, &querypb.SyncAction{
@ -128,7 +129,9 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di
} }
for sid := range leaderView.Segments { for sid := range leaderView.Segments {
_, ok := distMap[sid] _, ok := distMap[sid]
if ok || o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil { existInCurrentTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
existInNextTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
if ok || existInCurrentTarget || existInNextTarget {
continue continue
} }
ret = append(ret, &querypb.SyncAction{ ret = append(ret, &querypb.SyncAction{

View File

@ -136,6 +136,63 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
) )
} }
func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentBinlogs{
{
SegmentID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"),
utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
},
},
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { called.Store(true) }).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
suite.Eventually(
func() bool {
return called.Load()
},
10*time.Second,
500*time.Millisecond,
)
}
func (suite *LeaderObserverTestSuite) TestIgnoreBalancedSegment() { func (suite *LeaderObserverTestSuite) TestIgnoreBalancedSegment() {
observer := suite.observer observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
@ -265,6 +322,58 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
} }
} }
func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentBinlogs{
{
SegmentID: 2,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
},
},
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { called.Store(true) }).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
suite.Eventually(func() bool {
return called.Load()
},
10*time.Second,
500*time.Millisecond,
)
}
func TestLeaderObserverSuite(t *testing.T) { func TestLeaderObserverSuite(t *testing.T) {
suite.Run(t, new(LeaderObserverTestSuite)) suite.Run(t, new(LeaderObserverTestSuite))
} }

View File

@ -40,9 +40,9 @@ const (
) )
const ( const (
TaskPriorityLow = iota TaskPriorityLow int32 = iota // for balance checker
TaskPriorityNormal TaskPriorityNormal // for segment checker
TaskPriorityHigh TaskPriorityHigh // for channel checker
) )
var ( var (