mirror of https://github.com/milvus-io/milvus.git
Improve query shard log (#16950)
Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>pull/16969/head
parent
77d06bd071
commit
f0dee539bb
|
@ -223,14 +223,21 @@ func (q *queryShard) getNewTSafe(tp tsType) (Timestamp, error) {
|
|||
}
|
||||
|
||||
func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp, tp tsType) {
|
||||
st := q.getServiceableTime(tp)
|
||||
log.Debug("serviceable check start", zap.String("tsType", tp.String()), zap.Uint64("guarantee ts", guaranteeTs), zap.Uint64("serviceable ts", st), zap.String("channel", q.channel))
|
||||
serviceable := func() bool {
|
||||
st = q.getServiceableTime(tp)
|
||||
return st >= guaranteeTs
|
||||
}
|
||||
|
||||
q.watcherCond.L.Lock()
|
||||
defer q.watcherCond.L.Unlock()
|
||||
st := q.getServiceableTime(tp)
|
||||
for guaranteeTs > st {
|
||||
for !serviceable() {
|
||||
log.Debug("serviceable ts before guarantee ts", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel))
|
||||
q.watcherCond.Wait()
|
||||
if err := ctx.Err(); err != nil {
|
||||
log.Warn("waitUntialServiceable timeout", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel))
|
||||
log.Warn("waitUntilServiceable timeout", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel))
|
||||
// TODO: implement timeout logic
|
||||
return
|
||||
}
|
||||
st = q.getServiceableTime(tp)
|
||||
|
@ -390,9 +397,8 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques
|
|||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// hold request until guarantee timestamp >= service timestamp
|
||||
guaranteeTs := req.GetReq().GetGuaranteeTimestamp()
|
||||
q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML)
|
||||
q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) // wait until guarantee timestamp >= service timestamp
|
||||
// shard leader queries its own streaming data
|
||||
// TODO add context
|
||||
sResults, _, _, sErr := q.streaming.search(searchRequests, collectionID, partitionIDs, req.DmlChannel, plan, timestamp)
|
||||
|
@ -419,13 +425,13 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques
|
|||
MetricType: plan.getMetricType(),
|
||||
NumQueries: queryNum,
|
||||
TopK: topK,
|
||||
SlicedBlob: nil,
|
||||
SlicedBlob: nil, // placeholder for serialized streaming result
|
||||
SlicedOffset: 1,
|
||||
SlicedNumCount: 1,
|
||||
})
|
||||
|
||||
// reduce streaming results and transform to blob
|
||||
if len(streamingResults) > 0 {
|
||||
// reduce search results
|
||||
numSegment := int64(len(streamingResults))
|
||||
err = reduceSearchResultsAndFillData(plan, streamingResults, numSegment)
|
||||
if err != nil {
|
||||
|
@ -615,22 +621,10 @@ func reduceSearchResultData(searchResultData []*schemapb.SearchResultData, nq in
|
|||
}
|
||||
offsets[sel]++
|
||||
}
|
||||
|
||||
// if realTopK != -1 && realTopK != j {
|
||||
// log.Warn("Proxy Reduce Search Result", zap.Error(errors.New("the length (topk) between all result of query is different")))
|
||||
// // return nil, errors.New("the length (topk) between all result of query is different")
|
||||
// }
|
||||
ret.Topks = append(ret.Topks, j)
|
||||
}
|
||||
log.Debug("skip duplicated search result", zap.Int64("count", skipDupCnt), zap.Any("ret", ret))
|
||||
// ret.TopK = realTopK
|
||||
|
||||
// if !distance.PositivelyRelated(metricType) {
|
||||
// for k := range ret.Scores {
|
||||
// ret.Scores[k] *= -1
|
||||
// }
|
||||
// }
|
||||
|
||||
// Note: query shard does not check whether the metricType is positively related, proxy will do the job
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
@ -901,6 +895,7 @@ func mergeInternalRetrieveResults(retrieveResults []*internalpb.RetrieveResults)
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
// printSearchResultData is for debug usage
|
||||
// func printSearchResultData(data *schemapb.SearchResultData, header string) {
|
||||
// size := len(data.Ids.GetIntId().Data)
|
||||
// if size != len(data.Scores) {
|
||||
|
|
Loading…
Reference in New Issue