mirror of https://github.com/milvus-io/milvus.git
enhance: change frequency log to rated level (#31084)
This PR changes the frequency log of the shard-leader check to a rated level. --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/31147/head
parent
efe8cecc88
commit
ddd918ba04
|
@ -111,7 +111,7 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
task.SetReason("collection released", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
repeated := c.findRepeatedChannels(replica.GetID())
|
||||
repeated := c.findRepeatedChannels(ctx, replica.GetID())
|
||||
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica.GetID())
|
||||
task.SetReason("redundancies of channel", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
@ -168,7 +168,8 @@ func (c *ChannelChecker) getChannelDist(replica *meta.Replica) []*meta.DmChannel
|
|||
return dist
|
||||
}
|
||||
|
||||
func (c *ChannelChecker) findRepeatedChannels(replicaID int64) []*meta.DmChannel {
|
||||
func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int64) []*meta.DmChannel {
|
||||
log := log.Ctx(ctx).WithRateGroup("ChannelChecker.findRepeatedChannels", 1, 60)
|
||||
replica := c.meta.Get(replicaID)
|
||||
ret := make([]*meta.DmChannel, 0)
|
||||
|
||||
|
@ -191,8 +192,8 @@ func (c *ChannelChecker) findRepeatedChannels(replicaID int64) []*meta.DmChannel
|
|||
continue
|
||||
}
|
||||
|
||||
if err := CheckLeaderAvaliable(c.nodeMgr, leaderView, targets); err != nil {
|
||||
log.Info("replica has unavailable shard leader",
|
||||
if err := CheckLeaderAvailable(c.nodeMgr, leaderView, targets); err != nil {
|
||||
log.RatedInfo(10, "replica has unavailable shard leader",
|
||||
zap.Int64("collectionID", replica.GetCollectionID()),
|
||||
zap.Int64("replicaID", replicaID),
|
||||
zap.Int64("leaderID", ch.Node),
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package checkers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -40,8 +41,10 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
|
|||
// 2. All QueryNodes in the distribution are online
|
||||
// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
|
||||
// 4. All segments of the shard in target should be in the distribution
|
||||
func CheckLeaderAvaliable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error {
|
||||
log := log.With(zap.Int64("leaderID", leader.ID))
|
||||
func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error {
|
||||
log := log.Ctx(context.TODO()).
|
||||
WithRateGroup("checkers.CheckLeaderAvailable", 1, 60).
|
||||
With(zap.Int64("leaderID", leader.ID))
|
||||
info := nodeMgr.Get(leader.ID)
|
||||
|
||||
// Check whether leader is online
|
||||
|
@ -70,7 +73,7 @@ func CheckLeaderAvaliable(nodeMgr *session.NodeManager, leader *meta.LeaderView,
|
|||
|
||||
_, exist := leader.Segments[segmentID]
|
||||
if !exist {
|
||||
log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
|
||||
log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
|
||||
return merr.WrapErrSegmentLack(segmentID)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliable() {
|
|||
}
|
||||
|
||||
suite.setNodeAvailable(1, 2)
|
||||
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
2: {
|
||||
ID: 2,
|
||||
InsertChannel: "test",
|
||||
|
@ -71,7 +71,7 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
|
|||
}
|
||||
// leader nodeID=1 not available
|
||||
suite.setNodeAvailable(2)
|
||||
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
2: {
|
||||
ID: 2,
|
||||
InsertChannel: "test",
|
||||
|
@ -89,7 +89,7 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
|
|||
}
|
||||
// leader nodeID=2 not available
|
||||
suite.setNodeAvailable(1)
|
||||
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
2: {
|
||||
ID: 2,
|
||||
InsertChannel: "test",
|
||||
|
@ -106,7 +106,7 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
|
|||
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
|
||||
}
|
||||
suite.setNodeAvailable(1, 2)
|
||||
err := CheckLeaderAvaliable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
|
||||
// target segmentID=1 not in leadView
|
||||
1: {
|
||||
ID: 1,
|
||||
|
|
|
@ -891,7 +891,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
|||
}
|
||||
|
||||
for _, leader := range leaders {
|
||||
if err := checkers.CheckLeaderAvaliable(s.nodeMgr, leader, currentTargets); err != nil {
|
||||
if err := checkers.CheckLeaderAvailable(s.nodeMgr, leader, currentTargets); err != nil {
|
||||
multierr.AppendInto(&channelErr, err)
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue