Fix load hang when restarting querynode many times in a short time (#6351)

* fix load hang when restarting querynode many times in a short time

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* add multi-queryNode ut

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* add ut for restarting querynode

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* set queryCoord context to load collection

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/6470/head
xige-16 2021-07-13 14:16:00 +08:00 committed by GitHub
parent 40dc04f6a6
commit 2b720fd2f7
20 changed files with 1129 additions and 471 deletions
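Note on the core change before the diffs: LoadCollection and ReleaseCollection used to bind their background tasks to the per-RPC context, so the task condition was cancelled as soon as the gRPC call returned; combined with repeated queryNode restarts, this is one way a load could hang forever. The QueryCoord diff below switches those tasks to the coordinator's long-lived loopCtx. A minimal, hypothetical sketch of the difference (coord and runLoadTask are illustrative stand-ins, not the real QueryCoord API):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // coord mimics a coordinator that owns a long-lived loop context.
    type coord struct {
        loopCtx context.Context // cancelled only when the coordinator shuts down
    }

    // LoadCollection is invoked with a short-lived gRPC context, but the load work
    // must outlive the RPC, so the task is bound to loopCtx instead of rpcCtx.
    func (c *coord) LoadCollection(rpcCtx context.Context, collectionID int64) {
        go runLoadTask(c.loopCtx, collectionID) // the old code effectively used rpcCtx here
    }

    // runLoadTask is a hypothetical stand-in for a LoadCollectionTask.
    func runLoadTask(ctx context.Context, collectionID int64) {
        select {
        case <-ctx.Done():
            fmt.Println("load cancelled for collection", collectionID)
        case <-time.After(100 * time.Millisecond): // pretend to load segments
            fmt.Println("load finished for collection", collectionID)
        }
    }

    func main() {
        c := &coord{loopCtx: context.Background()}

        rpcCtx, cancel := context.WithCancel(context.Background())
        c.LoadCollection(rpcCtx, 2021)
        cancel() // the RPC returns; the task keeps running because it is bound to loopCtx

        time.Sleep(200 * time.Millisecond) // prints "load finished for collection 2021"
    }

With the per-request context, cancel() above would have stopped the task before it finished; with loopCtx the task is only stopped when the coordinator itself shuts down.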

View File

@ -68,7 +68,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
s := &Server{
ctx: ctx1,
cancel: cancel,
querynode: qn.NewQueryNodeWithoutID(ctx, factory),
querynode: qn.NewQueryNode(ctx, factory),
grpcErrChan: make(chan error),
}
return s, nil
@ -96,10 +96,6 @@ func (s *Server) init() error {
return err
}
if err := s.querynode.Register(); err != nil {
return err
}
// --- RootCoord Client ---
//ms.Params.Init()
addr := Params.RootCoordAddress
@ -163,39 +159,16 @@ func (s *Server) init() error {
panic(err)
}
// --- DataCoord ---
log.Debug("QueryNode start to new DataCoordClient", zap.Any("DataCoordAddress", Params.DataCoordAddress))
dataCoord, err := dsc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
if err != nil {
log.Debug("QueryNode new DataCoordClient failed", zap.Error(err))
panic(err)
}
if err = dataCoord.Init(); err != nil {
log.Debug("QueryNode DataCoordClient Init failed", zap.Error(err))
panic(err)
}
if err = dataCoord.Start(); err != nil {
log.Debug("QueryNode DataCoordClient Start failed", zap.Error(err))
panic(err)
}
log.Debug("QueryNode start to wait for DataCoord ready")
err = funcutil.WaitForComponentInitOrHealthy(s.ctx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for DataCoord ready failed", zap.Error(err))
panic(err)
}
log.Debug("QueryNode report DataCoord is ready")
if err := s.SetDataCoord(dataCoord); err != nil {
panic(err)
}
s.querynode.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("QueryNode", zap.Any("State", internalpb.StateCode_Initializing))
if err := s.querynode.Init(); err != nil {
log.Error("QueryNode init error: ", zap.Error(err))
return err
}
if err := s.querynode.Register(); err != nil {
return err
}
return nil
}
@ -288,10 +261,6 @@ func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error {
return s.querynode.SetIndexCoord(indexCoord)
}
func (s *Server) SetDataCoord(dataCoord types.DataCoord) error {
return s.querynode.SetDataCoord(dataCoord)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
return s.querynode.GetTimeTickChannel(ctx)
}

View File

@ -78,7 +78,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
}
err = c.RegisterNode(context.Background(), session, nodeID)
if err != nil {
log.Debug("query node failed to register")
log.Debug("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
continue
}
nodeIDs = append(nodeIDs, nodeID)
@ -195,7 +195,7 @@ func (c *queryNodeCluster) ReleaseSegments(ctx context.Context, nodeID int64, in
for _, segmentID := range in.SegmentIDs {
err := c.clusterMeta.removeSegmentInfo(segmentID)
if err != nil {
log.Error("remove segmentInfo Error", zap.Any("error", err.Error()), zap.Int64("segmentID", segmentID))
log.Error("ReleaseSegments: remove segmentInfo Error", zap.Any("error", err.Error()), zap.Int64("segmentID", segmentID))
}
}
status, err := node.client.ReleaseSegments(ctx, in)
@ -222,9 +222,9 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
for _, info := range in.Infos {
channels = append(channels, info.ChannelName)
}
log.Debug("wait queryNode watch dm channel")
log.Debug("WatchDmChannels: wait queryNode watch dm channel")
status, err := node.client.WatchDmChannels(ctx, in)
log.Debug("queryNode watch dm channel done")
log.Debug("WatchDmChannels: queryNode watch dm channel done")
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
collectionID := in.CollectionID
//c.clusterMeta.addCollection(collectionID, in.Schema)
@ -251,6 +251,9 @@ func (c *queryNodeCluster) AddQueryChannel(ctx context.Context, nodeID int64, in
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.AddQueryChannel(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
//TODO::should reopen
@ -260,7 +263,7 @@ func (c *queryNodeCluster) AddQueryChannel(ctx context.Context, nodeID int64, in
node.addQueryChannel(collectionID, queryChannelInfo)
return status, err
}
log.Error("queryChannel for collection not assigned", zap.Int64("collectionID", collectionID))
log.Error("AddQueryChannel: queryChannel for collection not assigned", zap.Int64("collectionID", collectionID))
}
return status, err
}
@ -272,6 +275,9 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64,
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.RemoveQueryChannel(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
//TODO::should reopen
@ -281,7 +287,7 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64,
node.removeQueryChannel(collectionID)
return status, err
}
log.Error("queryChannel for collection not watched", zap.Int64("collectionID", collectionID))
log.Error("removeQueryChannel: queryChannel for collection not watched", zap.Int64("collectionID", collectionID))
}
return status, err
}
@ -294,6 +300,9 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64,
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.ReleaseCollection(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
node.releaseCollection(in.CollectionID)
@ -310,6 +319,9 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.ReleasePartitions(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
for _, partitionID := range in.PartitionIDs {
@ -324,8 +336,8 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
}
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
c.Lock()
defer c.Unlock()
c.RLock()
defer c.RUnlock()
segmentInfos := make([]*querypb.SegmentInfo, 0)
nodes, err := c.getOnServiceNodes()
@ -345,8 +357,8 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
}
func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
c.Lock()
defer c.Unlock()
c.RLock()
defer c.RUnlock()
if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("Can't find query node by nodeID ")
@ -364,8 +376,8 @@ func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
}
func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
c.Lock()
defer c.Unlock()
c.RLock()
defer c.RUnlock()
if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("Can't find query node by nodeID ")
@ -384,26 +396,31 @@ func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionuti
c.Lock()
defer c.Unlock()
sessionJSON, err := json.Marshal(session)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
err = c.client.Save(key, string(sessionJSON))
if err != nil {
return err
}
node, err := newQueryNode(ctx, session.Address, id, c.client)
if err != nil {
return err
}
log.Debug("register a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))
if _, ok := c.nodes[id]; !ok {
c.nodes[id] = node
sessionJSON, err := json.Marshal(session)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
err = c.client.Save(key, string(sessionJSON))
if err != nil {
return err
}
c.nodes[id] = newQueryNode(ctx, session.Address, id, c.client)
log.Debug("RegisterNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))
go func() {
err = c.nodes[id].start()
if err != nil {
log.Error("RegisterNode: start queryNode client failed", zap.Int64("nodeID", id), zap.String("error", err.Error()))
return
}
log.Debug("RegisterNode: start queryNode success, print cluster meta info", zap.Int64("nodeID", id))
c.printMeta()
}()
return nil
}
return fmt.Errorf("node %d already exists in cluster", id)
}
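A hedged sketch of the registration flow introduced above, reduced to a toy cluster (registerNode, callNode and the sleep are illustrative, not the real queryNodeCluster API): the node entry is recorded under the lock so duplicate registrations fail fast, the potentially slow gRPC connect runs in a background goroutine, and callers consult an onService flag so an offline or still-connecting node returns an error instead of blocking other cluster operations.

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    type node struct {
        id        int64
        onService bool
    }

    type cluster struct {
        mu    sync.RWMutex
        nodes map[int64]*node
    }

    // registerNode records the node immediately and brings its client on service in
    // the background, so a restarting queryNode never blocks registration of others.
    func (c *cluster) registerNode(id int64) error {
        c.mu.Lock()
        defer c.mu.Unlock()
        if _, ok := c.nodes[id]; ok {
            return fmt.Errorf("node %d already exists in cluster", id)
        }
        n := &node{id: id}
        c.nodes[id] = n
        go func() {
            time.Sleep(50 * time.Millisecond) // stand-in for client Init()/Start()
            c.mu.Lock()
            n.onService = true
            c.mu.Unlock()
        }()
        return nil
    }

    // callNode guards an RPC against a node that is offline or still connecting.
    func (c *cluster) callNode(id int64) error {
        c.mu.RLock()
        defer c.mu.RUnlock()
        n, ok := c.nodes[id]
        if !ok {
            return errors.New("node not found")
        }
        if !n.onService {
            return errors.New("node offline")
        }
        return nil // the real code would call the node's grpc client here
    }

    func main() {
        c := &cluster{nodes: map[int64]*node{}}
        _ = c.registerNode(1)
        fmt.Println(c.callNode(1))     // "node offline": client not started yet
        time.Sleep(100 * time.Millisecond)
        fmt.Println(c.callNode(1))     // <nil>: node is on service now
        fmt.Println(c.registerNode(1)) // duplicate registration rejected
    }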
@ -428,16 +445,25 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
return err
}
err = c.nodes[nodeID].clearNodeInfo()
if err != nil {
return err
if _, ok := c.nodes[nodeID]; ok {
err = c.nodes[nodeID].clearNodeInfo()
if err != nil {
return err
}
delete(c.nodes, nodeID)
log.Debug("removeNodeInfo: delete nodeInfo in cluster meta and etcd", zap.Int64("nodeID", nodeID))
}
delete(c.nodes, nodeID)
log.Debug("delete nodeInfo in cluster meta and etcd", zap.Int64("nodeID", nodeID))
return nil
}
func (c *queryNodeCluster) stopNode(nodeID int64) {
if node, ok := c.nodes[nodeID]; ok {
node.stop()
log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}
func (c *queryNodeCluster) onServiceNodes() (map[int64]*queryNode, error) {
c.RLock()
defer c.RUnlock()

View File

@ -110,8 +110,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
loadCollectionTask := &LoadCollectionTask{
BaseTask: BaseTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
LoadCollectionRequest: req,
@ -156,7 +156,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
releaseCollectionTask := &ReleaseCollectionTask{
BaseTask: BaseTask{
ctx: ctx,
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},

View File

@ -16,10 +16,16 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
)
func TestReplica_Release(t *testing.T) {
meta, err := newMeta(nil)
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
meta, err := newMeta(etcdKV)
assert.Nil(t, err)
err = meta.addCollection(1, nil)
require.NoError(t, err)

View File

@ -1,159 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querycoord
import (
"context"
"errors"
"strconv"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
const (
numSegment = 12
)
type RootCoordMock struct {
types.RootCoord
CollectionIDs []UniqueID
Col2partition map[UniqueID][]UniqueID
Partition2segment map[UniqueID][]UniqueID
}
func NewRootCoordMock() *RootCoordMock {
collectionIDs := make([]UniqueID, 0)
collectionIDs = append(collectionIDs, 1)
col2partition := make(map[UniqueID][]UniqueID)
partitionIDs := make([]UniqueID, 0)
partitionIDs = append(partitionIDs, 1)
col2partition[1] = partitionIDs
partition2segment := make(map[UniqueID][]UniqueID)
segmentIDs := make([]UniqueID, 0)
for i := 0; i < numSegment; i++ {
segmentIDs = append(segmentIDs, UniqueID(i))
}
partition2segment[1] = segmentIDs
return &RootCoordMock{
CollectionIDs: collectionIDs,
Col2partition: col2partition,
Partition2segment: partition2segment,
}
}
func (rc *RootCoordMock) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
collectionID := in.CollectionID
partitionIDs := make([]UniqueID, 0)
for _, id := range rc.CollectionIDs {
if id == collectionID {
partitions := rc.Col2partition[collectionID]
partitionIDs = append(partitionIDs, partitions...)
}
}
response := &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
PartitionIDs: partitionIDs,
}
return response, nil
}
func (rc *RootCoordMock) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
collectionID := in.CollectionID
partitionID := in.PartitionID
for _, id := range rc.CollectionIDs {
if id == collectionID {
partitions := rc.Col2partition[collectionID]
for _, partition := range partitions {
if partition == partitionID {
return &milvuspb.ShowSegmentsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
//SegmentIDs: rc.Partition2segment[partition],
}, nil
}
}
}
}
return nil, errors.New("collection id or partition id not found")
}
type DataMock struct {
types.DataCoord
SegmentIDs []UniqueID
SegmentStates map[UniqueID]*datapb.SegmentStateInfo
}
func NewDataMock() *DataMock {
positions := make([]*internalpb.MsgPosition, 0)
positions = append(positions, &internalpb.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(0, 10)})
positions = append(positions, &internalpb.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(1, 10)})
positions = append(positions, &internalpb.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(2, 10)})
positions = append(positions, &internalpb.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(3, 10)})
fillStates := func(segmentID UniqueID, time uint64, position *internalpb.MsgPosition) *datapb.SegmentStateInfo {
return &datapb.SegmentStateInfo{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
SegmentID: segmentID,
State: commonpb.SegmentState_Flushed,
StartPosition: position,
}
}
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
segmentIDs := make([]UniqueID, 0)
for i := 0; i < numSegment; i++ {
segmentIDs = append(segmentIDs, UniqueID(i))
pick := i % 4
segmentStates[UniqueID(i)] = fillStates(UniqueID(i), uint64(i), positions[pick])
}
return &DataMock{
SegmentIDs: segmentIDs,
SegmentStates: segmentStates,
}
}
func (data *DataMock) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
ret := &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}
for _, segID := range req.SegmentIDs {
for _, segmentID := range data.SegmentIDs {
if segmentID == segID {
ret.States = append(ret.States, data.SegmentStates[segmentID])
}
}
}
if ret.States == nil {
return ret, nil
}
return ret, nil
}

View File

@ -0,0 +1,521 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querycoord
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"path"
"strconv"
"sync"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/kv"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
qn "github.com/milvus-io/milvus/internal/querynode"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
)
const (
defaultCollectionID = UniqueID(2021)
defaultPartitionID = UniqueID(2021)
defaultSegmentID = UniqueID(2021)
defaultShardsNum = 2
)
func genCollectionSchema(collectionID UniqueID, isBinary bool) *schemapb.CollectionSchema {
var fieldVec schemapb.FieldSchema
if isBinary {
fieldVec = schemapb.FieldSchema{
FieldID: UniqueID(101),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "JACCARD",
},
},
}
} else {
fieldVec = schemapb.FieldSchema{
FieldID: UniqueID(101),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "L2",
},
},
}
}
return &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{
{FieldID: 0, Name: "row_id", IsPrimaryKey: false, Description: "row_id", DataType: schemapb.DataType_Int64},
{FieldID: 1, Name: "Ts", IsPrimaryKey: false, Description: "Ts", DataType: schemapb.DataType_Int64},
{FieldID: 100, Name: "field_age", IsPrimaryKey: false, Description: "int64", DataType: schemapb.DataType_Int64},
&fieldVec,
},
}
}
func genETCDCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionMeta {
schema := genCollectionSchema(collectionID, isBinary)
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: schema,
CreateTime: Timestamp(0),
PartitionIDs: []UniqueID{defaultPartitionID},
}
return &collectionMeta
}
func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, keyPrefix string, kv kv.BaseKV) (map[int64]string, error) {
const (
msgLength = 1000
DIM = 16
)
idData := make([]int64, 0)
for n := 0; n < msgLength; n++ {
idData = append(idData, int64(n))
}
var timestamps []int64
for n := 0; n < msgLength; n++ {
timestamps = append(timestamps, int64(n+1))
}
var fieldAgeData []int64
for n := 0; n < msgLength; n++ {
fieldAgeData = append(fieldAgeData, int64(n))
}
fieldVecData := make([]float32, 0)
for n := 0; n < msgLength; n++ {
for i := 0; i < DIM; i++ {
fieldVecData = append(fieldVecData, float32(n*i)*0.1)
}
}
insertData := &storage.InsertData{
Data: map[int64]storage.FieldData{
0: &storage.Int64FieldData{
NumRows: msgLength,
Data: idData,
},
1: &storage.Int64FieldData{
NumRows: msgLength,
Data: timestamps,
},
100: &storage.Int64FieldData{
NumRows: msgLength,
Data: fieldAgeData,
},
101: &storage.FloatVectorFieldData{
NumRows: msgLength,
Data: fieldVecData,
Dim: DIM,
},
},
}
// buffer data to binLogs
collMeta := genETCDCollectionMeta(collectionID, false)
inCodec := storage.NewInsertCodec(collMeta)
binLogs, _, err := inCodec.Serialize(partitionID, segmentID, insertData)
if err != nil {
log.Debug("insert data serialize error")
return nil, err
}
// binLogs -> minIO/S3
segIDStr := strconv.FormatInt(segmentID, 10)
keyPrefix = path.Join(keyPrefix, segIDStr)
fieldID2Paths := make(map[int64]string)
for _, blob := range binLogs {
uid := rand.Int63n(100000000)
path := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
err = kv.Save(path, string(blob.Value[:]))
if err != nil {
return nil, err
}
fieldID, err := strconv.Atoi(blob.Key)
if err != nil {
return nil, err
}
fieldID2Paths[int64(fieldID)] = path
}
return fieldID2Paths, nil
}
type rootCoordMock struct {
types.RootCoord
CollectionIDs []UniqueID
Col2partition map[UniqueID][]UniqueID
sync.RWMutex
}
func newRootCoordMock() *rootCoordMock {
collectionIDs := make([]UniqueID, 0)
col2partition := make(map[UniqueID][]UniqueID)
return &rootCoordMock{
CollectionIDs: collectionIDs,
Col2partition: col2partition,
}
}
func (rc *rootCoordMock) createCollection(collectionID UniqueID) {
rc.Lock()
defer rc.Unlock()
if _, ok := rc.Col2partition[collectionID]; !ok {
rc.CollectionIDs = append(rc.CollectionIDs, collectionID)
rc.Col2partition[collectionID] = make([]UniqueID, 0)
}
}
func (rc *rootCoordMock) createPartition(collectionID UniqueID, partitionID UniqueID) error {
rc.Lock()
defer rc.Unlock()
if _, ok := rc.Col2partition[collectionID]; ok {
rc.Col2partition[collectionID] = append(rc.Col2partition[collectionID], partitionID)
return nil
}
return errors.New("collection not exist")
}
func (rc *rootCoordMock) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
collectionID := in.CollectionID
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if partitionIDs, ok := rc.Col2partition[collectionID]; ok {
response := &milvuspb.ShowPartitionsResponse{
Status: status,
PartitionIDs: partitionIDs,
}
return response, nil
}
rc.createCollection(collectionID)
rc.createPartition(collectionID, defaultPartitionID)
return &milvuspb.ShowPartitionsResponse{
Status: status,
PartitionIDs: rc.Col2partition[collectionID],
}, nil
}
func (rc *rootCoordMock) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (rc *rootCoordMock) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
return nil, errors.New("describeSegment fail")
}
type dataCoordMock struct {
types.DataCoord
minioKV kv.BaseKV
collections []UniqueID
col2DmChannels map[UniqueID][]*datapb.VchannelInfo
partitionID2Segment map[UniqueID]UniqueID
Segment2Binlog map[UniqueID][]*datapb.SegmentBinlogs
assignedSegmentID UniqueID
}
func newDataCoordMock(ctx context.Context) (*dataCoordMock, error) {
collectionIDs := make([]UniqueID, 0)
col2DmChannels := make(map[UniqueID][]*datapb.VchannelInfo)
partitionID2Segment := make(map[UniqueID]UniqueID)
segment2Binglog := make(map[UniqueID][]*datapb.SegmentBinlogs)
// create minio client
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(ctx, option)
if err != nil {
return nil, err
}
return &dataCoordMock{
minioKV: kv,
collections: collectionIDs,
col2DmChannels: col2DmChannels,
partitionID2Segment: partitionID2Segment,
Segment2Binlog: segment2Binglog,
assignedSegmentID: defaultSegmentID,
}, nil
}
func (data *dataCoordMock) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
collectionID := req.CollectionID
partitionID := req.PartitionID
if _, ok := data.col2DmChannels[collectionID]; !ok {
segmentID := data.assignedSegmentID
data.partitionID2Segment[partitionID] = segmentID
fieldID2Paths, err := generateInsertBinLog(collectionID, partitionID, segmentID, "queryCoorf-mockDataCoord", data.minioKV)
if err != nil {
return nil, err
}
fieldBinlogs := make([]*datapb.FieldBinlog, 0)
for fieldID, path := range fieldID2Paths {
fieldBinlog := &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: []string{path},
}
fieldBinlogs = append(fieldBinlogs, fieldBinlog)
}
data.Segment2Binlog[segmentID] = make([]*datapb.SegmentBinlogs, 0)
segmentBinlog := &datapb.SegmentBinlogs{
SegmentID: segmentID,
FieldBinlogs: fieldBinlogs,
}
data.Segment2Binlog[segmentID] = append(data.Segment2Binlog[segmentID], segmentBinlog)
channelInfos := make([]*datapb.VchannelInfo, 0)
data.collections = append(data.collections, collectionID)
collectionName := funcutil.RandomString(8)
for i := 0; i < defaultShardsNum; i++ {
vChannel := fmt.Sprintf("%s_%d_%d_v", collectionName, collectionID, i)
channelInfo := &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: vChannel,
SeekPosition: &internalpb.MsgPosition{
ChannelName: vChannel,
},
FlushedSegments: []int64{segmentID},
}
channelInfos = append(channelInfos, channelInfo)
}
data.col2DmChannels[collectionID] = channelInfos
}
segmentID := data.partitionID2Segment[partitionID]
return &datapb.GetRecoveryInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Channels: data.col2DmChannels[collectionID],
Binlogs: data.Segment2Binlog[segmentID],
}, nil
}
type indexCoordMock struct {
types.IndexCoord
}
func newIndexCoordMock() *indexCoordMock {
return &indexCoordMock{}
}
func (c *indexCoordMock) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
return nil, errors.New("get index file path fail")
}
type queryNodeServerMock struct {
querypb.QueryNodeServer
ctx context.Context
cancel context.CancelFunc
queryNode *qn.QueryNode
grpcErrChan chan error
grpcServer *grpc.Server
}
func newQueryNodeServerMock(ctx context.Context) *queryNodeServerMock {
ctx1, cancel := context.WithCancel(ctx)
factory := msgstream.NewPmsFactory()
return &queryNodeServerMock{
ctx: ctx,
cancel: cancel,
queryNode: qn.NewQueryNode(ctx1, factory),
grpcErrChan: make(chan error),
}
}
func (qs *queryNodeServerMock) init() error {
qn.Params.Init()
qn.Params.MetaRootPath = Params.MetaRootPath
qn.Params.QueryNodeIP = funcutil.GetLocalIP()
grpcPort := Params.Port
go func() {
var lis net.Listener
var err error
err = retry.Do(qs.ctx, func() error {
addr := ":" + strconv.Itoa(grpcPort)
lis, err = net.Listen("tcp", addr)
if err == nil {
qn.Params.QueryNodePort = int64(lis.Addr().(*net.TCPAddr).Port)
} else {
// set port=0 to get next available port
grpcPort = 0
}
return err
}, retry.Attempts(10))
if err != nil {
log.Error(err.Error())
}
qs.grpcServer = grpc.NewServer()
querypb.RegisterQueryNodeServer(qs.grpcServer, qs)
if err = qs.grpcServer.Serve(lis); err != nil {
log.Error(err.Error())
}
}()
rootCoord := newRootCoordMock()
indexCoord := newIndexCoordMock()
qs.queryNode.SetRootCoord(rootCoord)
qs.queryNode.SetIndexCoord(indexCoord)
err := qs.queryNode.Init()
if err != nil {
return err
}
if err = qs.queryNode.Register(); err != nil {
return err
}
return nil
}
func (qs *queryNodeServerMock) start() error {
return qs.queryNode.Start()
}
func (qs *queryNodeServerMock) stop() error {
qs.cancel()
if qs.grpcServer != nil {
qs.grpcServer.GracefulStop()
}
err := qs.queryNode.Stop()
if err != nil {
return err
}
return nil
}
func (qs *queryNodeServerMock) run() error {
if err := qs.init(); err != nil {
return err
}
if err := qs.start(); err != nil {
return err
}
return nil
}
func (qs *queryNodeServerMock) AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (qs *queryNodeServerMock) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (qs *queryNodeServerMock) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (qs *queryNodeServerMock) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (qs *queryNodeServerMock) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (qs *queryNodeServerMock) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func startQueryNodeServer(ctx context.Context) (*queryNodeServerMock, error) {
node := newQueryNodeServerMock(ctx)
err := node.run()
if err != nil {
return nil, err
}
return node, nil
}
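The init() above retries net.Listen and falls back to port 0 when the preferred gRPC port is busy, so several mock queryNodes can coexist in one test process. The same pattern in isolation (listenWithFallback is a hypothetical helper, not part of the diff):

    package main

    import (
        "fmt"
        "net"
        "strconv"
    )

    // listenWithFallback shows the retry pattern from init() above: try the
    // preferred port first, then fall back to port 0 so the OS assigns a free one.
    func listenWithFallback(preferred int) (net.Listener, int, error) {
        var lastErr error
        for _, port := range []int{preferred, 0} {
            lis, err := net.Listen("tcp", ":"+strconv.Itoa(port))
            if err == nil {
                return lis, lis.Addr().(*net.TCPAddr).Port, nil
            }
            lastErr = err
        }
        return nil, 0, lastErr
    }

    func main() {
        lis, port, err := listenWithFallback(21123)
        if err != nil {
            panic(err)
        }
        defer lis.Close()
        fmt.Println("mock queryNode would serve grpc on port", port)
    }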

View File

@ -14,6 +14,7 @@ package querycoord
import (
"fmt"
"path"
"strconv"
"strings"
"sync"
@ -50,6 +51,13 @@ type ParamTable struct {
EtcdEndpoints []string
MetaRootPath string
KvRootPath string
//--- Minio ---
MinioEndPoint string
MinioAccessKeyID string
MinioSecretAccessKey string
MinioUseSSLStr bool
MinioBucketName string
}
var Params ParamTable
@ -81,6 +89,13 @@ func (p *ParamTable) Init() {
p.initEtcdEndpoints()
p.initMetaRootPath()
p.initKvRootPath()
//--- Minio ----
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
})
}
@ -188,3 +203,47 @@ func (p *ParamTable) initKvRootPath() {
}
p.KvRootPath = path.Join(rootPath, subPath)
}
func (p *ParamTable) initMinioEndPoint() {
url, err := p.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinioEndPoint = url
}
func (p *ParamTable) initMinioAccessKeyID() {
id, err := p.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
p.MinioAccessKeyID = id
}
func (p *ParamTable) initMinioSecretAccessKey() {
key, err := p.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
p.MinioSecretAccessKey = key
}
func (p *ParamTable) initMinioUseSSLStr() {
ssl, err := p.Load("minio.useSSL")
if err != nil {
panic(err)
}
sslBoolean, err := strconv.ParseBool(ssl)
if err != nil {
panic(err)
}
p.MinioUseSSLStr = sslBoolean
}
func (p *ParamTable) initMinioBucketName() {
bucketName, err := p.Load("minio.bucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}

View File

@ -70,6 +70,7 @@ type QueryCoord struct {
// Register register query service at etcd
func (qc *QueryCoord) Register() error {
log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
Params.NodeID = uint64(qc.session.ServerID)
@ -184,19 +185,16 @@ func (qc *QueryCoord) watchNodeLoop() {
for nodeID, session := range sessionMap {
if _, ok := qc.cluster.nodes[nodeID]; !ok {
serverID := session.ServerID
go func() {
err := qc.cluster.RegisterNode(ctx, session, serverID)
if err != nil {
log.Error("register queryNode error", zap.Any("error", err.Error()))
}
log.Debug("query coordinator", zap.Any("Add QueryNode, session serverID", serverID))
}()
log.Debug("start adding a queryNode to cluster", zap.Any("nodeID", serverID))
err := qc.cluster.RegisterNode(ctx, session, serverID)
if err != nil {
log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
}
}
}
for nodeID := range qc.cluster.nodes {
if _, ok := sessionMap[nodeID]; !ok {
qc.cluster.nodes[nodeID].setNodeState(false)
qc.cluster.nodes[nodeID].client.Stop()
qc.cluster.stopNode(nodeID)
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
@ -230,50 +228,43 @@ func (qc *QueryCoord) watchNodeLoop() {
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
go func() {
err := qc.cluster.RegisterNode(ctx, event.Session, serverID)
if err != nil {
log.Error(err.Error())
}
log.Debug("query coordinator", zap.Any("Add QueryNode, session serverID", serverID))
}()
log.Debug("start adding a queryNode to cluster", zap.Any("nodeID", serverID))
err := qc.cluster.RegisterNode(ctx, event.Session, serverID)
if err != nil {
log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
}
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
if _, ok := qc.cluster.nodes[serverID]; ok {
log.Debug("query coordinator", zap.Any("The QueryNode crashed with ID", serverID))
qc.cluster.nodes[serverID].setNodeState(false)
qc.cluster.nodes[serverID].client.Stop()
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
SourceID: qc.session.ServerID,
},
SourceNodeIDs: []int64{serverID},
BalanceReason: querypb.TriggerCondition_nodeDown,
}
loadBalanceTask := &LoadBalanceTask{
BaseTask: BaseTask{
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_nodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
rootCoord: qc.rootCoordClient,
dataCoord: qc.dataCoordClient,
cluster: qc.cluster,
meta: qc.meta,
}
qc.scheduler.Enqueue([]task{loadBalanceTask})
go func() {
err := loadBalanceTask.WaitToFinish()
if err != nil {
log.Error(err.Error())
}
log.Debug("load balance done after queryNode down", zap.Int64s("nodeIDs", loadBalanceTask.SourceNodeIDs))
//TODO::remove nodeInfo and clear etcd
}()
log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
_, err := qc.cluster.getNodeByID(serverID)
if err != nil {
log.Error("queryNode not exist", zap.Int64("nodeID", serverID))
continue
}
qc.cluster.stopNode(serverID)
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
SourceID: qc.session.ServerID,
},
SourceNodeIDs: []int64{serverID},
BalanceReason: querypb.TriggerCondition_nodeDown,
}
loadBalanceTask := &LoadBalanceTask{
BaseTask: BaseTask{
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_nodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
rootCoord: qc.rootCoordClient,
dataCoord: qc.dataCoordClient,
cluster: qc.cluster,
meta: qc.meta,
}
qc.scheduler.Enqueue([]task{loadBalanceTask})
}
}
}

View File

@ -13,40 +13,55 @@ package querycoord
import (
"context"
"math/rand"
"os"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
var metaRootPath string
func setup() {
Params.Init()
metaRootPath = Params.MetaRootPath
}
func refreshChannelNames() {
suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63n(1000000), 10)
Params.StatsChannelName = Params.StatsChannelName + suffix
Params.TimeTickChannelName = Params.TimeTickChannelName + suffix
}
func TestMain(m *testing.M) {
setup()
//refreshChannelNames()
exitCode := m.Run()
os.Exit(exitCode)
}
func TestQueryCoord_Init(t *testing.T) {
ctx := context.Background()
msFactory := msgstream.NewPmsFactory()
service, err := NewQueryCoord(context.Background(), msFactory)
assert.Nil(t, err)
service.Register()
service.Init()
service.Start()
t.Run("Test create channel", func(t *testing.T) {
request := &querypb.CreateQueryChannelRequest{}
response, err := service.CreateQueryChannel(ctx, request)
assert.Nil(t, err)
assert.Equal(t, response.RequestChannel, "query-0")
assert.Equal(t, response.ResultChannel, "queryResult-0")
})
t.Run("Test Get statistics channel", func(t *testing.T) {
response, err := service.GetStatisticsChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, response, "query-node-stats")
assert.Equal(t, response.Value, "query-node-stats")
})
t.Run("Test Get timeTick channel", func(t *testing.T) {
response, err := service.GetTimeTickChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, response, "queryTimeTick")
assert.Equal(t, response.Value, "queryTimeTick")
})
service.Stop()
@ -59,7 +74,7 @@ func TestQueryCoord_Init(t *testing.T) {
// assert.Nil(t, err)
// service.Init()
// service.Start()
// service.SetRootCoord(NewRootCoordMock())
// service.SetRootCoord(newRootCoordMock())
// service.SetDataCoord(NewDataMock())
// registerNodeRequest := &querypb.RegisterNodeRequest{
// Address: &commonpb.Address{},

View File

@ -29,6 +29,8 @@ import (
)
type queryNode struct {
ctx context.Context
cancel context.CancelFunc
id int64
address string
client types.QueryNode
@ -40,28 +42,48 @@ type queryNode struct {
onService bool
}
func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (*queryNode, error) {
client, err := nodeclient.NewClient(ctx, address)
if err != nil {
return nil, err
}
if err := client.Init(); err != nil {
return nil, err
}
if err := client.Start(); err != nil {
return nil, err
}
func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) *queryNode {
collectionInfo := make(map[UniqueID]*querypb.CollectionInfo)
watchedChannels := make(map[UniqueID]*querypb.QueryChannelInfo)
return &queryNode{
childCtx, cancel := context.WithCancel(ctx)
node := &queryNode{
ctx: childCtx,
cancel: cancel,
id: id,
address: address,
client: client,
kvClient: kv,
collectionInfos: collectionInfo,
watchedQueryChannels: watchedChannels,
onService: true,
}, nil
onService: false,
}
return node
}
func (qn *queryNode) start() error {
client, err := nodeclient.NewClient(qn.ctx, qn.address)
if err != nil {
return err
}
if err = client.Init(); err != nil {
return err
}
if err = client.Start(); err != nil {
return err
}
qn.client = client
qn.onService = true
log.Debug("queryNode client start success", zap.Int64("nodeID", qn.id), zap.String("address", qn.address))
return nil
}
func (qn *queryNode) stop() {
qn.onService = false
if qn.client != nil {
qn.client.Stop()
}
qn.cancel()
}
func (qn *queryNode) hasCollection(collectionID UniqueID) bool {

View File

@ -0,0 +1,154 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querycoord
import (
"context"
"math/rand"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
func startQueryCoord(ctx context.Context) (*QueryCoord, error) {
factory := msgstream.NewPmsFactory()
rand.Seed(time.Now().UnixNano())
suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10)
Params.MetaRootPath = metaRootPath + suffix
coord, err := NewQueryCoord(ctx, factory)
if err != nil {
return nil, err
}
rootCoord := newRootCoordMock()
rootCoord.createCollection(defaultCollectionID)
rootCoord.createPartition(defaultCollectionID, defaultPartitionID)
dataCoord, err := newDataCoordMock(ctx)
if err != nil {
return nil, err
}
coord.SetRootCoord(rootCoord)
coord.SetDataCoord(dataCoord)
err = coord.Register()
if err != nil {
return nil, err
}
err = coord.Init()
if err != nil {
return nil, err
}
err = coord.Start()
if err != nil {
return nil, err
}
return coord, nil
}
func TestQueryNode_MultiNode_stop(t *testing.T) {
baseCtx := context.Background()
queryCoord, err := startQueryCoord(baseCtx)
assert.Nil(t, err)
queryNode1, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode2, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode3, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode4, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode5, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
time.Sleep(2 * time.Second)
queryNode1.stop()
queryNode2.stop()
queryNode3.stop()
queryNode4.stop()
queryCoord.LoadCollection(baseCtx, &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
})
time.Sleep(2 * time.Second)
_, err = queryCoord.ReleaseCollection(baseCtx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
})
assert.Nil(t, err)
time.Sleep(2 * time.Second)
queryNode5.stop()
queryCoord.Stop()
}
func TestQueryNode_MultiNode_reStart(t *testing.T) {
baseCtx := context.Background()
queryCoord, err := startQueryCoord(baseCtx)
assert.Nil(t, err)
queryNode1, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode2, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
time.Sleep(2 * time.Second)
queryCoord.LoadCollection(baseCtx, &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
})
queryNode1.stop()
queryNode2.stop()
queryNode3, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode4, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
queryNode5, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
time.Sleep(2 * time.Second)
_, err = queryCoord.ReleaseCollection(baseCtx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
})
assert.Nil(t, err)
queryNode3.stop()
queryNode4.stop()
queryNode5.stop()
time.Sleep(2 * time.Second)
queryCoord.Stop()
}
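The task changes in the next file clone the parent request's MsgBase with proto.Clone before overriding MsgType for each child request. A minimal sketch of the aliasing problem that cloning avoids, with a plain struct standing in for the protobuf MsgBase (so a value copy stands in for proto.Clone):

    package main

    import "fmt"

    // msgBase stands in for commonpb.MsgBase; the real code clones it with proto.Clone.
    type msgBase struct {
        MsgType  string
        SourceID int64
    }

    type request struct {
        Base *msgBase
    }

    func main() {
        parent := &msgBase{MsgType: "LoadCollection", SourceID: 1}

        // Buggy pattern: both child requests alias the parent's Base, so setting
        // MsgType for one silently rewrites it for the other.
        loadReq := &request{Base: parent}
        watchReq := &request{Base: parent}
        loadReq.Base.MsgType = "LoadSegments"
        watchReq.Base.MsgType = "WatchDmChannels"
        fmt.Println(loadReq.Base.MsgType) // WatchDmChannels: the wrong type leaked in

        // Fixed pattern: copy the base per child request before mutating it.
        clone := func(b *msgBase) *msgBase { cp := *b; return &cp }
        loadBase := clone(parent)
        loadBase.MsgType = "LoadSegments"
        watchBase := clone(parent)
        watchBase.MsgType = "WatchDmChannels"
        fmt.Println(loadBase.MsgType, watchBase.MsgType) // LoadSegments WatchDmChannels
    }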

View File

@ -247,8 +247,10 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
BinlogPaths: segmentBingLog.FieldBinlogs,
}
msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: lct.Base,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: lct.Schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
@ -272,8 +274,10 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
}
}
if !merged {
msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDmChannels
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lct.Base,
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{info},
Schema: lct.Schema,
@ -282,8 +286,10 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
watchDmChannelReqs = append(watchDmChannelReqs, watchRequest)
}
} else {
msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDmChannels
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lct.Base,
Base: msgBase,
CollectionID: collectionID,
PartitionID: partitionID,
Infos: []*datapb.VchannelInfo{info},
@ -543,8 +549,10 @@ func (lpt *LoadPartitionTask) Execute(ctx context.Context) error {
BinlogPaths: segmentBingLog.FieldBinlogs,
}
msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: lpt.Base,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: lpt.Schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
@ -555,8 +563,10 @@ func (lpt *LoadPartitionTask) Execute(ctx context.Context) error {
for _, info := range recoveryInfo.Channels {
channel := info.ChannelName
msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDmChannels
watchDmRequest := &querypb.WatchDmChannelsRequest{
Base: lpt.Base,
Base: msgBase,
CollectionID: collectionID,
PartitionID: partitionID,
Infos: []*datapb.VchannelInfo{info},
@ -819,14 +829,12 @@ func (lst *LoadSegmentTask) Reschedule() ([]task, error) {
collectionID := lst.Infos[0].CollectionID
reScheduledTask := make([]task, 0)
for _, info := range lst.Infos {
segmentID := info.SegmentID
segmentIDs = append(segmentIDs, segmentID)
segmentIDs = append(segmentIDs, info.SegmentID)
}
segment2Nodes := shuffleSegmentsToQueryNode(segmentIDs, lst.cluster)
node2segmentInfos := make(map[int64][]*querypb.SegmentLoadInfo)
for _, info := range lst.Infos {
segmentID := info.SegmentID
nodeID := segment2Nodes[segmentID]
for index, info := range lst.Infos {
nodeID := segment2Nodes[index]
if _, ok := node2segmentInfos[nodeID]; !ok {
node2segmentInfos[nodeID] = make([]*querypb.SegmentLoadInfo, 0)
}
@ -835,7 +843,11 @@ func (lst *LoadSegmentTask) Reschedule() ([]task, error) {
for nodeID, infos := range node2segmentInfos {
loadSegmentTask := &LoadSegmentTask{
BaseTask: lst.BaseTask,
BaseTask: BaseTask{
ctx: lst.ctx,
Condition: NewTaskCondition(lst.ctx),
triggerCondition: lst.LoadCondition,
},
LoadSegmentsRequest: &querypb.LoadSegmentsRequest{
Base: lst.Base,
NodeID: nodeID,
@ -853,8 +865,10 @@ func (lst *LoadSegmentTask) Reschedule() ([]task, error) {
if !hasWatchQueryChannel {
queryChannel, queryResultChannel := lst.meta.GetQueryChannel(collectionID)
msgBase := proto.Clone(lst.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchQueryChannels
addQueryChannelRequest := &querypb.AddQueryChannelRequest{
Base: lst.Base,
Base: msgBase,
NodeID: nodeID,
CollectionID: collectionID,
RequestChannelID: queryChannel,
@ -864,7 +878,7 @@ func (lst *LoadSegmentTask) Reschedule() ([]task, error) {
BaseTask: BaseTask{
ctx: lst.ctx,
Condition: NewTaskCondition(lst.ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
triggerCondition: lst.LoadCondition,
},
AddQueryChannelRequest: addQueryChannelRequest,
@ -1007,8 +1021,7 @@ func (wdt *WatchDmChannelTask) Reschedule() ([]task, error) {
channelIDs := make([]string, 0)
reScheduledTask := make([]task, 0)
for _, info := range wdt.Infos {
channelID := info.ChannelName
channelIDs = append(channelIDs, channelID)
channelIDs = append(channelIDs, info.ChannelName)
}
channel2Nodes := shuffleChannelsToQueryNode(channelIDs, wdt.cluster)
@ -1023,7 +1036,11 @@ func (wdt *WatchDmChannelTask) Reschedule() ([]task, error) {
for nodeID, infos := range node2channelInfos {
loadSegmentTask := &WatchDmChannelTask{
BaseTask: wdt.BaseTask,
BaseTask: BaseTask{
ctx: wdt.ctx,
Condition: NewTaskCondition(wdt.ctx),
triggerCondition: wdt.triggerCondition,
},
WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{
Base: wdt.Base,
NodeID: nodeID,
@ -1043,8 +1060,10 @@ func (wdt *WatchDmChannelTask) Reschedule() ([]task, error) {
if !hasWatchQueryChannel {
queryChannel, queryResultChannel := wdt.meta.GetQueryChannel(collectionID)
msgBase := proto.Clone(wdt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchQueryChannels
addQueryChannelRequest := &querypb.AddQueryChannelRequest{
Base: wdt.Base,
Base: msgBase,
NodeID: nodeID,
CollectionID: collectionID,
RequestChannelID: queryChannel,
@ -1054,7 +1073,7 @@ func (wdt *WatchDmChannelTask) Reschedule() ([]task, error) {
BaseTask: BaseTask{
ctx: wdt.ctx,
Condition: NewTaskCondition(wdt.ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
triggerCondition: wdt.triggerCondition,
},
AddQueryChannelRequest: addQueryChannelRequest,
@ -1232,8 +1251,10 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {
BinlogPaths: segmentBingLog.FieldBinlogs,
}
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: lbt.Base,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: schema,
LoadCondition: querypb.TriggerCondition_nodeDown,
@ -1258,8 +1279,10 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {
}
}
if !merged {
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDmChannels
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lbt.Base,
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{channelInfo},
Schema: schema,
@ -1268,8 +1291,10 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {
watchDmChannelReqs = append(watchDmChannelReqs, watchRequest)
}
} else {
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDmChannels
watchRequest := &querypb.WatchDmChannelsRequest{
Base: lbt.Base,
Base: msgBase,
CollectionID: collectionID,
PartitionID: partitionID,
Infos: []*datapb.VchannelInfo{channelInfo},
@ -1478,7 +1503,7 @@ func assignInternalTask(ctx context.Context,
}
segment2Nodes := shuffleSegmentsToQueryNode(segmentsToLoad, cluster)
watchRequest2Nodes := shuffleChannelsToQueryNode(channelsToWatch, cluster)
log.Debug("assignInternalTask: segment to node", zap.Any("segments mao", segment2Nodes), zap.Int64("collectionID", collectionID))
log.Debug("assignInternalTask: segment to node", zap.Any("segments map", segment2Nodes), zap.Int64("collectionID", collectionID))
log.Debug("assignInternalTask: watch request to node", zap.Any("request map", watchRequest2Nodes), zap.Int64("collectionID", collectionID))
watchQueryChannelInfo := make(map[int64]bool)
@ -1518,7 +1543,7 @@ func assignInternalTask(ctx context.Context,
cluster: cluster,
}
parentTask.AddChildTask(loadSegmentTask)
log.Debug("assignInternalTask: add a loadSegmentTask childTask")
log.Debug("assignInternalTask: add a loadSegmentTask childTask", zap.Any("task", loadSegmentTask))
}
for index, nodeID := range watchRequest2Nodes {
@ -1544,8 +1569,10 @@ func assignInternalTask(ctx context.Context,
ctx = opentracing.ContextWithSpan(context.Background(), sp)
queryChannel, queryResultChannel := meta.GetQueryChannel(collectionID)
msgBase := proto.Clone(parentTask.MsgBase()).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchQueryChannels
addQueryChannelRequest := &querypb.AddQueryChannelRequest{
Base: parentTask.MsgBase(),
Base: msgBase,
NodeID: nodeID,
CollectionID: collectionID,
RequestChannelID: queryChannel,

View File

@ -606,6 +606,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task)
redoFunc2 := func() {
if t.IsValid() {
log.Debug("waitActivateTaskDone: retry the active task", zap.Int64("taskID", t.ID()))
scheduler.activateTaskChan <- t
wg.Add(1)
go scheduler.waitActivateTaskDone(wg, t)

View File

@ -35,12 +35,11 @@ type historical struct {
func newHistorical(ctx context.Context,
rootCoord types.RootCoord,
dataCoord types.DataCoord,
indexCoord types.IndexCoord,
factory msgstream.Factory,
etcdKV *etcdkv.EtcdKV) *historical {
replica := newCollectionReplica(etcdKV)
loader := newSegmentLoader(ctx, rootCoord, indexCoord, dataCoord, replica, etcdKV)
loader := newSegmentLoader(ctx, rootCoord, indexCoord, replica, etcdKV)
ss := newStatsService(ctx, replica, loader.indexLoader.fieldStatsChan, factory)
return &historical{

View File

@ -35,14 +35,14 @@ func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.Comp
ErrorCode: commonpb.ErrorCode_Success,
},
}
code, ok := node.stateCode.Load().(internalpb.StateCode)
if !ok {
errMsg := "unexpected error in type assertion"
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
stats.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: errMsg,
Reason: err.Error(),
}
return stats, errors.New(errMsg)
return stats, err
}
info := &internalpb.ComponentInfo{
NodeID: Params.QueryNodeID,
@ -74,6 +74,15 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
}
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
collectionID := in.CollectionID
if node.queryService == nil {
errMsg := "null query service, collectionID = " + fmt.Sprintln(collectionID)
@ -178,6 +187,15 @@ func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.Remov
}
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
dct := &watchDmChannelsTask{
baseTask: baseTask{
ctx: ctx,
@ -214,6 +232,15 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
}
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
dct := &loadSegmentsTask{
baseTask: baseTask{
ctx: ctx,
@ -254,6 +281,15 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
}
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
dct := &releaseCollectionTask{
baseTask: baseTask{
ctx: ctx,
@ -290,6 +326,15 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
}
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
dct := &releasePartitionsTask{
baseTask: baseTask{
ctx: ctx,
@ -327,6 +372,15 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
// ReleaseSegments deprecated
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
return status, err
}
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
@ -348,6 +402,17 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
}
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
code := node.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
err := fmt.Errorf("query node %d is not ready", Params.QueryNodeID)
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}
return res, err
}
infos := make([]*queryPb.SegmentInfo, 0)
getSegmentInfo := func(segment *Segment) *queryPb.SegmentInfo {
var indexName string

View File

@ -19,7 +19,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
@ -50,49 +49,49 @@ type indexLoader struct {
kv kv.BaseKV // minio kv
}
func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed)
if len(collectionIDs) <= 0 {
wg.Done()
return
}
log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
for i := range collectionIDs {
// we don't need index id yet
segment, err := loader.replica.getSegmentByID(segmentIDs[i])
if err != nil {
log.Warn(err.Error())
continue
}
vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i])
if err != nil {
log.Warn(err.Error())
continue
}
for _, fieldID := range vecFieldIDs {
err = loader.setIndexInfo(collectionIDs[i], segment, fieldID)
if err != nil {
log.Warn(err.Error())
continue
}
err = loader.loadIndex(segment, fieldID)
if err != nil {
log.Warn(err.Error())
continue
}
}
}
// sendQueryNodeStats
err := loader.sendQueryNodeStats()
if err != nil {
log.Error(err.Error())
wg.Done()
return
}
wg.Done()
}
//func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
// collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed)
// if len(collectionIDs) <= 0 {
// wg.Done()
// return
// }
// log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
// for i := range collectionIDs {
// // we don't need index id yet
// segment, err := loader.replica.getSegmentByID(segmentIDs[i])
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i])
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// for _, fieldID := range vecFieldIDs {
// err = loader.setIndexInfo(collectionIDs[i], segment, fieldID)
// if err != nil {
// log.Warn(err.Error())
// continue
// }
//
// err = loader.loadIndex(segment, fieldID)
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// }
// }
// // sendQueryNodeStats
// err := loader.sendQueryNodeStats()
// if err != nil {
// log.Error(err.Error())
// wg.Done()
// return
// }
//
// wg.Done()
//}
func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
// 1. use msg's index paths to get index bytes
@ -336,26 +335,26 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment,
return nil
}
func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
ctx := context.TODO()
if loader.indexCoord == nil {
return nil, errors.New("null index coordinator client")
}
indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{indexBuildID},
}
pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest)
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, err
}
if len(pathResponse.FilePaths) <= 0 {
return nil, errors.New("illegal index file paths")
}
return pathResponse.FilePaths[0].IndexFilePaths, nil
}
//func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
// ctx := context.TODO()
// if loader.indexCoord == nil {
// return nil, errors.New("null index coordinator client")
// }
//
// indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
// IndexBuildIDs: []UniqueID{indexBuildID},
// }
// pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest)
// if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
// return nil, err
// }
//
// if len(pathResponse.FilePaths) <= 0 {
// return nil, errors.New("illegal index file paths")
// }
//
// return pathResponse.FilePaths[0].IndexFilePaths, nil
//}
func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{

View File

@ -27,21 +27,19 @@ import "C"
import (
"context"
"errors"
"math/rand"
"strconv"
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -50,8 +48,7 @@ type QueryNode struct {
queryNodeLoopCtx context.Context
queryNodeLoopCancel context.CancelFunc
QueryNodeID UniqueID
stateCode atomic.Value
stateCode atomic.Value
// internal components
historical *historical
@ -63,7 +60,6 @@ type QueryNode struct {
// clients
rootCoord types.RootCoord
indexCoord types.IndexCoord
dataCoord types.DataCoord
msFactory msgstream.Factory
scheduler *taskScheduler
@ -74,24 +70,7 @@ type QueryNode struct {
etcdKV *etcdkv.EtcdKV
}
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
rand.Seed(time.Now().UnixNano())
ctx1, cancel := context.WithCancel(ctx)
node := &QueryNode{
queryNodeLoopCtx: ctx1,
queryNodeLoopCancel: cancel,
QueryNodeID: queryNodeID,
queryService: nil,
msFactory: factory,
}
node.scheduler = newTaskScheduler(ctx1)
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
}
func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
func NewQueryNode(ctx context.Context, factory msgstream.Factory) *QueryNode {
ctx1, cancel := context.WithCancel(ctx)
node := &QueryNode{
queryNodeLoopCtx: ctx1,
@ -108,10 +87,12 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
// Register register query node at etcd
func (node *QueryNode) Register() error {
log.Debug("query node session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints))
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
Params.QueryNodeID = node.session.ServerID
log.Debug("query nodeID", zap.Int64("nodeID", Params.QueryNodeID))
log.Debug("query node address", zap.String("address", node.session.Address))
// This param needs a valid QueryNodeID
Params.initMsgChannelSubName()
@ -139,7 +120,6 @@ func (node *QueryNode) Init() error {
node.historical = newHistorical(node.queryNodeLoopCtx,
node.rootCoord,
node.dataCoord,
node.indexCoord,
node.msFactory,
node.etcdKV)
@ -155,10 +135,6 @@ func (node *QueryNode) Init() error {
log.Error("null index coordinator detected")
}
if node.dataCoord == nil {
log.Error("null data coordinator detected")
}
return nil
}
@ -228,11 +204,3 @@ func (node *QueryNode) SetIndexCoord(index types.IndexCoord) error {
node.indexCoord = index
return nil
}
func (node *QueryNode) SetDataCoord(data types.DataCoord) error {
if data == nil {
return errors.New("null data coordinator interface")
}
node.dataCoord = data
return nil
}
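Taken together, the query_node.go changes above drop the explicit node ID from construction: NewQueryNode(ctx, factory) builds the node, and the ID is only assigned from the etcd session's ServerID inside Register(). A rough startup-ordering sketch follows, assuming the caller drives Init before Register (which is the point of this fix, so queryCoord only discovers nodes that have finished initializing); the wrapper function name startQueryNode and its error handling are illustrative, not part of this change.

// Hypothetical caller-side ordering (e.g. the gRPC wrapper's init path):
func startQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, error) {
	node := NewQueryNode(ctx, factory)
	node.UpdateStateCode(internalpb.StateCode_Initializing)
	if err := node.Init(); err != nil { // build historical/streaming replicas, etcd kv, scheduler
		return nil, err
	}
	// Registering only after Init succeeds is the crux of this fix: queryCoord's
	// session watch should only ever see nodes that are ready to accept load tasks.
	if err := node.Register(); err != nil { // session.ServerID becomes Params.QueryNodeID
		return nil, err
	}
	return node, nil
}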

View File

@ -174,8 +174,8 @@ func newQueryNodeMock() *QueryNode {
if err != nil {
panic(err)
}
svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory)
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory, etcdKV)
svr := NewQueryNode(ctx, msFactory)
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, svr.msFactory, etcdKV)
svr.streaming = newStreaming(ctx, msFactory, etcdKV)
return svr

View File

@ -28,7 +28,6 @@ type queryService struct {
historical *historical
streaming *streaming
queryNodeID UniqueID
queryCollections map[UniqueID]*queryCollection
factory msgstream.Factory
@ -47,7 +46,6 @@ func newQueryService(ctx context.Context,
historical: historical,
streaming: streaming,
queryNodeID: Params.QueryNodeID,
queryCollections: make(map[UniqueID]*queryCollection),
factory: factory,

View File

@ -23,7 +23,6 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
@ -175,25 +174,25 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID,
return nil
}
func (loader *segmentLoader) GetSegmentStates(segmentID UniqueID) (*datapb.GetSegmentStatesResponse, error) {
ctx := context.TODO()
if loader.dataCoord == nil {
return nil, errors.New("null data service client")
}
segmentStatesRequest := &datapb.GetSegmentStatesRequest{
SegmentIDs: []int64{segmentID},
}
statesResponse, err := loader.dataCoord.GetSegmentStates(ctx, segmentStatesRequest)
if err != nil || statesResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, err
}
if len(statesResponse.States) != 1 {
return nil, errors.New("segment states' len should be 1")
}
return statesResponse, nil
}
//func (loader *segmentLoader) GetSegmentStates(segmentID UniqueID) (*datapb.GetSegmentStatesResponse, error) {
// ctx := context.TODO()
// if loader.dataCoord == nil {
// return nil, errors.New("null data service client")
// }
//
// segmentStatesRequest := &datapb.GetSegmentStatesRequest{
// SegmentIDs: []int64{segmentID},
// }
// statesResponse, err := loader.dataCoord.GetSegmentStates(ctx, segmentStatesRequest)
// if err != nil || statesResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
// return nil, err
// }
// if len(statesResponse.States) != 1 {
// return nil, errors.New("segment states' len should be 1")
// }
//
// return statesResponse, nil
//}
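With GetSegmentStates commented out, the segment loader no longer needs a DataCoord client at all. A minimal wiring sketch based on the newSegmentLoader signature shown below; the client and replica variables are placeholders, not identifiers from this diff.

// Hypothetical wiring: only RootCoord and IndexCoord clients remain as dependencies.
loader := newSegmentLoader(ctx, rootCoordClient, indexCoordClient, historicalReplica, etcdKV)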
func (loader *segmentLoader) filterOutVectorFields(binlogPaths []*datapb.FieldBinlog,
vectorFields []int64) []*datapb.FieldBinlog {
@ -301,7 +300,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths
return nil
}
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, dataCoord types.DataCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
@ -320,8 +319,6 @@ func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord
return &segmentLoader{
historicalReplica: replica,
dataCoord: dataCoord,
minioKV: client,
etcdKV: etcdKV,