milvus/internal/querynode/impl.go

654 lines
19 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// GetComponentStates reports the current state of this query node.
//
// When the node's state code is not StateCode_Healthy, the returned
// ComponentStates carries an ErrorCode_UnexpectedError status (State is left
// unset) and the same "not ready" error is returned. Otherwise State holds the
// node ID, the query node role and the loaded state code.
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	currentState := node.stateCode.Load().(internalpb.StateCode)

	if currentState != internalpb.StateCode_Healthy {
		notReady := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
		return &internalpb.ComponentStates{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    notReady.Error(),
			},
		}, notReady
	}

	return &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		State: &internalpb.ComponentInfo{
			NodeID:    Params.QueryNodeID,
			Role:      typeutil.QueryNodeRole,
			StateCode: currentState,
		},
	}, nil
}
// GetTimeTickChannel returns Params.QueryTimeTickChannelName wrapped in a
// StringResponse with a Success status. It never returns an error.
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	okStatus := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}
	resp := &milvuspb.StringResponse{
		Status: okStatus,
		Value:  Params.QueryTimeTickChannelName,
	}
	return resp, nil
}
// GetStatisticsChannel returns Params.StatsChannelName wrapped in a
// StringResponse with a Success status. It never returns an error.
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	okStatus := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}
	resp := &milvuspb.StringResponse{
		Status: okStatus,
		Value:  Params.StatsChannelName,
	}
	return resp, nil
}
// AddQueryChannel wires up the query (request) and result channels for the
// collection named in the request and starts its query collection.
//
// Steps, in order:
//  1. fail when the node is not Healthy or node.queryService is nil;
//  2. return Success immediately when the query service already has a query
//     collection for in.CollectionID;
//  3. call addQueryCollection, then subscribe the query stream to
//     in.RequestChannelID (seeking to in.SeekPosition when it carries a
//     MsgID), register in.ResultChannelID as producer channel, and pass every
//     entry of in.GlobalSealedSegments to addGlobalSegmentInfo;
//  4. start the query collection.
//
// On every failure the returned status has ErrorCode_UnexpectedError with the
// error text as Reason, and the same error is returned.
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
	// failWith builds the (status, error) pair shared by every error path.
	failWith := func(err error) (*commonpb.Status, error) {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, err
	}

	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		return failWith(fmt.Errorf("query node %d is not ready", Params.QueryNodeID))
	}

	collectionID := in.CollectionID
	if node.queryService == nil {
		// Format the ID with strconv rather than fmt.Sprintln: Sprintln
		// appended a trailing newline to the error message and status reason.
		errMsg := "null query service, collectionID = " + strconv.FormatInt(collectionID, 10)
		return failWith(errors.New(errMsg))
	}

	if node.queryService.hasQueryCollection(collectionID) {
		log.Debug("queryCollection has been existed when addQueryChannel",
			zap.Any("collectionID", collectionID),
		)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	// add search collection
	if err := node.queryService.addQueryCollection(collectionID); err != nil {
		return failWith(err)
	}
	log.Debug("add query collection", zap.Any("collectionID", collectionID))

	// add request channel
	sc, err := node.queryService.getQueryCollection(collectionID)
	if err != nil {
		return failWith(err)
	}

	consumeChannels := []string{in.RequestChannelID}
	if in.SeekPosition == nil || len(in.SeekPosition.MsgID) == 0 {
		// as consumer: no position to resume from, so subscribe under a
		// subscription name made unique with a random suffix.
		consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
		sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
		log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
	} else {
		// seek query channel: subscribe under the group recorded in the
		// position, then seek to that position.
		sc.queryMsgStream.AsConsumer(consumeChannels, in.SeekPosition.MsgGroup)
		if err := sc.queryMsgStream.Seek([]*internalpb.MsgPosition{in.SeekPosition}); err != nil {
			return failWith(err)
		}
		log.Debug("querynode seek query channel: ", zap.Any("consumeChannels", consumeChannels))
	}

	// add result channel
	producerChannels := []string{in.ResultChannelID}
	sc.queryResultMsgStream.AsProducer(producerChannels)
	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))

	// init global sealed segments
	for _, segment := range in.GlobalSealedSegments {
		if err := sc.globalSegmentManager.addGlobalSegmentInfo(segment); err != nil {
			return failWith(err)
		}
	}

	// start queryCollection, message stream need to asConsumer before start
	sc.start()
	log.Debug("start query collection", zap.Any("collectionID", collectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// RemoveQueryChannel currently removes nothing: it reads neither ctx nor in and
// always returns a Success status with a nil error.
//
// The commented-out body is a disabled implementation kept for reference.
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
	// if node.searchService == nil || node.searchService.searchMsgStream == nil {
	// 	errMsg := "null search service or null search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}
	// 	return status, errors.New(errMsg)
	// }
	// searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}
	// 	return status, errors.New(errMsg)
	// }
	// resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}
	// 	return status, errors.New(errMsg)
	// }
	// // remove request channel
	// consumeChannels := []string{in.RequestChannelID}
	// consumeSubName := Params.MsgChannelSubName
	// // TODO: searchStream.RemovePulsarConsumers(producerChannels)
	// searchStream.AsConsumer(consumeChannels, consumeSubName)
	// // remove result channel
	// producerChannels := []string{in.ResultChannelID}
	// // TODO: resultStream.RemovePulsarProducer(producerChannels)
	// resultStream.AsProducer(producerChannels)

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// WatchDmChannels enqueues a watchDmChannelsTask for the request on the node
// scheduler and blocks until the task finishes.
//
// It fails with ErrorCode_UnexpectedError (and returns the error) when the
// node is not Healthy, when the task cannot be enqueued, or when the task
// itself finishes with an error; the latter two are also logged as warnings.
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	if state := node.stateCode.Load().(internalpb.StateCode); state != internalpb.StateCode_Healthy {
		notReady := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    notReady.Error(),
		}, notReady
	}

	task := &watchDmChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	if err := node.scheduler.queue.Enqueue(task); err != nil {
		failure := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return failure, err
	}
	log.Debug("watchDmChannelsTask Enqueue done", zap.Any("collectionID", in.CollectionID))

	// Block until the scheduler has run the task.
	if err := task.WaitToFinish(); err != nil {
		failure := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return failure, err
	}
	log.Debug("watchDmChannelsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// LoadSegments enqueues a loadSegmentsTask for the request on the node
// scheduler and blocks until the task finishes.
//
// It fails with ErrorCode_UnexpectedError (and returns the error) when the
// node is not Healthy, when the task cannot be enqueued, or when the task
// itself finishes with an error; the latter two are also logged as warnings.
// The segment IDs taken from in.Infos are only used for debug logging.
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
	if state := node.stateCode.Load().(internalpb.StateCode); state != internalpb.StateCode_Healthy {
		notReady := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    notReady.Error(),
		}, notReady
	}

	task := &loadSegmentsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	if err := node.scheduler.queue.Enqueue(task); err != nil {
		failure := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return failure, err
	}

	// Gather the requested segment IDs for the log lines below.
	ids := make([]UniqueID, 0)
	for i := range in.Infos {
		ids = append(ids, in.Infos[i].SegmentID)
	}
	log.Debug("loadSegmentsTask Enqueue done", zap.Int64s("segmentIDs", ids))

	// Block until the scheduler has run the task.
	if err := task.WaitToFinish(); err != nil {
		failure := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return failure, err
	}
	log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64s("segmentIDs", ids))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// ReleaseCollection enqueues a releaseCollectionTask for the request on the
// node scheduler and blocks until the task finishes.
//
// It fails with ErrorCode_UnexpectedError (and returns the error) when the
// node is not Healthy, when the task cannot be enqueued, or when the task
// itself finishes with an error; the latter two are also logged as warnings.
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, err
	}

	dct := &releaseCollectionTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, err
	}
	log.Debug("releaseCollectionTask Enqueue done", zap.Any("collectionID", in.CollectionID))

	// Propagate a task failure to the caller, as WatchDmChannels and
	// LoadSegments do. Previously the error was only logged and the call
	// still reported Success.
	err = dct.WaitToFinish()
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, err
	}
	log.Debug("releaseCollectionTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// ReleasePartitions enqueues a releasePartitionsTask for the request on the
// node scheduler and blocks until the task finishes.
//
// It fails with ErrorCode_UnexpectedError (and returns the error) when the
// node is not Healthy, when the task cannot be enqueued, or when the task
// itself finishes with an error; the latter two are also logged as warnings.
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, err
	}

	dct := &releasePartitionsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, err
	}
	log.Debug("releasePartitionsTask Enqueue done", zap.Any("collectionID", in.CollectionID))

	// Propagate a task failure to the caller, as WatchDmChannels and
	// LoadSegments do. Previously the error was only logged and the call
	// still reported Success.
	err = dct.WaitToFinish()
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, err
	}
	log.Debug("releasePartitionsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// ReleaseSegments calls removeSegment for every ID in in.SegmentIDs on both
// the historical and the streaming replica.
//
// A failing removal does not stop the loop: all segments are attempted. If any
// removal failed, the returned status has ErrorCode_UnexpectedError and its
// Reason lists every failure joined by "; ". The error return value is nil in
// that case; a non-nil error is returned only when the node is not Healthy.
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, err
	}

	// Record every failure: overwriting a single Reason field would keep only
	// the last error and hide earlier ones.
	var failures []string
	for _, id := range in.SegmentIDs {
		// not return, try to release all segments
		if err := node.historical.replica.removeSegment(id); err != nil {
			failures = append(failures, err.Error())
		}
		if err := node.streaming.replica.removeSegment(id); err != nil {
			failures = append(failures, err.Error())
		}
	}

	if len(failures) > 0 {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    strings.Join(failures, "; "),
		}, nil
	}
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
// GetSegmentInfo returns a SegmentInfo entry for every segment of
// in.CollectionID, first from the historical replica and then from the
// streaming replica.
//
// For each replica it looks up the collection's partition IDs, then each
// partition's segment IDs, then each segment by ID. Any lookup failure, or a
// node that is not Healthy, aborts the call: the response then carries
// ErrorCode_UnexpectedError with the error text as Reason, and the same error
// is returned.
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
	// failWith builds the error response shared by every failure path.
	failWith := func(cause error) (*queryPb.GetSegmentInfoResponse, error) {
		return &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    cause.Error(),
			},
		}, cause
	}

	if state := node.stateCode.Load().(internalpb.StateCode); state != internalpb.StateCode_Healthy {
		return failWith(fmt.Errorf("query node %d is not ready", Params.QueryNodeID))
	}

	infos := make([]*queryPb.SegmentInfo, 0)

	// TODO: remove segmentType and use queryPb.SegmentState instead
	toSegmentState := func(kind segmentType) queryPb.SegmentState {
		if kind == segmentTypeGrowing {
			return queryPb.SegmentState_Growing
		}
		if kind == segmentTypeSealed {
			return queryPb.SegmentState_sealed
		}
		// TODO: remove segmentTypeIndexing
		return queryPb.SegmentState_None
	}

	// describe converts one in-memory segment into its protobuf description.
	describe := func(seg *Segment) *queryPb.SegmentInfo {
		var (
			indexName string
			indexID   int64
		)
		// TODO:: segment has multi vec column
		// Only one entry of indexInfos is reported; the loop stops after the
		// first key the map iteration yields.
		for fieldID := range seg.indexInfos {
			indexName = seg.getIndexName(fieldID)
			indexID = seg.getIndexID(fieldID)
			break
		}
		return &queryPb.SegmentInfo{
			SegmentID:    seg.ID(),
			CollectionID: seg.collectionID,
			PartitionID:  seg.partitionID,
			NodeID:       Params.QueryNodeID,
			MemSize:      seg.getMemSize(),
			NumRows:      seg.getRowCount(),
			IndexName:    indexName,
			IndexID:      indexID,
			ChannelID:    seg.vChannelID,
			SegmentState: toSegmentState(seg.segmentType),
		}
	}

	// get info from historical
	node.historical.replica.printReplica()
	historicalPartitions, err := node.historical.replica.getPartitionIDs(in.CollectionID)
	if err != nil {
		return failWith(err)
	}
	for _, partitionID := range historicalPartitions {
		segmentIDs, err := node.historical.replica.getSegmentIDs(partitionID)
		if err != nil {
			return failWith(err)
		}
		for _, segmentID := range segmentIDs {
			seg, err := node.historical.replica.getSegmentByID(segmentID)
			if err != nil {
				return failWith(err)
			}
			info := describe(seg)
			log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", segmentID), zap.Any("info", info))
			infos = append(infos, info)
		}
	}

	// get info from streaming
	node.streaming.replica.printReplica()
	streamingPartitions, err := node.streaming.replica.getPartitionIDs(in.CollectionID)
	if err != nil {
		return failWith(err)
	}
	for _, partitionID := range streamingPartitions {
		segmentIDs, err := node.streaming.replica.getSegmentIDs(partitionID)
		if err != nil {
			return failWith(err)
		}
		for _, segmentID := range segmentIDs {
			seg, err := node.streaming.replica.getSegmentByID(segmentID)
			if err != nil {
				return failWith(err)
			}
			info := describe(seg)
			log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", segmentID), zap.Any("info", info))
			infos = append(infos, info)
		}
	}

	return &queryPb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Infos: infos,
	}, nil
}
// isHealthy reports whether the node's stored state code is StateCode_Healthy.
func (node *QueryNode) isHealthy() bool {
	return node.stateCode.Load().(internalpb.StateCode) == internalpb.StateCode_Healthy
}
// GetMetrics serves a metrics request for this query node.
//
// The metric type is parsed from req.Request. Only
// metricsinfo.SystemInfoMetrics is handled, by delegating to
// getSystemInfoMetrics and returning its result and error unchanged. An
// unhealthy node, an unparsable request and any other metric type all yield a
// response with ErrorCode_UnexpectedError, an empty Response and a nil error.
//
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	// failure builds the error response shared by every rejected request.
	failure := func(reason string) *milvuspb.GetMetricsResponse {
		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    reason,
			},
			Response: "",
		}
	}

	log.Debug("QueryNode.GetMetrics",
		zap.Int64("node_id", Params.QueryNodeID),
		zap.String("req", req.Request))

	if !node.isHealthy() {
		log.Warn("QueryNode.GetMetrics failed",
			zap.Int64("node_id", Params.QueryNodeID),
			zap.String("req", req.Request),
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeID)))
		return failure(msgQueryNodeIsUnhealthy(Params.QueryNodeID)), nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryNode.GetMetrics failed to parse metric type",
			zap.Int64("node_id", Params.QueryNodeID),
			zap.String("req", req.Request),
			zap.Error(err))
		return failure(err.Error()), nil
	}

	log.Debug("QueryNode.GetMetrics",
		zap.String("metric_type", metricType))

	// Anything other than system-info metrics is not supported.
	if metricType != metricsinfo.SystemInfoMetrics {
		log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
			zap.Int64("node_id", Params.QueryNodeID),
			zap.String("req", req.Request),
			zap.String("metric_type", metricType))
		return failure(metricsinfo.MsgUnimplementedMetric), nil
	}

	metrics, err := getSystemInfoMetrics(ctx, req, node)
	log.Debug("QueryNode.GetMetrics",
		zap.Int64("node_id", Params.QueryNodeID),
		zap.String("req", req.Request),
		zap.String("metric_type", metricType),
		zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
		zap.Error(err))
	return metrics, err
}