fix leader observer sync logic (#20478)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/20514/head
wei liu 2022-11-11 11:43:06 +08:00 committed by GitHub
parent 701e7b5a6e
commit eaa5cfdcb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 6 deletions

View File

@ -71,6 +71,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
segments := c.dist.SegmentDistManager.GetAll()
released := utils.FilterReleased(segments, collectionIDs)
tasks = append(tasks, c.createSegmentReduceTasks(ctx, released, -1, querypb.DataScope_All)...)
task.SetPriority(task.TaskPriorityNormal, tasks...)
return tasks
}

View File

@ -117,6 +117,7 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}
@ -139,6 +140,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(2, action.SegmentID())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}
func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
@ -173,6 +175,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
// test less version exist on leader
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{}))
@ -231,6 +234,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(2, action.SegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
suite.Len(tasks[1].Actions(), 1)
action, ok = tasks[1].Actions()[0].(*task.SegmentAction)
@ -239,6 +243,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(3, action.SegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[1].Priority(), task.TaskPriorityNormal)
}
func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() {
@ -253,6 +258,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseDroppedSegments() {
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}
func TestSegmentCheckerSuite(t *testing.T) {

View File

@ -105,8 +105,9 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
dists = utils.FindMaxVersionSegments(dists)
for _, s := range dists {
version, ok := leaderView.Segments[s.GetID()]
if ok && version.GetVersion() >= s.Version ||
o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) == nil {
existInCurrentTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget) != nil
existInNextTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
if ok && version.GetVersion() >= s.Version || (!existInCurrentTarget && !existInNextTarget) {
continue
}
ret = append(ret, &querypb.SyncAction{
@ -128,7 +129,9 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di
}
for sid := range leaderView.Segments {
_, ok := distMap[sid]
if ok || o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil {
existInCurrentTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
existInNextTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
if ok || existInCurrentTarget || existInNextTarget {
continue
}
ret = append(ret, &querypb.SyncAction{

View File

@ -129,6 +129,63 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
)
}
// TestIgnoreSyncLoadedSegments verifies that the leader observer only syncs
// loaded segments that belong to a target: the distribution holds two test
// segments, the mocked recovery info lists only segment 1, and the single
// expected SyncDistribution request carries one Set action for segment 1.
func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
	const collectionID, partitionID int64 = 1, 1
	ctx := context.TODO()

	observer := suite.observer
	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))

	// Recovery info served by the mocked broker: one channel, and segment 1 only.
	targetSegments := []*datapb.SegmentBinlogs{
		{SegmentID: 1, InsertChannel: "test-insert-channel"},
	}
	targetChannels := []*datapb.VchannelInfo{
		{CollectionID: 1, ChannelName: "test-insert-channel"},
	}
	suite.broker.EXPECT().
		GetRecoveryInfo(mock.Anything, collectionID, partitionID).
		Return(targetChannels, targetSegments, nil)

	// Build the next target from the recovery info, then update the current target.
	observer.target.UpdateCollectionNextTargetWithPartitions(collectionID, partitionID)
	observer.target.UpdateCollectionCurrentTarget(1)

	// Two test segments are registered in the distribution, while the leader
	// view registered for node 2 starts with empty segment maps.
	observer.dist.SegmentDistManager.Update(
		1,
		utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"),
		utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel"),
	)
	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
	observer.dist.LeaderViewManager.Update(
		2,
		utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}),
	)

	// Only segment 1 may show up in the request sent to node 2.
	wantReq := &querypb.SyncDistributionRequest{
		Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_SyncDistribution},
		CollectionID: 1,
		Channel:      "test-insert-channel",
		Actions: []*querypb.SyncAction{
			{
				Type:        querypb.SyncType_Set,
				PartitionID: 1,
				SegmentID:   1,
				NodeID:      1,
				Version:     1,
			},
		},
	}

	// Flag flipped by the mock once the expected request has been received.
	synced := atomic.NewBool(false)
	suite.mockCluster.EXPECT().
		SyncDistribution(ctx, int64(2), wantReq).
		Once().
		Run(func(mock.Arguments) { synced.Store(true) }).
		Return(&commonpb.Status{}, nil)

	observer.Start(ctx)

	// Poll every 500ms, for at most 10s, until the sync request was observed.
	suite.Eventually(synced.Load, 10*time.Second, 500*time.Millisecond)
}
func (suite *LeaderObserverTestSuite) TestIgnoreBalancedSegment() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
@ -258,6 +315,58 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
}
}
// TestIgnoreSyncRemovedSegments verifies that the leader observer does not
// remove a segment that is part of the next target: the mocked recovery info
// lists segment 2, the leader view is created with entries for 3 and 2, and
// the single expected SyncDistribution request removes segment 3 only.
func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
	const collectionID, partitionID int64 = 1, 1
	ctx := context.TODO()

	observer := suite.observer
	observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
	observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))

	// Recovery info served by the mocked broker: one channel, and segment 2 only.
	targetSegments := []*datapb.SegmentBinlogs{
		{SegmentID: 2, InsertChannel: "test-insert-channel"},
	}
	targetChannels := []*datapb.VchannelInfo{
		{CollectionID: 1, ChannelName: "test-insert-channel"},
	}
	suite.broker.EXPECT().
		GetRecoveryInfo(mock.Anything, collectionID, partitionID).
		Return(targetChannels, targetSegments, nil)

	// Only the next target is updated here; this test never updates the current target.
	observer.target.UpdateCollectionNextTargetWithPartitions(collectionID, partitionID)

	// No segment distribution is registered; the leader view for node 2 is
	// created with the map {3: 2, 2: 2} (presumably segment ID -> node ID).
	observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
	observer.dist.LeaderViewManager.Update(
		2,
		utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{}),
	)

	// Segment 2 must be kept, so the request sent to node 2 removes segment 3 only.
	wantReq := &querypb.SyncDistributionRequest{
		Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_SyncDistribution},
		CollectionID: 1,
		Channel:      "test-insert-channel",
		Actions: []*querypb.SyncAction{
			{
				Type:      querypb.SyncType_Remove,
				SegmentID: 3,
			},
		},
	}

	// Flag flipped by the mock once the expected request has been received.
	synced := atomic.NewBool(false)
	suite.mockCluster.EXPECT().
		SyncDistribution(ctx, int64(2), wantReq).
		Once().
		Run(func(mock.Arguments) { synced.Store(true) }).
		Return(&commonpb.Status{}, nil)

	observer.Start(ctx)

	// Poll every 500ms, for at most 10s, until the sync request was observed.
	suite.Eventually(synced.Load, 10*time.Second, 500*time.Millisecond)
}
// TestLeaderObserverSuite is the `go test` entry point that runs every test
// method of LeaderObserverTestSuite.
func TestLeaderObserverSuite(t *testing.T) {
	suite.Run(t, &LeaderObserverTestSuite{})
}

View File

@ -40,9 +40,9 @@ const (
)
const (
TaskPriorityLow = iota
TaskPriorityNormal
TaskPriorityHigh
TaskPriorityLow int32 = iota // for balance checker
TaskPriorityNormal // for segment checker
TaskPriorityHigh // for channel checker
)
var (