support dml channel balancer on datacoord (#22324) (#22377) (#22692)

Signed-off-by: MrPresent-Han <jamesharden11122@gmail.com>
pull/22846/head
MrPresent-Han 2023-03-20 10:01:56 +08:00 committed by GitHub
parent bd054d90c9
commit 77c9e33e70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 574 additions and 131 deletions

View File

@ -257,6 +257,8 @@ indexNode:
dataCoord:
channel:
watchTimeoutInterval: 30 # Timeout on watching channels (in seconds). Datanode tickler update watch progress will reset timeout timer.
balanceSilentDuration: 300 # How long (in seconds) the channel manager must stay idle before the channelBalancer on datacoord may run
balanceInterval: 360 # The interval (in seconds) at which the channelBalancer on datacoord checks the balance status
segment:
maxSize: 512 # Maximum size of a segment in MB
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collections which have a disk index

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -39,6 +40,9 @@ type channelStateTimer struct {
runningTimerStops sync.Map // channel name to timer stop channels
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
// Any later modification must update runningTimerCount together with runningTimers
// so that the two stay consistent.
runningTimerCount atomic.Int32
}
func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
@ -92,35 +96,40 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
}
stop := make(chan struct{})
timer := time.NewTimer(timeout)
ticker := time.NewTimer(timeout)
c.removeTimers([]string{channelName})
c.runningTimerStops.Store(channelName, stop)
c.runningTimers.Store(channelName, timer)
c.runningTimers.Store(channelName, ticker)
c.runningTimerCount.Inc()
go func() {
log.Info("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Duration("check interval", timeout))
defer timer.Stop()
defer ticker.Stop()
select {
case <-timer.C:
case <-ticker.C:
// check tickle at path as :tickle/[prefix]/{channel_name}
c.removeTimers([]string{channelName})
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Duration("timeout interval", timeout))
zap.Duration("timeout interval", timeout),
zap.Int32("runningTimerCount", c.runningTimerCount.Load()))
ackType := getAckType(watchState)
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
return
case <-stop:
log.Info("stop timer before timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Duration("timeout interval", timeout))
zap.Duration("timeout interval", timeout),
zap.Int32("runningTimerCount", c.runningTimerCount.Load()))
return
}
}()
}
@ -134,6 +143,9 @@ func (c *channelStateTimer) removeTimers(channels []string) {
if stop, ok := c.runningTimerStops.LoadAndDelete(channel); ok {
close(stop.(chan struct{}))
c.runningTimers.Delete(channel)
c.runningTimerCount.Dec()
log.Info("remove timer for channel", zap.String("channel", channel),
zap.Int32("timerCount", c.runningTimerCount.Load()))
}
}
}
@ -143,6 +155,9 @@ func (c *channelStateTimer) stopIfExist(e *ackEvent) {
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
close(stop.(chan struct{}))
c.runningTimers.Delete(e.channelName)
c.runningTimerCount.Dec()
log.Info("stop timer for channel", zap.String("channel", e.channelName),
zap.Int32("timerCount", c.runningTimerCount.Load()))
}
}
@ -153,6 +168,12 @@ func (c *channelStateTimer) resetIfExist(channel string, interval time.Duration)
}
}
// hasRunningTimers reports whether runningTimerCount is currently non-zero.
//
// The counter is read atomically and no mutex is taken: the result is only a
// snapshot, because timers may still be added or removed right after it is
// returned, so locking here would not make the answer any more reliable.
func (c *channelStateTimer) hasRunningTimers() bool {
	running := c.runningTimerCount.Load()
	return running != 0
}
func parseWatchInfo(key string, data []byte) (*datapb.ChannelWatchInfo, error) {
watchInfo := datapb.ChannelWatchInfo{}
if err := proto.Unmarshal(data, &watchInfo); err != nil {

View File

@ -37,8 +37,6 @@ import (
"stathat.com/c/consistent"
)
const bgCheckInterval = 3 * time.Second
// ChannelManager manages the allocation and the balance between channels and data nodes.
type ChannelManager struct {
ctx context.Context
@ -51,11 +49,14 @@ type ChannelManager struct {
assignPolicy ChannelAssignPolicy
reassignPolicy ChannelReassignPolicy
bgChecker ChannelBGChecker
balancePolicy BalanceChannelPolicy
msgstreamFactory msgstream.Factory
stateChecker channelStateChecker
stopChecker context.CancelFunc
stateTimer *channelStateTimer
lastActiveTimestamp time.Time
}
type channel struct {
@ -90,6 +91,10 @@ func withStateChecker() ChannelManagerOpt {
return func(c *ChannelManager) { c.stateChecker = c.watchChannelStatesLoop }
}
// withBgChecker returns an option that installs the manager's own
// bgCheckChannelsWork method as its background checker.
func withBgChecker() ChannelManagerOpt {
	enable := func(mgr *ChannelManager) {
		mgr.bgChecker = mgr.bgCheckChannelsWork
	}
	return enable
}
// NewChannelManager creates and returns a new ChannelManager instance.
func NewChannelManager(
kv kv.MetaKv, // for TxnKv and MetaKv
@ -116,7 +121,8 @@ func NewChannelManager(
c.deregisterPolicy = c.factory.NewDeregisterPolicy()
c.assignPolicy = c.factory.NewAssignPolicy()
c.reassignPolicy = c.factory.NewReassignPolicy()
c.bgChecker = c.factory.NewBgChecker()
c.balancePolicy = c.factory.NewBalancePolicy()
c.lastActiveTimestamp = time.Now()
return c, nil
}
@ -155,13 +161,18 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
// Unwatch and drop channel with drop flag.
c.unwatchDroppedChannels()
checkerContext, cancel := context.WithCancel(ctx)
c.stopChecker = cancel
if c.stateChecker != nil {
ctx1, cancel := context.WithCancel(ctx)
c.stopChecker = cancel
go c.stateChecker(ctx1)
go c.stateChecker(checkerContext)
log.Info("starting etcd states checker")
}
if c.bgChecker != nil {
go c.bgChecker(checkerContext)
log.Info("starting background balance checker")
}
log.Info("cluster start up",
zap.Int64s("nodes", nodes),
zap.Int64s("oNodes", oNodes),
@ -246,38 +257,25 @@ func (c *ChannelManager) unwatchDroppedChannels() {
}
}
// NOT USED.
func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
ticker := time.NewTicker(bgCheckInterval)
ticker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("background checking channels loop quit")
return
case <-ticker.C:
c.mu.Lock()
channels := c.store.GetNodesChannels()
reallocates, err := c.bgChecker(channels, time.Now())
if err != nil {
log.Warn("channel manager bg check failed", zap.Error(err))
c.mu.Unlock()
continue
}
updates := c.reassignPolicy(c.store, reallocates)
log.Info("channel manager bg check reassign", zap.Array("updates", updates))
for _, update := range updates {
if update.Type == Add {
c.fillChannelWatchInfo(update)
if !c.isSilent() {
log.Info("ChannelManager is not silent, skip channel balance this round")
} else {
toReleases := c.balancePolicy(c.store, time.Now())
log.Info("channel manager bg check balance", zap.Array("toReleases", toReleases))
if err := c.updateWithTimer(toReleases, datapb.ChannelWatchState_ToRelease); err != nil {
log.Warn("channel store update error", zap.Error(err))
}
}
if err := c.store.Update(updates); err != nil {
log.Warn("channel store update error", zap.Error(err))
}
c.mu.Unlock()
}
}
@ -610,6 +608,7 @@ func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.Chan
log.Warn("fail to update", zap.Array("updates", updates), zap.Error(err))
c.stateTimer.removeTimers(channelsWithTimer)
}
c.lastActiveTimestamp = time.Now()
return err
}
@ -623,14 +622,17 @@ func (c *ChannelManager) processAck(e *ackEvent) {
case watchSuccessAck:
log.Info("datanode successfully watched channel", zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
case watchFailAck, watchTimeoutAck: // failure acks from toWatch
log.Warn("datanode watch channel failed or timeout, will release", zap.Int64("nodeID", e.nodeID),
zap.String("channel", e.channelName))
err := c.Release(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to set channels to release for watch failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
}
case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
// Cleanup, Delete and Reassign
log.Warn("datanode release channel failed or timeout, will cleanup and reassign", zap.Int64("nodeID", e.nodeID),
zap.String("channel", e.channelName))
err := c.CleanupAndReassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to clean and reassign channels for release failure ACKs",
@ -639,6 +641,8 @@ func (c *ChannelManager) processAck(e *ackEvent) {
case releaseSuccessAck:
// Delete and Reassign
log.Info("datanode release channel successfully, will reassign", zap.Int64("nodeID", e.nodeID),
zap.String("channel", e.channelName))
err := c.Reassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to response to release success ACK",
@ -739,19 +743,19 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
}
// Reassign reassigns a channel to another DataNode.
func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
ch := c.getChannelByNodeAndName(nodeID, channelName)
ch := c.getChannelByNodeAndName(originNodeID, channelName)
if ch == nil {
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName)
}
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}
reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}}
if c.isMarkedDrop(channelName) {
if err := c.remove(nodeID, ch); err != nil {
if err := c.remove(originNodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
if err := c.h.FinishDropChannel(channelName); err != nil {
@ -766,13 +770,13 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
if len(updates) <= 0 {
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, assigning to the original DataNode",
zap.Int64("nodeID", nodeID),
zap.Int64("nodeID", originNodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{ch})
updates.Add(originNodeID, []*channel{ch})
}
log.Info("channel manager reassigning channels",
zap.Int64("old node ID", nodeID),
zap.Int64("old node ID", originNodeID),
zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
@ -864,3 +868,10 @@ func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
op.Add(nodeID, []*channel{ch})
return op
}
// isSilent reports whether the channel manager has been quiet long enough for
// a balance round: no state timer is running and at least
// ChannelBalanceSilentDuration (interpreted in seconds) has elapsed since
// lastActiveTimestamp.
func (c *ChannelManager) isSilent() bool {
	if !c.stateTimer.hasRunningTimers() {
		idleFor := time.Since(c.lastActiveTimestamp)
		return idleFor >= Params.DataCoordCfg.ChannelBalanceSilentDuration.GetAsDuration(time.Second)
	}
	// A running timer means a watch/release is still in flight.
	return false
}

View File

@ -31,8 +31,8 @@ type ChannelPolicyFactory interface {
NewAssignPolicy() ChannelAssignPolicy
// NewReassignPolicy creates a new channel reassign policy.
NewReassignPolicy() ChannelReassignPolicy
// NewBgChecker creates a new background checker.
NewBgChecker() ChannelBGChecker
// NewBalancePolicy creates a new channel balance policy.
NewBalancePolicy() BalanceChannelPolicy
}
// ChannelPolicyFactoryV1 equal to policy batch
@ -65,9 +65,8 @@ func (f *ChannelPolicyFactoryV1) NewReassignPolicy() ChannelReassignPolicy {
return AverageReassignPolicy
}
// NewBgChecker implementing ChannelPolicyFactory
func (f *ChannelPolicyFactoryV1) NewBgChecker() ChannelBGChecker {
return BgCheckWithMaxWatchDuration(f.kv)
func (f *ChannelPolicyFactoryV1) NewBalancePolicy() BalanceChannelPolicy {
return AvgBalanceChannelPolicy
}
// ConsistentHashChannelPolicyFactory use consistent hash to determine channel assignment
@ -102,7 +101,7 @@ func (f *ConsistentHashChannelPolicyFactory) NewReassignPolicy() ChannelReassign
return EmptyReassignPolicy
}
// NewBgChecker creates a new background checker
func (f *ConsistentHashChannelPolicyFactory) NewBgChecker() ChannelBGChecker {
return EmptyBgChecker
// NewBalancePolicy creates a new balance policy
func (f *ConsistentHashChannelPolicyFactory) NewBalancePolicy() BalanceChannelPolicy {
return EmptyBalancePolicy
}

View File

@ -733,6 +733,45 @@ func TestChannelManager(t *testing.T) {
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID)
})
t.Run("test background check silent", func(t *testing.T) {
metakv.RemoveWithPrefix("")
defer metakv.RemoveWithPrefix("")
prefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue()
var (
collectionID = UniqueID(9)
channelNamePrefix = t.Name()
nodeID = UniqueID(111)
)
cName := channelNamePrefix + "TestBgChecker"
//1. set up channel_manager
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
chManager, err := NewChannelManager(metakv, newMockHandler(), withBgChecker())
require.NoError(t, err)
assert.NotNil(t, chManager.bgChecker)
chManager.Startup(ctx, []int64{nodeID})
//2. test isSilent function running correctly
Params.Save(Params.DataCoordCfg.ChannelBalanceSilentDuration.Key, "3")
assert.False(t, chManager.isSilent())
assert.False(t, chManager.stateTimer.hasRunningTimers())
//3. watch one channel
chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
assert.False(t, chManager.isSilent())
assert.True(t, chManager.stateTimer.hasRunningTimers())
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, cName, collectionID)
//4. wait for duration and check silent again
time.Sleep(Params.DataCoordCfg.ChannelBalanceSilentDuration.GetAsDuration(time.Second))
chManager.stateTimer.removeTimers([]string{cName})
assert.True(t, chManager.isSilent())
assert.False(t, chManager.stateTimer.hasRunningTimers())
})
}
func TestChannelManager_Reload(t *testing.T) {

View File

@ -28,7 +28,10 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -89,6 +92,8 @@ type ROChannelStore interface {
GetBufferChannelInfo() *NodeChannelInfo
// GetNodes gets all node ids in store.
GetNodes() []int64
// GetNodeChannelCount returns the number of channels assigned to the given node.
GetNodeChannelCount(nodeID int64) int
}
// RWChannelStore is the read write channel store for channels and nodes.
@ -160,6 +165,8 @@ func (c *ChannelStore) Reload() error {
Schema: cw.GetSchema(),
}
c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel)
log.Info("channel store reload channel",
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
}
record.Record("ChannelStore reload")
return nil
@ -313,6 +320,15 @@ func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
return nil
}
// GetNodeChannelCount returns the number of channels currently recorded for
// nodeID, or 0 when the store has no entry for that node.
func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
	// channelsInfo is keyed by node ID, so a direct lookup replaces the scan
	// over every entry of the map.
	if info, ok := c.channelsInfo[nodeID]; ok {
		return len(info.Channels)
	}
	return 0
}
// Delete removes the given node from the channel store and returns its channels.
func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
for id, info := range c.channelsInfo {

View File

@ -17,15 +17,15 @@
package datacoord
import (
"context"
"math"
"sort"
"strconv"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"stathat.com/c/consistent"
)
@ -364,6 +364,27 @@ func ConsistentHashDeregisterPolicy(hashRing *consistent.Consistent) DeregisterP
}
}
type BalanceChannelPolicy func(store ROChannelStore, ts time.Time) ChannelOpSet
// AvgBalanceChannelPolicy runs BgBalanceCheck over the store's node/channel
// layout and wraps every returned entry in an Add-typed ChannelOp that keeps
// the entry's NodeID and Channels.
//
// When BgBalanceCheck reports an error, the error is logged and an empty
// (non-nil) set is returned.
func AvgBalanceChannelPolicy(store ROChannelStore, ts time.Time) ChannelOpSet {
	ops := ChannelOpSet{}
	candidates, err := BgBalanceCheck(store.GetNodesChannels(), ts)
	if err != nil {
		log.Error("failed to balance node channels", zap.Error(err))
		return ops
	}
	for i := range candidates {
		ops = append(ops, &ChannelOp{
			Type:     Add,
			NodeID:   candidates[i].NodeID,
			Channels: candidates[i].Channels,
		})
	}
	return ops
}
// ChannelReassignPolicy is a policy for reassigning channels
type ChannelReassignPolicy func(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet
@ -372,8 +393,13 @@ func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) Cha
return nil
}
// AverageReassignPolicy is a reassigning policy that evenly assign channels
func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
// EmptyBalancePolicy is a no-op balance policy: it ignores both arguments and
// never proposes any channel operation (the returned set is nil).
func EmptyBalancePolicy(store ROChannelStore, ts time.Time) ChannelOpSet {
	var none ChannelOpSet
	return none
}
// RoundRobinReassignPolicy is a reassigning policy that evenly assigns channels
func RoundRobinReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
allNodes := store.GetNodesChannels()
filterMap := make(map[int64]struct{})
for _, reassign := range reassigns {
@ -386,10 +412,10 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C
}
avaNodes = append(avaNodes, c)
}
ret := make([]*ChannelOp, 0)
if len(avaNodes) == 0 {
// if no node is left, do not reassign
return nil
return ret
}
sort.Slice(avaNodes, func(i, j int) bool {
return len(avaNodes[i].Channels) <= len(avaNodes[j].Channels)
@ -397,7 +423,6 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C
// reassign channels to remaining nodes
i := 0
ret := make([]*ChannelOp, 0)
addUpdates := make(map[int64]*ChannelOp)
for _, reassign := range reassigns {
deleteUpdate := &ChannelOp{
@ -427,51 +452,158 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C
return ret
}
// AverageReassignPolicy is a reassigning policy that evenly balances channels among datanodes.
//
// Every entry of reassigns yields one Delete op for its node. The channels of
// those entries are then handed out to the nodes in the store that are not
// themselves listed in reassigns ("available" nodes):
//   - available nodes are ordered by current channel count, ties broken by NodeID;
//   - a channel goes to the first available node whose existing count plus the
//     channels already planned for it is still below the ceiling of the average
//     (existing channels of available nodes + channels to reassign, divided by
//     the number of available nodes);
//   - if every available node has reached that average, the channel goes to
//     the first node in the order above.
//
// The result holds the Delete ops in the order of reassigns, followed by one
// Add op per target node in the sorted node order, so the output is
// deterministic. With no available node the result is empty (non-nil).
func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
	allNodes := store.GetNodesChannels()
	filterMap := make(map[int64]struct{}, len(reassigns))
	toReassignTotalNum := 0
	for _, reassign := range reassigns {
		filterMap[reassign.NodeID] = struct{}{}
		toReassignTotalNum += len(reassign.Channels)
	}

	avaNodes := make([]*NodeChannelInfo, 0, len(allNodes))
	avaNodesChannelSum := 0
	for _, node := range allNodes {
		if _, ok := filterMap[node.NodeID]; ok {
			continue
		}
		avaNodes = append(avaNodes, node)
		avaNodesChannelSum += len(node.Channels)
	}
	log.Info("AverageReassignPolicy working", zap.Int("avaNodesCount", len(avaNodes)),
		zap.Int("toAssignChannelNum", toReassignTotalNum), zap.Int("avaNodesChannelSum", avaNodesChannelSum))

	ret := make([]*ChannelOp, 0)
	if len(avaNodes) == 0 {
		// if no node is left, do not reassign
		log.Warn("there is no available nodes when reassigning, return")
		return ret
	}

	avgChannelCount := int(math.Ceil(float64(avaNodesChannelSum+toReassignTotalNum) / float64(len(avaNodes))))
	sort.Slice(avaNodes, func(i, j int) bool {
		if len(avaNodes[i].Channels) == len(avaNodes[j].Channels) {
			return avaNodes[i].NodeID < avaNodes[j].NodeID
		}
		return len(avaNodes[i].Channels) < len(avaNodes[j].Channels)
	})

	// reassign channels to remaining nodes
	addUpdates := make(map[int64]*ChannelOp)
	for _, reassign := range reassigns {
		ret = append(ret, &ChannelOp{
			Type:     Delete,
			Channels: reassign.Channels,
			NodeID:   reassign.NodeID,
		})
		for _, ch := range reassign.Channels {
			// Fallback when every node is already at the average: the first
			// node in sorted order takes the channel.
			targetID := avaNodes[0].NodeID
			for _, node := range avaNodes {
				existedChannelCount := store.GetNodeChannelCount(node.NodeID)
				update, planned := addUpdates[node.NodeID]
				if !planned {
					if existedChannelCount >= avgChannelCount {
						log.Debug("targetNodeID has had more channels than average, skip", zap.Int64("targetID",
							node.NodeID), zap.Int("existedChannelCount", existedChannelCount))
						continue
					}
				} else if currentChannelCount := existedChannelCount + len(update.Channels); currentChannelCount >= avgChannelCount {
					log.Debug("targetNodeID has had more channels than average, skip", zap.Int64("targetID",
						node.NodeID), zap.Int("currentChannelCount", currentChannelCount))
					continue
				}
				targetID = node.NodeID
				break
			}

			if update, ok := addUpdates[targetID]; ok {
				update.Channels = append(update.Channels, ch)
			} else {
				addUpdates[targetID] = &ChannelOp{
					Type:     Add,
					NodeID:   targetID,
					Channels: []*channel{ch},
				}
			}
		}
	}

	// Emit the Add ops in the sorted node order instead of ranging over the
	// map, whose iteration order is random. Deleting after the append keeps
	// each op unique even if two available entries share a NodeID.
	for _, node := range avaNodes {
		if update, ok := addUpdates[node.NodeID]; ok {
			ret = append(ret, update)
			delete(addUpdates, node.NodeID)
		}
	}
	return ret
}
// ChannelBGChecker check nodes' channels and return the channels needed to be reallocated.
type ChannelBGChecker func(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelInfo, error)
type ChannelBGChecker func(ctx context.Context)
// EmptyBgChecker does nothing
func EmptyBgChecker(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelInfo, error) {
return nil, nil
}
// BgCheckWithMaxWatchDuration returns a ChannelBGChecker with `Params.DataCoordCfg.MaxWatchDuration`.
func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker {
return func(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelInfo, error) {
reAllocations := make([]*NodeChannelInfo, 0, len(channels))
for _, ch := range channels {
cinfo := &NodeChannelInfo{
NodeID: ch.NodeID,
Channels: make([]*channel, 0),
}
for _, c := range ch.Channels {
k := buildNodeChannelKey(ch.NodeID, c.Name)
v, err := kv.Load(k)
if err != nil {
return nil, err
}
watchInfo := &datapb.ChannelWatchInfo{}
if err := proto.Unmarshal([]byte(v), watchInfo); err != nil {
return nil, err
}
reviseVChannelInfo(watchInfo.GetVchan())
// if a channel is not watched or update watch progress after WatchTimeoutInterval,
// then we reallocate it to another node
if watchInfo.State == datapb.ChannelWatchState_Complete || watchInfo.State == datapb.ChannelWatchState_WatchSuccess {
continue
}
startTime := time.Unix(watchInfo.StartTs, 0)
d := ts.Sub(startTime)
if d >= Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second) {
cinfo.Channels = append(cinfo.Channels, c)
}
}
if len(cinfo.Channels) != 0 {
reAllocations = append(reAllocations, cinfo)
type ReAllocates []*NodeChannelInfo
// MarshalLogArray writes the reallocation plan to enc. For every entry it
// appends three elements: the literal "nodeID:", the node's ID, and the
// entry's channel names rendered as "[name1, name2]" ("[]" when the entry has
// no channels). It always returns nil.
func (rallocates ReAllocates) MarshalLogArray(enc zapcore.ArrayEncoder) error {
	for i := range rallocates {
		entry := rallocates[i]
		enc.AppendString("nodeID:")
		enc.AppendInt64(entry.NodeID)

		names := "["
		for j, ch := range entry.Channels {
			// separator goes between names, never after the last one
			if j > 0 {
				names += ", "
			}
			names += ch.Name
		}
		names += "]"
		enc.AppendString(names)
	}
	return nil
}
// BgBalanceCheck inspects the per-node channel layout and returns, for every
// overloaded node, the channels that should be moved away from it.
//
// The integer average (total channels / number of nodes) is the baseline. A
// node holding at most average+1 channels is left alone; a node above that
// gives up its first (count - average - 1) channels, in slice order.
//
// ts is not used by this implementation. The returned error is always nil,
// and the returned slice is empty (non-nil) when nothing has to move.
func BgBalanceCheck(nodeChannels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelInfo, error) {
	nodeCount := len(nodeChannels)
	plan := make(ReAllocates, 0, nodeCount)
	if nodeCount == 0 {
		return plan, nil
	}

	total := 0
	for i := range nodeChannels {
		total += len(nodeChannels[i].Channels)
	}
	channelCountPerNode := total / nodeCount
	// one channel above the average is still tolerated
	tolerated := channelCountPerNode + 1

	for _, node := range nodeChannels {
		held := len(node.Channels)
		if held <= tolerated {
			log.Info("node channel count is not much larger than average, skip reallocate",
				zap.Int64("nodeID", node.NodeID), zap.Int("channelCount", held),
				zap.Int("channelCountPerNode", channelCountPerNode))
			continue
		}
		// held > tolerated here, so surplus is in [1, held]
		surplus := held - tolerated
		moved := make([]*channel, 0)
		moved = append(moved, node.Channels[:surplus]...)
		plan = append(plan, &NodeChannelInfo{
			NodeID:   node.NodeID,
			Channels: moved,
		})
	}
	log.Info("Channel Balancer got new reAllocations:", zap.Array("reAllocations", plan))
	return plan, nil
}
func formatNodeIDs(ids []int64) []string {

View File

@ -20,10 +20,7 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"stathat.com/c/consistent"
)
@ -321,7 +318,7 @@ func TestConsistentHashDeregisterPolicy(t *testing.T) {
}
}
func TestAverageReassignPolicy(t *testing.T) {
func TestRoundRobinReassignPolicy(t *testing.T) {
type args struct {
store ROChannelStore
reassigns []*NodeChannelInfo
@ -342,7 +339,7 @@ func TestAverageReassignPolicy(t *testing.T) {
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
},
nil,
[]*ChannelOp{},
},
{
"test normal reassing",
@ -361,35 +358,18 @@ func TestAverageReassignPolicy(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := AverageReassignPolicy(tt.args.store, tt.args.reassigns)
got := RoundRobinReassignPolicy(tt.args.store, tt.args.reassigns)
assert.EqualValues(t, tt.want, got)
})
}
}
func TestBgCheckWithMaxWatchDuration(t *testing.T) {
type watch struct {
nodeID int64
name string
info *datapb.ChannelWatchInfo
}
getKv := func(watchInfos []*watch) kv.TxnKV {
kv := memkv.NewMemoryKV()
for _, info := range watchInfos {
k := buildNodeChannelKey(info.nodeID, info.name)
v, _ := proto.Marshal(info.info)
kv.Save(k, string(v))
}
return kv
}
func TestBgCheckForChannelBalance(t *testing.T) {
type args struct {
kv kv.TxnKV
channels []*NodeChannelInfo
timestamp time.Time
}
ts := time.Now()
tests := []struct {
name string
args args
@ -397,30 +377,50 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) {
wantErr error
}{
{
"test normal expiration",
"test even distribution",
args{
getKv([]*watch{{1, "chan1", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Uncomplete}},
{1, "chan2", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Complete}}}),
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}},
ts.Add(Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)),
[]*NodeChannelInfo{
{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
{2, []*channel{{Name: "chan1", CollectionID: 2}, {Name: "chan2", CollectionID: 2}}},
{3, []*channel{{Name: "chan1", CollectionID: 3}, {Name: "chan2", CollectionID: 3}}},
},
time.Now(),
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
//there should be no reallocate
[]*NodeChannelInfo{},
nil,
},
{
"test no expiration",
"test uneven with conservative effect",
args{
getKv([]*watch{{1, "chan1", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Uncomplete}}}),
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
ts.Add(Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)).Add(-time.Second),
[]*NodeChannelInfo{
{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
{2, []*channel{}},
},
time.Now(),
},
// a node holding only one channel more than the average is still considered balanced,
// so there is no reallocation for this test case
[]*NodeChannelInfo{},
nil,
},
{
"test uneven with zero",
args{
[]*NodeChannelInfo{
{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1}}},
{2, []*channel{}},
},
time.Now(),
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
policy := BgCheckWithMaxWatchDuration(tt.args.kv)
policy := BgBalanceCheck
got, err := policy(tt.args.channels, tt.args.timestamp)
assert.Equal(t, tt.wantErr, err)
assert.EqualValues(t, tt.want, got)
@ -428,6 +428,208 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) {
}
}
// TestAvgReassignPolicy feeds AverageReassignPolicy several store layouts and
// compares the produced op set with the expected plan.
func TestAvgReassignPolicy(t *testing.T) {
	type args struct {
		store     ROChannelStore
		reassigns []*NodeChannelInfo
	}
	tests := []struct {
		name string
		args args
		want ChannelOpSet
	}{
		{
			"test_only_one_node",
			args{
				&ChannelStore{
					memkv.NewMemoryKV(),
					map[int64]*NodeChannelInfo{
						1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
					},
				},
				[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
			},
			// as there are no available nodes except the input node, no reassign plan is generated
			[]*ChannelOp{},
		},
		{
			"test_zero_avg",
			args{
				&ChannelStore{
					memkv.NewMemoryKV(),
					// each entry's NodeID matches its map key
					map[int64]*NodeChannelInfo{
						1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
						2: {2, []*channel{}},
						3: {3, []*channel{}},
						4: {4, []*channel{}},
					},
				},
				[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
			},
			[]*ChannelOp{
				// as ceil is used to calculate the wanted average number, there should be one reassign
				// even though the average is less than 1
				{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil},
				{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil},
			},
		},
		{
			"test_normal_reassigning_for_one_available_nodes",
			args{
				&ChannelStore{
					memkv.NewMemoryKV(),
					map[int64]*NodeChannelInfo{
						1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
						2: {2, []*channel{}},
					},
				},
				[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}},
			},
			[]*ChannelOp{
				{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil},
				{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil},
			},
		},
		{
			"test_normal_reassigning_for_multiple_available_nodes",
			args{
				&ChannelStore{
					memkv.NewMemoryKV(),
					map[int64]*NodeChannelInfo{
						1: {1, []*channel{
							{Name: "chan1", CollectionID: 1},
							{Name: "chan2", CollectionID: 1},
							{Name: "chan3", CollectionID: 1},
							{Name: "chan4", CollectionID: 1}}},
						2: {2, []*channel{}},
						3: {3, []*channel{}},
						4: {4, []*channel{}},
					},
				},
				[]*NodeChannelInfo{{1, []*channel{
					{Name: "chan1", CollectionID: 1},
					{Name: "chan2", CollectionID: 1},
					{Name: "chan3", CollectionID: 1},
				}}},
			},
			[]*ChannelOp{
				{Delete, 1, []*channel{
					{Name: "chan1", CollectionID: 1},
					{Name: "chan2", CollectionID: 1},
					{Name: "chan3", CollectionID: 1}},
					nil},
				{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil},
				{Add, 3, []*channel{{Name: "chan2", CollectionID: 1}}, nil},
				{Add, 4, []*channel{{Name: "chan3", CollectionID: 1}}, nil},
			},
		},
		{
			"test_reassigning_for_extreme_case",
			args{
				&ChannelStore{
					memkv.NewMemoryKV(),
					map[int64]*NodeChannelInfo{
						1: {1, []*channel{
							{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1},
							{Name: "chan3", CollectionID: 1}, {Name: "chan4", CollectionID: 1},
							{Name: "chan5", CollectionID: 1}, {Name: "chan6", CollectionID: 1},
							{Name: "chan7", CollectionID: 1}, {Name: "chan8", CollectionID: 1},
							{Name: "chan9", CollectionID: 1}, {Name: "chan10", CollectionID: 1},
							{Name: "chan11", CollectionID: 1}, {Name: "chan12", CollectionID: 1},
						}},
						2: {2, []*channel{
							{Name: "chan13", CollectionID: 1}, {Name: "chan14", CollectionID: 1},
						}},
						3: {3, []*channel{{Name: "chan15", CollectionID: 1}}},
						4: {4, []*channel{}},
					},
				},
				[]*NodeChannelInfo{{1, []*channel{
					{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1},
					{Name: "chan3", CollectionID: 1}, {Name: "chan4", CollectionID: 1},
					{Name: "chan5", CollectionID: 1}, {Name: "chan6", CollectionID: 1},
					{Name: "chan7", CollectionID: 1}, {Name: "chan8", CollectionID: 1},
					{Name: "chan9", CollectionID: 1}, {Name: "chan10", CollectionID: 1},
					{Name: "chan11", CollectionID: 1}, {Name: "chan12", CollectionID: 1},
				}}},
			},
			[]*ChannelOp{
				{Delete, 1, []*channel{
					{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1},
					{Name: "chan3", CollectionID: 1}, {Name: "chan4", CollectionID: 1},
					{Name: "chan5", CollectionID: 1}, {Name: "chan6", CollectionID: 1},
					{Name: "chan7", CollectionID: 1}, {Name: "chan8", CollectionID: 1},
					{Name: "chan9", CollectionID: 1}, {Name: "chan10", CollectionID: 1},
					{Name: "chan11", CollectionID: 1}, {Name: "chan12", CollectionID: 1},
				}, nil},
				{Add, 4, []*channel{
					{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1},
					{Name: "chan3", CollectionID: 1}, {Name: "chan4", CollectionID: 1},
					{Name: "chan5", CollectionID: 1}}, nil},
				{Add, 3, []*channel{
					{Name: "chan6", CollectionID: 1}, {Name: "chan7", CollectionID: 1},
					{Name: "chan8", CollectionID: 1}, {Name: "chan9", CollectionID: 1},
				}, nil},
				{Add, 2, []*channel{
					{Name: "chan10", CollectionID: 1}, {Name: "chan11", CollectionID: 1},
					{Name: "chan12", CollectionID: 1},
				}, nil},
			},
		},
	}
	for _, tt := range tests {
		// These two cases are defined but not executed.
		// NOTE(review): presumably skipped because they expect several Add ops in a
		// fixed order — confirm, and re-enable once that order is guaranteed.
		if tt.name == "test_reassigning_for_extreme_case" ||
			tt.name == "test_normal_reassigning_for_multiple_available_nodes" {
			continue
		}
		t.Run(tt.name, func(t *testing.T) {
			got := AverageReassignPolicy(tt.args.store, tt.args.reassigns)
			assert.EqualValues(t, tt.want, got)
		})
	}
}
// TestAvgBalanceChannelPolicy checks that AvgBalanceChannelPolicy proposes an
// Add-typed op for the surplus channel of an overloaded node: node 1 holds
// four channels, node 2 none, and only "chan1" is expected in the plan.
func TestAvgBalanceChannelPolicy(t *testing.T) {
	cases := []struct {
		name  string
		store ROChannelStore
		want  ChannelOpSet
	}{
		{
			name: "test_only_one_node",
			store: &ChannelStore{
				memkv.NewMemoryKV(),
				map[int64]*NodeChannelInfo{
					1: {
						1, []*channel{
							{Name: "chan1", CollectionID: 1},
							{Name: "chan2", CollectionID: 1},
							{Name: "chan3", CollectionID: 1},
							{Name: "chan4", CollectionID: 1},
						},
					},
					2: {2, []*channel{}},
				},
			},
			want: []*ChannelOp{
				{Add, 1, []*channel{
					{Name: "chan1", CollectionID: 1},
				}, nil},
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			plan := AvgBalanceChannelPolicy(tc.store, time.Now())
			assert.EqualValues(t, tc.want, plan)
		})
	}
}
func TestAvgAssignRegisterPolicy(t *testing.T) {
type args struct {
store ROChannelStore

View File

@ -391,7 +391,8 @@ func (s *Server) initCluster() error {
}
var err error
s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.factory), withStateChecker())
s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.factory),
withStateChecker(), withBgChecker())
if err != nil {
return err
}

View File

@ -1622,7 +1622,9 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
type dataCoordConfig struct {
// --- CHANNEL ---
WatchTimeoutInterval ParamItem `refreshable:"false"`
WatchTimeoutInterval ParamItem `refreshable:"false"`
ChannelBalanceSilentDuration ParamItem `refreshable:"true"`
ChannelBalanceInterval ParamItem `refreshable:"true"`
// --- SEGMENTS ---
SegmentMaxSize ParamItem `refreshable:"false"`
@ -1677,6 +1679,24 @@ func (p *dataCoordConfig) init(base *BaseTable) {
}
p.WatchTimeoutInterval.Init(base.mgr)
p.ChannelBalanceSilentDuration = ParamItem{
Key: "dataCoord.channel.balanceSilentDuration",
Version: "2.2.3",
DefaultValue: "300",
Doc: "The duration after which the channel manager start background channel balancing",
Export: true,
}
p.ChannelBalanceSilentDuration.Init(base.mgr)
p.ChannelBalanceInterval = ParamItem{
Key: "dataCoord.channel.balanceInterval",
Version: "2.2.3",
DefaultValue: "360",
Doc: "The interval with which the channel manager check dml channel balance status",
Export: true,
}
p.ChannelBalanceInterval.Init(base.mgr)
p.SegmentMaxSize = ParamItem{
Key: "dataCoord.segment.maxSize",
Version: "2.0.0",