mirror of https://github.com/milvus-io/milvus.git
fix: Balance channel may cause channel not available error (#28829)
issue: #28831 Releasing the old delegator before the new delegator has updated its distribution may cause a `channel not available` error. This PR blocks the release of the old delegator until the new delegator has finished `syncDistribution`. Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/28887/head
parent
af98d1cb64
commit
043ac87be0
|
@ -614,12 +614,52 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
|
|||
// return true if the task should be executed,
|
||||
// false otherwise
|
||||
func (scheduler *taskScheduler) preProcess(task Task) bool {
|
||||
log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
|
||||
zap.Int64("collectionID", task.CollectionID()),
|
||||
zap.Int64("taskID", task.ID()),
|
||||
)
|
||||
if task.Status() != TaskStatusStarted {
|
||||
return false
|
||||
}
|
||||
|
||||
// check if new delegator is ready to release old delegator
|
||||
checkLeaderView := func(collectionID int64, channel string, node int64) bool {
|
||||
segmentsInTarget := scheduler.targetMgr.GetSealedSegmentsByChannel(collectionID, channel, meta.CurrentTarget)
|
||||
leader := scheduler.distMgr.LeaderViewManager.GetLeaderShardView(node, channel)
|
||||
if leader == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for segmentID := range segmentsInTarget {
|
||||
if _, exist := leader.Segments[segmentID]; !exist {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
actions, step := task.Actions(), task.Step()
|
||||
for step < len(actions) && actions[step].IsFinished(scheduler.distMgr) {
|
||||
if GetTaskType(task) == TaskTypeMove && actions[step].Type() == ActionTypeGrow {
|
||||
var ready bool
|
||||
switch actions[step].(type) {
|
||||
case *ChannelAction:
|
||||
// once a balance channel task has finished its grow action, block the reduce action
|
||||
// until the segment distribution has been synced to the new delegator: the new delegator
|
||||
// may need some time to load delta logs, and if the old delegator is released in advance,
|
||||
// the new delegator cannot serve search and query yet, which yields a "no available channel" error
|
||||
channelAction := actions[step].(*ChannelAction)
|
||||
ready = checkLeaderView(task.CollectionID(), channelAction.Shard(), channelAction.Node())
|
||||
default:
|
||||
ready = true
|
||||
}
|
||||
|
||||
if !ready {
|
||||
log.RatedInfo(30, "Blocking reduce action in balance channel task")
|
||||
break
|
||||
}
|
||||
}
|
||||
task.StepUp()
|
||||
step++
|
||||
}
|
||||
|
|
|
@ -1428,6 +1428,92 @@ func createReplica(collection int64, nodes ...int64) *meta.Replica {
|
|||
)
|
||||
}
|
||||
|
||||
func (suite *TaskSuite) TestBalanceChannelTask() {
|
||||
collectionID := int64(1)
|
||||
partitionID := int64(1)
|
||||
channel := "channel-1"
|
||||
vchannel := &datapb.VchannelInfo{
|
||||
CollectionID: collectionID,
|
||||
ChannelName: channel,
|
||||
}
|
||||
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
InsertChannel: channel,
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
InsertChannel: channel,
|
||||
},
|
||||
{
|
||||
ID: 3,
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
InsertChannel: channel,
|
||||
},
|
||||
}
|
||||
suite.meta.PutCollection(utils.CreateTestCollection(collectionID, 1), utils.CreateTestPartition(collectionID, 1))
|
||||
suite.broker.ExpectedCalls = nil
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return([]*datapb.VchannelInfo{vchannel}, segments, nil)
|
||||
suite.target.UpdateCollectionNextTarget(collectionID)
|
||||
suite.target.UpdateCollectionCurrentTarget(collectionID)
|
||||
suite.target.UpdateCollectionNextTarget(collectionID)
|
||||
|
||||
suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
|
||||
ID: 2,
|
||||
CollectionID: collectionID,
|
||||
Channel: channel,
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
1: {},
|
||||
2: {},
|
||||
3: {},
|
||||
},
|
||||
})
|
||||
suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
|
||||
ID: 1,
|
||||
CollectionID: collectionID,
|
||||
Channel: channel,
|
||||
})
|
||||
task, err := NewChannelTask(context.Background(),
|
||||
10*time.Second,
|
||||
WrapIDSource(2),
|
||||
collectionID,
|
||||
1,
|
||||
NewChannelAction(1, ActionTypeGrow, channel),
|
||||
NewChannelAction(2, ActionTypeReduce, channel),
|
||||
)
|
||||
suite.NoError(err)
|
||||
|
||||
// new delegator distribution hasn't updated, block balance
|
||||
suite.scheduler.preProcess(task)
|
||||
suite.Equal(0, task.step)
|
||||
|
||||
suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
|
||||
ID: 1,
|
||||
CollectionID: collectionID,
|
||||
Channel: channel,
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
1: {},
|
||||
2: {},
|
||||
3: {},
|
||||
},
|
||||
})
|
||||
|
||||
// new delegator distribution updated, task step up
|
||||
suite.scheduler.preProcess(task)
|
||||
suite.Equal(1, task.step)
|
||||
|
||||
suite.dist.LeaderViewManager.Update(2)
|
||||
// old delegator removed
|
||||
suite.scheduler.preProcess(task)
|
||||
suite.Equal(2, task.step)
|
||||
}
|
||||
|
||||
func TestTask(t *testing.T) {
|
||||
suite.Run(t, new(TaskSuite))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue