mirror of https://github.com/milvus-io/milvus.git
skip balance on loading collection (#20483)
Signed-off-by: Wei Liu <wei.liu@zilliz.com> Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/20488/head
parent
fca2b71e28
commit
7537dbfa37
|
@ -19,9 +19,11 @@ package balance
|
|||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type RowCountBasedBalancer struct {
|
||||
|
@ -81,8 +83,13 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem
|
|||
func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) {
|
||||
ids := b.meta.CollectionManager.GetAll()
|
||||
|
||||
// loading collection should skip balance
|
||||
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
|
||||
return b.meta.GetStatus(cid) == querypb.LoadStatus_Loaded
|
||||
})
|
||||
|
||||
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
|
||||
for _, cid := range ids {
|
||||
for _, cid := range loadedCollections {
|
||||
replicas := b.meta.ReplicaManager.GetByCollection(cid)
|
||||
for _, replica := range replicas {
|
||||
splans, cplans := b.balanceReplica(replica)
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
|
@ -144,7 +145,52 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
|||
suite.SetupSuite()
|
||||
defer suite.TearDownTest()
|
||||
balancer := suite.balancer
|
||||
balancer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
collection := utils.CreateTestCollection(1, 1)
|
||||
collection.LoadPercentage = 100
|
||||
collection.Status = querypb.LoadStatus_Loaded
|
||||
balancer.meta.CollectionManager.PutCollection(collection)
|
||||
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes))
|
||||
for node, s := range c.distributions {
|
||||
balancer.dist.SegmentDistManager.Update(node, s...)
|
||||
}
|
||||
segmentPlans, channelPlans := balancer.Balance()
|
||||
suite.Empty(channelPlans)
|
||||
suite.ElementsMatch(c.expectPlans, segmentPlans)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnLoadingCollection() {
|
||||
cases := []struct {
|
||||
name string
|
||||
nodes []int64
|
||||
distributions map[int64][]*meta.Segment
|
||||
expectPlans []SegmentAssignPlan
|
||||
}{
|
||||
{
|
||||
name: "normal balance",
|
||||
nodes: []int64{1, 2},
|
||||
distributions: map[int64][]*meta.Segment{
|
||||
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
|
||||
2: {
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
|
||||
},
|
||||
},
|
||||
expectPlans: []SegmentAssignPlan{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
suite.Run(c.name, func() {
|
||||
suite.SetupSuite()
|
||||
defer suite.TearDownTest()
|
||||
balancer := suite.balancer
|
||||
collection := utils.CreateTestCollection(1, 1)
|
||||
collection.LoadPercentage = 100
|
||||
collection.Status = querypb.LoadStatus_Loading
|
||||
balancer.meta.CollectionManager.PutCollection(collection)
|
||||
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes))
|
||||
for node, s := range c.distributions {
|
||||
balancer.dist.SegmentDistManager.Update(node, s...)
|
||||
|
|
Loading…
Reference in New Issue