mirror of https://github.com/milvus-io/milvus.git
enhance: cherry pick some improved PRs from the master branch (#34391)
issue: https://github.com/milvus-io/milvus/issues/33205,https://github.com/milvus-io/milvus/issues/33342 pr: https://github.com/milvus-io/milvus/pull/33530 pr: #33343 pr: #33206 --------- Signed-off-by: jaime <yun.zhang@zilliz.com> Co-authored-by: xiaofanluan <xiaofan.luan@zilliz.com> Co-authored-by: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>pull/34293/head
parent
07daa8f12b
commit
3e0034bea2
|
@ -494,17 +494,9 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo {
|
|||
|
||||
// GetNodeChannelsByCollectionID gets all node channels map of the collection
|
||||
func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
|
||||
nodeChs := make(map[UniqueID][]string)
|
||||
for _, nodeChannels := range c.GetAssignedChannels() {
|
||||
var channelNames []string
|
||||
for name, ch := range nodeChannels.Channels {
|
||||
if ch.GetCollectionID() == collectionID {
|
||||
channelNames = append(channelNames, name)
|
||||
}
|
||||
}
|
||||
nodeChs[nodeChannels.NodeID] = channelNames
|
||||
}
|
||||
return nodeChs
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.store.GetNodeChannelsByCollectionID(collectionID)
|
||||
}
|
||||
|
||||
// Get all channels belong to the collection
|
||||
|
@ -891,15 +883,6 @@ func (c *ChannelManagerImpl) GetCollectionIDByChannel(channelName string) (bool,
|
|||
return false, 0
|
||||
}
|
||||
|
||||
func (c *ChannelManagerImpl) GetNodeIDByChannelName(channelName string) (UniqueID, bool) {
|
||||
for _, nodeChannel := range c.GetAssignedChannels() {
|
||||
if _, ok := nodeChannel.Channels[channelName]; ok {
|
||||
return nodeChannel.NodeID, true
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (c *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
|
|
@ -49,7 +49,6 @@ type ChannelManager interface {
|
|||
FindWatcher(channel string) (UniqueID, error)
|
||||
|
||||
GetChannel(nodeID int64, channel string) (RWChannel, bool)
|
||||
GetNodeIDByChannelName(channel string) (int64, bool)
|
||||
GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
|
||||
GetChannelsByCollectionID(collectionID int64) []RWChannel
|
||||
GetChannelNamesByCollectionID(collectionID int64) []string
|
||||
|
@ -352,31 +351,10 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC
|
|||
return nil, false
|
||||
}
|
||||
|
||||
func (m *ChannelManagerImplV2) GetNodeIDByChannelName(channel string) (int64, bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
nodeChannels := m.store.GetNodeChannelsBy(
|
||||
WithoutBufferNode(),
|
||||
WithChannelName(channel))
|
||||
|
||||
if len(nodeChannels) > 0 {
|
||||
return nodeChannels[0].NodeID, true
|
||||
}
|
||||
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
nodeChs := make(map[UniqueID][]string)
|
||||
nodeChannels := m.store.GetNodeChannelsBy(
|
||||
WithoutBufferNode(),
|
||||
WithCollectionIDV2(collectionID))
|
||||
lo.ForEach(nodeChannels, func(info *NodeChannelInfo, _ int) {
|
||||
nodeChs[info.NodeID] = lo.Keys(info.Channels)
|
||||
})
|
||||
return nodeChs
|
||||
return m.store.GetNodeChannelsByCollectionID(collectionID)
|
||||
}
|
||||
|
||||
func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel {
|
||||
|
|
|
@ -38,6 +38,8 @@ import (
|
|||
)
|
||||
|
||||
// ROChannelStore is a read only channel store for channels and nodes.
|
||||
//
|
||||
//go:generate mockery --name=ROChannelStore --structname=ROChannelStore --output=./ --filename=mock_ro_channel_store.go --with-expecter
|
||||
type ROChannelStore interface {
|
||||
// GetNode returns the channel info of a specific node.
|
||||
// Returns nil if the node doesn't belong to the cluster
|
||||
|
@ -53,12 +55,16 @@ type ROChannelStore interface {
|
|||
GetNodes() []int64
|
||||
// GetNodeChannelCount
|
||||
GetNodeChannelCount(nodeID int64) int
|
||||
// GetNodeChannels for given collection
|
||||
GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string
|
||||
|
||||
// GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only
|
||||
GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo
|
||||
}
|
||||
|
||||
// RWChannelStore is the read write channel store for channels and nodes.
|
||||
//
|
||||
//go:generate mockery --name=RWChannelStore --structname=RWChannelStore --output=./ --filename=mock_channel_store.go --with-expecter
|
||||
type RWChannelStore interface {
|
||||
ROChannelStore
|
||||
// Reload restores the buffer channels and node-channels mapping form kv.
|
||||
|
@ -463,6 +469,23 @@ func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
|||
return ret
|
||||
}
|
||||
|
||||
func (c *ChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
|
||||
nodeChs := make(map[UniqueID][]string)
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == bufferID {
|
||||
continue
|
||||
}
|
||||
var channelNames []string
|
||||
for name, ch := range info.Channels {
|
||||
if ch.GetCollectionID() == collectionID {
|
||||
channelNames = append(channelNames, name)
|
||||
}
|
||||
}
|
||||
nodeChs[id] = channelNames
|
||||
}
|
||||
return nodeChs
|
||||
}
|
||||
|
||||
// GetBufferChannelInfo returns all unassigned channels.
|
||||
func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
|
||||
if info, ok := c.channelsInfo[bufferID]; ok {
|
||||
|
|
|
@ -367,7 +367,7 @@ func WithChannelStates(states ...ChannelState) ChannelSelector {
|
|||
}
|
||||
|
||||
func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
||||
nodeChannels := make(map[int64]*NodeChannelInfo)
|
||||
var nodeChannels []*NodeChannelInfo
|
||||
for nodeID, cInfo := range c.channelsInfo {
|
||||
if nodeSelector(nodeID) {
|
||||
selected := make(map[string]RWChannel)
|
||||
|
@ -383,13 +383,13 @@ func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channel
|
|||
selected[chName] = channel
|
||||
}
|
||||
}
|
||||
nodeChannels[nodeID] = &NodeChannelInfo{
|
||||
nodeChannels = append(nodeChannels, &NodeChannelInfo{
|
||||
NodeID: nodeID,
|
||||
Channels: selected,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
return lo.Values(nodeChannels)
|
||||
return nodeChannels
|
||||
}
|
||||
|
||||
func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
||||
|
@ -402,6 +402,23 @@ func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
|||
return ret
|
||||
}
|
||||
|
||||
func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
|
||||
nodeChs := make(map[UniqueID][]string)
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == bufferID {
|
||||
continue
|
||||
}
|
||||
var channelNames []string
|
||||
for name, ch := range info.Channels {
|
||||
if ch.GetCollectionID() == collectionID {
|
||||
channelNames = append(channelNames, name)
|
||||
}
|
||||
}
|
||||
nodeChs[id] = channelNames
|
||||
}
|
||||
return nodeChs
|
||||
}
|
||||
|
||||
func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
|
||||
return c.GetNode(bufferID)
|
||||
}
|
||||
|
|
|
@ -179,6 +179,50 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int
|
|||
return _c
|
||||
}
|
||||
|
||||
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
|
||||
func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
|
||||
ret := _m.Called(collectionID)
|
||||
|
||||
var r0 map[int64][]string
|
||||
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
|
||||
r0 = rf(collectionID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(map[int64][]string)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
|
||||
type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
|
||||
// - collectionID int64
|
||||
func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||
return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
|
||||
}
|
||||
|
||||
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors
|
||||
func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
||||
_va := make([]interface{}, len(channelSelectors))
|
||||
|
|
|
@ -58,9 +58,7 @@ type Broker interface {
|
|||
GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool
|
||||
|
||||
DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
|
||||
GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
|
||||
DescribeIndex(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error)
|
||||
|
||||
// notify observer to clean their meta cache
|
||||
BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error
|
||||
}
|
||||
|
||||
|
@ -270,12 +268,6 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milv
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *ServerBroker) DescribeIndex(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
|
||||
return b.s.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
||||
CollectionID: colID,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *ServerBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
log := log.Ctx(ctx).With(zap.Int64("collection", collectionID), zap.Int64("partition", partitionID))
|
||||
|
||||
|
|
|
@ -887,7 +887,6 @@ type mockBroker struct {
|
|||
FlushFunc func(ctx context.Context, cID int64, segIDs []int64) error
|
||||
|
||||
DropCollectionIndexFunc func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
|
||||
DescribeIndexFunc func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error)
|
||||
GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
|
||||
|
||||
BroadcastAlteredCollectionFunc func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error
|
||||
|
@ -923,10 +922,6 @@ func (b mockBroker) DropCollectionIndex(ctx context.Context, collID UniqueID, pa
|
|||
return b.DropCollectionIndexFunc(ctx, collID, partIDs)
|
||||
}
|
||||
|
||||
func (b mockBroker) DescribeIndex(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
|
||||
return b.DescribeIndexFunc(ctx, colID)
|
||||
}
|
||||
|
||||
func (b mockBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) {
|
||||
return b.GetSegmentIndexStateFunc(ctx, collID, indexName, segIDs)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue