enhance: Preserve fixed-size memory in delegator node for growing segment (#34600)

issue: #34595
pr: #34596
While the delegator node is consuming insert data, QueryCoord moves some
sealed segments off the node to keep its memory usage in check. Once the
growing segment gets flushed, sealed segments are moved back to the
delegator node from other workers. To avoid this frequent back-and-forth
movement of segments, we estimate the maximum growing row count and
reserve a fixed amount of memory on the delegator node.
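
As a rough illustration of the reserved headroom (the numbers and the helper
below are illustrative only, not code from this patch): with the default
factor of 0.3, a delegator that already holds one million sealed rows is
scored as if it held an extra 300,000 rows, so roughly that much growing data
can arrive before sealed segments have to be moved off again.

    package main

    import "fmt"

    // reservedGrowingRows is a hypothetical helper: the balancer charges a
    // delegator an extra overloadFactor share of its sealed rows, which acts
    // as headroom for rows that are still being consumed as growing data.
    func reservedGrowingRows(sealedRows int, overloadFactor float64) int {
    	return int(float64(sealedRows) * overloadFactor)
    }

    func main() {
    	// With the default queryCoord.delegatorMemoryOverloadFactor of 0.3,
    	// 1,000,000 sealed rows leave room for roughly 300,000 growing rows.
    	fmt.Println(reservedGrowingRows(1_000_000, 0.3)) // 300000
    }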

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-07-15 20:51:46 +08:00 committed by GitHub
parent 10c04f33c7
commit cf701a9bf0
6 changed files with 39 additions and 13 deletions

@@ -305,10 +305,10 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing()
 	distributions := map[int64][]*meta.Segment{
 		1: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1},
 		},
 		2: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2},
 		},
 	}
 	for node, s := range distributions {
@@ -333,9 +333,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing()
 	// mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2
 	leaderView := &meta.LeaderView{
-		ID:               1,
-		CollectionID:     1,
-		NumOfGrowingRows: 50,
+		ID:           1,
+		CollectionID: 1,
 	}
 	suite.balancer.dist.LeaderViewManager.Update(1, leaderView)
 	plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false)

@@ -150,17 +150,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
 }
 func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
+	delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
 	nodeRowCount := 0
+	nodeCollectionRowCount := make(map[int64]int)
 	// calculate global sealed segment row count
 	globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
 	for _, s := range globalSegments {
 		nodeRowCount += int(s.GetNumOfRows())
+		nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows())
 	}
 	// calculate global growing segment row count
 	views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID))
 	for _, view := range views {
-		nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
+		nodeRowCount += int(float64(view.NumOfGrowingRows))
+		nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor)
 	}
 	// calculate executing task cost in scheduler
@@ -176,7 +181,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
 	// calculate collection growing segment row count
 	collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(nodeID))
 	for _, view := range collectionViews {
-		collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
+		collectionRowCount += int(float64(view.NumOfGrowingRows))
+		collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor)
 	}
 	// calculate executing task cost in scheduler
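
In short: a node's score used to charge NumOfGrowingRows * GrowingRowCountWeight
per leader view; now each leader view is charged its growing rows at full weight
plus DelegatorMemoryOverloadFactor times the sealed rows the node already holds
for that collection. Combined with the sealed rows counted in the first loop,
the charge for one collection hosted on a delegator looks like the following
sketch (hypothetical helper, not part of this PR):

    // Hypothetical helper (not from the PR) showing the total row-count charge
    // for one collection hosted on a delegator node under the new scoring.
    func delegatorRowScore(sealedRows, growingRows int, overloadFactor float64) int {
    	// Sealed rows count once, growing rows count once, and an extra
    	// overloadFactor share of the sealed rows is charged as reserved
    	// headroom for the growing data the delegator is still consuming.
    	return sealedRows + growingRows + int(float64(sealedRows)*overloadFactor)
    }

With the default factor of 0.3 this matches the commit message: the delegator
keeps about 30% of its sealed row count free for growing data.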


@@ -303,12 +303,22 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
 	defer suite.TearDownTest()
 	balancer := suite.balancer
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0.3")
+	suite.balancer.meta.PutCollection(&meta.Collection{
+		CollectionLoadInfo: &querypb.CollectionLoadInfo{
+			CollectionID: 1,
+		},
+	}, &meta.Partition{
+		PartitionLoadInfo: &querypb.PartitionLoadInfo{
+			PartitionID: 1,
+		},
+	})
 	distributions := map[int64][]*meta.Segment{
 		1: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1},
 		},
 		2: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2},
 		},
 	}
 	for node, s := range distributions {
@@ -333,9 +343,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
 	// mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2
 	leaderView := &meta.LeaderView{
-		ID:               1,
-		CollectionID:     1,
-		NumOfGrowingRows: 50,
+		ID:           1,
+		CollectionID: 1,
 	}
 	suite.balancer.dist.LeaderViewManager.Update(1, leaderView)
 	plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false)

@@ -1540,6 +1540,7 @@ type queryCoordConfig struct {
 	RowCountMaxSteps              ParamItem `refreshable:"true"`
 	RandomMaxSteps                ParamItem `refreshable:"true"`
 	GrowingRowCountWeight         ParamItem `refreshable:"true"`
+	DelegatorMemoryOverloadFactor ParamItem `refreshable:"true"`
 	BalanceCostThreshold          ParamItem `refreshable:"true"`
 	SegmentCheckInterval          ParamItem `refreshable:"true"`
@@ -1774,6 +1775,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 	}
 	p.GrowingRowCountWeight.Init(base.mgr)
+	p.DelegatorMemoryOverloadFactor = ParamItem{
+		Key:          "queryCoord.delegatorMemoryOverloadFactor",
+		Version:      "2.3.19",
+		DefaultValue: "0.3",
+		PanicIfEmpty: true,
+		Doc:          "the factor of delegator overloaded memory",
+		Export:       true,
+	}
+	p.DelegatorMemoryOverloadFactor.Init(base.mgr)
 	p.BalanceCostThreshold = ParamItem{
 		Key:     "queryCoord.balanceCostThreshold",
 		Version: "2.4.0",

@@ -319,6 +319,8 @@ func TestComponentParam(t *testing.T) {
 		params.Save("queryCoord.checkExecutedFlagInterval", "200")
 		assert.Equal(t, 200, Params.CheckExecutedFlagInterval.GetAsInt())
 		params.Reset("queryCoord.checkExecutedFlagInterval")
+		assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat())
 	})
 	t.Run("test queryNodeConfig", func(t *testing.T) {

@@ -50,7 +50,6 @@ func (s *ReplicaTestSuit) SetupSuite() {
 	paramtable.Init()
 	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000")
 	paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1")
 	s.Require().NoError(s.SetupEmbedEtcd())
 }