mirror of https://github.com/milvus-io/milvus.git
Add Comment for datacoord cluster (#7676)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/7696/head
parent
ba9b1e8881
commit
d3be316bc2
|
@ -8,6 +8,7 @@
|
|||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
|
|
|
@ -28,38 +28,55 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// clusterPrefix const for kv prefix storing DataNodeInfo
|
||||
const clusterPrefix = "cluster-prefix/"
|
||||
|
||||
// clusterBuffer const for kv key storing buffer channels(no assigned ones)
|
||||
const clusterBuffer = "cluster-buffer"
|
||||
|
||||
// nodeEventChBufferSize magic number for Event Channel buffer size
|
||||
const nodeEventChBufferSize = 1024
|
||||
|
||||
// eventTimeout magic number for event timeout
|
||||
const eventTimeout = 5 * time.Second
|
||||
|
||||
// EventType enum for events
|
||||
type EventType int
|
||||
|
||||
const (
|
||||
Register EventType = 1
|
||||
UnRegister EventType = 2
|
||||
WatchChannel EventType = 3
|
||||
// Register EventType const for data node registration
|
||||
Register EventType = 1
|
||||
// UnRegister EventType const for data node unregistration
|
||||
UnRegister EventType = 2
|
||||
// WatchChannel EventType const for a channel needs to be watched
|
||||
WatchChannel EventType = 3
|
||||
// FlushSegments EventType const for flush specified segments
|
||||
FlushSegments EventType = 4
|
||||
)
|
||||
|
||||
// NodeEventType enum for node events
|
||||
type NodeEventType int
|
||||
|
||||
const (
|
||||
Watch NodeEventType = 0
|
||||
Flush NodeEventType = 1
|
||||
// Watch NodeEventType const for assign channel to datanode for watching
|
||||
Watch NodeEventType = 1
|
||||
// Flush NodeEventTYpe const for flush specified segments
|
||||
Flush NodeEventType = 2
|
||||
)
|
||||
|
||||
// Event event wrapper contains EventType and related parameter
|
||||
type Event struct {
|
||||
Type EventType
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
// WatchChannelParams Watch Event related parameter struct
|
||||
type WatchChannelParams struct {
|
||||
Channel string
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
// Cluster handles all DataNode life-cycle and functional events
|
||||
type Cluster struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
@ -75,33 +92,42 @@ type Cluster struct {
|
|||
eventCh chan *Event
|
||||
}
|
||||
|
||||
// ClusterOption helper function used when creating a Cluster
|
||||
type ClusterOption func(c *Cluster)
|
||||
|
||||
// withRegisterPolicy helper function setting registerPolicy
|
||||
func withRegisterPolicy(p dataNodeRegisterPolicy) ClusterOption {
|
||||
return func(c *Cluster) { c.registerPolicy = p }
|
||||
}
|
||||
|
||||
// withUnregistorPolicy helper function setting unregisterPolicy
|
||||
func withUnregistorPolicy(p dataNodeUnregisterPolicy) ClusterOption {
|
||||
return func(c *Cluster) { c.unregisterPolicy = p }
|
||||
}
|
||||
|
||||
// withAssignPolicy helper function setting assignPolicy
|
||||
func withAssignPolicy(p channelAssignPolicy) ClusterOption {
|
||||
return func(c *Cluster) { c.assignPolicy = p }
|
||||
}
|
||||
|
||||
// defaultRegisterPolicy returns default registerPolicy
|
||||
func defaultRegisterPolicy() dataNodeRegisterPolicy {
|
||||
return newAssignBufferRegisterPolicy()
|
||||
}
|
||||
|
||||
// defaultUnregisterPolicy returns default unregisterPolicy
|
||||
func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
|
||||
return randomAssignRegisterFunc
|
||||
}
|
||||
|
||||
// defaultAssignPolicy returns default assignPolicy
|
||||
func defaultAssignPolicy() channelAssignPolicy {
|
||||
return newBalancedAssignPolicy()
|
||||
}
|
||||
|
||||
// NewCluster creates a cluster with provided components
|
||||
// triggers loadFromKV to load previous meta from KV if exists
|
||||
// returns error when loadFromKV fails
|
||||
func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
|
||||
posProvider positionProvider, opts ...ClusterOption) (*Cluster, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
@ -122,13 +148,16 @@ func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
|
|||
opt(c)
|
||||
}
|
||||
|
||||
if err := c.loadFromKv(); err != nil {
|
||||
if err := c.loadFromKV(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Cluster) loadFromKv() error {
|
||||
// loadFromKV load pre-stored kv meta
|
||||
// keys start with clusterPrefix stands for DataNodeInfos
|
||||
// value bind to key clusterBuffer stands for Channels not assigned yet
|
||||
func (c *Cluster) loadFromKV() error {
|
||||
_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -157,6 +186,9 @@ func (c *Cluster) loadFromKv() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Flush triggers Flush event
|
||||
// puts Event into buffered channel
|
||||
// function returns not guarantee event processed
|
||||
func (c *Cluster) Flush(segments []*datapb.SegmentInfo) {
|
||||
c.eventCh <- &Event{
|
||||
Type: FlushSegments,
|
||||
|
@ -164,6 +196,9 @@ func (c *Cluster) Flush(segments []*datapb.SegmentInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// Register triggers Register event
|
||||
// put Event into buffered channel
|
||||
// function returns not guarantee event processed
|
||||
func (c *Cluster) Register(node *NodeInfo) {
|
||||
c.eventCh <- &Event{
|
||||
Type: Register,
|
||||
|
@ -171,6 +206,9 @@ func (c *Cluster) Register(node *NodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// UnRegister triggers UnRegister event
|
||||
// put Event into buffered channel
|
||||
// function returns not guarantee event processed
|
||||
func (c *Cluster) UnRegister(node *NodeInfo) {
|
||||
c.eventCh <- &Event{
|
||||
Type: UnRegister,
|
||||
|
@ -178,6 +216,9 @@ func (c *Cluster) UnRegister(node *NodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// Watch triggers Watch event
|
||||
// put Event into buffered channel
|
||||
// function returns not guarantee event processed
|
||||
func (c *Cluster) Watch(channel string, collectionID UniqueID) {
|
||||
c.eventCh <- &Event{
|
||||
Type: WatchChannel,
|
||||
|
@ -188,6 +229,7 @@ func (c *Cluster) Watch(channel string, collectionID UniqueID) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleNodeEvent worker loop handles all node events
|
||||
func (c *Cluster) handleNodeEvent() {
|
||||
defer c.wg.Done()
|
||||
for {
|
||||
|
@ -212,6 +254,7 @@ func (c *Cluster) handleNodeEvent() {
|
|||
}
|
||||
}
|
||||
|
||||
// handleEvent worker loop handles all events belongs to specified DataNode
|
||||
func (c *Cluster) handleEvent(node *NodeInfo) {
|
||||
log.Debug("start handle event", zap.Any("node", node))
|
||||
ctx := node.ctx
|
||||
|
@ -267,6 +310,8 @@ func (c *Cluster) handleEvent(node *NodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// getOrCreateClient get type.DataNode for specified data node
|
||||
// if not connected yet, try to connect
|
||||
func (c *Cluster) getOrCreateClient(ctx context.Context, id UniqueID) (types.DataNode, error) {
|
||||
c.mu.Lock()
|
||||
node := c.nodes.GetNode(id)
|
||||
|
@ -289,6 +334,7 @@ func (c *Cluster) getOrCreateClient(ctx context.Context, id UniqueID) (types.Dat
|
|||
return cli, nil
|
||||
}
|
||||
|
||||
// parseChannelsFromReq map-reduce to fetch channel names
|
||||
func parseChannelsFromReq(req *datapb.WatchDmChannelsRequest) []string {
|
||||
channels := make([]string, 0, len(req.GetVchannels()))
|
||||
for _, vc := range req.GetVchannels() {
|
||||
|
@ -297,6 +343,8 @@ func parseChannelsFromReq(req *datapb.WatchDmChannelsRequest) []string {
|
|||
return channels
|
||||
}
|
||||
|
||||
// createClient create type.DataNode from specified address
|
||||
// needs to be deprecated, since this function hard-corded DataNode to be grpc client
|
||||
func createClient(ctx context.Context, addr string) (types.DataNode, error) {
|
||||
cli, err := grpcdatanodeclient.NewClient(ctx, addr)
|
||||
if err != nil {
|
||||
|
@ -327,6 +375,8 @@ func (c *Cluster) Startup(nodes []*NodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// updateCluster update nodes list
|
||||
// separates them into new nodes list and offline nodes list
|
||||
func (c *Cluster) updateCluster(nodes []*NodeInfo) (newNodes []*NodeInfo, offlines []*NodeInfo) {
|
||||
var onCnt, offCnt float64
|
||||
currentOnline := make(map[int64]struct{})
|
||||
|
@ -352,6 +402,8 @@ func (c *Cluster) updateCluster(nodes []*NodeInfo) (newNodes []*NodeInfo, offlin
|
|||
return
|
||||
}
|
||||
|
||||
// handleRegister handls register logic
|
||||
// applies register policy and save result into kv store
|
||||
func (c *Cluster) handleRegister(n *NodeInfo) {
|
||||
c.mu.Lock()
|
||||
cNodes := c.nodes.GetNodes()
|
||||
|
@ -374,6 +426,8 @@ func (c *Cluster) handleRegister(n *NodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleUnRegister handles datanode unregister logic
|
||||
// applies unregisterPolicy and stores results into kv store
|
||||
func (c *Cluster) handleUnRegister(n *NodeInfo) {
|
||||
c.mu.Lock()
|
||||
node := c.nodes.GetNode(n.Info.GetVersion())
|
||||
|
@ -409,6 +463,8 @@ func (c *Cluster) handleUnRegister(n *NodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleWatchChannel handls watch channel logic
|
||||
// applies assignPolicy and saves results into kv store
|
||||
func (c *Cluster) handleWatchChannel(channel string, collectionID UniqueID) {
|
||||
c.mu.Lock()
|
||||
cNodes := c.nodes.GetNodes()
|
||||
|
@ -432,6 +488,8 @@ func (c *Cluster) handleWatchChannel(channel string, collectionID UniqueID) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleFlush handles flush logic
|
||||
// finds corresponding data nodes and trigger Node Events
|
||||
func (c *Cluster) handleFlush(segments []*datapb.SegmentInfo) {
|
||||
m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs
|
||||
for _, seg := range segments {
|
||||
|
@ -477,6 +535,8 @@ func (c *Cluster) handleFlush(segments []*datapb.SegmentInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// watch handles watch logic
|
||||
// finds corresponding data nodes and trigger Node Events
|
||||
func (c *Cluster) watch(n *NodeInfo) {
|
||||
channelNames := make([]string, 0)
|
||||
uncompletes := make([]vchannel, 0, len(n.Info.Channels))
|
||||
|
@ -547,12 +607,14 @@ func (c *Cluster) txnSaveNodesAndBuffer(nodes []*NodeInfo, buffer []*datapb.Chan
|
|||
return c.kv.MultiSave(data)
|
||||
}
|
||||
|
||||
// GetNodes returns all nodes info in Cluster
|
||||
func (c *Cluster) GetNodes() []*NodeInfo {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.nodes.GetNodes()
|
||||
}
|
||||
|
||||
// Close dispose all nodes resources
|
||||
func (c *Cluster) Close() {
|
||||
c.cancel()
|
||||
c.wg.Wait()
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
|
|
|
@ -60,12 +60,12 @@ func spyWatchPolicy(ch chan interface{}) channelAssignPolicy {
|
|||
}
|
||||
|
||||
// a mock kv that always fail when LoadWithPrefix
|
||||
type loadPrefixFailKv struct {
|
||||
type loadPrefixFailKV struct {
|
||||
kv.TxnKV
|
||||
}
|
||||
|
||||
// LoadWithPrefix override behavior
|
||||
func (kv *loadPrefixFailKv) LoadWithPrefix(key string) ([]string, []string, error) {
|
||||
func (kv *loadPrefixFailKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
||||
return []string{}, []string{}, errors.New("mocked fail")
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,7 @@ func TestClusterCreate(t *testing.T) {
|
|||
assert.EqualValues(t, "localhost:8080", dataNodes[0].Info.GetAddress())
|
||||
|
||||
t.Run("loadKv Fails", func(t *testing.T) {
|
||||
fkv := &loadPrefixFailKv{TxnKV: memKv}
|
||||
fkv := &loadPrefixFailKV{TxnKV: memKv}
|
||||
cluster, err := NewCluster(context.TODO(), fkv, spyClusterStore, dummyPosProvider{})
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, cluster)
|
||||
|
|
Loading…
Reference in New Issue