mirror of https://github.com/milvus-io/milvus.git
parent
951c312ef9
commit
c84496a1ba
|
@ -106,8 +106,6 @@ func (node *QueryNode) loadDeltaLogs(ctx context.Context, req *querypb.LoadSegme
|
|||
}
|
||||
|
||||
func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryRequest, channel string) (*internalpb.RetrieveResults, error) {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel).Inc()
|
||||
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
msgID := req.Req.Base.GetMsgID()
|
||||
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
|
||||
log := log.Ctx(ctx).With(
|
||||
|
@ -117,9 +115,11 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
|
|||
zap.String("scope", req.GetScope().String()),
|
||||
)
|
||||
|
||||
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
|
||||
defer func() {
|
||||
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader).Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -189,7 +189,7 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
|
|||
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
|
||||
latency := tr.ElapseSpan()
|
||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc()
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
@ -294,6 +294,14 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
|
|||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
|
||||
defer func() {
|
||||
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader).Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debug("start to search channel",
|
||||
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
|
||||
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
||||
|
@ -308,18 +316,21 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
|
|||
if !ok {
|
||||
err := merr.WrapErrServiceUnavailable("failed to get query shard delegator")
|
||||
log.Warn("Query failed, failed to get query shard delegator", zap.Error(err))
|
||||
return nil, err
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
req, err := node.optimizeSearchParams(ctx, req, sd)
|
||||
if err != nil {
|
||||
log.Warn("failed to optimize search params", zap.Error(err))
|
||||
return nil, err
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
// do search
|
||||
results, err := sd.Search(searchCtx, req)
|
||||
if err != nil {
|
||||
log.Warn("failed to search on delegator", zap.Error(err))
|
||||
return nil, err
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
|
||||
// reduce result
|
||||
|
@ -332,7 +343,8 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
|
|||
|
||||
ret, err := segments.ReduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
|
||||
tr.CtxElapse(ctx, fmt.Sprintf("do search with channel done , vChannel = %s, segmentIDs = %v",
|
||||
|
@ -341,9 +353,10 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
|
|||
))
|
||||
|
||||
// update metric to prometheus
|
||||
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
|
||||
latency := tr.ElapseSpan()
|
||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader).Inc()
|
||||
metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetNq()))
|
||||
metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetTopk()))
|
||||
|
||||
|
|
|
@ -696,6 +696,14 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
|||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
|
||||
defer func() {
|
||||
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader).Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debug("start to search segments on worker",
|
||||
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
||||
)
|
||||
|
@ -708,19 +716,23 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
|||
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
|
||||
if collection == nil {
|
||||
log.Warn("failed to search segments", zap.Error(segments.ErrCollectionNotFound))
|
||||
return nil, segments.WrapCollectionNotFound(req.GetReq().GetCollectionID())
|
||||
err := segments.WrapCollectionNotFound(req.GetReq().GetCollectionID())
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
|
||||
task := tasks.NewSearchTask(searchCtx, collection, node.manager, req)
|
||||
if err := node.scheduler.Add(task); err != nil {
|
||||
log.Warn("failed to search channel", zap.Error(err))
|
||||
return nil, err
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
|
||||
err := task.Wait()
|
||||
if err != nil {
|
||||
log.Warn("failed to search segments", zap.Error(err))
|
||||
return nil, err
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
}
|
||||
|
||||
tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v",
|
||||
|
@ -729,9 +741,10 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
|||
))
|
||||
|
||||
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
|
||||
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
|
||||
latency := tr.ElapseSpan()
|
||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
|
||||
|
||||
result := task.Result()
|
||||
if result.CostAggregation != nil {
|
||||
|
@ -778,8 +791,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
|
|||
common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID())), nil
|
||||
}
|
||||
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel).Inc()
|
||||
|
||||
failRet := &internalpb.SearchResults{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -835,7 +846,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
|
|||
})
|
||||
}
|
||||
if err := runningGp.Wait(); err != nil {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
|
@ -865,7 +875,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
|
|||
|
||||
// only used for delegator query segments from worker
|
||||
func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel).Inc()
|
||||
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
msgID := req.Req.Base.GetMsgID()
|
||||
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
|
||||
|
@ -877,12 +886,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
|||
zap.String("scope", req.GetScope().String()),
|
||||
)
|
||||
|
||||
defer func() {
|
||||
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
err := merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
|
||||
failRet.Status = merr.Status(err)
|
||||
|
@ -890,6 +893,13 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
|||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
|
||||
defer func() {
|
||||
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader).Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debug("start do query segments",
|
||||
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
|
||||
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
||||
|
@ -917,7 +927,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
|||
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
|
||||
latency := tr.ElapseSpan()
|
||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
|
||||
results.CostAggregation = &internalpb.CostAggregation{
|
||||
ServiceTime: latency.Milliseconds(),
|
||||
ResponseTime: latency.Milliseconds(),
|
||||
|
|
|
@ -113,6 +113,7 @@ var (
|
|||
nodeIDLabelName,
|
||||
queryTypeLabelName,
|
||||
statusLabelName,
|
||||
requestScope,
|
||||
})
|
||||
|
||||
QueryNodeSQReqLatency = prometheus.NewHistogramVec(
|
||||
|
|
Loading…
Reference in New Issue