enhance: Avoid assigning too many segments/channels to new querynode (#34096) (#34461)

issue: #34095
pr: #34096

When a new query node comes online, the segment_checker,
channel_checker, and balance_checker all attempt to allocate segments
and channels to it at the same time. If this happens while a load task
is still executing and the new query node's distribution has not yet
been updated, the query coordinator may mistakenly see the new query
node as empty. As a result, it assigns segments or channels to it,
potentially overloading the new query node with more segments or
channels than expected.

This PR measures the workload of the tasks executing on the target query
node, preventing an excessive number of segments or channels from being
assigned to it.
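The idea, as a minimal standalone sketch (types and numbers invented here; the PR's actual bookkeeping lives in the query coordinator's task scheduler): track the workload of in-flight tasks per node and per collection, and let balancers add that pending delta to a node's reported load before choosing assignment targets.

package main

import "fmt"

// taskDelta mirrors the PR's bookkeeping shape:
// nodeID -> collectionID -> pending delta (segment rows or channel count).
type taskDelta map[int64]map[int64]int

// add records the workload of a task that has been scheduled but whose
// effect is not yet visible in the node's reported distribution.
func (d taskDelta) add(nodeID, collectionID int64, delta int) {
	if d[nodeID] == nil {
		d[nodeID] = make(map[int64]int)
	}
	d[nodeID][collectionID] += delta
}

// get sums pending deltas for a node; collectionID == -1 means all collections.
func (d taskDelta) get(nodeID, collectionID int64) int {
	sum := 0
	for cid, v := range d[nodeID] {
		if cid == collectionID || collectionID == -1 {
			sum += v
		}
	}
	return sum
}

func main() {
	d := taskDelta{}
	d.add(1, 100, 3000) // a load task is moving 3000 rows to node 1
	// The balancer now treats node 1 as already carrying 3000 pending rows,
	// even though its reported distribution is still empty.
	fmt.Println(d.get(1, -1)) // 3000
}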

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-07-10 14:22:15 +08:00 committed by GitHub
parent 0bfa1a72bb
commit d3d192064f
10 changed files with 304 additions and 127 deletions

View File

@@ -84,7 +84,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].SegmentCnt(), nodesInfo[j].SegmentCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
-delta1, delta2 := b.scheduler.GetNodeSegmentDelta(id1), b.scheduler.GetNodeSegmentDelta(id2)
+delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]SegmentAssignPlan, 0, len(segments))
@@ -114,7 +114,7 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
-delta1, delta2 := b.scheduler.GetNodeChannelDelta(id1), b.scheduler.GetNodeChannelDelta(id2)
+delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]ChannelAssignPlan, 0, len(channels))
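For intuition, a simplified, self-contained illustration (not the PR's exact types) of how the delta-adjusted comparator changes the ordering: a freshly joined node with many in-flight tasks no longer sorts first.

package main

import (
	"fmt"
	"sort"
)

type node struct {
	id         int64
	segmentCnt int // segments in the reported distribution
	taskDelta  int // segments pending in executing tasks
}

func main() {
	nodes := []node{
		{id: 1, segmentCnt: 10, taskDelta: 0},
		{id: 2, segmentCnt: 0, taskDelta: 15}, // new node, already targeted by tasks
	}
	// Same comparator shape as AssignSegment above: count plus pending delta.
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].segmentCnt+nodes[i].taskDelta <
			nodes[j].segmentCnt+nodes[j].taskDelta
	})
	fmt.Println(nodes[0].id) // 1: the established node is now the lighter target
}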

View File

@@ -19,6 +19,7 @@ package balance
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -37,6 +38,9 @@ func (suite *BalanceTestSuite) SetupTest() {
nodeManager := session.NewNodeManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.roundRobinBalancer = NewRoundRobinBalancer(suite.mockScheduler, nodeManager)
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}
func (suite *BalanceTestSuite) TestAssignBalance() {
@@ -84,13 +88,14 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
+suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
-suite.mockScheduler.EXPECT().GetNodeSegmentDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
+suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignSegment(0, c.assignments, c.nodeIDs, false)
@@ -144,13 +149,14 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
+suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithChannelCnt(c.channelCnts[i]))
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
-suite.mockScheduler.EXPECT().GetNodeChannelDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
+suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false)

View File

@@ -137,6 +137,9 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*
rowcnt += int(view.NumOfGrowingRows)
}
// calculate executing task cost in scheduler
rowcnt += b.scheduler.GetSegmentTaskDelta(node, -1)
// more row count, less priority
nodeItem := newNodeItem(rowcnt, node)
ret = append(ret, &nodeItem)
@@ -150,8 +153,11 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*
node := nodeInfo.ID()
channels := b.dist.ChannelDistManager.GetByNode(node)
+channelCount := len(channels)
+// calculate executing task cost in scheduler
+channelCount += b.scheduler.GetChannelTaskDelta(node, -1)
// more channel num, less priority
-nodeItem := newNodeItem(len(channels), node)
+nodeItem := newNodeItem(channelCount, node)
ret = append(ret, &nodeItem)
}
return ret
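The effect in numbers (values invented for illustration): a node reporting 1000 sealed rows and 200 growing rows, with an executing load task that will add 5000 rows, is ranked as if it already held 6200 rows.

package main

import "fmt"

func main() {
	sealedRows := 1000 // rows in the node's reported sealed segments
	growingRows := 200 // rows in growing segments, from leader views
	taskDelta := 5000  // rows being loaded by executing tasks on this node
	// The row-count balancer ranks nodes by this total: more rows, lower priority.
	fmt.Println(sealedRows + growingRows + taskDelta) // 6200
}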

View File

@@ -76,6 +76,9 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
suite.balancer = NewRowCountBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}
func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
@@ -451,7 +454,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
-suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
@@ -658,7 +660,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
-suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
@@ -756,7 +757,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
},
}
-suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
@@ -1020,7 +1020,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
-suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}

View File

@@ -148,19 +148,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
}
func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
-rowCount := 0
+nodeRowCount := 0
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID)
for _, s := range globalSegments {
-rowCount += int(s.GetNumOfRows())
+nodeRowCount += int(s.GetNumOfRows())
}
// calculate global growing segment row count
views := b.dist.GetLeaderView(nodeID)
for _, view := range views {
-rowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
+nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}
+// calculate executing task cost in scheduler
+nodeRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, -1)
collectionRowCount := 0
// calculate collection sealed segment row count
collectionSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(collectionID, nodeID)
@@ -173,7 +176,11 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
for _, view := range collectionViews {
collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}
-return collectionRowCount + int(float64(rowCount)*
+// calculate executing task cost in scheduler
+collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID)
+return collectionRowCount + int(float64(nodeRowCount)*
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}
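So the final score combines the collection-local row count with a weighted global row count, both now inflated by executing-task deltas. A worked example (the weight comes from the GlobalRowCountFactor config; the value below is assumed for illustration):

package main

import "fmt"

func main() {
	const globalRowCountFactor = 0.1 // assumed config value for illustration
	nodeRowCount := 10000            // sealed + weighted growing rows + task delta, whole node
	collectionRowCount := 2000       // the same, restricted to the collection being balanced
	score := collectionRowCount + int(float64(nodeRowCount)*globalRowCountFactor)
	fmt.Println(score) // 3000
}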

View File

@@ -19,6 +19,7 @@ import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@@ -72,6 +73,9 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.balancer = NewScoreBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}
func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {
@@ -431,6 +435,101 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
}
}
func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
cases := []struct {
name string
nodes []int64
collectionID int64
replicaID int64
collectionsSegments []*datapb.SegmentInfo
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
deltaCounts []int
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "normal balance for one collection only",
nodes: []int64{1, 2, 3},
deltaCounts: []int{30, 0, 0},
collectionID: 1,
replicaID: 1,
collectionsSegments: []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, From: 3, To: 2, ReplicaID: 1},
},
expectChannelPlans: []ChannelAssignPlan{},
},
}
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
// 1. set up target for multi collections
collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
nil, c.collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}
// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
// set node delta count
suite.mockScheduler.ExpectedCalls = nil
for i, node := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe()
}
// 4. balance and verify result
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
assert.Equal(suite.T(), len(c.expectPlans), len(segmentPlans))
for i := range segmentPlans {
assert.Equal(suite.T(), c.expectPlans[i].To, segmentPlans[i].To)
}
assert.Equal(suite.T(), len(c.expectChannelPlans), len(channelPlans))
})
}
}
func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
balanceCase := struct {
name string
@@ -636,11 +735,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
}
-for i, c := range cases {
+for _, c := range cases {
suite.Run(c.name, func() {
-if i == 0 {
-suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
-}
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer

View File

@@ -108,6 +108,9 @@ func (suite *OpsServiceSuite) SetupTest() {
suite.cluster = session.NewMockCluster(suite.T())
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.jobScheduler.Start()
suite.balancer = balance.NewScoreBasedBalancer(
suite.taskScheduler,
@@ -539,6 +542,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {
// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
@@ -556,6 +561,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {
// test transfer all segments, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
@@ -575,6 +582,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {
// test transfer all segment to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
@@ -745,6 +754,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {
// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
@@ -762,6 +773,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {
// test transfer all channels, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
@@ -781,6 +794,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {
// test transfer all channels to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {

View File

@@ -162,6 +162,8 @@ func (suite *ServiceSuite) SetupTest() {
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.jobScheduler.Start()
suite.balancer = balance.NewRowCountBasedBalancer(
suite.taskScheduler,
@@ -1096,7 +1098,9 @@ func (suite *ServiceSuite) TestLoadBalance() {
DstNodeIDs: []int64{dstNode},
SealedSegmentIDs: segments,
}
-suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0)
+suite.taskScheduler.ExpectedCalls = nil
+suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
+suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) {
actions := task.Actions()
suite.Len(actions, 2)
@@ -1141,7 +1145,9 @@ func (suite *ServiceSuite) TestLoadBalanceWithNoDstNode() {
SourceNodeIDs: []int64{srcNode},
SealedSegmentIDs: segments,
}
-suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0)
+suite.taskScheduler.ExpectedCalls = nil
+suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
+suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) {
actions := task.Actions()
suite.Len(actions, 2)
@@ -1214,7 +1220,9 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() {
SourceNodeIDs: []int64{srcNode},
DstNodeIDs: []int64{dstNode},
}
-suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0)
+suite.taskScheduler.ExpectedCalls = nil
+suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
+suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(t task.Task) {
actions := t.Actions()
suite.Len(actions, 2)

View File

@@ -125,6 +125,49 @@ func (_c *MockScheduler_Dispatch_Call) RunAndReturn(run func(int64)) *MockSchedu
return _c
}
// GetChannelTaskDelta provides a mock function with given fields: nodeID, collectionID
func (_m *MockScheduler) GetChannelTaskDelta(nodeID int64, collectionID int64) int {
ret := _m.Called(nodeID, collectionID)
var r0 int
if rf, ok := ret.Get(0).(func(int64, int64) int); ok {
r0 = rf(nodeID, collectionID)
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockScheduler_GetChannelTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelTaskDelta'
type MockScheduler_GetChannelTaskDelta_Call struct {
*mock.Call
}
// GetChannelTaskDelta is a helper method to define mock.On call
// - nodeID int64
// - collectionID int64
func (_e *MockScheduler_Expecter) GetChannelTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetChannelTaskDelta_Call {
return &MockScheduler_GetChannelTaskDelta_Call{Call: _e.mock.On("GetChannelTaskDelta", nodeID, collectionID)}
}
func (_c *MockScheduler_GetChannelTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetChannelTaskDelta_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *MockScheduler_GetChannelTaskDelta_Call) Return(_a0 int) *MockScheduler_GetChannelTaskDelta_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScheduler_GetChannelTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetChannelTaskDelta_Call {
_c.Call.Return(run)
return _c
}
// GetChannelTaskNum provides a mock function with given fields:
func (_m *MockScheduler) GetChannelTaskNum() int {
ret := _m.Called()
@@ -210,13 +253,13 @@ func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-cha
return _c
}
-// GetNodeChannelDelta provides a mock function with given fields: nodeID
-func (_m *MockScheduler) GetNodeChannelDelta(nodeID int64) int {
-ret := _m.Called(nodeID)
+// GetSegmentTaskDelta provides a mock function with given fields: nodeID, collectionID
+func (_m *MockScheduler) GetSegmentTaskDelta(nodeID int64, collectionID int64) int {
+ret := _m.Called(nodeID, collectionID)
var r0 int
-if rf, ok := ret.Get(0).(func(int64) int); ok {
-r0 = rf(nodeID)
+if rf, ok := ret.Get(0).(func(int64, int64) int); ok {
+r0 = rf(nodeID, collectionID)
} else {
r0 = ret.Get(0).(int)
}
@@ -224,72 +267,31 @@ func (_m *MockScheduler) GetNodeChannelDelta(nodeID int64) int {
return r0
}
-// MockScheduler_GetNodeChannelDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelDelta'
-type MockScheduler_GetNodeChannelDelta_Call struct {
+// MockScheduler_GetSegmentTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskDelta'
+type MockScheduler_GetSegmentTaskDelta_Call struct {
*mock.Call
}
-// GetNodeChannelDelta is a helper method to define mock.On call
+// GetSegmentTaskDelta is a helper method to define mock.On call
// - nodeID int64
-func (_e *MockScheduler_Expecter) GetNodeChannelDelta(nodeID interface{}) *MockScheduler_GetNodeChannelDelta_Call {
-return &MockScheduler_GetNodeChannelDelta_Call{Call: _e.mock.On("GetNodeChannelDelta", nodeID)}
+// - collectionID int64
+func (_e *MockScheduler_Expecter) GetSegmentTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetSegmentTaskDelta_Call {
+return &MockScheduler_GetSegmentTaskDelta_Call{Call: _e.mock.On("GetSegmentTaskDelta", nodeID, collectionID)}
}
-func (_c *MockScheduler_GetNodeChannelDelta_Call) Run(run func(nodeID int64)) *MockScheduler_GetNodeChannelDelta_Call {
+func (_c *MockScheduler_GetSegmentTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetSegmentTaskDelta_Call {
_c.Call.Run(func(args mock.Arguments) {
-run(args[0].(int64))
+run(args[0].(int64), args[1].(int64))
})
return _c
}
-func (_c *MockScheduler_GetNodeChannelDelta_Call) Return(_a0 int) *MockScheduler_GetNodeChannelDelta_Call {
+func (_c *MockScheduler_GetSegmentTaskDelta_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskDelta_Call {
_c.Call.Return(_a0)
return _c
}
-func (_c *MockScheduler_GetNodeChannelDelta_Call) RunAndReturn(run func(int64) int) *MockScheduler_GetNodeChannelDelta_Call {
-_c.Call.Return(run)
-return _c
-}
-// GetNodeSegmentDelta provides a mock function with given fields: nodeID
-func (_m *MockScheduler) GetNodeSegmentDelta(nodeID int64) int {
-ret := _m.Called(nodeID)
-var r0 int
-if rf, ok := ret.Get(0).(func(int64) int); ok {
-r0 = rf(nodeID)
-} else {
-r0 = ret.Get(0).(int)
-}
-return r0
-}
-// MockScheduler_GetNodeSegmentDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeSegmentDelta'
-type MockScheduler_GetNodeSegmentDelta_Call struct {
-*mock.Call
-}
-// GetNodeSegmentDelta is a helper method to define mock.On call
-// - nodeID int64
-func (_e *MockScheduler_Expecter) GetNodeSegmentDelta(nodeID interface{}) *MockScheduler_GetNodeSegmentDelta_Call {
-return &MockScheduler_GetNodeSegmentDelta_Call{Call: _e.mock.On("GetNodeSegmentDelta", nodeID)}
-}
-func (_c *MockScheduler_GetNodeSegmentDelta_Call) Run(run func(nodeID int64)) *MockScheduler_GetNodeSegmentDelta_Call {
-_c.Call.Run(func(args mock.Arguments) {
-run(args[0].(int64))
-})
-return _c
-}
-func (_c *MockScheduler_GetNodeSegmentDelta_Call) Return(_a0 int) *MockScheduler_GetNodeSegmentDelta_Call {
-_c.Call.Return(_a0)
-return _c
-}
-func (_c *MockScheduler_GetNodeSegmentDelta_Call) RunAndReturn(run func(int64) int) *MockScheduler_GetNodeSegmentDelta_Call {
+func (_c *MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetSegmentTaskDelta_Call {
_c.Call.Return(run)
return _c
}

View File

@@ -133,13 +133,14 @@ type Scheduler interface {
Add(task Task) error
Dispatch(node int64)
RemoveByNode(node int64)
-GetNodeSegmentDelta(nodeID int64) int
-GetNodeChannelDelta(nodeID int64) int
GetExecutedFlag(nodeID int64) <-chan struct{}
GetChannelTaskNum() int
GetSegmentTaskNum() int
Sync(segmentID, replicaID int64) bool
RemoveSync(segmentID, replicaID int64)
+GetSegmentTaskDelta(nodeID int64, collectionID int64) int
+GetChannelTaskDelta(nodeID int64, collectionID int64) int
}
type taskScheduler struct {
@@ -162,6 +163,11 @@ type taskScheduler struct {
waitQueue *taskQueue
syncTasks map[replicaSegmentIndex]struct{}
// executing task delta changes on node: nodeID -> collectionID -> delta changes
// delta changes measure by segment row count and channel num
segmentExecutingTaskDelta map[int64]map[int64]int
channelExecutingTaskDelta map[int64]map[int64]int
}
func NewScheduler(ctx context.Context,
@@ -194,6 +200,8 @@ func NewScheduler(ctx context.Context,
processQueue: newTaskQueue(),
waitQueue: newTaskQueue(),
syncTasks: make(map[replicaSegmentIndex]struct{}),
segmentExecutingTaskDelta: make(map[int64]map[int64]int),
channelExecutingTaskDelta: make(map[int64]map[int64]int),
}
}
@@ -206,6 +214,8 @@ func (scheduler *taskScheduler) Stop() {
for nodeID, executor := range scheduler.executors {
executor.Stop()
delete(scheduler.executors, nodeID)
delete(scheduler.segmentExecutingTaskDelta, nodeID)
delete(scheduler.channelExecutingTaskDelta, nodeID)
}
for _, task := range scheduler.segmentTasks {
@@ -231,6 +241,8 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
scheduler.cluster,
scheduler.nodeMgr)
scheduler.segmentExecutingTaskDelta[nodeID] = make(map[int64]int)
scheduler.channelExecutingTaskDelta[nodeID] = make(map[int64]int)
scheduler.executors[nodeID] = executor
executor.Start(scheduler.ctx)
log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
@@ -244,6 +256,8 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
if ok {
executor.Stop()
delete(scheduler.executors, nodeID)
delete(scheduler.segmentExecutingTaskDelta, nodeID)
delete(scheduler.channelExecutingTaskDelta, nodeID)
log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
}
}
@@ -301,10 +315,51 @@ func (scheduler *taskScheduler) Add(task Task) error {
}
scheduler.updateTaskMetrics()
scheduler.updateTaskDelta(task)
log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
return nil
}
func (scheduler *taskScheduler) updateTaskDelta(task Task) {
var delta int
var deltaMap map[int64]map[int64]int
switch task := task.(type) {
case *SegmentTask:
// skip growing segment's count, cause doesn't know realtime row number of growing segment
if task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Historical {
segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst)
if segment != nil {
delta = int(segment.GetNumOfRows())
}
}
deltaMap = scheduler.segmentExecutingTaskDelta
case *ChannelTask:
delta = 1
deltaMap = scheduler.channelExecutingTaskDelta
}
// turn delta to negative when try to remove task
if task.Status() == TaskStatusSucceeded || task.Status() == TaskStatusFailed || task.Status() == TaskStatusCanceled {
delta = -delta
}
if delta != 0 {
for _, action := range task.Actions() {
if deltaMap[action.Node()] == nil {
deltaMap[action.Node()] = make(map[int64]int)
}
if action.Type() == ActionTypeGrow {
deltaMap[action.Node()][task.CollectionID()] += delta
} else if action.Type() == ActionTypeReduce {
deltaMap[action.Node()][task.CollectionID()] -= delta
}
}
}
}
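A condensed sketch of the sign convention above (types invented, and the per-collection dimension collapsed to per-node totals for brevity): a task contributes a positive delta for grow actions and a negative one for reduce actions when it is added, and the contribution is reversed when the task reaches a terminal status and is removed.

package main

import "fmt"

type actionType int

const (
	grow actionType = iota
	reduce
)

// apply folds one task action's row-count delta into a node's pending total.
// removing == true reverses the contribution, as done when a task finishes
// (succeeded, failed, or canceled).
func apply(pending map[int64]int, node int64, t actionType, rows int, removing bool) {
	if removing {
		rows = -rows
	}
	switch t {
	case grow:
		pending[node] += rows
	case reduce:
		pending[node] -= rows
	}
}

func main() {
	pending := map[int64]int{}
	apply(pending, 1, grow, 500, false) // task added: +500 pending rows
	fmt.Println(pending[1])             // 500
	apply(pending, 1, grow, 500, true) // task finished: contribution undone
	fmt.Println(pending[1])            // 0
}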
func (scheduler *taskScheduler) updateTaskMetrics() {
segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
@@ -484,18 +539,39 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
}
}
-func (scheduler *taskScheduler) GetNodeSegmentDelta(nodeID int64) int {
+func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
-return calculateNodeDelta(nodeID, scheduler.segmentTasks)
+return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.segmentExecutingTaskDelta)
}
-func (scheduler *taskScheduler) GetNodeChannelDelta(nodeID int64) int {
+func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
-return calculateNodeDelta(nodeID, scheduler.channelTasks)
+return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.channelExecutingTaskDelta)
}
func (scheduler *taskScheduler) calculateTaskDelta(nodeID, collectionID int64, deltaMap map[int64]map[int64]int) int {
if nodeID == -1 && collectionID == -1 {
return 0
}
sum := 0
for nid, nInfo := range deltaMap {
if nid != nodeID && -1 != nodeID {
continue
}
for cid, cInfo := range nInfo {
if cid == collectionID || -1 == collectionID {
sum += cInfo
}
}
}
return sum
}
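Here -1 acts as a wildcard on either dimension. A standalone illustration of the same aggregation (map contents invented for the example):

package main

import "fmt"

// sum mirrors calculateTaskDelta's wildcard semantics:
// nodeID == -1 matches every node, collectionID == -1 every collection.
func sum(deltaMap map[int64]map[int64]int, nodeID, collectionID int64) int {
	if nodeID == -1 && collectionID == -1 {
		return 0 // fully wildcarded queries are rejected, as above
	}
	total := 0
	for nid, byColl := range deltaMap {
		if nodeID != -1 && nid != nodeID {
			continue
		}
		for cid, v := range byColl {
			if cid == collectionID || collectionID == -1 {
				total += v
			}
		}
	}
	return total
}

func main() {
	m := map[int64]map[int64]int{
		1: {100: 30, 200: 5},
		2: {100: 10},
	}
	fmt.Println(sum(m, 1, -1))   // 35: all collections on node 1
	fmt.Println(sum(m, -1, 100)) // 40: collection 100 across all nodes
}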
func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
@@ -524,45 +600,6 @@ func (scheduler *taskScheduler) GetSegmentTaskNum() int {
return len(scheduler.segmentTasks)
}
-func calculateNodeDelta[K comparable, T ~map[K]Task](nodeID int64, tasks T) int {
-delta := 0
-for _, task := range tasks {
-for _, action := range task.Actions() {
-if action.Node() != nodeID {
-continue
-}
-if action.Type() == ActionTypeGrow {
-delta++
-} else if action.Type() == ActionTypeReduce {
-delta--
-}
-}
-}
-return delta
-}
-func (scheduler *taskScheduler) GetNodeSegmentCntDelta(nodeID int64) int {
-scheduler.rwmutex.RLock()
-defer scheduler.rwmutex.RUnlock()
-delta := 0
-for _, task := range scheduler.segmentTasks {
-for _, action := range task.Actions() {
-if action.Node() != nodeID {
-continue
-}
-segmentAction := action.(*SegmentAction)
-segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), segmentAction.SegmentID(), meta.NextTarget)
-if action.Type() == ActionTypeGrow {
-delta += int(segment.GetNumOfRows())
-} else {
-delta -= int(segment.GetNumOfRows())
-}
-}
-}
-return delta
-}
// schedule selects some tasks to execute, follow these steps for each started selected tasks:
// 1. check whether this task is stale, set status to canceled if stale
// 2. step up the task's actions, set status to succeeded if all actions finished
@@ -814,6 +851,7 @@ func (scheduler *taskScheduler) remove(task Task) {
log = log.With(zap.String("channel", task.Channel()))
}
scheduler.updateTaskDelta(task)
scheduler.updateTaskMetrics()
log.Info("task removed")
}