mirror of https://github.com/milvus-io/milvus.git
issue: #31480 #31481
pr: #31540

A task that releases a duplicate L0 segment can execute on the old delegator, where it may leave the shard lacking that segment, or on the new delegator, where it may break the new delegator's leader view. This PR makes the segment checker skip releasing duplicate L0 segments, since an L0 segment is released together with the channel unsubscription.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

pull/31645/head
parent 55bc7207ed
commit ad07289819
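The fix boils down to two guards in the segment checker: an L0 segment that is still tracked by the next target never gets a standalone task of its own, and a segment is only released explicitly when it is missing from both the current and the next target. The sketch below is a minimal, self-contained illustration of those guards; Segment and the plain maps are hypothetical stand-ins, not the real SegmentChecker, target-manager, or distribution types that appear in the hunks that follow.

    // A minimal sketch, assuming simplified stand-in types; not the real checker API.
    package main

    import "fmt"

    type Segment struct {
        ID   int64
        IsL0 bool
    }

    // pendingL0Loads mirrors the first hunk: walk the current target, but skip any
    // segment that is also in the next target so no duplicate segment task is generated.
    func pendingL0Loads(currentTarget, nextTarget, dist map[int64]Segment) []Segment {
        var toLoad []Segment
        for id, seg := range currentTarget {
            if _, ok := nextTarget[id]; ok {
                continue // would produce a duplicate segment task
            }
            if _, ok := dist[id]; !ok && seg.IsL0 {
                toLoad = append(toLoad, seg)
            }
        }
        return toLoad
    }

    // releasable mirrors the release rule: only segments missing from BOTH targets are
    // released explicitly; a duplicate L0 segment is left to be dropped when the old
    // delegator unsubscribes its channel.
    func releasable(dist, currentTarget, nextTarget map[int64]Segment) []Segment {
        var out []Segment
        for id, seg := range dist {
            _, onCurrent := currentTarget[id]
            _, onNext := nextTarget[id]
            if !onCurrent && !onNext {
                out = append(out, seg)
            }
        }
        return out
    }

    func main() {
        l0 := Segment{ID: 1, IsL0: true}
        current := map[int64]Segment{1: l0}
        next := map[int64]Segment{1: l0}
        dist := map[int64]Segment{1: l0}

        fmt.Println(pendingL0Loads(current, next, dist)) // [] — already tracked by the next target
        fmt.Println(releasable(dist, current, next))     // [] — still in the targets, released with the channel instead
    }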
@@ -232,6 +232,11 @@ func (c *SegmentChecker) getSealedSegmentDiff(
     // l0 Segment which exist on current target, but not on dist
     for segmentID, segment := range currentTargetMap {
+        // to avoid generate duplicate segment task
+        if nextTargetMap[segmentID] != nil {
+            continue
+        }
+
         node, existInDist := distMap[segmentID]
         l0WithWrongLocation := false
         if existInDist && segment.GetLevel() == datapb.SegmentLevel_L0 {
@@ -248,6 +253,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
     for _, segment := range dist {
         _, existOnCurrent := currentTargetMap[segment.GetID()]
         _, existOnNext := nextTargetMap[segment.GetID()]
+
+        // l0 segment should be release with channel together
         if !existOnNext && !existOnCurrent {
             toRelease = append(toRelease, segment)
         }
@@ -284,6 +291,14 @@ func (c *SegmentChecker) findRepeatedSealedSegments(replicaID int64) []*meta.Segment
     dist := c.getSealedSegmentsDist(replica)
     versions := make(map[int64]*meta.Segment)
     for _, s := range dist {
+        // l0 segment should be release with channel together
+        segment := c.targetMgr.GetSealedSegment(s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst)
+        existInTarget := segment != nil
+        isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
+        if isL0Segment {
+            continue
+        }
+
         maxVer, ok := versions[s.GetID()]
         if !ok {
             versions[s.GetID()] = s
@@ -240,22 +240,17 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
     // seg l0 segment exist on a non delegator node
     checker.targetMgr.UpdateCollectionNextTarget(int64(1))
     checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
-    // test load l0 segments to delegator and release the one on non delegator
+    // test load l0 segments to delegator
     tasks = checker.Check(context.TODO())
-    suite.Len(tasks, 2)
-    for _, t := range tasks {
-        suite.Len(t.Actions(), 1)
-        action, ok = t.Actions()[0].(*task.SegmentAction)
-        suite.True(ok)
-        suite.EqualValues(1, tasks[0].ReplicaID())
-        suite.EqualValues(1, action.SegmentID())
-        suite.Equal(t.Priority(), task.TaskPriorityNormal)
-        if action.Type() == task.ActionTypeGrow {
-            suite.EqualValues(2, action.Node())
-        } else {
-            suite.EqualValues(1, action.Node())
-        }
-    }
+    suite.Len(tasks, 1)
+    suite.Len(tasks[0].Actions(), 1)
+    action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
+    suite.True(ok)
+    suite.EqualValues(1, tasks[0].ReplicaID())
+    suite.Equal(task.ActionTypeGrow, action.Type())
+    suite.EqualValues(1, action.SegmentID())
+    suite.EqualValues(2, action.Node())
+    suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 }
 
 func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
@@ -309,6 +304,18 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
 
     // release duplicate l0 segment
     tasks := checker.Check(context.TODO())
+    suite.Len(tasks, 0)
+
+    checker.dist.SegmentDistManager.Update(1)
+
+    // test release l0 segment which doesn't exist in target
+    suite.broker.ExpectedCalls = nil
+    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
+        channels, nil, nil)
+    checker.targetMgr.UpdateCollectionNextTarget(int64(1))
+    checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
+
+    tasks = checker.Check(context.TODO())
     suite.Len(tasks, 1)
     suite.Len(tasks[0].Actions(), 1)
     action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
@@ -316,7 +323,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
     suite.EqualValues(1, tasks[0].ReplicaID())
     suite.Equal(task.ActionTypeReduce, action.Type())
     suite.EqualValues(1, action.SegmentID())
-    suite.EqualValues(1, action.Node())
+    suite.EqualValues(2, action.Node())
     suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
 
     checker.dist.SegmentDistManager.Update(1)
@@ -197,18 +197,19 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
     )
 
     // Get shard leader for the given replica and segment
-    leaderID, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), task.Shard())
-    if !ok {
+    replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
+    view := ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, action.Shard())
+    if view == nil {
         msg := "no shard leader for the segment to execute loading"
         err = merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
         log.Warn(msg, zap.Error(err))
         return err
     }
-    log = log.With(zap.Int64("shardLeader", leaderID))
+    log = log.With(zap.Int64("shardLeader", view.ID))
 
     startTs := time.Now()
     log.Info("load segments...")
-    status, err := ex.cluster.LoadSegments(task.Context(), leaderID, req)
+    status, err := ex.cluster.LoadSegments(task.Context(), view.ID, req)
     err = merr.CheckRPCCall(status, err)
     if err != nil {
         log.Warn("failed to load segment", zap.Error(err))
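The loadSegment hunk above (and the releaseSegment hunk below) stops routing through the channel-distribution-based getShardLeader helper and instead asks the leader-view manager for the freshest view of the replica's shard (ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard in the hunks). The following is only a sketch of that "latest view wins" lookup, using hypothetical stand-in types; it is not Milvus's LeaderViewManager API.

    // A sketch only: picks the leader view with the highest version for a shard,
    // mimicking latest-leader routing; types are hypothetical stand-ins.
    package main

    import "fmt"

    type LeaderView struct {
        ID      int64  // node that hosts the delegator
        Channel string // shard (channel) the delegator serves
        Version int64  // newer view wins during delegator hand-off
    }

    // latestLeader returns the newest leader view for the given shard, or nil if
    // no delegator for that shard is known yet.
    func latestLeader(views []LeaderView, shard string) *LeaderView {
        var latest *LeaderView
        for i := range views {
            v := &views[i]
            if v.Channel != shard {
                continue
            }
            if latest == nil || v.Version > latest.Version {
                latest = v
            }
        }
        return latest
    }

    func main() {
        views := []LeaderView{
            {ID: 1, Channel: "dml-ch-0", Version: 10}, // old delegator
            {ID: 2, Channel: "dml-ch-0", Version: 11}, // new delegator after hand-off
        }
        if view := latestLeader(views, "dml-ch-0"); view != nil {
            fmt.Printf("route load/release to node %d\n", view.ID) // node 2
        }
    }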
@@ -248,13 +249,21 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
     req.Shard = task.shard
 
     if ex.meta.CollectionManager.Exist(task.CollectionID()) {
-        leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
-        if !ok {
-            log.Warn("no shard leader for the segment to execute releasing", zap.String("shard", req.GetShard()))
+        // leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
+        // if !ok {
+        //     log.Warn("no shard leader for the segment to execute releasing", zap.String("shard", req.GetShard()))
+        //     return
+        // }
+        replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
+        view := ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, action.Shard())
+        if view == nil {
+            msg := "no shard leader for the segment to execute releasing"
+            err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
+            log.Warn(msg, zap.Error(err))
             return
         }
-        dstNode = leader
-        log = log.With(zap.Int64("shardLeader", leader))
+        dstNode = view.ID
+        log = log.With(zap.Int64("shardLeader", view.ID))
         req.NeedTransfer = true
     }
 }
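releaseSegment adopts the same leader-view lookup, with the old getShardLeader call left behind only as a comment in this revision. Routing the release through the latest leader view appears to be what keeps a release that races with a channel hand-off on the delegator that currently owns the shard, so the old delegator does not end up missing segments and the new delegator's leader view is not broken, per the issue description above.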
@@ -429,6 +429,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
         CollectionID: suite.collection,
         ChannelName: channel.ChannelName,
     }))
+    suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
     tasks := []Task{}
     segments := make([]*datapb.SegmentInfo, 0)
     for _, segment := range suite.loadSegments {
@@ -529,6 +530,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
         CollectionID: suite.collection,
         ChannelName: channel.ChannelName,
     }))
+    suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
     tasks := []Task{}
     segments := make([]*datapb.SegmentInfo, 0)
     for _, segment := range suite.loadSegments {
@@ -623,6 +625,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
         CollectionID: suite.collection,
         ChannelName: channel.ChannelName,
     }))
+    suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
    tasks := []Task{}
     segments := make([]*datapb.SegmentInfo, 0)
     for _, segment := range suite.loadSegments {
@@ -1005,6 +1008,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
         CollectionID: suite.collection,
         ChannelName: channel.ChannelName,
     }))
+    suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
     tasks := []Task{}
     segmentInfos := []*datapb.SegmentInfo{}
     for _, segment := range suite.loadSegments {
@@ -1097,6 +1101,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
         CollectionID: suite.collection,
         ChannelName: channel.ChannelName,
     }))
+    suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
     tasks := []Task{}
     segments := make([]*datapb.SegmentInfo, 0)
     for _, segment := range suite.loadSegments {
@@ -188,17 +188,18 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
         return len(resp.Channels) == 1 && len(resp.Segments) >= 2
     }, 30*time.Second, 1*time.Second)
 
-    // check total segment number
+    // check total segment number and total channel number
     s.Eventually(func() bool {
-        count := 0
+        segNum, chNum := 0, 0
         for _, node := range s.Cluster.GetAllQueryNodes() {
             resp1, err := node.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
             s.NoError(err)
             s.True(merr.Ok(resp1.GetStatus()))
-            count += len(resp1.Segments)
+            segNum += len(resp1.Segments)
+            chNum += len(resp1.Channels)
         }
-        return count == 8
-    }, 10*time.Second, 1*time.Second)
+        return segNum == 8 && chNum == 2
+    }, 30*time.Second, 1*time.Second)
 }
 
 func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
@@ -244,17 +245,18 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
         return len(resp.Channels) == 1 && len(resp.Segments) >= 2
     }, 30*time.Second, 1*time.Second)
 
-    // check total segment num
+    // check total segment number and total channel number
     s.Eventually(func() bool {
-        count := 0
+        segNum, chNum := 0, 0
        for _, node := range s.Cluster.GetAllQueryNodes() {
             resp1, err := node.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
             s.NoError(err)
             s.True(merr.Ok(resp1.GetStatus()))
-            count += len(resp1.Segments)
+            segNum += len(resp1.Segments)
+            chNum += len(resp1.Channels)
         }
-        return count == 16
-    }, 10*time.Second, 1*time.Second)
+        return segNum == 16 && chNum == 4
+    }, 30*time.Second, 1*time.Second)
 }
 
 func (s *BalanceTestSuit) TestNodeDown() {