mirror of https://github.com/milvus-io/milvus.git
fix: delegator doesn't follow with wal if streaming enabled (#39890)
issue: #38399 Signed-off-by: chyezh <chyezh@outlook.com>pull/39507/head
parent
38cfd38b31
commit
64dad60dc2
|
@ -180,11 +180,14 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
|
|||
for _, ch := range channels {
|
||||
func(ch *meta.DmChannel) {
|
||||
var targetNode *nodeItem
|
||||
forceAssignChannel := forceAssign
|
||||
if streamingutil.IsStreamingServiceEnabled() {
|
||||
// When streaming service is enabled, we need to assign channel to the node where WAL is located.
|
||||
nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(ch.GetChannelName())
|
||||
if item, ok := nodeItemsMap[nodeID]; ok {
|
||||
targetNode = item
|
||||
// assgin channel to the node where WAL is located always has enough benefits.
|
||||
forceAssignChannel = true
|
||||
}
|
||||
}
|
||||
// for each channel, pick the node with the least score
|
||||
|
@ -196,10 +199,15 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
|
|||
scoreChanges := b.calculateChannelScore(ch, collectionID)
|
||||
|
||||
sourceNode := nodeItemsMap[ch.Node]
|
||||
if sourceNode != nil && sourceNode.nodeID == targetNode.nodeID {
|
||||
// if the channel is already on the target node, skip assignment operation.
|
||||
return
|
||||
}
|
||||
|
||||
// if segment's node exist, which means this segment comes from balancer. we should consider the benefit
|
||||
// if the segment reassignment doesn't got enough benefit, we should skip this reassignment
|
||||
// notice: we should skip benefit check for forceAssign
|
||||
if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
|
||||
if !forceAssignChannel && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
|
||||
br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName()))
|
||||
return
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/internal/util/streamingutil"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/proto/datapb"
|
||||
|
@ -92,7 +93,11 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
|
|||
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
|
||||
for _, replica := range replicas {
|
||||
for _, node := range replica.GetRWNodes() {
|
||||
nodes := replica.GetRWNodes()
|
||||
if streamingutil.IsStreamingServiceEnabled() {
|
||||
nodes = replica.GetRWSQNodes()
|
||||
}
|
||||
for _, node := range nodes {
|
||||
leaderViews := c.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(replica.GetCollectionID()), meta.WithNodeID2LeaderView(node))
|
||||
for _, leaderView := range leaderViews {
|
||||
dist := c.dist.SegmentDistManager.GetByFilter(meta.WithChannel(leaderView.Channel), meta.WithReplica(replica))
|
||||
|
|
Loading…
Reference in New Issue