Change channel to Interface (#27839)

This PR changes the concrete `*channel` struct into the `RWChannel` interface, backed by the new `channelMeta` struct.

See also: #25309
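
For orientation, here is a minimal sketch (not part of the diff) of how call sites change: direct field access such as `ch.Name` or `ch.CollectionID` is replaced by the interface getters, so callers depend only on `RWChannel`. The helper below is hypothetical; the getters mirror the interface defined in `channel.go`.

```go
// Hypothetical helper, for illustration only: callers now go through getters.
func channelLabel(ch RWChannel) string {
	// before this PR: fmt.Sprintf("%s/%d", ch.Name, ch.CollectionID)
	return fmt.Sprintf("%s/%d", ch.GetName(), ch.GetCollectionID())
}
```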

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2023-11-13 11:16:18 +08:00 committed by GitHub
parent 0aa90de141
commit a153950b10
17 changed files with 908 additions and 790 deletions

View File

@ -0,0 +1,85 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
type ROChannel interface {
String() string
GetName() string
GetCollectionID() UniqueID
GetStartPositions() []*commonpb.KeyDataPair
GetSchema() *schemapb.CollectionSchema
GetCreateTimestamp() Timestamp
GetWatchInfo() *datapb.ChannelWatchInfo
}
type RWChannel interface {
ROChannel
UpdateWatchInfo(info *datapb.ChannelWatchInfo)
}
var _ RWChannel = (*channelMeta)(nil)
type channelMeta struct {
Name string
CollectionID UniqueID
StartPositions []*commonpb.KeyDataPair
Schema *schemapb.CollectionSchema
CreateTimestamp uint64
WatchInfo *datapb.ChannelWatchInfo
}
func (ch *channelMeta) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
ch.WatchInfo = info
}
func (ch *channelMeta) GetWatchInfo() *datapb.ChannelWatchInfo {
return ch.WatchInfo
}
func (ch *channelMeta) GetName() string {
return ch.Name
}
func (ch *channelMeta) GetCollectionID() UniqueID {
return ch.CollectionID
}
func (ch *channelMeta) GetStartPositions() []*commonpb.KeyDataPair {
return ch.StartPositions
}
func (ch *channelMeta) GetSchema() *schemapb.CollectionSchema {
return ch.Schema
}
func (ch *channelMeta) GetCreateTimestamp() Timestamp {
return ch.CreateTimestamp
}
// String implement Stringer.
func (ch *channelMeta) String() string {
// schema maybe too large to print
return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v", ch.Name, ch.CollectionID, ch.StartPositions)
}
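
A short usage sketch (assuming the surrounding `datacoord` types from this file): a `*channelMeta` satisfies `RWChannel`, and the watch info is mutated through `UpdateWatchInfo` rather than by writing struct fields directly.

```go
// Illustrative only.
func exampleChannelMeta() {
	var ch RWChannel = &channelMeta{Name: "dml-ch-0", CollectionID: 100}
	ch.UpdateWatchInfo(&datapb.ChannelWatchInfo{State: datapb.ChannelWatchState_ToWatch})
	fmt.Println(ch.String(), ch.GetWatchInfo().GetState())
}
```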

View File

@ -22,13 +22,12 @@ import (
"sync"
"time"
"github.com/samber/lo"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"stathat.com/c/consistent"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
@ -60,20 +59,6 @@ type ChannelManager struct {
lastActiveTimestamp time.Time
}
type channel struct {
Name string
CollectionID UniqueID
StartPositions []*commonpb.KeyDataPair
Schema *schemapb.CollectionSchema
CreateTimestamp uint64
}
// String implement Stringer.
func (ch *channel) String() string {
// schema maybe too large to print
return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v", ch.Name, ch.CollectionID, ch.StartPositions)
}
// ChannelManagerOpt is to set optional parameters in channel manager.
type ChannelManagerOpt func(c *ChannelManager)
@ -251,17 +236,17 @@ func (c *ChannelManager) unwatchDroppedChannels() {
nodeChannels := c.store.GetChannels()
for _, nodeChannel := range nodeChannels {
for _, ch := range nodeChannel.Channels {
if !c.h.CheckShouldDropChannel(ch.Name) {
if !c.isMarkedDrop(ch.GetName()) {
continue
}
err := c.remove(nodeChannel.NodeID, ch)
if err != nil {
log.Warn("unable to remove channel", zap.String("channel", ch.Name), zap.Error(err))
log.Warn("unable to remove channel", zap.String("channel", ch.GetName()), zap.Error(err))
continue
}
err = c.h.FinishDropChannel(ch.Name)
err = c.h.FinishDropChannel(ch.GetName())
if err != nil {
log.Warn("FinishDropChannel failed when unwatchDroppedChannels", zap.String("channel", ch.Name), zap.Error(err))
log.Warn("FinishDropChannel failed when unwatchDroppedChannels", zap.String("channel", ch.GetName()), zap.Error(err))
}
}
}
@ -351,27 +336,24 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
updates := bufferedUpdates
// try bufferedUpdates first
if len(updates) <= 0 {
if updates == nil {
if !Params.DataCoordCfg.AutoBalance.GetAsBool() {
log.Info("auto balance disabled, skip reassignment for balance", zap.Int64("registered node", nodeID))
return nil
}
// if auto balance enabled, try balanceUpdates
updates = balanceUpdates
}
if len(updates) <= 0 {
if updates == nil {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
return nil
}
log.Info("register node",
zap.Int64("registered node", nodeID),
zap.Array("updates", updates))
log.Info("register node", zap.Int64("registered node", nodeID), zap.Array("updates", updates))
state := datapb.ChannelWatchState_ToRelease
for _, u := range updates {
for _, u := range updates.Collect() {
if u.Type == Delete && u.NodeID == bufferID {
state = datapb.ChannelWatchState_ToWatch
break
@ -395,15 +377,13 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
c.unsubAttempt(nodeChannelInfo)
updates := c.deregisterPolicy(c.store, nodeID)
log.Info("deregister node",
zap.Int64("nodeID", nodeID),
zap.Array("updates", updates))
if len(updates) <= 0 {
if updates == nil {
return nil
}
log.Info("deregister node", zap.Int64("nodeID", nodeID), zap.Array("updates", updates))
var channels []*channel
for _, op := range updates {
var channels []RWChannel
for _, op := range updates.Collect() {
if op.Type == Delete {
channels = op.Channels
}
@ -411,7 +391,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
chNames := make([]string, 0, len(channels))
for _, ch := range channels {
chNames = append(chNames, ch.Name)
chNames = append(chNames, ch.GetName())
}
log.Info("remove timers for channel of the deregistered node",
zap.Strings("channels", chNames), zap.Int64("nodeID", nodeID))
@ -440,20 +420,20 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
nodeID := ncInfo.NodeID
for _, ch := range ncInfo.Channels {
// align to datanode subname, using vchannel name
subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, ch.Name)
pchannelName := funcutil.ToPhysicalChannel(ch.Name)
subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, ch.GetName())
pchannelName := funcutil.ToPhysicalChannel(ch.GetName())
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
}
// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
func (c *ChannelManager) Watch(ctx context.Context, ch *channel) error {
func (c *ChannelManager) Watch(ctx context.Context, ch RWChannel) error {
log := log.Ctx(ctx)
c.mu.Lock()
defer c.mu.Unlock()
updates := c.assignPolicy(c.store, []*channel{ch})
if len(updates) == 0 {
updates := c.assignPolicy(c.store, []RWChannel{ch})
if updates == nil {
return nil
}
log.Info("try to update channel watch info with ToWatch state",
@ -468,20 +448,6 @@ func (c *ChannelManager) Watch(ctx context.Context, ch *channel) error {
return err
}
// fillChannelWatchInfo updates the channel op by filling in channel watch info.
func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
Schema: ch.Schema,
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
}
// fillChannelWatchInfoWithState updates the channel op by filling in channel watch info.
func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string {
channelsWithTimer := []string{}
@ -493,16 +459,16 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data
Vchan: vcInfo,
StartTs: startTs,
State: state,
Schema: ch.Schema,
Schema: ch.GetSchema(),
}
// Only set timer for watchInfo not from bufferID
if op.NodeID != bufferID {
c.stateTimer.startOne(state, ch.Name, op.NodeID, checkInterval)
channelsWithTimer = append(channelsWithTimer, ch.Name)
c.stateTimer.startOne(state, ch.GetName(), op.NodeID, checkInterval)
channelsWithTimer = append(channelsWithTimer, ch.GetName())
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
ch.UpdateWatchInfo(info)
}
return channelsWithTimer
}
@ -515,19 +481,6 @@ func (c *ChannelManager) GetAssignedChannels() []*NodeChannelInfo {
return c.store.GetNodesChannels()
}
func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []*channel {
channels := make([]*channel, 0)
for _, nodeChannels := range c.store.GetChannels() {
for _, channelInfo := range nodeChannels.Channels {
if collectionID == channelInfo.CollectionID {
channels = append(channels, channelInfo)
}
}
}
log.Info("get channel", zap.Any("collection", collectionID), zap.Any("channel", channels))
return channels
}
// GetBufferChannels gets buffer channels.
func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo {
c.mu.RLock()
@ -536,6 +489,43 @@ func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo {
return c.store.GetBufferChannelInfo()
}
// GetNodeChannelsByCollectionID gets all node channels map of the collection
func (c *ChannelManager) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
nodeChs := make(map[UniqueID][]string)
for _, nodeChannels := range c.GetAssignedChannels() {
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
return channel.GetCollectionID() == collectionID
})
channelNames := lo.Map(filtered, func(channel RWChannel, _ int) string {
return channel.GetName()
})
nodeChs[nodeChannels.NodeID] = channelNames
}
return nodeChs
}
// Get all channels belong to the collection
func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel {
channels := make([]RWChannel, 0)
for _, nodeChannels := range c.GetAssignedChannels() {
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
return channel.GetCollectionID() == collectionID
})
channels = append(channels, filtered...)
}
return channels
}
// Get all channel names belong to the collection
func (c *ChannelManager) GetChannelNamesByCollectionID(collectionID UniqueID) []string {
channels := c.GetChannelsByCollectionID(collectionID)
return lo.Map(channels, func(channel RWChannel, _ int) string {
return channel.GetName()
})
}
// Match checks and returns whether the node ID and channel match.
// use vchannel
func (c *ChannelManager) Match(nodeID int64, channel string) bool {
@ -548,7 +538,7 @@ func (c *ChannelManager) Match(nodeID int64, channel string) bool {
}
for _, ch := range info.Channels {
if ch.Name == channel {
if ch.GetName() == channel {
return true
}
}
@ -563,7 +553,7 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
infos := c.store.GetNodesChannels()
for _, info := range infos {
for _, channelInfo := range info.Channels {
if channelInfo.Name == channel {
if channelInfo.GetName() == channel {
return info.NodeID, nil
}
}
@ -572,7 +562,7 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
// channel in buffer
bufferInfo := c.store.GetBufferChannelInfo()
for _, channelInfo := range bufferInfo.Channels {
if channelInfo.Name == channel {
if channelInfo.GetName() == channel {
return bufferID, errChannelInBuffer
}
}
@ -593,24 +583,23 @@ func (c *ChannelManager) RemoveChannel(channelName string) error {
}
// remove deletes the nodeID-channel pair from data store.
func (c *ChannelManager) remove(nodeID int64, ch *channel) error {
var op ChannelOpSet
op.Delete(nodeID, []*channel{ch})
func (c *ChannelManager) remove(nodeID int64, ch RWChannel) error {
op := NewChannelOpSet(NewDeleteOp(nodeID, ch))
log.Info("remove channel assignment",
zap.Int64("nodeID to be removed", nodeID),
zap.String("channelID", ch.Name),
zap.Int64("collectionID", ch.CollectionID))
zap.String("channel", ch.GetName()),
zap.Int64("collectionID", ch.GetCollectionID()))
if err := c.store.Update(op); err != nil {
return err
}
return nil
}
func (c *ChannelManager) findChannel(channelName string) (int64, *channel) {
func (c *ChannelManager) findChannel(channelName string) (int64, RWChannel) {
infos := c.store.GetNodesChannels()
for _, info := range infos {
for _, channelInfo := range info.Channels {
if channelInfo.Name == channelName {
if channelInfo.GetName() == channelName {
return info.NodeID, channelInfo
}
}
@ -636,9 +625,9 @@ type ackEvent struct {
nodeID UniqueID
}
func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.ChannelWatchState) error {
func (c *ChannelManager) updateWithTimer(updates *ChannelOpSet, state datapb.ChannelWatchState) error {
channelsWithTimer := []string{}
for _, op := range updates {
for _, op := range updates.Collect() {
if op.Type == Add {
channelsWithTimer = append(channelsWithTimer, c.fillChannelWatchInfoWithState(op, state)...)
}
@ -784,7 +773,7 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
}
toReleaseUpdates := getReleaseOp(nodeID, toReleaseChannel)
toReleaseUpdates := NewChannelOpSet(NewAddOp(nodeID, toReleaseChannel))
err := c.updateWithTimer(toReleaseUpdates, datapb.ChannelWatchState_ToRelease)
if err != nil {
log.Warn("fail to update to release with timer", zap.Array("to release updates", toReleaseUpdates))
@ -803,7 +792,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
}
c.mu.RUnlock()
reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}}
reallocates := &NodeChannelInfo{originNodeID, []RWChannel{ch}}
isDropped := c.isMarkedDrop(channelName)
c.mu.Lock()
@ -826,12 +815,12 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
// Reassign policy won't choose the original node when a reassigning a channel.
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
if updates == nil {
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, assigning to the original DataNode",
zap.Int64("nodeID", originNodeID),
zap.String("channelName", channelName))
updates.Add(originNodeID, []*channel{ch})
updates = NewChannelOpSet(NewAddOp(originNodeID, ch))
}
log.Info("channel manager reassigning channels",
@ -853,12 +842,12 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
} else {
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, chToCleanUp.CollectionID)
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, chToCleanUp.GetCollectionID())
pchannelName := funcutil.ToPhysicalChannel(channelName)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}}
isDropped := c.isMarkedDrop(channelName)
c.mu.Lock()
@ -884,12 +873,12 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
// Reassign policy won't choose the original node when a reassigning a channel.
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
if updates == nil {
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, add channel to the original node",
zap.Int64("node ID", nodeID),
zap.String("channelName", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
updates = NewChannelOpSet(NewAddOp(nodeID, chToCleanUp))
}
log.Info("channel manager reassigning channels",
@ -898,8 +887,8 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel {
var ret *channel
func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) RWChannel {
var ret RWChannel
nodeChannelInfo := c.store.GetNode(nodeID)
if nodeChannelInfo == nil {
@ -907,7 +896,7 @@ func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName st
}
for _, channel := range nodeChannelInfo.Channels {
if channel.Name == channelName {
if channel.GetName() == channelName {
ret = channel
break
}
@ -915,10 +904,21 @@ func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName st
return ret
}
func (c *ChannelManager) getCollectionIDByChannel(channel string) (bool, UniqueID) {
for _, nodeChannel := range c.GetAssignedChannels() {
for _, ch := range nodeChannel.Channels {
if ch.GetName() == channel {
return true, ch.GetCollectionID()
}
}
}
return false, 0
}
func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID) {
for _, nodeChannel := range c.GetAssignedChannels() {
for _, ch := range nodeChannel.Channels {
if ch.Name == chName {
if ch.GetName() == chName {
return true, nodeChannel.NodeID
}
}
@ -930,12 +930,6 @@ func (c *ChannelManager) isMarkedDrop(channelName string) bool {
return c.h.CheckShouldDropChannel(channelName)
}
func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
var op ChannelOpSet
op.Add(nodeID, []*channel{ch})
return op
}
func (c *ChannelManager) isSilent() bool {
if c.stateTimer.hasRunningTimers() {
return false

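To summarize the call-site change in this file (a sketch, assuming a `*ChannelManager` is already constructed as in the tests below): `Watch` now accepts any `RWChannel`, and op sets are built with the new constructors instead of the removed `getReleaseOp` helper.

```go
// Illustrative only.
func exampleWatchAndRelease(ctx context.Context, m *ChannelManager, nodeID int64) error {
	ch := &channelMeta{Name: "dml-ch-0", CollectionID: 100}
	if err := m.Watch(ctx, ch); err != nil {
		return err
	}
	// replaces the removed getReleaseOp(nodeID, ch)
	opSet := NewChannelOpSet(NewAddOp(nodeID, ch))
	return m.updateWithTimer(opSet, datapb.ChannelWatchState_ToRelease)
}
```
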
View File

@ -75,14 +75,8 @@ func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelW
}
}
func getOpsWithWatchInfo(nodeID UniqueID, ch *channel) ChannelOpSet {
var ops ChannelOpSet
ops.Add(nodeID, []*channel{ch})
for _, op := range ops {
op.ChannelWatchInfos = []*datapb.ChannelWatchInfo{{}}
}
return ops
func getTestOps(nodeID UniqueID, ch RWChannel) *ChannelOpSet {
return NewChannelOpSet(NewAddOp(nodeID, ch))
}
func TestChannelManager_StateTransfer(t *testing.T) {
@ -122,9 +116,9 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
chManager.Watch(ctx, &channelMeta{Name: cName, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
key := buildNodeChannelKey(nodeID, cName)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_WatchSuccess, nodeID, cName, collectionID)
@ -152,7 +146,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
chManager.Watch(ctx, &channelMeta{Name: cName, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchFailure)
@ -183,7 +177,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
chManager.Watch(ctx, &channelMeta{Name: cName, CollectionID: collectionID})
// simulating timeout behavior of startOne, cuz 20s is a long wait
e := &ackEvent{
@ -223,10 +217,10 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: cName, CollectionID: collectionID},
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
oldNode: {oldNode, []*channel{}},
oldNode: {oldNode, []RWChannel{}},
},
}
@ -266,8 +260,8 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: cName, CollectionID: collectionID},
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
},
}
@ -312,10 +306,10 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: cName, CollectionID: collectionID},
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
oldNode: {oldNode, []*channel{}},
oldNode: {oldNode, []RWChannel{}},
},
}
@ -358,8 +352,8 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: cName, CollectionID: collectionID},
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: cName, CollectionID: collectionID},
}},
},
}
@ -406,9 +400,9 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: channel1, CollectionID: collectionID},
{Name: channel2, CollectionID: collectionID},
nodeID: {nodeID, []RWChannel{
&channelMeta{Name: channel1, CollectionID: collectionID},
&channelMeta{Name: channel2, CollectionID: collectionID},
}},
},
}
@ -421,7 +415,7 @@ func TestChannelManager(t *testing.T) {
assert.False(t, chManager.Match(nodeToAdd, channel1))
assert.False(t, chManager.Match(nodeToAdd, channel2))
err = chManager.Watch(context.TODO(), &channel{Name: "channel-3", CollectionID: collectionID})
err = chManager.Watch(context.TODO(), &channelMeta{Name: "channel-3", CollectionID: collectionID})
assert.NoError(t, err)
assert.True(t, chManager.Match(nodeToAdd, "channel-3"))
@ -444,9 +438,9 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
bufferID: {bufferID, []*channel{
{Name: channel1, CollectionID: collectionID},
{Name: channel2, CollectionID: collectionID},
bufferID: {bufferID, []RWChannel{
&channelMeta{Name: channel1, CollectionID: collectionID},
&channelMeta{Name: channel2, CollectionID: collectionID},
}},
},
}
@ -463,7 +457,7 @@ func TestChannelManager(t *testing.T) {
assert.True(t, chManager.Match(nodeID, channel1))
assert.True(t, chManager.Match(nodeID, channel2))
err = chManager.Watch(context.TODO(), &channel{Name: "channel-3", CollectionID: collectionID})
err = chManager.Watch(context.TODO(), &channelMeta{Name: "channel-3", CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
@ -482,13 +476,13 @@ func TestChannelManager(t *testing.T) {
chManager, err := NewChannelManager(watchkv, newMockHandler())
require.NoError(t, err)
err = chManager.Watch(context.TODO(), &channel{Name: bufferCh, CollectionID: collectionID})
err = chManager.Watch(context.TODO(), &channelMeta{Name: bufferCh, CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID)
chManager.store.Add(nodeID)
err = chManager.Watch(context.TODO(), &channel{Name: chanToAdd, CollectionID: collectionID})
err = chManager.Watch(context.TODO(), &channelMeta{Name: chanToAdd, CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID)
@ -508,7 +502,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
},
}
@ -540,7 +534,7 @@ func TestChannelManager(t *testing.T) {
// prepare tests
for _, test := range tests {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{Name: test.chName, CollectionID: collectionID})
ops := getTestOps(test.nodeID, &channelMeta{Name: test.chName, CollectionID: collectionID, WatchInfo: &datapb.ChannelWatchInfo{}})
err = chManager.store.Update(ops)
require.NoError(t, err)
@ -591,7 +585,7 @@ func TestChannelManager(t *testing.T) {
require.NoError(t, err)
chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: collectionID})
ops := getTestOps(1, &channelMeta{Name: "chan", CollectionID: collectionID, WatchInfo: &datapb.ChannelWatchInfo{}})
err = chManager.store.Update(ops)
require.NoError(t, err)
@ -617,7 +611,7 @@ func TestChannelManager(t *testing.T) {
require.NoError(t, err)
chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1})
ops := getTestOps(1, &channelMeta{Name: "chan", CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}})
err = chManager.store.Update(ops)
require.NoError(t, err)
@ -642,7 +636,7 @@ func TestChannelManager(t *testing.T) {
require.NoError(t, err)
chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1})
ops := getTestOps(1, &channelMeta{Name: "chan", CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}})
err = chManager.store.Update(ops)
require.NoError(t, err)
@ -669,7 +663,7 @@ func TestChannelManager(t *testing.T) {
require.NoError(t, err)
chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1})
ops := getTestOps(1, &channelMeta{Name: "chan", CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}})
err = chManager.store.Update(ops)
require.NoError(t, err)
@ -688,11 +682,11 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{
{Name: "channel-1", CollectionID: collectionID},
{Name: "channel-2", CollectionID: collectionID},
1: {1, []RWChannel{
&channelMeta{Name: "channel-1", CollectionID: collectionID},
&channelMeta{Name: "channel-2", CollectionID: collectionID},
}},
bufferID: {bufferID, []*channel{}},
bufferID: {bufferID, []RWChannel{}},
},
}
chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second))
@ -726,7 +720,7 @@ func TestChannelManager(t *testing.T) {
// prepare tests
for _, test := range tests {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{Name: test.chName, CollectionID: collectionID})
ops := getTestOps(test.nodeID, &channelMeta{Name: test.chName, CollectionID: collectionID, WatchInfo: &datapb.ChannelWatchInfo{}})
err = chManager.store.Update(ops)
require.NoError(t, err)
@ -780,13 +774,13 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
},
}
ch = chManager.getChannelByNodeAndName(nodeID, channelName)
assert.NotNil(t, ch)
assert.Equal(t, collectionID, ch.CollectionID)
assert.Equal(t, channelName, ch.Name)
assert.Equal(t, collectionID, ch.GetCollectionID())
assert.Equal(t, channelName, ch.GetName())
})
t.Run("test fillChannelWatchInfoWithState", func(t *testing.T) {
@ -811,13 +805,13 @@ func TestChannelManager(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ops := getReleaseOp(nodeID, &channel{Name: channelName, CollectionID: collectionID})
for _, op := range ops {
ops := NewChannelOpSet(NewAddOp(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}))
for _, op := range ops.Collect() {
chs := chManager.fillChannelWatchInfoWithState(op, test.inState)
assert.Equal(t, 1, len(chs))
assert.Equal(t, channelName, chs[0])
assert.Equal(t, 1, len(op.ChannelWatchInfos))
assert.Equal(t, test.inState, op.ChannelWatchInfos[0].GetState())
assert.NotNil(t, op.Channels[0].GetWatchInfo())
assert.Equal(t, test.inState, op.Channels[0].GetWatchInfo().GetState())
chManager.stateTimer.removeTimers(chs)
}
@ -836,7 +830,7 @@ func TestChannelManager(t *testing.T) {
require.NoError(t, err)
chManager.store.Add(nodeID)
opSet := getReleaseOp(nodeID, &channel{Name: channelName, CollectionID: collectionID})
opSet := NewChannelOpSet(NewAddOp(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}))
chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch)
chManager.stateTimer.removeTimers([]string{channelName})
@ -869,7 +863,7 @@ func TestChannelManager(t *testing.T) {
assert.False(t, chManager.stateTimer.hasRunningTimers())
// 3. watch one channel
chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
chManager.Watch(ctx, &channelMeta{Name: cName, CollectionID: collectionID})
assert.False(t, chManager.isSilent())
assert.True(t, chManager.stateTimer.hasRunningTimers())
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
@ -949,7 +943,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
},
}
@ -972,7 +966,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
},
}
@ -999,8 +993,8 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
999: {999, []*channel{}},
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
999: {999, []RWChannel{}},
},
}
require.NoError(t, err)
@ -1030,8 +1024,8 @@ func TestChannelManager_Reload(t *testing.T) {
cm.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "channel1", CollectionID: 1}}},
2: {2, []*channel{{Name: "channel2", CollectionID: 1}}},
1: {1, []RWChannel{&channelMeta{Name: "channel1", CollectionID: 1}}},
2: {2, []RWChannel{&channelMeta{Name: "channel2", CollectionID: 1}}},
},
}
@ -1083,10 +1077,10 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
chManager.store = &ChannelStore{
store: watchkv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{
{Name: "channel-1", CollectionID: collectionID},
{Name: "channel-2", CollectionID: collectionID},
{Name: "channel-3", CollectionID: collectionID},
1: {1, []RWChannel{
&channelMeta{Name: "channel-1", CollectionID: collectionID},
&channelMeta{Name: "channel-2", CollectionID: collectionID},
&channelMeta{Name: "channel-3", CollectionID: collectionID},
}},
},
}
@ -1107,7 +1101,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
assert.True(t, chManager.Match(2, "channel-1"))
chManager.AddNode(3)
chManager.Watch(ctx, &channel{Name: "channel-4", CollectionID: collectionID})
chManager.Watch(ctx, &channelMeta{Name: "channel-4", CollectionID: collectionID})
key = path.Join(prefix, "3", "channel-4")
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
@ -1165,8 +1159,8 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
channelsInfo: map[int64]*NodeChannelInfo{
1: {
NodeID: 1,
Channels: []*channel{
{Name: "ch1", CollectionID: 1},
Channels: []RWChannel{
&channelMeta{Name: "ch1", CollectionID: 1},
},
},
},
@ -1262,14 +1256,14 @@ func TestChannelManager_BackgroundChannelChecker(t *testing.T) {
mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{
NodeID: 1,
Channels: []*channel{
{
Channels: []RWChannel{
&channelMeta{
Name: "channel-1",
},
{
&channelMeta{
Name: "channel-2",
},
{
&channelMeta{
Name: "channel-3",
},
},
@ -1286,7 +1280,7 @@ func TestChannelManager_BackgroundChannelChecker(t *testing.T) {
updateCounter := atomic.NewInt64(0)
mockStore.EXPECT().Update(mock.Anything).Run(func(op ChannelOpSet) {
mockStore.EXPECT().Update(mock.Anything).Run(func(op *ChannelOpSet) {
updateCounter.Inc()
}).Return(nil).Maybe()

View File

@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -32,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
@ -52,33 +54,131 @@ const (
// ChannelOp is an individual ADD or DELETE operation to the channel store.
type ChannelOp struct {
Type ChannelOpType
NodeID int64
Channels []*channel
ChannelWatchInfos []*datapb.ChannelWatchInfo
Type ChannelOpType
NodeID int64
Channels []RWChannel
}
// ChannelOpSet is a set of channel operations.
type ChannelOpSet []*ChannelOp
// Add appends a single operation to add the mapping between a node and channels.
func (cos *ChannelOpSet) Add(id int64, channels []*channel) {
*cos = append(*cos, &ChannelOp{
func NewAddOp(id int64, channels ...RWChannel) *ChannelOp {
return &ChannelOp{
NodeID: id,
Type: Add,
Channels: channels,
})
}
}
// Delete appends a single operation to remove the mapping between a node and channels.
func (cos *ChannelOpSet) Delete(id int64, channels []*channel) {
*cos = append(*cos, &ChannelOp{
func NewDeleteOp(id int64, channels ...RWChannel) *ChannelOp {
return &ChannelOp{
NodeID: id,
Type: Delete,
Channels: channels,
}
}
func (op *ChannelOp) Append(channels ...RWChannel) {
op.Channels = append(op.Channels, channels...)
}
func (op *ChannelOp) GetChannelNames() []string {
return lo.Map(op.Channels, func(c RWChannel, _ int) string {
return c.GetName()
})
}
func (op *ChannelOp) BuildKV() (map[string]string, []string, error) {
var (
saves = make(map[string]string)
removals = []string{}
)
for _, ch := range op.Channels {
k := buildNodeChannelKey(op.NodeID, ch.GetName())
switch op.Type {
case Add:
info, err := proto.Marshal(ch.GetWatchInfo())
if err != nil {
return saves, removals, err
}
saves[k] = string(info)
case Delete:
removals = append(removals, k)
default:
return saves, removals, errUnknownOpType
}
}
return saves, removals, nil
}
// ChannelOpSet is a set of channel operations.
type ChannelOpSet struct {
ops []*ChannelOp
}
func NewChannelOpSet(ops ...*ChannelOp) *ChannelOpSet {
if ops == nil {
ops = []*ChannelOp{}
}
return &ChannelOpSet{ops}
}
func (c *ChannelOpSet) Insert(ops ...*ChannelOp) {
c.ops = append(c.ops, ops...)
}
func (c *ChannelOpSet) Collect() []*ChannelOp {
if c == nil {
return []*ChannelOp{}
}
return c.ops
}
func (c *ChannelOpSet) Len() int {
if c == nil {
return 0
}
return len(c.ops)
}
// Add a new Add channel op, for ToWatch and ToRelease
func (c *ChannelOpSet) Add(id int64, channels ...RWChannel) {
c.ops = append(c.ops, NewAddOp(id, channels...))
}
func (c *ChannelOpSet) Delete(id int64, channels ...RWChannel) {
c.ops = append(c.ops, NewDeleteOp(id, channels...))
}
func (c *ChannelOpSet) GetChannelNumber() int {
if c == nil {
return 0
}
number := 0
for _, op := range c.ops {
number += len(op.Channels)
}
return number
}
func (c *ChannelOpSet) SplitByChannel() map[string]*ChannelOpSet {
perChOps := make(map[string]*ChannelOpSet)
for _, op := range c.Collect() {
for _, ch := range op.Channels {
if _, ok := perChOps[ch.GetName()]; !ok {
perChOps[ch.GetName()] = NewChannelOpSet()
}
if op.Type == Add {
perChOps[ch.GetName()].Add(op.NodeID, ch)
} else {
perChOps[ch.GetName()].Delete(op.NodeID, ch)
}
}
}
return perChOps
}
// ROChannelStore is a read only channel store for channels and nodes.
type ROChannelStore interface {
// GetNode returns the channel info of a specific node.
@ -103,9 +203,9 @@ type RWChannelStore interface {
// Add creates a new node-channels mapping, with no channels assigned to the node.
Add(nodeID int64)
// Delete removes nodeID and returns its channels.
Delete(nodeID int64) ([]*channel, error)
Delete(nodeID int64) ([]RWChannel, error)
// Update applies the operations in ChannelOpSet.
Update(op ChannelOpSet) error
Update(op *ChannelOpSet) error
}
// ChannelStore must satisfy RWChannelStore.
@ -120,7 +220,7 @@ type ChannelStore struct {
// NodeChannelInfo stores the nodeID and its channels.
type NodeChannelInfo struct {
NodeID int64
Channels []*channel
Channels []RWChannel
}
// NewChannelStore creates and returns a new ChannelStore.
@ -131,7 +231,7 @@ func NewChannelStore(kv kv.TxnKV) *ChannelStore {
}
c.channelsInfo[bufferID] = &NodeChannelInfo{
NodeID: bufferID,
Channels: make([]*channel, 0),
Channels: make([]RWChannel, 0),
}
return c
}
@ -158,10 +258,11 @@ func (c *ChannelStore) Reload() error {
reviseVChannelInfo(cw.GetVchan())
c.Add(nodeID)
channel := &channel{
channel := &channelMeta{
Name: cw.GetVchan().GetChannelName(),
CollectionID: cw.GetVchan().GetCollectionID(),
Schema: cw.GetSchema(),
WatchInfo: cw,
}
c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel)
log.Info("channel store reload channel",
@ -181,59 +282,44 @@ func (c *ChannelStore) Add(nodeID int64) {
c.channelsInfo[nodeID] = &NodeChannelInfo{
NodeID: nodeID,
Channels: make([]*channel, 0),
Channels: make([]RWChannel, 0),
}
}
// Update applies the channel operations in opSet.
func (c *ChannelStore) Update(opSet ChannelOpSet) error {
totalChannelNum := 0
for _, op := range opSet {
totalChannelNum += len(op.Channels)
}
func (c *ChannelStore) Update(opSet *ChannelOpSet) error {
totalChannelNum := opSet.GetChannelNumber()
if totalChannelNum <= maxOperationsPerTxn {
return c.update(opSet)
}
// Split opset into multiple txn. Operations on the same channel must be executed in one txn.
perChOps := make(map[string]ChannelOpSet)
for _, op := range opSet {
for i, ch := range op.Channels {
chOp := &ChannelOp{
Type: op.Type,
NodeID: op.NodeID,
Channels: []*channel{ch},
}
if op.Type == Add {
chOp.ChannelWatchInfos = []*datapb.ChannelWatchInfo{op.ChannelWatchInfos[i]}
}
perChOps[ch.Name] = append(perChOps[ch.Name], chOp)
}
}
// Execute a txn for every 128 operations.
// Split opset into multiple txn. Operations on the same channel must be executed in one txn.
perChOps := opSet.SplitByChannel()
// Execute a txn for every 64 operations.
count := 0
operations := make([]*ChannelOp, 0, maxOperationsPerTxn)
for _, opset := range perChOps {
if count+len(opset) > maxOperationsPerTxn {
if err := c.update(operations); err != nil {
if count+opset.Len() > maxOperationsPerTxn {
if err := c.update(NewChannelOpSet(operations...)); err != nil {
return err
}
count = 0
operations = make([]*ChannelOp, 0, maxOperationsPerTxn)
}
count += len(opset)
operations = append(operations, opset...)
count += opset.Len()
operations = append(operations, opset.Collect()...)
}
if count == 0 {
return nil
}
return c.update(operations)
return c.update(NewChannelOpSet(operations...))
}
func (c *ChannelStore) checkIfExist(nodeID int64, channel *channel) bool {
func (c *ChannelStore) checkIfExist(nodeID int64, channel RWChannel) bool {
if _, ok := c.channelsInfo[nodeID]; ok {
for _, ch := range c.channelsInfo[nodeID].Channels {
if channel.Name == ch.Name && channel.CollectionID == ch.CollectionID {
if channel.GetName() == ch.GetName() && channel.GetCollectionID() == ch.GetCollectionID() {
return true
}
}
@ -242,14 +328,14 @@ func (c *ChannelStore) checkIfExist(nodeID int64, channel *channel) bool {
}
// update applies the ADD/DELETE operations to the current channel store.
func (c *ChannelStore) update(opSet ChannelOpSet) error {
func (c *ChannelStore) update(opSet *ChannelOpSet) error {
// Update ChannelStore's kv store.
if err := c.txn(opSet); err != nil {
return err
}
// Update node id -> channel mapping.
for _, op := range opSet {
for _, op := range opSet.Collect() {
switch op.Type {
case Add:
for _, ch := range op.Channels {
@ -260,15 +346,12 @@ func (c *ChannelStore) update(opSet ChannelOpSet) error {
c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, ch)
}
case Delete:
// Remove target channels from channel store.
del := make(map[string]struct{})
for _, ch := range op.Channels {
del[ch.Name] = struct{}{}
}
del := typeutil.NewSet(op.GetChannelNames()...)
prev := c.channelsInfo[op.NodeID].Channels
curr := make([]*channel, 0, len(prev))
curr := make([]RWChannel, 0, len(prev))
for _, ch := range prev {
if _, ok := del[ch.Name]; !ok {
if !del.Contain(ch.GetName()) {
curr = append(curr, ch)
}
}
@ -331,7 +414,7 @@ func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
}
// Delete removes the given node from the channel store and returns its channels.
func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
func (c *ChannelStore) Delete(nodeID int64) ([]RWChannel, error) {
for id, info := range c.channelsInfo {
if id == nodeID {
if err := c.remove(nodeID); err != nil {
@ -362,25 +445,19 @@ func (c *ChannelStore) remove(nodeID int64) error {
}
// txn updates the channelStore's kv store with the given channel ops.
func (c *ChannelStore) txn(opSet ChannelOpSet) error {
saves := make(map[string]string)
var removals []string
for _, op := range opSet {
for i, ch := range op.Channels {
k := buildNodeChannelKey(op.NodeID, ch.Name)
switch op.Type {
case Add:
info, err := proto.Marshal(op.ChannelWatchInfos[i])
if err != nil {
return err
}
saves[k] = string(info)
case Delete:
removals = append(removals, k)
default:
return errUnknownOpType
}
func (c *ChannelStore) txn(opSet *ChannelOpSet) error {
var (
saves = make(map[string]string)
removals []string
)
for _, op := range opSet.Collect() {
opSaves, opRemovals, err := op.BuildKV()
if err != nil {
return err
}
saves = lo.Assign(opSaves, saves)
removals = append(removals, opRemovals...)
}
return c.store.MultiSaveAndRemove(saves, removals)
}
@ -409,13 +486,13 @@ var ChannelOpTypeNames = []string{"Add", "Delete"}
// TODO: NIT: ObjectMarshaler -> ObjectMarshaller
// MarshalLogObject implements the interface ObjectMarshaler.
func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("type", ChannelOpTypeNames[cu.Type])
enc.AddInt64("nodeID", cu.NodeID)
func (op *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("type", ChannelOpTypeNames[op.Type])
enc.AddInt64("nodeID", op.NodeID)
cstr := "["
if len(cu.Channels) > 0 {
for _, s := range cu.Channels {
cstr += s.Name
if len(op.Channels) > 0 {
for _, s := range op.Channels {
cstr += s.GetName()
cstr += ", "
}
cstr = cstr[:len(cstr)-2]
@ -427,8 +504,8 @@ func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error {
// TODO: NIT: ArrayMarshaler -> ArrayMarshaller
// MarshalLogArray implements the interface of ArrayMarshaler of zap.
func (cos ChannelOpSet) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, o := range cos {
func (c *ChannelOpSet) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, o := range c.Collect() {
enc.AppendObject(o)
}
return nil

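A sketch of the reshaped `ChannelOpSet` API from this file (the channel values and node ID are made up for illustration): ops are built with `NewAddOp`/`NewDeleteOp`, grouped with `NewChannelOpSet`, and the store splits them per channel so operations on one channel stay inside a single transaction.

```go
// Illustrative only.
func exampleOpSet() {
	ch1 := &channelMeta{Name: "ch1", CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}}
	ch2 := &channelMeta{Name: "ch2", CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}}

	opSet := NewChannelOpSet(
		NewAddOp(101, ch1, ch2),    // assign both channels to node 101
		NewDeleteOp(bufferID, ch1), // remove ch1 from the buffer node
	)

	fmt.Println(opSet.Len(), opSet.GetChannelNumber()) // 2 ops covering 3 channel entries
	for name, perCh := range opSet.SplitByChannel() {
		fmt.Println(name, perCh.Len())
	}
}
```
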
View File

@ -35,10 +35,10 @@ import (
)
func genNodeChannelInfos(id int64, num int) *NodeChannelInfo {
channels := make([]*channel, 0, num)
channels := make([]RWChannel, 0, num)
for i := 0; i < num; i++ {
name := fmt.Sprintf("ch%d", i)
channels = append(channels, &channel{Name: name, CollectionID: 1})
channels = append(channels, &channelMeta{Name: name, CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}})
}
return &NodeChannelInfo{
NodeID: id,
@ -46,36 +46,24 @@ func genNodeChannelInfos(id int64, num int) *NodeChannelInfo {
}
}
func genChannelOperations(from, to int64, num int) ChannelOpSet {
ops := make([]*ChannelOp, 0, 2)
channels := make([]*channel, 0, num)
channelWatchInfos := make([]*datapb.ChannelWatchInfo, 0, num)
func genChannelOperations(from, to int64, num int) *ChannelOpSet {
channels := make([]RWChannel, 0, num)
for i := 0; i < num; i++ {
name := fmt.Sprintf("ch%d", i)
channels = append(channels, &channel{Name: name, CollectionID: 1})
channelWatchInfos = append(channelWatchInfos, &datapb.ChannelWatchInfo{})
channels = append(channels, &channelMeta{Name: name, CollectionID: 1, WatchInfo: &datapb.ChannelWatchInfo{}})
}
ops = append(ops, &ChannelOp{
Type: Delete,
NodeID: from,
Channels: channels,
})
ops = append(ops, &ChannelOp{
Type: Add,
NodeID: to,
Channels: channels,
ChannelWatchInfos: channelWatchInfos,
})
ops := NewChannelOpSet(
NewAddOp(to, channels...),
NewDeleteOp(from, channels...),
)
return ops
}
func TestChannelStore_Update(t *testing.T) {
txnKv := mocks.NewTxnKV(t)
txnKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
assert.False(t, len(saves)+len(removals) > 128, "too many operations")
assert.False(t, len(saves)+len(removals) > 64, "too many operations")
}).Return(nil)
type fields struct {
@ -83,7 +71,7 @@ func TestChannelStore_Update(t *testing.T) {
channelsInfo map[int64]*NodeChannelInfo
}
type args struct {
opSet ChannelOpSet
opSet *ChannelOpSet
}
tests := []struct {
name string

View File

@ -72,7 +72,7 @@ func (c *Cluster) UnRegister(node *NodeInfo) error {
// Watch tries to add a channel in datanode cluster
func (c *Cluster) Watch(ctx context.Context, ch string, collectionID UniqueID) error {
return c.channelManager.Watch(ctx, &channel{Name: ch, CollectionID: collectionID})
return c.channelManager.Watch(ctx, &channelMeta{Name: ch, CollectionID: collectionID})
}
// Flush sends flush requests to dataNodes specified
@ -88,7 +88,7 @@ func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string,
return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID)
}
ch := c.channelManager.getChannelByNodeAndName(nodeID, channel)
_, collID := c.channelManager.getCollectionIDByChannel(channel)
getSegmentID := func(segment *datapb.SegmentInfo, _ int) int64 {
return segment.GetID()
@ -100,7 +100,7 @@ func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string,
commonpbutil.WithSourceID(paramtable.GetNodeID()),
commonpbutil.WithTargetID(nodeID),
),
CollectionID: ch.CollectionID,
CollectionID: collID,
SegmentIDs: lo.Map(segments, getSegmentID),
ChannelName: channel,
}

View File

@ -133,7 +133,21 @@ func (suite *ClusterSuite) TestCreate() {
suite.NoError(err)
channels := channelManager.GetAssignedChannels()
suite.EqualValues([]*NodeChannelInfo{{1, []*channel{{Name: "channel1", CollectionID: 1}}}}, channels)
suite.EqualValues([]*NodeChannelInfo{{1, []RWChannel{
&channelMeta{
Name: "channel1",
CollectionID: 1,
WatchInfo: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: "channel1",
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
DroppedSegmentIds: []int64{},
},
},
},
}}}, channels)
})
suite.Run("remove_all_nodes_and_restart_with_other_nodes", func() {
@ -235,7 +249,7 @@ func (suite *ClusterSuite) TestRegister() {
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
err = channelManager.Watch(context.TODO(), &channel{
err = channelManager.Watch(context.TODO(), &channelMeta{
Name: "ch1",
CollectionID: 0,
})
@ -256,7 +270,7 @@ func (suite *ClusterSuite) TestRegister() {
nodeChannels := channelManager.GetAssignedChannels()
suite.EqualValues(1, len(nodeChannels))
suite.EqualValues(1, nodeChannels[0].NodeID)
suite.EqualValues("ch1", nodeChannels[0].Channels[0].Name)
suite.EqualValues("ch1", nodeChannels[0].Channels[0].GetName())
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
@ -356,7 +370,7 @@ func (suite *ClusterSuite) TestUnregister() {
suite.EqualValues(1, len(channels))
suite.EqualValues(2, channels[0].NodeID)
suite.EqualValues(1, len(channels[0].Channels))
suite.EqualValues("ch1", channels[0].Channels[0].Name)
suite.EqualValues("ch1", channels[0].Channels[0].GetName())
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
@ -391,7 +405,7 @@ func (suite *ClusterSuite) TestUnregister() {
channel := channelManager.GetBufferChannels()
suite.NotNil(channel)
suite.EqualValues(1, len(channel.Channels))
suite.EqualValues("ch_1", channel.Channels[0].Name)
suite.EqualValues("ch_1", channel.Channels[0].GetName())
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 0)
})
@ -435,7 +449,7 @@ func TestWatchIfNeeded(t *testing.T) {
assert.NoError(t, err)
channels := channelManager.GetAssignedChannels()
assert.EqualValues(t, 1, len(channels))
assert.EqualValues(t, "ch1", channels[0].Channels[0].Name)
assert.EqualValues(t, "ch1", channels[0].Channels[0].GetName())
})
t.Run("watch channel to empty cluster", func(t *testing.T) {
@ -456,7 +470,7 @@ func TestWatchIfNeeded(t *testing.T) {
assert.Empty(t, channels)
channel := channelManager.GetBufferChannels()
assert.NotNil(t, channel)
assert.EqualValues(t, "ch1", channel.Channels[0].Name)
assert.EqualValues(t, "ch1", channel.Channels[0].GetName())
})
}

View File

@ -242,7 +242,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
chManager: &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}},
},
},
},
@ -262,8 +262,8 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
chManager: &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{}},
bufferID: {NodeID: bufferID, Channels: []*channel{}},
1: {NodeID: 1, Channels: []RWChannel{}},
bufferID: {NodeID: bufferID, Channels: []RWChannel{}},
},
},
},
@ -289,7 +289,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
c.start()
err := c.execCompactionPlan(tt.args.signal, tt.args.plan)
assert.ErrorIs(t, tt.err, err)
require.ErrorIs(t, tt.err, err)
task := c.getCompaction(tt.args.plan.PlanID)
if !tt.wantErr {
@ -329,7 +329,7 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
chManager: &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}},
},
},
},

View File

@ -36,9 +36,9 @@ import (
// Handler handles some channel method for ChannelManager
type Handler interface {
// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
GetQueryVChanPositions(ch *channel, partitionIDs ...UniqueID) *datapb.VchannelInfo
GetQueryVChanPositions(ch RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
GetDataVChanPositions(ch *channel, partitionID UniqueID) *datapb.VchannelInfo
GetDataVChanPositions(ch RWChannel, partitionID UniqueID) *datapb.VchannelInfo
CheckShouldDropChannel(ch string) bool
FinishDropChannel(ch string) error
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
@ -55,13 +55,13 @@ func newServerHandler(s *Server) *ServerHandler {
}
// GetDataVChanPositions gets vchannel latest positions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID UniqueID) *datapb.VchannelInfo {
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.Name && !s.GetIsFake()
return s.InsertChannel == channel.GetName() && !s.GetIsFake()
})
log.Info("GetDataVChanPositions",
zap.Int64("collectionID", channel.CollectionID),
zap.String("channel", channel.Name),
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
)
var (
@ -90,8 +90,8 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
}
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
SeekPosition: h.GetChannelSeekPosition(channel, partitionID),
FlushedSegmentIds: flushedIDs.Collect(),
UnflushedSegmentIds: unflushedIDs.Collect(),
@ -102,10 +102,10 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
// GetQueryVChanPositions gets vchannel latest positions with provided dml channel names for QueryCoord,
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ...UniqueID) *datapb.VchannelInfo {
func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo {
// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.Name && !s.GetIsFake()
return s.InsertChannel == channel.GetName() && !s.GetIsFake()
})
segmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
@ -114,8 +114,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
indexed.Insert(segment.GetID())
}
log.Info("GetQueryVChanPositions",
zap.Int64("collectionID", channel.CollectionID),
zap.String("channel", channel.Name),
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("indexed segment", len(indexedSegments)),
)
@ -203,8 +203,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
indexedIDs.Insert(unIndexedIDs.Collect()...)
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...),
FlushedSegmentIds: indexedIDs.Collect(),
UnflushedSegmentIds: growingIDs.Collect(),
@ -214,12 +214,12 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
// getEarliestSegmentDMLPos returns the earliest dml position of segments,
// this is mainly for COMPATIBILITY with old version <=2.1.x
func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionIDs ...UniqueID) *msgpb.MsgPosition {
func (h *ServerHandler) getEarliestSegmentDMLPos(channel string, partitionIDs ...UniqueID) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
var minPosSegID int64
var minPosTs uint64
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.Name
return s.InsertChannel == channel
})
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
@ -259,27 +259,26 @@ func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionIDs
}
// getCollectionStartPos returns collection start position.
func (h *ServerHandler) getCollectionStartPos(channel *channel) *msgpb.MsgPosition {
func (h *ServerHandler) getCollectionStartPos(channel RWChannel) *msgpb.MsgPosition {
log := log.With(zap.String("channel", channel.GetName()))
// use collection start position when segment position is not found
var startPosition *msgpb.MsgPosition
if channel.StartPositions == nil {
collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
if channel.GetStartPositions() == nil {
collection, err := h.GetCollection(h.s.ctx, channel.GetCollectionID())
if collection != nil && err == nil {
startPosition = getCollectionStartPosition(channel.Name, collection)
startPosition = getCollectionStartPosition(channel.GetName(), collection)
}
log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", startPosition.GetTimestamp()),
zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
)
} else {
// use the passed start positions and skip asking RootCoord.
startPosition = toMsgPosition(channel.Name, channel.StartPositions)
startPosition = toMsgPosition(channel.GetName(), channel.GetStartPositions())
if startPosition != nil {
startPosition.Timestamp = channel.CreateTimestamp
startPosition.Timestamp = channel.GetCreateTimestamp()
}
log.Info("segment position not found, setting channel seek position to channel start position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", startPosition.GetTimestamp()),
zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
)
@ -292,21 +291,20 @@ func (h *ServerHandler) getCollectionStartPos(channel *channel) *msgpb.MsgPositi
// 2. Segments earliest dml position;
// 3. Collection start position;
// And would return if any position is valid.
func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionIDs ...UniqueID) *msgpb.MsgPosition {
func (h *ServerHandler) GetChannelSeekPosition(channel RWChannel, partitionIDs ...UniqueID) *msgpb.MsgPosition {
log := log.With(zap.String("channel", channel.GetName()))
var seekPosition *msgpb.MsgPosition
seekPosition = h.s.meta.GetChannelCheckpoint(channel.Name)
seekPosition = h.s.meta.GetChannelCheckpoint(channel.GetName())
if seekPosition != nil {
log.Info("channel seek position set from channel checkpoint meta",
zap.String("channel", channel.Name),
zap.Uint64("posTs", seekPosition.Timestamp),
zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
return seekPosition
}
seekPosition = h.getEarliestSegmentDMLPos(channel, partitionIDs...)
seekPosition = h.getEarliestSegmentDMLPos(channel.GetName(), partitionIDs...)
if seekPosition != nil {
log.Info("channel seek position set from earliest segment dml position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", seekPosition.Timestamp),
zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
return seekPosition
@ -315,14 +313,12 @@ func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionIDs ..
seekPosition = h.getCollectionStartPos(channel)
if seekPosition != nil {
log.Info("channel seek position set from collection start position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", seekPosition.Timestamp),
zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
return seekPosition
}
log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid",
zap.String("channel", channel.Name))
log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid")
return nil
}
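
GetChannelSeekPosition above resolves the seek position from three sources in priority order: the channel checkpoint meta, the earliest segment DML position, and finally the collection start position. A minimal standalone sketch of that fallback chain, assuming a simplified MsgPosition stand-in and hypothetical source closures rather than the real DataCoord types:

package main

import "fmt"

// MsgPosition is a simplified stand-in for msgpb.MsgPosition.
type MsgPosition struct {
	ChannelName string
	Timestamp   uint64
}

// seekPositionOf tries each source in priority order and returns the first hit.
// The closures model: channel checkpoint, earliest segment DML position,
// and collection start position (all hypothetical here).
func seekPositionOf(sources ...func() *MsgPosition) *MsgPosition {
	for _, src := range sources {
		if pos := src(); pos != nil {
			return pos
		}
	}
	return nil // mirrors the "all invalid" warning path
}

func main() {
	checkpoint := func() *MsgPosition { return nil }                           // no checkpoint stored
	earliestDML := func() *MsgPosition { return &MsgPosition{"ch1", 105} }     // a segment position exists
	collectionStart := func() *MsgPosition { return &MsgPosition{"ch1", 100} } // would be the last resort

	pos := seekPositionOf(checkpoint, earliestDML, collectionStart)
	fmt.Printf("seek from ts=%d\n", pos.Timestamp) // seek from ts=105
}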


@ -51,19 +51,19 @@ func (_c *MockRWChannelStore_Add_Call) RunAndReturn(run func(int64)) *MockRWChan
}
// Delete provides a mock function with given fields: nodeID
func (_m *MockRWChannelStore) Delete(nodeID int64) ([]*channel, error) {
func (_m *MockRWChannelStore) Delete(nodeID int64) ([]RWChannel, error) {
ret := _m.Called(nodeID)
var r0 []*channel
var r0 []RWChannel
var r1 error
if rf, ok := ret.Get(0).(func(int64) ([]*channel, error)); ok {
if rf, ok := ret.Get(0).(func(int64) ([]RWChannel, error)); ok {
return rf(nodeID)
}
if rf, ok := ret.Get(0).(func(int64) []*channel); ok {
if rf, ok := ret.Get(0).(func(int64) []RWChannel); ok {
r0 = rf(nodeID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*channel)
r0 = ret.Get(0).([]RWChannel)
}
}
@ -94,12 +94,12 @@ func (_c *MockRWChannelStore_Delete_Call) Run(run func(nodeID int64)) *MockRWCha
return _c
}
func (_c *MockRWChannelStore_Delete_Call) Return(_a0 []*channel, _a1 error) *MockRWChannelStore_Delete_Call {
func (_c *MockRWChannelStore_Delete_Call) Return(_a0 []RWChannel, _a1 error) *MockRWChannelStore_Delete_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockRWChannelStore_Delete_Call) RunAndReturn(run func(int64) ([]*channel, error)) *MockRWChannelStore_Delete_Call {
func (_c *MockRWChannelStore_Delete_Call) RunAndReturn(run func(int64) ([]RWChannel, error)) *MockRWChannelStore_Delete_Call {
_c.Call.Return(run)
return _c
}
@ -404,11 +404,11 @@ func (_c *MockRWChannelStore_Reload_Call) RunAndReturn(run func() error) *MockRW
}
// Update provides a mock function with given fields: op
func (_m *MockRWChannelStore) Update(op ChannelOpSet) error {
func (_m *MockRWChannelStore) Update(op *ChannelOpSet) error {
ret := _m.Called(op)
var r0 error
if rf, ok := ret.Get(0).(func(ChannelOpSet) error); ok {
if rf, ok := ret.Get(0).(func(*ChannelOpSet) error); ok {
r0 = rf(op)
} else {
r0 = ret.Error(0)
@ -423,14 +423,14 @@ type MockRWChannelStore_Update_Call struct {
}
// Update is a helper method to define mock.On call
// - op ChannelOpSet
// - op *ChannelOpSet
func (_e *MockRWChannelStore_Expecter) Update(op interface{}) *MockRWChannelStore_Update_Call {
return &MockRWChannelStore_Update_Call{Call: _e.mock.On("Update", op)}
}
func (_c *MockRWChannelStore_Update_Call) Run(run func(op ChannelOpSet)) *MockRWChannelStore_Update_Call {
func (_c *MockRWChannelStore_Update_Call) Run(run func(op *ChannelOpSet)) *MockRWChannelStore_Update_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(ChannelOpSet))
run(args[0].(*ChannelOpSet))
})
return _c
}
@ -440,7 +440,7 @@ func (_c *MockRWChannelStore_Update_Call) Return(_a0 error) *MockRWChannelStore_
return _c
}
func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(ChannelOpSet) error) *MockRWChannelStore_Update_Call {
func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(*ChannelOpSet) error) *MockRWChannelStore_Update_Call {
_c.Call.Return(run)
return _c
}


@ -162,11 +162,11 @@ func (_c *NMockHandler_GetCollection_Call) RunAndReturn(run func(context.Context
}
// GetDataVChanPositions provides a mock function with given fields: ch, partitionID
func (_m *NMockHandler) GetDataVChanPositions(ch *channel, partitionID int64) *datapb.VchannelInfo {
func (_m *NMockHandler) GetDataVChanPositions(ch RWChannel, partitionID int64) *datapb.VchannelInfo {
ret := _m.Called(ch, partitionID)
var r0 *datapb.VchannelInfo
if rf, ok := ret.Get(0).(func(*channel, int64) *datapb.VchannelInfo); ok {
if rf, ok := ret.Get(0).(func(RWChannel, int64) *datapb.VchannelInfo); ok {
r0 = rf(ch, partitionID)
} else {
if ret.Get(0) != nil {
@ -183,15 +183,15 @@ type NMockHandler_GetDataVChanPositions_Call struct {
}
// GetDataVChanPositions is a helper method to define mock.On call
// - ch *channel
// - ch RWChannel
// - partitionID int64
func (_e *NMockHandler_Expecter) GetDataVChanPositions(ch interface{}, partitionID interface{}) *NMockHandler_GetDataVChanPositions_Call {
return &NMockHandler_GetDataVChanPositions_Call{Call: _e.mock.On("GetDataVChanPositions", ch, partitionID)}
}
func (_c *NMockHandler_GetDataVChanPositions_Call) Run(run func(ch *channel, partitionID int64)) *NMockHandler_GetDataVChanPositions_Call {
func (_c *NMockHandler_GetDataVChanPositions_Call) Run(run func(ch RWChannel, partitionID int64)) *NMockHandler_GetDataVChanPositions_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*channel), args[1].(int64))
run(args[0].(RWChannel), args[1].(int64))
})
return _c
}
@ -201,13 +201,13 @@ func (_c *NMockHandler_GetDataVChanPositions_Call) Return(_a0 *datapb.VchannelIn
return _c
}
func (_c *NMockHandler_GetDataVChanPositions_Call) RunAndReturn(run func(*channel, int64) *datapb.VchannelInfo) *NMockHandler_GetDataVChanPositions_Call {
func (_c *NMockHandler_GetDataVChanPositions_Call) RunAndReturn(run func(RWChannel, int64) *datapb.VchannelInfo) *NMockHandler_GetDataVChanPositions_Call {
_c.Call.Return(run)
return _c
}
// GetQueryVChanPositions provides a mock function with given fields: ch, partitionIDs
func (_m *NMockHandler) GetQueryVChanPositions(ch *channel, partitionIDs ...int64) *datapb.VchannelInfo {
func (_m *NMockHandler) GetQueryVChanPositions(ch RWChannel, partitionIDs ...int64) *datapb.VchannelInfo {
_va := make([]interface{}, len(partitionIDs))
for _i := range partitionIDs {
_va[_i] = partitionIDs[_i]
@ -218,7 +218,7 @@ func (_m *NMockHandler) GetQueryVChanPositions(ch *channel, partitionIDs ...int6
ret := _m.Called(_ca...)
var r0 *datapb.VchannelInfo
if rf, ok := ret.Get(0).(func(*channel, ...int64) *datapb.VchannelInfo); ok {
if rf, ok := ret.Get(0).(func(RWChannel, ...int64) *datapb.VchannelInfo); ok {
r0 = rf(ch, partitionIDs...)
} else {
if ret.Get(0) != nil {
@ -235,14 +235,14 @@ type NMockHandler_GetQueryVChanPositions_Call struct {
}
// GetQueryVChanPositions is a helper method to define mock.On call
// - ch *channel
// - ch RWChannel
// - partitionIDs ...int64
func (_e *NMockHandler_Expecter) GetQueryVChanPositions(ch interface{}, partitionIDs ...interface{}) *NMockHandler_GetQueryVChanPositions_Call {
return &NMockHandler_GetQueryVChanPositions_Call{Call: _e.mock.On("GetQueryVChanPositions",
append([]interface{}{ch}, partitionIDs...)...)}
}
func (_c *NMockHandler_GetQueryVChanPositions_Call) Run(run func(ch *channel, partitionIDs ...int64)) *NMockHandler_GetQueryVChanPositions_Call {
func (_c *NMockHandler_GetQueryVChanPositions_Call) Run(run func(ch RWChannel, partitionIDs ...int64)) *NMockHandler_GetQueryVChanPositions_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-1)
for i, a := range args[1:] {
@ -250,7 +250,7 @@ func (_c *NMockHandler_GetQueryVChanPositions_Call) Run(run func(ch *channel, pa
variadicArgs[i] = a.(int64)
}
}
run(args[0].(*channel), variadicArgs...)
run(args[0].(RWChannel), variadicArgs...)
})
return _c
}
@ -260,7 +260,7 @@ func (_c *NMockHandler_GetQueryVChanPositions_Call) Return(_a0 *datapb.VchannelI
return _c
}
func (_c *NMockHandler_GetQueryVChanPositions_Call) RunAndReturn(run func(*channel, ...int64) *datapb.VchannelInfo) *NMockHandler_GetQueryVChanPositions_Call {
func (_c *NMockHandler_GetQueryVChanPositions_Call) RunAndReturn(run func(RWChannel, ...int64) *datapb.VchannelInfo) *NMockHandler_GetQueryVChanPositions_Call {
_c.Call.Return(run)
return _c
}


@ -770,17 +770,17 @@ func newMockHandler() *mockHandler {
return &mockHandler{}
}
func (h *mockHandler) GetQueryVChanPositions(channel *channel, partitionID ...UniqueID) *datapb.VchannelInfo {
func (h *mockHandler) GetQueryVChanPositions(channel RWChannel, partitionID ...UniqueID) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
}
}
func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
func (h *mockHandler) GetDataVChanPositions(channel RWChannel, partitionID UniqueID) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
}
}


@ -23,6 +23,7 @@ import (
"strconv"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"stathat.com/c/consistent"
@ -32,31 +33,31 @@ import (
// RegisterPolicy decides the channel mapping after registering the nodeID
// and returns bufferedUpdates and balanceUpdates
type RegisterPolicy func(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet)
type RegisterPolicy func(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet)
// EmptyRegister does nothing
func EmptyRegister(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) {
func EmptyRegister(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet) {
return nil, nil
}
// BufferChannelAssignPolicy assigns buffer channels to the newly registered node
func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet {
info := store.GetBufferChannelInfo()
if info == nil || len(info.Channels) == 0 {
return nil
}
opSet := ChannelOpSet{}
opSet.Delete(info.NodeID, info.Channels)
opSet.Add(nodeID, info.Channels)
opSet := NewChannelOpSet(
NewDeleteOp(bufferID, info.Channels...),
NewAddOp(nodeID, info.Channels...))
return opSet
}
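
BufferChannelAssignPolicy now builds its result with the constructor style NewChannelOpSet(NewDeleteOp(...), NewAddOp(...)) instead of mutating a bare ChannelOpSet slice. A self-contained model of that builder pattern, using simplified stand-in types rather than the real datacoord ones:

package main

import "fmt"

type OpType int

const (
	Add OpType = iota
	Delete
)

// ChannelOp and ChannelOpSet are simplified stand-ins for the datacoord types.
type ChannelOp struct {
	Type     OpType
	NodeID   int64
	Channels []string
}

type ChannelOpSet struct{ ops []*ChannelOp }

func NewChannelOpSet(ops ...*ChannelOp) *ChannelOpSet { return &ChannelOpSet{ops: ops} }

func NewAddOp(nodeID int64, chs ...string) *ChannelOp {
	return &ChannelOp{Type: Add, NodeID: nodeID, Channels: chs}
}

func NewDeleteOp(nodeID int64, chs ...string) *ChannelOp {
	return &ChannelOp{Type: Delete, NodeID: nodeID, Channels: chs}
}

// Collect returns the flat op list, like the Collect() calls in the tests.
func (s *ChannelOpSet) Collect() []*ChannelOp { return s.ops }

func main() {
	const bufferID = int64(-1) // assumed placeholder for the buffer node
	opSet := NewChannelOpSet(
		NewDeleteOp(bufferID, "ch1"),
		NewAddOp(1, "ch1"),
	)
	for _, op := range opSet.Collect() {
		fmt.Printf("%+v\n", op)
	}
}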
// AvgAssignRegisterPolicy assigns channels evenly to the newly registered node.
// Register will not directly delete the node-channel pair; the channel manager handles channel release.
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) {
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet) {
opSet := BufferChannelAssignPolicy(store, nodeID)
if len(opSet) != 0 {
if opSet != nil {
return opSet, nil
}
@ -78,7 +79,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (ChannelOpSet,
return len(avaNodes[i].Channels) > len(avaNodes[j].Channels)
})
releases := make(map[int64][]*channel)
releases := make(map[int64][]RWChannel)
for i := 0; i < chPerNode; i++ {
// Pick a node with its channel to release.
toRelease := avaNodes[i%len(avaNodes)]
@ -92,10 +93,10 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (ChannelOpSet,
releases[toRelease.NodeID] = append(releases[toRelease.NodeID], toRelease.Channels[chIdx])
}
opSet = ChannelOpSet{}
// Channels in `releases` are reassigned eventually by channel manager.
opSet = NewChannelOpSet()
for k, v := range releases {
opSet.Add(k, v)
opSet.Add(k, v...)
}
return nil, opSet
}
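
AvgAssignRegisterPolicy gives the newcomer roughly total/(existing+1) channels by releasing them from the most loaded nodes; the released channels are later reassigned by the channel manager. A rough standalone sketch of that averaging step, with plain maps standing in for the channel store (all names here are illustrative only):

package main

import (
	"fmt"
	"sort"
)

// pickForNewNode returns, per existing node, the channels that should be
// released so a newly registered node can pick them up later.
// nodes maps nodeID to its channel names.
func pickForNewNode(nodes map[int64][]string) map[int64][]string {
	type nodeInfo struct {
		id  int64
		chs []string
	}
	infos := make([]nodeInfo, 0, len(nodes))
	total := 0
	for id, chs := range nodes {
		infos = append(infos, nodeInfo{id, chs})
		total += len(chs)
	}
	// target share for the newcomer: total / (existing + 1), rounded down
	perNode := total / (len(infos) + 1)
	if perNode == 0 {
		return nil
	}
	// release from the most loaded nodes first
	sort.Slice(infos, func(i, j int) bool { return len(infos[i].chs) > len(infos[j].chs) })

	releases := make(map[int64][]string)
	for i := 0; i < perNode; i++ {
		victim := infos[i%len(infos)]
		idx := i / len(infos)
		if idx < len(victim.chs) {
			releases[victim.id] = append(releases[victim.id], victim.chs[idx])
		}
	}
	return releases
}

func main() {
	releases := pickForNewNode(map[int64][]string{
		1: {"ch1", "ch2", "ch3"},
		2: {"ch4"},
	})
	fmt.Println(releases) // map[1:[ch1]], one channel freed for the new node
}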
@ -114,28 +115,27 @@ func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
// ConsistentHashRegisterPolicy uses a consistent hash to maintain the mapping
func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolicy {
return func(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) {
return func(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet) {
elems := formatNodeIDs(store.GetNodes())
hashRing.Set(elems)
releases := make(map[int64][]*channel)
releases := make(map[int64][]RWChannel)
// If there are buffer channels, then nodeID is the first node.
opSet := BufferChannelAssignPolicy(store, nodeID)
if len(opSet) != 0 {
if opSet := BufferChannelAssignPolicy(store, nodeID); opSet != nil {
return opSet, nil
}
opSet = ChannelOpSet{}
opSet := NewChannelOpSet()
// If there are other nodes, channels on these nodes may be reassigned to
// the new registered node. We should find these channels.
channelsInfo := store.GetNodesChannels()
for _, c := range channelsInfo {
for _, ch := range c.Channels {
idStr, err := hashRing.Get(ch.Name)
idStr, err := hashRing.Get(ch.GetName())
if err != nil {
log.Warn("receive error when getting from hashRing",
zap.String("channel", ch.Name), zap.Error(err))
zap.String("channel", ch.String()), zap.Error(err))
return nil, nil
}
did, err := deformatNodeID(idStr)
@ -151,7 +151,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
// Channels in `releases` are reassigned eventually by channel manager.
for id, channels := range releases {
opSet.Add(id, channels)
opSet.Add(id, channels...)
}
return nil, opSet
}
@ -166,21 +166,21 @@ func deformatNodeID(node string) (int64, error) {
}
// ChannelAssignPolicy assigns channels to registered nodes.
type ChannelAssignPolicy func(store ROChannelStore, channels []*channel) ChannelOpSet
type ChannelAssignPolicy func(store ROChannelStore, channels []RWChannel) *ChannelOpSet
// AverageAssignPolicy ensures that the number of channels per node is approximately the same
func AverageAssignPolicy(store ROChannelStore, channels []*channel) ChannelOpSet {
func AverageAssignPolicy(store ROChannelStore, channels []RWChannel) *ChannelOpSet {
newChannels := filterChannels(store, channels)
if len(newChannels) == 0 {
return nil
}
opSet := ChannelOpSet{}
opSet := NewChannelOpSet()
allDataNodes := store.GetNodesChannels()
// If no datanode is alive, save the channels in the buffer
if len(allDataNodes) == 0 {
opSet.Add(bufferID, channels)
opSet.Add(bufferID, channels...)
return opSet
}
@ -189,21 +189,21 @@ func AverageAssignPolicy(store ROChannelStore, channels []*channel) ChannelOpSet
return len(allDataNodes[i].Channels) <= len(allDataNodes[j].Channels)
})
updates := make(map[int64][]*channel)
updates := make(map[int64][]RWChannel)
for i, newChannel := range newChannels {
n := allDataNodes[i%len(allDataNodes)].NodeID
updates[n] = append(updates[n], newChannel)
}
for id, chs := range updates {
opSet.Add(id, chs)
opSet.Add(id, chs...)
}
return opSet
}
// ConsistentHashChannelAssignPolicy uses a consistent hash algorithm to determine channel assignment
func ConsistentHashChannelAssignPolicy(hashRing *consistent.Consistent) ChannelAssignPolicy {
return func(store ROChannelStore, channels []*channel) ChannelOpSet {
return func(store ROChannelStore, channels []RWChannel) *ChannelOpSet {
hashRing.Set(formatNodeIDs(store.GetNodes()))
filteredChannels := filterChannels(store, channels)
@ -211,24 +211,24 @@ func ConsistentHashChannelAssignPolicy(hashRing *consistent.Consistent) ChannelA
return nil
}
opSet := NewChannelOpSet()
if len(hashRing.Members()) == 0 {
opSet := ChannelOpSet{}
opSet.Add(bufferID, channels)
opSet.Add(bufferID, channels...)
return opSet
}
adds := make(map[int64][]*channel)
adds := make(map[int64][]RWChannel)
for _, c := range filteredChannels {
idStr, err := hashRing.Get(c.Name)
idStr, err := hashRing.Get(c.GetName())
if err != nil {
log.Warn("receive error when getting from hashRing",
zap.String("channel", c.Name), zap.Error(err))
zap.String("channel", c.String()), zap.Error(err))
return nil
}
did, err := deformatNodeID(idStr)
if err != nil {
log.Warn("failed to deformat node id", zap.Int64("nodeID", did))
return nil
return NewChannelOpSet()
}
adds[did] = append(adds[did], c)
}
@ -237,24 +237,23 @@ func ConsistentHashChannelAssignPolicy(hashRing *consistent.Consistent) ChannelA
return nil
}
opSet := ChannelOpSet{}
for id, chs := range adds {
opSet.Add(id, chs)
opSet.Add(id, chs...)
}
return opSet
}
}
func filterChannels(store ROChannelStore, channels []*channel) []*channel {
channelsMap := make(map[string]*channel)
func filterChannels(store ROChannelStore, channels []RWChannel) []RWChannel {
channelsMap := make(map[string]RWChannel)
for _, c := range channels {
channelsMap[c.Name] = c
channelsMap[c.GetName()] = c
}
allChannelsInfo := store.GetChannels()
for _, info := range allChannelsInfo {
for _, c := range info.Channels {
delete(channelsMap, c.Name)
delete(channelsMap, c.GetName())
}
}
@ -262,7 +261,7 @@ func filterChannels(store ROChannelStore, channels []*channel) []*channel {
return nil
}
filtered := make([]*channel, 0, len(channelsMap))
filtered := make([]RWChannel, 0, len(channelsMap))
for _, v := range channelsMap {
filtered = append(filtered, v)
}
@ -270,23 +269,23 @@ func filterChannels(store ROChannelStore, channels []*channel) []*channel {
}
// DeregisterPolicy determines the mapping after deregistering the nodeID
type DeregisterPolicy func(store ROChannelStore, nodeID int64) ChannelOpSet
type DeregisterPolicy func(store ROChannelStore, nodeID int64) *ChannelOpSet
// EmptyDeregisterPolicy does nothing
func EmptyDeregisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
func EmptyDeregisterPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet {
return nil
}
// AvgAssignUnregisteredChannels evenly assigns the unregistered channels
func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) ChannelOpSet {
func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) *ChannelOpSet {
allNodes := store.GetNodesChannels()
avaNodes := make([]*NodeChannelInfo, 0, len(allNodes))
unregisteredChannels := make([]*channel, 0)
opSet := ChannelOpSet{}
unregisteredChannels := make([]RWChannel, 0)
opSet := NewChannelOpSet()
for _, c := range allNodes {
if c.NodeID == nodeID {
opSet.Delete(nodeID, c.Channels)
opSet.Delete(nodeID, c.Channels...)
unregisteredChannels = append(unregisteredChannels, c.Channels...)
continue
}
@ -294,7 +293,7 @@ func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) ChannelOp
}
if len(avaNodes) == 0 {
opSet.Add(bufferID, unregisteredChannels)
opSet.Add(bufferID, unregisteredChannels...)
return opSet
}
@ -303,24 +302,24 @@ func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) ChannelOp
return len(avaNodes[i].Channels) <= len(avaNodes[j].Channels)
})
updates := make(map[int64][]*channel)
updates := make(map[int64][]RWChannel)
for i, unregisteredChannel := range unregisteredChannels {
n := avaNodes[i%len(avaNodes)].NodeID
updates[n] = append(updates[n], unregisteredChannel)
}
for id, chs := range updates {
opSet.Add(id, chs)
opSet.Add(id, chs...)
}
return opSet
}
// ConsistentHashDeregisterPolicy returns a DeregisterPolicy that uses consistent hashing
func ConsistentHashDeregisterPolicy(hashRing *consistent.Consistent) DeregisterPolicy {
return func(store ROChannelStore, nodeID int64) ChannelOpSet {
return func(store ROChannelStore, nodeID int64) *ChannelOpSet {
hashRing.Set(formatNodeIDsWithFilter(store.GetNodes(), nodeID))
channels := store.GetNodesChannels()
opSet := ChannelOpSet{}
opSet := NewChannelOpSet()
var deletedInfo *NodeChannelInfo
for _, cinfo := range channels {
@ -334,20 +333,20 @@ func ConsistentHashDeregisterPolicy(hashRing *consistent.Consistent) DeregisterP
return nil
}
opSet.Delete(nodeID, deletedInfo.Channels)
opSet.Delete(nodeID, deletedInfo.Channels...)
// If there are no members in the hash ring, store the channels in the buffer
if len(hashRing.Members()) == 0 {
opSet.Add(bufferID, deletedInfo.Channels)
opSet.Add(bufferID, deletedInfo.Channels...)
return opSet
}
// reassign channels of deleted node
updates := make(map[int64][]*channel)
updates := make(map[int64][]RWChannel)
for _, c := range deletedInfo.Channels {
idStr, err := hashRing.Get(c.Name)
idStr, err := hashRing.Get(c.GetName())
if err != nil {
log.Warn("failed to get channel in hash ring", zap.String("channel", c.Name))
log.Warn("failed to get channel in hash ring", zap.String("channel", c.String()))
return nil
}
@ -360,48 +359,43 @@ func ConsistentHashDeregisterPolicy(hashRing *consistent.Consistent) DeregisterP
}
for id, chs := range updates {
opSet.Add(id, chs)
opSet.Add(id, chs...)
}
return opSet
}
}
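
Both consistent-hash policies rely on the same primitive: place the surviving node IDs on a hash ring and ask the ring which node owns each channel name. A minimal usage example of the same stathat.com/c/consistent package; the node-ID formatting below is a made-up stand-in for the internal format helpers:

package main

import (
	"fmt"
	"strconv"

	"stathat.com/c/consistent"
)

func main() {
	ring := consistent.New()
	// Register the surviving nodes on the ring (string formatting is illustrative).
	for _, nodeID := range []int64{1, 3} {
		ring.Add(strconv.FormatInt(nodeID, 10))
	}

	// Ask the ring which node should own each channel of a removed node.
	for _, ch := range []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"} {
		owner, err := ring.Get(ch)
		if err != nil {
			fmt.Println("hash ring lookup failed:", err)
			continue
		}
		fmt.Printf("channel %s -> node %s\n", ch, owner)
	}
}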
type BalanceChannelPolicy func(store ROChannelStore, ts time.Time) ChannelOpSet
type BalanceChannelPolicy func(store ROChannelStore, ts time.Time) *ChannelOpSet
func AvgBalanceChannelPolicy(store ROChannelStore, ts time.Time) ChannelOpSet {
channelOps := make(ChannelOpSet, 0)
func AvgBalanceChannelPolicy(store ROChannelStore, ts time.Time) *ChannelOpSet {
opSet := NewChannelOpSet()
reAllocates, err := BgBalanceCheck(store.GetNodesChannels(), ts)
if err != nil {
log.Error("failed to balance node channels", zap.Error(err))
return channelOps
return opSet
}
for _, reAlloc := range reAllocates {
toRelease := &ChannelOp{
Type: Add,
Channels: reAlloc.Channels,
NodeID: reAlloc.NodeID,
}
channelOps = append(channelOps, toRelease)
opSet.Add(reAlloc.NodeID, reAlloc.Channels...)
}
return channelOps
return opSet
}
// ChannelReassignPolicy is a policy for reassigning channels
type ChannelReassignPolicy func(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet
type ChannelReassignPolicy func(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet
// EmptyReassignPolicy is a dummy reassign policy
func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet {
return nil
}
// EmptyBalancePolicy is a dummy balance policy
func EmptyBalancePolicy(store ROChannelStore, ts time.Time) ChannelOpSet {
func EmptyBalancePolicy(store ROChannelStore, ts time.Time) *ChannelOpSet {
return nil
}
// RoundRobinReassignPolicy is a reassigning policy that evenly assigns channels
func RoundRobinReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
func RoundRobinReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet {
allNodes := store.GetNodesChannels()
filterMap := make(map[int64]struct{})
for _, reassign := range reassigns {
@ -414,10 +408,10 @@ func RoundRobinReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo
}
avaNodes = append(avaNodes, c)
}
ret := make([]*ChannelOp, 0)
opSet := NewChannelOpSet()
if len(avaNodes) == 0 {
// if no node is left, do not reassign
return ret
return opSet
}
sort.Slice(avaNodes, func(i, j int) bool {
return len(avaNodes[i].Channels) <= len(avaNodes[j].Channels)
@ -427,35 +421,25 @@ func RoundRobinReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo
i := 0
addUpdates := make(map[int64]*ChannelOp)
for _, reassign := range reassigns {
deleteUpdate := &ChannelOp{
Type: Delete,
Channels: reassign.Channels,
NodeID: reassign.NodeID,
}
ret = append(ret, deleteUpdate)
opSet.Delete(reassign.NodeID, reassign.Channels...)
for _, ch := range reassign.Channels {
targetID := avaNodes[i%len(avaNodes)].NodeID
i++
if _, ok := addUpdates[targetID]; !ok {
addUpdates[targetID] = &ChannelOp{
Type: Add,
NodeID: targetID,
Channels: []*channel{ch},
}
addUpdates[targetID] = NewAddOp(targetID, ch)
} else {
addUpdates[targetID].Channels = append(addUpdates[targetID].Channels, ch)
addUpdates[targetID].Append(ch)
}
}
}
for _, update := range addUpdates {
ret = append(ret, update)
}
return ret
opSet.Insert(lo.Values(addUpdates)...)
return opSet
}
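
RoundRobinReassignPolicy simply cycles the channels of the removed nodes over the remaining ones, emitting a delete op for the source and add ops for the targets. A self-contained sketch of the distribution step over plain strings (the real policy builds ChannelOps instead of a map):

package main

import "fmt"

// roundRobin spreads the given channels over the target node IDs in turn.
func roundRobin(channels []string, targets []int64) map[int64][]string {
	if len(targets) == 0 {
		return nil // no node left to take the channels, mirroring the "do not reassign" case
	}
	out := make(map[int64][]string)
	for i, ch := range channels {
		node := targets[i%len(targets)]
		out[node] = append(out[node], ch)
	}
	return out
}

func main() {
	plan := roundRobin([]string{"ch1", "ch2", "ch3"}, []int64{2, 3})
	fmt.Println(plan) // map[2:[ch1 ch3] 3:[ch2]]
}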
// AverageReassignPolicy is a reassigning policy that evenly balances channels among datanodes,
// which is used by bgChecker
func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet {
allNodes := store.GetNodesChannels()
filterMap := make(map[int64]struct{})
toReassignTotalNum := 0
@ -474,13 +458,14 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C
}
log.Info("AverageReassignPolicy working", zap.Int("avaNodesCount", len(avaNodes)),
zap.Int("toAssignChannelNum", toReassignTotalNum), zap.Int("avaNodesChannelSum", avaNodesChannelSum))
ret := make([]*ChannelOp, 0)
if len(avaNodes) == 0 {
// if no node is left, do not reassign
log.Warn("there is no available nodes when reassigning, return")
return ret
return nil
}
opSet := NewChannelOpSet()
avgChannelCount := int(math.Ceil(float64(avaNodesChannelSum+toReassignTotalNum) / (float64(len(avaNodes)))))
sort.Slice(avaNodes, func(i, j int) bool {
if len(avaNodes[i].Channels) == len(avaNodes[j].Channels) {
@ -492,12 +477,7 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C
// reassign channels to remaining nodes
addUpdates := make(map[int64]*ChannelOp)
for _, reassign := range reassigns {
deleteUpdate := &ChannelOp{
Type: Delete,
Channels: reassign.Channels,
NodeID: reassign.NodeID,
}
ret = append(ret, deleteUpdate)
opSet.Delete(reassign.NodeID, reassign.Channels...)
for _, ch := range reassign.Channels {
nodeIdx := 0
for {
@ -524,22 +504,16 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C
nodeIdx++
}
if _, ok := addUpdates[targetID]; !ok {
addUpdates[targetID] = &ChannelOp{
Type: Add,
NodeID: targetID,
Channels: []*channel{ch},
}
addUpdates[targetID] = NewAddOp(targetID, ch)
} else {
addUpdates[targetID].Channels = append(addUpdates[targetID].Channels, ch)
addUpdates[targetID].Append(ch)
}
break
}
}
}
for _, update := range addUpdates {
ret = append(ret, update)
}
return ret
opSet.Insert(lo.Values(addUpdates)...)
return opSet
}
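
The cap used by AverageReassignPolicy is a ceiling average: each surviving node should end up with at most ceil((existing + moving) / survivors) channels. A quick check of that arithmetic for the extreme-case figures used in the test file further down (12 channels to move, 3 already placed, 3 survivors), assuming those test expectations are read correctly:

package main

import (
	"fmt"
	"math"
)

func main() {
	existing := 3    // channels already on the surviving nodes (chan13..chan15)
	toReassign := 12 // channels of the removed node
	survivors := 3   // nodes 2, 3 and 4

	avg := int(math.Ceil(float64(existing+toReassign) / float64(survivors)))
	fmt.Println(avg) // prints 5; in that test node 4 ends up receiving five channels
}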
// ChannelBGChecker checks nodes' channels and returns the channels that need to be reallocated.
@ -559,7 +533,7 @@ func (rallocates ReAllocates) MarshalLogArray(enc zapcore.ArrayEncoder) error {
cstr := "["
if len(nChannelInfo.Channels) > 0 {
for _, s := range nChannelInfo.Channels {
cstr += s.Name
cstr += s.GetName()
cstr += ", "
}
cstr = cstr[:len(cstr)-2]
@ -591,7 +565,7 @@ func BgBalanceCheck(nodeChannels []*NodeChannelInfo, ts time.Time) ([]*NodeChann
}
reallocate := &NodeChannelInfo{
NodeID: nChannels.NodeID,
Channels: make([]*channel, 0),
Channels: make([]RWChannel, 0),
}
toReleaseCount := chCount - channelCountPerNode - 1
for _, ch := range nChannels.Channels {


@ -20,56 +20,68 @@ import (
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"stathat.com/c/consistent"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
)
func fillEmptyPosition(operations ChannelOpSet) {
for _, op := range operations {
if op.Type == Add {
for range op.Channels {
op.ChannelWatchInfos = append(op.ChannelWatchInfos, nil)
}
}
}
}
func TestBufferChannelAssignPolicy(t *testing.T) {
kv := memkv.NewMemoryKV()
channels := []*channel{{Name: "chan1", CollectionID: 1}}
channels := []RWChannel{getChannel("chan1", 1)}
store := &ChannelStore{
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{bufferID: {bufferID, channels}},
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []RWChannel{}},
bufferID: {bufferID, channels},
},
}
updates := BufferChannelAssignPolicy(store, 1)
updates := BufferChannelAssignPolicy(store, 1).Collect()
assert.NotNil(t, updates)
assert.Equal(t, 2, len(updates))
assert.EqualValues(t, &ChannelOp{Type: Delete, NodeID: bufferID, Channels: channels}, updates[0])
assert.EqualValues(t, 1, updates[1].NodeID)
assert.Equal(t, Add, updates[1].Type)
assert.Equal(t, channels, updates[1].Channels)
assert.ElementsMatch(t,
NewChannelOpSet(
NewAddOp(1, channels...),
NewDeleteOp(bufferID, channels...),
).Collect(),
updates)
}
func getChannel(name string, collID int64) *channelMeta {
return &channelMeta{Name: name, CollectionID: collID}
}
func getChannels(ch2Coll map[string]int64) []RWChannel {
return lo.MapToSlice(ch2Coll, func(name string, coll int64) RWChannel {
return &channelMeta{Name: name, CollectionID: coll}
})
}
func TestConsistentHashRegisterPolicy(t *testing.T) {
t.Run("first register", func(t *testing.T) {
kv := memkv.NewMemoryKV()
channels := []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 2},
ch2Coll := map[string]int64{
"chan1": 1,
"chan2": 2,
}
channels := getChannels(ch2Coll)
store := &ChannelStore{
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{bufferID: {bufferID, channels}},
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{
bufferID: {bufferID, channels},
1: {1, []RWChannel{}},
},
}
hashring := consistent.New()
policy := ConsistentHashRegisterPolicy(hashring)
updates, _ := policy(store, 1)
up, _ := policy(store, 1)
updates := up.Collect()
assert.NotNil(t, updates)
assert.Equal(t, 2, len(updates))
assert.EqualValues(t, &ChannelOp{Type: Delete, NodeID: bufferID, Channels: channels}, updates[0])
@ -79,38 +91,44 @@ func TestConsistentHashRegisterPolicy(t *testing.T) {
t.Run("rebalance after register", func(t *testing.T) {
kv := memkv.NewMemoryKV()
channels := []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 2},
ch2Coll := map[string]int64{
"chan1": 1,
"chan2": 2,
}
channels := getChannels(ch2Coll)
store := &ChannelStore{
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{1: {1, channels}, 2: {2, []*channel{}}},
channelsInfo: map[int64]*NodeChannelInfo{1: {1, channels}, 2: {2, []RWChannel{}}},
}
hashring := consistent.New()
hashring.Add(formatNodeID(1))
policy := ConsistentHashRegisterPolicy(hashring)
_, updates := policy(store, 2)
_, up := policy(store, 2)
updates := up.Collect()
assert.NotNil(t, updates)
assert.Equal(t, 1, len(updates))
// No Delete operation will be generated
assert.EqualValues(t, &ChannelOp{Type: Add, NodeID: 1, Channels: []*channel{channels[0]}}, updates[0])
assert.EqualValues(t, &ChannelOp{Type: Add, NodeID: 1, Channels: []RWChannel{channels[0]}}, updates[0])
assert.ElementsMatch(t,
NewChannelOpSet(NewAddOp(1, channels[0])).Collect(),
updates,
)
})
}
func TestAverageAssignPolicy(t *testing.T) {
type args struct {
store ROChannelStore
channels []*channel
channels []RWChannel
}
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test assign empty cluster",
@ -119,9 +137,9 @@ func TestAverageAssignPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{},
},
[]*channel{{Name: "chan1", CollectionID: 1}},
[]RWChannel{getChannel("chan1", 1)},
},
[]*ChannelOp{{Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
NewChannelOpSet(NewAddOp(bufferID, getChannel("chan1", 1))),
},
{
"test watch same channel",
@ -129,12 +147,12 @@ func TestAverageAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
},
},
[]*channel{{Name: "chan1", CollectionID: 1}},
[]RWChannel{getChannel("chan1", 1)},
},
nil,
NewChannelOpSet(),
},
{
"test normal assign",
@ -142,19 +160,19 @@ func TestAverageAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{{Name: "chan3", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
2: {2, []RWChannel{getChannel("chan3", 1)}},
},
},
[]*channel{{Name: "chan4", CollectionID: 1}},
[]RWChannel{getChannel("chan4", 1)},
},
[]*ChannelOp{{Add, 2, []*channel{{Name: "chan4", CollectionID: 1}}, nil}},
NewChannelOpSet(NewAddOp(2, getChannel("chan4", 1))),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := AverageAssignPolicy(tt.args.store, tt.args.channels)
assert.EqualValues(t, tt.want, got)
assert.EqualValues(t, tt.want.Collect(), got.Collect())
})
}
}
@ -163,12 +181,12 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
type args struct {
hashring *consistent.Consistent
store ROChannelStore
channels []*channel
channels []RWChannel
}
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test assign empty cluster",
@ -178,9 +196,9 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{},
},
[]*channel{{Name: "chan1", CollectionID: 1}},
[]RWChannel{getChannel("chan1", 1)},
},
[]*ChannelOp{{Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
NewChannelOpSet(NewAddOp(bufferID, getChannel("chan1", 1))),
},
{
"test watch same channel",
@ -189,12 +207,12 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
},
},
[]*channel{{Name: "chan1", CollectionID: 1}},
[]RWChannel{getChannel("chan1", 1)},
},
nil,
NewChannelOpSet(),
},
{
"test normal watch",
@ -204,17 +222,22 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{1: {1, nil}, 2: {2, nil}, 3: {3, nil}},
},
[]*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}, {Name: "chan3", CollectionID: 1}},
[]RWChannel{getChannel("chan1", 1), getChannel("chan2", 1), getChannel("chan3", 1)},
},
[]*ChannelOp{{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, 1, []*channel{{Name: "chan2", CollectionID: 1}}, nil}, {Add, 3, []*channel{{Name: "chan3", CollectionID: 1}}, nil}},
NewChannelOpSet(
NewAddOp(2, getChannel("chan1", 1)),
NewAddOp(1, getChannel("chan2", 1)),
NewAddOp(3, getChannel("chan3", 1)),
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
policy := ConsistentHashChannelAssignPolicy(tt.args.hashring)
got := policy(tt.args.store, tt.args.channels)
assert.Equal(t, len(tt.want), len(got))
for _, op := range tt.want {
got := policy(tt.args.store, tt.args.channels).Collect()
want := tt.want.Collect()
assert.Equal(t, len(want), len(got))
for _, op := range want {
assert.Contains(t, got, op)
}
})
@ -229,7 +252,7 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test deregister the last node",
@ -237,12 +260,15 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
},
},
1,
},
[]*ChannelOp{{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
NewChannelOpSet(
NewDeleteOp(1, getChannel("chan1", 1)),
NewAddOp(bufferID, getChannel("chan1", 1)),
),
},
{
"test rebalance channels after deregister",
@ -250,20 +276,23 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
2: {2, []*channel{{Name: "chan2", CollectionID: 1}}},
3: {3, []*channel{}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
2: {2, []RWChannel{getChannel("chan2", 1)}},
3: {3, []RWChannel{}},
},
},
2,
},
[]*ChannelOp{{Delete, 2, []*channel{{Name: "chan2", CollectionID: 1}}, nil}, {Add, 3, []*channel{{Name: "chan2", CollectionID: 1}}, nil}},
NewChannelOpSet(
NewDeleteOp(2, getChannel("chan2", 1)),
NewAddOp(3, getChannel("chan2", 1)),
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := AvgAssignUnregisteredChannels(tt.args.store, tt.args.nodeID)
assert.EqualValues(t, tt.want, got)
assert.EqualValues(t, tt.want.Collect(), got.Collect())
})
}
}
@ -277,7 +306,7 @@ func TestConsistentHashDeregisterPolicy(t *testing.T) {
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test deregister the last node",
@ -286,12 +315,15 @@ func TestConsistentHashDeregisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
},
},
1,
},
[]*ChannelOp{{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
NewChannelOpSet(
NewDeleteOp(1, getChannel("chan1", 1)),
NewAddOp(bufferID, getChannel("chan1", 1)),
),
},
{
"rebalance after deregister",
@ -300,21 +332,24 @@ func TestConsistentHashDeregisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{{Name: "chan1", CollectionID: 1}}},
3: {3, []*channel{{Name: "chan3", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan2", 1)}},
2: {2, []RWChannel{getChannel("chan1", 1)}},
3: {3, []RWChannel{getChannel("chan3", 1)}},
},
},
2,
},
[]*ChannelOp{{Delete, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
NewChannelOpSet(
NewDeleteOp(2, getChannel("chan1", 1)),
NewAddOp(1, getChannel("chan1", 1)),
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
policy := ConsistentHashDeregisterPolicy(tt.args.hashring)
got := policy(tt.args.store, tt.args.nodeID)
assert.EqualValues(t, tt.want, got)
assert.EqualValues(t, tt.want.Collect(), got.Collect())
})
}
}
@ -327,7 +362,7 @@ func TestRoundRobinReassignPolicy(t *testing.T) {
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test only one node",
@ -335,32 +370,35 @@ func TestRoundRobinReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
},
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
},
[]*ChannelOp{},
NewChannelOpSet(),
},
{
"test normal reassing",
"test normal reassigning",
args{
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{}},
1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
2: {2, []RWChannel{}},
},
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}},
},
[]*ChannelOp{{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil}, {Add, 2, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil}},
NewChannelOpSet(
NewDeleteOp(1, getChannel("chan1", 1), getChannel("chan2", 1)),
NewAddOp(2, getChannel("chan1", 1), getChannel("chan2", 1)),
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := RoundRobinReassignPolicy(tt.args.store, tt.args.reassigns)
assert.EqualValues(t, tt.want, got)
assert.EqualValues(t, tt.want.Collect(), got.Collect())
})
}
}
@ -381,9 +419,9 @@ func TestBgCheckForChannelBalance(t *testing.T) {
"test even distribution",
args{
[]*NodeChannelInfo{
{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
{2, []*channel{{Name: "chan1", CollectionID: 2}, {Name: "chan2", CollectionID: 2}}},
{3, []*channel{{Name: "chan1", CollectionID: 3}, {Name: "chan2", CollectionID: 3}}},
{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
{2, []RWChannel{getChannel("chan1", 2), getChannel("chan2", 2)}},
{3, []RWChannel{getChannel("chan1", 3), getChannel("chan2", 3)}},
},
time.Now(),
},
@ -395,8 +433,8 @@ func TestBgCheckForChannelBalance(t *testing.T) {
"test uneven with conservative effect",
args{
[]*NodeChannelInfo{
{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
{2, []*channel{}},
{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
{2, []RWChannel{}},
},
time.Now(),
},
@ -409,16 +447,16 @@ func TestBgCheckForChannelBalance(t *testing.T) {
"test uneven with zero",
args{
[]*NodeChannelInfo{
{1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
}},
{2, []*channel{}},
{2, []RWChannel{}},
},
time.Now(),
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
nil,
},
}
@ -440,7 +478,7 @@ func TestAvgReassignPolicy(t *testing.T) {
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test_only_one_node",
@ -448,13 +486,13 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
},
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
},
// as there are no available nodes except the input node, no reassign plan is generated
[]*ChannelOp{},
NewChannelOpSet(),
},
{
"test_zero_avg",
@ -462,20 +500,20 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
2: {2, []*channel{}},
3: {2, []*channel{}},
4: {2, []*channel{}},
1: {1, []RWChannel{getChannel("chan1", 1)}},
2: {2, []RWChannel{}},
3: {2, []RWChannel{}},
4: {2, []RWChannel{}},
},
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
},
[]*ChannelOp{
// as we use ceil to calculate the wanted average number, there should be one reassign
// though the average num less than 1
{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil},
{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1)}}},
},
// as we use ceil to calculate the wanted average number, there should be one reassign
// though the average number is less than 1
NewChannelOpSet(
NewDeleteOp(1, getChannel("chan1", 1)),
NewAddOp(2, getChannel("chan1", 1)),
),
},
{
"test_normal_reassigning_for_one_available_nodes",
@ -483,16 +521,16 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{}},
1: {1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}},
2: {2, []RWChannel{}},
},
},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}},
},
[]*ChannelOp{
{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil},
{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil},
[]*NodeChannelInfo{{1, []RWChannel{getChannel("chan1", 1), getChannel("chan2", 1)}}},
},
NewChannelOpSet(
NewDeleteOp(1, getChannel("chan1", 1), getChannel("chan2", 1)),
NewAddOp(2, getChannel("chan1", 1), getChannel("chan2", 1)),
),
},
{
"test_normal_reassigning_for_multiple_available_nodes",
@ -500,37 +538,33 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{Name: "chan4", CollectionID: 1},
1: {1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
}},
2: {2, []*channel{}},
3: {3, []*channel{}},
4: {4, []*channel{}},
2: {2, []RWChannel{}},
3: {3, []RWChannel{}},
4: {4, []RWChannel{}},
},
},
[]*NodeChannelInfo{{1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
[]*NodeChannelInfo{{1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
}}},
},
[]*ChannelOp{
{
Delete, 1,
[]*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
},
nil,
},
{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil},
{Add, 3, []*channel{{Name: "chan2", CollectionID: 1}}, nil},
{Add, 4, []*channel{{Name: "chan3", CollectionID: 1}}, nil},
},
NewChannelOpSet(
NewDeleteOp(1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
}...),
NewAddOp(2, getChannel("chan1", 1)),
NewAddOp(3, getChannel("chan2", 1)),
NewAddOp(4, getChannel("chan3", 1)),
),
},
{
"test_reassigning_for_extreme_case",
@ -538,76 +572,77 @@ func TestAvgReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{Name: "chan4", CollectionID: 1},
{Name: "chan5", CollectionID: 1},
{Name: "chan6", CollectionID: 1},
{Name: "chan7", CollectionID: 1},
{Name: "chan8", CollectionID: 1},
{Name: "chan9", CollectionID: 1},
{Name: "chan10", CollectionID: 1},
{Name: "chan11", CollectionID: 1},
{Name: "chan12", CollectionID: 1},
1: {1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
getChannel("chan5", 1),
getChannel("chan6", 1),
getChannel("chan7", 1),
getChannel("chan8", 1),
getChannel("chan9", 1),
getChannel("chan10", 1),
getChannel("chan11", 1),
getChannel("chan12", 1),
}},
2: {2, []*channel{
{Name: "chan13", CollectionID: 1}, {Name: "chan14", CollectionID: 1},
2: {2, []RWChannel{
getChannel("chan13", 1),
getChannel("chan14", 1),
}},
3: {3, []*channel{{Name: "chan15", CollectionID: 1}}},
4: {4, []*channel{}},
3: {3, []RWChannel{getChannel("chan15", 1)}},
4: {4, []RWChannel{}},
},
},
[]*NodeChannelInfo{{1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{Name: "chan4", CollectionID: 1},
{Name: "chan5", CollectionID: 1},
{Name: "chan6", CollectionID: 1},
{Name: "chan7", CollectionID: 1},
{Name: "chan8", CollectionID: 1},
{Name: "chan9", CollectionID: 1},
{Name: "chan10", CollectionID: 1},
{Name: "chan11", CollectionID: 1},
{Name: "chan12", CollectionID: 1},
[]*NodeChannelInfo{{1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
getChannel("chan5", 1),
getChannel("chan6", 1),
getChannel("chan7", 1),
getChannel("chan8", 1),
getChannel("chan9", 1),
getChannel("chan10", 1),
getChannel("chan11", 1),
getChannel("chan12", 1),
}}},
},
[]*ChannelOp{
{Delete, 1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{Name: "chan4", CollectionID: 1},
{Name: "chan5", CollectionID: 1},
{Name: "chan6", CollectionID: 1},
{Name: "chan7", CollectionID: 1},
{Name: "chan8", CollectionID: 1},
{Name: "chan9", CollectionID: 1},
{Name: "chan10", CollectionID: 1},
{Name: "chan11", CollectionID: 1},
{Name: "chan12", CollectionID: 1},
}, nil},
{Add, 4, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{Name: "chan4", CollectionID: 1},
{Name: "chan5", CollectionID: 1},
}, nil},
{Add, 3, []*channel{
{Name: "chan6", CollectionID: 1},
{Name: "chan7", CollectionID: 1},
{Name: "chan8", CollectionID: 1},
{Name: "chan9", CollectionID: 1},
}, nil},
{Add, 2, []*channel{
{Name: "chan10", CollectionID: 1},
{Name: "chan11", CollectionID: 1},
{Name: "chan12", CollectionID: 1},
}, nil},
},
NewChannelOpSet(
NewDeleteOp(1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
getChannel("chan5", 1),
getChannel("chan6", 1),
getChannel("chan7", 1),
getChannel("chan8", 1),
getChannel("chan9", 1),
getChannel("chan10", 1),
getChannel("chan11", 1),
getChannel("chan12", 1),
}...),
NewAddOp(4, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
getChannel("chan5", 1),
}...),
NewAddOp(3, []RWChannel{
getChannel("chan6", 1),
getChannel("chan7", 1),
getChannel("chan8", 1),
getChannel("chan9", 1),
}...),
NewAddOp(2, []RWChannel{
getChannel("chan10", 1),
getChannel("chan11", 1),
getChannel("chan12", 1),
}...),
),
},
}
for _, tt := range tests {
@ -617,7 +652,7 @@ func TestAvgReassignPolicy(t *testing.T) {
}
t.Run(tt.name, func(t *testing.T) {
got := AverageReassignPolicy(tt.args.store, tt.args.reassigns)
assert.EqualValues(t, tt.want, got)
assert.ElementsMatch(t, tt.want.Collect(), got.Collect())
})
}
}
@ -629,7 +664,7 @@ func TestAvgBalanceChannelPolicy(t *testing.T) {
tests := []struct {
name string
args args
want ChannelOpSet
want *ChannelOpSet
}{
{
"test_only_one_node",
@ -638,29 +673,25 @@ func TestAvgBalanceChannelPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {
1, []*channel{
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 1},
{Name: "chan3", CollectionID: 1},
{Name: "chan4", CollectionID: 1},
1, []RWChannel{
getChannel("chan1", 1),
getChannel("chan2", 1),
getChannel("chan3", 1),
getChannel("chan4", 1),
},
},
2: {2, []*channel{}},
2: {2, []RWChannel{}},
},
},
},
[]*ChannelOp{
{Add, 1, []*channel{
{Name: "chan1", CollectionID: 1},
}, nil},
},
NewChannelOpSet(NewAddOp(1, getChannel("chan1", 1))),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := AvgBalanceChannelPolicy(tt.args.store, time.Now())
assert.EqualValues(t, tt.want, got)
assert.EqualValues(t, tt.want.Collect(), got.Collect())
})
}
}
@ -673,8 +704,8 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
tests := []struct {
name string
args args
bufferedUpdates ChannelOpSet
balanceUpdates ChannelOpSet
bufferedUpdates *ChannelOpSet
balanceUpdates *ChannelOpSet
}{
{
"test empty",
@ -682,13 +713,13 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: make([]*channel, 0)},
1: {NodeID: 1, Channels: make([]RWChannel, 0)},
},
},
1,
},
nil,
nil,
NewChannelOpSet(),
NewChannelOpSet(),
},
{
"test with buffer channel",
@ -696,25 +727,17 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
bufferID: {bufferID, []*channel{{Name: "ch1", CollectionID: 1}}},
1: {NodeID: 1, Channels: []*channel{}},
bufferID: {bufferID, []RWChannel{getChannel("ch1", 1)}},
1: {NodeID: 1, Channels: []RWChannel{}},
},
},
1,
},
[]*ChannelOp{
{
Type: Delete,
NodeID: bufferID,
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
{
Type: Add,
NodeID: 1,
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
},
nil,
NewChannelOpSet(
NewDeleteOp(bufferID, getChannel("ch1", 1)),
NewAddOp(1, getChannel("ch1", 1)),
),
NewChannelOpSet(),
},
{
"test with avg assign",
@ -722,20 +745,14 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "ch1", CollectionID: 1}, {Name: "ch2", CollectionID: 1}}},
3: {3, []*channel{}},
1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1)}},
3: {3, []RWChannel{}},
},
},
3,
},
nil,
[]*ChannelOp{
{
Type: Add,
NodeID: 1,
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
},
NewChannelOpSet(),
NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))),
},
{
"test with avg equals to zero",
@ -743,15 +760,15 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "ch1", CollectionID: 1}}},
2: {2, []*channel{{Name: "ch3", CollectionID: 1}}},
3: {3, []*channel{}},
1: {1, []RWChannel{getChannel("ch1", 1)}},
2: {2, []RWChannel{getChannel("ch3", 1)}},
3: {3, []RWChannel{}},
},
},
3,
},
nil,
nil,
NewChannelOpSet(),
NewChannelOpSet(),
},
{
"test node with empty channel",
@ -759,28 +776,22 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{Name: "ch1", CollectionID: 1}, {Name: "ch2", CollectionID: 1}, {Name: "ch3", CollectionID: 1}}},
2: {2, []*channel{}},
3: {3, []*channel{}},
1: {1, []RWChannel{getChannel("ch1", 1), getChannel("ch2", 1), getChannel("ch3", 1)}},
2: {2, []RWChannel{}},
3: {3, []RWChannel{}},
},
},
3,
},
nil,
[]*ChannelOp{
{
Type: Add,
NodeID: 1,
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
},
NewChannelOpSet(),
NewChannelOpSet(NewAddOp(1, getChannel("ch1", 1))),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bufferedUpdates, balanceUpdates := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID)
assert.EqualValues(t, tt.bufferedUpdates, bufferedUpdates)
assert.EqualValues(t, tt.balanceUpdates, balanceUpdates)
assert.EqualValues(t, tt.bufferedUpdates.Collect(), bufferedUpdates.Collect())
assert.EqualValues(t, tt.balanceUpdates.Collect(), balanceUpdates.Collect())
})
}
}


@ -356,7 +356,7 @@ func TestFlush(t *testing.T) {
err := svr.channelManager.AddNode(1)
assert.NoError(t, err)
err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.Flush(context.TODO(), req)
@ -1310,7 +1310,7 @@ func TestSaveBinlogPaths(t *testing.T) {
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
@ -1412,7 +1412,7 @@ func TestSaveBinlogPaths(t *testing.T) {
ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
@ -1490,7 +1490,7 @@ func TestSaveBinlogPaths(t *testing.T) {
ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
@ -1544,7 +1544,7 @@ func TestSaveBinlogPaths(t *testing.T) {
ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
@ -1591,7 +1591,7 @@ func TestSaveBinlogPaths(t *testing.T) {
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
s := &datapb.SegmentInfo{
ID: 1,
@ -1709,7 +1709,7 @@ func TestDropVirtualChannel(t *testing.T) {
ctx := context.Background()
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
req := &datapb.DropVirtualChannelRequest{
@ -1781,7 +1781,7 @@ func TestDropVirtualChannel(t *testing.T) {
<-spyCh
err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
// resend
@ -1795,7 +1795,7 @@ func TestDropVirtualChannel(t *testing.T) {
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{
@ -1905,7 +1905,7 @@ func TestGetChannelSeekPosition(t *testing.T) {
assert.NoError(t, err)
}
seekPos := svr.handler.(*ServerHandler).GetChannelSeekPosition(&channel{
seekPos := svr.handler.(*ServerHandler).GetChannelSeekPosition(&channelMeta{
Name: test.channelName,
CollectionID: 0,
}, allPartitionID)
@ -1996,13 +1996,13 @@ func TestGetDataVChanPositions(t *testing.T) {
require.Nil(t, err)
t.Run("get unexisted channel", func(t *testing.T) {
vchan := svr.handler.GetDataVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetDataVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}, allPartitionID)
assert.Empty(t, vchan.UnflushedSegmentIds)
assert.Empty(t, vchan.FlushedSegmentIds)
})
t.Run("get existed channel", func(t *testing.T) {
vchan := svr.handler.GetDataVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0])
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
@ -2010,14 +2010,14 @@ func TestGetDataVChanPositions(t *testing.T) {
})
t.Run("empty collection", func(t *testing.T) {
infos := svr.handler.GetDataVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
infos := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
assert.EqualValues(t, 1, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds))
})
t.Run("filter partition", func(t *testing.T) {
infos := svr.handler.GetDataVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1)
infos := svr.handler.GetDataVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds))
@ -2026,7 +2026,7 @@ func TestGetDataVChanPositions(t *testing.T) {
t.Run("empty collection with passed positions", func(t *testing.T) {
vchannel := "ch_no_segment_1"
pchannel := funcutil.ToPhysicalChannel(vchannel)
infos := svr.handler.GetDataVChanPositions(&channel{
infos := svr.handler.GetDataVChanPositions(&channelMeta{
Name: vchannel,
CollectionID: 0,
StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}},
@ -2154,27 +2154,27 @@ func TestGetQueryVChanPositions(t *testing.T) {
//}
t.Run("get unexisted channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}, allPartitionID)
assert.Empty(t, vchan.UnflushedSegmentIds)
assert.Empty(t, vchan.FlushedSegmentIds)
})
t.Run("get existed channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.ElementsMatch(t, []int64{1}, vchan.FlushedSegmentIds)
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
})
t.Run("empty collection", func(t *testing.T) {
infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
assert.EqualValues(t, 1, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds))
})
t.Run("filter partition", func(t *testing.T) {
infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1)
infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, 1)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds))
@ -2183,7 +2183,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
t.Run("empty collection with passed positions", func(t *testing.T) {
vchannel := "ch_no_segment_1"
pchannel := funcutil.ToPhysicalChannel(vchannel)
infos := svr.handler.GetQueryVChanPositions(&channel{
infos := svr.handler.GetQueryVChanPositions(&channelMeta{
Name: vchannel,
CollectionID: 0,
StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}},
@ -2258,7 +2258,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e))
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d
@ -2344,7 +2344,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e))
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d
@ -2436,7 +2436,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
})
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e
@ -2799,7 +2799,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err = svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(context.TODO(), &channel{Name: "vchan1", CollectionID: 0})
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "vchan1", CollectionID: 0})
assert.NoError(t, err)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
@ -3392,7 +3392,7 @@ func TestGetFlushState(t *testing.T) {
svr.channelManager = &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: vchannel, CollectionID: collection}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
},
},
}
@ -3440,7 +3440,7 @@ func TestGetFlushState(t *testing.T) {
svr.channelManager = &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: vchannel, CollectionID: collection}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
},
},
}
@ -3488,7 +3488,7 @@ func TestGetFlushState(t *testing.T) {
svr.channelManager = &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: vchannel, CollectionID: collection}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
},
},
}
@ -3521,7 +3521,7 @@ func TestGetFlushState(t *testing.T) {
svr.channelManager = &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: vchannel, CollectionID: collection}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
},
},
}
@ -3557,7 +3557,7 @@ func TestGetFlushState(t *testing.T) {
svr.channelManager = &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: vchannel, CollectionID: collection}}},
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
},
},
}
@ -3881,7 +3881,7 @@ func TestDataCoord_Import(t *testing.T) {
})
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(svr.ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(svr.ctx, &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{
@ -3900,7 +3900,7 @@ func TestDataCoord_Import(t *testing.T) {
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(svr.ctx, &channel{Name: "ch1", CollectionID: 0})
err = svr.channelManager.Watch(svr.ctx, &channelMeta{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{
@ -4044,7 +4044,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
})
err := svr.channelManager.AddNode(110)
assert.NoError(t, err)
err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 100})
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 100})
assert.NoError(t, err)
status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{
@ -4081,7 +4081,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
err := svr.channelManager.AddNode(110)
assert.NoError(t, err)
err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 100})
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 100})
assert.NoError(t, err)
status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{
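
These test updates all follow the same pattern: the Watch calls now receive the concrete channelMeta, which satisfies the RWChannel interface the channel manager accepts after this change. A minimal usage sketch, assuming Watch's signature is Watch(ctx context.Context, ch RWChannel) error:

// Any RWChannel implementation can be handed to the manager; channelMeta is the
// concrete type these tests construct.
var ch RWChannel = &channelMeta{Name: "ch1", CollectionID: 0}
err := svr.channelManager.Watch(context.TODO(), ch)
assert.NoError(t, err)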

View File

@ -115,14 +115,9 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
var isUnimplemented bool
err = retry.Do(ctx, func() error {
for _, channelInfo := range s.channelManager.GetAssignedChannels() {
nodeID := channelInfo.NodeID
channels := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool {
return channel.CollectionID == req.GetCollectionID()
})
channelNames := lo.Map(channels, func(channel *channel, _ int) string {
return channel.Name
})
nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(req.GetCollectionID())
for nodeID, channelNames := range nodeChannels {
err = s.cluster.FlushChannels(ctx, nodeID, ts, channelNames)
if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
isUnimplemented = true
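
Flush now asks the channel manager for the node-to-channels mapping instead of filtering channelInfo.Channels inline with lo.Filter/lo.Map. A minimal sketch of what GetNodeChannelsByCollectionID could look like, assuming the samber/lo helpers are already imported in the package and the RWChannel getters used elsewhere in this change (the actual body lives in the channel manager, not in this hunk):

func (c *ChannelManager) GetNodeChannelsByCollectionID(collectionID UniqueID) map[int64][]string {
	nodeChs := make(map[int64][]string)
	for _, info := range c.GetAssignedChannels() {
		// Keep only channels of the requested collection, then project to channel names.
		filtered := lo.Filter(info.Channels, func(ch RWChannel, _ int) bool {
			return ch.GetCollectionID() == collectionID
		})
		nodeChs[info.NodeID] = lo.Map(filtered, func(ch RWChannel, _ int) string {
			return ch.GetName()
		})
	}
	return nodeChs
}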
@ -665,7 +660,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
flushedIDs := make(typeutil.UniqueSet)
for _, c := range channels {
channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID)
channelInfo := s.handler.GetQueryVChanPositions(&channelMeta{Name: c, CollectionID: collectionID}, partitionID)
channelInfos = append(channelInfos, channelInfo)
log.Info("datacoord append channelInfo in GetRecoveryInfo",
zap.String("channel", channelInfo.GetChannelName()),
@ -1181,7 +1176,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}, nil
}
for _, channelName := range req.GetChannelNames() {
ch := &channel{
ch := &channelMeta{
Name: channelName,
CollectionID: req.GetCollectionID(),
StartPositions: req.GetStartPositions(),
@ -1235,17 +1230,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq
}
}
channels := make([]string, 0)
for _, channelInfo := range s.channelManager.GetAssignedChannels() {
filtered := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool {
return channel.CollectionID == req.GetCollectionID()
})
channelNames := lo.Map(filtered, func(channel *channel, _ int) string {
return channel.Name
})
channels = append(channels, channelNames...)
}
channels := s.channelManager.GetChannelNamesByCollectionID(req.GetCollectionID())
if len(channels) == 0 { // For compatibility with old client
resp.Flushed = true
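
GetFlushState is simplified the same way: the inline filter/map over assigned channels is replaced by a single helper call. A minimal sketch of GetChannelNamesByCollectionID, assuming it simply flattens the per-node map from GetNodeChannelsByCollectionID (illustrative only, not the actual implementation):

func (c *ChannelManager) GetChannelNamesByCollectionID(collectionID UniqueID) []string {
	channels := make([]string, 0)
	for _, names := range c.GetNodeChannelsByCollectionID(collectionID) {
		channels = append(channels, names...)
	}
	return channels
}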

View File

@ -228,7 +228,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
})
assert.NoError(t, err)
ch := &channel{Name: "vchan1", CollectionID: 0}
ch := &channelMeta{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
svr.channelManager.Watch(context.Background(), ch)
@ -308,7 +308,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
ch := &channel{Name: "vchan1", CollectionID: 0}
ch := &channelMeta{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
svr.channelManager.Watch(context.Background(), ch)
@ -403,7 +403,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err = svr.channelManager.AddNode(0)
assert.NoError(t, err)
err = svr.channelManager.Watch(context.Background(), &channel{Name: "vchan1", CollectionID: 0})
err = svr.channelManager.Watch(context.Background(), &channelMeta{Name: "vchan1", CollectionID: 0})
assert.NoError(t, err)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
@ -449,7 +449,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
ch := &channel{Name: "vchan1", CollectionID: 0}
ch := &channelMeta{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
svr.channelManager.Watch(context.Background(), ch)
@ -495,7 +495,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
ch := &channel{Name: "vchan1", CollectionID: 0}
ch := &channelMeta{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
svr.channelManager.Watch(context.Background(), ch)
@ -579,7 +579,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
IndexSize: 0,
})
ch := &channel{Name: "vchan1", CollectionID: 0}
ch := &channelMeta{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
svr.channelManager.Watch(context.Background(), ch)