mirror of https://github.com/milvus-io/milvus.git
fix: Delegator may becomes unserviceable after querycoord restart (#37055)
issue: #37054 after querycoord restart, segment_checker may release segment by mistake due to next target isn't ready yet. This PR requires release segment must happens after next target is ready. Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/37104/head
parent
0d376f1a8f
commit
39a91eb100
|
@ -192,13 +192,14 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64,
|
|||
continue
|
||||
}
|
||||
|
||||
nextTargetExist := c.targetMgr.IsNextTargetExist(collectionID)
|
||||
nextTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
currentTargetChannelMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
|
||||
|
||||
// get segment which exist on leader view, but not on current target and next target
|
||||
for _, segment := range view.GrowingSegments {
|
||||
if !currentTargetSegmentIDs.Contain(segment.GetID()) && !nextTargetSegmentIDs.Contain(segment.GetID()) {
|
||||
if !currentTargetSegmentIDs.Contain(segment.GetID()) && nextTargetExist && !nextTargetSegmentIDs.Contain(segment.GetID()) {
|
||||
if channel, ok := currentTargetChannelMap[segment.InsertChannel]; ok {
|
||||
timestampInSegment := segment.GetStartPosition().GetTimestamp()
|
||||
timestampInTarget := channel.GetSeekPosition().GetTimestamp()
|
||||
|
@ -270,6 +271,7 @@ func (c *SegmentChecker) getSealedSegmentDiff(
|
|||
return !existInDist
|
||||
}
|
||||
|
||||
nextTargetExist := c.targetMgr.IsNextTargetExist(collectionID)
|
||||
nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
|
||||
|
@ -298,7 +300,7 @@ func (c *SegmentChecker) getSealedSegmentDiff(
|
|||
_, existOnNext := nextTargetMap[segment.GetID()]
|
||||
|
||||
// l0 segment should be release with channel together
|
||||
if !existOnNext && !existOnCurrent {
|
||||
if !existOnNext && nextTargetExist && !existOnCurrent {
|
||||
toRelease = append(toRelease, segment)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -317,6 +317,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
|
|||
channels, nil, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
tasks = checker.Check(context.TODO())
|
||||
suite.Len(tasks, 1)
|
||||
|
@ -532,6 +533,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
|
|||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(collectionID)
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(collectionID)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(collectionID)
|
||||
readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)
|
||||
|
||||
// test less target version exist on leader,meet segment doesn't exit in target, segment should be released
|
||||
|
@ -602,6 +604,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
|
|||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
growingSegments := make(map[int64]*meta.Segment)
|
||||
growingSegments[2] = utils.CreateTestSegment(1, 1, 2, 2, 0, "test-insert-channel")
|
||||
|
@ -661,6 +664,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseGrowingSegments() {
|
|||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
growingSegments := make(map[int64]*meta.Segment)
|
||||
growingSegments[2] = utils.CreateTestSegment(1, 1, 2, 2, 0, "test-insert-channel")
|
||||
|
|
Loading…
Reference in New Issue