fix: channel checker reduces channels that are being balanced (#30087)

Skip channels whose shard leader is not yet available when the channel checker searches for repeated channels, so that the checker does not reduce channels that are still being balanced.
Related issues: https://github.com/milvus-io/milvus/issues/29841
https://github.com/milvus-io/milvus/issues/29838
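
The sketch below illustrates the intent of the change, not the actual implementation: the `Channel` type and the `leaderReady`/`leaderAvailable` callbacks are hypothetical stand-ins for `meta.DmChannel`, the `LeaderViewManager` lookup, and `CheckLeaderAvaliable`. Channels that fail either check are skipped before duplicate detection, so copies that are mid-balance are never reported as redundant.

package main

import "fmt"

// Channel is a simplified stand-in for meta.DmChannel (hypothetical type for illustration).
type Channel struct {
	Name    string
	Node    int64
	Version int64
}

// findRepeatedChannels keeps, per channel name, only the highest-version copy and
// reports the rest as redundant -- but only among channels whose leader is ready
// and available. Channels that fail either check are skipped, so copies that are
// still being balanced are not reduced.
func findRepeatedChannels(dist []Channel, leaderReady, leaderAvailable func(Channel) bool) []Channel {
	repeated := make([]Channel, 0)
	newest := make(map[string]Channel)
	for _, ch := range dist {
		if !leaderReady(ch) || !leaderAvailable(ch) {
			continue // leader view not ready or leader unavailable: likely mid-balance, do not reduce
		}
		prev, seen := newest[ch.Name]
		if !seen {
			newest[ch.Name] = ch
			continue
		}
		if ch.Version > prev.Version {
			newest[ch.Name] = ch
			repeated = append(repeated, prev)
		} else {
			repeated = append(repeated, ch)
		}
	}
	return repeated
}

func main() {
	dist := []Channel{
		{Name: "ch1", Node: 1, Version: 1},
		{Name: "ch1", Node: 2, Version: 2}, // duplicate copy whose leader is not yet serving
	}
	ready := func(c Channel) bool { return true }
	available := func(c Channel) bool { return c.Node != 2 } // node 2 still loading
	fmt.Println(findRepeatedChannels(dist, ready, available)) // [] -- nothing reduced while balancing
}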

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/30304/head
aoiasd 2024-01-26 10:59:00 +08:00 committed by GitHub
parent fd19e419f9
commit f84d9a589a
7 changed files with 250 additions and 60 deletions


@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
@@ -39,6 +40,7 @@ type ChannelChecker struct {
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
nodeMgr *session.NodeManager
balancer balance.Balance
}
@@ -47,6 +49,7 @@ func NewChannelChecker(
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
balancer balance.Balance,
nodeMgr *session.NodeManager,
) *ChannelChecker {
return &ChannelChecker{
checkerActivation: newCheckerActivation(),
@@ -54,6 +57,7 @@
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
nodeMgr: nodeMgr,
}
}
@@ -109,7 +113,7 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
repeated := c.findRepeatedChannels(replica.GetID())
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica.GetID())
task.SetReason("redundancies of channel")
task.SetReason("redundancies of channel", tasks...)
ret = append(ret, tasks...)
// All channel related tasks should be with high priority
@@ -174,8 +178,29 @@ func (c *ChannelChecker) findRepeatedChannels(replicaID int64) []*meta.DmChannel
}
dist := c.getChannelDist(replica)
targets := c.targetMgr.GetSealedSegmentsByCollection(replica.GetCollectionID(), meta.CurrentTarget)
versionsMap := make(map[string]*meta.DmChannel)
for _, ch := range dist {
leaderView := c.dist.LeaderViewManager.GetLeaderShardView(ch.Node, ch.GetChannelName())
if leaderView == nil {
log.Info("shard leadview is not ready, skip",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replicaID),
zap.Int64("leaderID", ch.Node),
zap.String("channel", ch.GetChannelName()))
continue
}
if err := CheckLeaderAvaliable(c.nodeMgr, leaderView, targets); err != nil {
log.Info("replica has unavailable shard leader",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replicaID),
zap.Int64("leaderID", ch.Node),
zap.String("channel", ch.GetChannelName()),
zap.Error(err))
continue
}
maxVer, ok := versionsMap[ch.GetChannelName()]
if !ok {
versionsMap[ch.GetChannelName()] = ch


@@ -19,6 +19,7 @@ package checkers
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@@ -76,7 +77,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
balancer := suite.createMockBalancer()
suite.checker = NewChannelChecker(suite.meta, distManager, targetManager, balancer)
suite.checker = NewChannelChecker(suite.meta, distManager, targetManager, balancer, suite.nodeMgr)
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
}
@@ -85,6 +86,14 @@ func (suite *ChannelCheckerTestSuite) TearDownTest() {
suite.kv.Close()
}
func (suite *ChannelCheckerTestSuite) setNodeAvailable(nodes ...int64) {
for _, node := range nodes {
nodeInfo := session.NewNodeInfo(node, "")
nodeInfo.SetLastHeartbeat(time.Now())
suite.nodeMgr.Add(nodeInfo)
}
}
func (suite *ChannelCheckerTestSuite) createMockBalancer() balance.Balance {
balancer := balance.NewMockBalancer(suite.T())
balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything).Maybe().Return(func(channels []*meta.DmChannel, nodes []int64) []balance.ChannelAssignPlan {
@@ -151,7 +160,10 @@ func (suite *ChannelCheckerTestSuite) TestReduceChannel() {
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel1"))
checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel1"})
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2"))
checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel2"})
suite.setNodeAvailable(1)
tasks := checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.EqualValues(1, tasks[0].ReplicaID())
@@ -191,6 +203,12 @@ func (suite *ChannelCheckerTestSuite) TestRepeatedChannels() {
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
tasks := checker.Check(context.TODO())
suite.Len(tasks, 0)
suite.setNodeAvailable(1, 2)
checker.dist.LeaderViewManager.Update(1, &meta.LeaderView{ID: 1, Channel: "test-insert-channel"})
checker.dist.LeaderViewManager.Update(2, &meta.LeaderView{ID: 2, Channel: "test-insert-channel"})
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Len(tasks[0].Actions(), 1)


@@ -63,7 +63,7 @@ func NewCheckerController(
// CheckerController runs checkers with the order,
// the former checker has higher priority
checkers := map[utils.CheckerType]Checker{
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer),
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.BalanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),


@@ -0,0 +1,78 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkers
import (
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
if info == nil {
return merr.WrapErrNodeOffline(nodeID)
}
return nil
}
// In a replica, a shard is available, if and only if:
// 1. The leader is online
// 2. All QueryNodes in the distribution are online
// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
// 4. All segments of the shard in target should be in the distribution
func CheckLeaderAvaliable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error {
log := log.With(zap.Int64("leaderID", leader.ID))
info := nodeMgr.Get(leader.ID)
// Check whether leader is online
err := CheckNodeAvailable(leader.ID, info)
if err != nil {
log.Info("leader is not available", zap.Error(err))
return fmt.Errorf("leader not available: %w", err)
}
for id, version := range leader.Segments {
info := nodeMgr.Get(version.GetNodeID())
err = CheckNodeAvailable(version.GetNodeID(), info)
if err != nil {
log.Info("leader is not available due to QueryNode unavailable",
zap.Int64("segmentID", id),
zap.Error(err))
return err
}
}
// Check whether segments are fully loaded
for segmentID, info := range currentTargets {
if info.GetInsertChannel() != leader.Channel {
continue
}
_, exist := leader.Segments[segmentID]
if !exist {
log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
}
return nil
}


@@ -0,0 +1,123 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkers
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
)
type UtilTestSuite struct {
suite.Suite
nodeMgr *session.NodeManager
}
func (suite *UtilTestSuite) SetupTest() {
suite.nodeMgr = session.NewNodeManager()
}
func (suite *UtilTestSuite) setNodeAvailable(nodes ...int64) {
for _, node := range nodes {
nodeInfo := session.NewNodeInfo(node, "")
nodeInfo.SetLastHeartbeat(time.Now())
suite.nodeMgr.Add(nodeInfo)
}
}
func (suite *UtilTestSuite) TestCheckLeaderAvaliable() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
}
suite.setNodeAvailable(1, 2)
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
2: {
ID: 2,
InsertChannel: "test",
},
})
suite.NoError(err)
}
func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
suite.Run("leader not available", func() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
}
// leader nodeID=1 not available
suite.setNodeAvailable(2)
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
2: {
ID: 2,
InsertChannel: "test",
},
})
suite.Error(err)
suite.nodeMgr = session.NewNodeManager()
})
suite.Run("shard worker not available", func() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
}
// leader nodeID=2 not available
suite.setNodeAvailable(1)
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
2: {
ID: 2,
InsertChannel: "test",
},
})
suite.Error(err)
suite.nodeMgr = session.NewNodeManager()
})
suite.Run("segment lacks", func() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
}
suite.setNodeAvailable(1, 2)
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
// target segmentID=1 not in leadView
1: {
ID: 1,
InsertChannel: "test",
},
})
suite.Error(err)
suite.nodeMgr = session.NewNodeManager()
})
}
func TestUtilSuite(t *testing.T) {
suite.Run(t, new(UtilTestSuite))
}


@@ -356,13 +356,6 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
return info, nil
}
func checkNodeAvailable(nodeID int64, info *session.NodeInfo) error {
if info == nil {
return merr.WrapErrNodeOffline(nodeID)
}
return nil
}
func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
type leaderID struct {
ReplicaID int64


@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
@@ -889,57 +890,9 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
channelErr = merr.WrapErrChannelLack("channel not subscribed")
}
// In a replica, a shard is available, if and only if:
// 1. The leader is online
// 2. All QueryNodes in the distribution are online
// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
// 4. All segments of the shard in target should be in the distribution
for _, leader := range leaders {
log := log.With(zap.Int64("leaderID", leader.ID))
info := s.nodeMgr.Get(leader.ID)
// Check whether leader is online
err := checkNodeAvailable(leader.ID, info)
if err != nil {
log.Info("leader is not available", zap.Error(err))
multierr.AppendInto(&channelErr, fmt.Errorf("leader not available: %w", err))
continue
}
// Check whether QueryNodes are online and available
isAvailable := true
for id, version := range leader.Segments {
info := s.nodeMgr.Get(version.GetNodeID())
err = checkNodeAvailable(version.GetNodeID(), info)
if err != nil {
log.Info("leader is not available due to QueryNode unavailable",
zap.Int64("segmentID", id),
zap.Error(err))
isAvailable = false
multierr.AppendInto(&channelErr, err)
break
}
}
// Avoid iterating all segments if any QueryNode unavailable
if !isAvailable {
continue
}
// Check whether segments are fully loaded
for segmentID, info := range currentTargets {
if info.GetInsertChannel() != leader.Channel {
continue
}
_, exist := leader.Segments[segmentID]
if !exist {
log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
multierr.AppendInto(&channelErr, merr.WrapErrSegmentLack(segmentID))
isAvailable = false
break
}
}
if !isAvailable {
if err := checkers.CheckLeaderAvaliable(s.nodeMgr, leader, currentTargets); err != nil {
multierr.AppendInto(&channelErr, err)
continue
}