enhance: Change ChannelManager to interface (#29300)

Rewrite cluster test
issue: #28854

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/29468/head
XuanYang-cn 2023-12-25 19:24:46 +08:00 committed by GitHub
parent fce1a8dafb
commit ae180d1628
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 878 additions and 626 deletions

View File

@ -439,6 +439,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=CompactionMeta --dir=internal/datacoord --filename=mock_compaction_meta.go --output=internal/datacoord --structname=MockCompactionMeta --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=CompactionMeta --dir=internal/datacoord --filename=mock_compaction_meta.go --output=internal/datacoord --structname=MockCompactionMeta --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Scheduler --dir=internal/datacoord --filename=mock_scheduler.go --output=internal/datacoord --structname=MockScheduler --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=Scheduler --dir=internal/datacoord --filename=mock_scheduler.go --output=internal/datacoord --structname=MockScheduler --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=ChannelManager --dir=internal/datacoord --filename=mock_channelmanager.go --output=internal/datacoord --structname=MockChannelManager --with-expecter --inpackage
generate-mockery-datanode: getdeps generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage

View File

@ -37,8 +37,27 @@ import (
"github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/logutil"
) )
// ChannelManager manages the allocation and the balance between channels and data nodes. type ChannelManager interface {
type ChannelManager struct { Startup(ctx context.Context, nodes []int64) error
Close()
AddNode(nodeID int64) error
DeleteNode(nodeID int64) error
Watch(ctx context.Context, ch RWChannel) error
RemoveChannel(channelName string) error
Release(nodeID UniqueID, channelName string) error
Match(nodeID int64, channel string) bool
FindWatcher(channel string) (int64, error)
GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string
GetChannelsByCollectionID(collectionID UniqueID) []RWChannel
GetCollectionIDByChannel(channel string) (bool, UniqueID)
GetNodeIDByChannelName(channel string) (bool, UniqueID)
}
// ChannelManagerImpl manages the allocation and the balance between channels and data nodes.
type ChannelManagerImpl struct {
ctx context.Context ctx context.Context
mu sync.RWMutex mu sync.RWMutex
h Handler h Handler
@ -60,10 +79,10 @@ type ChannelManager struct {
} }
// ChannelManagerOpt is to set optional parameters in channel manager. // ChannelManagerOpt is to set optional parameters in channel manager.
type ChannelManagerOpt func(c *ChannelManager) type ChannelManagerOpt func(c *ChannelManagerImpl)
func withFactory(f ChannelPolicyFactory) ChannelManagerOpt { func withFactory(f ChannelPolicyFactory) ChannelManagerOpt {
return func(c *ChannelManager) { c.factory = f } return func(c *ChannelManagerImpl) { c.factory = f }
} }
func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory { func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory {
@ -71,15 +90,15 @@ func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory {
} }
func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt { func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt {
return func(c *ChannelManager) { c.msgstreamFactory = f } return func(c *ChannelManagerImpl) { c.msgstreamFactory = f }
} }
func withStateChecker() ChannelManagerOpt { func withStateChecker() ChannelManagerOpt {
return func(c *ChannelManager) { c.stateChecker = c.watchChannelStatesLoop } return func(c *ChannelManagerImpl) { c.stateChecker = c.watchChannelStatesLoop }
} }
func withBgChecker() ChannelManagerOpt { func withBgChecker() ChannelManagerOpt {
return func(c *ChannelManager) { c.bgChecker = c.bgCheckChannelsWork } return func(c *ChannelManagerImpl) { c.bgChecker = c.bgCheckChannelsWork }
} }
// NewChannelManager creates and returns a new ChannelManager instance. // NewChannelManager creates and returns a new ChannelManager instance.
@ -87,8 +106,8 @@ func NewChannelManager(
kv kv.WatchKV, // for TxnKv, MetaKv and WatchKV kv kv.WatchKV, // for TxnKv, MetaKv and WatchKV
h Handler, h Handler,
options ...ChannelManagerOpt, options ...ChannelManagerOpt,
) (*ChannelManager, error) { ) (*ChannelManagerImpl, error) {
c := &ChannelManager{ c := &ChannelManagerImpl{
ctx: context.TODO(), ctx: context.TODO(),
h: h, h: h,
factory: NewChannelPolicyFactoryV1(kv), factory: NewChannelPolicyFactoryV1(kv),
@ -114,7 +133,7 @@ func NewChannelManager(
} }
// Startup adjusts the channel store according to current cluster states. // Startup adjusts the channel store according to current cluster states.
func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error { func (c *ChannelManagerImpl) Startup(ctx context.Context, nodes []int64) error {
c.ctx = ctx c.ctx = ctx
channels := c.store.GetNodesChannels() channels := c.store.GetNodesChannels()
// Retrieve the current old nodes. // Retrieve the current old nodes.
@ -171,7 +190,7 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
} }
// Close notifies the running checker. // Close notifies the running checker.
func (c *ChannelManager) Close() { func (c *ChannelManagerImpl) Close() {
if c.stopChecker != nil { if c.stopChecker != nil {
c.stopChecker() c.stopChecker()
} }
@ -184,7 +203,7 @@ func (c *ChannelManager) Close() {
// ToRelase get startTs and timeoutTs, start timer // ToRelase get startTs and timeoutTs, start timer
// ReleaseSuccess remove // ReleaseSuccess remove
// ReleaseFail clean up and remove // ReleaseFail clean up and remove
func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error { func (c *ChannelManagerImpl) checkOldNodes(nodes []UniqueID) error {
// Load all the watch infos before processing // Load all the watch infos before processing
nodeWatchInfos := make(map[UniqueID][]*datapb.ChannelWatchInfo) nodeWatchInfos := make(map[UniqueID][]*datapb.ChannelWatchInfo)
for _, nodeID := range nodes { for _, nodeID := range nodes {
@ -232,7 +251,7 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
} }
// unwatchDroppedChannels removes drops channel that are marked to drop. // unwatchDroppedChannels removes drops channel that are marked to drop.
func (c *ChannelManager) unwatchDroppedChannels() { func (c *ChannelManagerImpl) unwatchDroppedChannels() {
nodeChannels := c.store.GetChannels() nodeChannels := c.store.GetChannels()
for _, nodeChannel := range nodeChannels { for _, nodeChannel := range nodeChannels {
for _, ch := range nodeChannel.Channels { for _, ch := range nodeChannel.Channels {
@ -252,7 +271,7 @@ func (c *ChannelManager) unwatchDroppedChannels() {
} }
} }
func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { func (c *ChannelManagerImpl) bgCheckChannelsWork(ctx context.Context) {
ticker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second)) ticker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second))
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -282,7 +301,7 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
} }
// getOldOnlines returns a list of old online node ids in `old` and in `curr`. // getOldOnlines returns a list of old online node ids in `old` and in `curr`.
func (c *ChannelManager) getOldOnlines(curr []int64, old []int64) []int64 { func (c *ChannelManagerImpl) getOldOnlines(curr []int64, old []int64) []int64 {
mcurr := make(map[int64]struct{}) mcurr := make(map[int64]struct{})
ret := make([]int64, 0, len(old)) ret := make([]int64, 0, len(old))
for _, n := range curr { for _, n := range curr {
@ -297,7 +316,7 @@ func (c *ChannelManager) getOldOnlines(curr []int64, old []int64) []int64 {
} }
// getNewOnLines returns a list of new online node ids in `curr` but not in `old`. // getNewOnLines returns a list of new online node ids in `curr` but not in `old`.
func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 { func (c *ChannelManagerImpl) getNewOnLines(curr []int64, old []int64) []int64 {
mold := make(map[int64]struct{}) mold := make(map[int64]struct{})
ret := make([]int64, 0, len(curr)) ret := make([]int64, 0, len(curr))
for _, n := range old { for _, n := range old {
@ -312,7 +331,7 @@ func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 {
} }
// getOffLines returns a list of new offline node ids in `old` but not in `curr`. // getOffLines returns a list of new offline node ids in `old` but not in `curr`.
func (c *ChannelManager) getOffLines(curr []int64, old []int64) []int64 { func (c *ChannelManagerImpl) getOffLines(curr []int64, old []int64) []int64 {
mcurr := make(map[int64]struct{}) mcurr := make(map[int64]struct{})
ret := make([]int64, 0, len(old)) ret := make([]int64, 0, len(old))
for _, n := range curr { for _, n := range curr {
@ -327,7 +346,7 @@ func (c *ChannelManager) getOffLines(curr []int64, old []int64) []int64 {
} }
// AddNode adds a new node to cluster and reassigns the node - channel mapping. // AddNode adds a new node to cluster and reassigns the node - channel mapping.
func (c *ChannelManager) AddNode(nodeID int64) error { func (c *ChannelManagerImpl) AddNode(nodeID int64) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -366,7 +385,7 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
// DeleteNode deletes the node from the cluster. // DeleteNode deletes the node from the cluster.
// DeleteNode deletes the nodeID's watchInfos in Etcd and reassign the channels to other Nodes // DeleteNode deletes the nodeID's watchInfos in Etcd and reassign the channels to other Nodes
func (c *ChannelManager) DeleteNode(nodeID int64) error { func (c *ChannelManagerImpl) DeleteNode(nodeID int64) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -408,7 +427,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
} }
// unsubAttempt attempts to unsubscribe node-channel info from the channel. // unsubAttempt attempts to unsubscribe node-channel info from the channel.
func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) { func (c *ChannelManagerImpl) unsubAttempt(ncInfo *NodeChannelInfo) {
if ncInfo == nil { if ncInfo == nil {
return return
} }
@ -428,7 +447,7 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
} }
// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists. // Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
func (c *ChannelManager) Watch(ctx context.Context, ch RWChannel) error { func (c *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
log := log.Ctx(ctx) log := log.Ctx(ctx)
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -450,7 +469,7 @@ func (c *ChannelManager) Watch(ctx context.Context, ch RWChannel) error {
} }
// fillChannelWatchInfoWithState updates the channel op by filling in channel watch info. // fillChannelWatchInfoWithState updates the channel op by filling in channel watch info.
func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string { func (c *ChannelManagerImpl) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string {
channelsWithTimer := []string{} channelsWithTimer := []string{}
startTs := time.Now().Unix() startTs := time.Now().Unix()
checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second) checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)
@ -475,7 +494,7 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data
} }
// GetAssignedChannels gets channels info of registered nodes. // GetAssignedChannels gets channels info of registered nodes.
func (c *ChannelManager) GetAssignedChannels() []*NodeChannelInfo { func (c *ChannelManagerImpl) GetAssignedChannels() []*NodeChannelInfo {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -483,7 +502,7 @@ func (c *ChannelManager) GetAssignedChannels() []*NodeChannelInfo {
} }
// GetBufferChannels gets buffer channels. // GetBufferChannels gets buffer channels.
func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo { func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -491,7 +510,7 @@ func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo {
} }
// GetNodeChannelsByCollectionID gets all node channels map of the collection // GetNodeChannelsByCollectionID gets all node channels map of the collection
func (c *ChannelManager) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
nodeChs := make(map[UniqueID][]string) nodeChs := make(map[UniqueID][]string)
for _, nodeChannels := range c.GetAssignedChannels() { for _, nodeChannels := range c.GetAssignedChannels() {
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
@ -507,7 +526,7 @@ func (c *ChannelManager) GetNodeChannelsByCollectionID(collectionID UniqueID) ma
} }
// Get all channels belong to the collection // Get all channels belong to the collection
func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel { func (c *ChannelManagerImpl) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel {
channels := make([]RWChannel, 0) channels := make([]RWChannel, 0)
for _, nodeChannels := range c.GetAssignedChannels() { for _, nodeChannels := range c.GetAssignedChannels() {
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
@ -520,7 +539,7 @@ func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []RWCh
} }
// Get all channel names belong to the collection // Get all channel names belong to the collection
func (c *ChannelManager) GetChannelNamesByCollectionID(collectionID UniqueID) []string { func (c *ChannelManagerImpl) GetChannelNamesByCollectionID(collectionID UniqueID) []string {
channels := c.GetChannelsByCollectionID(collectionID) channels := c.GetChannelsByCollectionID(collectionID)
return lo.Map(channels, func(channel RWChannel, _ int) string { return lo.Map(channels, func(channel RWChannel, _ int) string {
return channel.GetName() return channel.GetName()
@ -529,7 +548,7 @@ func (c *ChannelManager) GetChannelNamesByCollectionID(collectionID UniqueID) []
// Match checks and returns whether the node ID and channel match. // Match checks and returns whether the node ID and channel match.
// use vchannel // use vchannel
func (c *ChannelManager) Match(nodeID int64, channel string) bool { func (c *ChannelManagerImpl) Match(nodeID int64, channel string) bool {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -547,7 +566,7 @@ func (c *ChannelManager) Match(nodeID int64, channel string) bool {
} }
// FindWatcher finds the datanode watching the provided channel. // FindWatcher finds the datanode watching the provided channel.
func (c *ChannelManager) FindWatcher(channel string) (int64, error) { func (c *ChannelManagerImpl) FindWatcher(channel string) (int64, error) {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -571,7 +590,7 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
} }
// RemoveChannel removes the channel from channel manager. // RemoveChannel removes the channel from channel manager.
func (c *ChannelManager) RemoveChannel(channelName string) error { func (c *ChannelManagerImpl) RemoveChannel(channelName string) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -584,7 +603,7 @@ func (c *ChannelManager) RemoveChannel(channelName string) error {
} }
// remove deletes the nodeID-channel pair from data store. // remove deletes the nodeID-channel pair from data store.
func (c *ChannelManager) remove(nodeID int64, ch RWChannel) error { func (c *ChannelManagerImpl) remove(nodeID int64, ch RWChannel) error {
op := NewChannelOpSet(NewDeleteOp(nodeID, ch)) op := NewChannelOpSet(NewDeleteOp(nodeID, ch))
log.Info("remove channel assignment", log.Info("remove channel assignment",
zap.Int64("nodeID to be removed", nodeID), zap.Int64("nodeID to be removed", nodeID),
@ -596,7 +615,7 @@ func (c *ChannelManager) remove(nodeID int64, ch RWChannel) error {
return nil return nil
} }
func (c *ChannelManager) findChannel(channelName string) (int64, RWChannel) { func (c *ChannelManagerImpl) findChannel(channelName string) (int64, RWChannel) {
infos := c.store.GetNodesChannels() infos := c.store.GetNodesChannels()
for _, info := range infos { for _, info := range infos {
for _, channelInfo := range info.Channels { for _, channelInfo := range info.Channels {
@ -626,7 +645,7 @@ type ackEvent struct {
nodeID UniqueID nodeID UniqueID
} }
func (c *ChannelManager) updateWithTimer(updates *ChannelOpSet, state datapb.ChannelWatchState) error { func (c *ChannelManagerImpl) updateWithTimer(updates *ChannelOpSet, state datapb.ChannelWatchState) error {
channelsWithTimer := []string{} channelsWithTimer := []string{}
for _, op := range updates.Collect() { for _, op := range updates.Collect() {
if op.Type == Add { if op.Type == Add {
@ -643,7 +662,7 @@ func (c *ChannelManager) updateWithTimer(updates *ChannelOpSet, state datapb.Cha
return err return err
} }
func (c *ChannelManager) processAck(e *ackEvent) { func (c *ChannelManagerImpl) processAck(e *ackEvent) {
c.stateTimer.stopIfExist(e) c.stateTimer.stopIfExist(e)
switch e.ackType { switch e.ackType {
@ -684,7 +703,7 @@ func (c *ChannelManager) processAck(e *ackEvent) {
type channelStateChecker func(context.Context, int64) type channelStateChecker func(context.Context, int64)
func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision int64) { func (c *ChannelManagerImpl) watchChannelStatesLoop(ctx context.Context, revision int64) {
defer logutil.LogPanic() defer logutil.LogPanic()
// REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name} // REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name}
@ -765,7 +784,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision in
} }
// Release writes ToRelease channel watch states for a channel // Release writes ToRelease channel watch states for a channel
func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error { func (c *ChannelManagerImpl) Release(nodeID UniqueID, channelName string) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -784,7 +803,7 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
} }
// Reassign reassigns a channel to another DataNode. // Reassign reassigns a channel to another DataNode.
func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) error { func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string) error {
c.mu.RLock() c.mu.RLock()
ch := c.getChannelByNodeAndName(originNodeID, channelName) ch := c.getChannelByNodeAndName(originNodeID, channelName)
if ch == nil { if ch == nil {
@ -831,7 +850,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
} }
// CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode. // CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error { func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.RLock() c.mu.RLock()
chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName) chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName)
if chToCleanUp == nil { if chToCleanUp == nil {
@ -888,7 +907,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
} }
func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) RWChannel { func (c *ChannelManagerImpl) getChannelByNodeAndName(nodeID UniqueID, channelName string) RWChannel {
var ret RWChannel var ret RWChannel
nodeChannelInfo := c.store.GetNode(nodeID) nodeChannelInfo := c.store.GetNode(nodeID)
@ -905,7 +924,7 @@ func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName st
return ret return ret
} }
func (c *ChannelManager) getCollectionIDByChannel(channel string) (bool, UniqueID) { func (c *ChannelManagerImpl) GetCollectionIDByChannel(channel string) (bool, UniqueID) {
for _, nodeChannel := range c.GetAssignedChannels() { for _, nodeChannel := range c.GetAssignedChannels() {
for _, ch := range nodeChannel.Channels { for _, ch := range nodeChannel.Channels {
if ch.GetName() == channel { if ch.GetName() == channel {
@ -916,10 +935,10 @@ func (c *ChannelManager) getCollectionIDByChannel(channel string) (bool, UniqueI
return false, 0 return false, 0
} }
func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID) { func (c *ChannelManagerImpl) GetNodeIDByChannelName(channel string) (bool, UniqueID) {
for _, nodeChannel := range c.GetAssignedChannels() { for _, nodeChannel := range c.GetAssignedChannels() {
for _, ch := range nodeChannel.Channels { for _, ch := range nodeChannel.Channels {
if ch.GetName() == chName { if ch.GetName() == channel {
return true, nodeChannel.NodeID return true, nodeChannel.NodeID
} }
} }
@ -927,11 +946,11 @@ func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID)
return false, 0 return false, 0
} }
func (c *ChannelManager) isMarkedDrop(channelName string) bool { func (c *ChannelManagerImpl) isMarkedDrop(channel string) bool {
return c.h.CheckShouldDropChannel(channelName) return c.h.CheckShouldDropChannel(channel)
} }
func (c *ChannelManager) isSilent() bool { func (c *ChannelManagerImpl) isSilent() bool {
if c.stateTimer.hasRunningTimers() { if c.stateTimer.hasRunningTimers() {
return false return false
} }

View File

@ -596,7 +596,7 @@ func TestChannelManager(t *testing.T) {
}) })
t.Run("test Reassign-channel not found", func(t *testing.T) { t.Run("test Reassign-channel not found", func(t *testing.T) {
var chManager *ChannelManager var chManager *ChannelManagerImpl
var err error var err error
handler := NewNMockHandler(t) handler := NewNMockHandler(t)
handler.EXPECT(). handler.EXPECT().
@ -621,7 +621,7 @@ func TestChannelManager(t *testing.T) {
}) })
t.Run("test CleanupAndReassign-channel not found", func(t *testing.T) { t.Run("test CleanupAndReassign-channel not found", func(t *testing.T) {
var chManager *ChannelManager var chManager *ChannelManagerImpl
var err error var err error
handler := NewNMockHandler(t) handler := NewNMockHandler(t)
handler.EXPECT(). handler.EXPECT().
@ -1174,7 +1174,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
c := &ChannelManager{ c := &ChannelManagerImpl{
store: tt.fields.store, store: tt.fields.store,
} }
err := c.RemoveChannel(tt.args.channelName) err := c.RemoveChannel(tt.args.channelName)
@ -1186,7 +1186,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
} }
func TestChannelManager_HelperFunc(t *testing.T) { func TestChannelManager_HelperFunc(t *testing.T) {
c := &ChannelManager{} c := &ChannelManagerImpl{}
t.Run("test getOldOnlines", func(t *testing.T) { t.Run("test getOldOnlines", func(t *testing.T) {
tests := []struct { tests := []struct {
nodes []int64 nodes []int64

View File

@ -49,11 +49,11 @@ var _ Cluster = (*ClusterImpl)(nil)
type ClusterImpl struct { type ClusterImpl struct {
sessionManager SessionManager sessionManager SessionManager
channelManager *ChannelManager channelManager ChannelManager
} }
// NewClusterImpl creates a new cluster // NewClusterImpl creates a new cluster
func NewClusterImpl(sessionManager SessionManager, channelManager *ChannelManager) *ClusterImpl { func NewClusterImpl(sessionManager SessionManager, channelManager ChannelManager) *ClusterImpl {
c := &ClusterImpl{ c := &ClusterImpl{
sessionManager: sessionManager, sessionManager: sessionManager,
channelManager: channelManager, channelManager: channelManager,
@ -67,10 +67,9 @@ func (c *ClusterImpl) Startup(ctx context.Context, nodes []*NodeInfo) error {
for _, node := range nodes { for _, node := range nodes {
c.sessionManager.AddSession(node) c.sessionManager.AddSession(node)
} }
currs := make([]int64, 0, len(nodes)) currs := lo.Map(nodes, func(info *NodeInfo, _ int) int64 {
for _, node := range nodes { return info.NodeID
currs = append(currs, node.NodeID) })
}
return c.channelManager.Startup(ctx, currs) return c.channelManager.Startup(ctx, currs)
} }
@ -102,7 +101,7 @@ func (c *ClusterImpl) Flush(ctx context.Context, nodeID int64, channel string, s
return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID) return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID)
} }
_, collID := c.channelManager.getCollectionIDByChannel(channel) _, collID := c.channelManager.GetCollectionIDByChannel(channel)
getSegmentID := func(segment *datapb.SegmentInfo, _ int) int64 { getSegmentID := func(segment *datapb.SegmentInfo, _ int) int64 {
return segment.GetID() return segment.GetID()
@ -153,7 +152,7 @@ func (c *ClusterImpl) Import(ctx context.Context, nodeID int64, it *datapb.Impor
func (c *ClusterImpl) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { func (c *ClusterImpl) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) {
// Look for the DataNode that watches the channel. // Look for the DataNode that watches the channel.
ok, nodeID := c.channelManager.getNodeIDByChannelName(req.GetChannelName()) ok, nodeID := c.channelManager.GetNodeIDByChannelName(req.GetChannelName())
if !ok { if !ok {
err := merr.WrapErrChannelNotFound(req.GetChannelName(), "no DataNode watches this channel") err := merr.WrapErrChannelNotFound(req.GetChannelName(), "no DataNode watches this channel")
log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName()), zap.Error(err)) log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName()), zap.Error(err))

View File

@ -19,31 +19,22 @@ package datacoord
import ( import (
"context" "context"
"testing" "testing"
"time"
"github.com/cockroachdb/errors" "github.com/samber/lo"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"stathat.com/c/consistent"
"github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/testutils"
) )
func getMetaKv(t *testing.T) kv.MetaKv { func TestCluster(t *testing.T) {
rootPath := "/etcd/test/root/" + t.Name() suite.Run(t, new(ClusterSuite))
kv, err := etcdkv.NewMetaKvFactory(rootPath, &Params.EtcdCfg)
require.NoError(t, err)
return kv
} }
func getWatchKV(t *testing.T) kv.WatchKV { func getWatchKV(t *testing.T) kv.WatchKV {
@ -57,580 +48,166 @@ func getWatchKV(t *testing.T) kv.WatchKV {
type ClusterSuite struct { type ClusterSuite struct {
testutils.PromMetricsSuite testutils.PromMetricsSuite
kv kv.WatchKV mockKv *mocks.WatchKV
} mockChManager *MockChannelManager
mockSession *MockSessionManager
func (suite *ClusterSuite) getWatchKV() kv.WatchKV {
rootPath := "/etcd/test/root/" + suite.T().Name()
kv, err := etcdkv.NewWatchKVFactory(rootPath, &Params.EtcdCfg)
suite.Require().NoError(err)
return kv
} }
func (suite *ClusterSuite) SetupTest() { func (suite *ClusterSuite) SetupTest() {
kv := getWatchKV(suite.T()) suite.mockKv = mocks.NewWatchKV(suite.T())
suite.kv = kv suite.mockChManager = NewMockChannelManager(suite.T())
suite.mockSession = NewMockSessionManager(suite.T())
} }
func (suite *ClusterSuite) TearDownTest() { func (suite *ClusterSuite) TearDownTest() {}
if suite.kv != nil {
suite.kv.RemoveWithPrefix("") func (suite *ClusterSuite) TestStartup() {
suite.kv.Close() nodes := []*NodeInfo{
{NodeID: 1, Address: "addr1"},
{NodeID: 2, Address: "addr2"},
{NodeID: 3, Address: "addr3"},
{NodeID: 4, Address: "addr4"},
} }
} suite.mockSession.EXPECT().AddSession(mock.Anything).Return().Times(len(nodes))
suite.mockChManager.EXPECT().Startup(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, nodeIDs []int64) error {
suite.ElementsMatch(lo.Map(nodes, func(info *NodeInfo, _ int) int64 { return info.NodeID }), nodeIDs)
return nil
}).Once()
func (suite *ClusterSuite) TestCreate() { cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
kv := suite.kv err := cluster.Startup(context.Background(), nodes)
suite.Run("startup_normally", func() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err) suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
NodeID: 1,
Address: addr,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
suite.NoError(err)
dataNodes := sessionManager.GetSessions()
suite.EqualValues(1, len(dataNodes))
suite.EqualValues("localhost:8080", dataNodes[0].info.Address)
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("startup_with_existed_channel_data", func() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var err error
info1 := &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: "channel1",
},
}
info1Data, err := proto.Marshal(info1)
suite.NoError(err)
err = kv.Save(Params.CommonCfg.DataCoordWatchSubPath.GetValue()+"/1/channel1", string(info1Data))
suite.NoError(err)
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:9999"}})
suite.NoError(err)
channels := channelManager.GetAssignedChannels()
suite.EqualValues([]*NodeChannelInfo{{1, []RWChannel{
&channelMeta{
Name: "channel1",
CollectionID: 1,
WatchInfo: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: "channel1",
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
DroppedSegmentIds: []int64{},
},
},
},
}}}, channels)
})
suite.Run("remove_all_nodes_and_restart_with_other_nodes", func() {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
addr := "localhost:8080"
info := &NodeInfo{
NodeID: 1,
Address: addr,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
suite.NoError(err)
err = cluster.UnRegister(info)
suite.NoError(err)
sessions := sessionManager.GetSessions()
suite.Empty(sessions)
cluster.Close()
sessionManager2 := NewSessionManagerImpl()
channelManager2, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
clusterReload := NewClusterImpl(sessionManager2, channelManager2)
defer clusterReload.Close()
addr = "localhost:8081"
info = &NodeInfo{
NodeID: 2,
Address: addr,
}
nodes = []*NodeInfo{info}
err = clusterReload.Startup(ctx, nodes)
suite.NoError(err)
sessions = sessionManager2.GetSessions()
suite.EqualValues(1, len(sessions))
suite.EqualValues(2, sessions[0].info.NodeID)
suite.EqualValues(addr, sessions[0].info.Address)
channels := channelManager2.GetAssignedChannels()
suite.EqualValues(1, len(channels))
suite.EqualValues(2, channels[0].NodeID)
})
suite.Run("loadkv_fails", func() {
defer kv.RemoveWithPrefix("")
metakv := mocks.NewWatchKV(suite.T())
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("failed"))
_, err := NewChannelManager(metakv, newMockHandler())
suite.Error(err)
})
} }
func (suite *ClusterSuite) TestRegister() { func (suite *ClusterSuite) TestRegister() {
kv := suite.kv info := &NodeInfo{NodeID: 1, Address: "addr1"}
suite.Run("register_to_empty_cluster", func() { suite.mockSession.EXPECT().AddSession(mock.Anything).Return().Once()
defer kv.RemoveWithPrefix("") suite.mockChManager.EXPECT().AddNode(mock.Anything).
RunAndReturn(func(nodeID int64) error {
suite.EqualValues(info.NodeID, nodeID)
return nil
}).Once()
ctx, cancel := context.WithCancel(context.TODO()) cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
defer cancel() err := cluster.Register(info)
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err) suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
err = cluster.Startup(ctx, nil)
suite.NoError(err)
info := &NodeInfo{
NodeID: 1,
Address: addr,
}
err = cluster.Register(info)
suite.NoError(err)
sessions := sessionManager.GetSessions()
suite.EqualValues(1, len(sessions))
suite.EqualValues("localhost:8080", sessions[0].info.Address)
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("register_to_empty_cluster_with_buffer_channels", func() {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
err = channelManager.Watch(context.TODO(), &channelMeta{
Name: "ch1",
CollectionID: 0,
})
suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
err = cluster.Startup(ctx, nil)
suite.NoError(err)
info := &NodeInfo{
NodeID: 1,
Address: addr,
}
err = cluster.Register(info)
suite.NoError(err)
bufferChannels := channelManager.GetBufferChannels()
suite.Empty(bufferChannels.Channels)
nodeChannels := channelManager.GetAssignedChannels()
suite.EqualValues(1, len(nodeChannels))
suite.EqualValues(1, nodeChannels[0].NodeID)
suite.EqualValues("ch1", nodeChannels[0].Channels[0].GetName())
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("register_and_restart_with_no_channel", func() {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
addr := "localhost:8080"
err = cluster.Startup(ctx, nil)
suite.NoError(err)
info := &NodeInfo{
NodeID: 1,
Address: addr,
}
err = cluster.Register(info)
suite.NoError(err)
cluster.Close()
sessionManager2 := NewSessionManagerImpl()
channelManager2, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
restartCluster := NewClusterImpl(sessionManager2, channelManager2)
defer restartCluster.Close()
channels := channelManager2.GetAssignedChannels()
suite.Empty(channels)
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
} }
func (suite *ClusterSuite) TestUnregister() { func (suite *ClusterSuite) TestUnregister() {
kv := suite.kv info := &NodeInfo{NodeID: 1, Address: "addr1"}
suite.Run("remove_node_after_unregister", func() { suite.mockSession.EXPECT().DeleteSession(mock.Anything).Return().Once()
defer kv.RemoveWithPrefix("") suite.mockChManager.EXPECT().DeleteNode(mock.Anything).
RunAndReturn(func(nodeID int64) error {
suite.EqualValues(info.NodeID, nodeID)
return nil
}).Once()
ctx, cancel := context.WithCancel(context.TODO()) cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
defer cancel() err := cluster.UnRegister(info)
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err) suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager) }
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
suite.NoError(err)
err = cluster.UnRegister(nodes[0])
suite.NoError(err)
sessions := sessionManager.GetSessions()
suite.Empty(sessions)
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 0) func (suite *ClusterSuite) TestWatch() {
var (
ch string = "ch-1"
collectionID UniqueID = 1
)
suite.mockChManager.EXPECT().Watch(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, channel RWChannel) error {
suite.EqualValues(ch, channel.GetName())
suite.EqualValues(collectionID, channel.GetCollectionID())
return nil
}).Once()
cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
err := cluster.Watch(context.Background(), ch, collectionID)
suite.NoError(err)
}
func (suite *ClusterSuite) TestFlush() {
suite.mockChManager.EXPECT().Match(mock.Anything, mock.Anything).
RunAndReturn(func(nodeID int64, channel string) bool {
return nodeID != 1
}).Twice()
suite.mockChManager.EXPECT().GetCollectionIDByChannel(mock.Anything).Return(true, 100).Once()
suite.mockSession.EXPECT().Flush(mock.Anything, mock.Anything, mock.Anything).Once()
cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
err := cluster.Flush(context.Background(), 1, "ch-1", nil)
suite.Error(err)
err = cluster.Flush(context.Background(), 2, "ch-1", nil)
suite.NoError(err)
}
func (suite *ClusterSuite) TestFlushChannels() {
suite.Run("empty channel", func() {
suite.SetupTest()
cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
err := cluster.FlushChannels(context.Background(), 1, 0, nil)
suite.NoError(err)
}) })
suite.Run("move_channel_to_online_nodes_after_unregister", func() { suite.Run("channel not match with node", func() {
defer kv.RemoveWithPrefix("") suite.SetupTest()
ctx, cancel := context.WithCancel(context.TODO()) suite.mockChManager.EXPECT().Match(mock.Anything, mock.Anything).Return(false).Once()
defer cancel() cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
err := cluster.FlushChannels(context.Background(), 1, 0, []string{"ch-1", "ch-2"})
sessionManager := NewSessionManagerImpl() suite.Error(err)
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
nodeInfo1 := &NodeInfo{
Address: "localhost:8080",
NodeID: 1,
}
nodeInfo2 := &NodeInfo{
Address: "localhost:8081",
NodeID: 2,
}
nodes := []*NodeInfo{nodeInfo1, nodeInfo2}
err = cluster.Startup(ctx, nodes)
suite.NoError(err)
err = cluster.Watch(ctx, "ch1", 1)
suite.NoError(err)
err = cluster.UnRegister(nodeInfo1)
suite.NoError(err)
channels := channelManager.GetAssignedChannels()
suite.EqualValues(1, len(channels))
suite.EqualValues(2, channels[0].NodeID)
suite.EqualValues(1, len(channels[0].Channels))
suite.EqualValues("ch1", channels[0].Channels[0].GetName())
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
}) })
suite.Run("remove_all_channels_after_unregsiter", func() { suite.Run("channel match with node", func() {
defer kv.RemoveWithPrefix("") suite.SetupTest()
ctx, cancel := context.WithCancel(context.TODO()) channels := []string{"ch-1", "ch-2"}
defer cancel() suite.mockChManager.EXPECT().Match(mock.Anything, mock.Anything).Return(true).Times(len(channels))
suite.mockSession.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
return newMockDataNodeClient(1, nil) err := cluster.FlushChannels(context.Background(), 1, 0, channels)
}
sessionManager := NewSessionManagerImpl(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err) suite.NoError(err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
nodeInfo := &NodeInfo{
Address: "localhost:8080",
NodeID: 1,
}
err = cluster.Startup(ctx, []*NodeInfo{nodeInfo})
suite.NoError(err)
err = cluster.Watch(ctx, "ch_1", 1)
suite.NoError(err)
err = cluster.UnRegister(nodeInfo)
suite.NoError(err)
channels := channelManager.GetAssignedChannels()
suite.Empty(channels)
channel := channelManager.GetBufferChannels()
suite.NotNil(channel)
suite.EqualValues(1, len(channel.Channels))
suite.EqualValues("ch_1", channel.Channels[0].GetName())
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 0)
}) })
} }
func TestCluster(t *testing.T) { func (suite *ClusterSuite) TestImport() {
suite.Run(t, new(ClusterSuite)) suite.mockSession.EXPECT().Import(mock.Anything, mock.Anything, mock.Anything).Return().Once()
} cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
suite.NotPanics(func() {
func TestWatchIfNeeded(t *testing.T) { cluster.Import(context.Background(), 1, nil)
kv := getWatchKV(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("add deplicated channel to cluster", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(1, nil)
}
sessionManager := NewSessionManagerImpl(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
err = cluster.Startup(ctx, []*NodeInfo{info})
assert.NoError(t, err)
err = cluster.Watch(ctx, "ch1", 1)
assert.NoError(t, err)
channels := channelManager.GetAssignedChannels()
assert.EqualValues(t, 1, len(channels))
assert.EqualValues(t, "ch1", channels[0].Channels[0].GetName())
})
t.Run("watch channel to empty cluster", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
err = cluster.Watch(ctx, "ch1", 1)
assert.NoError(t, err)
channels := channelManager.GetAssignedChannels()
assert.Empty(t, channels)
channel := channelManager.GetBufferChannels()
assert.NotNil(t, channel)
assert.EqualValues(t, "ch1", channel.Channels[0].GetName())
}) })
} }
func TestConsistentHashPolicy(t *testing.T) { func (suite *ClusterSuite) TestAddImportSegment() {
kv := getWatchKV(t) suite.Run("channel not fount", func() {
defer func() { suite.SetupTest()
kv.RemoveWithPrefix("") suite.mockChManager.EXPECT().GetNodeIDByChannelName(mock.Anything).Return(false, 0)
kv.Close() cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
}() resp, err := cluster.AddImportSegment(context.Background(), &datapb.AddImportSegmentRequest{
ChannelName: "ch-1",
})
sessionManager := NewSessionManagerImpl() suite.ErrorIs(err, merr.ErrChannelNotFound)
chash := consistent.New() suite.Nil(resp)
factory := NewConsistentHashChannelPolicyFactory(chash) })
channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(factory))
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
hash := consistent.New() suite.Run("normal", func() {
hash.Add("1") suite.SetupTest()
hash.Add("2") suite.mockChManager.EXPECT().GetNodeIDByChannelName(mock.Anything).Return(true, 0)
hash.Add("3") suite.mockSession.EXPECT().AddImportSegment(mock.Anything, mock.Anything, mock.Anything).Return(&datapb.AddImportSegmentResponse{}, nil)
nodeInfo1 := &NodeInfo{ cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
NodeID: 1, resp, err := cluster.AddImportSegment(context.Background(), &datapb.AddImportSegmentRequest{
Address: "localhost:1111", ChannelName: "ch-1",
} })
nodeInfo2 := &NodeInfo{
NodeID: 2,
Address: "localhost:2222",
}
nodeInfo3 := &NodeInfo{
NodeID: 3,
Address: "localhost:3333",
}
err = cluster.Register(nodeInfo1)
assert.NoError(t, err)
err = cluster.Register(nodeInfo2)
assert.NoError(t, err)
err = cluster.Register(nodeInfo3)
assert.NoError(t, err)
channels := []string{"ch1", "ch2", "ch3"} suite.NoError(err)
for _, c := range channels { suite.NotNil(resp)
err = cluster.Watch(context.TODO(), c, 1) })
assert.NoError(t, err)
idstr, err := hash.Get(c)
assert.NoError(t, err)
id, err := deformatNodeID(idstr)
assert.NoError(t, err)
match := channelManager.Match(id, c)
assert.True(t, match)
}
hash.Remove("1")
err = cluster.UnRegister(nodeInfo1)
assert.NoError(t, err)
for _, c := range channels {
idstr, err := hash.Get(c)
assert.NoError(t, err)
id, err := deformatNodeID(idstr)
assert.NoError(t, err)
match := channelManager.Match(id, c)
assert.True(t, match)
}
hash.Remove("2")
err = cluster.UnRegister(nodeInfo2)
assert.NoError(t, err)
for _, c := range channels {
idstr, err := hash.Get(c)
assert.NoError(t, err)
id, err := deformatNodeID(idstr)
assert.NoError(t, err)
match := channelManager.Match(id, c)
assert.True(t, match)
}
hash.Remove("3")
err = cluster.UnRegister(nodeInfo3)
assert.NoError(t, err)
bufferChannels := channelManager.GetBufferChannels()
assert.EqualValues(t, 3, len(bufferChannels.Channels))
}
func TestCluster_Flush(t *testing.T) {
kv := getWatchKV(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)
err = cluster.Watch(context.Background(), "chan-1", 1)
assert.NoError(t, err)
// flush empty should impact nothing
assert.NotPanics(t, func() {
err := cluster.Flush(context.Background(), 1, "chan-1", []*datapb.SegmentInfo{})
assert.NoError(t, err)
})
// flush not watched channel
assert.NotPanics(t, func() {
err := cluster.Flush(context.Background(), 1, "chan-2", []*datapb.SegmentInfo{{ID: 1}})
assert.Error(t, err)
})
// flush from wrong datanode
assert.NotPanics(t, func() {
err := cluster.Flush(context.Background(), 2, "chan-1", []*datapb.SegmentInfo{{ID: 1}})
assert.Error(t, err)
})
// TODO add a method to verify datanode has flush request after client injection is available
}
func TestCluster_Import(t *testing.T) {
kv := getWatchKV(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)
err = cluster.Watch(ctx, "chan-1", 1)
assert.NoError(t, err)
assert.NotPanics(t, func() {
cluster.Import(ctx, 1, &datapb.ImportTaskRequest{})
})
time.Sleep(500 * time.Millisecond)
} }

View File

@ -111,7 +111,7 @@ type compactionPlanHandler struct {
meta CompactionMeta meta CompactionMeta
allocator allocator allocator allocator
chManager *ChannelManager chManager *ChannelManagerImpl
scheduler Scheduler scheduler Scheduler
sessions SessionManager sessions SessionManager
@ -120,7 +120,7 @@ type compactionPlanHandler struct {
stopWg sync.WaitGroup stopWg sync.WaitGroup
} }
func newCompactionPlanHandler(sessions SessionManager, cm *ChannelManager, meta CompactionMeta, allocator allocator, func newCompactionPlanHandler(sessions SessionManager, cm *ChannelManagerImpl, meta CompactionMeta, allocator allocator,
) *compactionPlanHandler { ) *compactionPlanHandler {
return &compactionPlanHandler{ return &compactionPlanHandler{
plans: make(map[int64]*compactionTask), plans: make(map[int64]*compactionTask),

View File

@ -280,7 +280,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
type fields struct { type fields struct {
plans map[int64]*compactionTask plans map[int64]*compactionTask
sessions SessionManager sessions SessionManager
chManager *ChannelManager chManager *ChannelManagerImpl
allocatorFactory func() allocator allocatorFactory func() allocator
} }
type args struct { type args struct {
@ -308,7 +308,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
}, },
}, },
}, },
chManager: &ChannelManager{ chManager: &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}},
@ -328,7 +328,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
"test exec compaction failed", "test exec compaction failed",
fields{ fields{
plans: map[int64]*compactionTask{}, plans: map[int64]*compactionTask{},
chManager: &ChannelManager{ chManager: &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{}}, 1: {NodeID: 1, Channels: []RWChannel{}},
@ -396,7 +396,7 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
}, },
}, },
}, },
chManager: &ChannelManager{ chManager: &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: "ch1"}}},
@ -1029,7 +1029,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
func Test_newCompactionPlanHandler(t *testing.T) { func Test_newCompactionPlanHandler(t *testing.T) {
type args struct { type args struct {
sessions SessionManager sessions SessionManager
cm *ChannelManager cm *ChannelManagerImpl
meta *meta meta *meta
allocator allocator allocator allocator
} }
@ -1042,14 +1042,14 @@ func Test_newCompactionPlanHandler(t *testing.T) {
"test new handler", "test new handler",
args{ args{
&SessionManagerImpl{}, &SessionManagerImpl{},
&ChannelManager{}, &ChannelManagerImpl{},
&meta{}, &meta{},
newMockAllocator(), newMockAllocator(),
}, },
&compactionPlanHandler{ &compactionPlanHandler{
plans: map[int64]*compactionTask{}, plans: map[int64]*compactionTask{},
sessions: &SessionManagerImpl{}, sessions: &SessionManagerImpl{},
chManager: &ChannelManager{}, chManager: &ChannelManagerImpl{},
meta: &meta{}, meta: &meta{},
allocator: newMockAllocator(), allocator: newMockAllocator(),
scheduler: NewCompactionScheduler(), scheduler: NewCompactionScheduler(),

View File

@ -0,0 +1,653 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockChannelManager is an autogenerated mock type for the ChannelManager type
type MockChannelManager struct {
mock.Mock
}
type MockChannelManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockChannelManager) EXPECT() *MockChannelManager_Expecter {
return &MockChannelManager_Expecter{mock: &_m.Mock}
}
// AddNode provides a mock function with given fields: nodeID
func (_m *MockChannelManager) AddNode(nodeID int64) error {
ret := _m.Called(nodeID)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(nodeID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_AddNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddNode'
type MockChannelManager_AddNode_Call struct {
*mock.Call
}
// AddNode is a helper method to define mock.On call
// - nodeID int64
func (_e *MockChannelManager_Expecter) AddNode(nodeID interface{}) *MockChannelManager_AddNode_Call {
return &MockChannelManager_AddNode_Call{Call: _e.mock.On("AddNode", nodeID)}
}
func (_c *MockChannelManager_AddNode_Call) Run(run func(nodeID int64)) *MockChannelManager_AddNode_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockChannelManager_AddNode_Call) Return(_a0 error) *MockChannelManager_AddNode_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_AddNode_Call) RunAndReturn(run func(int64) error) *MockChannelManager_AddNode_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockChannelManager) Close() {
_m.Called()
}
// MockChannelManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockChannelManager_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockChannelManager_Expecter) Close() *MockChannelManager_Close_Call {
return &MockChannelManager_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockChannelManager_Close_Call) Run(run func()) *MockChannelManager_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockChannelManager_Close_Call) Return() *MockChannelManager_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockChannelManager_Close_Call) RunAndReturn(run func()) *MockChannelManager_Close_Call {
_c.Call.Return(run)
return _c
}
// DeleteNode provides a mock function with given fields: nodeID
func (_m *MockChannelManager) DeleteNode(nodeID int64) error {
ret := _m.Called(nodeID)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(nodeID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_DeleteNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteNode'
type MockChannelManager_DeleteNode_Call struct {
*mock.Call
}
// DeleteNode is a helper method to define mock.On call
// - nodeID int64
func (_e *MockChannelManager_Expecter) DeleteNode(nodeID interface{}) *MockChannelManager_DeleteNode_Call {
return &MockChannelManager_DeleteNode_Call{Call: _e.mock.On("DeleteNode", nodeID)}
}
func (_c *MockChannelManager_DeleteNode_Call) Run(run func(nodeID int64)) *MockChannelManager_DeleteNode_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockChannelManager_DeleteNode_Call) Return(_a0 error) *MockChannelManager_DeleteNode_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_DeleteNode_Call) RunAndReturn(run func(int64) error) *MockChannelManager_DeleteNode_Call {
_c.Call.Return(run)
return _c
}
// FindWatcher provides a mock function with given fields: channel
func (_m *MockChannelManager) FindWatcher(channel string) (int64, error) {
ret := _m.Called(channel)
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(string) (int64, error)); ok {
return rf(channel)
}
if rf, ok := ret.Get(0).(func(string) int64); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(channel)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockChannelManager_FindWatcher_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindWatcher'
type MockChannelManager_FindWatcher_Call struct {
*mock.Call
}
// FindWatcher is a helper method to define mock.On call
// - channel string
func (_e *MockChannelManager_Expecter) FindWatcher(channel interface{}) *MockChannelManager_FindWatcher_Call {
return &MockChannelManager_FindWatcher_Call{Call: _e.mock.On("FindWatcher", channel)}
}
func (_c *MockChannelManager_FindWatcher_Call) Run(run func(channel string)) *MockChannelManager_FindWatcher_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockChannelManager_FindWatcher_Call) Return(_a0 int64, _a1 error) *MockChannelManager_FindWatcher_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockChannelManager_FindWatcher_Call) RunAndReturn(run func(string) (int64, error)) *MockChannelManager_FindWatcher_Call {
_c.Call.Return(run)
return _c
}
// GetBufferChannels provides a mock function with given fields:
func (_m *MockChannelManager) GetBufferChannels() *NodeChannelInfo {
ret := _m.Called()
var r0 *NodeChannelInfo
if rf, ok := ret.Get(0).(func() *NodeChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*NodeChannelInfo)
}
}
return r0
}
// MockChannelManager_GetBufferChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBufferChannels'
type MockChannelManager_GetBufferChannels_Call struct {
*mock.Call
}
// GetBufferChannels is a helper method to define mock.On call
func (_e *MockChannelManager_Expecter) GetBufferChannels() *MockChannelManager_GetBufferChannels_Call {
return &MockChannelManager_GetBufferChannels_Call{Call: _e.mock.On("GetBufferChannels")}
}
func (_c *MockChannelManager_GetBufferChannels_Call) Run(run func()) *MockChannelManager_GetBufferChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockChannelManager_GetBufferChannels_Call) Return(_a0 *NodeChannelInfo) *MockChannelManager_GetBufferChannels_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_GetBufferChannels_Call) RunAndReturn(run func() *NodeChannelInfo) *MockChannelManager_GetBufferChannels_Call {
_c.Call.Return(run)
return _c
}
// GetChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockChannelManager) GetChannelsByCollectionID(collectionID int64) []RWChannel {
ret := _m.Called(collectionID)
var r0 []RWChannel
if rf, ok := ret.Get(0).(func(int64) []RWChannel); ok {
r0 = rf(collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]RWChannel)
}
}
return r0
}
// MockChannelManager_GetChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelsByCollectionID'
type MockChannelManager_GetChannelsByCollectionID_Call struct {
*mock.Call
}
// GetChannelsByCollectionID is a helper method to define mock.On call
// - collectionID int64
func (_e *MockChannelManager_Expecter) GetChannelsByCollectionID(collectionID interface{}) *MockChannelManager_GetChannelsByCollectionID_Call {
return &MockChannelManager_GetChannelsByCollectionID_Call{Call: _e.mock.On("GetChannelsByCollectionID", collectionID)}
}
func (_c *MockChannelManager_GetChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockChannelManager_GetChannelsByCollectionID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockChannelManager_GetChannelsByCollectionID_Call) Return(_a0 []RWChannel) *MockChannelManager_GetChannelsByCollectionID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_GetChannelsByCollectionID_Call) RunAndReturn(run func(int64) []RWChannel) *MockChannelManager_GetChannelsByCollectionID_Call {
_c.Call.Return(run)
return _c
}
// GetCollectionIDByChannel provides a mock function with given fields: channel
func (_m *MockChannelManager) GetCollectionIDByChannel(channel string) (bool, int64) {
ret := _m.Called(channel)
var r0 bool
var r1 int64
if rf, ok := ret.Get(0).(func(string) (bool, int64)); ok {
return rf(channel)
}
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(string) int64); ok {
r1 = rf(channel)
} else {
r1 = ret.Get(1).(int64)
}
return r0, r1
}
// MockChannelManager_GetCollectionIDByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionIDByChannel'
type MockChannelManager_GetCollectionIDByChannel_Call struct {
*mock.Call
}
// GetCollectionIDByChannel is a helper method to define mock.On call
// - channel string
func (_e *MockChannelManager_Expecter) GetCollectionIDByChannel(channel interface{}) *MockChannelManager_GetCollectionIDByChannel_Call {
return &MockChannelManager_GetCollectionIDByChannel_Call{Call: _e.mock.On("GetCollectionIDByChannel", channel)}
}
func (_c *MockChannelManager_GetCollectionIDByChannel_Call) Run(run func(channel string)) *MockChannelManager_GetCollectionIDByChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockChannelManager_GetCollectionIDByChannel_Call) Return(_a0 bool, _a1 int64) *MockChannelManager_GetCollectionIDByChannel_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockChannelManager_GetCollectionIDByChannel_Call) RunAndReturn(run func(string) (bool, int64)) *MockChannelManager_GetCollectionIDByChannel_Call {
_c.Call.Return(run)
return _c
}
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockChannelManager) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
ret := _m.Called(collectionID)
var r0 map[int64][]string
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
r0 = rf(collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64][]string)
}
}
return r0
}
// MockChannelManager_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
type MockChannelManager_GetNodeChannelsByCollectionID_Call struct {
*mock.Call
}
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
// - collectionID int64
func (_e *MockChannelManager_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockChannelManager_GetNodeChannelsByCollectionID_Call {
return &MockChannelManager_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
}
func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockChannelManager_GetNodeChannelsByCollectionID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockChannelManager_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockChannelManager_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(run)
return _c
}
// GetNodeIDByChannelName provides a mock function with given fields: channel
func (_m *MockChannelManager) GetNodeIDByChannelName(channel string) (bool, int64) {
ret := _m.Called(channel)
var r0 bool
var r1 int64
if rf, ok := ret.Get(0).(func(string) (bool, int64)); ok {
return rf(channel)
}
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(string) int64); ok {
r1 = rf(channel)
} else {
r1 = ret.Get(1).(int64)
}
return r0, r1
}
// MockChannelManager_GetNodeIDByChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeIDByChannelName'
type MockChannelManager_GetNodeIDByChannelName_Call struct {
*mock.Call
}
// GetNodeIDByChannelName is a helper method to define mock.On call
// - channel string
func (_e *MockChannelManager_Expecter) GetNodeIDByChannelName(channel interface{}) *MockChannelManager_GetNodeIDByChannelName_Call {
return &MockChannelManager_GetNodeIDByChannelName_Call{Call: _e.mock.On("GetNodeIDByChannelName", channel)}
}
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Run(run func(channel string)) *MockChannelManager_GetNodeIDByChannelName_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Return(_a0 bool, _a1 int64) *MockChannelManager_GetNodeIDByChannelName_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) RunAndReturn(run func(string) (bool, int64)) *MockChannelManager_GetNodeIDByChannelName_Call {
_c.Call.Return(run)
return _c
}
// Match provides a mock function with given fields: nodeID, channel
func (_m *MockChannelManager) Match(nodeID int64, channel string) bool {
ret := _m.Called(nodeID, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(int64, string) bool); ok {
r0 = rf(nodeID, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockChannelManager_Match_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Match'
type MockChannelManager_Match_Call struct {
*mock.Call
}
// Match is a helper method to define mock.On call
// - nodeID int64
// - channel string
func (_e *MockChannelManager_Expecter) Match(nodeID interface{}, channel interface{}) *MockChannelManager_Match_Call {
return &MockChannelManager_Match_Call{Call: _e.mock.On("Match", nodeID, channel)}
}
func (_c *MockChannelManager_Match_Call) Run(run func(nodeID int64, channel string)) *MockChannelManager_Match_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(string))
})
return _c
}
func (_c *MockChannelManager_Match_Call) Return(_a0 bool) *MockChannelManager_Match_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_Match_Call) RunAndReturn(run func(int64, string) bool) *MockChannelManager_Match_Call {
_c.Call.Return(run)
return _c
}
// Release provides a mock function with given fields: nodeID, channelName
func (_m *MockChannelManager) Release(nodeID int64, channelName string) error {
ret := _m.Called(nodeID, channelName)
var r0 error
if rf, ok := ret.Get(0).(func(int64, string) error); ok {
r0 = rf(nodeID, channelName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_Release_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Release'
type MockChannelManager_Release_Call struct {
*mock.Call
}
// Release is a helper method to define mock.On call
// - nodeID int64
// - channelName string
func (_e *MockChannelManager_Expecter) Release(nodeID interface{}, channelName interface{}) *MockChannelManager_Release_Call {
return &MockChannelManager_Release_Call{Call: _e.mock.On("Release", nodeID, channelName)}
}
func (_c *MockChannelManager_Release_Call) Run(run func(nodeID int64, channelName string)) *MockChannelManager_Release_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(string))
})
return _c
}
func (_c *MockChannelManager_Release_Call) Return(_a0 error) *MockChannelManager_Release_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_Release_Call) RunAndReturn(run func(int64, string) error) *MockChannelManager_Release_Call {
_c.Call.Return(run)
return _c
}
// RemoveChannel provides a mock function with given fields: channelName
func (_m *MockChannelManager) RemoveChannel(channelName string) error {
ret := _m.Called(channelName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(channelName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_RemoveChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveChannel'
type MockChannelManager_RemoveChannel_Call struct {
*mock.Call
}
// RemoveChannel is a helper method to define mock.On call
// - channelName string
func (_e *MockChannelManager_Expecter) RemoveChannel(channelName interface{}) *MockChannelManager_RemoveChannel_Call {
return &MockChannelManager_RemoveChannel_Call{Call: _e.mock.On("RemoveChannel", channelName)}
}
func (_c *MockChannelManager_RemoveChannel_Call) Run(run func(channelName string)) *MockChannelManager_RemoveChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockChannelManager_RemoveChannel_Call) Return(_a0 error) *MockChannelManager_RemoveChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_RemoveChannel_Call) RunAndReturn(run func(string) error) *MockChannelManager_RemoveChannel_Call {
_c.Call.Return(run)
return _c
}
// Startup provides a mock function with given fields: ctx, nodes
func (_m *MockChannelManager) Startup(ctx context.Context, nodes []int64) error {
ret := _m.Called(ctx, nodes)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok {
r0 = rf(ctx, nodes)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_Startup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Startup'
type MockChannelManager_Startup_Call struct {
*mock.Call
}
// Startup is a helper method to define mock.On call
// - ctx context.Context
// - nodes []int64
func (_e *MockChannelManager_Expecter) Startup(ctx interface{}, nodes interface{}) *MockChannelManager_Startup_Call {
return &MockChannelManager_Startup_Call{Call: _e.mock.On("Startup", ctx, nodes)}
}
func (_c *MockChannelManager_Startup_Call) Run(run func(ctx context.Context, nodes []int64)) *MockChannelManager_Startup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockChannelManager_Startup_Call) Return(_a0 error) *MockChannelManager_Startup_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_Startup_Call) RunAndReturn(run func(context.Context, []int64) error) *MockChannelManager_Startup_Call {
_c.Call.Return(run)
return _c
}
// Watch provides a mock function with given fields: ctx, ch
func (_m *MockChannelManager) Watch(ctx context.Context, ch RWChannel) error {
ret := _m.Called(ctx, ch)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, RWChannel) error); ok {
r0 = rf(ctx, ch)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_Watch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Watch'
type MockChannelManager_Watch_Call struct {
*mock.Call
}
// Watch is a helper method to define mock.On call
// - ctx context.Context
// - ch RWChannel
func (_e *MockChannelManager_Expecter) Watch(ctx interface{}, ch interface{}) *MockChannelManager_Watch_Call {
return &MockChannelManager_Watch_Call{Call: _e.mock.On("Watch", ctx, ch)}
}
func (_c *MockChannelManager_Watch_Call) Run(run func(ctx context.Context, ch RWChannel)) *MockChannelManager_Watch_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(RWChannel))
})
return _c
}
func (_c *MockChannelManager_Watch_Call) Return(_a0 error) *MockChannelManager_Watch_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_Watch_Call) RunAndReturn(run func(context.Context, RWChannel) error) *MockChannelManager_Watch_Call {
_c.Call.Return(run)
return _c
}
// NewMockChannelManager creates a new instance of MockChannelManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockChannelManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockChannelManager {
mock := &MockChannelManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -117,7 +117,7 @@ type Server struct {
allocator allocator allocator allocator
cluster Cluster cluster Cluster
sessionManager SessionManager sessionManager SessionManager
channelManager *ChannelManager channelManager *ChannelManagerImpl
rootCoordClient types.RootCoordClient rootCoordClient types.RootCoordClient
garbageCollector *garbageCollector garbageCollector *garbageCollector
gcOpt GcOption gcOpt GcOption

View File

@ -3428,7 +3428,7 @@ func TestGetFlushState(t *testing.T) {
collection = int64(0) collection = int64(0)
) )
svr.channelManager = &ChannelManager{ svr.channelManager = &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
@ -3476,7 +3476,7 @@ func TestGetFlushState(t *testing.T) {
collection = int64(0) collection = int64(0)
) )
svr.channelManager = &ChannelManager{ svr.channelManager = &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
@ -3524,7 +3524,7 @@ func TestGetFlushState(t *testing.T) {
collection = int64(0) collection = int64(0)
) )
svr.channelManager = &ChannelManager{ svr.channelManager = &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
@ -3557,7 +3557,7 @@ func TestGetFlushState(t *testing.T) {
collection = int64(0) collection = int64(0)
) )
svr.channelManager = &ChannelManager{ svr.channelManager = &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},
@ -3593,7 +3593,7 @@ func TestGetFlushState(t *testing.T) {
collection = int64(0) collection = int64(0)
) )
svr.channelManager = &ChannelManager{ svr.channelManager = &ChannelManagerImpl{
store: &ChannelStore{ store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}},

View File

@ -11,7 +11,9 @@ import (
"github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/testutils"
) )
func TestSessionManagerSuite(t *testing.T) { func TestSessionManagerSuite(t *testing.T) {
@ -19,7 +21,7 @@ func TestSessionManagerSuite(t *testing.T) {
} }
type SessionManagerSuite struct { type SessionManagerSuite struct {
suite.Suite testutils.PromMetricsSuite
dn *mocks.MockDataNodeClient dn *mocks.MockDataNodeClient
@ -34,6 +36,7 @@ func (s *SessionManagerSuite) SetupTest() {
})) }))
s.m.AddSession(&NodeInfo{1000, "addr-1"}) s.m.AddSession(&NodeInfo{1000, "addr-1"})
s.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
} }
func (s *SessionManagerSuite) TestNotifyChannelOperation() { func (s *SessionManagerSuite) TestNotifyChannelOperation() {