// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querynode

import (
    "context"
    "errors"
    "fmt"
    "sort"
    "strconv"
    "sync"
    "time"

    "github.com/golang/protobuf/proto"
    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"

    "github.com/milvus-io/milvus-proto/go-api/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/milvuspb"
    "github.com/milvus-io/milvus/internal/common"
    "github.com/milvus-io/milvus/internal/log"
    "github.com/milvus-io/milvus/internal/metrics"
    "github.com/milvus-io/milvus/internal/proto/internalpb"
    "github.com/milvus-io/milvus/internal/proto/querypb"
    "github.com/milvus-io/milvus/internal/util/metricsinfo"
    "github.com/milvus-io/milvus/internal/util/timerecord"
    "github.com/milvus-io/milvus/internal/util/typeutil"
)

// GetComponentStates returns information about whether the node is healthy
func (node *QueryNode) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
    stats := &milvuspb.ComponentStates{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
        },
    }
    code, ok := node.stateCode.Load().(commonpb.StateCode)
    if !ok {
        errMsg := "unexpected error in type assertion"
        stats.Status = &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    errMsg,
        }
        return stats, nil
    }
    nodeID := common.NotRegisteredID
    if node.session != nil && node.session.Registered() {
        nodeID = node.session.ServerID
    }
    info := &milvuspb.ComponentInfo{
        NodeID:    nodeID,
        Role:      typeutil.QueryNodeRole,
        StateCode: code,
    }
    stats.State = info
    log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
    return stats, nil
}
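
// Note: node.stateCode.Load() returning an interface{} suggests the state code
// is kept in an atomic-value style holder; GetComponentStates therefore uses a
// checked type assertion and reports a failed assertion as an UnexpectedError
// status instead of panicking.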

// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which will be sent by query nodes
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
    return &milvuspb.StringResponse{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
            Reason:    "",
        },
        Value: Params.CommonCfg.QueryCoordTimeTick,
    }, nil
}

// GetStatisticsChannel returns the statistics channel
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
    return &milvuspb.StringResponse{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
            Reason:    "",
        },
    }, nil
}

func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
    log.Debug("received GetStatisticsRequest",
        zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
        zap.Strings("vChannels", req.GetDmlChannels()),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()),
        zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
        zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

    failRet := &internalpb.GetStatisticsResponse{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
        },
    }
    toReduceResults := make([]*internalpb.GetStatisticsResponse, 0)
    runningGp, runningCtx := errgroup.WithContext(ctx)
    mu := &sync.Mutex{}
    for _, ch := range req.GetDmlChannels() {
        ch := ch
        req := &querypb.GetStatisticsRequest{
            Req:             req.Req,
            DmlChannels:     []string{ch},
            SegmentIDs:      req.SegmentIDs,
            FromShardLeader: req.FromShardLeader,
            Scope:           req.Scope,
        }
        runningGp.Go(func() error {
            ret, err := node.getStatisticsWithDmlChannel(runningCtx, req, ch)
            mu.Lock()
            defer mu.Unlock()
            if err != nil {
                failRet.Status.Reason = err.Error()
                failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
                return err
            }
            if ret.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
                failRet.Status.Reason = ret.Status.Reason
                failRet.Status.ErrorCode = ret.Status.ErrorCode
                return fmt.Errorf("%s", ret.Status.Reason)
            }
            toReduceResults = append(toReduceResults, ret)
            return nil
        })
    }
    if err := runningGp.Wait(); err != nil {
        return failRet, nil
    }
    ret, err := reduceStatisticResponse(toReduceResults)
    if err != nil {
        failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
        failRet.Status.Reason = err.Error()
        return failRet, nil
    }
    return ret, nil
}
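
// Note: GetStatistics fans one sub-request out per DML channel and runs them
// concurrently in an errgroup; the shared failRet and toReduceResults values
// are protected by a mutex because every goroutine may write to them. Search
// and Query below follow the same fan-out/reduce shape.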

func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *querypb.GetStatisticsRequest, dmlChannel string) (*internalpb.GetStatisticsResponse, error) {
    failRet := &internalpb.GetStatisticsResponse{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
        },
    }

    if !node.isHealthy() {
        failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
        return failRet, nil
    }

    msgID := req.GetReq().GetBase().GetMsgID()
    log.Debug("received GetStatisticRequest",
        zap.Int64("msgID", msgID),
        zap.Bool("fromShardLeader", req.GetFromShardLeader()),
        zap.String("vChannel", dmlChannel),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()),
        zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
        zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

    if node.queryShardService == nil {
        failRet.Status.Reason = "queryShardService is nil"
        return failRet, nil
    }

    qs, err := node.queryShardService.getQueryShard(dmlChannel)
    if err != nil {
        log.Warn("get statistics failed, failed to get query shard",
            zap.Int64("msgID", msgID),
            zap.String("dml channel", dmlChannel),
            zap.Error(err))
        failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
        failRet.Status.Reason = err.Error()
        return failRet, nil
    }

    log.Debug("start do statistics",
        zap.Int64("msgID", msgID),
        zap.Bool("fromShardLeader", req.GetFromShardLeader()),
        zap.String("vChannel", dmlChannel),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()))
    tr := timerecord.NewTimeRecorder("")

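    // waitCanDo blocks until the channel's serviceable time (tsafe) has caught
    // up with the request's guarantee timestamp, or until the context is done,
    // so the statistics reflect the requested read position.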
    waitCanDo := func(ctx context.Context) error {
        l := node.tSafeReplica.WatchChannel(dmlChannel)
        defer l.Unregister()
        for {
            select {
            case <-l.On():
                serviceTime, err := qs.getServiceableTime(dmlChannel)
                if err != nil {
                    return err
                }
                guaranteeTs := req.GetReq().GetGuaranteeTimestamp()
                if guaranteeTs <= serviceTime {
                    return nil
                }
            case <-ctx.Done():
                return errors.New("get statistics context timeout")
            }
        }
    }

    if req.FromShardLeader {
        historicalTask := newStatistics(ctx, req, querypb.DataScope_Historical, qs, waitCanDo)
        err := historicalTask.Execute(ctx)
        if err != nil {
            failRet.Status.Reason = err.Error()
            return failRet, nil
        }

        tr.Elapse(fmt.Sprintf("do statistics done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
            msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
        failRet.Status.ErrorCode = commonpb.ErrorCode_Success
        return historicalTask.Ret, nil
    }

    // from Proxy

    cluster, ok := qs.clusterService.getShardCluster(dmlChannel)
    if !ok {
        failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
        failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", dmlChannel)
        return failRet, nil
    }

    statisticCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    var results []*internalpb.GetStatisticsResponse
    var streamingResult *internalpb.GetStatisticsResponse
    var errCluster error

    withStreaming := func(ctx context.Context) error {
        streamingTask := newStatistics(ctx, req, querypb.DataScope_Streaming, qs, waitCanDo)
        err := streamingTask.Execute(ctx)
        if err != nil {
            return err
        }
        streamingResult = streamingTask.Ret
        return nil
    }

    // shard leader dispatches request to its shard cluster
    results, errCluster = cluster.GetStatistics(statisticCtx, req, withStreaming)
    if errCluster != nil {
        log.Warn("get statistics on cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
        failRet.Status.Reason = errCluster.Error()
        return failRet, nil
    }

    tr.Elapse(fmt.Sprintf("start reduce statistic result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
        msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

    results = append(results, streamingResult)
    ret, err := reduceStatisticResponse(results)
    if err != nil {
        failRet.Status.Reason = err.Error()
        return failRet, nil
    }
    log.Debug("reduce statistic result done", zap.Int64("msgID", msgID), zap.Any("results", ret))

    tr.Elapse(fmt.Sprintf("do statistics done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
        msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

    failRet.Status.ErrorCode = commonpb.ErrorCode_Success
    return ret, nil
}
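
// Note: getStatisticsWithDmlChannel has two paths. When the request already
// comes from a shard leader (FromShardLeader), only the local historical scope
// is executed. Otherwise this node acts as the shard leader: it dispatches the
// request to its shard cluster and runs the streaming scope locally through
// withStreaming, then reduces all partial responses. searchWithDmlChannel and
// queryWithDmlChannel below are built the same way.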

// WatchDmChannels creates consumers on dmChannels to receive incremental data, which is an important part of real-time query
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
    // check node healthy
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        return status, nil
    }

    // check target matches
    if in.GetBase().GetTargetID() != node.session.ServerID {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
            Reason:    common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), node.session.ServerID),
        }
        return status, nil
    }

    task := &watchDmChannelsTask{
        baseTask: baseTask{
            ctx:  ctx,
            done: make(chan error),
        },
        req:  in,
        node: node,
    }

    startTs := time.Now()
    log.Info("watchDmChannels init", zap.Int64("collectionID", in.CollectionID),
        zap.String("channelName", in.Infos[0].GetChannelName()),
        zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
    // currently we only support loading one channel at a time
    node.taskLock.RLock(strconv.FormatInt(in.Infos[0].CollectionID, 10))
    defer node.taskLock.RUnlock(strconv.FormatInt(in.Infos[0].CollectionID, 10))
    future := node.taskPool.Submit(func() (interface{}, error) {
        log.Info("watchDmChannels start ", zap.Int64("collectionID", in.CollectionID),
            zap.String("channelName", in.Infos[0].GetChannelName()),
            zap.Duration("timeInQueue", time.Since(startTs)))
        err := task.PreExecute(ctx)
        if err != nil {
            status := &commonpb.Status{
                ErrorCode: commonpb.ErrorCode_UnexpectedError,
                Reason:    err.Error(),
            }
            log.Warn("failed to subscribe channel on preExecute ", zap.Error(err))
            return status, nil
        }

        err = task.Execute(ctx)
        if err != nil {
            status := &commonpb.Status{
                ErrorCode: commonpb.ErrorCode_UnexpectedError,
                Reason:    err.Error(),
            }
            log.Warn("failed to subscribe channel ", zap.Error(err))
            return status, nil
        }

        err = task.PostExecute(ctx)
        if err != nil {
            status := &commonpb.Status{
                ErrorCode: commonpb.ErrorCode_UnexpectedError,
                Reason:    err.Error(),
            }
            log.Warn("failed to unsubscribe channel on postExecute ", zap.Error(err))
            return status, nil
        }

        sc, _ := node.ShardClusterService.getShardCluster(in.Infos[0].GetChannelName())
        sc.SetupFirstVersion()
        log.Info("successfully watchDmChannelsTask", zap.Int64("collectionID", in.CollectionID),
            zap.String("channelName", in.Infos[0].GetChannelName()), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
        return &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
        }, nil
    })
    ret, _ := future.Await()
    return ret.(*commonpb.Status), nil
}
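
// Note: WatchDmChannels holds the collection-level task read lock for the whole
// call and hands the actual watch work to node.taskPool; release requests on
// the same collection take the corresponding write lock, so a watch and a
// release on one collection do not interleave.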

func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
    // check node healthy
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        return status, nil
    }

    // check target matches
    if req.GetBase().GetTargetID() != node.session.ServerID {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
            Reason:    common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
        }
        return status, nil
    }

    dct := &releaseCollectionTask{
        baseTask: baseTask{
            ctx:  ctx,
            done: make(chan error),
        },
        req: &querypb.ReleaseCollectionRequest{
            Base:         req.GetBase(),
            CollectionID: req.GetCollectionID(),
            NodeID:       req.GetNodeID(),
        },
        node: node,
    }

    node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
    defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
    err := node.scheduler.queue.Enqueue(dct)
    if err != nil {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        log.Warn("failed to enqueue subscribe channel task", zap.Error(err))
        return status, nil
    }
    log.Info("unsubDmChannel(ReleaseCollection) enqueue done", zap.Int64("collectionID", req.GetCollectionID()))

    func() {
        err = dct.WaitToFinish()
        if err != nil {
            log.Warn("failed to do subscribe channel task successfully", zap.Error(err))
            return
        }
        log.Info("unsubDmChannel(ReleaseCollection) WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID()))
    }()

    status := &commonpb.Status{
        ErrorCode: commonpb.ErrorCode_Success,
    }
    return status, nil
}
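
// Note: UnsubDmChannel is implemented by enqueueing a releaseCollectionTask
// built from the unsubscribe request, so unsubscribing a DM channel currently
// releases the whole collection on this node. A WaitToFinish error is only
// logged; the RPC still returns Success once the task has been enqueued.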

// LoadSegments loads historical data into the query node; historical data can be vector data or index
func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
    // check node healthy
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        return status, nil
    }
    // check target matches
    if in.GetBase().GetTargetID() != node.session.ServerID {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
            Reason:    common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), node.session.ServerID),
        }
        return status, nil
    }

    if in.GetNeedTransfer() {
        return node.TransferLoad(ctx, in)
    }

    task := &loadSegmentsTask{
        baseTask: baseTask{
            ctx:  ctx,
            done: make(chan error),
        },
        req:  in,
        node: node,
    }

    segmentIDs := make([]UniqueID, 0, len(in.GetInfos()))
    for _, info := range in.Infos {
        segmentIDs = append(segmentIDs, info.SegmentID)
    }
    sort.SliceStable(segmentIDs, func(i, j int) bool {
        return segmentIDs[i] < segmentIDs[j]
    })

    startTs := time.Now()
    log.Info("loadSegmentsTask init", zap.Int64("collectionID", in.CollectionID),
        zap.Int64s("segmentIDs", segmentIDs),
        zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))

    node.taskLock.RLock(strconv.FormatInt(in.CollectionID, 10))
    for _, segmentID := range segmentIDs {
        node.taskLock.Lock(strconv.FormatInt(segmentID, 10))
    }

    // release all task locks
    defer func() {
        node.taskLock.RUnlock(strconv.FormatInt(in.CollectionID, 10))
        for _, id := range segmentIDs {
            node.taskLock.Unlock(strconv.FormatInt(id, 10))
        }
    }()

    // TODO remove concurrent load segment for now, unless we solve the memory issue
    log.Info("loadSegmentsTask start ", zap.Int64("collectionID", in.CollectionID),
        zap.Int64s("segmentIDs", segmentIDs),
        zap.Duration("timeInQueue", time.Since(startTs)))
    err := node.scheduler.queue.Enqueue(task)
    if err != nil {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        log.Warn(err.Error())
        return status, nil
    }

    log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))

    waitFunc := func() (*commonpb.Status, error) {
        err = task.WaitToFinish()
        if err != nil {
            status := &commonpb.Status{
                ErrorCode: commonpb.ErrorCode_UnexpectedError,
                Reason:    err.Error(),
            }
            log.Warn(err.Error())
            return status, nil
        }
        log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
        return &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
        }, nil
    }

    return waitFunc()
}
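
// Note: LoadSegments and ReleaseSegments both take the collection-level read
// lock and then per-segment locks in ascending segment ID order (hence the
// sort.SliceStable calls), presumably so that concurrent load/release requests
// touching overlapping segment sets cannot deadlock on each other.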

// ReleaseCollection clears all data related to this collection on the querynode
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        return status, nil
    }
    dct := &releaseCollectionTask{
        baseTask: baseTask{
            ctx:  ctx,
            done: make(chan error),
        },
        req:  in,
        node: node,
    }

    node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
    defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
    err := node.scheduler.queue.Enqueue(dct)
    if err != nil {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        log.Warn(err.Error())
        return status, nil
    }
    log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))

    func() {
        err = dct.WaitToFinish()
        if err != nil {
            log.Warn(err.Error())
            return
        }
        log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
    }()

    status := &commonpb.Status{
        ErrorCode: commonpb.ErrorCode_Success,
    }
    return status, nil
}

// ReleasePartitions clears all data related to this partition on the querynode
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        return status, nil
    }
    dct := &releasePartitionsTask{
        baseTask: baseTask{
            ctx:  ctx,
            done: make(chan error),
        },
        req:  in,
        node: node,
    }

    node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
    defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
    err := node.scheduler.queue.Enqueue(dct)
    if err != nil {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        log.Warn(err.Error())
        return status, nil
    }
    log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))

    func() {
        err = dct.WaitToFinish()
        if err != nil {
            log.Warn(err.Error())
            return
        }
        log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
    }()

    status := &commonpb.Status{
        ErrorCode: commonpb.ErrorCode_Success,
    }
    return status, nil
}

// ReleaseSegments removes the specified segments from the query node according to segmentIDs, partitionIDs, and collectionID
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
    // check node healthy
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
            Reason:    err.Error(),
        }
        return status, nil
    }
    // check target matches
    if in.GetBase().GetTargetID() != node.session.ServerID {
        status := &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
            Reason:    common.WrapNodeIDNotMatchMsg(in.GetBase().GetTargetID(), node.session.ServerID),
        }
        return status, nil
    }

    if in.GetNeedTransfer() {
        return node.TransferRelease(ctx, in)
    }

    log.Info("start to release segments", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
    node.taskLock.RLock(strconv.FormatInt(in.CollectionID, 10))
    sort.SliceStable(in.SegmentIDs, func(i, j int) bool {
        return in.SegmentIDs[i] < in.SegmentIDs[j]
    })

    for _, segmentID := range in.SegmentIDs {
        node.taskLock.Lock(strconv.FormatInt(segmentID, 10))
    }

    // release all task locks
    defer func() {
        node.taskLock.RUnlock(strconv.FormatInt(in.CollectionID, 10))
        for _, id := range in.SegmentIDs {
            node.taskLock.Unlock(strconv.FormatInt(id, 10))
        }
    }()
    for _, id := range in.SegmentIDs {
        switch in.GetScope() {
        case querypb.DataScope_Streaming:
            node.metaReplica.removeSegment(id, segmentTypeGrowing)
        case querypb.DataScope_Historical:
            node.metaReplica.removeSegment(id, segmentTypeSealed)
        case querypb.DataScope_All:
            node.metaReplica.removeSegment(id, segmentTypeSealed)
            node.metaReplica.removeSegment(id, segmentTypeGrowing)
        }
    }

    // note that argument is dmlchannel name
    node.dataSyncService.removeEmptyFlowGraphByChannel(in.GetCollectionID(), in.GetShard())

    log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs), zap.String("Scope", in.GetScope().String()))
    return &commonpb.Status{
        ErrorCode: commonpb.ErrorCode_Success,
    }, nil
}
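
// Note: the request scope decides which replica a segment is removed from:
// Streaming drops the growing segment, Historical drops the sealed segment, and
// All drops both. Afterwards the flow graph of the shard's DML channel is
// removed if it has become empty.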

// GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
    code := node.stateCode.Load().(commonpb.StateCode)
    if code != commonpb.StateCode_Healthy {
        err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
        res := &querypb.GetSegmentInfoResponse{
            Status: &commonpb.Status{
                ErrorCode: commonpb.ErrorCode_UnexpectedError,
                Reason:    err.Error(),
            },
        }
        return res, nil
    }
    var segmentInfos []*querypb.SegmentInfo

    segmentIDs := make(map[int64]struct{})
    for _, segmentID := range in.GetSegmentIDs() {
        segmentIDs[segmentID] = struct{}{}
    }

    infos := node.metaReplica.getSegmentInfosByColID(in.CollectionID)
    segmentInfos = append(segmentInfos, filterSegmentInfo(infos, segmentIDs)...)

    return &querypb.GetSegmentInfoResponse{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
        },
        Infos: segmentInfos,
    }, nil
}

// filterSegmentInfo returns the segment infos whose segment ID is in the segmentIDs map;
// an empty map matches every segment
func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64]struct{}) []*querypb.SegmentInfo {
    if len(segmentIDs) == 0 {
        return segmentInfos
    }
    filtered := make([]*querypb.SegmentInfo, 0, len(segmentIDs))
    for _, info := range segmentInfos {
        _, ok := segmentIDs[info.GetSegmentID()]
        if !ok {
            continue
        }
        filtered = append(filtered, info)
    }
    return filtered
}

// isHealthy checks if QueryNode is healthy
func (node *QueryNode) isHealthy() bool {
    code := node.stateCode.Load().(commonpb.StateCode)
    return code == commonpb.StateCode_Healthy
}

// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
    log.Debug("Received SearchRequest",
        zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
        zap.Strings("vChannels", req.GetDmlChannels()),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()),
        zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
        zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

    failRet := &internalpb.SearchResults{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_Success,
        },
    }
    toReduceResults := make([]*internalpb.SearchResults, 0)
    runningGp, runningCtx := errgroup.WithContext(ctx)
    mu := &sync.Mutex{}
    for _, ch := range req.GetDmlChannels() {
        ch := ch
        req := &querypb.SearchRequest{
            Req:             req.Req,
            DmlChannels:     []string{ch},
            SegmentIDs:      req.SegmentIDs,
            FromShardLeader: req.FromShardLeader,
            Scope:           req.Scope,
        }
        runningGp.Go(func() error {
            ret, err := node.searchWithDmlChannel(runningCtx, req, ch)
            mu.Lock()
            defer mu.Unlock()
            if err != nil {
                failRet.Status.Reason = err.Error()
                failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
                return err
            }
            if ret.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
                failRet.Status.Reason = ret.Status.Reason
                failRet.Status.ErrorCode = ret.Status.ErrorCode
                return fmt.Errorf("%s", ret.Status.Reason)
            }
            toReduceResults = append(toReduceResults, ret)
            return nil
        })
    }
    if err := runningGp.Wait(); err != nil {
        return failRet, nil
    }
    ret, err := reduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
    if err != nil {
        failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
        failRet.Status.Reason = err.Error()
        return failRet, nil
    }

    if !req.FromShardLeader {
        rateCol.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
        rateCol.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
        metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(Params.QueryNodeCfg.GetNodeID(), 10), metrics.SearchLabel).Add(float64(proto.Size(req)))
    }
    return ret, nil
}
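
// Note: like GetStatistics, Search fans out per DML channel and reduces the
// partial results with reduceSearchResults using the request's nq, topk and
// metric type. The NQ and throughput rate metrics are recorded only on the
// proxy-facing path (FromShardLeader == false), presumably to avoid counting a
// request twice once a shard leader forwards it to other query nodes.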

func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.SearchRequest, dmlChannel string) (*internalpb.SearchResults, error) {
    metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel).Inc()
    failRet := &internalpb.SearchResults{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
        },
    }

    defer func() {
        if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
            metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
        }
    }()
    if !node.isHealthy() {
        failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
        return failRet, nil
    }

    msgID := req.GetReq().GetBase().GetMsgID()
    log.Ctx(ctx).Debug("Received SearchRequest",
        zap.Int64("msgID", msgID),
        zap.Bool("fromShardLeader", req.GetFromShardLeader()),
        zap.String("vChannel", dmlChannel),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()),
        zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
        zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

    if node.queryShardService == nil {
        failRet.Status.Reason = "queryShardService is nil"
        return failRet, nil
    }

    qs, err := node.queryShardService.getQueryShard(dmlChannel)
    if err != nil {
        log.Ctx(ctx).Warn("Search failed, failed to get query shard",
            zap.Int64("msgID", msgID),
            zap.String("dml channel", dmlChannel),
            zap.Error(err))
        failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
        failRet.Status.Reason = err.Error()
        return failRet, nil
    }

    log.Ctx(ctx).Debug("start do search",
        zap.Int64("msgID", msgID),
        zap.Bool("fromShardLeader", req.GetFromShardLeader()),
        zap.String("vChannel", dmlChannel),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()))
    tr := timerecord.NewTimeRecorder("")

    if req.FromShardLeader {
        historicalTask, err2 := newSearchTask(ctx, req)
        if err2 != nil {
            failRet.Status.Reason = err2.Error()
            return failRet, nil
        }
        historicalTask.QS = qs
        historicalTask.DataScope = querypb.DataScope_Historical
        err2 = node.scheduler.AddReadTask(ctx, historicalTask)
        if err2 != nil {
            failRet.Status.Reason = err2.Error()
            return failRet, nil
        }

        err2 = historicalTask.WaitToFinish()
        if err2 != nil {
            failRet.Status.Reason = err2.Error()
            return failRet, nil
        }

        tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
            msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

        failRet.Status.ErrorCode = commonpb.ErrorCode_Success
        metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.SearchLabel).Observe(float64(historicalTask.queueDur.Milliseconds()))
        metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.SearchLabel).Observe(float64(historicalTask.reduceDur.Milliseconds()))
        latency := tr.ElapseSpan()
        metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
        metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
        return historicalTask.Ret, nil
    }

    // from Proxy
    cluster, ok := qs.clusterService.getShardCluster(dmlChannel)
    if !ok {
        failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
        failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", dmlChannel)
        return failRet, nil
    }

    searchCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    var results []*internalpb.SearchResults
    var streamingResult *internalpb.SearchResults
    var errCluster error

    withStreaming := func(ctx context.Context) error {
        streamingTask, err := newSearchTask(searchCtx, req)
        if err != nil {
            return err
        }
        streamingTask.QS = qs
        streamingTask.DataScope = querypb.DataScope_Streaming
        err = node.scheduler.AddReadTask(searchCtx, streamingTask)
        if err != nil {
            return err
        }
        err = streamingTask.WaitToFinish()
        if err != nil {
            return err
        }
        metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.SearchLabel).Observe(float64(streamingTask.queueDur.Milliseconds()))
        metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.SearchLabel).Observe(float64(streamingTask.reduceDur.Milliseconds()))
        streamingResult = streamingTask.Ret
        return nil
    }

    // shard leader dispatches request to its shard cluster
    results, errCluster = cluster.Search(searchCtx, req, withStreaming)
    if errCluster != nil {
        log.Ctx(ctx).Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
        failRet.Status.Reason = errCluster.Error()
        return failRet, nil
    }

    tr.CtxElapse(ctx, fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
        msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

    results = append(results, streamingResult)
    ret, err2 := reduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
    if err2 != nil {
        failRet.Status.Reason = err2.Error()
        return failRet, nil
    }

    tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
        msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

    failRet.Status.ErrorCode = commonpb.ErrorCode_Success
    latency := tr.ElapseSpan()
    metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
    metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
    metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetNq()))
    metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetTopk()))

    return ret, nil
}
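
// Note: on the proxy-facing path, cluster.Search fans the request out to the
// shard cluster while the withStreaming callback runs the streaming-scope
// search on this node's growing data; the streaming result is appended to the
// cluster results before the final reduce.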

func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.QueryRequest, dmlChannel string) (*internalpb.RetrieveResults, error) {
    metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel).Inc()
    failRet := &internalpb.RetrieveResults{
        Status: &commonpb.Status{
            ErrorCode: commonpb.ErrorCode_UnexpectedError,
        },
    }

    defer func() {
        if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
            metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
        }
    }()
    if !node.isHealthy() {
        failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
        return failRet, nil
    }

    msgID := req.GetReq().GetBase().GetMsgID()
    log.Ctx(ctx).Debug("Received QueryRequest",
        zap.Int64("msgID", msgID),
        zap.Bool("fromShardLeader", req.GetFromShardLeader()),
        zap.String("vChannel", dmlChannel),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()),
        zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
        zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

    if node.queryShardService == nil {
        failRet.Status.Reason = "queryShardService is nil"
        return failRet, nil
    }

    qs, err := node.queryShardService.getQueryShard(dmlChannel)
    if err != nil {
        log.Ctx(ctx).Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", dmlChannel), zap.Error(err))
        failRet.Status.Reason = err.Error()
        return failRet, nil
    }

    log.Ctx(ctx).Debug("start do query",
        zap.Int64("msgID", msgID),
        zap.Bool("fromShardLeader", req.GetFromShardLeader()),
        zap.String("vChannel", dmlChannel),
        zap.Int64s("segmentIDs", req.GetSegmentIDs()))
    tr := timerecord.NewTimeRecorder("")

    if req.FromShardLeader {
        // construct a queryTask
        queryTask := newQueryTask(ctx, req)
        queryTask.QS = qs
        queryTask.DataScope = querypb.DataScope_Historical
        err2 := node.scheduler.AddReadTask(ctx, queryTask)
        if err2 != nil {
            failRet.Status.Reason = err2.Error()
            return failRet, nil
        }

        err2 = queryTask.WaitToFinish()
        if err2 != nil {
            failRet.Status.Reason = err2.Error()
            return failRet, nil
        }

        tr.CtxElapse(ctx, fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
            msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

        failRet.Status.ErrorCode = commonpb.ErrorCode_Success
        metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.QueryLabel).Observe(float64(queryTask.queueDur.Milliseconds()))
        metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.QueryLabel).Observe(float64(queryTask.reduceDur.Milliseconds()))
        latency := tr.ElapseSpan()
        metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
        metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
        return queryTask.Ret, nil
    }

    cluster, ok := qs.clusterService.getShardCluster(dmlChannel)
    if !ok {
        failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
        failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", dmlChannel)
        return failRet, nil
    }

    // add cancel when error occurs
    queryCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    var results []*internalpb.RetrieveResults
    var streamingResult *internalpb.RetrieveResults

    withStreaming := func(ctx context.Context) error {
        streamingTask := newQueryTask(queryCtx, req)
        streamingTask.DataScope = querypb.DataScope_Streaming
        streamingTask.QS = qs
        err := node.scheduler.AddReadTask(queryCtx, streamingTask)

        if err != nil {
            return err
        }
        err = streamingTask.WaitToFinish()
        if err != nil {
            return err
        }
        metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.QueryLabel).Observe(float64(streamingTask.queueDur.Milliseconds()))
        metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
            metrics.QueryLabel).Observe(float64(streamingTask.reduceDur.Milliseconds()))
        streamingResult = streamingTask.Ret
        return nil
    }

    var errCluster error
    // shard leader dispatches request to its shard cluster
    results, errCluster = cluster.Query(queryCtx, req, withStreaming)
    if errCluster != nil {
        log.Ctx(ctx).Warn("failed to query cluster", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
        failRet.Status.Reason = errCluster.Error()
        return failRet, nil
    }

    tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
        msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

    results = append(results, streamingResult)
    ret, err2 := mergeInternalRetrieveResult(ctx, results, req.Req.GetLimit())
    if err2 != nil {
        failRet.Status.Reason = err2.Error()
        return failRet, nil
    }

    tr.CtxElapse(ctx, fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
        msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))

    failRet.Status.ErrorCode = commonpb.ErrorCode_Success
    latency := tr.ElapseSpan()
    metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
    metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
    return ret, nil
}

// Query performs replica query tasks.
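//
// A minimal, illustrative sketch of the request shape this handler expects (the channel
// name and variables below are hypothetical, not taken from this file):
//
//	req := &querypb.QueryRequest{
//		Req:         retrieveReq,                        // *internalpb.RetrieveRequest (collection ID, limit, timestamps)
//		DmlChannels: []string{"by-dev-rootcoord-dml_0"}, // vchannels this node serves (hypothetical name)
//		SegmentIDs:  nil,                                // usually empty when addressing whole channels
//	}
//	resp, _ := node.Query(ctx, req)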
func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
	log.Ctx(ctx).Debug("Received QueryRequest", zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
		zap.Strings("vChannels", req.GetDmlChannels()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
		zap.Uint64("guaranteeTimestamp", req.Req.GetGuaranteeTimestamp()),
		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

	failRet := &internalpb.RetrieveResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}

	toMergeResults := make([]*internalpb.RetrieveResults, 0)
	runningGp, runningCtx := errgroup.WithContext(ctx)
	mu := &sync.Mutex{}
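
	// Fan out one sub-query per DML channel through the errgroup; the mutex guards
	// toMergeResults and failRet, which the goroutines below update concurrently.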
	for _, ch := range req.GetDmlChannels() {
		ch := ch
		req := &querypb.QueryRequest{
			Req:             req.Req,
			DmlChannels:     []string{ch},
			SegmentIDs:      req.SegmentIDs,
			FromShardLeader: req.FromShardLeader,
			Scope:           req.Scope,
		}
		runningGp.Go(func() error {
			ret, err := node.queryWithDmlChannel(runningCtx, req, ch)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				failRet.Status.Reason = err.Error()
				failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
				return err
			}
			if ret.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
				failRet.Status.Reason = ret.Status.Reason
				failRet.Status.ErrorCode = ret.Status.ErrorCode
				return fmt.Errorf("%s", ret.Status.Reason)
			}
			toMergeResults = append(toMergeResults, ret)
			return nil
		})
	}
	if err := runningGp.Wait(); err != nil {
		return failRet, nil
	}
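
	// All per-channel partial results are in; merge them into one result set, applying
	// the request limit across channels.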
	ret, err := mergeInternalRetrieveResult(ctx, toMergeResults, req.GetReq().GetLimit())
	if err != nil {
		failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		failRet.Status.Reason = err.Error()
		return failRet, nil
	}

	if !req.FromShardLeader {
		rateCol.Add(metricsinfo.NQPerSecond, 1)
		metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(Params.QueryNodeCfg.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req)))
	}
	return ret, nil
}

// SyncReplicaSegments syncs replica node & segment states for a vchannel's shard cluster.
func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
	if !node.isHealthy() {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
		}, nil
	}

	log.Info("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))

	err := node.ShardClusterService.SyncReplicaSegments(req.GetVchannelName(), req.GetReplicaSegments())
	if err != nil {
		log.Warn("failed to sync replica segments", zap.String("vchannel", req.GetVchannelName()), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Info("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}

// ShowConfigurations returns the configurations of queryNode matching req.Pattern
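// (for example, a pattern such as "querynode" would select this node's configuration
// entries; the exact matching rule lives in getComponentConfigurations).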
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
	if !node.isHealthy() {
		log.Warn("QueryNode.ShowConfigurations failed",
			zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
			zap.String("req", req.Pattern),
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))

		return &internalpb.ShowConfigurationsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
			},
			Configuations: nil,
		}, nil
	}

	return getComponentConfigurations(ctx, req), nil
}

// GetMetrics returns system info of the query node, such as total memory, memory usage, cpu usage ...
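//
// The request carries a small JSON document naming the metric type; a minimal sketch,
// assuming the caller asks for system info:
//
//	req := &milvuspb.GetMetricsRequest{
//		Request: `{"metric_type": "system_info"}`, // parsed by metricsinfo.ParseMetricType below
//	}
//	resp, _ := node.GetMetrics(ctx, req)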
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if !node.isHealthy() {
		log.Warn("QueryNode.GetMetrics failed",
			zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
			zap.String("req", req.Request),
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryNode.GetMetrics failed to parse metric type",
			zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	if metricType == metricsinfo.SystemInfoMetrics {
		queryNodeMetrics, err := getSystemInfoMetrics(ctx, req, node)
		if err != nil {
			log.Warn("QueryNode.GetMetrics failed",
				zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
				zap.String("req", req.Request),
				zap.String("metricType", metricType),
				zap.Error(err))
			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}
		log.Debug("QueryNode.GetMetrics",
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("queryNodeMetrics", queryNodeMetrics))

		return queryNodeMetrics, nil
	}

	log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
		zap.String("req", req.Request),
		zap.String("metricType", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}
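// GetDataDistribution reports a snapshot of this node's data distribution: the sealed
// segments it serves, the channel versions it watches, and a leader view (sealed
// segment -> node/version mapping plus growing segment IDs) for each shard it leads.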
func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) {
	log := log.With(
		zap.Int64("msg-id", req.GetBase().GetMsgID()),
		zap.Int64("node-id", Params.QueryNodeCfg.GetNodeID()),
	)
	if !node.isHealthy() {
		log.Warn("QueryNode.GetDataDistribution failed",
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))

		return &querypb.GetDataDistributionResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
			},
		}, nil
	}

	// check target matches
	if req.GetBase().GetTargetID() != node.session.ServerID {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
			Reason:    common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
		}
		return &querypb.GetDataDistributionResponse{Status: status}, nil
	}

	growingSegments := node.metaReplica.getGrowingSegments()
	sealedSegments := node.metaReplica.getSealedSegments()
	shardClusters := node.ShardClusterService.GetShardClusters()

	channelGrowingsMap := make(map[string][]int64)
	for _, s := range growingSegments {
		channelGrowingsMap[s.vChannelID] = append(channelGrowingsMap[s.vChannelID], s.ID())
	}

	segmentVersionInfos := make([]*querypb.SegmentVersionInfo, 0, len(sealedSegments))
	for _, s := range sealedSegments {
		info := &querypb.SegmentVersionInfo{
			ID:         s.ID(),
			Collection: s.collectionID,
			Partition:  s.partitionID,
			Channel:    s.vChannelID,
			Version:    s.version,
		}
		segmentVersionInfos = append(segmentVersionInfos, info)
	}

	channelVersionInfos := make([]*querypb.ChannelVersionInfo, 0, len(shardClusters))
	leaderViews := make([]*querypb.LeaderView, 0, len(shardClusters))
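
	// Build one LeaderView and one ChannelVersionInfo per shard cluster that has a
	// local query shard; shards without a query shard are skipped.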
	for _, sc := range shardClusters {
		if !node.queryShardService.hasQueryShard(sc.vchannelName) {
			continue
		}
		segmentInfos := sc.GetSegmentInfos()
		mapping := make(map[int64]*querypb.SegmentDist)
		for _, info := range segmentInfos {
			mapping[info.segmentID] = &querypb.SegmentDist{
				NodeID:  info.nodeID,
				Version: info.version,
			}
		}
		view := &querypb.LeaderView{
			Collection:        sc.collectionID,
			Channel:           sc.vchannelName,
			SegmentDist:       mapping,
			GrowingSegmentIDs: channelGrowingsMap[sc.vchannelName],
		}
		leaderViews = append(leaderViews, view)

		channelInfo := &querypb.ChannelVersionInfo{
			Channel:    sc.vchannelName,
			Collection: sc.collectionID,
			Version:    sc.getVersion(),
		}
		channelVersionInfos = append(channelVersionInfos, channelInfo)
	}

	return &querypb.GetDataDistributionResponse{
		Status:      &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		NodeID:      node.session.ServerID,
		Segments:    segmentVersionInfos,
		Channels:    channelVersionInfos,
		LeaderViews: leaderViews,
	}, nil
}
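// SyncDistribution applies segment distribution actions (SyncType_Set / SyncType_Remove)
// pushed by the coordinator to the shard cluster of the requested channel.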
func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
	log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()))
	// check node healthy
	code := node.stateCode.Load().(commonpb.StateCode)
	if code != commonpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}
	// check target matches
	if req.GetBase().GetTargetID() != node.session.ServerID {
		log.Warn("failed to match target ID when syncing distribution", zap.Int64("expect", req.GetBase().GetTargetID()), zap.Int64("actual", node.session.ServerID))
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
			Reason:    common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
		}
		return status, nil
	}

	shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetChannel())
	if !ok {
		log.Warn("failed to find shard cluster when syncing distribution", zap.String("channel", req.GetChannel()))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "shard not exist",
		}, nil
	}
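
	// Apply each action: Remove releases the historical segment from the shard cluster,
	// while Set registers the (node, segment, version) assignment as loaded.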
	for _, action := range req.GetActions() {
		log.Info("sync action", zap.String("Action", action.GetType().String()), zap.Int64("segmentID", action.SegmentID))
		switch action.GetType() {
		case querypb.SyncType_Remove:
			shardCluster.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
				SegmentIDs: []UniqueID{action.GetSegmentID()},
				Scope:      querypb.DataScope_Historical,
			}, true)
		case querypb.SyncType_Set:
			shardCluster.SyncSegments([]*querypb.ReplicaSegmentsInfo{
				{
					NodeId:      action.GetNodeID(),
					PartitionId: action.GetPartitionID(),
					SegmentIds:  []int64{action.GetSegmentID()},
					Versions:    []int64{action.GetVersion()},
				},
			}, segmentStateLoaded)
		default:
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    "unexpected action type",
			}, nil
		}
	}

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}