2021-06-15 04:41:40 +00:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2021-06-22 08:44:09 +00:00
|
|
|
package querycoord
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-06-19 03:45:09 +00:00
|
|
|
"encoding/json"
|
2021-06-15 04:41:40 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
2021-06-19 03:45:09 +00:00
|
|
|
"path/filepath"
|
|
|
|
"strconv"
|
2021-06-15 04:41:40 +00:00
|
|
|
"sync"
|
|
|
|
|
2021-06-19 03:45:09 +00:00
|
|
|
"github.com/golang/protobuf/proto"
|
2021-06-15 04:41:40 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2021-06-19 03:45:09 +00:00
|
|
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
2021-06-15 04:41:40 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
2021-11-12 10:49:10 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
2021-06-15 04:41:40 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
2021-08-26 06:17:54 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
2021-06-15 04:41:40 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
2021-06-19 03:45:09 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
2021-09-15 12:40:07 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
2021-06-19 03:45:09 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2021-06-22 08:44:09 +00:00
|
|
|
queryNodeMetaPrefix = "queryCoord-queryNodeMeta"
|
|
|
|
queryNodeInfoPrefix = "queryCoord-queryNodeInfo"
|
2021-06-15 04:41:40 +00:00
|
|
|
)
|
|
|
|
|
2021-09-27 02:47:57 +00:00
|
|
|
// Cluster manages all query node connections and grpc requests
|
2021-08-02 14:39:25 +00:00
|
|
|
type Cluster interface {
|
|
|
|
reloadFromKV() error
|
|
|
|
getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error)
|
|
|
|
|
|
|
|
loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
|
|
|
|
releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
|
|
|
|
getNumSegments(nodeID int64) (int, error)
|
|
|
|
|
|
|
|
watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
|
2021-11-05 06:47:19 +00:00
|
|
|
watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error
|
2021-09-15 12:40:07 +00:00
|
|
|
//TODO:: removeDmChannel
|
2021-08-02 14:39:25 +00:00
|
|
|
getNumDmChannels(nodeID int64) (int, error)
|
|
|
|
|
|
|
|
hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
|
2021-11-13 00:49:08 +00:00
|
|
|
hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
|
2021-08-02 14:39:25 +00:00
|
|
|
getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo
|
|
|
|
addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error
|
|
|
|
removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error
|
|
|
|
releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
|
|
|
|
releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
|
|
|
|
getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
|
2021-11-05 08:00:55 +00:00
|
|
|
getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
|
2021-11-12 10:49:10 +00:00
|
|
|
getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error)
|
2021-08-02 14:39:25 +00:00
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
|
2021-11-05 08:00:55 +00:00
|
|
|
getNodeInfoByID(nodeID int64) (Node, error)
|
2021-08-02 14:39:25 +00:00
|
|
|
removeNodeInfo(nodeID int64) error
|
|
|
|
stopNode(nodeID int64)
|
2021-09-15 12:40:07 +00:00
|
|
|
onlineNodes() (map[int64]Node, error)
|
|
|
|
isOnline(nodeID int64) (bool, error)
|
|
|
|
offlineNodes() (map[int64]Node, error)
|
2021-11-05 08:00:55 +00:00
|
|
|
hasNode(nodeID int64) bool
|
2021-08-02 14:39:25 +00:00
|
|
|
|
2021-11-12 10:49:10 +00:00
|
|
|
allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) error
|
2021-11-11 04:56:42 +00:00
|
|
|
allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64) error
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
getSessionVersion() int64
|
|
|
|
|
|
|
|
getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
|
2021-08-02 14:39:25 +00:00
|
|
|
}
|
|
|
|
|
2021-08-26 06:17:54 +00:00
|
|
|
type newQueryNodeFn func(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error)
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
type nodeState int
|
|
|
|
|
|
|
|
const (
|
|
|
|
disConnect nodeState = 0
|
|
|
|
online nodeState = 1
|
|
|
|
offline nodeState = 2
|
|
|
|
)
|
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
type queryNodeCluster struct {
|
2021-09-15 12:40:07 +00:00
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
2021-06-19 03:45:09 +00:00
|
|
|
client *etcdkv.EtcdKV
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
session *sessionutil.Session
|
|
|
|
sessionVersion int64
|
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
sync.RWMutex
|
2021-11-11 04:56:42 +00:00
|
|
|
clusterMeta Meta
|
|
|
|
nodes map[int64]Node
|
|
|
|
newNodeFn newQueryNodeFn
|
|
|
|
segmentAllocator SegmentAllocatePolicy
|
|
|
|
channelAllocator ChannelAllocatePolicy
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (Cluster, error) {
|
2021-09-15 12:40:07 +00:00
|
|
|
childCtx, cancel := context.WithCancel(ctx)
|
2021-08-02 14:39:25 +00:00
|
|
|
nodes := make(map[int64]Node)
|
2021-06-19 03:45:09 +00:00
|
|
|
c := &queryNodeCluster{
|
2021-11-11 04:56:42 +00:00
|
|
|
ctx: childCtx,
|
|
|
|
cancel: cancel,
|
|
|
|
client: kv,
|
|
|
|
session: session,
|
|
|
|
clusterMeta: clusterMeta,
|
|
|
|
nodes: nodes,
|
|
|
|
newNodeFn: newNodeFn,
|
|
|
|
segmentAllocator: defaultSegAllocatePolicy(),
|
|
|
|
channelAllocator: defaultChannelAllocatePolicy(),
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-06-19 03:45:09 +00:00
|
|
|
err := c.reloadFromKV()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
2021-09-23 13:03:55 +00:00
|
|
|
// Reload trigger task, trigger task states, internal task, internal task state from etcd
|
|
|
|
// Assign the internal task to the corresponding trigger task as a child task
|
2021-06-19 03:45:09 +00:00
|
|
|
func (c *queryNodeCluster) reloadFromKV() error {
|
2021-09-15 12:40:07 +00:00
|
|
|
toLoadMetaNodeIDs := make([]int64, 0)
|
|
|
|
// get current online session
|
|
|
|
onlineNodeSessions, version, _ := c.session.GetSessions(typeutil.QueryNodeRole)
|
|
|
|
onlineSessionMap := make(map[int64]*sessionutil.Session)
|
|
|
|
for _, session := range onlineNodeSessions {
|
|
|
|
nodeID := session.ServerID
|
|
|
|
onlineSessionMap[nodeID] = session
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
2021-09-15 12:40:07 +00:00
|
|
|
for nodeID, session := range onlineSessionMap {
|
|
|
|
log.Debug("ReloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID))
|
|
|
|
err := c.registerNode(c.ctx, session, nodeID, disConnect)
|
2021-06-19 03:45:09 +00:00
|
|
|
if err != nil {
|
2021-09-15 12:40:07 +00:00
|
|
|
log.Error("query node failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
|
2021-06-19 03:45:09 +00:00
|
|
|
return err
|
|
|
|
}
|
2021-09-15 12:40:07 +00:00
|
|
|
toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
|
|
|
|
}
|
|
|
|
c.sessionVersion = version
|
2021-06-26 14:28:10 +00:00
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
// load node information before power off from etcd
|
|
|
|
oldStringNodeIDs, oldNodeSessions, err := c.client.LoadWithPrefix(queryNodeInfoPrefix)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("reloadFromKV: get previous node info from etcd error", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for index := range oldStringNodeIDs {
|
|
|
|
nodeID, err := strconv.ParseInt(filepath.Base(oldStringNodeIDs[index]), 10, 64)
|
2021-06-19 03:45:09 +00:00
|
|
|
if err != nil {
|
2021-09-15 12:40:07 +00:00
|
|
|
log.Error("WatchNodeLoop: parse nodeID error", zap.Error(err))
|
2021-06-19 03:45:09 +00:00
|
|
|
return err
|
|
|
|
}
|
2021-09-15 12:40:07 +00:00
|
|
|
if _, ok := onlineSessionMap[nodeID]; !ok {
|
|
|
|
session := &sessionutil.Session{}
|
|
|
|
err = json.Unmarshal([]byte(oldNodeSessions[index]), session)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("WatchNodeLoop: unmarshal session error", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
err = c.registerNode(context.Background(), session, nodeID, offline)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("ReloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
|
|
|
}
|
2021-09-15 12:40:07 +00:00
|
|
|
|
|
|
|
// load collection meta of queryNode from etcd
|
|
|
|
for _, nodeID := range toLoadMetaNodeIDs {
|
2021-06-19 03:45:09 +00:00
|
|
|
infoPrefix := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, nodeID)
|
2021-08-02 14:39:25 +00:00
|
|
|
_, collectionValues, err := c.client.LoadWithPrefix(infoPrefix)
|
2021-06-19 03:45:09 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
for _, value := range collectionValues {
|
|
|
|
collectionInfo := &querypb.CollectionInfo{}
|
2021-09-29 12:26:00 +00:00
|
|
|
err = proto.Unmarshal([]byte(value), collectionInfo)
|
2021-06-19 03:45:09 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
err = c.nodes[nodeID].setCollectionInfo(collectionInfo)
|
2021-06-19 03:45:09 +00:00
|
|
|
if err != nil {
|
2021-08-02 14:39:25 +00:00
|
|
|
log.Debug("ReloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
|
2021-06-19 03:45:09 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
func (c *queryNodeCluster) getSessionVersion() int64 {
|
|
|
|
return c.sessionVersion
|
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error) {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
subComponentInfos := make([]*internalpb.ComponentInfo, 0)
|
2021-09-15 12:40:07 +00:00
|
|
|
nodes, err := c.getOnlineNodes()
|
2021-06-19 03:45:09 +00:00
|
|
|
if err != nil {
|
2021-08-02 14:39:25 +00:00
|
|
|
log.Debug("GetComponentInfos: failed get on service nodes", zap.String("error info", err.Error()))
|
2021-06-19 03:45:09 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
for _, node := range nodes {
|
|
|
|
componentState := node.getComponentInfo(ctx)
|
|
|
|
subComponentInfos = append(subComponentInfos, componentState)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-06-19 03:45:09 +00:00
|
|
|
return subComponentInfos, nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
2021-06-19 03:45:09 +00:00
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-08-02 14:39:25 +00:00
|
|
|
err := node.loadSegments(ctx, in)
|
|
|
|
if err != nil {
|
2021-11-16 01:31:24 +00:00
|
|
|
log.Debug("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
|
2021-08-02 14:39:25 +00:00
|
|
|
return err
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-10-22 11:07:15 +00:00
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-11-16 01:31:24 +00:00
|
|
|
return fmt.Errorf("loadSegments: Can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-09-15 12:40:07 +00:00
|
|
|
if !node.isOnline() {
|
2021-08-02 14:39:25 +00:00
|
|
|
return errors.New("node offline")
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
|
|
|
|
err := node.releaseSegments(ctx, in)
|
|
|
|
if err != nil {
|
2021-11-16 01:33:20 +00:00
|
|
|
log.Debug("releaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
|
2021-08-02 14:39:25 +00:00
|
|
|
return err
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
|
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-16 01:33:20 +00:00
|
|
|
return fmt.Errorf("releaseSegments: Can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
2021-06-19 03:45:09 +00:00
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-08-02 14:39:25 +00:00
|
|
|
err := node.watchDmChannels(ctx, in)
|
|
|
|
if err != nil {
|
2021-11-16 01:35:14 +00:00
|
|
|
log.Debug("watchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
|
2021-08-02 14:39:25 +00:00
|
|
|
return err
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
2021-06-15 04:41:40 +00:00
|
|
|
channels := make([]string, 0)
|
|
|
|
for _, info := range in.Infos {
|
|
|
|
channels = append(channels, info.ChannelName)
|
|
|
|
}
|
2021-06-19 03:45:09 +00:00
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
collectionID := in.CollectionID
|
|
|
|
err = c.clusterMeta.addDmChannel(collectionID, nodeID, channels)
|
|
|
|
if err != nil {
|
2021-11-16 01:35:14 +00:00
|
|
|
log.Debug("watchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
|
2021-08-02 14:39:25 +00:00
|
|
|
return err
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
|
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-11-16 01:35:14 +00:00
|
|
|
return fmt.Errorf("watchDmChannels: Can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-05 06:47:19 +00:00
|
|
|
func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error {
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
|
|
|
err := node.watchDeltaChannels(ctx, in)
|
|
|
|
if err != nil {
|
2021-11-16 13:49:25 +00:00
|
|
|
log.Debug("watchDeltaChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
|
2021-11-05 06:47:19 +00:00
|
|
|
return err
|
|
|
|
}
|
2021-11-13 00:49:08 +00:00
|
|
|
err = c.clusterMeta.setDeltaChannel(in.CollectionID, in.Infos)
|
|
|
|
if err != nil {
|
2021-11-16 13:49:25 +00:00
|
|
|
log.Debug("watchDeltaChannels: queryNode watch delta channel error", zap.String("error", err.Error()))
|
2021-11-13 00:49:08 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-11-05 06:47:19 +00:00
|
|
|
return nil
|
|
|
|
}
|
2021-11-16 13:49:25 +00:00
|
|
|
return errors.New("watchDeltaChannels: Can't find query node by nodeID ")
|
2021-11-05 06:47:19 +00:00
|
|
|
}
|
|
|
|
|
2021-11-13 00:49:08 +00:00
|
|
|
func (c *queryNodeCluster) hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
return c.nodes[nodeID].hasWatchedDeltaChannel(collectionID)
|
|
|
|
}
|
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
func (c *queryNodeCluster) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
return c.nodes[nodeID].hasWatchedQueryChannel(collectionID)
|
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-08-02 14:39:25 +00:00
|
|
|
err := node.addQueryChannel(ctx, in)
|
|
|
|
if err != nil {
|
2021-11-16 13:51:18 +00:00
|
|
|
log.Debug("addQueryChannel: queryNode add query channel error", zap.String("error", err.Error()))
|
2021-08-02 14:39:25 +00:00
|
|
|
return err
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-16 13:51:18 +00:00
|
|
|
return fmt.Errorf("addQueryChannel: can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-08-02 14:39:25 +00:00
|
|
|
err := node.removeQueryChannel(ctx, in)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("RemoveQueryChannel: queryNode remove query channel error", zap.String("error", err.Error()))
|
|
|
|
return err
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
|
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
return fmt.Errorf("RemoveQueryChannel: can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-08-02 14:39:25 +00:00
|
|
|
err := node.releaseCollection(ctx, in)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("ReleaseCollection: queryNode release collection error", zap.String("error", err.Error()))
|
|
|
|
return err
|
2021-07-13 06:16:00 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
err = c.clusterMeta.releaseCollection(in.CollectionID)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("ReleaseCollection: meta release collection error", zap.String("error", err.Error()))
|
|
|
|
return err
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
return fmt.Errorf("ReleaseCollection: can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
|
2021-06-15 04:41:40 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-08-02 14:39:25 +00:00
|
|
|
err := node.releasePartitions(ctx, in)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("ReleasePartitions: queryNode release partitions error", zap.String("error", err.Error()))
|
|
|
|
return err
|
2021-07-13 06:16:00 +00:00
|
|
|
}
|
2021-10-11 01:54:37 +00:00
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
for _, partitionID := range in.PartitionIDs {
|
|
|
|
err = c.clusterMeta.releasePartition(in.CollectionID, partitionID)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("ReleasePartitions: meta release partitions error", zap.String("error", err.Error()))
|
|
|
|
return err
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
return nil
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
return fmt.Errorf("ReleasePartitions: can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-11-12 10:49:10 +00:00
|
|
|
func (c *queryNodeCluster) getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error) {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
|
|
|
|
segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if node, ok := c.nodes[segmentInfo.NodeID]; ok {
|
|
|
|
res, err := node.getSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_SegmentInfo,
|
|
|
|
},
|
|
|
|
CollectionID: segmentInfo.CollectionID,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if res != nil {
|
|
|
|
for _, info := range res.Infos {
|
|
|
|
if info.SegmentID == segmentID {
|
|
|
|
return info, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on query node %d", segmentID, segmentInfo.NodeID)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("updateSegmentInfo: can't find query node by nodeID, nodeID = %d", segmentInfo.NodeID)
|
|
|
|
}
|
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
|
|
segmentInfos := make([]*querypb.SegmentInfo, 0)
|
2021-08-02 14:39:25 +00:00
|
|
|
for _, node := range c.nodes {
|
|
|
|
res, err := node.getSegmentInfo(ctx, in)
|
2021-06-15 04:41:40 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
if res != nil {
|
|
|
|
segmentInfos = append(segmentInfos, res.Infos...)
|
|
|
|
}
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
//TODO::update meta
|
2021-06-15 04:41:40 +00:00
|
|
|
return segmentInfos, nil
|
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
|
|
|
res, err := node.getSegmentInfo(ctx, in)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return res.Infos, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("ReleasePartitions: can't find query node by nodeID, nodeID = %d", nodeID)
|
|
|
|
}
|
|
|
|
|
2021-08-17 02:06:11 +00:00
|
|
|
type queryNodeGetMetricsResponse struct {
|
|
|
|
resp *milvuspb.GetMetricsResponse
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
|
|
|
|
ret := make([]queryNodeGetMetricsResponse, 0, len(c.nodes))
|
|
|
|
for _, node := range c.nodes {
|
|
|
|
resp, err := node.getMetrics(ctx, in)
|
|
|
|
ret = append(ret, queryNodeGetMetricsResponse{
|
|
|
|
resp: resp,
|
|
|
|
err: err,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
|
|
if _, ok := c.nodes[nodeID]; !ok {
|
2021-11-05 08:00:55 +00:00
|
|
|
return 0, fmt.Errorf("GetNumDmChannels: Can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
numChannel := 0
|
2021-08-02 14:39:25 +00:00
|
|
|
collectionInfos := c.clusterMeta.showCollections()
|
|
|
|
for _, info := range collectionInfos {
|
2021-06-15 04:41:40 +00:00
|
|
|
for _, channelInfo := range info.ChannelInfos {
|
|
|
|
if channelInfo.NodeIDLoaded == nodeID {
|
|
|
|
numChannel++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return numChannel, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
|
2021-07-13 06:16:00 +00:00
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
2021-06-15 04:41:40 +00:00
|
|
|
|
|
|
|
if _, ok := c.nodes[nodeID]; !ok {
|
2021-11-05 08:00:55 +00:00
|
|
|
return 0, fmt.Errorf("getNumSegments: Can't find query node by nodeID, nodeID = %d", nodeID)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
numSegment := 0
|
2021-08-02 14:39:25 +00:00
|
|
|
segmentInfos := make([]*querypb.SegmentInfo, 0)
|
|
|
|
collectionInfos := c.clusterMeta.showCollections()
|
|
|
|
for _, info := range collectionInfos {
|
|
|
|
res := c.clusterMeta.showSegmentInfos(info.CollectionID, nil)
|
|
|
|
segmentInfos = append(segmentInfos, res...)
|
|
|
|
}
|
|
|
|
for _, info := range segmentInfos {
|
2021-06-15 04:41:40 +00:00
|
|
|
if info.NodeID == nodeID {
|
|
|
|
numSegment++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return numSegment, nil
|
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
|
2021-06-19 03:45:09 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
2021-06-15 04:41:40 +00:00
|
|
|
if _, ok := c.nodes[id]; !ok {
|
2021-07-13 06:16:00 +00:00
|
|
|
sessionJSON, err := json.Marshal(session)
|
|
|
|
if err != nil {
|
2021-08-02 14:39:25 +00:00
|
|
|
log.Debug("RegisterNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session))
|
2021-07-13 06:16:00 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
|
|
|
|
err = c.client.Save(key, string(sessionJSON))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-09-15 12:40:07 +00:00
|
|
|
node, err := c.newNodeFn(ctx, session.Address, id, c.client)
|
2021-08-26 06:17:54 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Debug("RegisterNode: create a new query node failed", zap.Int64("nodeID", id), zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2021-09-15 12:40:07 +00:00
|
|
|
node.setState(state)
|
|
|
|
if state < online {
|
|
|
|
go node.start()
|
|
|
|
}
|
|
|
|
c.nodes[id] = node
|
2021-07-13 06:16:00 +00:00
|
|
|
log.Debug("RegisterNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))
|
2021-06-15 04:41:40 +00:00
|
|
|
return nil
|
|
|
|
}
|
2021-08-02 14:39:25 +00:00
|
|
|
return fmt.Errorf("RegisterNode: node %d alredy exists in cluster", id)
|
2021-06-15 04:41:40 +00:00
|
|
|
}
|
2021-06-19 03:45:09 +00:00
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
|
2021-06-30 09:48:19 +00:00
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-11-05 08:00:55 +00:00
|
|
|
nodeInfo, err := node.getNodeInfo()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return nodeInfo, nil
|
2021-06-30 09:48:19 +00:00
|
|
|
}
|
|
|
|
|
2021-08-02 14:39:25 +00:00
|
|
|
return nil, fmt.Errorf("GetNodeByID: query node %d not exist", nodeID)
|
2021-06-30 09:48:19 +00:00
|
|
|
}
|
|
|
|
|
2021-06-19 03:45:09 +00:00
|
|
|
func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
|
2021-06-22 06:10:09 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
2021-06-19 03:45:09 +00:00
|
|
|
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, nodeID)
|
2021-06-22 06:10:09 +00:00
|
|
|
err := c.client.Remove(key)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-07-13 06:16:00 +00:00
|
|
|
if _, ok := c.nodes[nodeID]; ok {
|
|
|
|
err = c.nodes[nodeID].clearNodeInfo()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
delete(c.nodes, nodeID)
|
2021-08-02 14:39:25 +00:00
|
|
|
log.Debug("RemoveNodeInfo: delete nodeInfo in cluster MetaReplica and etcd", zap.Int64("nodeID", nodeID))
|
2021-06-22 06:10:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
|
|
|
|
2021-07-13 06:16:00 +00:00
|
|
|
func (c *queryNodeCluster) stopNode(nodeID int64) {
|
2021-08-26 06:17:54 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
2021-07-13 06:16:00 +00:00
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
|
|
|
node.stop()
|
2021-08-02 14:39:25 +00:00
|
|
|
log.Debug("StopNode: queryNode offline", zap.Int64("nodeID", nodeID))
|
2021-07-13 06:16:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
func (c *queryNodeCluster) onlineNodes() (map[int64]Node, error) {
|
2021-06-30 09:48:19 +00:00
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
2021-06-19 03:45:09 +00:00
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
return c.getOnlineNodes()
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
func (c *queryNodeCluster) getOnlineNodes() (map[int64]Node, error) {
|
2021-08-02 14:39:25 +00:00
|
|
|
nodes := make(map[int64]Node)
|
2021-06-19 03:45:09 +00:00
|
|
|
for nodeID, node := range c.nodes {
|
2021-09-15 12:40:07 +00:00
|
|
|
if node.isOnline() {
|
2021-06-30 09:48:19 +00:00
|
|
|
nodes[nodeID] = node
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
|
|
|
}
|
2021-06-30 09:48:19 +00:00
|
|
|
if len(nodes) == 0 {
|
2021-09-15 12:40:07 +00:00
|
|
|
return nil, errors.New("GetOnlineNodes: no queryNode is alive")
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
|
|
|
|
2021-06-30 09:48:19 +00:00
|
|
|
return nodes, nil
|
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
func (c *queryNodeCluster) offlineNodes() (map[int64]Node, error) {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
|
|
|
|
return c.getOfflineNodes()
|
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
func (c *queryNodeCluster) hasNode(nodeID int64) bool {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
|
|
|
|
if _, ok := c.nodes[nodeID]; ok {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
func (c *queryNodeCluster) getOfflineNodes() (map[int64]Node, error) {
|
|
|
|
nodes := make(map[int64]Node)
|
|
|
|
for nodeID, node := range c.nodes {
|
|
|
|
if node.isOffline() {
|
|
|
|
nodes[nodeID] = node
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(nodes) == 0 {
|
|
|
|
return nil, errors.New("GetOfflineNodes: no queryNode is offline")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nodes, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *queryNodeCluster) isOnline(nodeID int64) (bool, error) {
|
2021-06-30 09:48:19 +00:00
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
2021-09-15 12:40:07 +00:00
|
|
|
return node.isOnline(), nil
|
2021-06-30 09:48:19 +00:00
|
|
|
}
|
|
|
|
|
2021-11-05 08:00:55 +00:00
|
|
|
return false, fmt.Errorf("IsOnline: query node %d not exist", nodeID)
|
2021-06-19 03:45:09 +00:00
|
|
|
}
|
|
|
|
|
2021-09-15 12:40:07 +00:00
|
|
|
//func (c *queryNodeCluster) printMeta() {
|
|
|
|
// c.RLock()
|
|
|
|
// defer c.RUnlock()
|
|
|
|
//
|
|
|
|
// for id, node := range c.nodes {
|
|
|
|
// if node.isOnline() {
|
|
|
|
// collectionInfos := node.showCollections()
|
|
|
|
// for _, info := range collectionInfos {
|
|
|
|
// log.Debug("PrintMeta: query coordinator cluster info: collectionInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// queryChannelInfos := node.showWatchedQueryChannels()
|
|
|
|
// for _, info := range queryChannelInfos {
|
|
|
|
// log.Debug("PrintMeta: query coordinator cluster info: watchedQueryChannelInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
//}
|
2021-08-02 14:39:25 +00:00
|
|
|
|
|
|
|
func (c *queryNodeCluster) getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo {
|
|
|
|
c.RLock()
|
|
|
|
defer c.RUnlock()
|
|
|
|
if node, ok := c.nodes[nodeID]; ok {
|
|
|
|
return node.showCollections()
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2021-11-11 04:56:42 +00:00
|
|
|
|
2021-11-12 10:49:10 +00:00
|
|
|
func (c *queryNodeCluster) allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) error {
|
|
|
|
return c.segmentAllocator(ctx, reqs, c, wait, excludeNodeIDs, includeNodeIDs)
|
2021-11-11 04:56:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *queryNodeCluster) allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64) error {
|
|
|
|
return c.channelAllocator(ctx, reqs, c, wait, excludeNodeIDs)
|
|
|
|
}
|