mirror of https://github.com/milvus-io/milvus.git
Add time record log on search/query execution path (#17497)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
Branch: pull/17514/head
parent eb4e39099c
commit 16f0815184
@@ -435,8 +435,10 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 		return failRet, nil
 	}
 
+	msgID := req.GetReq().GetBase().GetMsgID()
 	log.Debug("Received SearchRequest",
-		zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
+		zap.Int64("msgID", msgID),
 		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
 		zap.String("vChannel", req.GetDmlChannel()),
 		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
 		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
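The first hunk hoists the message ID into a local `msgID` so the same value can be attached to every log site on the execution path. As a standalone illustration of the zap structured-logging pattern being applied here (the logger construction and field values below are illustrative, not taken from the commit):

package main

import "go.uber.org/zap"

func main() {
	// Development logger for the example; QueryNode uses its own configured logger.
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// Read the ID once and thread it through every log call, mirroring the diff.
	msgID := int64(42)
	logger.Debug("Received SearchRequest",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", false),
		zap.String("vChannel", "by-dev-rootcoord-dml_0"))
	logger.Warn("Search failed, failed to get query shard",
		zap.Int64("msgID", msgID))
}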
@@ -449,13 +451,17 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 
 	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
 	if err != nil {
-		log.Warn("Search failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
+		log.Warn("Search failed, failed to get query shard",
+			zap.Int64("msgID", msgID),
+			zap.String("dml channel", req.GetDmlChannel()),
+			zap.Error(err))
 		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
 		failRet.Status.Reason = err.Error()
 		return failRet, nil
 	}
 
-	tr := timerecord.NewTimeRecorder(fmt.Sprintf("search %d", req.Req.CollectionID))
+	tr := timerecord.NewTimeRecorder(fmt.Sprintf("start do search, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
 
 	if req.FromShardLeader {
 		historicalTask, err2 := newSearchTask(ctx, req)
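The second hunk replaces the bare collection-ID header with a richer one and relies on the internal `timerecord` package. For orientation, here is a minimal sketch of the contract the calls in this diff assume (NewTimeRecorder starts the clock, Elapse marks a named checkpoint, ElapseSpan returns the total span); the real `internal/util/timerecord` implementation may differ in details such as how checkpoints are logged:

package timerecord

import (
	"log"
	"time"
)

// TimeRecorder tracks elapsed time from a fixed start plus per-step checkpoints.
type TimeRecorder struct {
	header string    // prefix for every recorded message
	start  time.Time // creation time, used by ElapseSpan
	last   time.Time // last checkpoint, used by Elapse
}

// NewTimeRecorder starts the clock and remembers the header for later messages.
func NewTimeRecorder(header string) *TimeRecorder {
	now := time.Now()
	return &TimeRecorder{header: header, start: now, last: now}
}

// Elapse records one step: it logs the duration since the previous checkpoint
// together with msg, then resets the checkpoint.
func (tr *TimeRecorder) Elapse(msg string) time.Duration {
	now := time.Now()
	d := now.Sub(tr.last)
	tr.last = now
	log.Printf("%s: %s (%v)", tr.header, msg, d)
	return d
}

// ElapseSpan returns the total duration since the recorder was created.
func (tr *TimeRecorder) ElapseSpan() time.Duration {
	return time.Since(tr.start)
}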
@@ -477,6 +483,9 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 		return failRet, nil
 	}
 
+	tr.Elapse(fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
+
 	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 	metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
 		metrics.SearchLabel).Observe(float64(historicalTask.queueDur.Milliseconds()))
@@ -510,7 +519,7 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 			// shard leader dispatches request to its shard cluster
 			oResults, cErr := cluster.Search(searchCtx, req)
 			if cErr != nil {
-				log.Warn("search cluster failed", zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(cErr))
+				log.Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(cErr))
				cancel()
 				errCluster = cErr
 				return
@@ -568,6 +577,10 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 		failRet.Status.Reason = mainErr.Error()
 		return failRet, nil
 	}
 
+	tr.Elapse(fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
+
 	results = append(results, streamingResult)
 	ret, err2 := reduceSearchResults(results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
 	if err2 != nil {
@@ -575,6 +588,9 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 		return failRet, nil
 	}
 
+	tr.Elapse(fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
+
 	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 	latency := tr.ElapseSpan()
 	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
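The end-to-end latency lands in `metrics.QueryNodeSQReqLatency`, observed in milliseconds per node ID and request type. A hedged sketch of how such a HistogramVec could be declared and used with the Prometheus Go client; the namespace, metric name, label names, and bucket layout below are assumptions, not Milvus's actual definitions:

package metrics

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// QueryNodeSQReqLatency is a histogram of search/query request latency,
// partitioned by node ID and request type (search vs. query).
var QueryNodeSQReqLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "milvus",    // assumed namespace
		Subsystem: "querynode", // assumed subsystem
		Name:      "sq_req_latency",
		Help:      "latency of search/query requests in milliseconds",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 12), // 1ms .. ~4s, assumed
	},
	[]string{"node_id", "query_type"},
)

func init() {
	prometheus.MustRegister(QueryNodeSQReqLatency)
}

// ObserveReqLatency records one request's end-to-end latency, matching the
// Observe(float64(latency.Milliseconds())) call sites in the diff.
func ObserveReqLatency(nodeID int64, queryType string, latency time.Duration) {
	QueryNodeSQReqLatency.
		WithLabelValues(fmt.Sprint(nodeID), queryType).
		Observe(float64(latency.Milliseconds()))
}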
@@ -603,8 +619,10 @@ func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*i
 		return failRet, nil
 	}
 
+	msgID := req.GetReq().GetBase().GetMsgID()
 	log.Debug("Received QueryRequest",
-		zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
+		zap.Int64("msgID", msgID),
 		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
 		zap.String("vChannel", req.GetDmlChannel()),
 		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
 		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
@@ -617,12 +635,14 @@ func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*i
 
 	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
 	if err != nil {
-		log.Warn("Query failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
+		log.Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
 		failRet.Status.Reason = err.Error()
 		return failRet, nil
 	}
 
-	tr := timerecord.NewTimeRecorder(fmt.Sprintf("retrieve %d", req.Req.CollectionID))
+	tr := timerecord.NewTimeRecorder(fmt.Sprintf("start do query, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
 
 	if req.FromShardLeader {
 		// construct a queryTask
 		queryTask := newQueryTask(ctx, req)
@@ -639,6 +659,10 @@ func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*i
 		failRet.Status.Reason = err2.Error()
 		return failRet, nil
 	}
 
+	tr.Elapse(fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
+
 	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 	metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
 		metrics.QueryLabel).Observe(float64(queryTask.queueDur.Milliseconds()))
@@ -671,8 +695,7 @@ func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*i
 			// shard leader dispatches request to its shard cluster
 			oResults, cErr := cluster.Query(queryCtx, req)
 			if cErr != nil {
-				log.Warn("failed to query cluster", zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(cErr))
-				log.Info("czs_query_cluster_cancel", zap.Error(cErr))
+				log.Warn("failed to query cluster", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(cErr))
 				errCluster = cErr
 				cancel()
 				return
@@ -725,12 +748,20 @@ func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*i
 		failRet.Status.Reason = mainErr.Error()
 		return failRet, nil
 	}
 
+	tr.Elapse(fmt.Sprintf("start reduce query result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
+
 	results = append(results, streamingResult)
 	ret, err2 := mergeInternalRetrieveResults(results)
 	if err2 != nil {
 		failRet.Status.Reason = err2.Error()
 		return failRet, nil
 	}
 
+	tr.Elapse(fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
+		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))
+
 	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 	latency := tr.ElapseSpan()
 	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
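Taken together, the pattern the commit applies to both Search and Query is: create a recorder at entry, mark each stage (task execution, result reduction), and observe the full span at exit. A hypothetical handler instrumented the same way, reusing the diff's identifiers for continuity (the handler and stage names are illustrative):

func handleRequest(msgID int64) error {
	tr := timerecord.NewTimeRecorder(fmt.Sprintf("start do work, msgID = %d", msgID))

	if err := runStage(); err != nil { // hypothetical stage
		return err
	}
	// Per-stage checkpoint: duration since the previous mark.
	tr.Elapse(fmt.Sprintf("stage done, msgID = %d", msgID))

	// Total span since the recorder was created, fed to the latency histogram.
	latency := tr.ElapseSpan()
	metrics.QueryNodeSQReqLatency.
		WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).
		Observe(float64(latency.Milliseconds()))
	return nil
}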