mirror of https://github.com/milvus-io/milvus.git
parent
b6f69fe7f2
commit
2a760c108c
|
@ -46,6 +46,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/merr"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -643,6 +644,8 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
|
|||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
tr := timerecord.NewTimeRecorderWithTrace(ctx, "searchRequestReduce")
|
||||
ret, err := segments.ReduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
|
||||
if err != nil {
|
||||
log.Warn("failed to reduce search results", zap.Error(err))
|
||||
|
@ -650,6 +653,10 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
|
|||
failRet.Status.Reason = err.Error()
|
||||
return failRet, nil
|
||||
}
|
||||
metrics.QueryNodeReduceLatency.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
metrics.SearchLabel).
|
||||
Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
if !req.FromShardLeader {
|
||||
collector.Rate.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
|
||||
|
|
|
@ -46,6 +46,7 @@ func (s *Scheduler) Add(task Task) bool {
|
|||
case *SearchTask:
|
||||
select {
|
||||
case s.searchWaitQueue <- t:
|
||||
t.tr.RecordSpan()
|
||||
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
|
||||
default:
|
||||
return false
|
||||
|
@ -65,6 +66,12 @@ func (s *Scheduler) Schedule(ctx context.Context) {
|
|||
if !s.tryPromote(task) {
|
||||
break
|
||||
}
|
||||
|
||||
inQueueDuration := task.tr.RecordSpan()
|
||||
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
metrics.SearchLabel).
|
||||
Observe(float64(inQueueDuration.Milliseconds()))
|
||||
s.process(task)
|
||||
s.mergedSearchTasks.Remove(task)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -34,6 +35,8 @@ type SearchTask struct {
|
|||
originNqs []int64
|
||||
others []*SearchTask
|
||||
notifier chan error
|
||||
|
||||
tr *timerecord.TimeRecorder
|
||||
}
|
||||
|
||||
func NewSearchTask(ctx context.Context,
|
||||
|
@ -49,6 +52,8 @@ func NewSearchTask(ctx context.Context,
|
|||
originTopks: []int64{req.GetReq().GetTopk()},
|
||||
originNqs: []int64{req.GetReq().GetNq()},
|
||||
notifier: make(chan error, 1),
|
||||
|
||||
tr: timerecord.NewTimeRecorderWithTrace(ctx, "searchTask"),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,6 +107,7 @@ func (t *SearchTask) Execute() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
tr := timerecord.NewTimeRecorderWithTrace(t.ctx, "searchTaskReduce")
|
||||
blobs, err := segments.ReduceSearchResultsAndFillData(
|
||||
searchReq.Plan(),
|
||||
results,
|
||||
|
@ -124,6 +130,11 @@ func (t *SearchTask) Execute() error {
|
|||
bs := make([]byte, len(blob))
|
||||
copy(bs, blob)
|
||||
|
||||
metrics.QueryNodeReduceLatency.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
metrics.SearchLabel).
|
||||
Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
t.result = &internalpb.SearchResults{
|
||||
Status: util.WrapStatus(commonpb.ErrorCode_Success, ""),
|
||||
MetricType: req.GetReq().GetMetricType(),
|
||||
|
|
Loading…
Reference in New Issue