Refactor cluster in dataservice (#6356)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
sunby 2021-07-12 11:03:52 +08:00 committed by GitHub
parent bd317a5461
commit a8e5fd2024
16 changed files with 809 additions and 1555 deletions

View File

@ -14,59 +14,79 @@ package datacoord
import (
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"golang.org/x/net/context"
)
type cluster struct {
mu sync.RWMutex
ctx context.Context
dataManager *clusterNodeManager
sessionManager sessionManager
candidateManager *candidateManager
posProvider positionProvider
const clusterPrefix = "cluster-prefix/"
const clusterBuffer = "cluster-buffer"
const nodeEventChBufferSize = 1024
startupPolicy clusterStartupPolicy
const eventTimeout = 5 * time.Second
type EventType int
const (
Register EventType = 1
UnRegister EventType = 2
WatchChannel EventType = 3
FlushSegments EventType = 4
)
type NodeEventType int
const (
Watch NodeEventType = 0
Flush NodeEventType = 1
)
type Event struct {
Type EventType
Data interface{}
}
type WatchChannelParams struct {
Channel string
CollectionID UniqueID
}
type Cluster struct {
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
wg sync.WaitGroup
nodes ClusterStore
posProvider positionProvider
chanBuffer []*datapb.ChannelStatus //Unwatched channels buffer
kv kv.TxnKV
registerPolicy dataNodeRegisterPolicy
unregisterPolicy dataNodeUnregisterPolicy
assignPolicy channelAssignPolicy
eventCh chan *Event
}
type clusterOption struct {
apply func(c *cluster)
type ClusterOption func(c *Cluster)
func withRegisterPolicy(p dataNodeRegisterPolicy) ClusterOption {
return func(c *Cluster) { c.registerPolicy = p }
}
func withStartupPolicy(p clusterStartupPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.startupPolicy = p },
}
func withUnregistorPolicy(p dataNodeUnregisterPolicy) ClusterOption {
return func(c *Cluster) { c.unregisterPolicy = p }
}
func withRegisterPolicy(p dataNodeRegisterPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.registerPolicy = p },
}
}
func withUnregistorPolicy(p dataNodeUnregisterPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.unregisterPolicy = p },
}
}
func withAssignPolicy(p channelAssignPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.assignPolicy = p },
}
}
func defaultStartupPolicy() clusterStartupPolicy {
return newWatchRestartsStartupPolicy()
func withAssignPolicy(p channelAssignPolicy) ClusterOption {
return func(c *Cluster) { c.assignPolicy = p }
}
func defaultRegisterPolicy() dataNodeRegisterPolicy {
@ -81,329 +101,313 @@ func defaultAssignPolicy() channelAssignPolicy {
return newBalancedAssignPolicy()
}
func newCluster(ctx context.Context, dataManager *clusterNodeManager,
sessionManager sessionManager, posProvider positionProvider,
opts ...clusterOption) *cluster {
c := &cluster{
func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
posProvider positionProvider, opts ...ClusterOption) (*Cluster, error) {
ctx, cancel := context.WithCancel(ctx)
c := &Cluster{
ctx: ctx,
sessionManager: sessionManager,
dataManager: dataManager,
cancel: cancel,
kv: kv,
nodes: store,
posProvider: posProvider,
startupPolicy: defaultStartupPolicy(),
chanBuffer: []*datapb.ChannelStatus{},
registerPolicy: defaultRegisterPolicy(),
unregisterPolicy: defaultUnregisterPolicy(),
assignPolicy: defaultAssignPolicy(),
eventCh: make(chan *Event, nodeEventChBufferSize),
}
c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode)
for _, opt := range opts {
opt.apply(c)
opt(c)
}
return c
if err := c.loadFromKv(); err != nil {
return nil, err
}
return c, nil
}
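For orientation, a minimal construction sketch of the new API (memkv and dummyPosProvider are the test helpers from this PR; the function name is illustrative, and the options shown are just the defaults and could be omitted):

func exampleNewCluster(ctx context.Context) (*Cluster, error) {
	kv := memkv.NewMemoryKV() // in-memory TxnKV, as used in the tests
	store := NewNodesInfo()   // default ClusterStore implementation
	c, err := NewCluster(ctx, kv, store, dummyPosProvider{},
		withRegisterPolicy(defaultRegisterPolicy()),
		withAssignPolicy(defaultAssignPolicy()))
	if err != nil { // loadFromKv failed
		return nil, err
	}
	c.Startup(nil) // start the event loop; no nodes known yet
	return c, nil
}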
// startup applies the startup policy
func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
/*deltaChange := c.dataManager.updateCluster(dataNodes)
nodes, chanBuffer := c.dataManager.getDataNodes(false)
var rets []*datapb.DataNodeInfo
var err error
rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
c.dataManager.updateDataNodes(rets, chanBuffer)
rets, err = c.watch(rets)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err))
// does not trigger another refresh; the pending event will do
}
c.dataManager.updateDataNodes(rets, chanBuffer)
return nil*/
return c.refresh(dataNodes)
}
// refresh roughly refreshes datanode status after an event is received
func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
c.mu.Lock()
defer c.mu.Unlock()
deltaChange := c.dataManager.updateCluster(dataNodes)
log.Debug("refresh delta", zap.Any("new", deltaChange.newNodes),
zap.Any("restart", deltaChange.restarts),
zap.Any("offline", deltaChange.offlines))
// cannot use the startup policy directly; separate into three parts:
// 1. add new nodes into candidates list
for _, dn := range dataNodes {
for _, newAddr := range deltaChange.newNodes {
if dn.Address == newAddr {
c.candidateManager.add(dn)
}
}
}
// 2. restart nodes, disable node&session, execute unregister policy and put node into candidate list
restartNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.restarts))
for _, node := range deltaChange.restarts {
info, ok := c.dataManager.dataNodes[node]
if ok {
restartNodes = append(restartNodes, info.info)
c.dataManager.unregister(node) // remove from cluster
c.sessionManager.releaseSession(node)
} else {
log.Warn("Restart node not in node manager", zap.String("restart_node", node))
}
}
if len(restartNodes) > 0 {
for _, node := range restartNodes {
cluster, buffer := c.dataManager.getDataNodes(true)
if len(cluster) > 0 {
ret := c.unregisterPolicy.apply(cluster, node)
c.updateNodeWatch(ret, buffer)
} else {
// no online node, put all watched channels to buffer
buffer = append(buffer, node.Channels...)
c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
}
node.Channels = node.Channels[:0] // clear channels
c.candidateManager.add(node) // put node into candidate list
}
}
// 3. offline do unregister
unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // possible nodes info to unregister
for _, node := range deltaChange.offlines {
c.sessionManager.releaseSession(node)
info := c.dataManager.unregister(node)
if info != nil {
unregisterNodes = append(unregisterNodes, info)
}
}
for _, node := range unregisterNodes {
cluster, buffer := c.dataManager.getDataNodes(true)
if len(cluster) > 0 { // cluster has online nodes, migrate channels
ret := c.unregisterPolicy.apply(cluster, node)
c.updateNodeWatch(ret, buffer)
} else {
// no online node, put all watched channels to buffer
buffer = append(buffer, node.Channels...)
c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
}
}
return nil
}
// updateNodeWatch saves nodes' uncomplete status, tries to watch the unwatched channels and saves the execution result
func (c *cluster) updateNodeWatch(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
c.dataManager.updateDataNodes(nodes, buffer)
rets, err := c.watch(nodes)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err)) //
}
c.dataManager.updateDataNodes(rets, buffer)
return err
}
// paraRun runs the given works in parallel, with a max parallelism limit
func paraRun(works []func(), maxRunner int) {
wg := sync.WaitGroup{}
ch := make(chan func())
wg.Add(len(works))
if maxRunner > len(works) {
maxRunner = len(works)
}
for i := 0; i < maxRunner; i++ {
go func() {
work, ok := <-ch
if !ok {
return
}
work()
wg.Done()
}()
}
for _, work := range works {
ch <- work
}
wg.Wait()
close(ch)
}
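Side note: in this pre-refactor helper each worker goroutine consumes only a single item, so the final sends block once len(works) exceeds maxRunner. A hedged sketch of the range-based worker pool it presumably intended (the name is illustrative; the new per-node watch no longer needs it):

func paraRunRanged(works []func(), maxRunner int) {
	if maxRunner > len(works) {
		maxRunner = len(works)
	}
	ch := make(chan func())
	wg := sync.WaitGroup{}
	wg.Add(maxRunner)
	for i := 0; i < maxRunner; i++ {
		go func() {
			defer wg.Done()
			for work := range ch { // drain until the channel is closed
				work()
			}
		}()
	}
	for _, work := range works {
		ch <- work
	}
	close(ch)
	wg.Wait()
}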
func (c *cluster) validateDataNode(dn *datapb.DataNodeInfo) error {
log.Warn("[CM] start validate candidate", zap.String("addr", dn.Address))
_, err := c.sessionManager.getOrCreateSession(dn.Address) // this might take time if address went offline
log.Warn("[CM] candidate validation finished", zap.String("addr", dn.Address), zap.Error(err))
func (c *Cluster) loadFromKv() error {
_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
if err != nil {
return err
}
for _, v := range values {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(v, info); err != nil {
return err
}
node := NewNodeInfo(c.ctx, info)
c.nodes.SetNode(info.GetVersion(), node)
go c.handleEvent(node)
}
dn, _ := c.kv.Load(clusterBuffer)
// TODO: add check for the missing-value case
if dn != "" {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(dn, info); err != nil {
return err
}
c.chanBuffer = info.Channels
}
return nil
}
func (c *cluster) enableDataNode(dn *datapb.DataNodeInfo) error {
log.Warn("[CM] enabling candidate", zap.String("addr", dn.Address))
c.register(dn)
return nil
func (c *Cluster) Flush(segments []*datapb.SegmentInfo) {
c.eventCh <- &Event{
Type: FlushSegments,
Data: segments,
}
}
func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
works := make([]func(), 0, len(nodes))
mut := sync.Mutex{}
errs := make([]error, 0, len(nodes))
for _, n := range nodes {
works = append(works, func() {
logMsg := fmt.Sprintf("Begin to watch channels for node %s, channels:", n.Address)
uncompletes := make([]vchannel, 0, len(n.Channels))
for _, ch := range n.Channels {
if ch.State == datapb.ChannelWatchState_Uncomplete {
if len(uncompletes) == 0 {
logMsg += ch.Name
} else {
logMsg += "," + ch.Name
}
uncompletes = append(uncompletes, vchannel{
CollectionID: ch.CollectionID,
DmlChannel: ch.Name,
})
}
}
if len(uncompletes) == 0 {
return // all set, just return
}
log.Debug(logMsg)
vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
if err != nil {
log.Warn("get vchannel position failed", zap.Error(err))
mut.Lock()
errs = append(errs, err)
mut.Unlock()
return
}
cli, err := c.sessionManager.getSession(n.Address) //fail fast, don't create session
if err != nil {
log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
mut.Lock()
errs = append(errs, err)
mut.Unlock()
return
}
req := &datapb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
SourceID: Params.NodeID,
},
Vchannels: vchanInfos,
}
resp, err := cli.WatchDmChannels(c.ctx, req)
if err != nil {
log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
mut.Lock()
errs = append(errs, err)
mut.Unlock()
return
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
mut.Lock()
errs = append(errs, fmt.Errorf("watch fail with stat %v, msg:%s", resp.ErrorCode, resp.Reason))
mut.Unlock()
return
}
for _, ch := range n.Channels {
if ch.State == datapb.ChannelWatchState_Uncomplete {
ch.State = datapb.ChannelWatchState_Complete
}
}
})
func (c *Cluster) Register(node *NodeInfo) {
c.eventCh <- &Event{
Type: Register,
Data: node,
}
paraRun(works, 20)
if len(errs) > 0 {
return nodes, retry.ErrorList(errs)
}
return nodes, nil
}
func (c *cluster) register(n *datapb.DataNodeInfo) {
c.mu.Lock()
defer c.mu.Unlock()
c.dataManager.register(n)
cNodes, chanBuffer := c.dataManager.getDataNodes(true)
var rets []*datapb.DataNodeInfo
func (c *Cluster) UnRegister(node *NodeInfo) {
c.eventCh <- &Event{
Type: UnRegister,
Data: node,
}
}
func (c *Cluster) Watch(channel string, collectionID UniqueID) {
c.eventCh <- &Event{
Type: WatchChannel,
Data: &WatchChannelParams{
Channel: channel,
CollectionID: collectionID,
},
}
}
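A small illustration of the producer side: these methods are fire-and-forget, and handleNodeEvent below consumes eventCh serially (the arguments and function name are illustrative):

func exampleProducers(c *Cluster, node *NodeInfo, segments []*datapb.SegmentInfo) {
	c.Register(node)                      // enqueue a Register event
	c.Watch("dml-channel-0", UniqueID(1)) // enqueue a WatchChannel event
	c.Flush(segments)                     // enqueue a FlushSegments event
}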
func (c *Cluster) handleNodeEvent() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
return
case e := <-c.eventCh:
switch e.Type {
case Register:
c.handleRegister(e.Data.(*NodeInfo))
case UnRegister:
c.handleUnRegister(e.Data.(*NodeInfo))
case WatchChannel:
params := e.Data.(*WatchChannelParams)
c.handleWatchChannel(params.Channel, params.CollectionID)
case FlushSegments:
c.handleFlush(e.Data.([]*datapb.SegmentInfo))
default:
log.Warn("Unknow node event type")
}
}
}
}
func (c *Cluster) handleEvent(node *NodeInfo) {
ctx := node.ctx
ch := node.GetEventChannel()
var cli types.DataNode
var err error
log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
c.dataManager.updateDataNodes(rets, chanBuffer)
rets, err = c.watch(rets)
for {
select {
case <-ctx.Done():
return
case event := <-ch:
cli = node.GetClient()
if cli == nil {
cli, err = createClient(ctx, node.info.GetAddress())
if err != nil {
log.Warn("failed to create client", zap.Any("node", node), zap.Error(err))
continue
}
c.mu.Lock()
c.nodes.SetClient(node.info.GetVersion(), cli)
c.mu.Unlock()
}
switch event.Type {
case Watch:
req, ok := event.Req.(*datapb.WatchDmChannelsRequest)
if !ok {
log.Warn("request type is not Watch")
continue
}
tCtx, cancel := context.WithTimeout(ctx, eventTimeout)
resp, err := cli.WatchDmChannels(tCtx, req)
cancel()
if err = VerifyResponse(resp, err); err != nil {
log.Warn("Failed to watch dm channels", zap.String("addr", node.info.GetAddress()))
}
c.mu.Lock()
c.nodes.SetWatched(node.info.GetVersion(), parseChannelsFromReq(req))
node = c.nodes.GetNode(node.info.GetVersion())
c.mu.Unlock()
if err = c.saveNode(node); err != nil {
log.Warn("failed to save node info", zap.Any("node", node))
continue
}
case Flush:
req, ok := event.Req.(*datapb.FlushSegmentsRequest)
if !ok {
log.Warn("request type is not Flush")
continue
}
tCtx, cancel := context.WithTimeout(ctx, eventTimeout)
resp, err := cli.FlushSegments(tCtx, req)
cancel()
if err = VerifyResponse(resp, err); err != nil {
log.Warn("Failed to flush segments", zap.String("addr", node.info.GetAddress()))
}
default:
log.Warn("Wrong event type", zap.Any("type", event.Type))
}
}
}
}
func parseChannelsFromReq(req *datapb.WatchDmChannelsRequest) []string {
channels := make([]string, 0, len(req.GetVchannels()))
for _, vc := range req.GetVchannels() {
channels = append(channels, vc.ChannelName)
}
return channels
}
func createClient(ctx context.Context, addr string) (types.DataNode, error) {
cli, err := grpcdatanodeclient.NewClient(ctx, addr)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err))
// does not trigger another refresh; the pending event will do
return nil, err
}
c.dataManager.updateDataNodes(rets, chanBuffer)
if err := cli.Init(); err != nil {
return nil, err
}
if err := cli.Start(); err != nil {
return nil, err
}
return cli, nil
}
func (c *cluster) unregister(n *datapb.DataNodeInfo) {
c.mu.Lock()
defer c.mu.Unlock()
c.sessionManager.releaseSession(n.Address)
oldNode := c.dataManager.unregister(n.Address)
if oldNode != nil {
n = oldNode
// Startup applies the startup policy
func (c *Cluster) Startup(nodes []*NodeInfo) {
c.wg.Add(1)
go c.handleNodeEvent()
// before startup, we have restored all nodes recorded last time. We should
// find newly created/offlined/restarted nodes and adjust the channel allocation.
addNodes, deleteNodes := c.updateCluster(nodes)
for _, node := range addNodes {
c.Register(node)
}
cNodes, chanBuffer := c.dataManager.getDataNodes(true)
log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
var rets []*datapb.DataNodeInfo
var err error
for _, node := range deleteNodes {
c.UnRegister(node)
}
}
func (c *Cluster) updateCluster(nodes []*NodeInfo) (newNodes []*NodeInfo, offlines []*NodeInfo) {
var onCnt, offCnt float64
currentOnline := make(map[int64]struct{})
for _, n := range nodes {
currentOnline[n.info.GetVersion()] = struct{}{}
node := c.nodes.GetNode(n.info.GetVersion())
if node == nil {
newNodes = append(newNodes, n)
}
onCnt++
}
currNodes := c.nodes.GetNodes()
for _, node := range currNodes {
_, has := currentOnline[node.info.GetVersion()]
if !has {
offlines = append(offlines, node)
offCnt++
}
}
metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt)
metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt)
return
}
func (c *Cluster) handleRegister(n *NodeInfo) {
c.mu.Lock()
cNodes := c.nodes.GetNodes()
var nodes []*NodeInfo
log.Debug("before register policy applied", zap.Any("n.Channels", n.info.GetChannels()), zap.Any("buffer", c.chanBuffer))
nodes, c.chanBuffer = c.registerPolicy(cNodes, n, c.chanBuffer)
log.Debug("after register policy applied", zap.Any("ret", nodes), zap.Any("buffer", c.chanBuffer))
go c.handleEvent(n)
c.txnSaveNodesAndBuffer(nodes, c.chanBuffer)
for _, node := range nodes {
c.nodes.SetNode(node.info.GetVersion(), node)
}
c.mu.Unlock()
for _, node := range nodes {
c.watch(node)
}
}
func (c *Cluster) handleUnRegister(n *NodeInfo) {
c.mu.Lock()
node := c.nodes.GetNode(n.info.GetVersion())
if node == nil {
c.mu.Unlock()
return
}
node.Dispose()
c.nodes.DeleteNode(n.info.GetVersion())
cNodes := c.nodes.GetNodes()
log.Debug("before unregister policy applied", zap.Any("node.Channels", node.info.GetChannels()), zap.Any("buffer", c.chanBuffer))
var rets []*NodeInfo
if len(cNodes) == 0 {
for _, chStat := range n.Channels {
for _, chStat := range node.info.GetChannels() {
chStat.State = datapb.ChannelWatchState_Uncomplete
chanBuffer = append(chanBuffer, chStat)
c.chanBuffer = append(c.chanBuffer, chStat)
}
} else {
rets = c.unregisterPolicy.apply(cNodes, n)
rets = c.unregisterPolicy(cNodes, n)
}
log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
c.dataManager.updateDataNodes(rets, chanBuffer)
rets, err = c.watch(rets)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err))
// does not trigger another refresh; the pending event will do
c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
for _, node := range rets {
c.nodes.SetNode(node.info.GetVersion(), node)
}
c.mu.Unlock()
for _, node := range rets {
c.watch(node)
}
c.dataManager.updateDataNodes(rets, chanBuffer)
}
func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
func (c *Cluster) handleWatchChannel(channel string, collectionID UniqueID) {
c.mu.Lock()
defer c.mu.Unlock()
cNodes, chanBuffer := c.dataManager.getDataNodes(true)
var rets []*datapb.DataNodeInfo
var err error
cNodes := c.nodes.GetNodes()
var rets []*NodeInfo
if len(cNodes) == 0 { // no nodes to assign, put into buffer
chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
c.chanBuffer = append(c.chanBuffer, &datapb.ChannelStatus{
Name: channel,
CollectionID: collectionID,
State: datapb.ChannelWatchState_Uncomplete,
})
} else {
rets = c.assignPolicy.apply(cNodes, channel, collectionID)
rets = c.assignPolicy(cNodes, channel, collectionID)
}
c.dataManager.updateDataNodes(rets, chanBuffer)
rets, err = c.watch(rets)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err))
// does not trigger another refresh; the pending event will do
c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
for _, node := range rets {
c.nodes.SetNode(node.info.GetVersion(), node)
}
c.mu.Unlock()
for _, node := range rets {
c.watch(node)
}
c.dataManager.updateDataNodes(rets, chanBuffer)
}
func (c *cluster) flush(segments []*datapb.SegmentInfo) {
c.mu.Lock()
defer c.mu.Unlock()
func (c *Cluster) handleFlush(segments []*datapb.SegmentInfo) {
m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs
for _, seg := range segments {
if _, ok := m[seg.InsertChannel]; !ok {
m[seg.InsertChannel] = make(map[UniqueID][]UniqueID)
@ -412,12 +416,14 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID)
}
dataNodes, _ := c.dataManager.getDataNodes(true)
c.mu.Lock()
dataNodes := c.nodes.GetNodes()
c.mu.Unlock()
channel2Node := make(map[string]string)
channel2Node := make(map[string]*NodeInfo)
for _, node := range dataNodes {
for _, chstatus := range node.Channels {
channel2Node[chstatus.Name] = node.Address
for _, chstatus := range node.info.GetChannels() {
channel2Node[chstatus.Name] = node
}
}
@ -426,11 +432,6 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
if !ok {
continue
}
cli, err := c.sessionManager.getSession(node)
if err != nil {
log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
continue
}
for coll, segs := range coll2seg {
req := &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
@ -440,23 +441,96 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
CollectionID: coll,
SegmentIDs: segs,
}
resp, err := cli.FlushSegments(c.ctx, req)
if err != nil {
log.Warn("flush segment failed", zap.String("addr", node), zap.Error(err))
continue
ch := node.GetEventChannel()
e := &NodeEvent{
Type: Flush,
Req: req,
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("flush segment failed", zap.String("dataNode", node), zap.Error(err))
continue
}
log.Debug("flush segments succeed", zap.Any("segmentIDs", segs))
ch <- e
}
}
}
func (c *cluster) releaseSessions() {
func (c *Cluster) watch(n *NodeInfo) {
var logMsg string
uncompletes := make([]vchannel, 0, len(n.info.Channels))
for _, ch := range n.info.GetChannels() {
if ch.State == datapb.ChannelWatchState_Uncomplete {
if len(uncompletes) == 0 {
logMsg += ch.Name
} else {
logMsg += "," + ch.Name
}
uncompletes = append(uncompletes, vchannel{
CollectionID: ch.CollectionID,
DmlChannel: ch.Name,
})
}
}
if len(uncompletes) == 0 {
return // all set, just return
}
log.Debug(logMsg)
vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
if err != nil {
log.Warn("get vchannel position failed", zap.Error(err))
return
}
req := &datapb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
SourceID: Params.NodeID,
},
Vchannels: vchanInfos,
}
e := &NodeEvent{
Type: Watch,
Req: req,
}
ch := n.GetEventChannel()
ch <- e
}
func (c *Cluster) saveNode(n *NodeInfo) error {
key := fmt.Sprintf("%s%d", clusterPrefix, n.info.GetVersion())
value := proto.MarshalTextString(n.info)
return c.kv.Save(key, value)
}
func (c *Cluster) txnSaveNodesAndBuffer(nodes []*NodeInfo, buffer []*datapb.ChannelStatus) error {
if len(nodes) == 0 && len(buffer) == 0 {
return nil
}
data := make(map[string]string)
for _, n := range nodes {
key := fmt.Sprintf("%s%d", clusterPrefix, n.info.GetVersion())
value := proto.MarshalTextString(n.info)
data[key] = value
}
// shortcut: reuse DataNodeInfo to store an array of channel statuses
bufNode := &datapb.DataNodeInfo{
Channels: buffer,
}
data[clusterBuffer] = proto.MarshalTextString(bufNode)
return c.kv.MultiSave(data)
}
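For reference, the KV layout this produces (values are prototext-encoded datapb.DataNodeInfo; the version number is illustrative):

// cluster-prefix/3  -> the full DataNodeInfo of the node with Version 3
// cluster-buffer    -> a DataNodeInfo whose Channels field holds the
//                      currently unwatched channels (a storage shortcut)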
func (c *Cluster) GetNodes() []*NodeInfo {
c.mu.Lock()
defer c.mu.Unlock()
c.sessionManager.release()
c.candidateManager.dispose()
return c.nodes.GetNodes()
}
func (c *Cluster) Close() {
c.cancel()
c.wg.Wait()
c.mu.Lock()
defer c.mu.Unlock()
nodes := c.nodes.GetNodes()
for _, node := range nodes {
node.Dispose()
}
}

View File

@ -1,124 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
// candidateManager manages data node candidates
type candidateManager struct {
candidatePool sync.Map // current processing candidates
taskQueue chan candidate // task queue to notify workers
cancel func() // global cancel func
validate func(*datapb.DataNodeInfo) error // candidate validation
enable func(*datapb.DataNodeInfo) error // enable operation if candidate validate
}
// candidate stands for datanode info from etcd
// it needs to be validated before being put into the cluster,
// since the etcd key has a lease timeout of 10 seconds
type candidate struct {
key string // key to specify candidate, usually candidate address
node *datapb.DataNodeInfo // node info
ctx context.Context //context obj to control validation process
cancel func() // cancel func to cancel single candidate
}
// newCandidateManager creates a candidate manager with the specified worker number
func newCandidateManager(wn int, validate, enable func(*datapb.DataNodeInfo) error) *candidateManager {
if wn <= 0 {
wn = 20
}
ctx, cancel := context.WithCancel(context.Background())
cm := &candidateManager{
candidatePool: sync.Map{},
cancel: cancel,
taskQueue: make(chan candidate, wn), // wn-capacity buffer consumed by wn workers
validate: validate,
enable: enable,
}
for i := 0; i < wn; i++ {
//start worker
go cm.work(ctx)
}
return cm
}
// work processes the candidates from channel
// each task can be canceled by its candidate context or by the global cancel func
func (cm *candidateManager) work(ctx context.Context) {
for {
select {
case cand := <-cm.taskQueue:
ch := make(chan struct{})
var err error
go func() {
err = cm.validate(cand.node)
ch <- struct{}{}
}()
select {
case <-ch:
if err == nil {
cm.enable(cand.node) // success, enable candidate
} else {
log.Warn("[CM] candidate failed", zap.String("addr", cand.node.Address))
}
case <-cand.ctx.Done():
}
cm.candidatePool.Delete(cand.key) // remove from candidatePool
case <-ctx.Done():
return
}
}
}
// add datanode into candidate pool
// the operation is non-blocking
func (cm *candidateManager) add(dn *datapb.DataNodeInfo) {
log.Warn("[CM]add new candidate", zap.String("addr", dn.Address))
key := dn.Address
ctx, cancel := context.WithCancel(context.Background())
cand := candidate{
key: key,
node: dn,
ctx: ctx,
cancel: cancel,
}
_, loaded := cm.candidatePool.LoadOrStore(key, cand)
if !loaded {
go func() { // start a goroutine so the add into the queue does not block
cm.taskQueue <- cand
}()
}
}
// stop the candidate validation process if it exists in the pool
func (cm *candidateManager) stop(key string) {
val, loaded := cm.candidatePool.LoadAndDelete(key)
if loaded {
cand, ok := val.(candidate)
if ok {
cand.cancel()
}
}
}
// dispose the manager for stopping app
func (cm *candidateManager) dispose() {
cm.cancel()
}

View File

@ -1,198 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
const clusterPrefix = "cluster-prefix/"
const clusterBuffer = "cluster-buffer"
type dataNodeStatus int8
const (
online dataNodeStatus = iota
offline
)
type dataNodeInfo struct {
info *datapb.DataNodeInfo
status dataNodeStatus
}
type clusterNodeManager struct {
kv kv.TxnKV
dataNodes map[string]*dataNodeInfo
chanBuffer []*datapb.ChannelStatus //Unwatched channels buffer
}
func newClusterNodeManager(kv kv.TxnKV) (*clusterNodeManager, error) {
c := &clusterNodeManager{
kv: kv,
dataNodes: make(map[string]*dataNodeInfo),
chanBuffer: []*datapb.ChannelStatus{},
}
return c, c.loadFromKv()
}
func (c *clusterNodeManager) loadFromKv() error {
_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
if err != nil {
return err
}
for _, v := range values {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(v, info); err != nil {
return err
}
node := &dataNodeInfo{
info: info,
status: offline,
}
c.dataNodes[info.Address] = node
}
dn, _ := c.kv.Load(clusterBuffer)
// TODO: add check for the missing-value case
if dn != "" {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(dn, info); err != nil {
return err
}
c.chanBuffer = info.Channels
}
return nil
}
func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *clusterDeltaChange {
newNodes := make([]string, 0)
offlines := make([]string, 0)
restarts := make([]string, 0)
var onCnt, offCnt float64
currentOnline := make(map[string]struct{})
for _, n := range dataNodes {
currentOnline[n.Address] = struct{}{}
onCnt++
node, ok := c.dataNodes[n.Address]
if ok {
node.status = online
if node.info.Version != n.Version {
restarts = append(restarts, n.Address)
}
continue
}
newNodes = append(newNodes, n.Address)
}
for nAddr, node := range c.dataNodes {
_, has := currentOnline[nAddr]
if !has && node.status == online {
node.status = offline
offCnt++
offlines = append(offlines, nAddr)
}
}
metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt)
metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt)
return &clusterDeltaChange{
newNodes: newNodes,
offlines: offlines,
restarts: restarts,
}
}
// updateDataNodes updates the dataNodes input merged with the existing cluster and buffer
func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
for _, node := range dataNodes {
c.dataNodes[node.Address].info = node
}
return c.txnSaveNodes(dataNodes, buffer)
}
// getDataNodes gets the currently synced data nodes along with the buffered channels
func (c *clusterNodeManager) getDataNodes(onlyOnline bool) (map[string]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
ret := make(map[string]*datapb.DataNodeInfo)
for k, v := range c.dataNodes {
if !onlyOnline || v.status == online {
ret[k] = proto.Clone(v.info).(*datapb.DataNodeInfo)
}
}
return ret, c.chanBuffer
}
func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) {
node, ok := c.dataNodes[n.Address]
if ok {
node.status = online
node.info.Version = n.Version
} else {
c.dataNodes[n.Address] = &dataNodeInfo{
info: n,
status: online,
}
}
c.updateMetrics()
}
// unregister removes node with specified address, returns node info if exists
func (c *clusterNodeManager) unregister(addr string) *datapb.DataNodeInfo {
node, ok := c.dataNodes[addr]
if !ok {
return nil
}
delete(c.dataNodes, addr)
node.status = offline
c.updateMetrics()
return node.info
}
func (c *clusterNodeManager) updateMetrics() {
var offCnt, onCnt float64
for _, node := range c.dataNodes {
if node.status == online {
onCnt++
} else {
offCnt++
}
}
metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt)
metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt)
}
func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
if len(nodes) == 0 && len(buffer) == 0 {
return nil
}
data := make(map[string]string)
for _, n := range nodes {
c.dataNodes[n.Address].info = n
key := clusterPrefix + n.Address
value := proto.MarshalTextString(n)
data[key] = value
}
c.chanBuffer = buffer
// shortcut: reuse DataNodeInfo to store an array of channel statuses
bufNode := &datapb.DataNodeInfo{
Channels: buffer,
}
data[clusterBuffer] = proto.MarshalTextString(bufNode)
return c.kv.MultiSave(data)
}

View File

@ -1,110 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"context"
"errors"
"sync"
"github.com/milvus-io/milvus/internal/types"
)
type sessionManager interface {
// try to get a session, without retry
getSession(addr string) (types.DataNode, error)
// try to get a session from the manager by addr; create one if it does not exist
getOrCreateSession(addr string) (types.DataNode, error)
releaseSession(addr string)
release()
}
type clusterSessionManager struct {
sync.RWMutex
ctx context.Context
sessions map[string]types.DataNode
dataClientCreator dataNodeCreatorFunc
}
func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCreatorFunc) *clusterSessionManager {
return &clusterSessionManager{
ctx: ctx,
sessions: make(map[string]types.DataNode),
dataClientCreator: dataClientCreator,
}
}
// getSession without creation if not found
func (m *clusterSessionManager) getSession(addr string) (types.DataNode, error) {
m.RLock()
defer m.RUnlock()
cli, has := m.sessions[addr]
if has {
return cli, nil
}
return nil, errors.New("not found")
}
func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) {
cli, err := m.dataClientCreator(m.ctx, addr)
if err != nil {
return nil, err
}
if err := cli.Init(); err != nil {
return nil, err
}
if err := cli.Start(); err != nil {
return nil, err
}
m.Lock()
m.sessions[addr] = cli
m.Unlock()
return cli, nil
}
// entry function
func (m *clusterSessionManager) getOrCreateSession(addr string) (types.DataNode, error) {
m.RLock()
dn, has := m.sessions[addr]
m.RUnlock()
if has {
return dn, nil
}
// does not need double check, addr has outer sync.Map
dn, err := m.createSession(addr)
return dn, err
}
// // lock acquired
// func (m *clusterSessionManager) hasSession(addr string) bool {
// _, ok := m.sessions[addr]
// return ok
// }
func (m *clusterSessionManager) releaseSession(addr string) {
m.Lock()
defer m.Unlock()
cli, ok := m.sessions[addr]
if !ok {
return
}
_ = cli.Stop()
delete(m.sessions, addr)
}
func (m *clusterSessionManager) release() {
m.Lock()
defer m.Unlock()
for _, cli := range m.sessions {
_ = cli.Stop()
}
m.sessions = map[string]types.DataNode{}
}

View File

@ -0,0 +1,187 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"context"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
)
type ClusterStore interface {
GetNodes() []*NodeInfo
SetNode(nodeID UniqueID, node *NodeInfo)
DeleteNode(nodeID UniqueID)
GetNode(nodeID UniqueID) *NodeInfo
SetClient(nodeID UniqueID, client types.DataNode)
SetWatched(nodeID UniqueID, channelsName []string)
}
type NodeInfo struct {
info *datapb.DataNodeInfo
eventCh chan *NodeEvent
client types.DataNode
ctx context.Context
cancel context.CancelFunc
}
const eventChBuffer = 1024
type NodeEvent struct {
Type NodeEventType
Req interface{}
}
func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo {
ctx, cancel := context.WithCancel(ctx)
return &NodeInfo{
info: info,
eventCh: make(chan *NodeEvent, eventChBuffer),
ctx: ctx,
cancel: cancel,
}
}
func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo {
cloned := &NodeInfo{
info: n.info,
eventCh: n.eventCh,
client: n.client,
ctx: n.ctx,
cancel: n.cancel,
}
for _, opt := range opts {
opt(cloned)
}
return cloned
}
func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo {
info := proto.Clone(n.info).(*datapb.DataNodeInfo)
cloned := &NodeInfo{
info: info,
eventCh: n.eventCh,
client: n.client,
ctx: n.ctx,
cancel: n.cancel,
}
for _, opt := range opts {
opt(cloned)
}
return cloned
}
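The two clone flavors differ in whether the protobuf info is shared; a short usage sketch (names are illustrative):

func exampleClones(n *NodeInfo, cli types.DataNode) {
	// ShadowClone shares n.info, so it is only safe with options that do not
	// touch the protobuf, e.g. swapping the client.
	withClient := n.ShadowClone(SetClient(cli))
	// Clone deep-copies n.info first, so channel-mutating options
	// (SetWatched, AddChannels, SetChannels) act on a private copy.
	watched := n.Clone(SetWatched([]string{"ch-1"}))
	_, _ = withClient, watched
}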
func (n *NodeInfo) GetEventChannel() chan *NodeEvent {
return n.eventCh
}
func (n *NodeInfo) GetClient() types.DataNode {
return n.client
}
func (n *NodeInfo) Dispose() {
defer n.cancel()
if n.client != nil {
n.client.Stop()
}
}
type NodesInfo struct {
nodes map[UniqueID]*NodeInfo
}
func NewNodesInfo() *NodesInfo {
c := &NodesInfo{
nodes: make(map[UniqueID]*NodeInfo),
}
return c
}
func (c *NodesInfo) GetNodes() []*NodeInfo {
nodes := make([]*NodeInfo, 0, len(c.nodes))
for _, node := range c.nodes {
nodes = append(nodes, node)
}
return nodes
}
func (c *NodesInfo) SetNode(nodeID UniqueID, node *NodeInfo) {
c.nodes[nodeID] = node
metrics.DataCoordDataNodeList.WithLabelValues("online").Inc()
metrics.DataCoordDataNodeList.WithLabelValues("offline").Dec()
}
func (c *NodesInfo) DeleteNode(nodeID UniqueID) {
delete(c.nodes, nodeID)
metrics.DataCoordDataNodeList.WithLabelValues("online").Dec()
metrics.DataCoordDataNodeList.WithLabelValues("offline").Inc()
}
func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo {
node, ok := c.nodes[nodeID]
if !ok {
return nil
}
return node
}
func (c *NodesInfo) SetClient(nodeID UniqueID, client types.DataNode) {
if node, ok := c.nodes[nodeID]; ok {
c.nodes[nodeID] = node.ShadowClone(SetClient(client))
}
}
func (c *NodesInfo) SetWatched(nodeID UniqueID, channelsName []string) {
if node, ok := c.nodes[nodeID]; ok {
c.nodes[nodeID] = node.Clone(SetWatched(channelsName))
}
}
type NodeOpt func(n *NodeInfo)
func SetWatched(channelsName []string) NodeOpt {
return func(n *NodeInfo) {
channelsMap := make(map[string]struct{})
for _, channelName := range channelsName {
channelsMap[channelName] = struct{}{}
}
for _, ch := range n.info.Channels {
_, ok := channelsMap[ch.GetName()]
if !ok {
continue
}
if ch.State == datapb.ChannelWatchState_Uncomplete {
ch.State = datapb.ChannelWatchState_Complete
}
}
}
}
func SetClient(client types.DataNode) NodeOpt {
return func(n *NodeInfo) {
n.client = client
}
}
func AddChannels(channels []*datapb.ChannelStatus) NodeOpt {
return func(n *NodeInfo) {
n.info.Channels = append(n.info.Channels, channels...)
}
}
func SetChannels(channels []*datapb.ChannelStatus) NodeOpt {
return func(n *NodeInfo) {
n.info.Channels = channels
}
}

View File

@ -12,8 +12,7 @@ package datacoord
import (
"context"
"errors"
"strings"
"fmt"
"testing"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
@ -21,208 +20,156 @@ import (
"github.com/stretchr/testify/assert"
)
func TestClusterCreate(t *testing.T) {
cPolicy := newMockStartupPolicy()
ch := make(chan struct{}, 1)
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
type SpyClusterStore struct {
*NodesInfo
ch chan interface{}
}
func (s *SpyClusterStore) SetNode(nodeID UniqueID, node *NodeInfo) {
s.NodesInfo.SetNode(nodeID, node)
s.ch <- struct{}{}
}
func (s *SpyClusterStore) DeleteNode(nodeID UniqueID) {
s.NodesInfo.DeleteNode(nodeID)
s.ch <- struct{}{}
}
func spyWatchPolicy(ch chan interface{}) channelAssignPolicy {
return func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
for _, node := range cluster {
for _, c := range node.info.GetChannels() {
if c.GetName() == channel && c.GetCollectionID() == collectionID {
ch <- struct{}{}
return nil
}
}
}
ret := make([]*NodeInfo, 0)
c := &datapb.ChannelStatus{
Name: channel,
State: datapb.ChannelWatchState_Uncomplete,
CollectionID: collectionID,
}
n := cluster[0].Clone(AddChannels([]*datapb.ChannelStatus{c}))
ret = append(ret, n)
return ret
}
err := cluster.startup(nodes)
}
func TestClusterCreate(t *testing.T) {
ch := make(chan interface{})
kv := memkv.NewMemoryKV()
spyClusterStore := &SpyClusterStore{
NodesInfo: NewNodesInfo(),
ch: ch,
}
cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{})
assert.Nil(t, err)
defer cluster.Close()
addr := "localhost:8080"
info := &datapb.DataNodeInfo{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
}
nodes := []*NodeInfo{NewNodeInfo(context.TODO(), info)}
cluster.Startup(nodes)
<-ch
dataNodes, _ := cluster.dataManager.getDataNodes(true)
dataNodes := cluster.GetNodes()
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
assert.EqualValues(t, "localhost:8080", dataNodes[0].info.GetAddress())
}
func TestRegister(t *testing.T) {
cPolicy := newMockStartupPolicy()
registerPolicy := newEmptyRegisterPolicy()
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withRegisterPolicy(registerPolicy))
ch := make(chan interface{})
kv := memkv.NewMemoryKV()
spyClusterStore := &SpyClusterStore{
NodesInfo: NewNodesInfo(),
ch: ch,
}
cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withRegisterPolicy(registerPolicy))
assert.Nil(t, err)
defer cluster.Close()
addr := "localhost:8080"
err := cluster.startup(nil)
assert.Nil(t, err)
cluster.register(&datapb.DataNodeInfo{
cluster.Startup(nil)
info := &datapb.DataNodeInfo{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
})
dataNodes, _ := cluster.dataManager.getDataNodes(true)
}
node := NewNodeInfo(context.TODO(), info)
cluster.Register(node)
<-ch
dataNodes := cluster.GetNodes()
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
assert.EqualValues(t, "localhost:8080", dataNodes[0].info.GetAddress())
}
func TestUnregister(t *testing.T) {
cPolicy := newMockStartupPolicy()
unregisterPolicy := newEmptyUnregisterPolicy()
ch := make(chan struct{}, 1)
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy), mockValidatorOption(ch))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
ch := make(chan interface{})
kv := memkv.NewMemoryKV()
spyClusterStore := &SpyClusterStore{
NodesInfo: NewNodesInfo(),
ch: ch,
}
err := cluster.startup(nodes)
cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withUnregistorPolicy(unregisterPolicy))
assert.Nil(t, err)
<-ch
dataNodes, _ := cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
cluster.unregister(&datapb.DataNodeInfo{
defer cluster.Close()
addr := "localhost:8080"
info := &datapb.DataNodeInfo{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
})
dataNodes, _ = cluster.dataManager.getDataNodes(false)
}
nodes := []*NodeInfo{NewNodeInfo(context.TODO(), info)}
cluster.Startup(nodes)
<-ch
dataNodes := cluster.GetNodes()
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[0].info.GetAddress())
cluster.UnRegister(nodes[0])
<-ch
dataNodes = cluster.GetNodes()
assert.EqualValues(t, 0, len(dataNodes))
}
func TestRefresh(t *testing.T) {
cPolicy := newMockStartupPolicy()
ch := make(chan struct{}, 1)
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), clusterOption{
apply: func(c *cluster) {
c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error {
if strings.Contains(dn.Address, "inv") {
return errors.New("invalid dn")
}
return nil
}
c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error {
err := c.enableDataNode(dn)
ch <- struct{}{}
return err
}
},
})
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
{
Address: addr + "invalid",
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
<-ch
dataNodes, _ := cluster.dataManager.getDataNodes(true)
if !assert.Equal(t, 1, len(dataNodes)) {
t.FailNow()
}
assert.Equal(t, addr, dataNodes[addr].Address)
addr2 := "localhost:8081"
nodes = []*datapb.DataNodeInfo{
{
Address: addr2,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
{
Address: addr2 + "invalid",
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err = cluster.refresh(nodes)
assert.Nil(t, err)
<-ch
dataNodes, _ = cluster.dataManager.getDataNodes(true)
assert.Equal(t, 1, len(dataNodes))
_, has := dataNodes[addr]
assert.False(t, has)
assert.Equal(t, addr2, dataNodes[addr2].Address)
}
func TestWatchIfNeeded(t *testing.T) {
cPolicy := newMockStartupPolicy()
ch := make(chan struct{}, 1)
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
<-ch
dataNodes, _ := cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
chName := "ch1"
cluster.watchIfNeeded(chName, 0)
dataNodes, _ = cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes[addr].Channels))
assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name)
cluster.watchIfNeeded(chName, 0)
assert.EqualValues(t, 1, len(dataNodes[addr].Channels))
assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name)
}
func TestFlushSegments(t *testing.T) {
cPolicy := newMockStartupPolicy()
ch := make(chan struct{}, 1)
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
<-ch
segments := []*datapb.SegmentInfo{
{
ID: 0,
CollectionID: 0,
InsertChannel: "ch1",
},
}
cluster.flush(segments)
}
func mockValidatorOption(ch chan<- struct{}) clusterOption {
return clusterOption{
apply: func(c *cluster) {
c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error {
return nil
}
c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error {
err := c.enableDataNode(dn)
ch <- struct{}{}
return err
}
},
}
}
func createCluster(t *testing.T, ch chan interface{}, options ...clusterOption) *cluster {
ch := make(chan interface{})
kv := memkv.NewMemoryKV()
sessionManager := newMockSessionManager(ch)
dataManager, err := newClusterNodeManager(kv)
spyClusterStore := &SpyClusterStore{
NodesInfo: NewNodesInfo(),
ch: ch,
}
pch := make(chan interface{})
spyPolicy := spyWatchPolicy(pch)
cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withAssignPolicy(spyPolicy))
assert.Nil(t, err)
return newCluster(context.TODO(), dataManager, sessionManager, dummyPosProvider{}, options...)
defer cluster.Close()
addr := "localhost:8080"
info := &datapb.DataNodeInfo{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
}
node := NewNodeInfo(context.TODO(), info)
node.client, err = newMockDataNodeClient(1, make(chan interface{}))
assert.Nil(t, err)
nodes := []*NodeInfo{node}
cluster.Startup(nodes)
fmt.Println("11111")
<-ch
chName := "ch1"
cluster.Watch(chName, 0)
fmt.Println("222")
<-ch
dataNodes := cluster.GetNodes()
assert.EqualValues(t, 1, len(dataNodes[0].info.GetChannels()))
assert.EqualValues(t, chName, dataNodes[0].info.Channels[0].Name)
cluster.Watch(chName, 0)
<-pch
}

View File

@ -30,7 +30,7 @@ type dummyPosProvider struct{}
//GetVChanPositions implements positionProvider
func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel, isAccurate bool) ([]*datapb.VchannelInfo, error) {
pairs := make([]*datapb.VchannelInfo, len(vchans))
pairs := make([]*datapb.VchannelInfo, 0, len(vchans))
for _, vchan := range vchans {
pairs = append(pairs, &datapb.VchannelInfo{
CollectionID: vchan.CollectionID,

View File

@ -1,150 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"sort"
"time"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
// flushMonitor checks whether segments / channels meet the provided flush policies
type flushMonitor struct {
meta *meta
segmentPolicy SegmentFlushPolicy
channelPolicy ChannelFlushPolicy
}
// SegmentFlushPolicy checks segment size and returns whether the segment needs to be flushed
type SegmentFlushPolicy func(*datapb.SegmentInfo) bool
// ChannelFlushPolicy checks the segments inside a single vchannel and returns the segment ids that need to be flushed
type ChannelFlushPolicy func(string, []*datapb.SegmentInfo, *internalpb.MsgPosition) []UniqueID
// emptyFlushMonitor returns an empty flush monitor
func emptyFlushMonitor(meta *meta) flushMonitor {
return flushMonitor{
meta: meta,
}
}
// defaultFlushMonitor generates auto flusher with default policies
func defaultFlushMonitor(meta *meta) flushMonitor {
return flushMonitor{
meta: meta,
// segmentPolicy: estSegmentSizePolicy(1024, 1024*1024*1536), // row 1024 byte, limit 1.5GiB
channelPolicy: channelSizeEpochPolicy(1024, uint64(time.Hour)),
}
}
// CheckSegments checks whether segments meet the flush policy and returns the segment ids that need to be flushed
func (f flushMonitor) CheckSegments(segments []*datapb.SegmentInfo) []UniqueID {
if f.segmentPolicy == nil {
return []UniqueID{}
}
result := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
if f.segmentPolicy(segment) {
result = append(result, segment.ID)
}
}
return result
}
// CheckChannels checks the changed channels and applies `ChannelFlushPolicy`
func (f flushMonitor) CheckChannels(channels []string, latest *internalpb.MsgPosition) []UniqueID {
segHits := make(map[UniqueID]struct{})
for _, channel := range channels {
segments := f.meta.GetSegmentsByChannel(channel)
growingSegments := make([]*datapb.SegmentInfo, 0, len(segments))
for _, segment := range segments {
if segment.State != commonpb.SegmentState_Growing {
continue
}
growingSegments = append(growingSegments, segment)
if f.segmentPolicy != nil && f.segmentPolicy(segment) {
segHits[segment.ID] = struct{}{}
}
}
if f.channelPolicy != nil {
hits := f.channelPolicy(channel, growingSegments, latest)
for _, hit := range hits {
segHits[hit] = struct{}{}
}
}
}
result := make([]UniqueID, 0, len(segHits))
for segID := range segHits {
result = append(result, segID)
}
return result
}
// deprecated
func estSegmentSizePolicy(rowSize, limit int64) SegmentFlushPolicy {
return func(seg *datapb.SegmentInfo) bool {
if seg == nil {
return false
}
if seg.NumOfRows*rowSize > limit {
return true
}
return false
}
}
// channelSizeEpochPolicy checks channel size and segment lifetime
// segmentMax is the max number of segments allowed in the channel
// epochDuration is the max lifetime a segment has
func channelSizeEpochPolicy(segmentMax int, epochDuration uint64) ChannelFlushPolicy {
return func(channel string, segments []*datapb.SegmentInfo, latest *internalpb.MsgPosition) []UniqueID {
if len(segments) < segmentMax && latest == nil {
return []UniqueID{}
}
sortSegmentsByDmlPos(segments)
result := []UniqueID{}
overflow := len(segments) - segmentMax
for idx, segment := range segments {
if idx < overflow {
result = append(result, segment.ID)
continue
}
if latest != nil {
if segment.DmlPosition == nil || latest.Timestamp-segment.DmlPosition.Timestamp > epochDuration {
result = append(result, segment.ID)
continue
}
}
break
}
return result
}
}
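A usage sketch consistent with TestFlushMonitor below (the limits and the function name are illustrative):

func exampleChannelPolicy(growing []*datapb.SegmentInfo, latest *internalpb.MsgPosition) []UniqueID {
	policy := channelSizeEpochPolicy(2, uint64(time.Hour))
	// With latest == nil, only the overflow beyond 2 segments (those with the
	// oldest DmlPosition) is returned; with a non-nil latest, segments whose
	// DmlPosition lags it by more than an hour are returned as well.
	return policy("ch1", growing, latest)
}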
// sortSegmentsByDmlPos sorts input segments in ascending order by `DmlPosition.Timestamp`; a nil DmlPosition sorts first
func sortSegmentsByDmlPos(segments []*datapb.SegmentInfo) {
sort.Slice(segments, func(i, j int) bool {
if segments[i].DmlPosition == nil {
return true
}
if segments[j].DmlPosition == nil {
return false
}
return segments[i].DmlPosition.Timestamp < segments[j].DmlPosition.Timestamp
})
}

View File

@ -1,118 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/stretchr/testify/assert"
)
func TestFlushMonitor(t *testing.T) {
const collID = UniqueID(0)
const partID0 = UniqueID(100)
const partID1 = UniqueID(101)
const channelName = "c1"
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
testSchema := newTestSchema()
collInfo := &datapb.CollectionInfo{
ID: collID,
Schema: testSchema,
Partitions: []UniqueID{partID0, partID1},
}
meta.AddCollection(collInfo)
// create seg0 for partition0, seg0/seg1 for partition1
segID0_0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo0_0 := buildSegment(collID, partID0, segID0_0, channelName)
segID1_0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1_0 := buildSegment(collID, partID1, segID1_0, channelName)
segID1_1, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1_1 := buildSegment(collID, partID1, segID1_1, channelName)
// check AddSegment
err = meta.AddSegment(segInfo0_0)
assert.Nil(t, err)
err = meta.AddSegment(segInfo1_0)
assert.Nil(t, err)
err = meta.AddSegment(segInfo1_1)
assert.Nil(t, err)
t.Run("Test empty flush monitor", func(t *testing.T) {
fm := emptyFlushMonitor(meta)
ids := fm.CheckSegments([]*datapb.SegmentInfo{})
assert.Equal(t, 0, len(ids))
ids = fm.CheckChannels([]string{channelName}, nil)
assert.Equal(t, 0, len(ids))
})
t.Run("Test custom segment policy", func(t *testing.T) {
fm := emptyFlushMonitor(meta)
fm.segmentPolicy = estSegmentSizePolicy(1024*1024, 1024*1024*2) // row size 1Mib Limit 2 MB
segID3Rows, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo3Rows := buildSegment(collID, partID1, segID3Rows, channelName)
segInfo3Rows.NumOfRows = 3
ids := fm.CheckSegments([]*datapb.SegmentInfo{segInfo3Rows})
if assert.Equal(t, 1, len(ids)) {
assert.Equal(t, segID3Rows, ids[0])
}
})
t.Run("Test custom channel policy", func(t *testing.T) {
const channelName2 = `ch2`
fm := emptyFlushMonitor(meta)
fm.channelPolicy = channelSizeEpochPolicy(100, uint64(time.Hour))
for i := 0; i < 100; i++ {
segID, err := mockAllocator.allocID()
assert.Nil(t, err)
seg := buildSegment(collID, partID0, segID, channelName2)
seg.DmlPosition = &internalpb.MsgPosition{
Timestamp: uint64(i + 1),
}
meta.AddSegment(seg)
}
ids := fm.CheckChannels([]string{channelName2}, nil)
assert.Equal(t, 0, len(ids))
exSegID, err := mockAllocator.allocID()
assert.Nil(t, err)
seg := buildSegment(collID, partID0, exSegID, channelName2)
seg.DmlPosition = &internalpb.MsgPosition{
Timestamp: uint64(0), // the oldest
}
meta.AddSegment(seg)
ids = fm.CheckChannels([]string{channelName2}, nil)
if assert.Equal(t, 1, len(ids)) {
assert.Equal(t, exSegID, ids[0])
}
ids = fm.CheckChannels([]string{channelName2}, &internalpb.MsgPosition{Timestamp: uint64(time.Hour + 5)})
assert.Equal(t, 5, len(ids))
})
}

View File

@ -60,6 +60,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
resp.Status.Reason = fmt.Sprintf("Failed to flush %d, %s", req.CollectionID, err)
return resp, nil
}
log.Debug("flush response with segments", zap.Any("segments", sealedSegments))
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.DbID = req.GetDbID()
resp.CollectionID = req.GetCollectionID()
@ -110,8 +111,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
//assigns = append(assigns, result)
//continue
//}
s.cluster.watchIfNeeded(r.ChannelName, r.CollectionID)
s.cluster.Watch(r.ChannelName, r.CollectionID)
segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))

View File

@ -16,7 +16,6 @@ import (
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -301,38 +300,3 @@ func (m *mockRootCoordService) SegmentFlushCompleted(ctx context.Context, in *da
func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
type mockStartupPolicy struct {
}
func newMockStartupPolicy() clusterStartupPolicy {
return &mockStartupPolicy{}
}
func (p *mockStartupPolicy) apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
return nil, nil
}
type mockSessionManager struct {
ch chan interface{}
}
func newMockSessionManager(ch chan interface{}) sessionManager {
return &mockSessionManager{
ch: ch,
}
}
func (m *mockSessionManager) getSession(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, m.ch)
}
func (m *mockSessionManager) getOrCreateSession(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, m.ch)
}
func (m *mockSessionManager) releaseSession(addr string) {
}
func (m *mockSessionManager) release() {
}

View File

@ -27,108 +27,18 @@ type clusterDeltaChange struct {
restarts []string
}
// clusterStartupPolicy defines the behavior when datacoord starts/restarts
type clusterStartupPolicy interface {
// apply accepts all nodes plus the new/offline/restarted nodes and returns the datanodes whose status needs to be changed
apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
}
type watchRestartsStartupPolicy struct {
}
func newWatchRestartsStartupPolicy() clusterStartupPolicy {
return watchRestartStartup
}
// startup func
type startupFunc func(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange,
buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
// startupFunc implements clusterStartupPolicy
func (f startupFunc) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange,
buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
return f(cluster, delta, buffer)
}
var watchRestartStartup startupFunc = func(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange,
buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
ret := make([]*datapb.DataNodeInfo, 0)
for _, addr := range delta.restarts {
node := cluster[addr]
for _, ch := range node.Channels {
ch.State = datapb.ChannelWatchState_Uncomplete
}
ret = append(ret, node)
}
	// put all channels from offline nodes into the buffer first
for _, addr := range delta.offlines {
node := cluster[addr]
for _, ch := range node.Channels {
ch.State = datapb.ChannelWatchState_Uncomplete
buffer = append(buffer, ch)
}
}
// try new nodes first
if len(delta.newNodes) > 0 && len(buffer) > 0 {
idx := 0
for len(buffer) > 0 {
node := cluster[delta.newNodes[idx%len(delta.newNodes)]]
node.Channels = append(node.Channels, buffer[0])
buffer = buffer[1:]
if idx < len(delta.newNodes) {
ret = append(ret, node)
}
idx++
}
}
// try online nodes if buffer is not empty
if len(buffer) > 0 {
online := make([]*datapb.DataNodeInfo, 0, len(cluster))
for _, node := range cluster {
online = append(online, node)
}
if len(online) > 0 {
idx := 0
for len(buffer) > 0 {
node := online[idx%len(online)]
node.Channels = append(node.Channels, buffer[0])
buffer = buffer[1:]
if idx < len(online) {
ret = append(ret, node)
}
idx++
}
}
}
return ret, buffer
}
// dataNodeRegisterPolicy defines the behavior when a datanode is registered
type dataNodeRegisterPolicy interface {
// apply accepts all online nodes and the newly created node, and returns the nodes that need to be changed
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
}
// dataNodeRegisterFunc is a simple func type wrapping the register policy
type dataNodeRegisterFunc func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
// implement dataNodeRegisterPolicy for dataNodeRegisterFunc
func (f dataNodeRegisterFunc) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo,
buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
return f(cluster, session, buffer)
}
type dataNodeRegisterPolicy func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus)
// test logic: register the node and do nothing else
var emptyRegister dataNodeRegisterFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo,
buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
return []*datapb.DataNodeInfo{session}, buffer
var emptyRegister dataNodeRegisterPolicy = func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus) {
return []*NodeInfo{session}, buffer
}
// assign existing buffered channels to the newly registered datanode session
var registerAssignWithBuffer dataNodeRegisterFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo,
buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
session.Channels = append(session.Channels, buffer...)
return []*datapb.DataNodeInfo{session}, []*datapb.ChannelStatus{}
var registerAssignWithBuffer dataNodeRegisterPolicy = func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus) {
node := session.Clone(AddChannels(buffer))
return []*NodeInfo{node}, []*datapb.ChannelStatus{}
}
func newEmptyRegisterPolicy() dataNodeRegisterPolicy {
@ -139,41 +49,35 @@ func newAssiggBufferRegisterPolicy() dataNodeRegisterPolicy {
return registerAssignWithBuffer
}
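Because the register policy is now a plain func type, a custom policy is just a function with the dataNodeRegisterPolicy signature. The sketch below is hypothetical (cappedBufferRegisterPolicy and maxChannels are not part of this patch); it only reuses the Clone/AddChannels helpers already shown above.
// Hypothetical dataNodeRegisterPolicy: hand the newly registered node at most
// maxChannels channels from the buffer and keep the remainder buffered.
func cappedBufferRegisterPolicy(maxChannels int) dataNodeRegisterPolicy {
	return func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus) {
		n := maxChannels
		if n > len(buffer) {
			n = len(buffer)
		}
		node := session.Clone(AddChannels(buffer[:n]))
		return []*NodeInfo{node}, buffer[n:]
	}
}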
// dataNodeUnregisterPolicy defines the behavior when datanode unregisters
type dataNodeUnregisterPolicy interface {
// apply accepts all online nodes and the unregistered node, and returns the nodes that need to be changed
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
// unregisterNodeFunc is a shortcut type for functions implementing the unregister policy
type unregisterNodeFunc func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
// implement dataNodeUnregisterPolicy for unregisterNodeFunc
func (f unregisterNodeFunc) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
return f(cluster, session)
}
type dataNodeUnregisterPolicy func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo
// test logic: do nothing when a node unregisters
var emptyUnregisterFunc unregisterNodeFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
var emptyUnregisterFunc dataNodeUnregisterPolicy = func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo {
return nil
}
// randomly assign the channels of an unregistered node to the existing nodes.
// If there are no nodes online, this func is not invoked; the buffer is filled outside this func.
var randomAssignRegisterFunc unregisterNodeFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
var randomAssignRegisterFunc dataNodeUnregisterPolicy = func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo {
if len(cluster) == 0 || // no available node
session == nil ||
len(session.Channels) == 0 { // lost node not watching any channels
return []*datapb.DataNodeInfo{}
len(session.info.GetChannels()) == 0 { // lost node not watching any channels
return []*NodeInfo{}
}
appliedNodes := make([]*datapb.DataNodeInfo, 0, len(session.Channels))
appliedNodes := make([]*NodeInfo, 0, len(session.info.GetChannels()))
channels := session.info.GetChannels()
// clear unregistered node's channels
node := session.Clone(SetChannels(nil))
appliedNodes = append(appliedNodes, node)
raResult := make(map[int][]*datapb.ChannelStatus)
for _, chanSt := range session.Channels {
for _, chanSt := range channels {
bIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(cluster))))
if err != nil {
log.Error("error generated rand idx", zap.Error(err))
return []*datapb.DataNodeInfo{}
return []*NodeInfo{}
}
idx := bIdx.Int64()
if int(idx) >= len(cluster) {
@ -193,11 +97,10 @@ var randomAssignRegisterFunc unregisterNodeFunc = func(cluster map[string]*datap
cs, ok := raResult[i]
i++
if ok {
node.Channels = append(node.Channels, cs...)
appliedNodes = append(appliedNodes, node)
n := node.Clone(AddChannels(cs))
appliedNodes = append(appliedNodes, n)
}
}
return appliedNodes
}
@ -205,27 +108,16 @@ func newEmptyUnregisterPolicy() dataNodeUnregisterPolicy {
return emptyUnregisterFunc
}
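An unregister policy follows the same pattern. As a hypothetical alternative to the random reassignment above, the sketch below moves every channel of the lost node to the first surviving node; assignToFirstUnregisterPolicy is an invented name and is not part of this patch.
// Hypothetical dataNodeUnregisterPolicy: clear the lost node's channels and
// reassign all of them, marked Uncomplete, to the first surviving node.
var assignToFirstUnregisterPolicy dataNodeUnregisterPolicy = func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo {
	if len(cluster) == 0 || session == nil || len(session.info.GetChannels()) == 0 {
		return []*NodeInfo{}
	}
	channels := session.info.GetChannels()
	for _, ch := range channels {
		ch.State = datapb.ChannelWatchState_Uncomplete
	}
	cleared := session.Clone(SetChannels(nil))
	target := cluster[0].Clone(AddChannels(channels))
	return []*NodeInfo{cleared, target}
}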
// channelAssignPolicy defines the behavior when a new channel needs to be assigned
type channelAssignPolicy interface {
// apply accepts all online nodes and the newly created channel with its collectionID, and returns the nodes that need to be changed
apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo
}
// channelAssignFunc is a function shortcut for the assign policy
type channelAssignFunc func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo
// implement channelAssignPolicy for channelAssignFunc
func (f channelAssignFunc) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
return f(cluster, channel, collectionID)
}
type channelAssignPolicy func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo
// deprecated
// test logic: assign the channel to every existing datanode; works as expected only when there is a single datanode
var assignAllFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
var assignAllFunc channelAssignPolicy = func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
ret := make([]*NodeInfo, 0)
for _, node := range cluster {
has := false
for _, ch := range node.Channels {
for _, ch := range node.info.GetChannels() {
if ch.Name == channel {
has = true
break
@ -234,45 +126,47 @@ var assignAllFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeIn
if has {
continue
}
node.Channels = append(node.Channels, &datapb.ChannelStatus{
c := &datapb.ChannelStatus{
Name: channel,
State: datapb.ChannelWatchState_Uncomplete,
CollectionID: collectionID,
})
ret = append(ret, node)
}
n := node.Clone(AddChannels([]*datapb.ChannelStatus{c}))
ret = append(ret, n)
}
return ret
}
// balanced channel assignment: select the datanode watching the fewest channels
var balancedAssignFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
var balancedAssignFunc channelAssignPolicy = func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
if len(cluster) == 0 {
return []*datapb.DataNodeInfo{}
return []*NodeInfo{}
}
	// skip assignment if the channel is already assigned to a node
for _, node := range cluster {
for _, c := range node.GetChannels() {
for _, c := range node.info.GetChannels() {
if c.GetName() == channel && c.GetCollectionID() == collectionID {
return nil
}
}
}
target, min := "", math.MaxInt32
target, min := -1, math.MaxInt32
for k, v := range cluster {
if len(v.GetChannels()) < min {
if len(v.info.GetChannels()) < min {
target = k
min = len(v.GetChannels())
min = len(v.info.GetChannels())
}
}
ret := make([]*datapb.DataNodeInfo, 0)
cluster[target].Channels = append(cluster[target].Channels, &datapb.ChannelStatus{
ret := make([]*NodeInfo, 0)
c := &datapb.ChannelStatus{
Name: channel,
State: datapb.ChannelWatchState_Uncomplete,
CollectionID: collectionID,
})
ret = append(ret, cluster[target])
}
n := cluster[target].Clone(AddChannels([]*datapb.ChannelStatus{c}))
ret = append(ret, n)
return ret
}
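A channel assign policy is again just a func with the channelAssignPolicy signature. The sketch below is hypothetical (hashAssignPolicy is an invented name, the existing-channel check is omitted for brevity, and it assumes hash/fnv is imported); it pins a channel to a node chosen by hashing the channel name.
// Hypothetical channelAssignPolicy: choose the target node by hashing the
// channel name, so the same channel always lands on the same node while the
// cluster membership stays unchanged.
var hashAssignPolicy channelAssignPolicy = func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
	if len(cluster) == 0 {
		return []*NodeInfo{}
	}
	h := fnv.New32a()
	h.Write([]byte(channel))
	target := int(h.Sum32() % uint32(len(cluster)))
	c := &datapb.ChannelStatus{
		Name:         channel,
		State:        datapb.ChannelWatchState_Uncomplete,
		CollectionID: collectionID,
	}
	n := cluster[target].Clone(AddChannels([]*datapb.ChannelStatus{c}))
	return []*NodeInfo{n}
}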

View File

@ -12,96 +12,60 @@
package datacoord
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
func TestWatchRestartsPolicy(t *testing.T) {
p := newWatchRestartsStartupPolicy()
c := make(map[string]*datapb.DataNodeInfo)
c["localhost:1111"] = &datapb.DataNodeInfo{
Address: "localhost:1111",
Version: 0,
Channels: []*datapb.ChannelStatus{
{
Name: "vch1",
State: datapb.ChannelWatchState_Complete,
CollectionID: 0,
},
},
}
c["localhost:2222"] = &datapb.DataNodeInfo{
Address: "localhost:2222",
Version: 0,
Channels: []*datapb.ChannelStatus{
{
Name: "vch2",
State: datapb.ChannelWatchState_Complete,
CollectionID: 0,
},
},
}
dchange := &clusterDeltaChange{
newNodes: []string{},
offlines: []string{},
restarts: []string{"localhost:2222"},
}
nodes, _ := p.apply(c, dchange, []*datapb.ChannelStatus{})
assert.EqualValues(t, 1, len(nodes))
assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nodes[0].Channels[0].State)
}
func TestRandomReassign(t *testing.T) {
p := randomAssignRegisterFunc
clusters := make(map[string]*datapb.DataNodeInfo)
clusters["addr1"] = &datapb.DataNodeInfo{
clusters := make([]*NodeInfo, 0)
info1 := &datapb.DataNodeInfo{
Address: "addr1",
Channels: make([]*datapb.ChannelStatus, 0, 10),
}
clusters["addr2"] = &datapb.DataNodeInfo{
info2 := &datapb.DataNodeInfo{
Address: "addr2",
Channels: make([]*datapb.ChannelStatus, 0, 10),
}
clusters["addr3"] = &datapb.DataNodeInfo{
info3 := &datapb.DataNodeInfo{
Address: "addr3",
Channels: make([]*datapb.ChannelStatus, 0, 10),
}
cases := []*datapb.DataNodeInfo{
{
Channels: []*datapb.ChannelStatus{},
},
{
Channels: []*datapb.ChannelStatus{
{Name: "VChan1", CollectionID: 1},
{Name: "VChan2", CollectionID: 2},
},
},
{
Channels: []*datapb.ChannelStatus{
{Name: "VChan3", CollectionID: 1},
{Name: "VChan4", CollectionID: 2},
},
node1 := NewNodeInfo(context.TODO(), info1)
node2 := NewNodeInfo(context.TODO(), info2)
node3 := NewNodeInfo(context.TODO(), info3)
clusters = append(clusters, node1, node2, node3)
caseInfo1 := &datapb.DataNodeInfo{
Channels: []*datapb.ChannelStatus{},
}
caseInfo2 := &datapb.DataNodeInfo{
Channels: []*datapb.ChannelStatus{
{Name: "VChan1", CollectionID: 1},
{Name: "VChan2", CollectionID: 2},
},
}
cases := []*NodeInfo{
{info: caseInfo1},
{info: caseInfo2},
nil,
}
for _, ca := range cases {
nodes := p.apply(clusters, ca)
if ca == nil || len(ca.Channels) == 0 {
nodes := p(clusters, ca)
if ca == nil || len(ca.info.GetChannels()) == 0 {
assert.Equal(t, 0, len(nodes))
} else {
for _, ch := range ca.Channels {
for _, ch := range ca.info.GetChannels() {
found := false
loop:
for _, node := range nodes {
for _, nch := range node.Channels {
for _, nch := range node.info.GetChannels() {
if nch.Name == ch.Name {
found = true
assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nch.State)

View File

@ -75,7 +75,7 @@ type Server struct {
meta *meta
segmentManager Manager
allocator allocator
cluster *cluster
cluster *Cluster
rootCoordClient types.RootCoord
ddChannelName string
@ -169,13 +169,9 @@ func (s *Server) Start() error {
}
func (s *Server) initCluster() error {
dManager, err := newClusterNodeManager(s.kvClient)
if err != nil {
return err
}
sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
s.cluster = newCluster(s.ctx, dManager, sManager, s)
return nil
var err error
s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
return err
}
func (s *Server) initServiceDiscovery() error {
@ -186,19 +182,18 @@ func (s *Server) initServiceDiscovery() error {
}
log.Debug("registered sessions", zap.Any("sessions", sessions))
datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
datanodes := make([]*NodeInfo, 0, len(sessions))
for _, session := range sessions {
datanodes = append(datanodes, &datapb.DataNodeInfo{
info := &datapb.DataNodeInfo{
Address: session.Address,
Version: session.ServerID,
Channels: []*datapb.ChannelStatus{},
})
}
nodeInfo := NewNodeInfo(s.ctx, info)
datanodes = append(datanodes, nodeInfo)
}
if err := s.cluster.startup(datanodes); err != nil {
log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
return err
}
s.cluster.Startup(datanodes)
s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
return nil
@ -349,7 +344,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
segmentInfos = append(segmentInfos, sInfo)
}
if len(segmentInfos) > 0 {
s.cluster.flush(segmentInfos)
s.cluster.Flush(segmentInfos)
}
s.segmentManager.ExpireAllocations(ch, ts)
}
@ -365,24 +360,23 @@ func (s *Server) startWatchService(ctx context.Context) {
log.Debug("watch service shutdown")
return
case event := <-s.eventCh:
datanode := &datapb.DataNodeInfo{
info := &datapb.DataNodeInfo{
Address: event.Session.Address,
Version: event.Session.ServerID,
Channels: []*datapb.ChannelStatus{},
}
node := NewNodeInfo(ctx, info)
switch event.EventType {
case sessionutil.SessionAddEvent:
log.Info("Received datanode register",
zap.String("address", datanode.Address),
zap.Int64("serverID", datanode.Version))
//s.cluster.register(datanode)
s.cluster.refresh(s.loadDataNodes())
zap.String("address", info.Address),
zap.Int64("serverID", info.Version))
s.cluster.Register(node)
case sessionutil.SessionDelEvent:
log.Info("Received datanode unregister",
zap.String("address", datanode.Address),
zap.Int64("serverID", datanode.Version))
//s.cluster.unregister(datanode)
s.cluster.refresh(s.loadDataNodes())
zap.String("address", info.Address),
zap.Int64("serverID", info.Version))
s.cluster.UnRegister(node)
default:
log.Warn("receive unknown service event type",
zap.Any("type", event.EventType))
@ -482,7 +476,7 @@ func (s *Server) Stop() error {
}
log.Debug("DataCoord server shutdown")
atomic.StoreInt64(&s.isServing, ServerStateStopped)
s.cluster.releaseSessions()
s.cluster.Close()
s.stopServerLoop()
return nil
}

View File

@ -498,8 +498,7 @@ func TestDataNodeTtChannel(t *testing.T) {
},
}
}
svr.cluster.sessionManager.getOrCreateSession("localhost:7777") // trigger create session manually
svr.cluster.register(&datapb.DataNodeInfo{
info := &datapb.DataNodeInfo{
Address: "localhost:7777",
Version: 0,
Channels: []*datapb.ChannelStatus{
@ -508,7 +507,11 @@ func TestDataNodeTtChannel(t *testing.T) {
State: datapb.ChannelWatchState_Complete,
},
},
})
}
node := NewNodeInfo(context.TODO(), info)
node.client, err = newMockDataNodeClient(1, ch)
assert.Nil(t, err)
svr.cluster.Register(node)
t.Run("Test segment flush after tt", func(t *testing.T) {
resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{

View File

@ -1,73 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"errors"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/msgstream"
)
type streamType int
const (
_ streamType = iota
streamTypeFlush
streamTypeStats
)
var (
errInvalidStreamType = errors.New("invalid stream type")
)
// storeStreamPos store current processed stream pos
func (s *Server) storeStreamPos(st streamType, pos *msgstream.MsgPosition) error {
key := s.streamTypeSubKey(st)
if key == "" {
return errInvalidStreamType
}
val := proto.MarshalTextString(pos)
err := s.kvClient.Save(key, val)
if err != nil {
return err
}
return nil
}
// loadStreamLastPos load last successful pos with specified stream type
func (s *Server) loadStreamLastPos(st streamType) (pos *msgstream.MsgPosition, err error) {
key := s.streamTypeSubKey(st)
if key == "" {
return nil, errInvalidStreamType
}
var val string
pos = &msgstream.MsgPosition{}
val, err = s.kvClient.Load(key)
if err != nil {
return pos, err
}
err = proto.UnmarshalText(val, pos)
return pos, err
}
// streamTypeSubKey converts stream type to corresponding k-v store key
func (s *Server) streamTypeSubKey(st streamType) string {
switch st {
case streamTypeFlush:
return Params.FlushStreamPosSubPath
case streamTypeStats:
return Params.StatsStreamPosSubPath
default:
return ""
}
}
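For context, these removed helpers round-tripped a msgstream.MsgPosition through the TxnKV store as proto text. A minimal usage sketch follows; the call site and the currentPos variable are hypothetical and were not part of the original code.
// Hypothetical call site for the removed helpers: persist the last processed
// flush-stream position, then restore it after a restart.
if err := s.storeStreamPos(streamTypeFlush, currentPos); err != nil {
	log.Warn("failed to store flush stream position", zap.Error(err))
}
lastPos, err := s.loadStreamLastPos(streamTypeFlush)
if err != nil {
	log.Warn("failed to load flush stream position", zap.Error(err))
}
// seek the flush msgstream to lastPos before consuming new messages
_ = lastPos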