fix: reduce redundant map operations in datacoord (#33343)

More refactories will be added.
issue: #33342

Signed-off-by: yiwangdr <yiwangdr@gmail.com>
pull/33369/head
yiwangdr 2024-05-23 21:47:40 -07:00 committed by GitHub
parent 5cdc6ae489
commit e895cfed84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 95 additions and 50 deletions

View File

@ -494,17 +494,9 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo {
// GetNodeChannelsByCollectionID gets all node channels map of the collection
func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
nodeChs := make(map[UniqueID][]string)
for _, nodeChannels := range c.GetAssignedChannels() {
var channelNames []string
for name, ch := range nodeChannels.Channels {
if ch.GetCollectionID() == collectionID {
channelNames = append(channelNames, name)
}
}
nodeChs[nodeChannels.NodeID] = channelNames
}
return nodeChs
c.mu.RLock()
defer c.mu.RUnlock()
return c.store.GetNodeChannelsByCollectionID(collectionID)
}
// Get all channels belong to the collection
@ -891,15 +883,6 @@ func (c *ChannelManagerImpl) GetCollectionIDByChannel(channelName string) (bool,
return false, 0
}
func (c *ChannelManagerImpl) GetNodeIDByChannelName(channelName string) (UniqueID, bool) {
for _, nodeChannel := range c.GetAssignedChannels() {
if _, ok := nodeChannel.Channels[channelName]; ok {
return nodeChannel.NodeID, true
}
}
return 0, false
}
func (c *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

View File

@ -48,7 +48,6 @@ type ChannelManager interface {
FindWatcher(channel string) (UniqueID, error)
GetChannel(nodeID int64, channel string) (RWChannel, bool)
GetNodeIDByChannelName(channel string) (int64, bool)
GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
GetChannelsByCollectionID(collectionID int64) []RWChannel
GetChannelNamesByCollectionID(collectionID int64) []string
@ -351,31 +350,10 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC
return nil, false
}
func (m *ChannelManagerImplV2) GetNodeIDByChannelName(channel string) (int64, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
nodeChannels := m.store.GetNodeChannelsBy(
WithoutBufferNode(),
WithChannelName(channel))
if len(nodeChannels) > 0 {
return nodeChannels[0].NodeID, true
}
return 0, false
}
func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
m.mu.RLock()
defer m.mu.RUnlock()
nodeChs := make(map[UniqueID][]string)
nodeChannels := m.store.GetNodeChannelsBy(
WithoutBufferNode(),
WithCollectionIDV2(collectionID))
lo.ForEach(nodeChannels, func(info *NodeChannelInfo, _ int) {
nodeChs[info.NodeID] = lo.Keys(info.Channels)
})
return nodeChs
return m.store.GetNodeChannelsByCollectionID(collectionID)
}
func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel {

View File

@ -37,6 +37,8 @@ import (
)
// ROChannelStore is a read only channel store for channels and nodes.
//
//go:generate mockery --name=ROChannelStore --structname=ROChannelStore --output=./ --filename=mock_ro_channel_store.go --with-expecter
type ROChannelStore interface {
// GetNode returns the channel info of a specific node.
// Returns nil if the node doesn't belong to the cluster
@ -52,12 +54,16 @@ type ROChannelStore interface {
GetNodes() []int64
// GetNodeChannelCount
GetNodeChannelCount(nodeID int64) int
// GetNodeChannels for given collection
GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string
// GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only
GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo
}
// RWChannelStore is the read write channel store for channels and nodes.
//
//go:generate mockery --name=RWChannelStore --structname=RWChannelStore --output=./ --filename=mock_channel_store.go --with-expecter
type RWChannelStore interface {
ROChannelStore
// Reload restores the buffer channels and node-channels mapping form kv.
@ -458,6 +464,23 @@ func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo {
return ret
}
func (c *ChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
nodeChs := make(map[UniqueID][]string)
for id, info := range c.channelsInfo {
if id == bufferID {
continue
}
var channelNames []string
for name, ch := range info.Channels {
if ch.GetCollectionID() == collectionID {
channelNames = append(channelNames, name)
}
}
nodeChs[id] = channelNames
}
return nodeChs
}
// GetBufferChannelInfo returns all unassigned channels.
func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
if info, ok := c.channelsInfo[bufferID]; ok {

View File

@ -366,7 +366,7 @@ func WithChannelStates(states ...ChannelState) ChannelSelector {
}
func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
nodeChannels := make(map[int64]*NodeChannelInfo)
var nodeChannels []*NodeChannelInfo
for nodeID, cInfo := range c.channelsInfo {
if nodeSelector(nodeID) {
selected := make(map[string]RWChannel)
@ -382,13 +382,13 @@ func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channel
selected[chName] = channel
}
}
nodeChannels[nodeID] = &NodeChannelInfo{
nodeChannels = append(nodeChannels, &NodeChannelInfo{
NodeID: nodeID,
Channels: selected,
}
})
}
}
return lo.Values(nodeChannels)
return nodeChannels
}
func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo {
@ -401,6 +401,23 @@ func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo {
return ret
}
func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
nodeChs := make(map[UniqueID][]string)
for id, info := range c.channelsInfo {
if id == bufferID {
continue
}
var channelNames []string
for name, ch := range info.Channels {
if ch.GetCollectionID() == collectionID {
channelNames = append(channelNames, name)
}
}
nodeChs[id] = channelNames
}
return nodeChs
}
func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
return c.GetNode(bufferID)
}

View File

@ -179,6 +179,50 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int
return _c
}
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
ret := _m.Called(collectionID)
var r0 map[int64][]string
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
r0 = rf(collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64][]string)
}
}
return r0
}
// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct {
*mock.Call
}
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
// - collectionID int64
func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(run)
return _c
}
// GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors
func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
_va := make([]interface{}, len(channelSelectors))

View File

@ -102,7 +102,7 @@ func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*Segmen
filter.AddFilter(criterion)
}
var result []*SegmentInfo
var candidates []*SegmentInfo
var candidates map[int64]*SegmentInfo
// apply criterion
switch {
case criterion.collectionID > 0:
@ -110,9 +110,9 @@ func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*Segmen
if !ok {
return nil
}
candidates = lo.Values(collSegments.segments)
candidates = collSegments.segments
default:
candidates = lo.Values(s.segments)
candidates = s.segments
}
for _, segment := range candidates {
if criterion.Match(segment) {