enhance: Retry on inconsistent requery result (#31713) (#31734)

Make retry on ErrInConsistentRequery in proxy rather than in every SDK.

issue: https://github.com/milvus-io/milvus/issues/31642

pr: https://github.com/milvus-io/milvus/pull/31713

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/31575/head
yihao.dai 2024-03-30 20:53:19 +08:00 committed by GitHub
parent 32eff9c6e0
commit bc4a9a1abb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 38 additions and 1 deletions

View File

@ -55,6 +55,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -2595,8 +2596,26 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
return it.result, nil
}
// Search search the most similar records of requests.
// Search searches the most similar records of requests.
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
var err error
rsp := &milvuspb.SearchResults{
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
rsp, err = node.search(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
return false, nil
})
if err2 != nil {
rsp.Status = merr.Status(err2)
}
return rsp, err
}
func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
@ -2759,6 +2778,24 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
}
func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
var err error
rsp := &milvuspb.SearchResults{
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
rsp, err = node.hybridSearch(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
return false, nil
})
if err2 != nil {
rsp.Status = merr.Status(err2)
}
return rsp, err
}
func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),