Skip search GRPC call for standalon (#21630)

Signed-off-by: Li Liu <li.liu@zilliz.com>
pull/21646/head
liliu-z 2023-01-10 23:07:40 +08:00 committed by GitHub
parent d78b17f42a
commit e68374b6bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 9 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/querynode"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
@ -485,7 +486,16 @@ func (t *searchTask) searchShard(ctx context.Context, nodeID int64, qn types.Que
DmlChannels: channelIDs,
Scope: querypb.DataScope_All,
}
result, err := qn.Search(ctx, req)
queryNode := querynode.GetQueryNode()
var result *internalpb.SearchResults
var err error
if queryNode != nil && queryNode.IsStandAlone {
result, err = queryNode.Search(ctx, req)
} else {
result, err = qn.Search(ctx, req)
}
if err != nil {
log.Ctx(ctx).Warn("QueryNode search return error", zap.Int64("msgID", t.ID()),
zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs), zap.Error(err))

View File

@ -729,7 +729,7 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64
// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
if req.GetReq().GetBase().GetTargetID() != node.session.ServerID {
if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != node.session.ServerID {
return &internalpb.SearchResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
@ -1204,7 +1204,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
//ShowConfigurations returns the configurations of queryNode matching req.Pattern
// ShowConfigurations returns the configurations of queryNode matching req.Pattern
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
if !node.isHealthyOrStopping() {
log.Warn("QueryNode.ShowConfigurations failed",

View File

@ -123,22 +123,32 @@ type QueryNode struct {
// pool for load/release channel
taskPool *concurrency.Pool
IsStandAlone bool
}
var queryNode *QueryNode = nil
func GetQueryNode() *QueryNode {
return queryNode
}
// NewQueryNode will return a QueryNode with abnormal state.
func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
ctx1, cancel := context.WithCancel(ctx)
node := &QueryNode{
queryNode = &QueryNode{
queryNodeLoopCtx: ctx1,
queryNodeLoopCancel: cancel,
factory: factory,
IsStandAlone: os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode,
}
node.tSafeReplica = newTSafeReplica()
node.scheduler = newTaskScheduler(ctx1, node.tSafeReplica)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
queryNode.tSafeReplica = newTSafeReplica()
queryNode.scheduler = newTaskScheduler(ctx1, queryNode.tSafeReplica)
queryNode.UpdateStateCode(commonpb.StateCode_Abnormal)
return node
return queryNode
}
func (node *QueryNode) initSession() error {

View File

@ -979,7 +979,17 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest,
wg.Add(1)
go func() {
defer wg.Done()
partialResult, nodeErr := node.client.Search(reqCtx, nodeReq)
queryNode := GetQueryNode()
var partialResult *internalpb.SearchResults
var nodeErr error
if queryNode != nil && queryNode.IsStandAlone {
partialResult, nodeErr = queryNode.Search(reqCtx, nodeReq)
} else {
partialResult, nodeErr = node.client.Search(reqCtx, nodeReq)
}
resultMut.Lock()
defer resultMut.Unlock()
if nodeErr != nil || partialResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {