mirror of https://github.com/milvus-io/milvus.git
enhance: Release compacted growing segment if in dropped list (#37245)
See also #37205 Previously releasing growing segments could be triggered by two conditions: - Sealed Segment with same id is loaded - Segment start position is before target checkpoint ts Which has a worst case that the corresponding sealed segment is compacted and the checkpoint is pinned by a growing l0 segment. This PR introduces a new rule that: a growing segment could be released if the segment id appeared in current target dropped segment id list. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/37221/head
parent
7dd6651124
commit
9539739781
|
@ -36,6 +36,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
)
|
||||
|
||||
const initialTargetVersion = int64(0)
|
||||
|
@ -203,6 +204,12 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64,
|
|||
if channel, ok := currentTargetChannelMap[segment.InsertChannel]; ok {
|
||||
timestampInSegment := segment.GetStartPosition().GetTimestamp()
|
||||
timestampInTarget := channel.GetSeekPosition().GetTimestamp()
|
||||
// release growing segment if in dropped segment list
|
||||
if funcutil.SliceContain(channel.GetDroppedSegmentIds(), segment.GetID()) {
|
||||
log.Info("growing segment exists in dropped segment list, release it", zap.Int64("segmentID", segment.GetID()))
|
||||
toRelease = append(toRelease, segment)
|
||||
continue
|
||||
}
|
||||
// filter toRelease which seekPosition is newer than next target dmChannel
|
||||
if timestampInSegment < timestampInTarget {
|
||||
log.Info("growing segment not exist in target, so release it",
|
||||
|
|
|
@ -646,6 +646,62 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
|
|||
suite.Equal(tasks[1].Priority(), task.TaskPriorityNormal)
|
||||
}
|
||||
|
||||
func (suite *SegmentCheckerTestSuite) TestReleaseCompactedGrowingSegments() {
|
||||
checker := suite.checker
|
||||
|
||||
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
|
||||
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 3,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
SeekPosition: &msgpb.MsgPosition{Timestamp: 10},
|
||||
DroppedSegmentIds: []int64{4},
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
growingSegments := make(map[int64]*meta.Segment)
|
||||
// segment start pos after chekcpoint
|
||||
growingSegments[4] = utils.CreateTestSegment(1, 1, 4, 2, 1, "test-insert-channel")
|
||||
growingSegments[4].SegmentInfo.StartPosition = &msgpb.MsgPosition{Timestamp: 11}
|
||||
|
||||
dmChannel := utils.CreateTestChannel(1, 2, 1, "test-insert-channel")
|
||||
dmChannel.UnflushedSegmentIds = []int64{2, 3}
|
||||
checker.dist.ChannelDistManager.Update(2, dmChannel)
|
||||
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, growingSegments)
|
||||
view.TargetVersion = checker.targetMgr.GetCollectionTargetVersion(int64(1), meta.CurrentTarget)
|
||||
checker.dist.LeaderViewManager.Update(2, view)
|
||||
checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 3, 2, 2, "test-insert-channel"))
|
||||
|
||||
tasks := checker.Check(context.TODO())
|
||||
suite.Len(tasks, 1)
|
||||
sort.Slice(tasks, func(i, j int) bool {
|
||||
return tasks[i].Actions()[0].(*task.SegmentAction).GetSegmentID() < tasks[j].Actions()[0].(*task.SegmentAction).GetSegmentID()
|
||||
})
|
||||
suite.Len(tasks[0].Actions(), 1)
|
||||
action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
|
||||
suite.True(ok)
|
||||
suite.EqualValues(1, tasks[0].ReplicaID())
|
||||
suite.Equal(task.ActionTypeReduce, action.Type())
|
||||
suite.EqualValues(4, action.GetSegmentID())
|
||||
suite.EqualValues(2, action.Node())
|
||||
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
|
||||
}
|
||||
|
||||
func (suite *SegmentCheckerTestSuite) TestSkipReleaseGrowingSegments() {
|
||||
checker := suite.checker
|
||||
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
|
|
Loading…
Reference in New Issue