mirror of https://github.com/milvus-io/milvus.git
add metrics for query reduce (#27201)
/kind improvement Signed-off-by: MrPresent-Han <chun.han@zilliz.com>pull/27219/head
parent
a3da25823f
commit
33e3e78937
|
@ -2,7 +2,9 @@ package tasks
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
|
@ -87,9 +89,8 @@ func (t *QueryTask) Execute() error {
|
|||
return err
|
||||
}
|
||||
defer retrievePlan.Delete()
|
||||
|
||||
results, searchedSegments, err := segments.Retrieve(t.ctx, t.segmentManager, retrievePlan, t.req)
|
||||
defer t.segmentManager.Segment.Unpin(searchedSegments)
|
||||
results, querySegments, err := segments.Retrieve(t.ctx, t.segmentManager, retrievePlan, t.req)
|
||||
defer t.segmentManager.Segment.Unpin(querySegments)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -98,8 +99,13 @@ func (t *QueryTask) Execute() error {
|
|||
t.req,
|
||||
t.collection.Schema(),
|
||||
)
|
||||
|
||||
beforeReduce := time.Now()
|
||||
reducedResult, err := reducer.Reduce(t.ctx, results)
|
||||
|
||||
metrics.QueryNodeReduceLatency.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
metrics.QueryLabel,
|
||||
metrics.ReduceSegments).Observe(float64(time.Since(beforeReduce).Milliseconds()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -194,8 +194,11 @@ func (t *SearchTask) Execute() error {
|
|||
return err
|
||||
}
|
||||
defer segments.DeleteSearchResultDataBlobs(blobs)
|
||||
reduceLatency := tr.RecordSpan()
|
||||
|
||||
metrics.QueryNodeReduceLatency.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
metrics.SearchLabel,
|
||||
metrics.ReduceSegments).
|
||||
Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
for i := range t.originNqs {
|
||||
blob, err := segments.GetSearchResultDataBlob(blobs, i)
|
||||
if err != nil {
|
||||
|
@ -213,12 +216,6 @@ func (t *SearchTask) Execute() error {
|
|||
bs := make([]byte, len(blob))
|
||||
copy(bs, blob)
|
||||
|
||||
metrics.QueryNodeReduceLatency.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
metrics.SearchLabel,
|
||||
metrics.ReduceSegments).
|
||||
Observe(float64(reduceLatency.Milliseconds()))
|
||||
|
||||
task.result = &internalpb.SearchResults{
|
||||
Base: &commonpb.MsgBase{
|
||||
SourceID: paramtable.GetNodeID(),
|
||||
|
@ -235,6 +232,7 @@ func (t *SearchTask) Execute() error {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue