Refine search logs in querynode (#21136)

- Make sure all logs are logged with ctx (see the sketch below)
- Add an info log for searching on unindexed segments
- Separate the log prefixes of shard leaders and non-shard-leaders
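
For reference, a minimal Go sketch of the logging conventions this patch settles on: every call goes through log.Ctx(ctx), the shard-leader path and the forwarded sub-search path use distinct messages, and segments searched without an index are reported once via an info log. The helper handleSearch and its signature are invented for illustration; only the log calls mirror the patterns in the diff below.

package querynode

import (
	"context"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
)

// handleSearch is a hypothetical helper that only demonstrates the logging
// conventions of this change; it is not code taken from the patch.
func handleSearch(ctx context.Context, fromShardLeader bool, vChannel string,
	segmentIDs []int64, segmentsWithoutIndex []int64) {
	if fromShardLeader {
		// Request forwarded by a shard leader: this node runs a sub-search.
		log.Ctx(ctx).Debug("start do subsearch",
			zap.String("vChannel", vChannel),
			zap.Int64s("segmentIDs", segmentIDs))
	} else {
		// Request from the proxy: this node acts as the shard leader.
		log.Ctx(ctx).Debug("start do search",
			zap.String("vChannel", vChannel),
			zap.Int64s("segmentIDs", segmentIDs))
	}

	// Surface segments that had to be searched without an index.
	if len(segmentsWithoutIndex) > 0 {
		log.Ctx(ctx).Info("search growing/sealed segments without indexes",
			zap.Int64s("segmentIDs", segmentsWithoutIndex))
	}
}

Routing every call through log.Ctx(ctx) keeps the request's trace context attached to each line, and the distinct search/subsearch wording makes shard-leader fan-out easy to tell apart from per-node execution when reading the logs.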

See also: #20975

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

XuanYang-cn 2022-12-12 15:23:22 +08:00 committed by GitHub
parent f209c9cc21
commit cc8cc985e6
8 changed files with 87 additions and 85 deletions


@@ -702,13 +702,6 @@ func (node *QueryNode) isHealthy() bool {
// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
log.Debug("Received SearchRequest",
zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
zap.Strings("vChannels", req.GetDmlChannels()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
if req.GetReq().GetBase().GetTargetID() != node.session.ServerID {
return &internalpb.SearchResults{
Status: &commonpb.Status{
@@ -725,6 +718,16 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
ErrorCode: commonpb.ErrorCode_Success,
},
}
tr := timerecord.NewTimeRecorder("Search")
if !req.GetFromShardLeader() {
log.Ctx(ctx).Debug("Received SearchRequest",
zap.Strings("vChannels", req.GetDmlChannels()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
}
toReduceResults := make([]*internalpb.SearchResults, 0)
runningGp, runningCtx := errgroup.WithContext(ctx)
mu := &sync.Mutex{}
@@ -758,6 +761,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
if err := runningGp.Wait(); err != nil {
return failRet, nil
}
ret, err := reduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
if err != nil {
failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@@ -766,6 +770,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
}
if !req.FromShardLeader {
tr.CtxElapse(ctx, "search done in all shards")
rateCol.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
rateCol.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(Params.QueryNodeCfg.GetNodeID(), 10), metrics.SearchLabel).Add(float64(proto.Size(req)))
@@ -791,15 +796,6 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
return failRet, nil
}
msgID := req.GetReq().GetBase().GetMsgID()
log.Ctx(ctx).Debug("Received SearchRequest",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
if node.queryShardService == nil {
failRet.Status.Reason = "queryShardService is nil"
return failRet, nil
@@ -808,22 +804,20 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
qs, err := node.queryShardService.getQueryShard(dmlChannel)
if err != nil {
log.Ctx(ctx).Warn("Search failed, failed to get query shard",
zap.Int64("msgID", msgID),
zap.String("dml channel", dmlChannel),
zap.String("vChannel", dmlChannel),
zap.Error(err))
failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
failRet.Status.Reason = err.Error()
return failRet, nil
}
log.Ctx(ctx).Debug("start do search",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()))
tr := timerecord.NewTimeRecorder("")
if req.FromShardLeader {
tr := timerecord.NewTimeRecorder("SubSearch")
log.Ctx(ctx).Debug("start do subsearch",
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()))
historicalTask, err2 := newSearchTask(ctx, req)
if err2 != nil {
failRet.Status.Reason = err2.Error()
@@ -843,8 +837,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
return failRet, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.CtxElapse(ctx, fmt.Sprintf("do subsearch done, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
@@ -858,6 +851,11 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
}
//from Proxy
tr := timerecord.NewTimeRecorder("SearchShard")
log.Ctx(ctx).Debug("start do search",
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()))
cluster, ok := qs.clusterService.getShardCluster(dmlChannel)
if !ok {
failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
@@ -865,12 +863,14 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
return failRet, nil
}
searchCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
searchCtx, cancel = context.WithCancel(ctx)
var results []*internalpb.SearchResults
var streamingResult *internalpb.SearchResults
var errCluster error
results []*internalpb.SearchResults
streamingResult *internalpb.SearchResults
errCluster error
)
defer cancel()
withStreaming := func(ctx context.Context) error {
streamingTask, err := newSearchTask(searchCtx, req)
@@ -898,13 +898,12 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
// shard leader dispatches request to its shard cluster
results, errCluster = cluster.Search(searchCtx, req, withStreaming)
if errCluster != nil {
log.Ctx(ctx).Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
log.Ctx(ctx).Warn("search shard cluster failed", zap.String("vChannel", dmlChannel), zap.Error(errCluster))
failRet.Status.Reason = errCluster.Error()
return failRet, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.CtxElapse(ctx, fmt.Sprintf("do search done in shard cluster, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
results = append(results, streamingResult)
ret, err2 := reduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
@@ -913,8 +912,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
return failRet, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.CtxElapse(ctx, fmt.Sprintf("do reduce done in shard cluster, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
latency := tr.ElapseSpan()
@@ -1196,7 +1194,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if !node.isHealthy() {
log.Warn("QueryNode.GetMetrics failed",
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
@@ -1212,7 +1210,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("QueryNode.GetMetrics failed to parse metric type",
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed to parse metric type",
zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(err))
@@ -1228,8 +1226,8 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
if metricType == metricsinfo.SystemInfoMetrics {
queryNodeMetrics, err := getSystemInfoMetrics(ctx, req, node)
if err != nil {
log.Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
zap.Int64("NodeId", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.String("metricType", metricType),
zap.Error(err))
@@ -1240,17 +1238,11 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
},
}, nil
}
log.Debug("QueryNode.GetMetrics",
zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
zap.Any("queryNodeMetrics", queryNodeMetrics))
return queryNodeMetrics, nil
}
log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
log.Ctx(ctx).Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("NodeId", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.String("metricType", metricType))


@@ -93,7 +93,7 @@ func TestReduce_AllFunc(t *testing.T) {
searchReq.timestamp = 0
assert.NoError(t, err)
searchResult, err := segment.search(searchReq)
searchResult, err := segment.search(ctx, searchReq)
assert.NoError(t, err)
err = checkSearchResult(nq, plan, searchResult)


@@ -80,26 +80,20 @@ func reduceStatisticResponse(results []*internalpb.GetStatisticsResponse) (*inte
func reduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, nq int64, topk int64, metricType string) (*internalpb.SearchResults, error) {
searchResultData, err := decodeSearchResults(results)
if err != nil {
log.Ctx(ctx).Warn("shard leader decode search results errors", zap.Error(err))
log.Ctx(ctx).Warn("decode search results errors", zap.Error(err))
return nil, err
}
log.Ctx(ctx).Debug("shard leader get valid search results", zap.Int("numbers", len(searchResultData)))
for i, sData := range searchResultData {
log.Ctx(ctx).Debug("reduceSearchResultData",
zap.Int("result No.", i),
zap.Int64("nq", sData.NumQueries),
zap.Int64("topk", sData.TopK))
}
log.Ctx(ctx).Debug("reduceSearchResultData",
zap.Int("numbers", len(searchResultData)), zap.Int64("targetNq", nq), zap.Int64("targetTopk", topk))
reducedResultData, err := reduceSearchResultData(ctx, searchResultData, nq, topk)
if err != nil {
log.Ctx(ctx).Warn("shard leader reduce errors", zap.Error(err))
log.Ctx(ctx).Warn("reduce search results error", zap.Error(err))
return nil, err
}
searchResults, err := encodeSearchResultData(reducedResultData, nq, topk, metricType)
if err != nil {
log.Warn("shard leader encode search result errors", zap.Error(err))
log.Ctx(ctx).Warn("encode search results error", zap.Error(err))
return nil, err
}
//if searchResults.SlicedBlob == nil {
@@ -178,7 +172,10 @@ func reduceSearchResultData(ctx context.Context, searchResultData []*schemapb.Se
// }
ret.Topks = append(ret.Topks, j)
}
log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
if skipDupCnt > 0 {
log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
}
return ret, nil
}


@@ -22,6 +22,8 @@ import (
"fmt"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
@@ -31,16 +33,23 @@ import (
// searchOnSegments performs search on listed segments
// all segment ids are validated before calling this function
func searchSegments(ctx context.Context, replica ReplicaInterface, segType segmentType, searchReq *searchRequest, segIDs []UniqueID) ([]*SearchResult, error) {
// results variables
resultCh := make(chan *SearchResult, len(segIDs))
errs := make([]error, len(segIDs))
var (
// results variables
resultCh = make(chan *SearchResult, len(segIDs))
errs = make([]error, len(segIDs))
wg sync.WaitGroup
// For log only
mu sync.Mutex
segmentsWithoutIndex []UniqueID
)
searchLabel := metrics.SealedSegmentLabel
if segType == commonpb.SegmentState_Growing {
searchLabel = metrics.GrowingSegmentLabel
}
// calling segment search in goroutines
var wg sync.WaitGroup
for i, segID := range segIDs {
wg.Add(1)
go func(segID UniqueID, i int) {
@@ -53,9 +62,15 @@ func searchSegments(ctx context.Context, replica ReplicaInterface, segType segme
log.Error(err.Error()) // should not happen but still ignore it since the result is still correct
return
}
if !seg.hasLoadIndexForIndexedField(searchReq.searchFieldID) {
mu.Lock()
segmentsWithoutIndex = append(segmentsWithoutIndex, segID)
mu.Unlock()
}
// record search time
tr := timerecord.NewTimeRecorder("searchOnSegments")
searchResult, err := seg.search(searchReq)
searchResult, err := seg.search(ctx, searchReq)
errs[i] = err
resultCh <- searchResult
// update metrics
@@ -78,6 +93,10 @@ func searchSegments(ctx context.Context, replica ReplicaInterface, segType segme
}
}
if len(segmentsWithoutIndex) > 0 {
log.Ctx(ctx).Info("search growing/sealed segments without indexes", zap.Int64s("segmentIDs", segmentsWithoutIndex))
}
return searchResults, nil
}


@@ -305,7 +305,7 @@ func (s *Segment) getMemSize() int64 {
return int64(memoryUsageInBytes)
}
func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
func (s *Segment) search(ctx context.Context, searchReq *searchRequest) (*SearchResult, error) {
/*
CStatus
Search(void* plan,
@@ -327,7 +327,7 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
loadIndex := s.hasLoadIndexForIndexedField(searchReq.searchFieldID)
var searchResult SearchResult
log.Debug("start do search on segment",
log.Ctx(ctx).Debug("start do search on segment",
zap.Int64("msgID", searchReq.msgID),
zap.Int64("segmentID", s.segmentID),
zap.String("segmentType", s.segmentType.String()),
@@ -341,7 +341,7 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
if err := HandleCStatus(&status, "Search failed"); err != nil {
return nil, err
}
log.Debug("do search on segment done",
log.Ctx(ctx).Debug("do search on segment done",
zap.Int64("msgID", searchReq.msgID),
zap.Int64("segmentID", s.segmentID),
zap.String("segmentType", s.segmentType.String()),


@@ -424,7 +424,7 @@ func TestSegment_segmentSearch(t *testing.T) {
req, err := parseSearchRequest(plan, placeGroupByte)
assert.NoError(t, err)
searchResult, err := segment.search(req)
searchResult, err := segment.search(ctx, req)
assert.NoError(t, err)
err = checkSearchResult(nq, plan, searchResult)


@@ -47,7 +47,7 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
}
}
log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
log.Ctx(ctx).Debug("read target partitions on historical replica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
col, err2 := replica.getCollectionByID(collectionID)
if err2 != nil {
return searchPartIDs, segmentIDs, err2
@@ -55,8 +55,8 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
// all partitions have been released
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
return searchPartIDs, segmentIDs, errors.New("partitions have been released , collectionID = " +
fmt.Sprintln(collectionID) + "target partitionIDs = " + fmt.Sprintln(searchPartIDs))
return searchPartIDs, segmentIDs,
fmt.Errorf("partitions have been released , collectionID = %d, target partitionID = %v", collectionID, searchPartIDs)
}
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
return searchPartIDs, segmentIDs, nil
@@ -112,7 +112,7 @@ func validateOnStreamReplica(ctx context.Context, replica ReplicaInterface, coll
}
}
log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
log.Ctx(ctx).Debug("read target partitions on stream replica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
col, err2 := replica.getCollectionByID(collectionID)
if err2 != nil {
return searchPartIDs, segmentIDs, err2
@@ -120,19 +120,13 @@ func validateOnStreamReplica(ctx context.Context, replica ReplicaInterface, coll
// all partitions have been released
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
return searchPartIDs, segmentIDs, errors.New("partitions have been released , collectionID = " +
fmt.Sprintln(collectionID) + "target partitionIDs = " + fmt.Sprintln(searchPartIDs))
return searchPartIDs, segmentIDs,
fmt.Errorf("partitions have been released , collectionID = %d, target partitionIDs = %v", collectionID, searchPartIDs)
}
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
return searchPartIDs, segmentIDs, nil
}
segmentIDs, err = replica.getSegmentIDsByVChannel(searchPartIDs, vChannel, segmentTypeGrowing)
log.Ctx(ctx).Debug("validateOnStreamReplica getSegmentIDsByVChannel",
zap.Any("collectionID", collectionID),
zap.Any("vChannel", vChannel),
zap.Any("partitionIDs", searchPartIDs),
zap.Any("segmentIDs", segmentIDs),
zap.Error(err))
return searchPartIDs, segmentIDs, nil
return searchPartIDs, segmentIDs, err
}


@@ -91,7 +91,7 @@ func (tr *TimeRecorder) CtxElapse(ctx context.Context, msg string) time.Duration
func (tr *TimeRecorder) printTimeRecord(ctx context.Context, msg string, span time.Duration) {
log.Ctx(ctx).WithOptions(zap.AddCallerSkip(2)).
Debug(fmt.Sprintf("timerecorder %s: record elapsed duration", tr.header),
Debug(fmt.Sprintf("tr/%s", tr.header),
zap.String("msg", msg),
zap.Duration("duration", span),
)