2021-12-03 11:25:45 +00:00
|
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
|
// distributed with this work for additional information
|
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 05:47:10 +00:00
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
|
//
|
2021-12-03 11:25:45 +00:00
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 05:47:10 +00:00
|
|
|
|
//
|
2021-12-03 11:25:45 +00:00
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
|
// limitations under the License.
|
2021-04-19 05:47:10 +00:00
|
|
|
|
|
2021-04-12 01:18:43 +00:00
|
|
|
|
package querynode
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2022-05-23 08:41:58 +00:00
|
|
|
|
"errors"
|
2021-06-15 04:41:40 +00:00
|
|
|
|
"fmt"
|
2022-05-23 08:41:58 +00:00
|
|
|
|
"sync"
|
2021-04-12 01:18:43 +00:00
|
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
|
2021-11-25 07:03:16 +00:00
|
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
2021-04-22 06:45:57 +00:00
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
2022-04-27 02:41:46 +00:00
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
2021-04-22 06:45:57 +00:00
|
|
|
|
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
|
2021-11-05 08:00:55 +00:00
|
|
|
|
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
2021-04-22 06:45:57 +00:00
|
|
|
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
2021-04-12 01:18:43 +00:00
|
|
|
|
)
|
|
|
|
|
|
2021-12-24 05:26:30 +00:00
|
|
|
|
// GetComponentStates returns information about whether the node is healthy
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
|
|
|
|
stats := &internalpb.ComponentStates{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
},
|
|
|
|
|
}
|
2021-11-25 07:03:16 +00:00
|
|
|
|
code, ok := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if !ok {
|
|
|
|
|
errMsg := "unexpected error in type assertion"
|
2021-04-12 01:18:43 +00:00
|
|
|
|
stats.Status = &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2021-11-25 07:03:16 +00:00
|
|
|
|
Reason: errMsg,
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return stats, nil
|
2021-11-25 07:03:16 +00:00
|
|
|
|
}
|
|
|
|
|
nodeID := common.NotRegisteredID
|
|
|
|
|
if node.session != nil && node.session.Registered() {
|
|
|
|
|
nodeID = node.session.ServerID
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
|
|
|
|
info := &internalpb.ComponentInfo{
|
2021-11-25 07:03:16 +00:00
|
|
|
|
NodeID: nodeID,
|
2021-04-12 01:18:43 +00:00
|
|
|
|
Role: typeutil.QueryNodeRole,
|
|
|
|
|
StateCode: code,
|
|
|
|
|
}
|
|
|
|
|
stats.State = info
|
2021-12-06 07:25:35 +00:00
|
|
|
|
log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
|
2021-04-12 01:18:43 +00:00
|
|
|
|
return stats, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-10-29 12:10:43 +00:00
|
|
|
|
// GetTimeTickChannel returns the time tick channel
|
|
|
|
|
// TimeTickChannel contains many time tick messages, which will be sent by query nodes
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
|
|
|
|
return &milvuspb.StringResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
Reason: "",
|
|
|
|
|
},
|
2022-03-04 03:17:56 +00:00
|
|
|
|
Value: Params.CommonCfg.QueryCoordTimeTick,
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-24 05:26:30 +00:00
|
|
|
|
// GetStatisticsChannel returns the statistics channel
|
2021-10-30 11:00:52 +00:00
|
|
|
|
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
|
|
|
|
return &milvuspb.StringResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
Reason: "",
|
|
|
|
|
},
|
2022-03-04 03:17:56 +00:00
|
|
|
|
Value: Params.CommonCfg.QueryNodeStats,
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-01 14:32:33 +00:00
|
|
|
|
// AddQueryChannel watch queryChannel of the collection to receive query message
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2021-11-30 02:29:43 +00:00
|
|
|
|
dct := &addQueryChannelTask{
|
|
|
|
|
baseTask: baseTask{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
},
|
|
|
|
|
req: in,
|
|
|
|
|
node: node,
|
2021-10-15 12:35:05 +00:00
|
|
|
|
}
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
2021-11-30 02:29:43 +00:00
|
|
|
|
err := node.scheduler.queue.Enqueue(dct)
|
2021-10-15 12:35:05 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
2021-09-24 05:57:54 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-06-15 04:41:40 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("addQueryChannelTask Enqueue done",
|
2022-01-06 15:17:26 +00:00
|
|
|
|
zap.Int64("collectionID", in.CollectionID),
|
|
|
|
|
zap.String("queryChannel", in.QueryChannel),
|
|
|
|
|
zap.String("queryResultChannel", in.QueryResultChannel),
|
|
|
|
|
)
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
2021-11-30 02:29:43 +00:00
|
|
|
|
waitFunc := func() (*commonpb.Status, error) {
|
|
|
|
|
err = dct.WaitToFinish()
|
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
2021-10-15 12:35:05 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-10-15 12:35:05 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("addQueryChannelTask WaitToFinish done",
|
2022-01-06 15:17:26 +00:00
|
|
|
|
zap.Int64("collectionID", in.CollectionID),
|
|
|
|
|
zap.String("queryChannel", in.QueryChannel),
|
|
|
|
|
zap.String("queryResultChannel", in.QueryResultChannel),
|
|
|
|
|
)
|
|
|
|
|
|
2021-11-30 02:29:43 +00:00
|
|
|
|
return &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}, nil
|
2021-10-15 12:35:05 +00:00
|
|
|
|
}
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
2021-11-30 02:29:43 +00:00
|
|
|
|
return waitFunc()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-11-01 14:57:07 +00:00
|
|
|
|
// RemoveQueryChannel remove queryChannel of the collection to stop receiving query message
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
|
|
|
|
|
// if node.searchService == nil || node.searchService.searchMsgStream == nil {
|
|
|
|
|
// errMsg := "null search service or null search result message stream"
|
|
|
|
|
// status := &commonpb.Status{
|
|
|
|
|
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
// Reason: errMsg,
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// return status, errors.New(errMsg)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
|
|
|
|
|
// if !ok {
|
|
|
|
|
// errMsg := "type assertion failed for search message stream"
|
|
|
|
|
// status := &commonpb.Status{
|
|
|
|
|
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
// Reason: errMsg,
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// return status, errors.New(errMsg)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
|
|
|
|
|
// if !ok {
|
|
|
|
|
// errMsg := "type assertion failed for search result message stream"
|
|
|
|
|
// status := &commonpb.Status{
|
|
|
|
|
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
// Reason: errMsg,
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// return status, errors.New(errMsg)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // remove request channel
|
|
|
|
|
// consumeChannels := []string{in.RequestChannelID}
|
|
|
|
|
// consumeSubName := Params.MsgChannelSubName
|
|
|
|
|
// // TODO: searchStream.RemovePulsarConsumers(producerChannels)
|
|
|
|
|
// searchStream.AsConsumer(consumeChannels, consumeSubName)
|
|
|
|
|
|
|
|
|
|
// // remove result channel
|
|
|
|
|
// producerChannels := []string{in.ResultChannelID}
|
|
|
|
|
// // TODO: resultStream.RemovePulsarProducer(producerChannels)
|
|
|
|
|
// resultStream.AsProducer(producerChannels)
|
|
|
|
|
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}
|
|
|
|
|
return status, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-24 09:28:20 +00:00
|
|
|
|
// WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2021-04-16 06:40:33 +00:00
|
|
|
|
dct := &watchDmChannelsTask{
|
|
|
|
|
baseTask: baseTask{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
},
|
|
|
|
|
req: in,
|
|
|
|
|
node: node,
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-04-16 06:40:33 +00:00
|
|
|
|
err := node.scheduler.queue.Enqueue(dct)
|
|
|
|
|
if err != nil {
|
2021-04-12 01:18:43 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2021-04-16 06:40:33 +00:00
|
|
|
|
Reason: err.Error(),
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID()))
|
2021-08-03 14:01:27 +00:00
|
|
|
|
waitFunc := func() (*commonpb.Status, error) {
|
2021-04-16 06:40:33 +00:00
|
|
|
|
err = dct.WaitToFinish()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
if err != nil {
|
2021-08-03 14:01:27 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
2021-08-03 14:01:27 +00:00
|
|
|
|
return &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2021-08-03 14:01:27 +00:00
|
|
|
|
|
|
|
|
|
return waitFunc()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-12-24 09:16:41 +00:00
|
|
|
|
// WatchDeltaChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
|
2021-11-05 06:47:19 +00:00
|
|
|
|
func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.WatchDeltaChannelsRequest) (*commonpb.Status, error) {
|
2021-11-09 01:27:04 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-11-09 01:27:04 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-11-09 01:27:04 +00:00
|
|
|
|
}
|
|
|
|
|
dct := &watchDeltaChannelsTask{
|
|
|
|
|
baseTask: baseTask{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
},
|
|
|
|
|
req: in,
|
|
|
|
|
node: node,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err := node.scheduler.queue.Enqueue(dct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-11-09 01:27:04 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
|
|
|
|
|
log.Info("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
2021-11-09 01:27:04 +00:00
|
|
|
|
|
|
|
|
|
waitFunc := func() (*commonpb.Status, error) {
|
|
|
|
|
err = dct.WaitToFinish()
|
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-11-09 01:27:04 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
|
|
|
|
|
log.Info("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
2021-11-09 01:27:04 +00:00
|
|
|
|
return &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return waitFunc()
|
2021-11-05 06:47:19 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-11-02 13:58:22 +00:00
|
|
|
|
// LoadSegments load historical data into query node, historical data can be vector data or index
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2021-04-12 01:18:43 +00:00
|
|
|
|
dct := &loadSegmentsTask{
|
|
|
|
|
baseTask: baseTask{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
},
|
|
|
|
|
req: in,
|
|
|
|
|
node: node,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err := node.scheduler.queue.Enqueue(dct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2021-06-15 04:41:40 +00:00
|
|
|
|
segmentIDs := make([]UniqueID, 0)
|
|
|
|
|
for _, info := range in.Infos {
|
|
|
|
|
segmentIDs = append(segmentIDs, info.SegmentID)
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
2021-04-12 01:18:43 +00:00
|
|
|
|
|
2021-08-03 14:01:27 +00:00
|
|
|
|
waitFunc := func() (*commonpb.Status, error) {
|
2021-04-16 06:40:33 +00:00
|
|
|
|
err = dct.WaitToFinish()
|
|
|
|
|
if err != nil {
|
2021-08-03 14:01:27 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
2021-08-03 14:01:27 +00:00
|
|
|
|
return &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2021-08-03 14:01:27 +00:00
|
|
|
|
|
|
|
|
|
return waitFunc()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-12-24 09:14:52 +00:00
|
|
|
|
// ReleaseCollection clears all data related to this collection on the querynode
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2021-04-12 01:18:43 +00:00
|
|
|
|
dct := &releaseCollectionTask{
|
|
|
|
|
baseTask: baseTask{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
},
|
|
|
|
|
req: in,
|
|
|
|
|
node: node,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err := node.scheduler.queue.Enqueue(dct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))
|
2021-04-12 01:18:43 +00:00
|
|
|
|
|
2021-04-16 09:37:50 +00:00
|
|
|
|
func() {
|
2021-04-16 06:40:33 +00:00
|
|
|
|
err = dct.WaitToFinish()
|
|
|
|
|
if err != nil {
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-04-16 06:40:33 +00:00
|
|
|
|
return
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
|
2021-04-16 06:40:33 +00:00
|
|
|
|
}()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}
|
|
|
|
|
return status, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-01 14:58:53 +00:00
|
|
|
|
// ReleasePartitions clears all data related to this partition on the querynode
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2021-04-12 01:18:43 +00:00
|
|
|
|
dct := &releasePartitionsTask{
|
|
|
|
|
baseTask: baseTask{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
},
|
|
|
|
|
req: in,
|
|
|
|
|
node: node,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err := node.scheduler.queue.Enqueue(dct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
|
2021-04-12 01:18:43 +00:00
|
|
|
|
|
2021-04-16 09:37:50 +00:00
|
|
|
|
func() {
|
2021-04-16 06:40:33 +00:00
|
|
|
|
err = dct.WaitToFinish()
|
|
|
|
|
if err != nil {
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn(err.Error())
|
2021-04-16 06:40:33 +00:00
|
|
|
|
return
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
|
2021-04-16 06:40:33 +00:00
|
|
|
|
}()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
|
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}
|
|
|
|
|
return status, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-03 15:13:26 +00:00
|
|
|
|
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return status, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2021-04-12 01:18:43 +00:00
|
|
|
|
status := &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
}
|
2022-05-16 10:23:55 +00:00
|
|
|
|
// collection lock is not needed since we guarantee not query/search will be dispatch from leader
|
2021-04-12 01:18:43 +00:00
|
|
|
|
for _, id := range in.SegmentIDs {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
err := node.historical.removeSegment(id)
|
2021-06-15 04:41:40 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
// not return, try to release all segments
|
|
|
|
|
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
|
|
|
|
status.Reason = err.Error()
|
|
|
|
|
}
|
2022-05-23 08:41:58 +00:00
|
|
|
|
err = node.streaming.removeSegment(id)
|
2021-06-15 04:41:40 +00:00
|
|
|
|
if err != nil {
|
2021-04-12 01:18:43 +00:00
|
|
|
|
// not return, try to release all segments
|
|
|
|
|
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
2021-06-15 04:41:40 +00:00
|
|
|
|
status.Reason = err.Error()
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
2022-01-05 13:15:20 +00:00
|
|
|
|
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
|
2021-04-12 01:18:43 +00:00
|
|
|
|
return status, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-02 13:56:34 +00:00
|
|
|
|
// GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
|
2021-04-12 01:18:43 +00:00
|
|
|
|
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
if code != internalpb.StateCode_Healthy {
|
2022-04-24 14:03:44 +00:00
|
|
|
|
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
|
2021-07-13 06:16:00 +00:00
|
|
|
|
res := &queryPb.GetSegmentInfoResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
},
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return res, nil
|
2021-07-13 06:16:00 +00:00
|
|
|
|
}
|
2022-02-07 07:35:44 +00:00
|
|
|
|
var segmentInfos []*queryPb.SegmentInfo
|
|
|
|
|
|
|
|
|
|
segmentIDs := make(map[int64]struct{})
|
|
|
|
|
for _, segmentID := range in.GetSegmentIDs() {
|
|
|
|
|
segmentIDs[segmentID] = struct{}{}
|
|
|
|
|
}
|
2021-11-05 08:00:55 +00:00
|
|
|
|
|
2021-05-28 02:26:30 +00:00
|
|
|
|
// get info from historical
|
2022-05-23 08:41:58 +00:00
|
|
|
|
historicalSegmentInfos, err := node.historical.getSegmentInfosByColID(in.CollectionID)
|
2021-10-20 11:47:35 +00:00
|
|
|
|
if err != nil {
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
|
2021-10-20 11:47:35 +00:00
|
|
|
|
res := &queryPb.GetSegmentInfoResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
},
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return res, nil
|
2021-10-20 11:47:35 +00:00
|
|
|
|
}
|
2022-02-07 07:35:44 +00:00
|
|
|
|
segmentInfos = append(segmentInfos, filterSegmentInfo(historicalSegmentInfos, segmentIDs)...)
|
2021-10-20 11:47:35 +00:00
|
|
|
|
|
2021-05-28 02:26:30 +00:00
|
|
|
|
// get info from streaming
|
2022-05-23 08:41:58 +00:00
|
|
|
|
streamingSegmentInfos, err := node.streaming.getSegmentInfosByColID(in.CollectionID)
|
2021-10-20 11:47:35 +00:00
|
|
|
|
if err != nil {
|
2022-05-07 02:27:51 +00:00
|
|
|
|
log.Warn("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
|
2021-10-20 11:47:35 +00:00
|
|
|
|
res := &queryPb.GetSegmentInfoResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
},
|
|
|
|
|
}
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return res, nil
|
2021-10-20 11:47:35 +00:00
|
|
|
|
}
|
2022-02-07 07:35:44 +00:00
|
|
|
|
segmentInfos = append(segmentInfos, filterSegmentInfo(streamingSegmentInfos, segmentIDs)...)
|
2021-11-06 07:22:56 +00:00
|
|
|
|
|
2021-04-12 01:18:43 +00:00
|
|
|
|
return &queryPb.GetSegmentInfoResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
|
},
|
2022-02-07 07:35:44 +00:00
|
|
|
|
Infos: segmentInfos,
|
2021-04-12 01:18:43 +00:00
|
|
|
|
}, nil
|
|
|
|
|
}
|
2021-08-17 02:06:11 +00:00
|
|
|
|
|
2022-02-07 07:35:44 +00:00
|
|
|
|
// filterSegmentInfo returns segment info which segment id in segmentIDs map
|
|
|
|
|
func filterSegmentInfo(segmentInfos []*queryPb.SegmentInfo, segmentIDs map[int64]struct{}) []*queryPb.SegmentInfo {
|
|
|
|
|
if len(segmentIDs) == 0 {
|
|
|
|
|
return segmentInfos
|
|
|
|
|
}
|
|
|
|
|
filtered := make([]*queryPb.SegmentInfo, 0, len(segmentIDs))
|
|
|
|
|
for _, info := range segmentInfos {
|
|
|
|
|
_, ok := segmentIDs[info.GetSegmentID()]
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
filtered = append(filtered, info)
|
|
|
|
|
}
|
|
|
|
|
return filtered
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-29 09:37:25 +00:00
|
|
|
|
// isHealthy checks if QueryNode is healthy
|
2021-08-17 02:06:11 +00:00
|
|
|
|
func (node *QueryNode) isHealthy() bool {
|
|
|
|
|
code := node.stateCode.Load().(internalpb.StateCode)
|
|
|
|
|
return code == internalpb.StateCode_Healthy
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-30 04:03:27 +00:00
|
|
|
|
// Search performs replica search tasks.
|
2022-04-02 06:15:31 +00:00
|
|
|
|
func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (*internalpb.SearchResults, error) {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet := &internalpb.SearchResults{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
},
|
|
|
|
|
}
|
2022-04-20 08:15:41 +00:00
|
|
|
|
if !node.isHealthy() {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
|
|
|
|
|
return failRet, nil
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug("Received SearchRequest", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()))
|
|
|
|
|
|
|
|
|
|
if node.queryShardService == nil {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet.Status.Reason = "queryShardService is nil"
|
|
|
|
|
return failRet, nil
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-23 08:41:58 +00:00
|
|
|
|
if !node.queryShardService.hasQueryShard(req.GetDmlChannel()) {
|
|
|
|
|
// TODO: add replicaID in request or remove it in query shard
|
|
|
|
|
err := node.queryShardService.addQueryShard(req.Req.CollectionID, req.GetDmlChannel(), 0)
|
|
|
|
|
if err != nil {
|
|
|
|
|
failRet.Status.Reason = err.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-04-20 08:15:41 +00:00
|
|
|
|
qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
|
|
|
|
|
if err != nil {
|
2022-05-11 13:57:53 +00:00
|
|
|
|
log.Warn("Search failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
|
|
|
|
|
failRet.Status.Reason = err.Error()
|
|
|
|
|
return failRet, nil
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-23 08:41:58 +00:00
|
|
|
|
if req.FromShardLeader {
|
|
|
|
|
historicalTask, err2 := newSearchTask(ctx, req)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
historicalTask.QS = qs
|
|
|
|
|
historicalTask.DataScope = querypb.DataScope_Historical
|
|
|
|
|
err2 = node.scheduler.AddReadTask(ctx, historicalTask)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err2 = historicalTask.WaitToFinish()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
return historicalTask.Ret, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//from Proxy
|
|
|
|
|
cluster, ok := qs.clusterService.getShardCluster(req.GetDmlChannel())
|
|
|
|
|
if !ok {
|
|
|
|
|
failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", req.GetDmlChannel())
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
searchCtx, cancel := context.WithCancel(ctx)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
var results []*internalpb.SearchResults
|
|
|
|
|
var streamingResult *internalpb.SearchResults
|
|
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
var errCluster error
|
|
|
|
|
|
|
|
|
|
wg.Add(1) // search cluster
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
// shard leader dispatches request to its shard cluster
|
|
|
|
|
oResults, cErr := cluster.Search(searchCtx, req)
|
|
|
|
|
if cErr != nil {
|
|
|
|
|
log.Warn("search cluster failed", zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(cErr))
|
|
|
|
|
cancel()
|
|
|
|
|
errCluster = cErr
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
results = oResults
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
var errStreaming error
|
|
|
|
|
wg.Add(1) // search streaming
|
|
|
|
|
go func() {
|
|
|
|
|
defer func() {
|
|
|
|
|
if errStreaming != nil {
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
streamingTask, err2 := newSearchTask(searchCtx, req)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
errStreaming = err2
|
|
|
|
|
}
|
|
|
|
|
streamingTask.QS = qs
|
|
|
|
|
streamingTask.DataScope = querypb.DataScope_Streaming
|
|
|
|
|
err2 = node.scheduler.AddReadTask(searchCtx, streamingTask)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
errStreaming = err2
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
err2 = streamingTask.WaitToFinish()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
errStreaming = err2
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
streamingResult = streamingTask.Ret
|
|
|
|
|
}()
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
|
|
var mainErr error
|
|
|
|
|
if errCluster != nil {
|
|
|
|
|
mainErr = errCluster
|
|
|
|
|
if errors.Is(errCluster, context.Canceled) {
|
|
|
|
|
if errStreaming != nil {
|
|
|
|
|
mainErr = errStreaming
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if errStreaming != nil {
|
|
|
|
|
mainErr = errStreaming
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-23 08:41:58 +00:00
|
|
|
|
if mainErr != nil {
|
|
|
|
|
failRet.Status.Reason = mainErr.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
results = append(results, streamingResult)
|
|
|
|
|
ret, err2 := reduceSearchResults(results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
return ret, nil
|
2022-03-30 04:03:27 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Query performs replica query tasks.
|
2022-04-02 06:15:31 +00:00
|
|
|
|
func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*internalpb.RetrieveResults, error) {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet := &internalpb.RetrieveResults{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
},
|
|
|
|
|
}
|
2022-04-20 08:15:41 +00:00
|
|
|
|
if !node.isHealthy() {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
|
|
|
|
|
return failRet, nil
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
log.Debug("Received QueryRequest", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()))
|
|
|
|
|
|
|
|
|
|
if node.queryShardService == nil {
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet.Status.Reason = "queryShardService is nil"
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !node.queryShardService.hasQueryShard(req.GetDmlChannel()) {
|
|
|
|
|
err := node.queryShardService.addQueryShard(req.Req.CollectionID, req.GetDmlChannel(), 0) // TODO: add replicaID in request or remove it in query shard
|
|
|
|
|
failRet.Status.Reason = err.Error()
|
|
|
|
|
return failRet, nil
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
|
|
|
|
|
if err != nil {
|
2022-05-11 13:57:53 +00:00
|
|
|
|
log.Warn("Query failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
|
2022-05-23 08:41:58 +00:00
|
|
|
|
failRet.Status.Reason = err.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if req.FromShardLeader {
|
|
|
|
|
// construct a queryTask
|
|
|
|
|
queryTask := newQueryTask(ctx, req)
|
|
|
|
|
queryTask.QS = qs
|
|
|
|
|
queryTask.DataScope = querypb.DataScope_Historical
|
|
|
|
|
err2 := node.scheduler.AddReadTask(ctx, queryTask)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err2 = queryTask.WaitToFinish()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
return queryTask.Ret, nil
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-23 08:41:58 +00:00
|
|
|
|
cluster, ok := qs.clusterService.getShardCluster(req.GetDmlChannel())
|
|
|
|
|
if !ok {
|
|
|
|
|
failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", req.GetDmlChannel())
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add cancel when error occurs
|
|
|
|
|
queryCtx, cancel := context.WithCancel(ctx)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
var results []*internalpb.RetrieveResults
|
|
|
|
|
var streamingResult *internalpb.RetrieveResults
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
|
|
|
var errCluster error
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
// shard leader dispatches request to its shard cluster
|
|
|
|
|
oResults, cErr := cluster.Query(queryCtx, req)
|
|
|
|
|
if cErr != nil {
|
|
|
|
|
log.Warn("failed to query cluster", zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(cErr))
|
|
|
|
|
log.Info("czs_query_cluster_cancel", zap.Error(cErr))
|
|
|
|
|
errCluster = cErr
|
|
|
|
|
cancel()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
results = oResults
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
var errStreaming error
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
streamingTask := newQueryTask(queryCtx, req)
|
|
|
|
|
streamingTask.DataScope = querypb.DataScope_Streaming
|
|
|
|
|
streamingTask.QS = qs
|
|
|
|
|
err2 := node.scheduler.AddReadTask(queryCtx, streamingTask)
|
|
|
|
|
defer func() {
|
|
|
|
|
errStreaming = err2
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
err2 = streamingTask.WaitToFinish()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
streamingResult = streamingTask.Ret
|
|
|
|
|
}()
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
|
|
var mainErr error
|
|
|
|
|
if errCluster != nil {
|
|
|
|
|
mainErr = errCluster
|
|
|
|
|
if errors.Is(errCluster, context.Canceled) {
|
|
|
|
|
if errStreaming != nil {
|
|
|
|
|
mainErr = errStreaming
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if errStreaming != nil {
|
|
|
|
|
mainErr = errStreaming
|
2022-04-20 08:15:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-23 08:41:58 +00:00
|
|
|
|
if mainErr != nil {
|
|
|
|
|
failRet.Status.Reason = mainErr.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
results = append(results, streamingResult)
|
|
|
|
|
ret, err2 := mergeInternalRetrieveResults(results)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
failRet.Status.Reason = err2.Error()
|
|
|
|
|
return failRet, nil
|
|
|
|
|
}
|
|
|
|
|
return ret, nil
|
2022-03-30 04:03:27 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-04-27 02:41:46 +00:00
|
|
|
|
// SyncReplicaSegments syncs replica node & segments states
|
|
|
|
|
func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
|
|
|
|
|
if !node.isHealthy() {
|
|
|
|
|
return &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))
|
|
|
|
|
|
|
|
|
|
err := node.ShardClusterService.SyncReplicaSegments(req.GetVchannelName(), req.GetReplicaSegments())
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warn("failed to sync replica semgents,", zap.String("vchannel", req.GetVchannelName()), zap.Error(err))
|
|
|
|
|
return &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))
|
|
|
|
|
|
|
|
|
|
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-24 09:13:10 +00:00
|
|
|
|
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
|
2021-11-10 15:56:10 +00:00
|
|
|
|
// TODO(dragondriver): cache the Metrics and set a retention to the cache
|
2021-08-17 02:06:11 +00:00
|
|
|
|
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
|
|
|
|
if !node.isHealthy() {
|
|
|
|
|
log.Warn("QueryNode.GetMetrics failed",
|
2022-04-24 14:03:44 +00:00
|
|
|
|
zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
|
2021-08-17 02:06:11 +00:00
|
|
|
|
zap.String("req", req.Request),
|
2022-04-24 14:03:44 +00:00
|
|
|
|
zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
|
2021-08-17 02:06:11 +00:00
|
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
2022-04-24 14:03:44 +00:00
|
|
|
|
Reason: msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
|
2021-08-17 02:06:11 +00:00
|
|
|
|
},
|
|
|
|
|
Response: "",
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metricType, err := metricsinfo.ParseMetricType(req.Request)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warn("QueryNode.GetMetrics failed to parse metric type",
|
2022-04-24 14:03:44 +00:00
|
|
|
|
zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
|
2021-08-17 02:06:11 +00:00
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
|
zap.Error(err))
|
|
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: err.Error(),
|
|
|
|
|
},
|
|
|
|
|
Response: "",
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if metricType == metricsinfo.SystemInfoMetrics {
|
|
|
|
|
metrics, err := getSystemInfoMetrics(ctx, req, node)
|
2022-01-14 15:55:34 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warn("QueryNode.GetMetrics failed",
|
2022-04-24 14:03:44 +00:00
|
|
|
|
zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
|
2022-01-14 15:55:34 +00:00
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
|
zap.String("metric_type", metricType),
|
|
|
|
|
zap.Error(err))
|
|
|
|
|
}
|
2021-08-17 02:06:11 +00:00
|
|
|
|
|
2021-12-01 14:13:37 +00:00
|
|
|
|
return metrics, nil
|
2021-08-17 02:06:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
|
2022-04-24 14:03:44 +00:00
|
|
|
|
zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
|
2021-08-17 02:06:11 +00:00
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
|
zap.String("metric_type", metricType))
|
|
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
|
Reason: metricsinfo.MsgUnimplementedMetric,
|
|
|
|
|
},
|
|
|
|
|
Response: "",
|
|
|
|
|
}, nil
|
|
|
|
|
}
|