mirror of https://github.com/milvus-io/milvus.git

Improvement apis and error messages about the graceful stop (#21580)

Signed-off-by: SimFG <bang.fu@zilliz.com>
Branch: pull/21604/head
Parent: 7b12865c73
Commit: 080bed9083
@@ -198,6 +198,7 @@ queryNode:
   loadMemoryUsageFactor: 3 # The multiply factor of calculating the memory usage while loading segments
   enableDisk: true # enable querynode load disk index, and search on disk index
   maxDiskUsagePercentage: 95
+  gracefulStopTimeout: 30
 
   stats:
     publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
@@ -256,6 +257,7 @@ indexNode:
   port: 21121
   enableDisk: true # enable index node build disk vector index
   maxDiskUsagePercentage: 95
+  gracefulStopTimeout: 30
 
   scheduler:
     buildParallel: 1
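Both the queryNode and indexNode sections gain a gracefulStopTimeout: 30 entry (presumably seconds, in line with the commit's focus on graceful stop). For orientation only, here is a minimal, self-contained Go sketch of what such a timeout typically bounds during shutdown; stopAllTasks and the wiring are stand-ins, not Milvus's actual stop path.

// Minimal sketch: enforce a graceful-stop window; the unit (seconds) is an
// assumption, and stopAllTasks is a hypothetical stand-in for whatever
// cleanup a node performs. This is not Milvus code.
package main

import (
	"context"
	"log"
	"time"
)

func gracefulStop(timeout time.Duration, stopAllTasks func(context.Context) error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- stopAllTasks(ctx) }()

	select {
	case err := <-done:
		if err != nil {
			log.Printf("graceful stop finished with error: %v", err)
		}
	case <-ctx.Done():
		log.Printf("graceful stop did not finish within %v, forcing shutdown", timeout)
	}
}

func main() {
	// 30 mirrors the new gracefulStopTimeout default in the config above.
	gracefulStop(30*time.Second, func(ctx context.Context) error {
		// Pretend to drain in-flight work; real nodes would stop tasks here.
		time.Sleep(100 * time.Millisecond)
		return nil
	})
}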
@@ -68,8 +68,8 @@ func (dh *distHandler) start(ctx context.Context) {
 			logger.Info("close dist handelr")
 			return
 		case <-ticker.C:
-			dh.getDistribution(ctx, func(isSuccess bool) {
-				if !isSuccess {
+			dh.getDistribution(ctx, func(isFail bool) {
+				if isFail {
 					failures++
 				} else {
 					failures = 0
@@ -199,7 +199,7 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
 	dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
 }
 
-func (dh *distHandler) getDistribution(ctx context.Context, fn func(isSuccess bool)) {
+func (dh *distHandler) getDistribution(ctx context.Context, fn func(isFail bool)) {
 	dh.mu.Lock()
 	defer dh.mu.Unlock()
 	cctx, cancel := context.WithTimeout(ctx, distReqTimeout)
@@ -210,15 +210,15 @@ func (dh *distHandler) getDistribution(ctx context.Context, fn func(isSuccess bo
 	})
 	cancel()
 
-	isSuccess := err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success
-	if isSuccess {
+	isFail := err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success
+	if isFail {
 		dh.logFailureInfo(resp, err)
 	} else {
 		dh.handleDistResp(resp)
 	}
 
 	if fn != nil {
-		fn(isSuccess)
+		fn(isFail)
 	}
 }
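These three hunks are one rename: the expression err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success is true exactly when the distribution request failed, so the old name isSuccess inverted its meaning; the callback parameter, the local variable, and the call site now all say isFail. A stripped-down sketch of the surrounding pattern, a ticker-driven request whose callback increments a consecutive-failure counter and resets it on success, is shown below; poll, fetch, and maxFailures are illustrative names, not the actual distHandler fields.

// Stripped-down sketch of the consecutive-failure pattern used by the dist
// handler above: a ticker drives a request, the callback reports isFail, and
// the counter resets on every success. Names here are illustrative only.
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

func poll(ctx context.Context, fetch func(context.Context) error, maxFailures int) {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	failures := 0
	getDistribution := func(ctx context.Context, fn func(isFail bool)) {
		err := fetch(ctx)
		fn(err != nil) // isFail is true exactly when the request failed
	}

	for {
		select {
		case <-ctx.Done():
			log.Println("close poll loop")
			return
		case <-ticker.C:
			getDistribution(ctx, func(isFail bool) {
				if isFail {
					failures++
				} else {
					failures = 0
				}
			})
			if failures >= maxFailures {
				log.Printf("%d consecutive failures, giving up", failures)
				return
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	poll(ctx, func(context.Context) error { return errors.New("node unreachable") }, 3)
}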
@@ -452,6 +452,20 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
 	}, nil
 }
 
+func (s *Server) isStoppingNode(nodeID int64) error {
+	isStopping, err := s.nodeMgr.IsStoppingNode(nodeID)
+	if err != nil {
+		log.Warn("fail to check whether the node is stopping", zap.Int64("node_id", nodeID), zap.Error(err))
+		return err
+	}
+	if isStopping {
+		msg := fmt.Sprintf("failed to balance due to the source/destination node[%d] is stopping", nodeID)
+		log.Warn(msg)
+		return errors.New(msg)
+	}
+	return nil
+}
+
 func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
 	log := log.With(
 		zap.Int64("msgID", req.GetBase().GetMsgID()),
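The new isStoppingNode helper asks the node manager whether a node is shutting down and converts that state into a descriptive error, which the caller can surface directly to the client. As a rough illustration of the bookkeeping such a check relies on, here is a hypothetical, simplified node manager, not the real s.nodeMgr: lookups of unknown nodes return an error, and known-but-stopping nodes return true.

// Hypothetical, simplified model of a node manager that can answer
// IsStoppingNode; the real s.nodeMgr in QueryCoord is more involved.
package main

import (
	"fmt"
	"sync"
)

type nodeManager struct {
	mu       sync.RWMutex
	stopping map[int64]bool // nodeID -> stopping state; presence means "known"
}

func newNodeManager() *nodeManager {
	return &nodeManager{stopping: make(map[int64]bool)}
}

func (m *nodeManager) Add(nodeID int64)      { m.mu.Lock(); m.stopping[nodeID] = false; m.mu.Unlock() }
func (m *nodeManager) Stopping(nodeID int64) { m.mu.Lock(); m.stopping[nodeID] = true; m.mu.Unlock() }

// IsStoppingNode errors for unknown nodes, mirroring how the helper above
// propagates a lookup failure separately from the "node is stopping" case.
func (m *nodeManager) IsStoppingNode(nodeID int64) (bool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	isStopping, ok := m.stopping[nodeID]
	if !ok {
		return false, fmt.Errorf("node %d not found", nodeID)
	}
	return isStopping, nil
}

func main() {
	mgr := newNodeManager()
	mgr.Add(10)
	mgr.Stopping(10)
	fmt.Println(mgr.IsStoppingNode(10)) // true <nil>
	fmt.Println(mgr.IsStoppingNode(11)) // false, node 11 not found
}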
@@ -487,12 +501,20 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
 		log.Warn(msg)
 		return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
 	}
+	if err := s.isStoppingNode(srcNode); err != nil {
+		return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError,
+			fmt.Sprintf("can't balance, because the source node[%d] is invalid", srcNode), err), nil
+	}
 	for _, dstNode := range req.GetDstNodeIDs() {
 		if !replica.Nodes.Contain(dstNode) {
 			msg := "destination nodes have to be in the same replica of source node"
 			log.Warn(msg)
 			return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
 		}
+		if err := s.isStoppingNode(dstNode); err != nil {
+			return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError,
+				fmt.Sprintf("can't balance, because the destination node[%d] is invalid", dstNode), err), nil
+		}
 	}
 
 	err := s.balanceSegments(ctx, req, replica)
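LoadBalance now rejects a request up front when the source node or any destination node is stopping, instead of letting the balance task fail later with a vaguer reason; the error from isStoppingNode is wrapped into the returned status. The sketch below expresses the same pre-check as one helper over all involved nodes; it is an illustrative refactoring, not the handler's actual code, which keeps the two checks inline as shown above.

// Illustrative refactoring sketch, not the actual handler: apply the same
// stopping-node validation to the source node and every destination node,
// and surface which role failed. isStoppingNode is injected as a function.
package main

import (
	"errors"
	"fmt"
)

func validateBalanceNodes(isStoppingNode func(int64) error, srcNode int64, dstNodes []int64) error {
	check := func(role string, nodeID int64) error {
		if err := isStoppingNode(nodeID); err != nil {
			return fmt.Errorf("can't balance, because the %s node[%d] is invalid: %w", role, nodeID, err)
		}
		return nil
	}
	if err := check("source", srcNode); err != nil {
		return err
	}
	for _, dst := range dstNodes {
		if err := check("destination", dst); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	stopping := map[int64]bool{10: true}
	isStoppingNode := func(nodeID int64) error {
		if stopping[nodeID] {
			return errors.New("node is stopping")
		}
		return nil
	}
	// Destination node 10 is stopping, so the request would be rejected up front.
	fmt.Println(validateBalanceNodes(isStoppingNode, 1, []int64{2, 10}))
}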
@@ -745,6 +745,26 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
 		suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
 		suite.Contains(resp.Reason, "failed to balance segments")
 		suite.Contains(resp.Reason, task.ErrTaskCanceled.Error())
+
+		suite.meta.ReplicaManager.AddNode(replicas[0].ID, 10)
+		req.SourceNodeIDs = []int64{10}
+		resp, err = server.LoadBalance(ctx, req)
+		suite.NoError(err)
+		suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
+
+		req.SourceNodeIDs = []int64{srcNode}
+		req.DstNodeIDs = []int64{10}
+		resp, err = server.LoadBalance(ctx, req)
+		suite.NoError(err)
+		suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
+
+		suite.nodeMgr.Add(session.NewNodeInfo(10, "localhost"))
+		suite.nodeMgr.Stopping(10)
+		resp, err = server.LoadBalance(ctx, req)
+		suite.NoError(err)
+		suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
+		suite.nodeMgr.Remove(10)
+		suite.meta.ReplicaManager.RemoveNode(replicas[0].ID, 10)
 	}
 }
@@ -73,6 +73,8 @@ func TestComponentParam(t *testing.T) {
	assert.Equal(t, CParams.IndexNodeCfg.GracefulStopTimeout, Params.GracefulStopTimeout)
	t.Logf("default grafeful stop timeout = %d", Params.GracefulStopTimeout)
	Params.Base.Save("common.gracefulStopTimeout", "50")
	Params.Base.Remove("queryNode.gracefulStopTimeout")
	Params.Base.Remove("indexNode.gracefulStopTimeout")
	Params.initGracefulStopTimeout()
	assert.Equal(t, Params.GracefulStopTimeout, int64(50))
	CParams.QueryNodeCfg.initGracefulStopTimeout()
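The test saves common.gracefulStopTimeout, removes the queryNode/indexNode overrides, re-initializes, and expects the common value (50) to win, which implies a resolution order of component-specific key, then common key, then a built-in default. A minimal sketch of that order over a plain key-value map, not the real paramtable API, follows.

// Minimal sketch of the override -> common -> default resolution implied by
// the test above, over a plain map instead of the real paramtable Base.
package main

import (
	"fmt"
	"strconv"
)

const defaultGracefulStopTimeout int64 = 30 // matches the new yaml default

func resolveGracefulStopTimeout(store map[string]string, componentKey string) int64 {
	for _, key := range []string{componentKey, "common.gracefulStopTimeout"} {
		if raw, ok := store[key]; ok {
			if v, err := strconv.ParseInt(raw, 10, 64); err == nil {
				return v
			}
		}
	}
	return defaultGracefulStopTimeout
}

func main() {
	store := map[string]string{"common.gracefulStopTimeout": "50"}

	// With the queryNode-specific key removed, the common value (50) wins,
	// mirroring the assertion Params.GracefulStopTimeout == int64(50).
	fmt.Println(resolveGracefulStopTimeout(store, "queryNode.gracefulStopTimeout")) // 50

	// With no keys set at all, the built-in default applies.
	fmt.Println(resolveGracefulStopTimeout(map[string]string{}, "queryNode.gracefulStopTimeout")) // 30
}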