mirror of https://github.com/milvus-io/milvus.git
enhance: reduce the cpu usage when collection number is high (#32245)
Related to #32165.
1. Support a collection-level index for all the managers.
2. Remove the collection-level filter to avoid extra CPU usage as the collection number increases.

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
pull/32644/head
parent 9083f6999d
commit 02ace25c68
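The gist of the change, as a minimal standalone Go sketch (illustrative only, not the actual ChannelDistManager code; the names distIndex, chanInfo, getByFilter, and getByCollection are hypothetical): the old read path scanned every node's channels and filtered by collection ID on each call, so its cost grew with the total channel count across all collections, while a secondary map keyed by collection ID turns that read into a direct lookup.

package main

import "fmt"

type chanInfo struct {
    collectionID int64
    name         string
}

// distIndex is a toy stand-in for a per-node channel distribution with a
// collection-level secondary index (hypothetical names, not Milvus code).
type distIndex struct {
    byNode       map[int64][]chanInfo // primary storage: NodeID -> channels
    byCollection map[int64][]chanInfo // secondary index: CollectionID -> channels
}

// update replaces one node's channels and rebuilds the secondary index,
// mirroring how an Update call would keep the index in sync.
func (d *distIndex) update(node int64, chans []chanInfo) {
    d.byNode[node] = chans
    d.byCollection = make(map[int64][]chanInfo)
    for _, nodeChans := range d.byNode {
        for _, ch := range nodeChans {
            d.byCollection[ch.collectionID] = append(d.byCollection[ch.collectionID], ch)
        }
    }
}

// getByFilter is the old style of read: O(total channels) per call, so the
// cost grows with the number of collections in the cluster.
func (d *distIndex) getByFilter(keep func(chanInfo) bool) []chanInfo {
    ret := make([]chanInfo, 0)
    for _, nodeChans := range d.byNode {
        for _, ch := range nodeChans {
            if keep(ch) {
                ret = append(ret, ch)
            }
        }
    }
    return ret
}

// getByCollection is the new style of read: only the channels of one
// collection are visited, regardless of how many collections exist.
func (d *distIndex) getByCollection(collectionID int64) []chanInfo {
    return d.byCollection[collectionID]
}

func main() {
    d := &distIndex{
        byNode:       make(map[int64][]chanInfo),
        byCollection: make(map[int64][]chanInfo),
    }
    d.update(1, []chanInfo{{100, "dml_0"}, {200, "dml_1"}})

    // Old style: filter everything by collection ID on every call.
    fmt.Println(d.getByFilter(func(c chanInfo) bool { return c.collectionID == 100 }))
    // New style: direct lookup through the collection index.
    fmt.Println(d.getByCollection(100))
}

As in the commit's updateCollectionIndex, the secondary index in this sketch is rebuilt in full whenever a node's distribution is updated; presumably the trade-off favors reads, which happen on every checker and balancer pass, over comparatively rare distribution updates.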
@@ -316,7 +316,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode
 func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
     channelPlans := make([]ChannelAssignPlan, 0)
     for _, nodeID := range offlineNodes {
-        dmChannels := b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(nodeID))
+        dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
         plans := b.AssignChannel(dmChannels, onlineNodes, false)
         for i := range plans {
             plans[i].From = nodeID
@@ -341,7 +341,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode
     nodeWithLessChannel := make([]int64, 0)
     channelsToMove := make([]*meta.DmChannel, 0)
     for _, node := range onlineNodes {
-        channels := b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
+        channels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))

         if len(channels) <= average {
             nodeWithLessChannel = append(nodeWithLessChannel, node)
@@ -176,7 +176,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica,
     // 3. print stopping nodes channel distribution
     distInfo += "[stoppingNodesChannelDist:"
     for stoppingNodeID := range stoppingNodesSegments {
-        stoppingNodeChannels := channelManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(stoppingNodeID))
+        stoppingNodeChannels := channelManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(stoppingNodeID))
         distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", stoppingNodeID, len(stoppingNodeChannels))
         distInfo += "channels:["
         for _, stoppingChan := range stoppingNodeChannels {
@@ -189,7 +189,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica,
     // 4. print normal nodes channel distribution
     distInfo += "[normalNodesChannelDist:"
     for normalNodeID := range nodeSegments {
-        normalNodeChannels := channelManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(normalNodeID))
+        normalNodeChannels := channelManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(normalNodeID))
         distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", normalNodeID, len(normalNodeChannels))
         distInfo += "channels:["
         for _, normalNodeChan := range normalNodeChannels {
@@ -162,7 +162,7 @@ func (c *ChannelChecker) getDmChannelDiff(collectionID int64,
 func (c *ChannelChecker) getChannelDist(replica *meta.Replica) []*meta.DmChannel {
     dist := make([]*meta.DmChannel, 0)
     for _, nodeID := range replica.GetNodes() {
-        dist = append(dist, c.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(nodeID))...)
+        dist = append(dist, c.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))...)
     }
     return dist
 }
@@ -183,7 +183,7 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
     for _, ch := range dist {
         leaderView := c.dist.LeaderViewManager.GetLeaderShardView(ch.Node, ch.GetChannelName())
         if leaderView == nil {
-            log.Info("shard leadview is not ready, skip",
+            log.Info("shard leader view is not ready, skip",
                 zap.Int64("collectionID", replica.GetCollectionID()),
                 zap.Int64("replicaID", replicaID),
                 zap.Int64("leaderID", ch.Node),
@@ -49,11 +49,17 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c
                 return partitionSet.Contain(segment.GetPartitionID())
             })
         } else {
-            channels = dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(collection))
+            channels = dist.ChannelDistManager.GetByCollectionAndFilter(collection)
         }

         if len(channels)+len(segments) == 0 {
             break
+        } else {
+            log.Info("wait for release done", zap.Int64("collection", collection),
+                zap.Int64s("partitions", partitions),
+                zap.Int("channel", len(channels)),
+                zap.Int("segments", len(segments)),
+            )
         }

         // trigger check more frequently
@@ -45,12 +45,6 @@ func WithReplica2Channel(replica *Replica) ChannelDistFilter {
     }
 }

-func WithChannelName2Channel(channelName string) ChannelDistFilter {
-    return func(ch *DmChannel) bool {
-        return ch.GetChannelName() == channelName
-    }
-}
-
 type DmChannel struct {
     *datapb.VchannelInfo
     Node int64
@@ -76,11 +70,15 @@ type ChannelDistManager struct {

     // NodeID -> Channels
     channels map[UniqueID][]*DmChannel
+
+    // CollectionID -> Channels
+    collectionIndex map[int64][]*DmChannel
 }

 func NewChannelDistManager() *ChannelDistManager {
     return &ChannelDistManager{
-        channels: make(map[UniqueID][]*DmChannel),
+        channels:        make(map[UniqueID][]*DmChannel),
+        collectionIndex: make(map[int64][]*DmChannel),
     }
 }

@@ -146,6 +144,31 @@ func (m *ChannelDistManager) GetByFilter(filters ...ChannelDistFilter) []*DmChan
     return ret
 }

+func (m *ChannelDistManager) GetByCollectionAndFilter(collectionID int64, filters ...ChannelDistFilter) []*DmChannel {
+    m.rwmutex.RLock()
+    defer m.rwmutex.RUnlock()
+
+    mergedFilters := func(ch *DmChannel) bool {
+        for _, fn := range filters {
+            if fn != nil && !fn(ch) {
+                return false
+            }
+        }
+
+        return true
+    }
+
+    ret := make([]*DmChannel, 0)
+
+    // If a collection ID is provided, use the collection index
+    for _, channel := range m.collectionIndex[collectionID] {
+        if mergedFilters(channel) {
+            ret = append(ret, channel)
+        }
+    }
+    return ret
+}
+
 func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel) {
     m.rwmutex.Lock()
     defer m.rwmutex.Unlock()
@@ -155,4 +178,21 @@ func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel) {
     }

     m.channels[nodeID] = channels
+
+    m.updateCollectionIndex()
 }
+
+// update secondary index for channel distribution
+func (m *ChannelDistManager) updateCollectionIndex() {
+    m.collectionIndex = make(map[int64][]*DmChannel)
+    for _, nodeChannels := range m.channels {
+        for _, channel := range nodeChannels {
+            collectionID := channel.GetCollectionID()
+            if channels, ok := m.collectionIndex[collectionID]; !ok {
+                m.collectionIndex[collectionID] = []*DmChannel{channel}
+            } else {
+                m.collectionIndex[collectionID] = append(channels, channel)
+            }
+        }
+    }
+}
@@ -76,26 +76,26 @@ func (suite *ChannelDistManagerSuite) TestGetBy() {
     }

     // Test GetByCollection
-    channels = dist.GetByFilter(WithCollectionID2Channel(suite.collection))
+    channels = dist.GetByCollectionAndFilter(suite.collection)
     suite.Len(channels, 4)
     suite.AssertCollection(channels, suite.collection)
-    channels = dist.GetByFilter(WithCollectionID2Channel(-1))
+    channels = dist.GetByCollectionAndFilter(-1)
     suite.Len(channels, 0)

     // Test GetByNodeAndCollection
     // 1. Valid node and valid collection
     for _, node := range suite.nodes {
-        channels := dist.GetByFilter(WithCollectionID2Channel(suite.collection), WithNodeID2Channel(node))
+        channels := dist.GetByCollectionAndFilter(suite.collection, WithNodeID2Channel(node))
         suite.AssertNode(channels, node)
         suite.AssertCollection(channels, suite.collection)
     }

     // 2. Valid node and invalid collection
-    channels = dist.GetByFilter(WithCollectionID2Channel(-1), WithNodeID2Channel(suite.nodes[1]))
+    channels = dist.GetByCollectionAndFilter(-1, WithNodeID2Channel(suite.nodes[1]))
     suite.Len(channels, 0)

     // 3. Invalid node and valid collection
-    channels = dist.GetByFilter(WithCollectionID2Channel(suite.collection), WithNodeID2Channel(-1))
+    channels = dist.GetByCollectionAndFilter(suite.collection, WithNodeID2Channel(-1))
     suite.Len(channels, 0)
 }

@@ -110,7 +110,7 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
         )
         removeNodes := make([]int64, 0, len(roNodes))
         for _, node := range roNodes {
-            channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
+            channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))
             segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node))
             if len(channels) == 0 && len(segments) == 0 {
                 removeNodes = append(removeNodes, node)
@@ -91,7 +91,7 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
     manager := ob.meta.ResourceManager
     rgNames := manager.ListResourceGroups()
     enableRGAutoRecover := params.Params.QueryCoordCfg.EnableRGAutoRecover.GetAsBool()
-    log.Info("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
+    log.Debug("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))

     // Check if there is any incoming node.
     if manager.CheckIncomingNodeNum() > 0 {
@@ -100,10 +100,10 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
     }

     // Remove all down nodes in resource group manager.
-    log.Info("remove all down nodes in resource group manager...")
+    log.Debug("remove all down nodes in resource group manager...")
     ob.meta.RemoveAllDownNode()

-    log.Info("recover resource groups...")
+    log.Debug("recover resource groups...")
     // Recover all resource group into expected configuration.
     for _, rgName := range rgNames {
         if err := manager.MeetRequirement(rgName); err != nil {
@@ -126,5 +126,5 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
     if enableRGAutoRecover {
         utils.RecoverAllCollection(ob.meta)
     }
-    log.Info("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
+    log.Debug("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
 }
@@ -360,7 +360,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann
     dstNodeSet.Remove(srcNode)

     // check sealed segment list
-    channels := s.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(srcNode))
+    channels := s.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(srcNode))
     toBalance := typeutil.NewSet[*meta.DmChannel]()
     if req.GetTransferAll() {
         toBalance.Insert(channels...)
@@ -1688,7 +1688,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
     p.SegmentCheckInterval = ParamItem{
         Key: "queryCoord.checkSegmentInterval",
         Version: "2.3.0",
-        DefaultValue: "1000",
+        DefaultValue: "3000",
         PanicIfEmpty: true,
         Export: true,
     }
@@ -1697,7 +1697,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
     p.ChannelCheckInterval = ParamItem{
         Key: "queryCoord.checkChannelInterval",
         Version: "2.3.0",
-        DefaultValue: "1000",
+        DefaultValue: "3000",
         PanicIfEmpty: true,
         Export: true,
     }