From 855f71ac8913fa7a5bae72e5ea719496937fe533 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 11 Mar 2024 15:01:11 +0800 Subject: [PATCH] fix: Dirty sealed segment won't release after channel balance (#31095) (#31126) issue: #31074 pr: #31095 This PR fix dirty sealed segment doesn't release after channel balance, dirty sealed segment means segment doesn't exist in targets. Signed-off-by: Wei Liu --- .../querycoordv2/checkers/segment_checker.go | 8 +++++++- .../checkers/segment_checker_test.go | 20 ++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index d5f4e6f9e6..d48df5c543 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -36,6 +36,8 @@ import ( "github.com/milvus-io/milvus/pkg/log" ) +const initialTargetVersion = int64(0) + type SegmentChecker struct { meta *meta.Meta dist *meta.DistributionManager @@ -325,7 +327,11 @@ func (c *SegmentChecker) filterSegmentInUse(replica *meta.Replica, segments []*m view := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, s.GetInsertChannel()) currentTargetVersion := c.targetMgr.GetCollectionTargetVersion(s.CollectionID, meta.CurrentTarget) partition := c.meta.CollectionManager.GetPartition(s.PartitionID) - if partition != nil && view.TargetVersion != currentTargetVersion { + + // if delegator has valid target version, and before it update to latest readable version, skip release it's sealed segment + // Notice: if syncTargetVersion stuck, segment on delegator won't be released + readableVersionNotUpdate := view.TargetVersion != initialTargetVersion && view.TargetVersion < currentTargetVersion + if partition != nil && readableVersionNotUpdate { // leader view version hasn't been updated, segment maybe still in use continue } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index af6495256e..5fadcc900b 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -333,7 +333,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { checker.targetMgr.UpdateCollectionCurrentTarget(collectionID) readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget) - // set dist + // test less target version exist on leader,meet segment doesn't exit in target, segment should be released nodeID := int64(2) segmentID := int64(1) checker.dist.ChannelDistManager.Update(nodeID, utils.CreateTestChannel(collectionID, nodeID, segmentID, "test-insert-channel")) @@ -341,11 +341,10 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { view.TargetVersion = readableVersion - 1 checker.dist.LeaderViewManager.Update(nodeID, view) checker.dist.SegmentDistManager.Update(nodeID, utils.CreateTestSegment(collectionID, partitionID, segmentID, nodeID, 2, "test-insert-channel")) - tasks := checker.Check(context.TODO()) suite.Len(tasks, 0) - // test less version exist on leader + // test leader's target version update to latest,meet segment doesn't exit in target, segment should be released view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{}) view.TargetVersion = readableVersion checker.dist.LeaderViewManager.Update(2, view) @@ -359,6 +358,21 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { suite.EqualValues(segmentID, action.SegmentID()) suite.EqualValues(nodeID, action.Node()) suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) + + // test leader with initialTargetVersion, meet segment doesn't exit in target, segment should be released + view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{}) + view.TargetVersion = initialTargetVersion + checker.dist.LeaderViewManager.Update(2, view) + tasks = checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Len(tasks[0].Actions(), 1) + action, ok = tasks[0].Actions()[0].(*task.SegmentAction) + suite.True(ok) + suite.EqualValues(1, tasks[0].ReplicaID()) + suite.Equal(task.ActionTypeReduce, action.Type()) + suite.EqualValues(segmentID, action.SegmentID()) + suite.EqualValues(nodeID, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) } func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {