mirror of https://github.com/milvus-io/milvus.git
Use timerecord to calculate time span (#10287)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/9549/head
parent
c18fa9b785
commit
b40513b211
|
@ -24,15 +24,13 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -46,6 +44,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
@ -1729,13 +1728,12 @@ func decodeSearchResultsSerial(searchResults []*internalpb.SearchResults) ([]*sc
|
|||
return results, nil
|
||||
}
|
||||
|
||||
func decodeSearchResults(searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
|
||||
t := time.Now()
|
||||
defer func() {
|
||||
log.Debug("decodeSearchResults", zap.Any("time cost", time.Since(t)))
|
||||
}()
|
||||
return decodeSearchResultsSerial(searchResults)
|
||||
// return decodeSearchResultsParallelByCPU(searchResults)
|
||||
func decodeSearchResults(searchResults []*internalpb.SearchResults) (res []*schemapb.SearchResultData, err error) {
|
||||
tr := timerecord.NewTimeRecorder("decodeSearchResults")
|
||||
res, err = decodeSearchResultsSerial(searchResults)
|
||||
// res, err = decodeSearchResultsParallelByCPU(searchResults)
|
||||
tr.Elapse("done")
|
||||
return
|
||||
}
|
||||
|
||||
func reduceSearchResultDataParallel(searchResultData []*schemapb.SearchResultData, availableQueryNodeNum int64,
|
||||
|
@ -1961,12 +1959,11 @@ func reduceSearchResultDataParallel(searchResultData []*schemapb.SearchResultDat
|
|||
}
|
||||
|
||||
func reduceSearchResultData(searchResultData []*schemapb.SearchResultData, availableQueryNodeNum int64,
|
||||
nq int64, topk int64, metricType string) (*milvuspb.SearchResults, error) {
|
||||
t := time.Now()
|
||||
defer func() {
|
||||
log.Debug("reduceSearchResults", zap.Any("time cost", time.Since(t)))
|
||||
}()
|
||||
return reduceSearchResultDataParallel(searchResultData, availableQueryNodeNum, nq, topk, metricType, runtime.NumCPU())
|
||||
nq int64, topk int64, metricType string) (res *milvuspb.SearchResults, err error) {
|
||||
tr := timerecord.NewTimeRecorder("reduceSearchResults")
|
||||
res, err = reduceSearchResultDataParallel(searchResultData, availableQueryNodeNum, nq, topk, metricType, runtime.NumCPU())
|
||||
tr.Elapse("done")
|
||||
return
|
||||
}
|
||||
|
||||
//func printSearchResult(partialSearchResult *internalpb.SearchResults) {
|
||||
|
@ -1984,9 +1981,9 @@ func reduceSearchResultData(searchResultData []*schemapb.SearchResultData, avail
|
|||
func (st *searchTask) PostExecute(ctx context.Context) error {
|
||||
sp, ctx := trace.StartSpanFromContextWithOperationName(st.TraceCtx(), "Proxy-Search-PostExecute")
|
||||
defer sp.Finish()
|
||||
t0 := time.Now()
|
||||
tr := timerecord.NewTimeRecorder("searchTask PostExecute")
|
||||
defer func() {
|
||||
log.Debug("WaitAndPostExecute", zap.Any("time cost", time.Since(t0)))
|
||||
tr.Elapse("done")
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
|
@ -2009,8 +2006,8 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
|
|||
|
||||
availableQueryNodeNum := len(filterSearchResult)
|
||||
log.Debug("Proxy Search PostExecute stage1",
|
||||
zap.Any("availableQueryNodeNum", availableQueryNodeNum),
|
||||
zap.Any("time cost", time.Since(t0)))
|
||||
zap.Any("availableQueryNodeNum", availableQueryNodeNum))
|
||||
tr.Record("Proxy Search PostExecute stage1 done")
|
||||
if availableQueryNodeNum <= 0 {
|
||||
st.result = &milvuspb.SearchResults{
|
||||
Status: &commonpb.Status{
|
||||
|
@ -2401,9 +2398,9 @@ func (qt *queryTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (qt *queryTask) PostExecute(ctx context.Context) error {
|
||||
t0 := time.Now()
|
||||
tr := timerecord.NewTimeRecorder("queryTask PostExecute")
|
||||
defer func() {
|
||||
log.Debug("WaitAndPostExecute", zap.Any("time cost", time.Since(t0)))
|
||||
tr.Elapse("done")
|
||||
}()
|
||||
select {
|
||||
case <-qt.TraceCtx().Done():
|
||||
|
|
Loading…
Reference in New Issue