enhance: [2.4] Use map instead of slice to maintain channel info (#32273) (#32316)

Cherry-pick from master
pr: #32273
See also #32165

`ChannelManager.Match` is a frequent operation for datacoord. When the
collection number is large, iteration over all channels will cost lots
of CPU time and time consuming.

This PR change the data structure storing datanode-channel info to map
avoiding this iteration when checking channel existence.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/32429/head
congqixia 2024-04-17 19:07:20 +08:00 committed by GitHub
parent dff96c323b
commit 8f7ac8f7b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 249 additions and 216 deletions

View File

@ -508,13 +508,12 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo {
func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
nodeChs := make(map[UniqueID][]string)
for _, nodeChannels := range c.GetAssignedChannels() {
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
return channel.GetCollectionID() == collectionID
})
channelNames := lo.Map(filtered, func(channel RWChannel, _ int) string {
return channel.GetName()
})
var channelNames []string
for name, ch := range nodeChannels.Channels {
if ch.GetCollectionID() == collectionID {
channelNames = append(channelNames, name)
}
}
nodeChs[nodeChannels.NodeID] = channelNames
}
return nodeChs
@ -524,11 +523,11 @@ func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID
func (c *ChannelManagerImpl) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel {
channels := make([]RWChannel, 0)
for _, nodeChannels := range c.GetAssignedChannels() {
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
return channel.GetCollectionID() == collectionID
})
channels = append(channels, filtered...)
for _, ch := range nodeChannels.Channels {
if ch.GetCollectionID() == collectionID {
channels = append(channels, ch)
}
}
}
return channels
}
@ -807,7 +806,7 @@ func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string)
}
c.mu.RUnlock()
reallocates := &NodeChannelInfo{originNodeID, []RWChannel{ch}}
reallocates := NewNodeChannelInfo(originNodeID, ch)
isDropped := c.isMarkedDrop(channelName)
c.mu.Lock()
@ -862,7 +861,7 @@ func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName str
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}}
reallocates := NewNodeChannelInfo(nodeID, chToCleanUp)
isDropped := c.isMarkedDrop(channelName)
c.mu.Lock()

View File

@ -56,6 +56,31 @@ func waitAndStore(t *testing.T, watchkv kv.MetaKv, key string, waitState, storeS
}
}
func waitPrefixAndStore(t *testing.T, watchkv kv.MetaKv, prefix string, waitState, storeState datapb.ChannelWatchState) string {
channelName := ""
for {
keys, values, err := watchkv.LoadWithPrefix(prefix)
if err == nil && len(values) > 0 {
for idx, value := range values {
watchInfo, err := parseWatchInfo(keys[idx], []byte(value))
require.NoError(t, err)
require.Equal(t, waitState, watchInfo.GetState())
channelName = watchInfo.GetVchan().GetChannelName()
watchInfo.State = storeState
data, err := proto.Marshal(watchInfo)
require.NoError(t, err)
watchkv.Save(path.Join(prefix, watchInfo.GetVchan().GetChannelName()), string(data))
}
break
}
time.Sleep(100 * time.Millisecond)
}
return channelName
}
// waitAndCheckState checks if the DataCoord writes expected state into Etcd
func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) {
for {
@ -217,10 +242,8 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
oldNode: {oldNode, []RWChannel{}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
oldNode: NewNodeChannelInfo(oldNode),
},
}
@ -260,9 +283,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
},
}
@ -306,10 +327,8 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
oldNode: {oldNode, []RWChannel{}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
oldNode: NewNodeChannelInfo(oldNode),
},
}
@ -352,9 +371,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
},
}
@ -400,10 +417,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: channel1, CollectionID: collectionID},
&channelMeta{Name: channel2, CollectionID: collectionID},
}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}),
},
}
@ -438,10 +452,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
bufferID: {bufferID, []RWChannel{
&channelMeta{Name: channel1, CollectionID: collectionID},
&channelMeta{Name: channel2, CollectionID: collectionID},
}},
bufferID: NewNodeChannelInfo(bufferID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}),
},
}
@ -502,7 +513,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
},
}
@ -682,11 +693,9 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []RWChannel{
&channelMeta{Name: "channel-1", CollectionID: collectionID},
&channelMeta{Name: "channel-2", CollectionID: collectionID},
}},
bufferID: {bufferID, []RWChannel{}},
1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID},
&channelMeta{Name: "channel-2", CollectionID: collectionID}),
bufferID: NewNodeChannelInfo(bufferID),
},
}
chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second))
@ -774,7 +783,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
},
}
ch = chManager.getChannelByNodeAndName(nodeID, channelName)
@ -943,7 +952,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
},
}
@ -966,7 +975,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
},
}
@ -993,8 +1002,8 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
999: {999, []RWChannel{}},
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
999: NewNodeChannelInfo(999),
},
}
require.NoError(t, err)
@ -1024,8 +1033,8 @@ func TestChannelManager_Reload(t *testing.T) {
cm.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []RWChannel{&channelMeta{Name: "channel1", CollectionID: 1}}},
2: {2, []RWChannel{&channelMeta{Name: "channel2", CollectionID: 1}}},
1: NewNodeChannelInfo(1, &channelMeta{Name: "channel1", CollectionID: 1}),
2: NewNodeChannelInfo(2, &channelMeta{Name: "channel2", CollectionID: 1}),
},
}
@ -1077,58 +1086,69 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []RWChannel{
&channelMeta{Name: "channel-1", CollectionID: collectionID},
1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID},
&channelMeta{Name: "channel-2", CollectionID: collectionID},
&channelMeta{Name: "channel-3", CollectionID: collectionID},
}},
&channelMeta{Name: "channel-3", CollectionID: collectionID}),
},
}
var channelBalanced string
chManager.AddNode(2)
channelBalanced = "channel-1"
key := path.Join(prefix, "1", channelBalanced)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
watchPrefix := path.Join(prefix, "1")
channelBalanced = waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
key = path.Join(prefix, "2", channelBalanced)
key := path.Join(prefix, "2", channelBalanced)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(2, "channel-1"))
for _, channel := range []string{"channel-1", "channel-2", "channel-3"} {
if channel == channelBalanced {
assert.True(t, chManager.Match(2, channel))
} else {
assert.True(t, chManager.Match(1, channel))
}
}
chManager.AddNode(3)
chManager.Watch(ctx, &channelMeta{Name: "channel-4", CollectionID: collectionID})
key = path.Join(prefix, "3", "channel-4")
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
// key = path.Join(prefix, "3", "channel-4")
watchPrefix = path.Join(prefix, "3")
channelBalanced2 := waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(2, "channel-1"))
assert.True(t, chManager.Match(3, "channel-4"))
for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} {
if channel == channelBalanced {
assert.True(t, chManager.Match(2, channel))
} else if channel == channelBalanced2 {
assert.True(t, chManager.Match(3, channel))
} else {
assert.True(t, chManager.Match(1, channel))
}
}
chManager.DeleteNode(3)
key = path.Join(prefix, "2", "channel-4")
key = path.Join(prefix, "2", channelBalanced2)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(2, "channel-1"))
assert.True(t, chManager.Match(2, "channel-4"))
for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} {
if channel == channelBalanced {
assert.True(t, chManager.Match(2, channel))
} else if channel == channelBalanced2 {
assert.True(t, chManager.Match(2, channel))
} else {
assert.True(t, chManager.Match(1, channel))
}
}
chManager.DeleteNode(2)
key = path.Join(prefix, "1", "channel-4")
key = path.Join(prefix, "1", channelBalanced)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
key = path.Join(prefix, "1", "channel-1")
key = path.Join(prefix, "1", channelBalanced2)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(1, "channel-1"))
assert.True(t, chManager.Match(1, "channel-4"))
for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} {
assert.True(t, chManager.Match(1, channel))
}
})
}
@ -1157,12 +1177,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
store: &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {
NodeID: 1,
Channels: []RWChannel{
&channelMeta{Name: "ch1", CollectionID: 1},
},
},
1: NewNodeChannelInfo(1, &channelMeta{Name: "ch1", CollectionID: 1}),
},
},
},
@ -1257,14 +1272,14 @@ func TestChannelManager_BackgroundChannelChecker(t *testing.T) {
mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{
NodeID: 1,
Channels: []RWChannel{
&channelMeta{
Channels: map[string]RWChannel{
"channel-1": &channelMeta{
Name: "channel-1",
},
&channelMeta{
"channel-2": &channelMeta{
Name: "channel-2",
},
&channelMeta{
"channel-3": &channelMeta{
Name: "channel-3",
},
},

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
@ -220,7 +219,31 @@ type ChannelStore struct {
// NodeChannelInfo stores the nodeID and its channels.
type NodeChannelInfo struct {
NodeID int64
Channels []RWChannel
Channels map[string]RWChannel
// ChannelsSet typeutil.Set[string] // map for fast channel check
}
// AddChannel appends channel info node channel list.
func (info *NodeChannelInfo) AddChannel(ch RWChannel) {
info.Channels[ch.GetName()] = ch
}
// RemoveChannel removes channel from Channels.
func (info *NodeChannelInfo) RemoveChannel(channelName string) {
delete(info.Channels, channelName)
}
func NewNodeChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo {
info := &NodeChannelInfo{
NodeID: nodeID,
Channels: make(map[string]RWChannel),
}
for _, channel := range channels {
info.Channels[channel.GetName()] = channel
}
return info
}
// NewChannelStore creates and returns a new ChannelStore.
@ -231,7 +254,7 @@ func NewChannelStore(kv kv.TxnKV) *ChannelStore {
}
c.channelsInfo[bufferID] = &NodeChannelInfo{
NodeID: bufferID,
Channels: make([]RWChannel, 0),
Channels: make(map[string]RWChannel),
}
return c
}
@ -264,7 +287,8 @@ func (c *ChannelStore) Reload() error {
Schema: cw.GetSchema(),
WatchInfo: cw,
}
c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel)
c.channelsInfo[nodeID].AddChannel(channel)
log.Info("channel store reload channel",
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
@ -280,10 +304,7 @@ func (c *ChannelStore) Add(nodeID int64) {
return
}
c.channelsInfo[nodeID] = &NodeChannelInfo{
NodeID: nodeID,
Channels: make([]RWChannel, 0),
}
c.channelsInfo[nodeID] = NewNodeChannelInfo(nodeID)
}
// Update applies the channel operations in opSet.
@ -317,11 +338,9 @@ func (c *ChannelStore) Update(opSet *ChannelOpSet) error {
}
func (c *ChannelStore) checkIfExist(nodeID int64, channel RWChannel) bool {
if _, ok := c.channelsInfo[nodeID]; ok {
for _, ch := range c.channelsInfo[nodeID].Channels {
if channel.GetName() == ch.GetName() && channel.GetCollectionID() == ch.GetCollectionID() {
return true
}
if info, ok := c.channelsInfo[nodeID]; ok {
if ch, ok := info.Channels[channel.GetName()]; ok {
return ch.GetCollectionID() == channel.GetCollectionID()
}
}
return false
@ -343,19 +362,13 @@ func (c *ChannelStore) update(opSet *ChannelOpSet) error {
continue // prevent adding duplicated channel info
}
// Append target channels to channel store.
c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, ch)
c.channelsInfo[op.NodeID].AddChannel(ch)
}
case Delete:
del := typeutil.NewSet(op.GetChannelNames()...)
prev := c.channelsInfo[op.NodeID].Channels
curr := make([]RWChannel, 0, len(prev))
for _, ch := range prev {
if !del.Contain(ch.GetName()) {
curr = append(curr, ch)
}
info := c.channelsInfo[op.NodeID]
for _, channelName := range op.GetChannelNames() {
info.RemoveChannel(channelName)
}
c.channelsInfo[op.NodeID].Channels = curr
default:
return errUnknownOpType
}
@ -421,7 +434,7 @@ func (c *ChannelStore) Delete(nodeID int64) ([]RWChannel, error) {
return nil, err
}
delete(c.channelsInfo, id)
return info.Channels, nil
return lo.Values(info.Channels), nil
}
}
return nil, nil

View File

@ -40,10 +40,7 @@ func genNodeChannelInfos(id int64, num int) *NodeChannelInfo {
name := fmt.Sprintf("ch%d", i)
channels = append(channels, &channelMeta{Name: name, CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}})
}
return &NodeChannelInfo{
NodeID: id,
Channels: channels,
}
return NewNodeChannelInfo(id, channels...)
}
func genChannelOperations(from, to int64, num int) *ChannelOpSet {
@ -85,7 +82,7 @@ func TestChannelStore_Update(t *testing.T) {
txnKv,
map[int64]*NodeChannelInfo{
1: genNodeChannelInfos(1, 500),
2: {NodeID: 2},
2: NewNodeChannelInfo(2),
},
},
args{

View File

@ -47,8 +47,8 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet
}
opSet := NewChannelOpSet(
NewDeleteOp(bufferID, info.Channels...),
NewAddOp(nodeID, info.Channels...))
NewDeleteOp(bufferID, lo.Values(info.Channels)...),
NewAddOp(nodeID, lo.Values(info.Channels)...))
return opSet
}
@ -89,7 +89,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (*ChannelOpSet,
// TODO: Consider re-picking in case assignment is extremely uneven?
continue
}
releases[toRelease.NodeID] = append(releases[toRelease.NodeID], toRelease.Channels[chIdx])
releases[toRelease.NodeID] = append(releases[toRelease.NodeID], lo.Values(toRelease.Channels)[chIdx])
}
// Channels in `releases` are reassigned eventually by channel manager.
@ -197,8 +197,8 @@ func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) *ChannelO
for _, c := range allNodes {
if c.NodeID == nodeID {
opSet.Delete(nodeID, c.Channels...)
unregisteredChannels = append(unregisteredChannels, c.Channels...)
opSet.Delete(nodeID, lo.Values(c.Channels)...)
unregisteredChannels = append(unregisteredChannels, lo.Values(c.Channels)...)
continue
}
avaNodes = append(avaNodes, c)
@ -236,7 +236,7 @@ func AvgBalanceChannelPolicy(store ROChannelStore, ts time.Time) *ChannelOpSet {
return opSet
}
for _, reAlloc := range reAllocates {
opSet.Add(reAlloc.NodeID, reAlloc.Channels...)
opSet.Add(reAlloc.NodeID, lo.Values(reAlloc.Channels)...)
}
return opSet
@ -295,7 +295,7 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) *
// reassign channels to remaining nodes
addUpdates := make(map[int64]*ChannelOp)
for _, reassign := range reassigns {
opSet.Delete(reassign.NodeID, reassign.Channels...)
opSet.Delete(reassign.NodeID, lo.Values(reassign.Channels)...)
for _, ch := range reassign.Channels {
nodeIdx := 0
for {
@ -381,13 +381,10 @@ func BgBalanceCheck(nodeChannels []*NodeChannelInfo, ts time.Time) ([]*NodeChann
zap.Int("channelCountPerNode", channelCountPerNode))
continue
}
reallocate := &NodeChannelInfo{
NodeID: nChannels.NodeID,
Channels: make([]RWChannel, 0),
}
reallocate := NewNodeChannelInfo(nChannels.NodeID)
toReleaseCount := chCount - channelCountPerNode - 1
for _, ch := range nChannels.Channels {
reallocate.Channels = append(reallocate.Channels, ch)
reallocate.AddChannel(ch)
toReleaseCount--
if toReleaseCount <= 0 {
break

View File

@ -33,8 +33,8 @@ func TestBufferChannelAssignPolicy(t *testing.T) {
store := &ChannelStore{
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []RWChannel{}},
bufferID: {bufferID, channels},
1: NewNodeChannelInfo(1),
bufferID: NewNodeChannelInfo(bufferID, channels...),
},
}
@ -86,7 +86,7 @@ func TestAverageAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1)}},
1: NewNodeChannelInfo(1, getChannel("chan1", 1)),
},
},
[]RWChannel{getChannel("chan1", 1)},
@ -99,8 +99,8 @@ func TestAverageAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
2: {2, []RWChannel{getChannel("chan3", 1)}},
1: NewNodeChannelInfo(1, getChannel("chan", 1), getChannel("chan2", 1)),
2: NewNodeChannelInfo(2, getChannel("chan3", 1)),
},
},
[]RWChannel{getChannel("chan4", 1)},
@ -132,7 +132,7 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1)}},
1: NewNodeChannelInfo(1, getChannel("chan1", 1)),
},
},
1,
@ -148,9 +148,9 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1)}},
2: {2, []RWChannel{getChannel("chan2", 1)}},
3: {3, []RWChannel{}},
1: NewNodeChannelInfo(1, getChannel("chan1", 1)),
2: NewNodeChannelInfo(2, getChannel("chan2", 1)),
3: NewNodeChannelInfo(3),
},
},
2,
@ -176,53 +176,50 @@ func TestBgCheckForChannelBalance(t *testing.T) {
}
tests := []struct {
name string
args args
want []*NodeChannelInfo
name string
args args
// want []*NodeChannelInfo
want int
wantErr error
}{
{
"test even distribution",
args{
[]*NodeChannelInfo{
{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
{2, []RWChannel{getChannel("chan1", 2), getChannel("chan2", 2)}},
{3, []RWChannel{getChannel("chan1", 3), getChannel("chan2", 3)}},
NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)),
NewNodeChannelInfo(2, getChannel("chan1", 2), getChannel("chan2", 2)),
NewNodeChannelInfo(3, getChannel("chan1", 3), getChannel("chan2", 3)),
},
time.Now(),
},
// there should be no reallocate
[]*NodeChannelInfo{},
0,
nil,
},
{
"test uneven with conservative effect",
args{
[]*NodeChannelInfo{
{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
{2, []RWChannel{}},
NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)),
NewNodeChannelInfo(2),
},
time.Now(),
},
// as we deem that the node having only one channel more than average as even, so there's no reallocation
// for this test case
[]*NodeChannelInfo{},
0,
nil,
},
{
"test uneven with zero",
args{
[]*NodeChannelInfo{
{1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
}},
{2, []RWChannel{}},
NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1)),
NewNodeChannelInfo(2),
},
time.Now(),
},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
1,
nil,
},
}
@ -231,7 +228,7 @@ func TestBgCheckForChannelBalance(t *testing.T) {
policy := BgBalanceCheck
got, err := policy(tt.args.channels, tt.args.timestamp)
assert.Equal(t, tt.wantErr, err)
assert.EqualValues(t, tt.want, got)
assert.EqualValues(t, tt.want, len(got))
})
}
}
@ -252,10 +249,10 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1)}},
1: NewNodeChannelInfo(1, getChannel("chan1", 1)),
},
},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
[]*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1))},
},
// as there's no available nodes except the input node, there's no reassign plan generated
NewChannelOpSet(),
@ -266,13 +263,13 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1)}},
2: {2, []RWChannel{}},
3: {2, []RWChannel{}},
4: {2, []RWChannel{}},
1: NewNodeChannelInfo(1, getChannel("chan1", 1)),
2: NewNodeChannelInfo(2),
3: NewNodeChannelInfo(3),
4: NewNodeChannelInfo(4),
},
},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
[]*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1))},
},
// as we use ceil to calculate the wanted average number, there should be one reassign
// though the average num less than 1
@ -287,11 +284,11 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
2: {2, []RWChannel{}},
1: NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1)),
2: NewNodeChannelInfo(2),
},
},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}},
[]*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1), getChannel("chan2", 1))},
},
NewChannelOpSet(
NewDeleteOp(1, getChannel("chan1", 1), getChannel("chan2", 1)),
@ -304,22 +301,19 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{
getChannel("chan1", 1),
1: NewNodeChannelInfo(1, getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
}},
2: {2, []RWChannel{}},
3: {3, []RWChannel{}},
4: {4, []RWChannel{}},
getChannel("chan4", 1)),
2: NewNodeChannelInfo(2),
3: NewNodeChannelInfo(3),
4: NewNodeChannelInfo(4),
},
},
[]*NodeChannelInfo{{1, []RWChannel{
getChannel("chan1", 1),
[]*NodeChannelInfo{NewNodeChannelInfo(1, getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
}}},
getChannel("chan4", 1))},
},
NewChannelOpSet(
NewDeleteOp(1, []RWChannel{
@ -338,7 +332,7 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{
1: NewNodeChannelInfo(1,
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
@ -350,17 +344,15 @@ func TestAvgReassignPolicy(t *testing.T) {
getChannel("chan9", 1),
getChannel("chan10", 1),
getChannel("chan11", 1),
getChannel("chan12", 1),
}},
2: {2, []RWChannel{
getChannel("chan12", 1)),
2: NewNodeChannelInfo(2,
getChannel("chan13", 1),
getChannel("chan14", 1),
}},
3: {3, []RWChannel{getChannel("chan15", 1)}},
4: {4, []RWChannel{}},
getChannel("chan14", 1)),
3: NewNodeChannelInfo(3, getChannel("chan15", 1)),
4: NewNodeChannelInfo(4),
},
},
[]*NodeChannelInfo{{1, []RWChannel{
[]*NodeChannelInfo{NewNodeChannelInfo(1,
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
@ -372,8 +364,7 @@ func TestAvgReassignPolicy(t *testing.T) {
getChannel("chan9", 1),
getChannel("chan10", 1),
getChannel("chan11", 1),
getChannel("chan12", 1),
}}},
getChannel("chan12", 1))},
},
NewChannelOpSet(
NewDeleteOp(1, []RWChannel{
@ -430,7 +421,7 @@ func TestAvgBalanceChannelPolicy(t *testing.T) {
tests := []struct {
name string
args args
want *ChannelOpSet
want int
}{
{
"test_only_one_node",
@ -438,26 +429,24 @@ func TestAvgBalanceChannelPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {
1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
},
},
2: {2, []RWChannel{}},
1: NewNodeChannelInfo(1,
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
),
2: NewNodeChannelInfo(2),
},
},
},
NewChannelOpSet(NewAddOp(1, getChannel("chan1", 1))),
1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := AvgBalanceChannelPolicy(tt.args.store, time.Now())
assert.EqualValues(t, tt.want.Collect(), got.Collect())
assert.EqualValues(t, tt.want, len(got.Collect()))
})
}
}
@ -468,10 +457,13 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
nodeID int64
}
tests := []struct {
name string
args args
bufferedUpdates *ChannelOpSet
balanceUpdates *ChannelOpSet
name string
args args
bufferedUpdates *ChannelOpSet
balanceUpdates *ChannelOpSet
exact bool
bufferedUpdatesNum int
balanceUpdatesNum int
}{
{
"test empty",
@ -479,13 +471,16 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: make([]RWChannel, 0)},
1: NewNodeChannelInfo(1),
},
},
1,
},
NewChannelOpSet(),
NewChannelOpSet(),
true,
0,
0,
},
{
"test with buffer channel",
@ -493,8 +488,8 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
bufferID: {bufferID, []RWChannel{getChannel("ch1", 1)}},
1: {NodeID: 1, Channels: []RWChannel{}},
bufferID: NewNodeChannelInfo(bufferID, getChannel("ch1", 1)),
1: NewNodeChannelInfo(1),
},
},
1,
@ -504,6 +499,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
NewAddOp(1, getChannel("ch1", 1)),
),
NewChannelOpSet(),
true,
0,
0,
},
{
"test with avg assign",
@ -511,14 +509,17 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1)}},
3: {3, []RWChannel{}},
1: NewNodeChannelInfo(1, getChannel("ch1", 1), getChannel("ch2", 1)),
3: NewNodeChannelInfo(3),
},
},
3,
},
NewChannelOpSet(),
NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))),
false,
0,
1,
},
{
"test with avg equals to zero",
@ -526,38 +527,49 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("ch1", 1)}},
2: {2, []RWChannel{getChannel("ch3", 1)}},
3: {3, []RWChannel{}},
1: NewNodeChannelInfo(1, getChannel("ch1", 1)),
2: NewNodeChannelInfo(2, getChannel("ch3", 1)),
3: NewNodeChannelInfo(3),
},
},
3,
},
NewChannelOpSet(),
NewChannelOpSet(),
true,
0,
0,
},
{
"test node with empty channel",
"test_node_with_empty_channel",
args{
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)}},
2: {2, []RWChannel{}},
3: {3, []RWChannel{}},
1: NewNodeChannelInfo(1, getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)),
2: NewNodeChannelInfo(2),
3: NewNodeChannelInfo(3),
},
},
3,
},
NewChannelOpSet(),
NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))),
false,
0,
1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bufferedUpdates, balanceUpdates := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID)
assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect())
assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect())
if tt.exact {
assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect())
assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect())
} else {
assert.Equal(t, tt.bufferedUpdatesNum, len(bufferedUpdates.Collect()))
assert.Equal(t, tt.balanceUpdatesNum, len(balanceUpdates.Collect()))
}
})
}
}