mirror of https://github.com/milvus-io/milvus.git
Fix manual balance failed with TaskStale (#19400)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/19123/head
parent
2cfacbba8d
commit
6d6e14e67d
|
@ -81,16 +81,26 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
|
|||
}
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
zap.Int64("collectionID", req.GetCollectionID()),
|
||||
zap.Int64("srcNodeID", srcNode),
|
||||
zap.Int64s("destNodeIDs", dstNodeSet.Collect()),
|
||||
)
|
||||
|
||||
plans := s.balancer.AssignSegment(toBalance.Collect(), dstNodeSet.Collect())
|
||||
tasks := make([]task.Task, 0, len(plans))
|
||||
for _, plan := range plans {
|
||||
log.Info("manually balance segment...",
|
||||
zap.Int64("destNodeID", plan.To),
|
||||
zap.Int64("segmentID", plan.Segment.GetID()),
|
||||
)
|
||||
task := task.NewSegmentTask(ctx,
|
||||
Params.QueryCoordCfg.SegmentTaskTimeout,
|
||||
req.Base.GetMsgID(),
|
||||
req.GetCollectionID(),
|
||||
replica.GetID(),
|
||||
task.NewSegmentAction(plan.To, task.ActionTypeGrow, plan.Segment.GetID()),
|
||||
task.NewSegmentAction(plan.From, task.ActionTypeReduce, plan.Segment.GetID()),
|
||||
task.NewSegmentAction(srcNode, task.ActionTypeReduce, plan.Segment.GetID()),
|
||||
)
|
||||
err := s.taskScheduler.Add(task)
|
||||
if err != nil {
|
||||
|
|
|
@ -549,12 +549,19 @@ func (suite *ServiceSuite) TestLoadBalance() {
|
|||
DstNodeIDs: []int64{dstNode},
|
||||
SealedSegmentIDs: segments,
|
||||
}
|
||||
suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0)
|
||||
suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) {
|
||||
actions := task.Actions()
|
||||
suite.Len(actions, 2)
|
||||
growAction, reduceAction := actions[0], actions[1]
|
||||
suite.Equal(dstNode, growAction.Node())
|
||||
suite.Equal(srcNode, reduceAction.Node())
|
||||
task.Cancel()
|
||||
}).Return(nil)
|
||||
resp, err := server.LoadBalance(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
suite.taskScheduler.AssertExpectations(suite.T())
|
||||
}
|
||||
|
||||
// Test when server is not healthy
|
||||
|
|
Loading…
Reference in New Issue