mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com> Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/20754/head
parent
efa1cf7f23
commit
64e9cb3061
|
@ -27,10 +27,11 @@ import (
|
|||
)
|
||||
|
||||
type RowCountBasedBalancer struct {
|
||||
RoundRobinBalancer
|
||||
*RoundRobinBalancer
|
||||
nodeManager *session.NodeManager
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
}
|
||||
|
||||
func (b *RowCountBasedBalancer) AssignSegment(segments []*meta.Segment, nodes []int64) []SegmentAssignPlan {
|
||||
|
@ -110,6 +111,10 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
|
|||
totalCnt := 0
|
||||
for _, nid := range nodes {
|
||||
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
|
||||
// Only balance segments in targets
|
||||
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSegment(segment.GetID()) != nil
|
||||
})
|
||||
cnt := 0
|
||||
for _, s := range segments {
|
||||
cnt += int(s.GetNumOfRows())
|
||||
|
@ -192,12 +197,14 @@ func NewRowCountBasedBalancer(
|
|||
nodeManager *session.NodeManager,
|
||||
dist *meta.DistributionManager,
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
) *RowCountBasedBalancer {
|
||||
return &RowCountBasedBalancer{
|
||||
RoundRobinBalancer: *NewRoundRobinBalancer(scheduler, nodeManager),
|
||||
RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager),
|
||||
nodeManager: nodeManager,
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
targetMgr: targetMgr,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ type RowCountBasedBalancerTestSuite struct {
|
|||
suite.Suite
|
||||
balancer *RowCountBasedBalancer
|
||||
kv *etcdkv.EtcdKV
|
||||
broker *meta.MockBroker
|
||||
}
|
||||
|
||||
func (suite *RowCountBasedBalancerTestSuite) SetupSuite() {
|
||||
|
@ -46,14 +47,16 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
|
|||
cli, err := etcd.GetEtcdClient(&config)
|
||||
suite.Require().NoError(err)
|
||||
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
|
||||
store := meta.NewMetaStore(suite.kv)
|
||||
idAllocator := RandomIncrementIDAllocator()
|
||||
testMeta := meta.NewMeta(idAllocator, store)
|
||||
testTarget := meta.NewTargetManager()
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
nodeManager := session.NewNodeManager()
|
||||
suite.balancer = NewRowCountBasedBalancer(nil, nodeManager, distManager, testMeta)
|
||||
suite.balancer = NewRowCountBasedBalancer(nil, nodeManager, distManager, testMeta, testTarget)
|
||||
}
|
||||
|
||||
func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
|
||||
|
@ -146,6 +149,9 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
|||
defer suite.TearDownTest()
|
||||
balancer := suite.balancer
|
||||
collection := utils.CreateTestCollection(1, 1)
|
||||
balancer.targetMgr.AddSegment(utils.CreateTestSegmentInfo(1, 1, 1, "test-insert-channel"))
|
||||
balancer.targetMgr.AddSegment(utils.CreateTestSegmentInfo(1, 1, 2, "test-insert-channel"))
|
||||
balancer.targetMgr.AddSegment(utils.CreateTestSegmentInfo(1, 1, 3, "test-insert-channel"))
|
||||
collection.LoadPercentage = 100
|
||||
collection.Status = querypb.LoadStatus_Loaded
|
||||
balancer.meta.CollectionManager.PutCollection(collection)
|
||||
|
|
|
@ -222,6 +222,7 @@ func (s *Server) Init() error {
|
|||
s.nodeMgr,
|
||||
s.dist,
|
||||
s.meta,
|
||||
s.targetMgr,
|
||||
)
|
||||
|
||||
// Init checker controller
|
||||
|
|
|
@ -140,6 +140,7 @@ func (suite *ServiceSuite) SetupTest() {
|
|||
suite.nodeMgr,
|
||||
suite.dist,
|
||||
suite.meta,
|
||||
suite.targetMgr,
|
||||
)
|
||||
|
||||
suite.server = &Server{
|
||||
|
|
Loading…
Reference in New Issue