Add segment dist containing condition for loading segment (#25736)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/25711/head
yah01 2023-07-19 15:02:58 +08:00 committed by GitHub
parent 96141fa998
commit 224515eaa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 14 deletions

View File

@@ -179,3 +179,19 @@ func (m *SegmentDistManager) GetByCollectionAndNode(collectionID, nodeID UniqueI
}
return ret
}
// GetSegmentDist returns the IDs of every node that currently holds a copy
// of the given segment, according to the in-memory distribution view.
// The result is never nil; it is empty when no node serves the segment.
func (m *SegmentDistManager) GetSegmentDist(segmentID int64) []int64 {
	m.rwmutex.RLock()
	defer m.rwmutex.RUnlock()

	nodes := make([]int64, 0)
	for node, nodeSegments := range m.segments {
		for _, seg := range nodeSegments {
			if seg.GetID() != segmentID {
				continue
			}
			// A node hosts at most one copy of a segment; stop scanning it.
			nodes = append(nodes, node)
			break
		}
	}
	return nodes
}

View File

@@ -98,8 +98,10 @@ func (action *SegmentAction) Scope() querypb.DataScope {
func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
if action.Type() == ActionTypeGrow {
nodes := distMgr.LeaderViewManager.GetSealedSegmentDist(action.SegmentID())
return lo.Contains(nodes, action.Node())
leaderSegmentDist := distMgr.LeaderViewManager.GetSealedSegmentDist(action.SegmentID())
nodeSegmentDist := distMgr.SegmentDistManager.GetSegmentDist(action.SegmentID())
return lo.Contains(leaderSegmentDist, action.Node()) &&
lo.Contains(nodeSegmentDist, action.Node())
} else if action.Type() == ActionTypeReduce {
// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
// loading segment replaces the node ID with the new one,

View File

@@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/samber/lo"
)
const (
@@ -307,9 +308,10 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
}
if GetTaskType(task) == TaskTypeGrow {
nodesWithSegment := scheduler.distMgr.LeaderViewManager.GetSealedSegmentDist(task.SegmentID())
replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithSegment)
if _, ok := replicaNodeMap[task.ReplicaID()]; ok {
leaderSegmentDist := scheduler.distMgr.LeaderViewManager.GetSealedSegmentDist(task.SegmentID())
nodeSegmentDist := scheduler.distMgr.SegmentDistManager.GetSegmentDist(task.SegmentID())
if lo.Contains(leaderSegmentDist, task.Actions()[0].Node()) &&
lo.Contains(nodeSegmentDist, task.Actions()[0].Node()) {
return merr.WrapErrServiceInternal("segment loaded, it can be only balanced")
}
}

View File

@@ -23,6 +23,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@@ -438,7 +439,11 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
for _, segment := range suite.loadSegments {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
return meta.SegmentFromInfo(info)
})
suite.dist.LeaderViewManager.Update(targetNode, view)
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(0, 0, 0, 0)
@@ -483,9 +488,9 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
ChannelName: channel.ChannelName,
}))
tasks := []Task{}
segmentInfos := make([]*datapb.SegmentInfo, 0)
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
segments = append(segments, &datapb.SegmentInfo{
ID: segment,
PartitionID: 1,
InsertChannel: channel.ChannelName,
@@ -503,7 +508,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segmentInfos, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1))
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
@@ -734,7 +739,12 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
for _, segment := range suite.moveSegments {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
distSegments := lo.Map(segmentInfos, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
return meta.SegmentFromInfo(info)
})
suite.dist.LeaderViewManager.Update(leader, view)
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
// First action done, execute the second action
suite.dispatchAndWait(leader)
// Check second action
@@ -873,9 +883,9 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
ChannelName: channel.ChannelName,
}))
tasks := []Task{}
segmentInfos := make([]*datapb.SegmentInfo, 0)
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
segments = append(segments, &datapb.SegmentInfo{
ID: segment,
PartitionID: 1,
InsertChannel: channel.GetChannelName(),
@@ -893,7 +903,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segmentInfos, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1))
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
@@ -912,10 +922,14 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
for _, segment := range suite.loadSegments[1:] {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
return meta.SegmentFromInfo(info)
})
suite.dist.LeaderViewManager.Update(targetNode, view)
segmentInfos = make([]*datapb.SegmentInfo, 0)
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
segments = make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments[1:] {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
segments = append(segments, &datapb.SegmentInfo{
ID: segment,
PartitionID: 2,
InsertChannel: channel.GetChannelName(),
@@ -924,7 +938,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
bakExpectations := suite.broker.ExpectedCalls
suite.broker.AssertExpectations(suite.T())
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segmentInfos, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(2))
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(0, 0, 0, 0)
@@ -1147,7 +1161,11 @@ func (suite *TaskSuite) TestNoExecutor() {
for _, segment := range suite.loadSegments {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
return meta.SegmentFromInfo(info)
})
suite.dist.LeaderViewManager.Update(targetNode, view)
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(0, 0, 0, 0)
}