mirror of https://github.com/milvus-io/milvus.git
enhance: support retry search when topk is reduced and result not enough (#35645)
issue: #35576
This PR covers the cases where the queryHook optimizes search params and leaves the result size insufficient: it adds a retry-search mechanism and related metrics for alarming.
Signed-off-by: chasingegg <chao.gao@zilliz.com>
pull/37085/head^2
parent 04343d17b1
commit 1d61b604e1
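
For readers skimming the diff below, the proxy-side behaviour added here is: run the search with the queryHook optimization enabled, and if the hook reduced topk and the returned result is smaller than the requested limit, run the same search once more with optimization disabled and increment the new retry counters. The following is a minimal, self-contained sketch of that control flow; runSearch, outcome, and its fields are illustrative stand-ins, not the actual Milvus proxy APIs.

package main

import (
    "context"
    "fmt"
)

// outcome is an illustrative stand-in for what the proxy tracks per attempt.
type outcome struct {
    resultSizeInsufficient bool // at least one query returned fewer hits than the requested limit
    isTopkReduce           bool // the query hook lowered topk on the query nodes
}

// runSearch is a placeholder for node.search(ctx, request, optimizedSearch).
func runSearch(ctx context.Context, optimized bool) outcome {
    if optimized {
        // Pretend the hook shrank topk and starved the result set.
        return outcome{resultSizeInsufficient: true, isTopkReduce: true}
    }
    return outcome{resultSizeInsufficient: false, isTopkReduce: false}
}

func main() {
    ctx := context.Background()

    optimized := true
    out := runSearch(ctx, optimized)

    // Retry only when the optimizer actually reduced topk AND the result is short.
    // In the real code this is additionally gated by the autoIndex.resultLimitCheck knob.
    if out.isTopkReduce && out.resultSizeInsufficient {
        fmt.Println("retry: running search again without topk reduction")
        optimized = false
        out = runSearch(ctx, optimized)
        if out.resultSizeInsufficient {
            fmt.Println("result still insufficient; likely not an index-related issue")
        }
    }
    fmt.Printf("final outcome: %+v\n", out)
}

Both Search and HybridSearch in the diff below follow this same pattern, differing only in the metric label they increment.
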
@@ -126,6 +126,7 @@ message SearchRequest {
   int64 group_by_field_id = 23;
   int64 group_size = 24;
   int64 field_id = 25;
+  bool is_topk_reduce = 26;
 }

 message SubSearchResults {
@@ -161,6 +162,7 @@ message SearchResults {
   repeated SubSearchResults sub_results = 15;
   bool is_advanced = 16;
   int64 all_search_count = 17;
+  bool is_topk_reduce = 18;
 }

 message CostAggregation {
@@ -2899,9 +2899,30 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
 	rsp := &milvuspb.SearchResults{
 		Status: merr.Success(),
 	}

+	optimizedSearch := true
+	resultSizeInsufficient := false
+	isTopkReduce := false
 	err2 := retry.Handle(ctx, func() (bool, error) {
-		rsp, err = node.search(ctx, request)
+		rsp, resultSizeInsufficient, isTopkReduce, err = node.search(ctx, request, optimizedSearch)
+		if merr.Ok(rsp.GetStatus()) && optimizedSearch && resultSizeInsufficient && isTopkReduce && paramtable.Get().AutoIndexConfig.EnableResultLimitCheck.GetAsBool() {
+			// without optimize search
+			optimizedSearch = false
+			rsp, resultSizeInsufficient, isTopkReduce, err = node.search(ctx, request, optimizedSearch)
+			metrics.ProxyRetrySearchCount.WithLabelValues(
+				strconv.FormatInt(paramtable.GetNodeID(), 10),
+				metrics.SearchLabel,
+				request.GetCollectionName(),
+			).Inc()
+			// result size still insufficient
+			if resultSizeInsufficient {
+				metrics.ProxyRetrySearchResultInsufficientCount.WithLabelValues(
+					strconv.FormatInt(paramtable.GetNodeID(), 10),
+					metrics.SearchLabel,
+					request.GetCollectionName(),
+				).Inc()
+			}
+		}
 		if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
 			return true, merr.Error(rsp.GetStatus())
 		}
@@ -2913,11 +2934,13 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
 	return rsp, err
 }

-func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
-	metrics.GetStats(ctx).
-		SetNodeID(paramtable.GetNodeID()).
-		SetInboundLabel(metrics.SearchLabel).
-		SetCollectionName(request.GetCollectionName())
+func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest, optimizedSearch bool) (*milvuspb.SearchResults, bool, bool, error) {
+	receiveSize := proto.Size(request)
+	metrics.ProxyReceiveBytes.WithLabelValues(
+		strconv.FormatInt(paramtable.GetNodeID(), 10),
+		metrics.SearchLabel,
+		request.GetCollectionName(),
+	).Add(float64(receiveSize))

 	metrics.ProxyReceivedNQ.WithLabelValues(
 		strconv.FormatInt(paramtable.GetNodeID(), 10),
@@ -2928,7 +2951,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
 	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}

 	method := "Search"
@@ -2949,7 +2972,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
 	if err != nil {
 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}

 	request.PlaceholderGroup = placeholderGroupBytes
@@ -2963,7 +2986,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
 			commonpbutil.WithMsgType(commonpb.MsgType_Search),
 			commonpbutil.WithSourceID(paramtable.GetNodeID()),
 		),
-		ReqID: paramtable.GetNodeID(),
+		ReqID:        paramtable.GetNodeID(),
+		IsTopkReduce: optimizedSearch,
 	},
 	request: request,
 	tr:      timerecord.NewTimeRecorder("search"),
@@ -3017,7 +3041,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)

 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}
 	tr.CtxRecord(ctx, "search request enqueue")

@@ -3043,7 +3067,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)

 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}

 	span := tr.CtxRecord(ctx, "wait search result")
@@ -3100,7 +3124,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
 			metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v))
 		}
 	}
-	return qt.result, nil
+	return qt.result, qt.resultSizeInsufficient, qt.isTopkReduce, nil
 }

 func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
@@ -3108,8 +3132,29 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea
 	rsp := &milvuspb.SearchResults{
 		Status: merr.Success(),
 	}
+	optimizedSearch := true
+	resultSizeInsufficient := false
+	isTopkReduce := false
 	err2 := retry.Handle(ctx, func() (bool, error) {
-		rsp, err = node.hybridSearch(ctx, request)
+		rsp, resultSizeInsufficient, isTopkReduce, err = node.hybridSearch(ctx, request, optimizedSearch)
+		if merr.Ok(rsp.GetStatus()) && optimizedSearch && resultSizeInsufficient && isTopkReduce && paramtable.Get().AutoIndexConfig.EnableResultLimitCheck.GetAsBool() {
+			// without optimize search
+			optimizedSearch = false
+			rsp, resultSizeInsufficient, isTopkReduce, err = node.hybridSearch(ctx, request, optimizedSearch)
+			metrics.ProxyRetrySearchCount.WithLabelValues(
+				strconv.FormatInt(paramtable.GetNodeID(), 10),
+				metrics.HybridSearchLabel,
+				request.GetCollectionName(),
+			).Inc()
+			// result size still insufficient
+			if resultSizeInsufficient {
+				metrics.ProxyRetrySearchResultInsufficientCount.WithLabelValues(
+					strconv.FormatInt(paramtable.GetNodeID(), 10),
+					metrics.HybridSearchLabel,
+					request.GetCollectionName(),
+				).Inc()
+			}
+		}
 		if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
 			return true, merr.Error(rsp.GetStatus())
 		}
@@ -3121,16 +3166,18 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea
 	return rsp, err
 }

-func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
-	metrics.GetStats(ctx).
-		SetNodeID(paramtable.GetNodeID()).
-		SetInboundLabel(metrics.HybridSearchLabel).
-		SetCollectionName(request.GetCollectionName())
+func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest, optimizedSearch bool) (*milvuspb.SearchResults, bool, bool, error) {
+	receiveSize := proto.Size(request)
+	metrics.ProxyReceiveBytes.WithLabelValues(
+		strconv.FormatInt(paramtable.GetNodeID(), 10),
+		metrics.HybridSearchLabel,
+		request.GetCollectionName(),
+	).Add(float64(receiveSize))

 	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}

 	method := "HybridSearch"
@@ -3154,7 +3201,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
 			commonpbutil.WithMsgType(commonpb.MsgType_Search),
 			commonpbutil.WithSourceID(paramtable.GetNodeID()),
 		),
-		ReqID: paramtable.GetNodeID(),
+		ReqID:        paramtable.GetNodeID(),
+		IsTopkReduce: optimizedSearch,
 	},
 	request: newSearchReq,
 	tr:      timerecord.NewTimeRecorder(method),
@@ -3203,7 +3251,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea

 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}
 	tr.CtxRecord(ctx, "hybrid search request enqueue")

@@ -3228,7 +3276,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea

 		return &milvuspb.SearchResults{
 			Status: merr.Status(err),
-		}, nil
+		}, false, false, nil
 	}

 	span := tr.CtxRecord(ctx, "wait hybrid search result")
@@ -3285,7 +3333,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
 			metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v))
 		}
 	}
-	return qt.result, nil
+	return qt.result, qt.resultSizeInsufficient, qt.isTopkReduce, nil
 }

 func (node *Proxy) getVectorPlaceholderGroupForSearchByPks(ctx context.Context, request *milvuspb.SearchRequest) ([]byte, error) {
@@ -62,6 +62,8 @@ type searchTask struct {
 	partitionKeyMode       bool
 	enableMaterializedView bool
 	mustUsePartitionKey    bool
+	resultSizeInsufficient bool
+	isTopkReduce           bool

 	userOutputFields  []string
 	userDynamicFields []string
@@ -644,7 +646,11 @@ func (t *searchTask) PostExecute(ctx context.Context) error {

 	t.queryChannelsTs = make(map[string]uint64)
 	t.relatedDataSize = 0
+	isTopkReduce := false
 	for _, r := range toReduceResults {
+		if r.GetIsTopkReduce() {
+			isTopkReduce = true
+		}
 		t.relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize()
 		for ch, ts := range r.GetChannelsMvcc() {
 			t.queryChannelsTs[ch] = ts
@@ -657,6 +663,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 		return err
 	}

+	// reduce
 	if t.SearchRequest.GetIsAdvanced() {
 		multipleInternalResults := make([][]*internalpb.SearchResults, len(t.SearchRequest.GetSubReqs()))
 		for _, searchResult := range toReduceResults {
@@ -713,6 +720,17 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
 		}
 	}

+	// reduce done, get final result
+	limit := t.SearchRequest.GetTopk() - t.SearchRequest.GetOffset()
+	resultSizeInsufficient := false
+	for _, topk := range t.result.Results.Topks {
+		if topk < limit {
+			resultSizeInsufficient = true
+			break
+		}
+	}
+	t.resultSizeInsufficient = resultSizeInsufficient
+	t.isTopkReduce = isTopkReduce
 	t.result.CollectionName = t.collectionName
 	t.fillInFieldInfo()
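
The PostExecute change above derives the per-request "insufficient" signal: the effective limit is topk minus offset, and if any per-nq hit count comes back below that limit the whole result is flagged. A small illustrative helper following the same logic (assumed names, not the task's real fields):

package main

import "fmt"

// resultInsufficient mirrors the check added to PostExecute: any per-nq hit
// count below (topk - offset) marks the whole result as insufficient.
func resultInsufficient(perNQTopks []int64, topk, offset int64) bool {
    limit := topk - offset
    for _, k := range perNQTopks {
        if k < limit {
            return true
        }
    }
    return false
}

func main() {
    // Two queries asked for topk=10 with offset=0; the second one only got 4 hits back.
    fmt.Println(resultInsufficient([]int64{10, 4}, 10, 0))  // true
    fmt.Println(resultInsufficient([]int64{10, 10}, 10, 0)) // false
}
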
@@ -72,7 +72,9 @@ func TestSearchTask_PostExecute(t *testing.T) {
 	task := &searchTask{
 		ctx:            ctx,
 		collectionName: collName,
-		SearchRequest:  &internalpb.SearchRequest{},
+		SearchRequest: &internalpb.SearchRequest{
+			IsTopkReduce: true,
+		},
 		request: &milvuspb.SearchRequest{
 			CollectionName: collName,
 			Nq:             1,
@@ -98,6 +100,8 @@ func TestSearchTask_PostExecute(t *testing.T) {
 		err := qt.PostExecute(context.TODO())
 		assert.NoError(t, err)
 		assert.Equal(t, qt.result.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
+		assert.Equal(t, qt.resultSizeInsufficient, true)
+		assert.Equal(t, qt.isTopkReduce, false)
 	})
 }
@@ -382,6 +382,7 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
 			GroupByFieldId: subReq.GetGroupByFieldId(),
 			GroupSize:      subReq.GetGroupSize(),
 			FieldId:        subReq.GetFieldId(),
+			IsTopkReduce:   req.GetReq().GetIsTopkReduce(),
 		}
 		future := conc.Go(func() (*internalpb.SearchResults, error) {
 			searchReq := &querypb.SearchRequest{
@@ -27,6 +27,7 @@ type QueryHook interface {
 func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, queryHook QueryHook, numSegments int) (*querypb.SearchRequest, error) {
 	// no hook applied or disabled, just return
 	if queryHook == nil || !paramtable.Get().AutoIndexConfig.Enable.GetAsBool() {
+		req.Req.IsTopkReduce = false
 		return req, nil
 	}

@@ -67,7 +68,7 @@ func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, query
 			common.SegmentNumKey:   estSegmentNum,
 			common.WithFilterKey:   withFilter,
 			common.DataTypeKey:     int32(plan.GetVectorAnns().GetVectorType()),
-			common.WithOptimizeKey: paramtable.Get().AutoIndexConfig.EnableOptimize.GetAsBool(),
+			common.WithOptimizeKey: paramtable.Get().AutoIndexConfig.EnableOptimize.GetAsBool() && req.GetReq().GetIsTopkReduce(),
 			common.CollectionKey:   req.GetReq().GetCollectionID(),
 		}
 		if withFilter && channelNum > 1 {
@@ -78,7 +79,9 @@ func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, query
 			log.Warn("failed to execute queryHook", zap.Error(err))
 			return nil, merr.WrapErrServiceUnavailable(err.Error(), "queryHook execution failed")
 		}
-		queryInfo.Topk = params[common.TopKKey].(int64)
+		finalTopk := params[common.TopKKey].(int64)
+		isTopkReduce := req.GetReq().GetIsTopkReduce() && (finalTopk < queryInfo.GetTopk())
+		queryInfo.Topk = finalTopk
 		queryInfo.SearchParams = params[common.SearchParamKey].(string)
 		serializedExprPlan, err := proto.Marshal(&plan)
 		if err != nil {
@@ -86,6 +89,7 @@ func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, query
 			return nil, merr.WrapErrParameterInvalid("marshalable search plan", "plan with marshal error", err.Error())
 		}
 		req.Req.SerializedExprPlan = serializedExprPlan
+		req.Req.IsTopkReduce = isTopkReduce
 		log.Debug("optimized search params done", zap.Any("queryInfo", queryInfo))
 	default:
 		log.Warn("not supported node type", zap.String("nodeType", fmt.Sprintf("%T", plan.GetNode())))
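
In OptimizeSearchParams the request-side IsTopkReduce flag now acts as permission for the hook to shrink topk, and it is echoed back on the request only when the hook's final topk is strictly smaller than the plan's original topk. A one-function sketch of that decision (topkActuallyReduced is a hypothetical helper, not the real signature):

package main

import "fmt"

// topkActuallyReduced mirrors the flag computation in OptimizeSearchParams:
// the caller must have allowed reduction, and the hook must have returned a
// strictly smaller topk than the plan originally requested.
func topkActuallyReduced(allowReduce bool, originalTopk, hookTopk int64) bool {
    return allowReduce && hookTopk < originalTopk
}

func main() {
    fmt.Println(topkActuallyReduced(true, 100, 50))  // true: the hook shrank topk
    fmt.Println(topkActuallyReduced(true, 50, 50))   // false: nothing was reduced
    fmt.Println(topkActuallyReduced(false, 100, 50)) // false: the retry pass forbids reduction
}
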
@@ -47,27 +47,43 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() {
 		suite.queryHook = nil
 	}()

-	plan := &planpb.PlanNode{
-		Node: &planpb.PlanNode_VectorAnns{
-			VectorAnns: &planpb.VectorANNS{
-				QueryInfo: &planpb.QueryInfo{
-					Topk:         100,
-					SearchParams: `{"param": 1}`,
-				},
-			},
-		},
-	}
-	bs, err := proto.Marshal(plan)
+	getPlan := func(topk int64) *planpb.PlanNode {
+		return &planpb.PlanNode{
+			Node: &planpb.PlanNode_VectorAnns{
+				VectorAnns: &planpb.VectorANNS{
+					QueryInfo: &planpb.QueryInfo{
+						Topk:         topk,
+						SearchParams: `{"param": 1}`,
+					},
+				},
+			},
+		}
+	}
+
+	bs, err := proto.Marshal(getPlan(100))
 	suite.Require().NoError(err)

 	req, err := OptimizeSearchParams(ctx, &querypb.SearchRequest{
 		Req: &internalpb.SearchRequest{
 			SerializedExprPlan: bs,
+			IsTopkReduce:       true,
 		},
 		TotalChannelNum: 2,
 	}, suite.queryHook, 2)
 	suite.NoError(err)
-	suite.verifyQueryInfo(req, 50, `{"param": 2}`)
+	suite.verifyQueryInfo(req, 50, true, `{"param": 2}`)
+
+	bs, err = proto.Marshal(getPlan(50))
+	suite.Require().NoError(err)
+	req, err = OptimizeSearchParams(ctx, &querypb.SearchRequest{
+		Req: &internalpb.SearchRequest{
+			SerializedExprPlan: bs,
+			IsTopkReduce:       true,
+		},
+		TotalChannelNum: 2,
+	}, suite.queryHook, 2)
+	suite.NoError(err)
+	suite.verifyQueryInfo(req, 50, false, `{"param": 2}`)
 })

 suite.Run("disable optimization", func() {
@@ -95,7 +111,7 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() {
 		TotalChannelNum: 2,
 	}, suite.queryHook, 2)
 	suite.NoError(err)
-	suite.verifyQueryInfo(req, 100, `{"param": 1}`)
+	suite.verifyQueryInfo(req, 100, false, `{"param": 1}`)
 })

 suite.Run("no_hook", func() {
@@ -118,11 +134,12 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() {
 	req, err := OptimizeSearchParams(ctx, &querypb.SearchRequest{
 		Req: &internalpb.SearchRequest{
 			SerializedExprPlan: bs,
+			IsTopkReduce:       true,
 		},
 		TotalChannelNum: 2,
 	}, suite.queryHook, 2)
 	suite.NoError(err)
-	suite.verifyQueryInfo(req, 100, `{"param": 1}`)
+	suite.verifyQueryInfo(req, 100, false, `{"param": 1}`)
 })

 suite.Run("other_plannode", func() {
@@ -203,7 +220,7 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() {
 	})
 }

-func (suite *QueryHookSuite) verifyQueryInfo(req *querypb.SearchRequest, topK int64, param string) {
+func (suite *QueryHookSuite) verifyQueryInfo(req *querypb.SearchRequest, topK int64, isTopkReduce bool, param string) {
 	planBytes := req.GetReq().GetSerializedExprPlan()

 	plan := planpb.PlanNode{}
@@ -213,6 +230,7 @@ func (suite *QueryHookSuite) verifyQueryInfo(req *querypb.SearchRequest, topK in
 	queryInfo := plan.GetVectorAnns().GetQueryInfo()
 	suite.Equal(topK, queryInfo.GetTopk())
 	suite.Equal(param, queryInfo.GetSearchParams())
+	suite.Equal(isTopkReduce, req.GetReq().GetIsTopkReduce())
 }

 func TestOptimizeSearchParam(t *testing.T) {
@@ -63,10 +63,14 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
 	defer sp.End()

 	channelsMvcc := make(map[string]uint64)
+	isTopkReduce := false
 	for _, r := range results {
 		for ch, ts := range r.GetChannelsMvcc() {
 			channelsMvcc[ch] = ts
 		}
+		if r.GetIsTopkReduce() {
+			isTopkReduce = true
+		}
 		// shouldn't let new SearchResults.MetricType to be empty, though the req.MetricType is empty
 		if info.GetMetricType() == "" {
 			info.SetMetricType(r.MetricType)
@@ -120,6 +124,7 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
 	}, 0)
 	searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize
 	searchResults.ChannelsMvcc = channelsMvcc
+	searchResults.IsTopkReduce = isTopkReduce
 	return searchResults, nil
 }

@@ -129,11 +134,15 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear

 	channelsMvcc := make(map[string]uint64)
 	relatedDataSize := int64(0)
+	isTopkReduce := false
 	searchResults := &internalpb.SearchResults{
 		IsAdvanced: true,
 	}

 	for index, result := range results {
+		if result.GetIsTopkReduce() {
+			isTopkReduce = true
+		}
 		relatedDataSize += result.GetCostAggregation().GetTotalRelatedDataSize()
 		for ch, ts := range result.GetChannelsMvcc() {
 			channelsMvcc[ch] = ts
@@ -169,6 +178,7 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear
 		searchResults.CostAggregation = &internalpb.CostAggregation{}
 	}
 	searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize
+	searchResults.IsTopkReduce = isTopkReduce
 	return searchResults, nil
 }
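
Both reduce paths above fold the flag with OR semantics: the merged result is marked topk-reduced if any shard-level or sub-request result carried the flag, so the proxy still sees the reduction after merging. A trivial sketch of that aggregation (assumed helper name, not the real reduce code):

package main

import "fmt"

// anyTopkReduced mirrors how ReduceSearchResults and ReduceAdvancedSearchResults
// fold the per-result flag: one reduced input is enough to flag the merged output.
func anyTopkReduced(flags []bool) bool {
    for _, reduced := range flags {
        if reduced {
            return true
        }
    }
    return false
}

func main() {
    fmt.Println(anyTopkReduced([]bool{false, true, false})) // true
    fmt.Println(anyTopkReduced([]bool{false, false}))       // false
}
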
@@ -721,6 +721,9 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
 	resp = task.SearchResult()
 	resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
 	resp.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ()
+	if req.GetReq().GetIsTopkReduce() {
+		resp.IsTopkReduce = true
+	}
 	return resp, nil
 }
@@ -1157,7 +1157,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo_Failed() {
 }

 // Test Search
-func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataType, fieldID int64, metricType string) (*internalpb.SearchRequest, error) {
+func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataType, fieldID int64, metricType string, isTopkReduce bool) (*internalpb.SearchRequest, error) {
 	placeHolder, err := genPlaceHolderGroup(nq)
 	if err != nil {
 		return nil, err
@@ -1181,6 +1181,7 @@ func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataTyp
 		DslType:       commonpb.DslType_BoolExprV1,
 		Nq:            nq,
 		MvccTimestamp: typeutil.MaxTimestamp,
+		IsTopkReduce:  isTopkReduce,
 	}, nil
 }

@@ -1190,7 +1191,7 @@ func (suite *ServiceSuite) TestSearch_Normal() {
 	suite.TestWatchDmChannelsInt64()
 	suite.TestLoadSegments_Int64()

-	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType)
+	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false)
 	req := &querypb.SearchRequest{
 		Req: creq,

@@ -1214,7 +1215,7 @@ func (suite *ServiceSuite) TestSearch_Concurrent() {
 	futures := make([]*conc.Future[*internalpb.SearchResults], 0, concurrency)
 	for i := 0; i < concurrency; i++ {
 		future := conc.Go(func() (*internalpb.SearchResults, error) {
-			creq, err := suite.genCSearchRequest(30, schemapb.DataType_FloatVector, 107, defaultMetricType)
+			creq, err := suite.genCSearchRequest(30, schemapb.DataType_FloatVector, 107, defaultMetricType, false)
 			req := &querypb.SearchRequest{
 				Req: creq,

@@ -1240,7 +1241,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {

 	// data
 	schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)
-	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, "invalidMetricType")
+	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, "invalidMetricType", false)
 	req := &querypb.SearchRequest{
 		Req: creq,

@@ -1319,6 +1320,7 @@ func (suite *ServiceSuite) TestSearchSegments_Unhealthy() {

 	rsp, err := suite.node.SearchSegments(ctx, req)
 	suite.NoError(err)
+	suite.Equal(false, rsp.GetIsTopkReduce())
 	suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode())
 	suite.Equal(merr.Code(merr.ErrServiceNotReady), rsp.GetStatus().GetCode())
 }
@@ -1338,6 +1340,7 @@ func (suite *ServiceSuite) TestSearchSegments_Failed() {

 	rsp, err := suite.node.SearchSegments(ctx, req)
 	suite.NoError(err)
+	suite.Equal(false, rsp.GetIsTopkReduce())
 	suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode())
 	suite.Equal(merr.Code(merr.ErrCollectionNotLoaded), rsp.GetStatus().GetCode())

@@ -1358,7 +1361,7 @@ func (suite *ServiceSuite) TestSearchSegments_Normal() {
 	suite.TestWatchDmChannelsInt64()
 	suite.TestLoadSegments_Int64()

-	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType)
+	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false)
 	req := &querypb.SearchRequest{
 		Req: creq,

@@ -1369,6 +1372,14 @@ func (suite *ServiceSuite) TestSearchSegments_Normal() {

 	rsp, err := suite.node.SearchSegments(ctx, req)
 	suite.NoError(err)
+	suite.Equal(rsp.GetIsTopkReduce(), false)
 	suite.Equal(commonpb.ErrorCode_Success, rsp.GetStatus().GetErrorCode())
+
+	req.Req, err = suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, true)
+	suite.NoError(err)
+	rsp, err = suite.node.SearchSegments(ctx, req)
+	suite.NoError(err)
+	suite.Equal(rsp.GetIsTopkReduce(), true)
+	suite.Equal(commonpb.ErrorCode_Success, rsp.GetStatus().GetErrorCode())
 }

@@ -1378,7 +1389,7 @@ func (suite *ServiceSuite) TestStreamingSearch() {
 	suite.TestWatchDmChannelsInt64()
 	suite.TestLoadSegments_Int64()
 	paramtable.Get().Save(paramtable.Get().QueryNodeCfg.UseStreamComputing.Key, "true")
-	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType)
+	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false)
 	req := &querypb.SearchRequest{
 		Req:             creq,
 		FromShardLeader: true,
@@ -1391,6 +1402,7 @@ func (suite *ServiceSuite) TestStreamingSearch() {

 	rsp, err := suite.node.SearchSegments(ctx, req)
 	suite.NoError(err)
+	suite.Equal(false, rsp.GetIsTopkReduce())
 	suite.Equal(commonpb.ErrorCode_Success, rsp.GetStatus().GetErrorCode())
 }

@@ -1399,7 +1411,7 @@ func (suite *ServiceSuite) TestStreamingSearchGrowing() {
 	// pre
 	suite.TestWatchDmChannelsInt64()
 	paramtable.Get().Save(paramtable.Get().QueryNodeCfg.UseStreamComputing.Key, "true")
-	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType)
+	creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false)
 	req := &querypb.SearchRequest{
 		Req:             creq,
 		FromShardLeader: true,
@@ -389,6 +389,25 @@ var (
 			Name:      "max_insert_rate",
 			Help:      "max insert rate",
 		}, []string{"node_id", "scope"})
+
+	// ProxyRetrySearchCount records the retry search count when result count does not meet limit and topk reduce is on
+	ProxyRetrySearchCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.ProxyRole,
+			Name:      "retry_search_cnt",
+			Help:      "counter of retry search",
+		}, []string{nodeIDLabelName, queryTypeLabelName, collectionName})
+
+	// ProxyRetrySearchResultInsufficientCount records the retry search without reducing topk that still not meet result limit
+	// there are more likely some non-index-related reasons like we do not have enough entities for very big k, duplicate pks, etc
+	ProxyRetrySearchResultInsufficientCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.ProxyRole,
+			Name:      "retry_search_result_insufficient_cnt",
+			Help:      "counter of retry search which does not have enough results",
+		}, []string{nodeIDLabelName, queryTypeLabelName, collectionName})
 )

 // RegisterProxy registers Proxy metrics
@@ -447,6 +466,8 @@ func RegisterProxy(registry *prometheus.Registry) {
 	registry.MustRegister(ProxyReqInQueueLatency)

 	registry.MustRegister(MaxInsertRate)
+	registry.MustRegister(ProxyRetrySearchCount)
+	registry.MustRegister(ProxyRetrySearchResultInsufficientCount)

 	RegisterStreamingServiceClient(registry)
 }
@@ -552,4 +573,24 @@ func CleanupProxyCollectionMetrics(nodeID int64, collection string) {
 		nodeIDLabelName: strconv.FormatInt(nodeID, 10),
 		msgTypeLabelName: UpsertLabel, collectionName: collection,
 	})
+	ProxyRetrySearchCount.Delete(prometheus.Labels{
+		nodeIDLabelName:    strconv.FormatInt(nodeID, 10),
+		queryTypeLabelName: SearchLabel,
+		collectionName:     collection,
+	})
+	ProxyRetrySearchCount.Delete(prometheus.Labels{
+		nodeIDLabelName:    strconv.FormatInt(nodeID, 10),
+		queryTypeLabelName: HybridSearchLabel,
+		collectionName:     collection,
+	})
+	ProxyRetrySearchResultInsufficientCount.Delete(prometheus.Labels{
+		nodeIDLabelName:    strconv.FormatInt(nodeID, 10),
+		queryTypeLabelName: SearchLabel,
+		collectionName:     collection,
+	})
+	ProxyRetrySearchResultInsufficientCount.Delete(prometheus.Labels{
+		nodeIDLabelName:    strconv.FormatInt(nodeID, 10),
+		queryTypeLabelName: HybridSearchLabel,
+		collectionName:     collection,
+	})
 }
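
The commit message says these counters are intended for alarming. As a hedged illustration of how such counters behave with the prometheus/client_golang library that Milvus metrics are built on (the literal metric and label strings below are assumptions for the sketch, standing in for the real milvusNamespace, nodeIDLabelName, queryTypeLabelName, and collectionName constants):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // A stand-in with the same shape as the new ProxyRetrySearchCount metric.
    retrySearch := prometheus.NewCounterVec(prometheus.CounterOpts{
        Namespace: "milvus",
        Subsystem: "proxy",
        Name:      "retry_search_cnt",
        Help:      "counter of retry search",
    }, []string{"node_id", "query_type", "collection_name"})

    reg := prometheus.NewRegistry()
    reg.MustRegister(retrySearch)

    // Each retry on the proxy increments one child of the vector.
    retrySearch.WithLabelValues("1", "search", "demo_collection").Inc()

    // testutil.ToFloat64 reads back the current value of a single child counter.
    v := testutil.ToFloat64(retrySearch.WithLabelValues("1", "search", "demo_collection"))
    fmt.Println("retry_search_cnt:", v) // 1
}

A sustained rise in retry_search_result_insufficient_cnt relative to retry_search_cnt would suggest the shortage is not caused by topk reduction, which is the distinction the two counters are meant to expose.
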
@@ -30,8 +30,9 @@ import (
 // /////////////////////////////////////////////////////////////////////////////
 // --- common ---
 type autoIndexConfig struct {
-	Enable         ParamItem `refreshable:"true"`
-	EnableOptimize ParamItem `refreshable:"true"`
+	Enable                 ParamItem `refreshable:"true"`
+	EnableOptimize         ParamItem `refreshable:"true"`
+	EnableResultLimitCheck ParamItem `refreshable:"true"`

 	IndexParams       ParamItem `refreshable:"true"`
 	SparseIndexParams ParamItem `refreshable:"true"`
@@ -76,6 +77,14 @@ func (p *autoIndexConfig) init(base *BaseTable) {
 	}
 	p.EnableOptimize.Init(base.mgr)

+	p.EnableResultLimitCheck = ParamItem{
+		Key:          "autoIndex.resultLimitCheck",
+		Version:      "2.5.0",
+		DefaultValue: "true",
+		PanicIfEmpty: true,
+	}
+	p.EnableResultLimitCheck.Init(base.mgr)
+
 	p.IndexParams = ParamItem{
 		Key:     "autoIndex.params.build",
 		Version: "2.2.0",
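
The new autoIndex.resultLimitCheck item defaults to "true", so the retry path is enabled unless an operator turns it off. A minimal sketch of a default-on, string-backed boolean knob in the spirit of ParamItem (simplified stand-in, not the real paramtable implementation):

package main

import "fmt"

// boolKnob is a simplified stand-in for a refreshable ParamItem holding a bool.
type boolKnob struct {
    value        string // runtime override; empty means "use default"
    defaultValue string
}

func (k boolKnob) GetAsBool() bool {
    v := k.value
    if v == "" {
        v = k.defaultValue
    }
    return v == "true"
}

func main() {
    resultLimitCheck := boolKnob{defaultValue: "true"}
    fmt.Println("retry gate enabled:", resultLimitCheck.GetAsBool()) // true

    resultLimitCheck.value = "false" // an operator opting out of the retry path
    fmt.Println("after override:", resultLimitCheck.GetAsBool()) // false
}
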