mirror of https://github.com/milvus-io/milvus.git
Fix cache refetch logic not applied to Query (#21281)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/21263/head
parent
9fa58968b0
commit
74db56dece
|
@ -2,7 +2,6 @@ package proxy
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -17,7 +16,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/grpcclient"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
|
@ -318,6 +316,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
|
|||
func (t *queryTask) Execute(ctx context.Context) error {
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute query %d", t.ID()))
|
||||
defer tr.CtxElapse(ctx, "done")
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
executeQuery := func(withCache bool) error {
|
||||
shards, err := globalMetaCache.GetShards(ctx, withCache, t.collectionName)
|
||||
|
@ -334,17 +333,17 @@ func (t *queryTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
err := executeQuery(WithCache)
|
||||
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
|
||||
log.Ctx(ctx).Warn("invalid shard leaders cache, updating shardleader caches and retry search",
|
||||
if err != nil {
|
||||
log.Warn("invalid shard leaders cache, updating shardleader caches and retry query",
|
||||
zap.Error(err))
|
||||
return executeQuery(WithoutCache)
|
||||
err = executeQuery(WithoutCache)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error())
|
||||
return fmt.Errorf("fail to query on all shard leaders, err=%s", err.Error())
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Debug("Query Execute done.",
|
||||
zap.Any("requestType", "query"))
|
||||
log.Debug("Query Execute done.",
|
||||
zap.String("requestType", "query"))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -387,6 +387,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
|||
func (t *searchTask) Execute(ctx context.Context) error {
|
||||
sp, ctx := trace.StartSpanFromContextWithOperationName(t.TraceCtx(), "Proxy-Search-Execute")
|
||||
defer sp.Finish()
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute search %d", t.ID()))
|
||||
defer tr.CtxElapse(ctx, "done")
|
||||
|
@ -399,7 +400,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
|||
t.resultBuf = make(chan *internalpb.SearchResults, len(shard2Leaders))
|
||||
t.toReduceResults = make([]*internalpb.SearchResults, 0, len(shard2Leaders))
|
||||
if err := t.searchShardPolicy(ctx, t.shardMgr, t.searchShard, shard2Leaders); err != nil {
|
||||
log.Ctx(ctx).Warn("failed to do search", zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)))
|
||||
log.Warn("failed to do search", zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -407,7 +408,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
|||
|
||||
err := executeSearch(WithCache)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("first search failed, updating shardleader caches and retry search",
|
||||
log.Warn("first search failed, updating shardleader caches and retry search",
|
||||
zap.Error(err))
|
||||
err = executeSearch(WithoutCache)
|
||||
}
|
||||
|
@ -415,7 +416,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
|||
return fmt.Errorf("fail to search on all shard leaders, err=%v", err)
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Debug("Search Execute done.")
|
||||
log.Debug("Search Execute done.")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue