milvus/internal/querycoord/cluster.go

549 lines
16 KiB
Go
Raw Normal View History

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querycoord
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
const (
queryNodeMetaPrefix = "queryCoord-queryNodeMeta"
queryNodeInfoPrefix = "queryCoord-queryNodeInfo"
)
type Cluster interface {
reloadFromKV() error
getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error)
loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
getNumSegments(nodeID int64) (int, error)
watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
getNumDmChannels(nodeID int64) (int, error)
hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo
addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error
removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error
releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID) error
getNodeByID(nodeID int64) (Node, error)
removeNodeInfo(nodeID int64) error
stopNode(nodeID int64)
onServiceNodes() (map[int64]Node, error)
isOnService(nodeID int64) (bool, error)
printMeta()
}
type queryNodeCluster struct {
client *etcdkv.EtcdKV
sync.RWMutex
clusterMeta Meta
nodes map[int64]Node
}
func newQueryNodeCluster(clusterMeta Meta, kv *etcdkv.EtcdKV) (*queryNodeCluster, error) {
nodes := make(map[int64]Node)
c := &queryNodeCluster{
client: kv,
clusterMeta: clusterMeta,
nodes: nodes,
}
err := c.reloadFromKV()
if err != nil {
return nil, err
}
return c, nil
}
func (c *queryNodeCluster) reloadFromKV() error {
nodeIDs := make([]UniqueID, 0)
keys, values, err := c.client.LoadWithPrefix(queryNodeInfoPrefix)
if err != nil {
return err
}
for index := range keys {
nodeID, err := strconv.ParseInt(filepath.Base(keys[index]), 10, 64)
if err != nil {
return err
}
session := &sessionutil.Session{}
err = json.Unmarshal([]byte(values[index]), session)
if err != nil {
return err
}
err = c.registerNode(context.Background(), session, nodeID)
if err != nil {
log.Debug("ReloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
continue
}
nodeIDs = append(nodeIDs, nodeID)
}
for _, nodeID := range nodeIDs {
infoPrefix := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, nodeID)
_, collectionValues, err := c.client.LoadWithPrefix(infoPrefix)
if err != nil {
return err
}
for _, value := range collectionValues {
collectionInfo := &querypb.CollectionInfo{}
err = proto.UnmarshalText(value, collectionInfo)
if err != nil {
return err
}
err = c.nodes[nodeID].setCollectionInfo(collectionInfo)
if err != nil {
log.Debug("ReloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
}
}
return nil
}
func (c *queryNodeCluster) getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error) {
c.RLock()
defer c.RUnlock()
subComponentInfos := make([]*internalpb.ComponentInfo, 0)
nodes, err := c.getOnServiceNodes()
if err != nil {
log.Debug("GetComponentInfos: failed get on service nodes", zap.String("error info", err.Error()))
return nil, err
}
for _, node := range nodes {
componentState := node.getComponentInfo(ctx)
subComponentInfos = append(subComponentInfos, componentState)
}
return subComponentInfos, nil
}
func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
for _, info := range in.Infos {
segmentID := info.SegmentID
segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
if err == nil {
segmentInfos[segmentID] = proto.Clone(segmentInfo).(*querypb.SegmentInfo)
if in.LoadCondition != querypb.TriggerCondition_loadBalance {
segmentInfo.SegmentState = querypb.SegmentState_sealing
segmentInfo.NodeID = nodeID
}
} else {
segmentInfo = &querypb.SegmentInfo{
SegmentID: segmentID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
NodeID: nodeID,
SegmentState: querypb.SegmentState_sealing,
}
}
c.clusterMeta.setSegmentInfo(segmentID, segmentInfo)
}
err := node.loadSegments(ctx, in)
if err != nil {
for _, info := range in.Infos {
segmentID := info.SegmentID
if _, ok = segmentInfos[segmentID]; ok {
c.clusterMeta.setSegmentInfo(segmentID, segmentInfos[segmentID])
continue
}
c.clusterMeta.deleteSegmentInfoByID(segmentID)
}
log.Debug("LoadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
return nil
}
return errors.New("LoadSegments: Can't find query node by nodeID ")
}
func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return errors.New("node offline")
}
err := node.releaseSegments(ctx, in)
if err != nil {
log.Debug("ReleaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
for _, segmentID := range in.SegmentIDs {
c.clusterMeta.deleteSegmentInfoByID(segmentID)
}
return nil
}
return errors.New("ReleaseSegments: Can't find query node by nodeID ")
}
func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
err := node.watchDmChannels(ctx, in)
if err != nil {
log.Debug("WatchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
return err
}
channels := make([]string, 0)
for _, info := range in.Infos {
channels = append(channels, info.ChannelName)
}
collectionID := in.CollectionID
//c.clusterMeta.addCollection(collectionID, in.Schema)
err = c.clusterMeta.addDmChannel(collectionID, nodeID, channels)
if err != nil {
log.Debug("WatchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
return err
}
return nil
}
return errors.New("WatchDmChannels: Can't find query node by nodeID ")
}
func (c *queryNodeCluster) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
c.Lock()
defer c.Unlock()
return c.nodes[nodeID].hasWatchedQueryChannel(collectionID)
}
func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
err := node.addQueryChannel(ctx, in)
if err != nil {
log.Debug("AddQueryChannel: queryNode add query channel error", zap.String("error", err.Error()))
return err
}
return nil
}
return errors.New("AddQueryChannel: can't find query node by nodeID")
}
func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
err := node.removeQueryChannel(ctx, in)
if err != nil {
log.Debug("RemoveQueryChannel: queryNode remove query channel error", zap.String("error", err.Error()))
return err
}
return nil
}
return errors.New("RemoveQueryChannel: can't find query node by nodeID")
}
func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
err := node.releaseCollection(ctx, in)
if err != nil {
log.Debug("ReleaseCollection: queryNode release collection error", zap.String("error", err.Error()))
return err
}
err = c.clusterMeta.releaseCollection(in.CollectionID)
if err != nil {
log.Debug("ReleaseCollection: meta release collection error", zap.String("error", err.Error()))
return err
}
return nil
}
return errors.New("ReleaseCollection: can't find query node by nodeID")
}
func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
err := node.releasePartitions(ctx, in)
if err != nil {
log.Debug("ReleasePartitions: queryNode release partitions error", zap.String("error", err.Error()))
return err
}
for _, partitionID := range in.PartitionIDs {
err = c.clusterMeta.releasePartition(in.CollectionID, partitionID)
if err != nil {
log.Debug("ReleasePartitions: meta release partitions error", zap.String("error", err.Error()))
return err
}
}
return nil
}
return errors.New("ReleasePartitions: can't find query node by nodeID")
}
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
c.RLock()
defer c.RUnlock()
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, node := range c.nodes {
res, err := node.getSegmentInfo(ctx, in)
if err != nil {
return nil, err
}
if res != nil {
segmentInfos = append(segmentInfos, res.Infos...)
}
}
//TODO::update meta
return segmentInfos, nil
}
type queryNodeGetMetricsResponse struct {
resp *milvuspb.GetMetricsResponse
err error
}
func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
c.RLock()
defer c.RUnlock()
ret := make([]queryNodeGetMetricsResponse, 0, len(c.nodes))
for _, node := range c.nodes {
resp, err := node.getMetrics(ctx, in)
ret = append(ret, queryNodeGetMetricsResponse{
resp: resp,
err: err,
})
}
return ret
}
func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
c.RLock()
defer c.RUnlock()
if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("GetNumDmChannels: Can't find query node by nodeID ")
}
numChannel := 0
collectionInfos := c.clusterMeta.showCollections()
for _, info := range collectionInfos {
for _, channelInfo := range info.ChannelInfos {
if channelInfo.NodeIDLoaded == nodeID {
numChannel++
}
}
}
return numChannel, nil
}
func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
c.RLock()
defer c.RUnlock()
if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("getNumSegments: Can't find query node by nodeID ")
}
numSegment := 0
segmentInfos := make([]*querypb.SegmentInfo, 0)
collectionInfos := c.clusterMeta.showCollections()
for _, info := range collectionInfos {
res := c.clusterMeta.showSegmentInfos(info.CollectionID, nil)
segmentInfos = append(segmentInfos, res...)
}
for _, info := range segmentInfos {
if info.NodeID == nodeID {
numSegment++
}
}
return numSegment, nil
}
func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID) error {
c.Lock()
defer c.Unlock()
if _, ok := c.nodes[id]; !ok {
sessionJSON, err := json.Marshal(session)
if err != nil {
log.Debug("RegisterNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session))
return err
}
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
err = c.client.Save(key, string(sessionJSON))
if err != nil {
return err
}
c.nodes[id] = newQueryNode(ctx, session.Address, id, c.client)
log.Debug("RegisterNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))
go func() {
err = c.nodes[id].start()
if err != nil {
log.Error("RegisterNode: start queryNode client failed", zap.Int64("nodeID", id), zap.String("error", err.Error()))
return
}
log.Debug("RegisterNode: start queryNode success, print cluster MetaReplica info", zap.Int64("nodeID", id))
c.printMeta()
}()
return nil
}
return fmt.Errorf("RegisterNode: node %d alredy exists in cluster", id)
}
func (c *queryNodeCluster) getNodeByID(nodeID int64) (Node, error) {
c.RLock()
defer c.RUnlock()
if node, ok := c.nodes[nodeID]; ok {
return node, nil
}
return nil, fmt.Errorf("GetNodeByID: query node %d not exist", nodeID)
}
func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
c.Lock()
defer c.Unlock()
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, nodeID)
err := c.client.Remove(key)
if err != nil {
return err
}
if _, ok := c.nodes[nodeID]; ok {
err = c.nodes[nodeID].clearNodeInfo()
if err != nil {
return err
}
delete(c.nodes, nodeID)
log.Debug("RemoveNodeInfo: delete nodeInfo in cluster MetaReplica and etcd", zap.Int64("nodeID", nodeID))
}
return nil
}
func (c *queryNodeCluster) stopNode(nodeID int64) {
if node, ok := c.nodes[nodeID]; ok {
node.stop()
log.Debug("StopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}
func (c *queryNodeCluster) onServiceNodes() (map[int64]Node, error) {
c.RLock()
defer c.RUnlock()
return c.getOnServiceNodes()
}
func (c *queryNodeCluster) getOnServiceNodes() (map[int64]Node, error) {
nodes := make(map[int64]Node)
for nodeID, node := range c.nodes {
if node.isOnService() {
nodes[nodeID] = node
}
}
if len(nodes) == 0 {
return nil, errors.New("GetOnServiceNodes: no queryNode is alive")
}
return nodes, nil
}
func (c *queryNodeCluster) isOnService(nodeID int64) (bool, error) {
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
return node.isOnService(), nil
}
return false, fmt.Errorf("IsOnService: query node %d not exist", nodeID)
}
func (c *queryNodeCluster) printMeta() {
c.RLock()
defer c.RUnlock()
for id, node := range c.nodes {
if node.isOnService() {
collectionInfos := node.showCollections()
for _, info := range collectionInfos {
log.Debug("PrintMeta: query coordinator cluster info: collectionInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
}
queryChannelInfos := node.showWatchedQueryChannels()
for _, info := range queryChannelInfos {
log.Debug("PrintMeta: query coordinator cluster info: watchedQueryChannelInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
}
}
}
}
func (c *queryNodeCluster) getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo {
c.RLock()
defer c.RUnlock()
if node, ok := c.nodes[nodeID]; ok {
return node.showCollections()
}
return nil
}