From 3e0034bea23b0072d30a0dbe33d038f0ffd70d3a Mon Sep 17 00:00:00 2001 From: jaime Date: Wed, 3 Jul 2024 19:40:11 +0800 Subject: [PATCH] enhance: cherry pick some improved PRs from the master branch (#34391) issue: https://github.com/milvus-io/milvus/issues/33205,https://github.com/milvus-io/milvus/issues/33342 pr: https://github.com/milvus-io/milvus/pull/33530 pr: #33343 pr: #33206 --------- Signed-off-by: jaime Co-authored-by: xiaofanluan Co-authored-by: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> --- internal/datacoord/channel_manager.go | 23 ++----------- internal/datacoord/channel_manager_v2.go | 24 +------------ internal/datacoord/channel_store.go | 23 +++++++++++++ internal/datacoord/channel_store_v2.go | 25 +++++++++++--- internal/datacoord/mock_channel_store.go | 44 ++++++++++++++++++++++++ internal/rootcoord/broker.go | 10 +----- internal/rootcoord/mock_test.go | 5 --- 7 files changed, 93 insertions(+), 61 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 42eda97490..154d1bf9be 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -494,17 +494,9 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo { // GetNodeChannelsByCollectionID gets all node channels map of the collection func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { - nodeChs := make(map[UniqueID][]string) - for _, nodeChannels := range c.GetAssignedChannels() { - var channelNames []string - for name, ch := range nodeChannels.Channels { - if ch.GetCollectionID() == collectionID { - channelNames = append(channelNames, name) - } - } - nodeChs[nodeChannels.NodeID] = channelNames - } - return nodeChs + c.mu.RLock() + defer c.mu.RUnlock() + return c.store.GetNodeChannelsByCollectionID(collectionID) } // Get all channels belong to the collection @@ -891,15 +883,6 @@ func (c *ChannelManagerImpl) GetCollectionIDByChannel(channelName string) (bool, return false, 0 } -func (c *ChannelManagerImpl) GetNodeIDByChannelName(channelName string) (UniqueID, bool) { - for _, nodeChannel := range c.GetAssignedChannels() { - if _, ok := nodeChannel.Channels[channelName]; ok { - return nodeChannel.NodeID, true - } - } - return 0, false -} - func (c *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 63e146cb1a..f9c11beff3 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -49,7 +49,6 @@ type ChannelManager interface { FindWatcher(channel string) (UniqueID, error) GetChannel(nodeID int64, channel string) (RWChannel, bool) - GetNodeIDByChannelName(channel string) (int64, bool) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string GetChannelsByCollectionID(collectionID int64) []RWChannel GetChannelNamesByCollectionID(collectionID int64) []string @@ -352,31 +351,10 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC return nil, false } -func (m *ChannelManagerImplV2) GetNodeIDByChannelName(channel string) (int64, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - nodeChannels := m.store.GetNodeChannelsBy( - WithoutBufferNode(), - WithChannelName(channel)) - - if len(nodeChannels) > 0 { - return nodeChannels[0].NodeID, true - } - - return 0, false -} - func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { m.mu.RLock() defer m.mu.RUnlock() - nodeChs := make(map[UniqueID][]string) - nodeChannels := m.store.GetNodeChannelsBy( - WithoutBufferNode(), - WithCollectionIDV2(collectionID)) - lo.ForEach(nodeChannels, func(info *NodeChannelInfo, _ int) { - nodeChs[info.NodeID] = lo.Keys(info.Channels) - }) - return nodeChs + return m.store.GetNodeChannelsByCollectionID(collectionID) } func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel { diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 2da7137c34..66ae216fec 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -38,6 +38,8 @@ import ( ) // ROChannelStore is a read only channel store for channels and nodes. +// +//go:generate mockery --name=ROChannelStore --structname=ROChannelStore --output=./ --filename=mock_ro_channel_store.go --with-expecter type ROChannelStore interface { // GetNode returns the channel info of a specific node. // Returns nil if the node doesn't belong to the cluster @@ -53,12 +55,16 @@ type ROChannelStore interface { GetNodes() []int64 // GetNodeChannelCount GetNodeChannelCount(nodeID int64) int + // GetNodeChannels for given collection + GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string // GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo } // RWChannelStore is the read write channel store for channels and nodes. +// +//go:generate mockery --name=RWChannelStore --structname=RWChannelStore --output=./ --filename=mock_channel_store.go --with-expecter type RWChannelStore interface { ROChannelStore // Reload restores the buffer channels and node-channels mapping form kv. @@ -463,6 +469,23 @@ func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo { return ret } +func (c *ChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { + nodeChs := make(map[UniqueID][]string) + for id, info := range c.channelsInfo { + if id == bufferID { + continue + } + var channelNames []string + for name, ch := range info.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } + nodeChs[id] = channelNames + } + return nodeChs +} + // GetBufferChannelInfo returns all unassigned channels. func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo { if info, ok := c.channelsInfo[bufferID]; ok { diff --git a/internal/datacoord/channel_store_v2.go b/internal/datacoord/channel_store_v2.go index 5ad2d71b92..b91d114d4d 100644 --- a/internal/datacoord/channel_store_v2.go +++ b/internal/datacoord/channel_store_v2.go @@ -367,7 +367,7 @@ func WithChannelStates(states ...ChannelState) ChannelSelector { } func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { - nodeChannels := make(map[int64]*NodeChannelInfo) + var nodeChannels []*NodeChannelInfo for nodeID, cInfo := range c.channelsInfo { if nodeSelector(nodeID) { selected := make(map[string]RWChannel) @@ -383,13 +383,13 @@ func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channel selected[chName] = channel } } - nodeChannels[nodeID] = &NodeChannelInfo{ + nodeChannels = append(nodeChannels, &NodeChannelInfo{ NodeID: nodeID, Channels: selected, - } + }) } } - return lo.Values(nodeChannels) + return nodeChannels } func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo { @@ -402,6 +402,23 @@ func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo { return ret } +func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { + nodeChs := make(map[UniqueID][]string) + for id, info := range c.channelsInfo { + if id == bufferID { + continue + } + var channelNames []string + for name, ch := range info.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } + nodeChs[id] = channelNames + } + return nodeChs +} + func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo { return c.GetNode(bufferID) } diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index e0e469fba7..fc7cb51ef3 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -179,6 +179,50 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int return _c } +// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID +func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { + ret := _m.Called(collectionID) + + var r0 map[int64][]string + if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { + r0 = rf(collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]string) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' +type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { + *mock.Call +} + +// GetNodeChannelsByCollectionID is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(run) + return _c +} + // GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { _va := make([]interface{}, len(channelSelectors)) diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index c4b908aede..edd3bc0525 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -58,9 +58,7 @@ type Broker interface { GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error - GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) - DescribeIndex(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) - + // notify observer to clean their meta cache BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error } @@ -270,12 +268,6 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milv return nil } -func (b *ServerBroker) DescribeIndex(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return b.s.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ - CollectionID: colID, - }) -} - func (b *ServerBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool { log := log.Ctx(ctx).With(zap.Int64("collection", collectionID), zap.Int64("partition", partitionID)) diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index 1828e2763b..e0eb4db1d8 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -887,7 +887,6 @@ type mockBroker struct { FlushFunc func(ctx context.Context, cID int64, segIDs []int64) error DropCollectionIndexFunc func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error - DescribeIndexFunc func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) BroadcastAlteredCollectionFunc func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error @@ -923,10 +922,6 @@ func (b mockBroker) DropCollectionIndex(ctx context.Context, collID UniqueID, pa return b.DropCollectionIndexFunc(ctx, collID, partIDs) } -func (b mockBroker) DescribeIndex(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return b.DescribeIndexFunc(ctx, colID) -} - func (b mockBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { return b.GetSegmentIndexStateFunc(ctx, collID, indexName, segIDs) }