fix balance generate unexpected task (#23299)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/23346/head
wei liu 2023-04-11 14:38:30 +08:00 committed by GitHub
parent f1f8ce01de
commit dbbd703667
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 303 additions and 187 deletions

View File

@ -25,45 +25,16 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/task"
) )
type Weight = int
const (
weightLow int = iota - 1
weightNormal
weightHigh
)
func GetWeight(w int) Weight {
if w > 0 {
return weightHigh
} else if w < 0 {
return weightLow
}
return weightNormal
}
func GetTaskPriorityFromWeight(w Weight) task.Priority {
switch w {
case weightHigh:
return task.TaskPriorityHigh
case weightLow:
return task.TaskPriorityLow
default:
return task.TaskPriorityNormal
}
}
type SegmentAssignPlan struct { type SegmentAssignPlan struct {
Segment *meta.Segment Segment *meta.Segment
ReplicaID int64 ReplicaID int64
From int64 // -1 if empty From int64 // -1 if empty
To int64 To int64
Weight Weight
} }
func (segPlan SegmentAssignPlan) ToString() string { func (segPlan SegmentAssignPlan) ToString() string {
return fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d, weight: %d]\n", return fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d]\n",
segPlan.Segment.CollectionID, segPlan.ReplicaID, segPlan.Segment.ID, segPlan.From, segPlan.To, segPlan.Weight) segPlan.Segment.CollectionID, segPlan.ReplicaID, segPlan.Segment.ID, segPlan.From, segPlan.To)
} }
type ChannelAssignPlan struct { type ChannelAssignPlan struct {
@ -71,12 +42,11 @@ type ChannelAssignPlan struct {
ReplicaID int64 ReplicaID int64
From int64 From int64
To int64 To int64
Weight Weight
} }
func (chanPlan ChannelAssignPlan) ToString() string { func (chanPlan ChannelAssignPlan) ToString() string {
return fmt.Sprintf("ChannelPlan:[collectionID: %d, channel: %s, replicaID: %d, from: %d, to: %d, weight: %d]\n", return fmt.Sprintf("ChannelPlan:[collectionID: %d, channel: %s, replicaID: %d, from: %d, to: %d]\n",
chanPlan.Channel.CollectionID, chanPlan.Channel.ChannelName, chanPlan.ReplicaID, chanPlan.From, chanPlan.To, chanPlan.Weight) chanPlan.Channel.CollectionID, chanPlan.Channel.ChannelName, chanPlan.ReplicaID, chanPlan.From, chanPlan.To)
} }
var ( var (

View File

@ -158,20 +158,6 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
} }
} }
func (suite *BalanceTestSuite) TestWeight() {
suite.Run("GetWeight", func() {
suite.Equal(weightHigh, GetWeight(10))
suite.Equal(weightNormal, GetWeight(0))
suite.Equal(weightLow, GetWeight(-10))
})
suite.Run("GetTaskPriorityFromWeight", func() {
suite.Equal(task.TaskPriorityHigh, GetTaskPriorityFromWeight(weightHigh))
suite.Equal(task.TaskPriorityNormal, GetTaskPriorityFromWeight(weightNormal))
suite.Equal(task.TaskPriorityLow, GetTaskPriorityFromWeight(weightLow))
})
}
func TestBalanceSuite(t *testing.T) { func TestBalanceSuite(t *testing.T) {
suite.Run(t, new(BalanceTestSuite)) suite.Run(t, new(BalanceTestSuite))
} }

View File

@ -98,6 +98,12 @@ func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignP
replicas := b.meta.ReplicaManager.GetByCollection(cid) replicas := b.meta.ReplicaManager.GetByCollection(cid)
for _, replica := range replicas { for _, replica := range replicas {
splans, cplans := b.balanceReplica(replica) splans, cplans := b.balanceReplica(replica)
if len(splans) > 0 || len(cplans) > 0 {
log.Debug("nodes info in replica",
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica", replica.ID),
zap.Int64s("nodes", replica.GetNodes()))
}
segmentPlans = append(segmentPlans, splans...) segmentPlans = append(segmentPlans, splans...)
channelPlans = append(channelPlans, cplans...) channelPlans = append(channelPlans, cplans...)
} }
@ -108,13 +114,11 @@ func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignP
func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
log := log.Ctx(context.Background()).WithRateGroup("qcv2.rowCountBalancer", 1.0, 60.0) log := log.Ctx(context.Background()).WithRateGroup("qcv2.rowCountBalancer", 1.0, 60.0)
nodes := replica.GetNodes() nodes := replica.GetNodes()
if len(nodes) == 0 { if len(nodes) < 2 {
return nil, nil return nil, nil
} }
nodesRowCnt := make(map[int64]int) onlineNodesSegments := make(map[int64][]*meta.Segment)
nodesSegments := make(map[int64][]*meta.Segment)
stoppingNodesSegments := make(map[int64][]*meta.Segment) stoppingNodesSegments := make(map[int64][]*meta.Segment)
outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica) outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)
totalCnt := 0 totalCnt := 0
@ -122,7 +126,8 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid) segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
// Only balance segments in targets // Only balance segments in targets
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool { segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
}) })
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil { if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
@ -139,67 +144,57 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
) )
stoppingNodesSegments[nid] = segments stoppingNodesSegments[nid] = segments
} else { } else {
nodesSegments[nid] = segments onlineNodesSegments[nid] = segments
} }
cnt := 0
for _, s := range segments { for _, s := range segments {
cnt += int(s.GetNumOfRows()) totalCnt += int(s.GetNumOfRows())
} }
nodesRowCnt[nid] = cnt
totalCnt += cnt
} }
if len(nodes) == len(stoppingNodesSegments) { log.Info("balance channel xxxxx",
zap.Int64s("online nodes", lo.Keys(onlineNodesSegments)),
zap.Int64s("offline nodes", lo.Keys(stoppingNodesSegments)))
if len(nodes) == len(stoppingNodesSegments) || len(onlineNodesSegments) == 0 {
// no available nodes to balance // no available nodes to balance
return nil, nil return nil, nil
} }
if len(nodesSegments) == 0 {
return nil, nil
}
average := totalCnt / len(nodesSegments)
neededRowCnt := 0
for nodeID := range nodesSegments {
rowcnt := nodesRowCnt[nodeID]
if rowcnt < average {
neededRowCnt += average - rowcnt
}
}
if neededRowCnt == 0 {
return nil, nil
}
segmentsToMove := make([]*meta.Segment, 0) segmentsToMove := make([]*meta.Segment, 0)
for _, stopSegments := range stoppingNodesSegments {
stopSegments, cnt := b.collectionStoppingSegments(stoppingNodesSegments)
segmentsToMove = append(segmentsToMove, stopSegments...) segmentsToMove = append(segmentsToMove, stopSegments...)
neededRowCnt -= cnt
// select segments to be moved
outer:
for nodeID, segments := range nodesSegments {
rowcnt := nodesRowCnt[nodeID]
if rowcnt <= average {
continue
} }
// find nodes with less row count than average
nodesWithLessRow := newPriorityQueue()
average := totalCnt / len(onlineNodesSegments)
for node, segments := range onlineNodesSegments {
sort.Slice(segments, func(i, j int) bool { sort.Slice(segments, func(i, j int) bool {
return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
}) })
rowCount := 0
for _, s := range segments { for _, s := range segments {
if rowcnt-int(s.GetNumOfRows()) < average { rowCount += int(s.GetNumOfRows())
if rowCount <= average {
continue continue
} }
rowcnt -= int(s.GetNumOfRows())
segmentsToMove = append(segmentsToMove, s) segmentsToMove = append(segmentsToMove, s)
neededRowCnt -= int(s.GetNumOfRows())
if neededRowCnt <= 0 { }
break outer if rowCount < average {
item := newNodeItem(rowCount, node)
nodesWithLessRow.push(&item)
} }
} }
return b.genSegmentPlan(replica, nodesWithLessRow, segmentsToMove, average), b.genChannelPlan(replica, lo.Keys(onlineNodesSegments), lo.Keys(stoppingNodesSegments))
}
func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, nodesWithLessRowCount priorityQueue, segmentsToMove []*meta.Segment, average int) []SegmentAssignPlan {
if nodesWithLessRowCount.Len() == 0 || len(segmentsToMove) == 0 {
return nil
} }
sort.Slice(segmentsToMove, func(i, j int) bool { sort.Slice(segmentsToMove, func(i, j int) bool {
@ -207,56 +202,36 @@ outer:
}) })
// allocate segments to those nodes with row cnt less than average // allocate segments to those nodes with row cnt less than average
queue := newPriorityQueue() plans := make([]SegmentAssignPlan, 0)
for nodeID := range nodesSegments { for _, s := range segmentsToMove {
rowcnt := nodesRowCnt[nodeID] if nodesWithLessRowCount.Len() <= 0 {
if rowcnt >= average { break
continue
}
item := newNodeItem(rowcnt, nodeID)
queue.push(&item)
} }
plans := make([]SegmentAssignPlan, 0) node := nodesWithLessRowCount.pop().(*nodeItem)
getPlanWeight := func(nodeID int64) Weight { newPriority := node.getPriority() + int(s.GetNumOfRows())
if _, ok := stoppingNodesSegments[nodeID]; ok { if newPriority > average {
return GetWeight(1) nodesWithLessRowCount.push(node)
continue
} }
return GetWeight(0)
}
for _, s := range segmentsToMove {
node := queue.pop().(*nodeItem)
plan := SegmentAssignPlan{ plan := SegmentAssignPlan{
ReplicaID: replica.GetID(), ReplicaID: replica.GetID(),
From: s.Node, From: s.Node,
To: node.nodeID, To: node.nodeID,
Segment: s, Segment: s,
Weight: getPlanWeight(s.Node),
} }
plans = append(plans, plan) plans = append(plans, plan)
node.setPriority(node.getPriority() + int(s.GetNumOfRows())) node.setPriority(newPriority)
queue.push(node) nodesWithLessRowCount.push(node)
} }
return plans, b.getChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments)) return plans
} }
func (b *RowCountBasedBalancer) collectionStoppingSegments(stoppingNodesSegments map[int64][]*meta.Segment) ([]*meta.Segment, int) { func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
var ( log.Info("balance channel",
segments []*meta.Segment zap.Int64s("online nodes", onlineNodes),
removeRowCnt int zap.Int64s("offline nodes", offlineNodes))
)
for _, stoppingSegments := range stoppingNodesSegments {
for _, segment := range stoppingSegments {
segments = append(segments, segment)
removeRowCnt += int(segment.GetNumOfRows())
}
}
return segments, removeRowCnt
}
func (b *RowCountBasedBalancer) getChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
channelPlans := make([]ChannelAssignPlan, 0) channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes { for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) dmChannels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
@ -264,7 +239,6 @@ func (b *RowCountBasedBalancer) getChannelPlan(replica *meta.Replica, onlineNode
for i := range plans { for i := range plans {
plans[i].From = nodeID plans[i].From = nodeID
plans[i].ReplicaID = replica.ID plans[i].ReplicaID = replica.ID
plans[i].Weight = GetWeight(1)
} }
channelPlans = append(channelPlans, plans...) channelPlans = append(channelPlans, plans...)
} }
@ -310,7 +284,6 @@ func (b *RowCountBasedBalancer) getChannelPlan(replica *meta.Replica, onlineNode
From: targetNode, From: targetNode,
To: sourceNode, To: sourceNode,
ReplicaID: replica.ID, ReplicaID: replica.ID,
Weight: GetWeight(1),
} }
channelPlans = append(channelPlans, plan) channelPlans = append(channelPlans, plan)
for end > 0 && getChannelNum(nodes[end]) <= averageChannel { for end > 0 && getChannelNum(nodes[end]) <= averageChannel {

View File

@ -162,6 +162,25 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
}, },
expectChannelPlans: []ChannelAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{},
}, },
{
name: "balance won't trigger",
nodes: []int64{1, 2, 3},
segmentCnts: []int{1, 2, 2},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 40}, Node: 1}},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 40}, Node: 2},
},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 40}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{},
},
{ {
name: "all stopping balance", name: "all stopping balance",
nodes: []int64{1, 2}, nodes: []int64{1, 2},
@ -203,11 +222,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
}, },
}, },
expectPlans: []SegmentAssignPlan{ expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1, Weight: weightHigh}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1, Weight: weightHigh}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
}, },
expectChannelPlans: []ChannelAssignPlan{ expectChannelPlans: []ChannelAssignPlan{
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1, Weight: weightHigh}, {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
}, },
}, },
{ {
@ -235,7 +254,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
}, },
expectPlans: []SegmentAssignPlan{}, expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{ expectChannelPlans: []ChannelAssignPlan{
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1, Weight: weightHigh}, {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1},
}, },
}, },
{ {
@ -252,20 +271,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
}, },
10: {{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 10}}, 10: {{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 10}},
}, },
// distributionChannels: map[int64][]*meta.DmChannel{
// 1: {
// {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1},
// },
// 2: {
// {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2},
// },
// },
expectPlans: []SegmentAssignPlan{}, expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{},
}, },
} }
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
for _, c := range cases { for _, c := range cases {
suite.Run(c.name, func() { suite.Run(c.name, func() {
suite.SetupSuite() suite.SetupSuite()
@ -289,8 +299,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
SegmentID: 5, SegmentID: 5,
}, },
} }
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return( suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(nil, segments, nil)
nil, segments, nil)
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1)) balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1, 1) balancer.targetMgr.UpdateCollectionCurrentTarget(1, 1)
collection.LoadPercentage = 100 collection.LoadPercentage = 100
@ -298,6 +307,194 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
collection.LoadType = querypb.LoadType_LoadCollection collection.LoadType = querypb.LoadType_LoadCollection
balancer.meta.CollectionManager.PutCollection(collection) balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...))) balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...)))
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(nil, segments, nil)
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i])
}
segmentPlans, channelPlans := balancer.Balance()
suite.ElementsMatch(c.expectChannelPlans, channelPlans)
suite.ElementsMatch(c.expectPlans, segmentPlans)
})
}
}
func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
cases := []struct {
name string
nodes []int64
notExistedNodes []int64
segmentCnts []int
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
segmentInCurrent []*datapb.SegmentBinlogs
segmentInNext []*datapb.SegmentBinlogs
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "exist in next target",
nodes: []int64{1, 2, 3},
segmentCnts: []int{1, 2, 2},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateStopping},
shouldMock: true,
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
},
},
segmentInCurrent: []*datapb.SegmentBinlogs{
{
SegmentID: 1,
},
{
SegmentID: 2,
},
{
SegmentID: 3,
},
{
SegmentID: 4,
},
{
SegmentID: 5,
},
},
segmentInNext: []*datapb.SegmentBinlogs{
{
SegmentID: 1,
},
{
SegmentID: 2,
},
{
SegmentID: 3,
},
{
SegmentID: 4,
},
{
SegmentID: 5,
},
},
distributionChannels: map[int64][]*meta.DmChannel{
2: {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
},
3: {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
},
expectChannelPlans: []ChannelAssignPlan{
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
},
},
{
name: "not exist in next target",
nodes: []int64{1, 2, 3},
segmentCnts: []int{1, 2, 2},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateStopping},
shouldMock: true,
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
},
},
segmentInCurrent: []*datapb.SegmentBinlogs{
{
SegmentID: 1,
},
{
SegmentID: 2,
},
{
SegmentID: 3,
},
{
SegmentID: 4,
},
{
SegmentID: 5,
},
},
segmentInNext: []*datapb.SegmentBinlogs{
{
SegmentID: 1,
},
{
SegmentID: 2,
},
},
distributionChannels: map[int64][]*meta.DmChannel{
2: {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
},
3: {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
},
},
}
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
collection := utils.CreateTestCollection(1, 1)
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(nil, c.segmentInCurrent, nil)
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1, 1)
collection.LoadPercentage = 100
collection.LoadType = querypb.LoadType_LoadCollection
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...)))
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(nil, c.segmentInNext, nil)
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0)
for node, s := range c.distributions { for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...) balancer.dist.SegmentDistManager.Update(node, s...)
} }
@ -359,11 +556,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
}, },
}, },
expectPlans: []SegmentAssignPlan{ expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1, Weight: weightHigh}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1, Weight: weightHigh}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
}, },
expectChannelPlans: []ChannelAssignPlan{ expectChannelPlans: []ChannelAssignPlan{
{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1, Weight: weightHigh}, {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
}, },
}, },
} }
@ -392,8 +589,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
SegmentID: 5, SegmentID: 5,
}, },
} }
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return( suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(nil, segments, nil)
nil, segments, nil)
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1)) balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1, 1) balancer.targetMgr.UpdateCollectionCurrentTarget(1, 1)
collection.LoadPercentage = 100 collection.LoadPercentage = 100
@ -401,6 +597,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
collection.LoadType = querypb.LoadType_LoadCollection collection.LoadType = querypb.LoadType_LoadCollection
balancer.meta.CollectionManager.PutCollection(collection) balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...))) balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...)))
balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
for node, s := range c.distributions { for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...) balancer.dist.SegmentDistManager.Update(node, s...)
} }

View File

@ -70,7 +70,6 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
plan := SegmentAssignPlan{ plan := SegmentAssignPlan{
From: -1, From: -1,
To: ni.nodeID, To: ni.nodeID,
Weight: GetWeight(1),
Segment: s, Segment: s,
} }
plans = append(plans, plan) plans = append(plans, plan)
@ -257,7 +256,6 @@ func (b *ScoreBasedBalancer) getStoppedSegmentPlan(replica *meta.Replica, nodesS
ReplicaID: replica.GetID(), ReplicaID: replica.GetID(),
From: nodeIndex[s.GetID()], From: nodeIndex[s.GetID()],
To: ni.nodeID, To: ni.nodeID,
Weight: GetWeight(1),
Segment: s, Segment: s,
} }
segmentPlans = append(segmentPlans, plan) segmentPlans = append(segmentPlans, plan)
@ -279,7 +277,6 @@ func (b *ScoreBasedBalancer) getStoppedChannelPlan(replica *meta.Replica, online
for i := range plans { for i := range plans {
plans[i].From = nodeID plans[i].From = nodeID
plans[i].ReplicaID = replica.ID plans[i].ReplicaID = replica.ID
plans[i].Weight = GetWeight(1)
} }
channelPlans = append(channelPlans, plans...) channelPlans = append(channelPlans, plans...)
} }
@ -340,7 +337,6 @@ func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSe
From: fromNode.nodeID, From: fromNode.nodeID,
To: toNode.nodeID, To: toNode.nodeID,
Segment: targetSegmentToMove, Segment: targetSegmentToMove,
Weight: GetWeight(0),
} }
segmentPlans = append(segmentPlans, plan) segmentPlans = append(segmentPlans, plan)
} else { } else {
@ -354,7 +350,6 @@ func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSe
From: fromNode.nodeID, From: fromNode.nodeID,
To: toNode.nodeID, To: toNode.nodeID,
Segment: targetSegmentToMove, Segment: targetSegmentToMove,
Weight: GetWeight(0),
} }
segmentPlans = append(segmentPlans, plan) segmentPlans = append(segmentPlans, plan)
} else { } else {

View File

@ -106,11 +106,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
//as assign segments is used while loading collection, //as assign segments is used while loading collection,
//all assignPlan should have weight equal to 1(HIGH PRIORITY) //all assignPlan should have weight equal to 1(HIGH PRIORITY)
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15,
CollectionID: 1}}, From: -1, To: 1, Weight: 1}, CollectionID: 1}}, From: -1, To: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 10, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 10,
CollectionID: 1}}, From: -1, To: 3, Weight: 1}, CollectionID: 1}}, From: -1, To: 3},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 5, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 5,
CollectionID: 1}}, From: -1, To: 2, Weight: 1}, CollectionID: 1}}, From: -1, To: 2},
}, },
}, },
}, },
@ -151,9 +151,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
segmentCnts: []int{0, 0, 0}, segmentCnts: []int{0, 0, 0},
expectPlans: [][]SegmentAssignPlan{ expectPlans: [][]SegmentAssignPlan{
{ {
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 9, NumOfRows: 15, CollectionID: 1}}, From: -1, To: 3, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 9, NumOfRows: 15, CollectionID: 1}}, From: -1, To: 3},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 8, NumOfRows: 10, CollectionID: 1}}, From: -1, To: 2, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 8, NumOfRows: 10, CollectionID: 1}}, From: -1, To: 2},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 7, NumOfRows: 5, CollectionID: 1}}, From: -1, To: 1, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 7, NumOfRows: 5, CollectionID: 1}}, From: -1, To: 1},
}, },
}, },
}, },
@ -192,12 +192,12 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
//much more than node3, but following assignment will still assign segment based on [10,20,40] //much more than node3, but following assignment will still assign segment based on [10,20,40]
//rather than [70,70,40], this flaw will be mitigated by balance process and maybe fixed in the later versions //rather than [70,70,40], this flaw will be mitigated by balance process and maybe fixed in the later versions
{ {
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 60, CollectionID: 1}}, From: -1, To: 1, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 60, CollectionID: 1}}, From: -1, To: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 50, CollectionID: 1}}, From: -1, To: 2, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 50, CollectionID: 1}}, From: -1, To: 2},
}, },
{ {
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 6, NumOfRows: 15, CollectionID: 2}}, From: -1, To: 1, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 6, NumOfRows: 15, CollectionID: 2}}, From: -1, To: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 7, NumOfRows: 10, CollectionID: 2}}, From: -1, To: 2, Weight: 1}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 7, NumOfRows: 10, CollectionID: 2}}, From: -1, To: 2},
}, },
}, },
}, },
@ -489,9 +489,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
}, },
expectPlans: []SegmentAssignPlan{ expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20},
Node: 1}, From: 1, To: 3, ReplicaID: 1, Weight: 1}, Node: 1}, From: 1, To: 3, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10},
Node: 1}, From: 1, To: 3, ReplicaID: 1, Weight: 1}, Node: 1}, From: 1, To: 3, ReplicaID: 1},
}, },
expectChannelPlans: []ChannelAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{},
}, },

View File

@ -44,7 +44,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
action := task.NewSegmentAction(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID()) action := task.NewSegmentAction(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID())
actions = append(actions, action) actions = append(actions, action)
} }
task, err := task.NewSegmentTask( t, err := task.NewSegmentTask(
ctx, ctx,
timeout, timeout,
checkerID, checkerID,
@ -70,8 +70,14 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
zap.String("channel", p.Segment.GetInsertChannel()), zap.String("channel", p.Segment.GetInsertChannel()),
zap.Int64("from", p.From), zap.Int64("from", p.From),
zap.Int64("to", p.To)) zap.Int64("to", p.To))
task.SetPriority(GetTaskPriorityFromWeight(p.Weight)) if task.GetTaskType(t) == task.TaskTypeMove {
ret = append(ret, task) // from balance checker
t.SetPriority(task.TaskPriorityLow)
} else {
//from segment checker
t.SetPriority(task.TaskPriorityNormal)
}
ret = append(ret, t)
} }
return ret return ret
} }
@ -88,7 +94,7 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t
action := task.NewChannelAction(p.From, task.ActionTypeReduce, p.Channel.GetChannelName()) action := task.NewChannelAction(p.From, task.ActionTypeReduce, p.Channel.GetChannelName())
actions = append(actions, action) actions = append(actions, action)
} }
task, err := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...) t, err := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...)
if err != nil { if err != nil {
log.Warn("create channel task failed", log.Warn("create channel task failed",
zap.Int64("collection", p.Channel.GetCollectionID()), zap.Int64("collection", p.Channel.GetCollectionID()),
@ -107,8 +113,8 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t
zap.String("channel", p.Channel.GetChannelName()), zap.String("channel", p.Channel.GetChannelName()),
zap.Int64("from", p.From), zap.Int64("from", p.From),
zap.Int64("to", p.To)) zap.Int64("to", p.To))
task.SetPriority(GetTaskPriorityFromWeight(p.Weight)) t.SetPriority(task.TaskPriorityHigh)
ret = append(ret, task) ret = append(ret, t)
} }
return ret return ret
} }

View File

@ -49,12 +49,7 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
segmentPlans, channelPlans := b.Balance.Balance() segmentPlans, channelPlans := b.Balance.Balance()
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans) tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
task.SetPriorityWithFunc(func(t task.Task) task.Priority { task.SetPriority(task.TaskPriorityLow, tasks...)
if t.Priority() == task.TaskPriorityHigh {
return task.TaskPriorityHigh
}
return task.TaskPriorityLow
}, tasks...)
ret = append(ret, tasks...) ret = append(ret, tasks...)
tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), channelPlans) tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), channelPlans)

View File

@ -60,12 +60,6 @@ func SetPriority(priority Priority, tasks ...Task) {
} }
} }
func SetPriorityWithFunc(f func(t Task) Priority, tasks ...Task) {
for i := range tasks {
tasks[i].SetPriority(f(tasks[i]))
}
}
func SetReason(reason string, tasks ...Task) { func SetReason(reason string, tasks ...Task) {
for i := range tasks { for i := range tasks {
tasks[i].SetReason(reason) tasks[i].SetReason(reason)