fix: Transfer l0 segment to new delegator after balance (#31319)

issue: #30186

during channel balance, after new delegator loaded, instead of syncing
l0 segment's location to new delegator, we should load l0 segment on new
delegator, and release the old l0 segment, then start to release old
delegator.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/31384/head
wei liu 2024-03-19 09:59:05 +08:00 committed by GitHub
parent d9efea2fea
commit c26c1b33c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 234 additions and 7 deletions

View File

@ -22,6 +22,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@ -121,8 +122,11 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int6
ret := make([]task.Task, 0)
dist = utils.FindMaxVersionSegments(dist)
for _, s := range dist {
existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) != nil
if !existInTarget {
segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
// should set l0 segment location to delegator. l0 segment should be reload in delegator
if !existInTarget || isL0Segment {
continue
}
@ -166,8 +170,10 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int
for sid, s := range leaderView.Segments {
_, ok := distMap[sid]
existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTargetFirst) != nil
if ok || existInTarget {
segment := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
if ok || existInTarget || isL0Segment {
continue
}
log.Debug("leader checker append a segment to remove",

View File

@ -123,6 +123,29 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), loadVersion)
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
// test skip sync l0 segment
segments = []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
// mock l0 segment exist on non delegator node, doesn't set to leader view
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, loadVersion, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}
func (suite *LeaderCheckerTestSuite) TestActivation() {
@ -324,6 +347,30 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
// skip sync l0 segments
segments := []*datapb.SegmentInfo{
{
ID: 3,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}
func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {

View File

@ -236,7 +236,13 @@ func (c *SegmentChecker) getSealedSegmentDiff(
_, existOnCurrent := currentTargetMap[segment.GetID()]
_, existOnNext := nextTargetMap[segment.GetID()]
if !existOnNext && !existOnCurrent {
l0WithWrongLocation := false
if existOnCurrent {
leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, segment.GetInsertChannel())
l0WithWrongLocation = segment.GetLevel() == datapb.SegmentLevel_L0 && segment.Node != leader.ID
}
if !existOnNext && !existOnCurrent || l0WithWrongLocation {
toRelease = append(toRelease, segment)
}
}

View File

@ -168,6 +168,77 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
suite.Len(tasks, 1)
}
func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
checker := suite.checker
// set meta
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
// set target
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
Level: datapb.SegmentLevel_L0,
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
// set dist
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
// seg l0 segment exist on a non delegator node
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
// release duplicate l0 segment
checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 2, 100, "test-insert-channel"))
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}
func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
checker := suite.checker
// set meta

View File

@ -652,8 +652,10 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
return false
}
for segmentID := range segmentsInTarget {
if _, exist := leader.Segments[segmentID]; !exist {
for segmentID, s := range segmentsInTarget {
_, exist := leader.Segments[segmentID]
l0WithWrongLocation := exist && s.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
if !exist || l0WithWrongLocation {
return false
}
}

View File

@ -1697,6 +1697,101 @@ func (suite *TaskSuite) TestBalanceChannelTask() {
suite.Equal(2, task.step)
}
func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() {
collectionID := int64(1)
partitionID := int64(1)
channel := "channel-1"
vchannel := &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel,
}
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
{
ID: 2,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
{
ID: 3,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
Level: datapb.SegmentLevel_L0,
},
}
suite.meta.PutCollection(utils.CreateTestCollection(collectionID, 1), utils.CreateTestPartition(collectionID, 1))
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return([]*datapb.VchannelInfo{vchannel}, segments, nil)
suite.target.UpdateCollectionNextTarget(collectionID)
suite.target.UpdateCollectionCurrentTarget(collectionID)
suite.target.UpdateCollectionNextTarget(collectionID)
suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
ID: 2,
CollectionID: collectionID,
Channel: channel,
Segments: map[int64]*querypb.SegmentDist{
1: {NodeID: 2},
2: {NodeID: 2},
3: {NodeID: 2},
},
})
suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: channel,
Segments: map[int64]*querypb.SegmentDist{
1: {NodeID: 2},
2: {NodeID: 2},
3: {NodeID: 2},
},
})
task, err := NewChannelTask(context.Background(),
10*time.Second,
WrapIDSource(2),
collectionID,
1,
NewChannelAction(1, ActionTypeGrow, channel),
NewChannelAction(2, ActionTypeReduce, channel),
)
suite.NoError(err)
// l0 hasn't been loaded into delegator, block balance
suite.scheduler.preProcess(task)
suite.Equal(0, task.step)
suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: channel,
Segments: map[int64]*querypb.SegmentDist{
1: {NodeID: 1},
2: {NodeID: 1},
3: {NodeID: 1},
},
})
// new delegator distribution updated, task step up
suite.scheduler.preProcess(task)
suite.Equal(1, task.step)
suite.dist.LeaderViewManager.Update(2)
// old delegator removed
suite.scheduler.preProcess(task)
suite.Equal(2, task.step)
}
func TestTask(t *testing.T) {
suite.Run(t, new(TaskSuite))
}