mirror of https://github.com/milvus-io/milvus.git
fix: Skip release duplicate l0 segment (#31540)
issue: #31480 #31481
A release task for a duplicate L0 segment may cause a missing segment when it executes on the old delegator, and may break the new delegator's leader view when it executes on the new delegator. This PR makes the segment_checker skip releasing duplicate L0 segments, since L0 segments are released together with the channel unsubscribe.
---------
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
parent d995b3f0fa
commit 5d752498e7
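Before the diffs, a minimal self-contained sketch of the behaviour this commit gives the segment checker: a duplicate L0 segment found in the distribution is no longer reported for release, because an L0 segment is dropped together with the channel unsubscribe on the old delegator. All types and helpers in the sketch (Segment, findRepeatedSegments, inTarget) are simplified stand-ins, not the real Milvus code; the actual change lives in findRepeatedSealedSegments and getSealedSegmentDiff in the hunks below.

```go
package main

import "fmt"

// Simplified stand-ins for the real Milvus types; every name below is
// illustrative only, not the actual milvus API.
type SegmentLevel int

const (
	SegmentLevelL1 SegmentLevel = iota
	SegmentLevelL0
)

type Segment struct {
	ID    int64
	Level SegmentLevel
	Node  int64
}

// findRepeatedSegments sketches the checker behaviour after this commit:
// segments that are L0 in the target are skipped entirely, so the checker no
// longer emits a release task for a duplicate L0 copy; that copy is dropped
// later, when the old delegator unsubscribes its channel.
func findRepeatedSegments(dist []Segment, inTarget func(id int64) *Segment) []Segment {
	repeated := make([]Segment, 0)
	seen := make(map[int64]Segment)
	for _, s := range dist {
		if t := inTarget(s.ID); t != nil && t.Level == SegmentLevelL0 {
			continue // l0 segment should be released with the channel, not here
		}
		if prev, ok := seen[s.ID]; ok {
			// keep one copy and report the other as repeated
			// (the real checker compares segment versions here)
			repeated = append(repeated, prev)
		}
		seen[s.ID] = s
	}
	return repeated
}

func main() {
	target := map[int64]*Segment{1: {ID: 1, Level: SegmentLevelL0}}
	dist := []Segment{
		{ID: 1, Level: SegmentLevelL0, Node: 1}, // stale copy on the old delegator
		{ID: 1, Level: SegmentLevelL0, Node: 2}, // copy on the new delegator
	}
	dup := findRepeatedSegments(dist, func(id int64) *Segment { return target[id] })
	fmt.Println(len(dup)) // prints 0: no release task for the duplicate L0 copy
}
```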
@@ -218,15 +218,33 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 	// Segment which exist on next target, but not on dist
 	for segmentID, segment := range nextTargetMap {
-		leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica,
-			segment.GetInsertChannel(),
-		)
-		node, ok := distMap[segmentID]
-		if !ok ||
-			leader != nil &&
-				segment.GetLevel() == datapb.SegmentLevel_L0 &&
-				node != leader.ID {
+		node, existInDist := distMap[segmentID]
+		l0WithWrongLocation := false
+		if existInDist && segment.GetLevel() == datapb.SegmentLevel_L0 {
+			// the L0 segments have to been in the same node as the channel watched
+			leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, segment.GetInsertChannel())
+			l0WithWrongLocation = leader != nil && node != leader.ID
+		}
+		if !existInDist || l0WithWrongLocation {
 			toLoad = append(toLoad, segment)
 		}
 	}
+
+	// l0 Segment which exist on current target, but not on dist
+	for segmentID, segment := range currentTargetMap {
+		// to avoid generate duplicate segment task
+		if nextTargetMap[segmentID] != nil {
+			continue
+		}
+
+		node, existInDist := distMap[segmentID]
+		l0WithWrongLocation := false
+		if existInDist && segment.GetLevel() == datapb.SegmentLevel_L0 {
+			// the L0 segments have to been in the same node as the channel watched
+			leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, segment.GetInsertChannel())
+			l0WithWrongLocation = leader != nil && node != leader.ID
+		}
+		if !existInDist || l0WithWrongLocation {
+			toLoad = append(toLoad, segment)
+		}
+	}
@@ -236,13 +254,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 		_, existOnCurrent := currentTargetMap[segment.GetID()]
 		_, existOnNext := nextTargetMap[segment.GetID()]
 
-		l0WithWrongLocation := false
-		if existOnCurrent {
-			leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, segment.GetInsertChannel())
-			l0WithWrongLocation = segment.GetLevel() == datapb.SegmentLevel_L0 && segment.Node != leader.ID
-		}
-
-		if !existOnNext && !existOnCurrent || l0WithWrongLocation {
+		// l0 segment should be release with channel together
+		if !existOnNext && !existOnCurrent {
 			toRelease = append(toRelease, segment)
 		}
 	}
@@ -278,6 +291,14 @@ func (c *SegmentChecker) findRepeatedSealedSegments(replicaID int64) []*meta.Segment {
 	dist := c.getSealedSegmentsDist(replica)
 	versions := make(map[int64]*meta.Segment)
 	for _, s := range dist {
+		// l0 segment should be release with channel together
+		segment := c.targetMgr.GetSealedSegment(s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst)
+		existInTarget := segment != nil
+		isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
+		if isL0Segment {
+			continue
+		}
+
 		maxVer, ok := versions[s.GetID()]
 		if !ok {
 			versions[s.GetID()] = s
@@ -210,10 +210,9 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
 	// set dist
 	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
-	// seg l0 segment exist on a non delegator node
-	checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
 	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
 
+	// test load l0 segments in next target
 	tasks := checker.Check(context.TODO())
 	suite.Len(tasks, 1)
 	suite.Len(tasks[0].Actions(), 1)
@@ -225,17 +224,106 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
 	suite.EqualValues(2, action.Node())
 	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 
-	// release duplicate l0 segment
-	checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 2, 100, "test-insert-channel"))
+	checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
+	// test load l0 segments in current target
+	tasks = checker.Check(context.TODO())
+	suite.Len(tasks, 1)
+	suite.Len(tasks[0].Actions(), 1)
+	action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
+	suite.True(ok)
+	suite.EqualValues(1, tasks[0].ReplicaID())
+	suite.Equal(task.ActionTypeGrow, action.Type())
+	suite.EqualValues(1, action.SegmentID())
+	suite.EqualValues(2, action.Node())
+	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
+
+	// seg l0 segment exist on a non delegator node
+	checker.targetMgr.UpdateCollectionNextTarget(int64(1))
+	checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
+	// test load l0 segments to delegator
+	tasks = checker.Check(context.TODO())
+	suite.Len(tasks, 1)
+	suite.Len(tasks[0].Actions(), 1)
+	action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
+	suite.True(ok)
+	suite.EqualValues(1, tasks[0].ReplicaID())
+	suite.Equal(task.ActionTypeGrow, action.Type())
+	suite.EqualValues(1, action.SegmentID())
+	suite.EqualValues(2, action.Node())
+	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 }
 
+func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
+	checker := suite.checker
+	// set meta
+	checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
+	checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
+	checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
+	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
+
+	// set target
+	segments := []*datapb.SegmentInfo{
+		{
+			ID:            1,
+			PartitionID:   1,
+			InsertChannel: "test-insert-channel",
+			Level:         datapb.SegmentLevel_L0,
+		},
+	}
+
+	channels := []*datapb.VchannelInfo{
+		{
+			CollectionID: 1,
+			ChannelName:  "test-insert-channel",
+		},
+	}
+
+	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
+		channels, segments, nil)
+	checker.targetMgr.UpdateCollectionNextTarget(int64(1))
+	checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
+
+	// set dist
+	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
+	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
+
+	// seg l0 segment exist on a non delegator node
+	checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
+	checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 2, 100, "test-insert-channel"))
+
+	// release duplicate l0 segment
+	tasks := checker.Check(context.TODO())
+	suite.Len(tasks, 0)
+
+	checker.dist.SegmentDistManager.Update(1)
+
+	// test release l0 segment which doesn't exist in target
+	suite.broker.ExpectedCalls = nil
+	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
+		channels, nil, nil)
+	checker.targetMgr.UpdateCollectionNextTarget(int64(1))
+	checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
+
+	tasks = checker.Check(context.TODO())
+	suite.Len(tasks, 1)
+	suite.Len(tasks[0].Actions(), 1)
+	action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
+	suite.True(ok)
+	suite.EqualValues(1, tasks[0].ReplicaID())
+	suite.Equal(task.ActionTypeReduce, action.Type())
+	suite.EqualValues(1, action.SegmentID())
-	suite.EqualValues(1, action.Node())
+	suite.EqualValues(2, action.Node())
+	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 }
@@ -197,18 +197,19 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
 	)
 
 	// Get shard leader for the given replica and segment
-	leaderID, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), task.Shard())
-	if !ok {
+	replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
+	view := ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, action.Shard())
+	if view == nil {
 		msg := "no shard leader for the segment to execute loading"
 		err = merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
 		log.Warn(msg, zap.Error(err))
 		return err
 	}
-	log = log.With(zap.Int64("shardLeader", leaderID))
+	log = log.With(zap.Int64("shardLeader", view.ID))
 
 	startTs := time.Now()
 	log.Info("load segments...")
-	status, err := ex.cluster.LoadSegments(task.Context(), leaderID, req)
+	status, err := ex.cluster.LoadSegments(task.Context(), view.ID, req)
 	err = merr.CheckRPCCall(status, err)
 	if err != nil {
 		log.Warn("failed to load segment", zap.Error(err))
@@ -248,13 +249,21 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
 		req.Shard = task.shard
 
 		if ex.meta.CollectionManager.Exist(task.CollectionID()) {
-			leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
-			if !ok {
-				log.Warn("no shard leader for the segment to execute releasing", zap.String("shard", req.GetShard()))
+			// leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
+			// if !ok {
+			// 	log.Warn("no shard leader for the segment to execute releasing", zap.String("shard", req.GetShard()))
+			// 	return
+			// }
+			replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
+			view := ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, action.Shard())
+			if view == nil {
+				msg := "no shard leader for the segment to execute releasing"
+				err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
+				log.Warn(msg, zap.Error(err))
 				return
 			}
-			dstNode = leader
-			log = log.With(zap.Int64("shardLeader", leader))
+			dstNode = view.ID
+			log = log.With(zap.Int64("shardLeader", view.ID))
 			req.NeedTransfer = true
 		}
 	}
@@ -429,6 +429,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
 		CollectionID: suite.collection,
 		ChannelName:  channel.ChannelName,
 	}))
+	suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
 	tasks := []Task{}
 	segments := make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments {
@@ -529,6 +530,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
 		CollectionID: suite.collection,
 		ChannelName:  channel.ChannelName,
 	}))
+	suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
 	tasks := []Task{}
 	segments := make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments {
@@ -623,6 +625,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
 		CollectionID: suite.collection,
 		ChannelName:  channel.ChannelName,
 	}))
+	suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
 	tasks := []Task{}
 	segments := make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments {
@@ -1005,6 +1008,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
 		CollectionID: suite.collection,
 		ChannelName:  channel.ChannelName,
 	}))
+	suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
 	tasks := []Task{}
 	segmentInfos := []*datapb.SegmentInfo{}
 	for _, segment := range suite.loadSegments {
@@ -1097,6 +1101,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 		CollectionID: suite.collection,
 		ChannelName:  channel.ChannelName,
 	}))
+	suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
 	tasks := []Task{}
 	segments := make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments {
@@ -188,17 +188,18 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
 		return len(resp.Channels) == 1 && len(resp.Segments) >= 2
 	}, 30*time.Second, 1*time.Second)
 
-	// check total segment number
+	// check total segment number and total channel number
 	s.Eventually(func() bool {
-		count := 0
+		segNum, chNum := 0, 0
 		for _, node := range s.Cluster.GetAllQueryNodes() {
 			resp1, err := node.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
 			s.NoError(err)
 			s.True(merr.Ok(resp1.GetStatus()))
-			count += len(resp1.Segments)
+			segNum += len(resp1.Segments)
+			chNum += len(resp1.Channels)
 		}
-		return count == 8
-	}, 10*time.Second, 1*time.Second)
+		return segNum == 8 && chNum == 2
+	}, 30*time.Second, 1*time.Second)
 }
 
 func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
@@ -244,17 +245,18 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
 		return len(resp.Channels) == 1 && len(resp.Segments) >= 2
 	}, 30*time.Second, 1*time.Second)
 
-	// check total segment num
+	// check total segment number and total channel number
 	s.Eventually(func() bool {
-		count := 0
+		segNum, chNum := 0, 0
 		for _, node := range s.Cluster.GetAllQueryNodes() {
 			resp1, err := node.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
 			s.NoError(err)
 			s.True(merr.Ok(resp1.GetStatus()))
-			count += len(resp1.Segments)
+			segNum += len(resp1.Segments)
+			chNum += len(resp1.Channels)
 		}
-		return count == 16
-	}, 10*time.Second, 1*time.Second)
+		return segNum == 16 && chNum == 4
+	}, 30*time.Second, 1*time.Second)
 }
 
 func (s *BalanceTestSuit) TestNodeDown() {