2021-10-25 12:23:16 +00:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 03:35:38 +00:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-10-25 12:23:16 +00:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 03:35:38 +00:00
|
|
|
//
|
2021-10-25 12:23:16 +00:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-06-30 02:20:15 +00:00
|
|
|
|
2021-06-22 02:42:07 +00:00
|
|
|
package datacoord
|
2021-01-22 11:43:27 +00:00
|
|
|
|
|
|
|
import (
|
2021-08-27 07:37:56 +00:00
|
|
|
"context"
|
2021-01-22 11:43:27 +00:00
|
|
|
|
2021-04-22 06:45:57 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
2022-03-15 13:51:21 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/metrics"
|
2021-04-22 06:45:57 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
2022-04-01 03:33:28 +00:00
|
|
|
"go.uber.org/zap"
|
2021-01-22 11:43:27 +00:00
|
|
|
)
|
|
|
|
|
2021-10-14 07:44:34 +00:00
|
|
|
// Cluster provides interfaces to interact with datanode cluster
|
2021-07-12 03:03:52 +00:00
|
|
|
type Cluster struct {
|
2021-10-14 07:44:34 +00:00
|
|
|
sessionManager *SessionManager
|
|
|
|
channelManager *ChannelManager
|
2021-01-22 11:43:27 +00:00
|
|
|
}
|
|
|
|
|
2021-12-01 11:29:31 +00:00
|
|
|
// NewCluster creates a new cluster
|
2021-10-14 07:44:34 +00:00
|
|
|
func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager) *Cluster {
|
2021-07-12 03:03:52 +00:00
|
|
|
c := &Cluster{
|
2021-10-14 07:44:34 +00:00
|
|
|
sessionManager: sessionManager,
|
|
|
|
channelManager: channelManager,
|
2021-07-12 03:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-10-14 07:44:34 +00:00
|
|
|
return c
|
2021-06-30 02:20:15 +00:00
|
|
|
}
|
|
|
|
|
2022-02-22 05:15:51 +00:00
|
|
|
// Startup inits the cluster with the given data nodes.
|
2022-03-28 14:33:27 +00:00
|
|
|
func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error {
|
2021-07-12 03:03:52 +00:00
|
|
|
for _, node := range nodes {
|
2021-10-14 07:44:34 +00:00
|
|
|
c.sessionManager.AddSession(node)
|
2021-07-12 03:03:52 +00:00
|
|
|
}
|
2021-10-14 07:44:34 +00:00
|
|
|
currs := make([]int64, 0, len(nodes))
|
2021-07-12 03:03:52 +00:00
|
|
|
for _, node := range nodes {
|
2021-10-14 07:44:34 +00:00
|
|
|
currs = append(currs, node.NodeID)
|
2021-06-28 05:28:14 +00:00
|
|
|
}
|
2022-03-28 14:33:27 +00:00
|
|
|
return c.channelManager.Startup(ctx, currs)
|
2021-05-26 11:06:56 +00:00
|
|
|
}
|
|
|
|
|
2021-10-14 07:44:34 +00:00
|
|
|
// Register registers a new node in cluster
|
|
|
|
func (c *Cluster) Register(node *NodeInfo) error {
|
|
|
|
c.sessionManager.AddSession(node)
|
2022-03-15 13:51:21 +00:00
|
|
|
err := c.channelManager.AddNode(node.NodeID)
|
|
|
|
if err == nil {
|
|
|
|
metrics.DataCoordNumDataNodes.WithLabelValues().Inc()
|
|
|
|
}
|
|
|
|
return err
|
2021-05-26 11:06:56 +00:00
|
|
|
}
|
|
|
|
|
2021-10-14 07:44:34 +00:00
|
|
|
// UnRegister removes a node from cluster
|
|
|
|
func (c *Cluster) UnRegister(node *NodeInfo) error {
|
|
|
|
c.sessionManager.DeleteSession(node)
|
2022-03-15 13:51:21 +00:00
|
|
|
err := c.channelManager.DeleteNode(node.NodeID)
|
|
|
|
if err == nil {
|
|
|
|
metrics.DataCoordNumDataNodes.WithLabelValues().Dec()
|
|
|
|
}
|
2022-03-25 04:09:25 +00:00
|
|
|
return err
|
2021-05-26 11:06:56 +00:00
|
|
|
}
|
|
|
|
|
2021-12-15 03:58:31 +00:00
|
|
|
// Watch tries to add a channel in datanode cluster
|
2021-10-14 07:44:34 +00:00
|
|
|
func (c *Cluster) Watch(ch string, collectionID UniqueID) error {
|
2021-10-26 12:15:41 +00:00
|
|
|
return c.channelManager.Watch(&channel{Name: ch, CollectionID: collectionID})
|
2021-10-14 07:44:34 +00:00
|
|
|
}
|
2021-05-26 11:06:56 +00:00
|
|
|
|
2022-04-25 03:07:47 +00:00
|
|
|
// Flush sends flush requests to corresponding dataNodes according to channels where segments are assigned to.
|
2021-10-20 07:02:36 +00:00
|
|
|
func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, markSegments []*datapb.SegmentInfo) {
|
2021-10-14 07:44:34 +00:00
|
|
|
channels := c.channelManager.GetChannels()
|
|
|
|
nodeSegments := make(map[int64][]int64)
|
2021-10-20 07:02:36 +00:00
|
|
|
nodeMarks := make(map[int64][]int64)
|
2021-10-14 07:44:34 +00:00
|
|
|
channelNodes := make(map[string]int64)
|
2021-10-20 07:02:36 +00:00
|
|
|
targetNodes := make(map[int64]struct{})
|
2021-10-14 07:44:34 +00:00
|
|
|
// channel -> node
|
|
|
|
for _, c := range channels {
|
|
|
|
for _, ch := range c.Channels {
|
2021-10-26 12:15:41 +00:00
|
|
|
channelNodes[ch.Name] = c.NodeID
|
2021-01-22 11:43:27 +00:00
|
|
|
}
|
|
|
|
}
|
2021-12-30 11:11:30 +00:00
|
|
|
// collectionID shall be the same in single Flush call
|
|
|
|
var collectionID int64
|
2021-10-14 07:44:34 +00:00
|
|
|
// find node on which segment exists
|
|
|
|
for _, segment := range segments {
|
2021-12-30 11:11:30 +00:00
|
|
|
collectionID = segment.CollectionID
|
2021-10-14 07:44:34 +00:00
|
|
|
nodeID, ok := channelNodes[segment.GetInsertChannel()]
|
2021-05-26 11:06:56 +00:00
|
|
|
if !ok {
|
2021-10-14 07:44:34 +00:00
|
|
|
log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel()))
|
2021-05-26 11:06:56 +00:00
|
|
|
continue
|
|
|
|
}
|
2021-10-14 07:44:34 +00:00
|
|
|
nodeSegments[nodeID] = append(nodeSegments[nodeID], segment.GetID())
|
2021-10-20 07:02:36 +00:00
|
|
|
targetNodes[nodeID] = struct{}{}
|
|
|
|
}
|
|
|
|
for _, segment := range markSegments {
|
2021-12-30 11:11:30 +00:00
|
|
|
collectionID = segment.CollectionID
|
2021-10-20 07:02:36 +00:00
|
|
|
nodeID, ok := channelNodes[segment.GetInsertChannel()]
|
|
|
|
if !ok {
|
|
|
|
log.Warn("channel is not allocated to any node", zap.String("channel", segment.GetInsertChannel()))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
nodeMarks[nodeID] = append(nodeMarks[nodeID], segment.GetID())
|
|
|
|
targetNodes[nodeID] = struct{}{}
|
2021-07-12 03:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-10-20 07:02:36 +00:00
|
|
|
for nodeID := range targetNodes {
|
|
|
|
segments := nodeSegments[nodeID]
|
|
|
|
marks := nodeMarks[nodeID]
|
|
|
|
if len(segments)+len(marks) == 0 { // no segment for this node
|
|
|
|
continue
|
|
|
|
}
|
2021-10-14 07:44:34 +00:00
|
|
|
req := &datapb.FlushSegmentsRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_Flush,
|
2022-04-24 14:03:44 +00:00
|
|
|
SourceID: Params.DataCoordCfg.GetNodeID(),
|
2021-10-14 07:44:34 +00:00
|
|
|
},
|
2021-12-30 11:11:30 +00:00
|
|
|
CollectionID: collectionID,
|
2021-10-20 07:02:36 +00:00
|
|
|
SegmentIDs: segments,
|
|
|
|
MarkSegmentIDs: marks,
|
2021-05-26 11:06:56 +00:00
|
|
|
}
|
2022-04-25 03:07:47 +00:00
|
|
|
log.Info("calling dataNode to flush",
|
|
|
|
zap.Int64("dataNode ID", nodeID),
|
|
|
|
zap.Int64s("segments", segments),
|
|
|
|
zap.Int64s("marks", marks))
|
2021-10-14 07:44:34 +00:00
|
|
|
c.sessionManager.Flush(ctx, nodeID, req)
|
2021-01-28 03:24:41 +00:00
|
|
|
}
|
2021-07-12 03:03:52 +00:00
|
|
|
}
|
|
|
|
|
2022-04-01 03:33:28 +00:00
|
|
|
// Import sends import requests to DataNodes whose ID==nodeID.
|
|
|
|
func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest) {
|
|
|
|
c.sessionManager.Import(ctx, nodeID, it)
|
|
|
|
}
|
|
|
|
|
2022-05-25 06:34:00 +00:00
|
|
|
// ReCollectSegmentStats triggers a ReCollectSegmentStats call from session manager.
|
|
|
|
func (c *Cluster) ReCollectSegmentStats(ctx context.Context, nodeID int64) {
|
|
|
|
c.sessionManager.ReCollectSegmentStats(ctx, nodeID)
|
|
|
|
}
|
|
|
|
|
2022-06-02 10:54:04 +00:00
|
|
|
// AddSegment triggers a AddSegment call from session manager.
|
|
|
|
func (c *Cluster) AddSegment(ctx context.Context, nodeID int64, req *datapb.AddSegmentRequest) {
|
|
|
|
c.sessionManager.AddSegment(ctx, nodeID, req)
|
|
|
|
}
|
|
|
|
|
2021-10-14 07:44:34 +00:00
|
|
|
// GetSessions returns all sessions
|
|
|
|
func (c *Cluster) GetSessions() []*Session {
|
|
|
|
return c.sessionManager.GetSessions()
|
2021-07-12 03:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-10-14 07:44:34 +00:00
|
|
|
// Close releases resources opened in Cluster
|
2021-07-12 03:03:52 +00:00
|
|
|
func (c *Cluster) Close() {
|
2021-10-14 07:44:34 +00:00
|
|
|
c.sessionManager.Close()
|
2021-02-02 10:53:10 +00:00
|
|
|
}
|