From 711c91eb9aa5a40aedc4b50558b4b4872d266da0 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 29 Nov 2022 11:53:15 +0800 Subject: [PATCH] Add log for transfer segment operation (#20829) (#20847) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/querynode/impl_utils.go | 32 +++- internal/querynode/impl_utils_test.go | 224 ++++++++++++++++++++++++++ 2 files changed, 252 insertions(+), 4 deletions(-) create mode 100644 internal/querynode/impl_utils_test.go diff --git a/internal/querynode/impl_utils.go b/internal/querynode/impl_utils.go index 5c3d64a23d..ee962d7159 100644 --- a/internal/querynode/impl_utils.go +++ b/internal/querynode/impl_utils.go @@ -6,18 +6,29 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/samber/lo" "go.uber.org/zap" ) +// TransferLoad transfers load segments with shard cluster. func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { if len(req.GetInfos()) == 0 { return &commonpb.Status{}, nil } shard := req.GetInfos()[0].GetInsertChannel() + segmentIDs := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { + return info.GetSegmentID() + }) + log := log.Ctx(ctx).With( + zap.String("shard", shard), + zap.Int64s("segmentIDs", segmentIDs), + ) + + log.Info("LoadSegment start to transfer load with shard cluster") shardCluster, ok := node.ShardClusterService.getShardCluster(shard) if !ok { - log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", shard)) + log.Warn("TransferLoad failed to find shard cluster") return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_NotShardLeader, Reason: "shard cluster not found, the leader may have changed", @@ -27,22 +38,33 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen req.NeedTransfer = false err := shardCluster.LoadSegments(ctx, req) if err != nil { - log.Warn("shard cluster failed to load segments", zap.String("shard", shard), zap.Error(err)) + log.Warn("shard cluster failed to load segments", zap.Error(err)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), }, nil } + log.Info("LoadSegment transfer load done") + return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil } +// TransferRelease transfers release segments with shard cluster. func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx).With( + zap.String("shard", req.GetShard()), + zap.Int64s("segmentIDs", req.GetSegmentIDs()), + zap.String("scope", req.GetScope().String()), + ) + + log.Info("ReleaseSegments start to transfer release with shard cluster") + shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetShard()) if !ok { - log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", req.GetShard())) + log.Warn("TransferLoad failed to find shard cluster") return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_NotShardLeader, Reason: "shard cluster not found, the leader may have changed", @@ -52,12 +74,14 @@ func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.Release req.NeedTransfer = false err := shardCluster.ReleaseSegments(ctx, req, false) if err != nil { - log.Warn("shard cluster failed to release segments", zap.String("shard", req.GetShard()), zap.Error(err)) + log.Warn("shard cluster failed to release segments", zap.Error(err)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), }, nil } + + log.Info("ReleaseSegments transfer release done") return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, nil diff --git a/internal/querynode/impl_utils_test.go b/internal/querynode/impl_utils_test.go new file mode 100644 index 0000000000..2700bd8ab9 --- /dev/null +++ b/internal/querynode/impl_utils_test.go @@ -0,0 +1,224 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querynode + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/stretchr/testify/suite" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" +) + +type ImplUtilsSuite struct { + suite.Suite + + querynode *QueryNode + client *clientv3.Client +} + +func (s *ImplUtilsSuite) SetupSuite() { + s.querynode = newQueryNodeMock() + client := v3client.New(embedetcdServer.Server) + s.querynode.session = sessionutil.NewSession(context.Background(), "milvus_ut/sessions", client) + s.querynode.UpdateStateCode(commonpb.StateCode_Healthy) + + s.querynode.ShardClusterService = newShardClusterService(client, s.querynode.session, s.querynode) +} + +func (s *ImplUtilsSuite) TearDownSuite() { + s.querynode.Stop() +} + +func (s *ImplUtilsSuite) SetupTest() { + + nodeEvent := []nodeEvent{ + { + nodeID: s.querynode.session.ServerID, + nodeAddr: s.querynode.session.ServerName, + isLeader: true, + }, + } + cs := NewShardCluster(defaultCollectionID, defaultReplicaID, defaultChannelName, defaultVersion, &mockNodeDetector{ + initNodes: nodeEvent, + }, &mockSegmentDetector{}, buildMockQueryNode) + + s.querynode.ShardClusterService.clusters.Store(defaultChannelName, cs) + cs.SetupFirstVersion() + +} + +func (s *ImplUtilsSuite) TearDownTest() { + s.querynode.ShardClusterService.releaseCollection(defaultCollectionID) +} + +func (s *ImplUtilsSuite) TestTransferLoad() { + ctx := context.Background() + s.Run("normal transfer load", func() { + status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + DstNodeID: s.querynode.session.ServerID, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + InsertChannel: defaultChannelName, + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID, + }, + }, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + }) + + s.Run("transfer non-exist channel load", func() { + status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + DstNodeID: s.querynode.session.ServerID, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + InsertChannel: "invalid_channel", + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID, + }, + }, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_NotShardLeader, status.GetErrorCode()) + }) + + s.Run("transfer empty load segments", func() { + status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + DstNodeID: s.querynode.session.ServerID, + Infos: []*querypb.SegmentLoadInfo{}, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + }) + + s.Run("transfer load fail", func() { + cs, ok := s.querynode.ShardClusterService.getShardCluster(defaultChannelName) + s.Require().True(ok) + cs.nodes[100] = &shardNode{ + nodeID: 100, + nodeAddr: "test", + client: &mockShardQueryNode{ + loadSegmentsResults: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "error", + }, + }, + } + + status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + DstNodeID: 100, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + InsertChannel: defaultChannelName, + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID, + }, + }, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + }) +} + +func (s *ImplUtilsSuite) TestTransferRelease() { + ctx := context.Background() + s.Run("normal transfer release", func() { + status, err := s.querynode.TransferRelease(ctx, &querypb.ReleaseSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + SegmentIDs: []int64{}, + Scope: querypb.DataScope_All, + Shard: defaultChannelName, + NodeID: s.querynode.session.ServerID, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + }) + + s.Run("transfer non-exist channel release", func() { + status, err := s.querynode.TransferRelease(ctx, &querypb.ReleaseSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + SegmentIDs: []int64{}, + Scope: querypb.DataScope_All, + Shard: "invalid_channel", + NodeID: s.querynode.session.ServerID, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_NotShardLeader, status.GetErrorCode()) + }) + + s.Run("transfer release fail", func() { + cs, ok := s.querynode.ShardClusterService.getShardCluster(defaultChannelName) + s.Require().True(ok) + cs.nodes[100] = &shardNode{ + nodeID: 100, + nodeAddr: "test", + client: &mockShardQueryNode{ + releaseSegmentsResult: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, + } + + status, err := s.querynode.TransferRelease(ctx, &querypb.ReleaseSegmentsRequest{ + Base: &commonpb.MsgBase{ + TargetID: s.querynode.session.ServerID, + }, + SegmentIDs: []int64{}, + Scope: querypb.DataScope_All, + Shard: defaultChannelName, + NodeID: 100, + }) + + s.NoError(err) + s.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + }) +} + +func TestImplUtils(t *testing.T) { + suite.Run(t, new(ImplUtilsSuite)) +}