mirror of https://github.com/milvus-io/milvus.git
Add more logs for GetShardLeaders (#21046)
Also increase the heartbeatAvailableInterval from 2.5s to 10s Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/21078/head
parent
27219ce9c0
commit
9ebaa10dec
|
@ -186,7 +186,7 @@ queryCoord:
|
|||
channelTaskTimeout: 60000 # 1 minute
|
||||
segmentTaskTimeout: 120000 # 2 minute
|
||||
distPullInterval: 500
|
||||
heartbeatAvailableInterval: 2500 # Only QueryNodes which fetched heartbeats within the duration are available
|
||||
heartbeatAvailableInterval: 10000 # 10s, Only QueryNodes which fetched heartbeats within the duration are available
|
||||
loadTimeoutSeconds: 600
|
||||
checkHandoffInterval: 5000
|
||||
taskMergeCap: 16
|
||||
|
|
|
@ -16,8 +16,33 @@
|
|||
|
||||
package querycoordv2
|
||||
|
||||
import "errors"
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNotHealthy = errors.New("NotHealthy")
|
||||
|
||||
// Node Availability
|
||||
ErrLackSegment = errors.New("LackSegment")
|
||||
ErrNodeOffline = errors.New("NodeOffline")
|
||||
ErrNodeHeartbeatOutdated = errors.New("NodeHeartbeatOutdated")
|
||||
)
|
||||
|
||||
func WrapErrLackSegment(segmentID int64) error {
|
||||
return fmt.Errorf("%w(segmentID=%v)", ErrLackSegment, segmentID)
|
||||
}
|
||||
|
||||
func WrapErrNodeOffline(nodeID int64) error {
|
||||
return fmt.Errorf("%w(nodeID=%v)", ErrNodeOffline, nodeID)
|
||||
}
|
||||
|
||||
func WrapErrNodeHeartbeatOutdated(nodeID int64, lastHeartbeat time.Time) error {
|
||||
return fmt.Errorf("%w(nodeID=%v, lastHeartbeat=%v)",
|
||||
ErrNodeHeartbeatOutdated,
|
||||
nodeID,
|
||||
lastHeartbeat,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -353,3 +353,12 @@ func errCode(err error) commonpb.ErrorCode {
|
|||
}
|
||||
return commonpb.ErrorCode_UnexpectedError
|
||||
}
|
||||
|
||||
func checkNodeAvailable(nodeID int64, info *session.NodeInfo) error {
|
||||
if info == nil {
|
||||
return WrapErrNodeOffline(nodeID)
|
||||
} else if time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
|
||||
return WrapErrNodeHeartbeatOutdated(nodeID, info.LastHeartbeat())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
|
@ -38,6 +37,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
@ -658,29 +658,42 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
|||
ids := make([]int64, 0, len(leaders))
|
||||
addrs := make([]string, 0, len(leaders))
|
||||
|
||||
var channelErr error
|
||||
|
||||
// In a replica, a shard is available, if and only if:
|
||||
// 1. The leader is online
|
||||
// 2. All QueryNodes in the distribution are online
|
||||
// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
|
||||
// 4. All segments of the shard in target should be in the distribution
|
||||
for _, leader := range leaders {
|
||||
log := log.With(zap.Int64("leaderID", leader.ID))
|
||||
info := s.nodeMgr.Get(leader.ID)
|
||||
|
||||
// Check whether leader is online
|
||||
if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
|
||||
err := checkNodeAvailable(leader.ID, info)
|
||||
if err != nil {
|
||||
log.Info("leader is not available", zap.Error(err))
|
||||
multierr.AppendInto(&channelErr, fmt.Errorf("leader not available: %w", err))
|
||||
continue
|
||||
}
|
||||
|
||||
// Check whether QueryNodes are online and available
|
||||
isAvailable := true
|
||||
for _, version := range leader.Segments {
|
||||
info := s.nodeMgr.Get(version.NodeID)
|
||||
if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
|
||||
info := s.nodeMgr.Get(version.GetNodeID())
|
||||
err = checkNodeAvailable(version.GetNodeID(), info)
|
||||
if err != nil {
|
||||
log.Info("leader is not available due to QueryNode unavailable", zap.Error(err))
|
||||
isAvailable = false
|
||||
multierr.AppendInto(&channelErr, err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid iterating all segments if any QueryNode unavailable
|
||||
if !isAvailable {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check whether segments are fully loaded
|
||||
for segmentID, info := range currentTargets {
|
||||
if info.GetInsertChannel() != leader.Channel {
|
||||
|
@ -689,6 +702,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
|||
|
||||
_, exist := leader.Segments[segmentID]
|
||||
if !exist {
|
||||
log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
|
||||
multierr.AppendInto(&channelErr, WrapErrLackSegment(segmentID))
|
||||
isAvailable = false
|
||||
break
|
||||
}
|
||||
|
@ -703,8 +718,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
|||
|
||||
if len(ids) == 0 {
|
||||
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
|
||||
log.Warn(msg)
|
||||
resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg)
|
||||
log.Warn(msg, zap.Error(channelErr))
|
||||
resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg, channelErr)
|
||||
resp.Shards = nil
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -931,9 +931,21 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
|
|||
CollectionID: collection,
|
||||
}
|
||||
|
||||
// Node offline
|
||||
suite.fetchHeartbeats(time.Now())
|
||||
for _, node := range suite.nodes {
|
||||
suite.nodeMgr.Remove(node)
|
||||
}
|
||||
resp, err := server.GetShardLeaders(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
|
||||
for _, node := range suite.nodes {
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
|
||||
}
|
||||
|
||||
// Last heartbeat response time too old
|
||||
suite.fetchHeartbeats(time.Now().Add(-Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) - 1))
|
||||
resp, err := server.GetShardLeaders(ctx, req)
|
||||
resp, err = server.GetShardLeaders(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
|
||||
|
||||
|
|
|
@ -896,7 +896,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
|
|||
p.HeartbeatAvailableInterval = ParamItem{
|
||||
Key: "queryCoord.heartbeatAvailableInterval",
|
||||
Version: "2.2.1",
|
||||
DefaultValue: "2500",
|
||||
DefaultValue: "10000",
|
||||
PanicIfEmpty: true,
|
||||
}
|
||||
p.HeartbeatAvailableInterval.Init(base.mgr)
|
||||
|
|
Loading…
Reference in New Issue