Check whether segments are fully loaded while fetching shard leaders (#20991) (#21139)

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/21174/head
yah01 2022-12-12 21:49:22 +08:00 committed by GitHub
parent 0dc9a0311c
commit df8c67fe87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 6 deletions

View File

@ -16,8 +16,23 @@
package querycoordv2 package querycoordv2
import "errors" import (
"errors"
"fmt"
)
var ( var (
ErrNotHealthy = errors.New("NotHealthy") ErrNotHealthy = errors.New("NotHealthy")
// Node Availability
ErrLackSegment = errors.New("LackSegment")
ErrNodeOffline = errors.New("NodeOffline")
) )
// WrapErrLackSegment builds an error that wraps ErrLackSegment and records
// the ID of the segment involved in its message.
//
// Because the sentinel is attached with %w, errors.Is(err, ErrLackSegment)
// holds for the returned error.
func WrapErrLackSegment(segmentID int64) error {
	wrapped := fmt.Errorf("%w(segmentID=%v)", ErrLackSegment, segmentID)
	return wrapped
}
// WrapErrNodeOffline builds an error that wraps ErrNodeOffline and records
// the ID of the node involved in its message.
//
// Because the sentinel is attached with %w, errors.Is(err, ErrNodeOffline)
// holds for the returned error.
func WrapErrNodeOffline(nodeID int64) error {
	wrapped := fmt.Errorf("%w(nodeID=%v)", ErrNodeOffline, nodeID)
	return wrapped
}

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/multierr"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -668,35 +669,70 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
return resp, nil return resp, nil
} }
currentTargets := s.targetMgr.GetSegmentsByCollection(req.GetCollectionID())
for _, channel := range channels { for _, channel := range channels {
log := log.With(zap.String("channel", channel.GetChannelName())) log := log.With(zap.String("channel", channel.GetChannelName()))
leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName()) leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName())
ids := make([]int64, 0, len(leaders)) ids := make([]int64, 0, len(leaders))
addrs := make([]string, 0, len(leaders)) addrs := make([]string, 0, len(leaders))
var channelErr error
// In a replica, a shard is available, if and only if:
// 1. The leader is online
// 2. All QueryNodes in the distribution are online
// 3. All segments of the shard in target should be in the distribution
for _, leader := range leaders { for _, leader := range leaders {
log := log.With(zap.Int64("leaderID", leader.ID))
info := s.nodeMgr.Get(leader.ID) info := s.nodeMgr.Get(leader.ID)
if info == nil { if info == nil {
log.Info("leader is not available", zap.Int64("leaderID", leader.ID))
multierr.AppendInto(&channelErr, WrapErrNodeOffline(leader.ID))
continue continue
} }
isAllNodeAvailable := true // Check whether QueryNodes are online and available
isAvailable := true
for _, version := range leader.Segments { for _, version := range leader.Segments {
if s.nodeMgr.Get(version.NodeID) == nil { if s.nodeMgr.Get(version.NodeID) == nil {
isAllNodeAvailable = false log.Info("leader is not available due to QueryNode not available", zap.Int64("nodeID", version.GetNodeID()))
isAvailable = false
multierr.AppendInto(&channelErr, WrapErrNodeOffline(version.GetNodeID()))
break break
} }
} }
if !isAllNodeAvailable {
// Avoid iterating all segments if any QueryNode unavailable
if !isAvailable {
continue continue
} }
// Check whether segments are fully loaded
for _, segment := range currentTargets {
if segment.GetInsertChannel() != leader.Channel {
continue
}
_, exist := leader.Segments[segment.GetID()]
if !exist {
log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segment.GetID()))
multierr.AppendInto(&channelErr, WrapErrLackSegment(segment.GetID()))
isAvailable = false
break
}
}
if !isAvailable {
continue
}
ids = append(ids, info.ID()) ids = append(ids, info.ID())
addrs = append(addrs, info.Addr()) addrs = append(addrs, info.Addr())
} }
if len(ids) == 0 { if len(ids) == 0 {
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName()) msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
log.Warn(msg) log.Warn(msg, zap.Error(channelErr))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg) resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg, channelErr)
resp.Shards = nil resp.Shards = nil
return resp, nil return resp, nil
} }
@ -707,6 +743,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
NodeAddrs: addrs, NodeAddrs: addrs,
}) })
} }
return resp, nil return resp, nil
} }

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"testing" "testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/milvuspb"
@ -901,6 +902,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
req := &querypb.GetShardLeadersRequest{ req := &querypb.GetShardLeadersRequest{
CollectionID: collection, CollectionID: collection,
} }
resp, err := server.GetShardLeaders(ctx, req) resp, err := server.GetShardLeaders(ctx, req)
suite.NoError(err) suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
@ -920,6 +922,37 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
} }
// TestGetShardLeadersFailed checks that, for every collection in the suite,
// GetShardLeaders answers with ErrorCode_NoReplicaAvailable (and a nil Go
// error) in two situations:
//  1. every node in suite.nodes has been removed from the node manager;
//  2. the nodes are registered again but the channel distribution has been
//     refreshed through updateChannelDistWithoutSegment.
func (suite *ServiceSuite) TestGetShardLeadersFailed() {
	suite.loadAll()
	ctx := context.Background()
	server := suite.server

	for _, collectionID := range suite.collections {
		suite.updateCollectionStatus(collectionID, querypb.LoadStatus_Loaded)
		suite.updateChannelDist(collectionID)

		request := &querypb.GetShardLeadersRequest{
			CollectionID: collectionID,
		}

		// expectNoReplica sends the request and asserts that the call itself
		// succeeds while the response status reports NoReplicaAvailable.
		expectNoReplica := func() {
			resp, err := server.GetShardLeaders(ctx, request)
			suite.NoError(err)
			suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
		}

		// Node offline: drop every node from the node manager first.
		for _, nodeID := range suite.nodes {
			suite.nodeMgr.Remove(nodeID)
		}
		expectNoReplica()

		// Bring the nodes back so that the next scenario is not affected by
		// the offline-node condition.
		for _, nodeID := range suite.nodes {
			suite.nodeMgr.Add(session.NewNodeInfo(nodeID, "localhost"))
		}

		// Segment not fully loaded
		suite.updateChannelDistWithoutSegment(collectionID)
		expectNoReplica()
	}
}
func (suite *ServiceSuite) loadAll() { func (suite *ServiceSuite) loadAll() {
ctx := context.Background() ctx := context.Background()
for _, collection := range suite.collections { for _, collection := range suite.collections {
@ -1073,6 +1106,38 @@ func (suite *ServiceSuite) updateSegmentDist(collection, node int64) {
func (suite *ServiceSuite) updateChannelDist(collection int64) { func (suite *ServiceSuite) updateChannelDist(collection int64) {
channels := suite.channels[collection] channels := suite.channels[collection]
segments := lo.Flatten(lo.Values(suite.segments[collection]))
replicas := suite.meta.ReplicaManager.GetByCollection(collection)
for _, replica := range replicas {
i := 0
for _, node := range replica.GetNodes() {
suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: collection,
ChannelName: channels[i],
}))
suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
ID: node,
CollectionID: collection,
Channel: channels[i],
Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
return segment, &querypb.SegmentDist{
NodeID: node,
Version: time.Now().Unix(),
}
}),
})
i++
if i >= len(channels) {
break
}
}
}
}
func (suite *ServiceSuite) updateChannelDistWithoutSegment(collection int64) {
channels := suite.channels[collection]
replicas := suite.meta.ReplicaManager.GetByCollection(collection) replicas := suite.meta.ReplicaManager.GetByCollection(collection)
for _, replica := range replicas { for _, replica := range replicas {
i := 0 i := 0