Add log for transfer segment operation (#20829) (#20847)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2022-11-29 11:53:15 +08:00 committed by GitHub
parent ae42fd5a6e
commit 711c91eb9a
2 changed files with 252 additions and 4 deletions
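The substance of the change: TransferLoad and TransferRelease now build a contextual logger up front via the internal log.Ctx(ctx).With(...) helper, so the shard name and segment IDs are attached once and ride along on every subsequent entry, and start/done info logs bracket the transfer. For readers unfamiliar with the pattern, below is a minimal, self-contained sketch using go.uber.org/zap directly; the shard name and segment IDs are made-up values, and Milvus's internal/log.Ctx wrapper is assumed to behave like a plain zap logger for this purpose.

package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment()

	// Attach the request-scoped fields once; every call on this child logger
	// carries them, so the individual Info/Warn calls below no longer need to
	// repeat zap.String("shard", ...) themselves.
	log := base.With(
		zap.String("shard", "by-dev-rootcoord-dml_0"),      // made-up shard name
		zap.Int64s("segmentIDs", []int64{1001, 1002, 1003}), // made-up segment IDs
	)

	log.Info("LoadSegment start to transfer load with shard cluster")
	log.Warn("TransferLoad failed to find shard cluster") // shard/segmentIDs still appear
}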

internal/querynode/impl_utils.go

@@ -6,18 +6,29 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/samber/lo"
"go.uber.org/zap"
)
// TransferLoad transfers load segments with shard cluster.
func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
if len(req.GetInfos()) == 0 {
return &commonpb.Status{}, nil
}
shard := req.GetInfos()[0].GetInsertChannel()
segmentIDs := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 {
return info.GetSegmentID()
})
log := log.Ctx(ctx).With(
zap.String("shard", shard),
zap.Int64s("segmentIDs", segmentIDs),
)
log.Info("LoadSegment start to transfer load with shard cluster")
shardCluster, ok := node.ShardClusterService.getShardCluster(shard)
if !ok {
log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", shard))
log.Warn("TransferLoad failed to find shard cluster")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NotShardLeader,
Reason: "shard cluster not found, the leader may have changed",
@@ -27,22 +38,33 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen
req.NeedTransfer = false
err := shardCluster.LoadSegments(ctx, req)
if err != nil {
log.Warn("shard cluster failed to load segments", zap.String("shard", shard), zap.Error(err))
log.Warn("shard cluster failed to load segments", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
}
log.Info("LoadSegment transfer load done")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// TransferRelease transfers release segments with shard cluster.
func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.String("shard", req.GetShard()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.String("scope", req.GetScope().String()),
)
log.Info("ReleaseSegments start to transfer release with shard cluster")
shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetShard())
if !ok {
log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", req.GetShard()))
log.Warn("TransferLoad failed to find shard cluster")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NotShardLeader,
Reason: "shard cluster not found, the leader may have changed",
@@ -52,12 +74,14 @@ func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.Release
req.NeedTransfer = false
err := shardCluster.ReleaseSegments(ctx, req, false)
if err != nil {
log.Warn("shard cluster failed to release segments", zap.String("shard", req.GetShard()), zap.Error(err))
log.Warn("shard cluster failed to release segments", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
}
log.Info("ReleaseSegments transfer release done")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil

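Both handlers also use lo.Map from github.com/samber/lo to project the incoming load infos down to their segment IDs before attaching them to the logger. A rough, self-contained sketch of that projection follows; segmentLoadInfo here is a stand-in struct with made-up values, not the real querypb type.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// segmentLoadInfo is a stand-in for querypb.SegmentLoadInfo, reduced to the
// single field this illustration needs.
type segmentLoadInfo struct {
	SegmentID int64
}

func main() {
	infos := []*segmentLoadInfo{{SegmentID: 1001}, {SegmentID: 1002}}

	// lo.Map applies the callback to every element and collects the results,
	// turning the request infos into a plain []int64 of segment IDs.
	segmentIDs := lo.Map(infos, func(info *segmentLoadInfo, _ int) int64 {
		return info.SegmentID
	})

	fmt.Println(segmentIDs) // prints [1001 1002]
}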
internal/querynode/impl_utils_test.go

@@ -0,0 +1,224 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"context"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
)
type ImplUtilsSuite struct {
suite.Suite
querynode *QueryNode
client *clientv3.Client
}
func (s *ImplUtilsSuite) SetupSuite() {
s.querynode = newQueryNodeMock()
client := v3client.New(embedetcdServer.Server)
s.querynode.session = sessionutil.NewSession(context.Background(), "milvus_ut/sessions", client)
s.querynode.UpdateStateCode(commonpb.StateCode_Healthy)
s.querynode.ShardClusterService = newShardClusterService(client, s.querynode.session, s.querynode)
}
func (s *ImplUtilsSuite) TearDownSuite() {
s.querynode.Stop()
}
func (s *ImplUtilsSuite) SetupTest() {
nodeEvent := []nodeEvent{
{
nodeID: s.querynode.session.ServerID,
nodeAddr: s.querynode.session.ServerName,
isLeader: true,
},
}
cs := NewShardCluster(defaultCollectionID, defaultReplicaID, defaultChannelName, defaultVersion, &mockNodeDetector{
initNodes: nodeEvent,
}, &mockSegmentDetector{}, buildMockQueryNode)
s.querynode.ShardClusterService.clusters.Store(defaultChannelName, cs)
cs.SetupFirstVersion()
}
func (s *ImplUtilsSuite) TearDownTest() {
s.querynode.ShardClusterService.releaseCollection(defaultCollectionID)
}
func (s *ImplUtilsSuite) TestTransferLoad() {
ctx := context.Background()
s.Run("normal transfer load", func() {
status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
DstNodeID: s.querynode.session.ServerID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
InsertChannel: defaultChannelName,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
},
},
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
})
s.Run("transfer non-exist channel load", func() {
status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
DstNodeID: s.querynode.session.ServerID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
InsertChannel: "invalid_channel",
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
},
},
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_NotShardLeader, status.GetErrorCode())
})
s.Run("transfer empty load segments", func() {
status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
DstNodeID: s.querynode.session.ServerID,
Infos: []*querypb.SegmentLoadInfo{},
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
})
s.Run("transfer load fail", func() {
cs, ok := s.querynode.ShardClusterService.getShardCluster(defaultChannelName)
s.Require().True(ok)
cs.nodes[100] = &shardNode{
nodeID: 100,
nodeAddr: "test",
client: &mockShardQueryNode{
loadSegmentsResults: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "error",
},
},
}
status, err := s.querynode.TransferLoad(ctx, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
DstNodeID: 100,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
InsertChannel: defaultChannelName,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
},
},
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
})
}
func (s *ImplUtilsSuite) TestTransferRelease() {
ctx := context.Background()
s.Run("normal transfer release", func() {
status, err := s.querynode.TransferRelease(ctx, &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
SegmentIDs: []int64{},
Scope: querypb.DataScope_All,
Shard: defaultChannelName,
NodeID: s.querynode.session.ServerID,
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
})
s.Run("transfer non-exist channel release", func() {
status, err := s.querynode.TransferRelease(ctx, &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
SegmentIDs: []int64{},
Scope: querypb.DataScope_All,
Shard: "invalid_channel",
NodeID: s.querynode.session.ServerID,
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_NotShardLeader, status.GetErrorCode())
})
s.Run("transfer release fail", func() {
cs, ok := s.querynode.ShardClusterService.getShardCluster(defaultChannelName)
s.Require().True(ok)
cs.nodes[100] = &shardNode{
nodeID: 100,
nodeAddr: "test",
client: &mockShardQueryNode{
releaseSegmentsResult: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
},
}
status, err := s.querynode.TransferRelease(ctx, &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: s.querynode.session.ServerID,
},
SegmentIDs: []int64{},
Scope: querypb.DataScope_All,
Shard: defaultChannelName,
NodeID: 100,
})
s.NoError(err)
s.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
})
}
func TestImplUtils(t *testing.T) {
suite.Run(t, new(ImplUtilsSuite))
}