mirror of https://github.com/milvus-io/milvus.git
Only balance segement in targets (#20709)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/20741/head
parent
0a289c0052
commit
8283d32ac4
|
@ -27,10 +27,11 @@ import (
|
|||
)
|
||||
|
||||
type RowCountBasedBalancer struct {
|
||||
RoundRobinBalancer
|
||||
*RoundRobinBalancer
|
||||
nodeManager *session.NodeManager
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
}
|
||||
|
||||
func (b *RowCountBasedBalancer) AssignSegment(segments []*meta.Segment, nodes []int64) []SegmentAssignPlan {
|
||||
|
@ -110,6 +111,10 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
|
|||
totalCnt := 0
|
||||
for _, nid := range nodes {
|
||||
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
|
||||
// Only balance segments in targets
|
||||
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
|
||||
})
|
||||
cnt := 0
|
||||
for _, s := range segments {
|
||||
cnt += int(s.GetNumOfRows())
|
||||
|
@ -192,12 +197,14 @@ func NewRowCountBasedBalancer(
|
|||
nodeManager *session.NodeManager,
|
||||
dist *meta.DistributionManager,
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
) *RowCountBasedBalancer {
|
||||
return &RowCountBasedBalancer{
|
||||
RoundRobinBalancer: *NewRoundRobinBalancer(scheduler, nodeManager),
|
||||
RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager),
|
||||
nodeManager: nodeManager,
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
targetMgr: targetMgr,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
|
@ -34,6 +35,7 @@ type RowCountBasedBalancerTestSuite struct {
|
|||
suite.Suite
|
||||
balancer *RowCountBasedBalancer
|
||||
kv *etcdkv.EtcdKV
|
||||
broker *meta.MockBroker
|
||||
}
|
||||
|
||||
func (suite *RowCountBasedBalancerTestSuite) SetupSuite() {
|
||||
|
@ -46,14 +48,16 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
|
|||
cli, err := etcd.GetEtcdClient(config)
|
||||
suite.Require().NoError(err)
|
||||
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
|
||||
store := meta.NewMetaStore(suite.kv)
|
||||
idAllocator := RandomIncrementIDAllocator()
|
||||
testMeta := meta.NewMeta(idAllocator, store)
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta)
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
nodeManager := session.NewNodeManager()
|
||||
suite.balancer = NewRowCountBasedBalancer(nil, nodeManager, distManager, testMeta)
|
||||
suite.balancer = NewRowCountBasedBalancer(nil, nodeManager, distManager, testMeta, testTarget)
|
||||
}
|
||||
|
||||
func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
|
||||
|
@ -146,6 +150,21 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
|||
defer suite.TearDownTest()
|
||||
balancer := suite.balancer
|
||||
collection := utils.CreateTestCollection(1, 1)
|
||||
segments := []*datapb.SegmentBinlogs{
|
||||
{
|
||||
SegmentID: 1,
|
||||
},
|
||||
{
|
||||
SegmentID: 2,
|
||||
},
|
||||
{
|
||||
SegmentID: 3,
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(
|
||||
nil, segments, nil)
|
||||
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
|
||||
balancer.targetMgr.UpdateCollectionCurrentTarget(1, 1)
|
||||
collection.LoadPercentage = 100
|
||||
collection.Status = querypb.LoadStatus_Loaded
|
||||
balancer.meta.CollectionManager.PutCollection(collection)
|
||||
|
|
|
@ -52,7 +52,7 @@ func (p *CollectionTarget) GetAllDmChannelNames() []string {
|
|||
}
|
||||
|
||||
func (p *CollectionTarget) IsEmpty() bool {
|
||||
return len(p.dmChannels) == 0
|
||||
return len(p.dmChannels)+len(p.segments) == 0
|
||||
}
|
||||
|
||||
type target struct {
|
||||
|
|
|
@ -219,6 +219,7 @@ func (s *Server) Init() error {
|
|||
s.nodeMgr,
|
||||
s.dist,
|
||||
s.meta,
|
||||
s.targetMgr,
|
||||
)
|
||||
|
||||
// Init checker controller
|
||||
|
|
|
@ -130,6 +130,7 @@ func (suite *ServiceSuite) SetupTest() {
|
|||
suite.nodeMgr,
|
||||
suite.dist,
|
||||
suite.meta,
|
||||
suite.targetMgr,
|
||||
)
|
||||
|
||||
suite.server = &Server{
|
||||
|
|
Loading…
Reference in New Issue