mirror of https://github.com/milvus-io/milvus.git
Apply lifetime control for indexnode (#21892)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Branch: pull/21896/head
parent 01e59f8ec2
commit ec282b1958
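This change replaces IndexNode's raw stateCode atomic.Value with the guard type from internal/util/lifetime: handlers are admitted through lifetime.Add(predicate) and released with lifetime.Done(), while Stop() calls lifetime.Wait() so in-flight requests drain before the node reports Abnormal. Below is a minimal sketch of such a guard, inferred only from the call sites visible in this diff (NewLifetime, Add, Done, Wait, SetState, GetState); its internals are assumptions and the real package may be implemented differently.

```go
// Hypothetical sketch of the lifetime guard, inferred from the call sites in
// this diff (NewLifetime, Add, Done, Wait, SetState, GetState). The actual
// implementation in internal/util/lifetime may differ.
package lifetime

import "sync"

// Lifetime stores a component state and counts requests that were admitted
// while the state satisfied a caller-supplied predicate.
type Lifetime[T any] struct {
	mu    sync.Mutex
	wg    sync.WaitGroup
	state T
}

// NewLifetime returns a guard holding the given initial state.
func NewLifetime[T any](initial T) Lifetime[T] {
	return Lifetime[T]{state: initial}
}

// SetState records a new component state (e.g. Healthy, Stopping, Abnormal).
func (l *Lifetime[T]) SetState(state T) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.state = state
}

// GetState returns the current component state.
func (l *Lifetime[T]) GetState() T {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.state
}

// Add admits a request only if the predicate accepts the current state and
// registers it so Wait blocks until the request calls Done. Callers are
// expected to move the state out of the acceptable set before calling Wait,
// so no new requests are admitted while Wait drains the existing ones.
func (l *Lifetime[T]) Add(isAcceptable func(T) bool) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !isAcceptable(l.state) {
		return false
	}
	l.wg.Add(1)
	return true
}

// Done marks one admitted request as finished.
func (l *Lifetime[T]) Done() {
	l.wg.Done()
}

// Wait blocks until every admitted request has called Done.
func (l *Lifetime[T]) Wait() {
	l.wg.Wait()
}
```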
@@ -32,7 +32,6 @@ import (
"os"
"path"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"

@@ -47,6 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/lifetime"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"

@@ -74,7 +74,7 @@ type taskKey struct {

// IndexNode is a component that executes the task of building indexes.
type IndexNode struct {
stateCode atomic.Value
lifetime lifetime.Lifetime[commonpb.StateCode]

loopCtx context.Context
loopCancel func()

@@ -107,8 +107,8 @@ func NewIndexNode(ctx context.Context, factory dependency.Factory) *IndexNode {
factory: factory,
storageFactory: &chunkMgr{},
tasks: map[taskKey]*taskInfo{},
lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal),
}
b.UpdateStateCode(commonpb.StateCode_Abnormal)
sc := NewTaskScheduler(b.loopCtx)

b.sched = sc

@@ -172,7 +172,7 @@ func (i *IndexNode) Init() error {
var initErr error
i.initOnce.Do(func() {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("IndexNode init", zap.Any("State", i.stateCode.Load().(commonpb.StateCode)))
log.Info("IndexNode init", zap.String("state", i.lifetime.GetState().String()))
err := i.initSession()
if err != nil {
log.Error(err.Error())

@@ -204,7 +204,7 @@ func (i *IndexNode) Start() error {
startErr = i.sched.Start()

i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("IndexNode", zap.Any("State", i.stateCode.Load()))
log.Info("IndexNode", zap.Any("State", i.lifetime.GetState().String()))
})

log.Info("IndexNode start finished", zap.Error(startErr))

@@ -222,6 +222,7 @@ func (i *IndexNode) Stop() error {
} else {
i.waitTaskFinish()
}
i.lifetime.Wait()

// https://github.com/milvus-io/milvus/issues/12282
i.UpdateStateCode(commonpb.StateCode_Abnormal)

@@ -246,7 +247,7 @@ func (i *IndexNode) Stop() error {

// UpdateStateCode updates the component state of IndexNode.
func (i *IndexNode) UpdateStateCode(code commonpb.StateCode) {
i.stateCode.Store(code)
i.lifetime.SetState(code)
}

// SetEtcdClient assigns parameter client to its member etcdCli

@@ -265,7 +266,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.Component
// NodeID: Params.NodeID, // will race with i.Register()
NodeID: nodeID,
Role: typeutil.IndexNodeRole,
StateCode: i.stateCode.Load().(commonpb.StateCode),
StateCode: i.lifetime.GetState(),
}

ret := &milvuspb.ComponentStates{

@@ -277,8 +278,8 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.Component
}

log.RatedInfo(10, "IndexNode Component states",
zap.Any("State", ret.State),
zap.Any("Status", ret.Status),
zap.String("State", ret.State.String()),
zap.String("Status", ret.GetStatus().GetErrorCode().String()),
zap.Any("SubcomponentStates", ret.SubcomponentStates))
return ret, nil
}

@@ -310,7 +311,7 @@ func (i *IndexNode) GetNodeID() int64 {

// ShowConfigurations returns the configurations of indexNode matching req.Pattern
func (i *IndexNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
if !commonpbutil.IsHealthyOrStopping(i.stateCode) {
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Warn("IndexNode.ShowConfigurations failed",
zap.Int64("nodeId", paramtable.GetNodeID()),
zap.String("req", req.Pattern),

@@ -41,14 +41,15 @@ import (
)

func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
if !commonpbutil.IsHealthy(i.stateCode) {
stateCode := i.stateCode.Load().(commonpb.StateCode)
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID))
if !i.lifetime.Add(commonpbutil.IsHealthy) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
}, nil
}
defer i.lifetime.Done()
log.Ctx(ctx).Info("IndexNode building index ...",
zap.String("ClusterID", req.ClusterID),
zap.Int64("IndexBuildID", req.BuildID),

@@ -116,9 +117,9 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}

func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) {
if !commonpbutil.IsHealthyOrStopping(i.stateCode) {
stateCode := i.stateCode.Load().(commonpb.StateCode)
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("ClusterID", req.ClusterID))
return &indexpb.QueryJobsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -126,6 +127,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
},
}, nil
}
defer i.lifetime.Done()
infos := make(map[UniqueID]*taskInfo)
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {
if ClusterID == req.ClusterID {

@@ -167,14 +169,15 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest

func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) (*commonpb.Status, error) {
log.Ctx(ctx).Info("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
if !commonpbutil.IsHealthyOrStopping(i.stateCode) {
stateCode := i.stateCode.Load().(commonpb.StateCode)
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("ClusterID", req.ClusterID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
}, nil
}
defer i.lifetime.Done()
keys := make([]taskKey, 0, len(req.BuildIDs))
for _, buildID := range req.BuildIDs {
keys = append(keys, taskKey{ClusterID: req.ClusterID, BuildID: buildID})

@@ -194,9 +197,9 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
}

func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
if !commonpbutil.IsHealthyOrStopping(i.stateCode) {
stateCode := i.stateCode.Load().(commonpb.StateCode)
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)))
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()))
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -204,6 +207,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
},
}, nil
}
defer i.lifetime.Done()
unissued, active := i.sched.IndexBuildQueue.GetTaskNum()
jobInfos := make([]*indexpb.JobInfo, 0)
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {

@@ -233,7 +237,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
// GetMetrics gets the metrics info of IndexNode.
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if !commonpbutil.IsHealthyOrStopping(i.stateCode) {
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Ctx(ctx).Warn("IndexNode.GetMetrics failed",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Request),

@@ -247,6 +251,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
Response: "",
}, nil
}
defer i.lifetime.Done()

metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {

@@ -349,7 +349,7 @@ Loop:
assert.Nil(t, in.Stop())
node := in.(*mockIndexNodeComponent).IndexNode
assert.Equal(t, 0, len(node.tasks))
assert.Equal(t, commonpb.StateCode_Abnormal, node.stateCode.Load().(commonpb.StateCode))
assert.Equal(t, commonpb.StateCode_Abnormal, node.lifetime.GetState())
}

func TestAbnormalIndexNode(t *testing.T) {

@@ -40,21 +40,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil"
)

// isHealthy checks if QueryNode is healthy
func (node *QueryNode) isHealthy(code commonpb.StateCode) bool {
return code == commonpb.StateCode_Healthy
}

func (node *QueryNode) isHealthyOrStopping(code commonpb.StateCode) bool {
return code == commonpb.StateCode_Healthy || code == commonpb.StateCode_Stopping
}

// GetComponentStates returns information about whether the node is healthy
func (node *QueryNode) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
stats := &milvuspb.ComponentStates{

@@ -161,7 +153,7 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
},
}

if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
failRet.Status.Reason = msgQueryNodeIsUnhealthy(node.GetSession().ServerID)
return failRet, nil
}

@@ -290,7 +282,7 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
nodeID := node.GetSession().ServerID
// check node healthy
if !node.lifetime.Add(node.isHealthy) {
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -380,7 +372,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC
func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
// check node healthy
nodeID := node.GetSession().ServerID
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -439,7 +431,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
nodeID := node.GetSession().ServerID
// check node healthy
if !node.lifetime.Add(node.isHealthy) {
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -524,7 +516,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegment

// ReleaseCollection clears all data related to this collection on the querynode
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", node.GetSession().ServerID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -571,7 +563,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.Releas

// ReleasePartitions clears all data related to this partition on the querynode
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", node.GetSession().ServerID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -619,7 +611,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.Releas
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
nodeID := node.GetSession().ServerID
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -667,7 +659,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS

// GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", node.GetSession().ServerID)
res := &querypb.GetSegmentInfoResponse{
Status: &commonpb.Status{

@@ -810,7 +802,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(nodeID), metrics.SearchLabel, metrics.FailLabel).Inc()
}
}()
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
failRet.Status.Reason = msgQueryNodeIsUnhealthy(nodeID)
return failRet, nil
}

@@ -960,7 +952,7 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(nodeID), metrics.SearchLabel, metrics.FailLabel).Inc()
}
}()
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
failRet.Status.Reason = msgQueryNodeIsUnhealthy(nodeID)
return failRet, nil
}

@@ -1177,7 +1169,7 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i

// SyncReplicaSegments syncs replica node & segments states
func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
if !node.lifetime.Add(node.isHealthy) {
if !node.lifetime.Add(commonpbutil.IsHealthy) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgQueryNodeIsUnhealthy(node.GetSession().ServerID),

@@ -1204,7 +1196,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
// ShowConfigurations returns the configurations of queryNode matching req.Pattern
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
nodeID := node.GetSession().ServerID
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Warn("QueryNode.ShowConfigurations failed",
zap.Int64("nodeId", nodeID),
zap.String("req", req.Pattern),

@@ -1241,7 +1233,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
nodeID := node.GetSession().ServerID
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeId", nodeID),
zap.String("req", req.Request),

@@ -1310,7 +1302,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
zap.Int64("msg-id", req.GetBase().GetMsgID()),
zap.Int64("node-id", nodeID),
)
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Warn("QueryNode.GetMetrics failed",
zap.Error(errQueryNodeIsUnhealthy(nodeID)))

@@ -1402,7 +1394,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()))
nodeID := node.GetSession().ServerID
// check node healthy
if !node.lifetime.Add(node.isHealthyOrStopping) {
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

@@ -509,15 +509,6 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
})
}

func TestImpl_isHealthy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)

assert.True(t, node.isHealthy(node.lifetime.GetState()))
}

func TestImpl_ShowConfigurations(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -17,7 +17,6 @@
package commonpbutil

import (
"sync/atomic"
"time"

"github.com/milvus-io/milvus-proto/go-api/commonpb"

@@ -102,18 +101,10 @@ func UpdateMsgBase(msgBase *commonpb.MsgBase, options ...MsgBaseOptions) *common
return msgBaseRt
}

func IsHealthy(stateCode atomic.Value) bool {
code, ok := stateCode.Load().(commonpb.StateCode)
if !ok {
return false
}
return code == commonpb.StateCode_Healthy
func IsHealthy(stateCode commonpb.StateCode) bool {
return stateCode == commonpb.StateCode_Healthy
}

func IsHealthyOrStopping(stateCode atomic.Value) bool {
code, ok := stateCode.Load().(commonpb.StateCode)
if !ok {
return false
}
return code == commonpb.StateCode_Healthy || code == commonpb.StateCode_Stopping
func IsHealthyOrStopping(stateCode commonpb.StateCode) bool {
return stateCode == commonpb.StateCode_Healthy || stateCode == commonpb.StateCode_Stopping
}

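Changing IsHealthy and IsHealthyOrStopping to take a commonpb.StateCode value instead of an atomic.Value lets the same functions be passed directly as the admission predicate of lifetime.Add, which is the pattern every handler in this diff follows. A hedged usage sketch is shown below; the method name doWork is hypothetical, and the fragment assumes the surrounding indexnode package with its existing imports.

```go
// Hypothetical handler illustrating the guard pattern from this diff:
// admit the request only while the node is healthy or stopping, and keep
// it counted until it finishes so Stop()'s lifetime.Wait() can drain it.
func (i *IndexNode) doWork(ctx context.Context) (*commonpb.Status, error) {
	if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "state code is not healthy",
		}, nil
	}
	defer i.lifetime.Done()

	// ... actual request handling ...
	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
```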
@@ -19,7 +19,6 @@
package commonpbutil

import (
"sync/atomic"
"testing"

"github.com/milvus-io/milvus-proto/go-api/commonpb"

@@ -27,53 +26,41 @@ import (
)

func TestIsHealthy(t *testing.T) {
{
v := atomic.Value{}
v.Store(1)
assert.False(t, IsHealthy(v))
type testCase struct {
code commonpb.StateCode
expect bool
}

{
v := atomic.Value{}
v.Store(commonpb.StateCode_Abnormal)
assert.False(t, IsHealthy(v))
cases := []testCase{
{commonpb.StateCode_Healthy, true},
{commonpb.StateCode_Initializing, false},
{commonpb.StateCode_Abnormal, false},
{commonpb.StateCode_StandBy, false},
{commonpb.StateCode_Stopping, false},
}

{
v := atomic.Value{}
v.Store(commonpb.StateCode_Stopping)
assert.False(t, IsHealthy(v))
}

{
v := atomic.Value{}
v.Store(commonpb.StateCode_Healthy)
assert.True(t, IsHealthy(v))
for _, tc := range cases {
t.Run(tc.code.String(), func(t *testing.T) {
assert.Equal(t, tc.expect, IsHealthy(tc.code))
})
}
}

func TestIsHealthyOrStopping(t *testing.T) {
{
v := atomic.Value{}
v.Store(1)
assert.False(t, IsHealthyOrStopping(v))
type testCase struct {
code commonpb.StateCode
expect bool
}

{
v := atomic.Value{}
v.Store(commonpb.StateCode_Abnormal)
assert.False(t, IsHealthyOrStopping(v))
cases := []testCase{
{commonpb.StateCode_Healthy, true},
{commonpb.StateCode_Initializing, false},
{commonpb.StateCode_Abnormal, false},
{commonpb.StateCode_StandBy, false},
{commonpb.StateCode_Stopping, true},
}

{
v := atomic.Value{}
v.Store(commonpb.StateCode_Stopping)
assert.True(t, IsHealthyOrStopping(v))
}

{
v := atomic.Value{}
v.Store(commonpb.StateCode_Healthy)
assert.True(t, IsHealthyOrStopping(v))
for _, tc := range cases {
t.Run(tc.code.String(), func(t *testing.T) {
assert.Equal(t, tc.expect, IsHealthyOrStopping(tc.code))
})
}
}