Handle errors by merr for QueryCoord (#24926)

Signed-off-by: yah01 <yang.cen@zilliz.com>
yah01 2023-07-17 14:59:34 +08:00 committed by GitHub
parent 04a2ddda17
commit 948d1f1f4a
27 changed files with 288 additions and 379 deletions

View File

@ -122,7 +122,7 @@ func (lb *LBPolicyImpl) selectNode(ctx context.Context, workload ChannelWorkload
log.Warn("no available shard delegator found",
zap.Int64s("nodes", nodes),
zap.Int64s("excluded", excludeNodes.Collect()))
return -1, merr.WrapErrNoAvailableNode("all available nodes has been excluded")
return -1, merr.WrapErrServiceUnavailable("no available shard delegator found")
}
targetNode, err = lb.balancer.SelectNode(ctx, availableNodes, workload.nq)
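
This hunk shows the core pattern of the PR: return a shared merr sentinel, wrapped with call-site context, instead of a package-local error. A minimal standalone analogue of the pattern (stdlib only; the real merr wrappers also carry error codes and structured parameters):

package main

import (
	"errors"
	"fmt"
)

// Shared sentinels, analogous to merr.ErrServiceUnavailable and merr.ErrNodeNotAvailable.
var (
	ErrServiceUnavailable = errors.New("service unavailable")
	ErrNodeNotAvailable   = errors.New("node not available")
)

// wrapErrServiceUnavailable mimics the shape of merr.WrapErrServiceUnavailable:
// %w keeps the sentinel in the chain while adding call-site context.
func wrapErrServiceUnavailable(msg string) error {
	return fmt.Errorf("%w: %s", ErrServiceUnavailable, msg)
}

func selectNode(available []int64) (int64, error) {
	if len(available) == 0 {
		return -1, wrapErrServiceUnavailable("no available shard delegator found")
	}
	return available[0], nil
}

func main() {
	_, err := selectNode(nil)
	fmt.Println(errors.Is(err, ErrServiceUnavailable)) // true: callers match the sentinel, not the message
	fmt.Println(errors.Is(err, ErrNodeNotAvailable))   // false: the two failure modes stay distinguishable
}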

View File

@ -190,7 +190,7 @@ func (s *LBPolicySuite) TestSelectNode() {
// test select node always fails, expected failure
s.lbBalancer.ExpectedCalls = nil
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNoAvailableNode)
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNodeNotAvailable)
targetNode, err = s.lbPolicy.selectNode(ctx, ChannelWorkload{
db: dbName,
collection: s.collection,
@ -198,12 +198,12 @@ func (s *LBPolicySuite) TestSelectNode() {
shardLeaders: []int64{},
nq: 1,
}, typeutil.NewUniqueSet())
s.ErrorIs(err, merr.ErrNoAvailableNode)
s.ErrorIs(err, merr.ErrNodeNotAvailable)
s.Equal(int64(-1), targetNode)
// test all nodes have been excluded, expected failure
s.lbBalancer.ExpectedCalls = nil
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNoAvailableNode)
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNodeNotAvailable)
targetNode, err = s.lbPolicy.selectNode(ctx, ChannelWorkload{
db: dbName,
collection: s.collection,
@ -211,14 +211,14 @@ func (s *LBPolicySuite) TestSelectNode() {
shardLeaders: s.nodes,
nq: 1,
}, typeutil.NewUniqueSet(s.nodes...))
s.ErrorIs(err, merr.ErrNoAvailableNode)
s.ErrorIs(err, merr.ErrServiceUnavailable)
s.Equal(int64(-1), targetNode)
// test get shard leaders failed, retry to select node failed
s.lbBalancer.ExpectedCalls = nil
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNoAvailableNode)
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNodeNotAvailable)
s.qc.ExpectedCalls = nil
s.qc.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(nil, merr.ErrNoAvailableNodeInReplica)
s.qc.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(nil, merr.ErrServiceUnavailable)
targetNode, err = s.lbPolicy.selectNode(ctx, ChannelWorkload{
db: dbName,
collection: s.collection,
@ -226,7 +226,7 @@ func (s *LBPolicySuite) TestSelectNode() {
shardLeaders: s.nodes,
nq: 1,
}, typeutil.NewUniqueSet())
s.ErrorIs(err, merr.ErrNoAvailableNodeInReplica)
s.ErrorIs(err, merr.ErrServiceUnavailable)
s.Equal(int64(-1), targetNode)
}
@ -253,7 +253,7 @@ func (s *LBPolicySuite) TestExecuteWithRetry() {
// test select node failed, expected error
s.lbBalancer.ExpectedCalls = nil
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNoAvailableNode)
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(-1, merr.ErrNodeNotAvailable)
err = s.lbPolicy.ExecuteWithRetry(ctx, ChannelWorkload{
db: dbName,
collection: s.collection,
@ -265,7 +265,7 @@ func (s *LBPolicySuite) TestExecuteWithRetry() {
},
retryTimes: 1,
})
s.ErrorIs(err, merr.ErrNoAvailableNode)
s.ErrorIs(err, merr.ErrNodeNotAvailable)
// test get client failed, and retry failed, expected success
s.mgr.ExpectedCalls = nil
@ -330,6 +330,7 @@ func (s *LBPolicySuite) TestExecuteWithRetry() {
func (s *LBPolicySuite) TestExecute() {
ctx := context.Background()
mockErr := errors.New("mock error")
// test all channels succeed
s.mgr.EXPECT().GetClient(mock.Anything, mock.Anything).Return(s.qn, nil)
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(1, nil)
@ -355,7 +356,7 @@ func (s *LBPolicySuite) TestExecute() {
return nil
}
return errors.New("fake error")
return mockErr
},
})
s.Error(err)
@ -363,7 +364,7 @@ func (s *LBPolicySuite) TestExecute() {
// test get shard leader failed
s.qc.ExpectedCalls = nil
globalMetaCache.DeprecateShardCache(dbName, s.collection)
s.qc.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))
s.qc.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(nil, mockErr)
err = s.lbPolicy.Execute(ctx, CollectionWorkLoad{
db: dbName,
collection: s.collection,
@ -372,7 +373,7 @@ func (s *LBPolicySuite) TestExecute() {
return nil
},
})
s.Error(err)
s.ErrorIs(err, mockErr)
}
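
The tightened assertion works because wrapping preserves the cause: errors.Wrap from cockroachdb/errors (used throughout this PR) keeps the original error in the chain, so errors.Is still matches it. A stdlib sketch of the same property:

package main

import (
	"errors"
	"fmt"
)

var mockErr = errors.New("mock error")

func main() {
	// cockroachdb/errors.Wrap(err, msg) behaves like %w wrapping for Is():
	// the cause stays in the chain, only the message gains a prefix.
	wrapped := fmt.Errorf("failed to get shard leaders: %w", mockErr)

	fmt.Println(errors.Is(wrapped, mockErr)) // true, so s.ErrorIs(err, mockErr) can replace s.Error(err)
	fmt.Println(wrapped)                     // failed to get shard leaders: mock error
}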
func (s *LBPolicySuite) TestUpdateCostMetrics() {

View File

@ -112,7 +112,7 @@ func (b *LookAsideBalancer) SelectNode(ctx context.Context, availableNodes []int
}
if targetNode == -1 {
return -1, merr.WrapErrNoAvailableNode("all available nodes are unreachable")
return -1, merr.WrapErrServiceUnavailable("all available nodes are unreachable")
}
// update executing task cost

View File

@ -304,7 +304,7 @@ func (suite *LookAsideBalancerSuite) TestCheckHealthLoop() {
return suite.balancer.unreachableQueryNodes.Contain(1)
}, 2*time.Second, 100*time.Millisecond)
targetNode, err := suite.balancer.SelectNode(context.Background(), []int64{1}, 1)
suite.ErrorIs(err, merr.ErrNoAvailableNode)
suite.ErrorIs(err, merr.ErrServiceUnavailable)
suite.Equal(int64(-1), targetNode)
suite.Eventually(func() bool {

View File

@ -37,7 +37,7 @@ func NewRoundRobinBalancer() *RoundRobinBalancer {
func (b *RoundRobinBalancer) SelectNode(ctx context.Context, availableNodes []int64, cost int64) (int64, error) {
if len(availableNodes) == 0 {
return -1, merr.ErrNoAvailableNode
return -1, merr.ErrNodeNotAvailable
}
targetNode := int64(-1)

View File

@ -1,47 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoordv2
import (
"fmt"
"time"
"github.com/cockroachdb/errors"
)
var (
// Node Availability
ErrLackSegment = errors.New("LackSegment")
ErrNodeOffline = errors.New("NodeOffline")
ErrNodeHeartbeatOutdated = errors.New("NodeHeartbeatOutdated")
)
func WrapErrLackSegment(segmentID int64) error {
return fmt.Errorf("%w(segmentID=%v)", ErrLackSegment, segmentID)
}
func WrapErrNodeOffline(nodeID int64) error {
return fmt.Errorf("%w(nodeID=%v)", ErrNodeOffline, nodeID)
}
func WrapErrNodeHeartbeatOutdated(nodeID int64, lastHeartbeat time.Time) error {
return fmt.Errorf("%w(nodeID=%v, lastHeartbeat=%v)",
ErrNodeHeartbeatOutdated,
nodeID,
lastHeartbeat,
)
}
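
The deleted file above is the pattern being retired: package-local sentinels plus hand-rolled fmt.Errorf("%w(...)") wrappers, which forced callers to import QueryCoord internals just to match an error. A small sketch of how such a sentinel behaved; after this PR the equivalent sentinel lives in pkg/util/merr instead:

package main

import (
	"errors"
	"fmt"
)

// Before: a sentinel private to the querycoordv2 package.
var errNodeOffline = errors.New("NodeOffline")

// wrapErrNodeOffline reproduces the deleted wrapper's style.
func wrapErrNodeOffline(nodeID int64) error {
	return fmt.Errorf("%w(nodeID=%v)", errNodeOffline, nodeID)
}

func main() {
	err := wrapErrNodeOffline(7)
	// Matching requires access to the package-local sentinel; moving it to
	// merr lets proxy, querycoord, and tests all match one shared value.
	fmt.Println(errors.Is(err, errNodeOffline)) // true
	fmt.Println(err)                            // NodeOffline(nodeID=7)
}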

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
@ -339,7 +338,7 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
if leaderInfo == nil {
msg := fmt.Sprintf("failed to get shard leader for shard %s, the collection not loaded or leader is offline", channel)
log.Warn(msg)
return nil, utils.WrapError(msg, session.WrapErrNodeNotFound(leader))
return nil, errors.Wrap(merr.WrapErrNodeNotFound(leader), msg)
}
shard := &milvuspb.ShardReplica{
@ -362,18 +361,11 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
return info, nil
}
func errCode(err error) commonpb.ErrorCode {
if errors.Is(err, job.ErrLoadParameterMismatched) {
return commonpb.ErrorCode_IllegalArgument
}
return commonpb.ErrorCode_UnexpectedError
}
func checkNodeAvailable(nodeID int64, info *session.NodeInfo) error {
if info == nil {
return WrapErrNodeOffline(nodeID)
return merr.WrapErrNodeOffline(nodeID)
} else if time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
return WrapErrNodeHeartbeatOutdated(nodeID, info.LastHeartbeat())
return merr.WrapErrNodeOffline(nodeID, fmt.Sprintf("lastHB=%v", info.LastHeartbeat()))
}
return nil
}

View File

@ -1,30 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import "github.com/cockroachdb/errors"
var (
// Common errors
ErrInvalidRequest = errors.New("InvalidRequest")
// Load errors
ErrCollectionLoaded = errors.New("CollectionLoaded")
ErrLoadParameterMismatched = errors.New("LoadParameterMismatched")
ErrNoEnoughNode = errors.New("NoEnoughNode")
ErrPartitionNotInTarget = errors.New("PartitionNotInLoadingTarget")
)

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -94,12 +95,12 @@ func (job *LoadCollectionJob) PreExecute() error {
job.meta.GetReplicaNumber(req.GetCollectionID()),
)
log.Warn(msg)
return utils.WrapError(msg, ErrLoadParameterMismatched)
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection")
} else if !typeutil.MapEqual(collection.GetFieldIndexID(), req.GetFieldIndexID()) {
msg := fmt.Sprintf("collection with different index %v existed, release this collection first before changing its index",
collection.GetFieldIndexID())
log.Warn(msg)
return utils.WrapError(msg, ErrLoadParameterMismatched)
return merr.WrapErrParameterInvalid(collection.GetFieldIndexID(), req.GetFieldIndexID(), "can't change the index for loaded collection")
}
return nil
@ -115,7 +116,7 @@ func (job *LoadCollectionJob) Execute() error {
if err != nil {
msg := "failed to get partitions from RootCoord"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
loadedPartitionIDs := lo.Map(job.meta.CollectionManager.GetPartitionsByCollection(req.GetCollectionID()),
func(partition *meta.Partition, _ int) int64 {
@ -125,7 +126,7 @@ func (job *LoadCollectionJob) Execute() error {
return partID, !lo.Contains(loadedPartitionIDs, partID)
})
if len(lackPartitionIDs) == 0 {
return ErrCollectionLoaded
return nil
}
job.undo.CollectionID = req.GetCollectionID()
job.undo.LackPartitions = lackPartitionIDs
@ -138,7 +139,7 @@ func (job *LoadCollectionJob) Execute() error {
if err != nil {
msg := "failed to clear stale replicas"
log.Warn(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
}
@ -149,7 +150,7 @@ func (job *LoadCollectionJob) Execute() error {
if err != nil {
msg := "failed to spawn replica for collection"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
for _, replica := range replicas {
log.Info("replica created", zap.Int64("replicaID", replica.GetID()),
@ -169,7 +170,7 @@ func (job *LoadCollectionJob) Execute() error {
if err != nil {
msg := "failed to update next target"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
job.undo.TargetUpdated = true
@ -200,7 +201,7 @@ func (job *LoadCollectionJob) Execute() error {
if err != nil {
msg := "failed to store collection and partitions"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
metrics.QueryCoordNumPartitions.WithLabelValues().Add(float64(len(partitions)))
@ -208,7 +209,7 @@ func (job *LoadCollectionJob) Execute() error {
}
func (job *LoadCollectionJob) PostExecute() {
if job.Error() != nil && !errors.Is(job.Error(), ErrCollectionLoaded) {
if job.Error() != nil {
job.undo.RollBack()
}
}
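
With this change, "collection already loaded" becomes plain success instead of the ErrCollectionLoaded sentinel, so callers no longer need the !errors.Is(err, ErrCollectionLoaded) special case (see the service.go hunks below). A hypothetical condensation of the flow:

package main

import "fmt"

// missing returns the IDs in want that are absent from have, mirroring the
// lackPartitionIDs computation in the job above.
func missing(want, have []int64) []int64 {
	seen := make(map[int64]struct{}, len(have))
	for _, id := range have {
		seen[id] = struct{}{}
	}
	var lack []int64
	for _, id := range want {
		if _, ok := seen[id]; !ok {
			lack = append(lack, id)
		}
	}
	return lack
}

// executeLoad is a hypothetical condensation of the flow, not the real job.
func executeLoad(requested, loaded []int64) error {
	lack := missing(requested, loaded)
	if len(lack) == 0 {
		// Nothing to do: plain success. Before this PR this path returned
		// ErrCollectionLoaded, which every caller had to filter out.
		return nil
	}
	fmt.Println("loading partitions:", lack)
	return nil
}

func main() {
	fmt.Println(executeLoad([]int64{1, 2}, []int64{1, 2})) // <nil>
}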
@ -270,12 +271,12 @@ func (job *LoadPartitionJob) PreExecute() error {
if collection.GetReplicaNumber() != req.GetReplicaNumber() {
msg := "collection with different replica number existed, release this collection first before changing its replica number"
log.Warn(msg)
return utils.WrapError(msg, ErrLoadParameterMismatched)
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions")
} else if !typeutil.MapEqual(collection.GetFieldIndexID(), req.GetFieldIndexID()) {
msg := fmt.Sprintf("collection with different index %v existed, release this collection first before changing its index",
job.meta.GetFieldIndex(req.GetCollectionID()))
log.Warn(msg)
return utils.WrapError(msg, ErrLoadParameterMismatched)
return merr.WrapErrParameterInvalid(collection.GetFieldIndexID(), req.GetFieldIndexID(), "can't change the index for loaded partitions")
}
return nil
@ -298,7 +299,7 @@ func (job *LoadPartitionJob) Execute() error {
return partID, !lo.Contains(loadedPartitionIDs, partID)
})
if len(lackPartitionIDs) == 0 {
return ErrCollectionLoaded
return nil
}
job.undo.CollectionID = req.GetCollectionID()
job.undo.LackPartitions = lackPartitionIDs
@ -311,7 +312,7 @@ func (job *LoadPartitionJob) Execute() error {
if err != nil {
msg := "failed to clear stale replicas"
log.Warn(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
}
@ -322,7 +323,7 @@ func (job *LoadPartitionJob) Execute() error {
if err != nil {
msg := "failed to spawn replica for collection"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
for _, replica := range replicas {
log.Info("replica created", zap.Int64("replicaID", replica.GetID()),
@ -342,7 +343,7 @@ func (job *LoadPartitionJob) Execute() error {
if err != nil {
msg := "failed to update next target"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
job.undo.TargetUpdated = true
@ -374,14 +375,14 @@ func (job *LoadPartitionJob) Execute() error {
if err != nil {
msg := "failed to store collection and partitions"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
} else { // collection exists, put partitions only
err = job.meta.CollectionManager.PutPartition(partitions...)
if err != nil {
msg := "failed to store partitions"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
}
@ -390,7 +391,7 @@ func (job *LoadPartitionJob) Execute() error {
}
func (job *LoadPartitionJob) PostExecute() {
if job.Error() != nil && !errors.Is(job.Error(), ErrCollectionLoaded) {
if job.Error() != nil {
job.undo.RollBack()
}
}

View File

@ -19,6 +19,7 @@ package job
import (
"context"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
@ -27,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
)
@ -86,7 +86,7 @@ func (job *ReleaseCollectionJob) Execute() error {
if err != nil {
msg := "failed to remove collection"
log.Warn(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
err = job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID())
@ -173,7 +173,7 @@ func (job *ReleasePartitionJob) Execute() error {
if err != nil {
msg := "failed to release partitions from store"
log.Warn(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
err = job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID())
if err != nil {
@ -188,7 +188,7 @@ func (job *ReleasePartitionJob) Execute() error {
if err != nil {
msg := "failed to release partitions from store"
log.Warn(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...)
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)

View File

@ -22,10 +22,10 @@ import (
"go.uber.org/zap"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
)
@ -54,16 +54,6 @@ func NewSyncNewCreatedPartitionJob(
}
func (job *SyncNewCreatedPartitionJob) PreExecute() error {
// check if the collection is not loaded or its loadType is loadPartition
collection := job.meta.GetCollection(job.req.GetCollectionID())
if collection == nil || collection.GetLoadType() == querypb.LoadType_LoadPartition {
return ErrPartitionNotInTarget
}
// check if the partition already exists
if partition := job.meta.GetPartition(job.req.GetPartitionID()); partition != nil {
return ErrPartitionNotInTarget
}
return nil
}
@ -74,6 +64,17 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
zap.Int64("partitionID", req.GetPartitionID()),
)
// check if the collection is not loaded or its loadType is loadPartition
collection := job.meta.GetCollection(job.req.GetCollectionID())
if collection == nil || collection.GetLoadType() == querypb.LoadType_LoadPartition {
return nil
}
// check if the partition already exists
if partition := job.meta.GetPartition(job.req.GetPartitionID()); partition != nil {
return nil
}
err := loadPartitions(job.ctx, job.meta, job.cluster, job.broker, false, req.GetCollectionID(), req.GetPartitionID())
if err != nil {
return err
@ -92,7 +93,7 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
if err != nil {
msg := "failed to store partitions"
log.Error(msg, zap.Error(err))
return utils.WrapError(msg, err)
return errors.Wrap(err, msg)
}
return nil

View File

@ -26,7 +26,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -36,8 +35,8 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
@ -131,10 +130,10 @@ func (suite *JobSuite) SetupSuite() {
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().
LoadPartitions(mock.Anything, mock.Anything, mock.Anything).
Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
Return(merr.Status(nil), nil)
suite.cluster.EXPECT().
ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
Return(merr.Status(nil), nil)
}
func (suite *JobSuite) SetupTest() {
@ -248,7 +247,7 @@ func (suite *JobSuite) TestLoadCollection() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrCollectionLoaded)
suite.NoError(err)
}
// Test load existing collection with different replica number
@ -273,7 +272,7 @@ func (suite *JobSuite) TestLoadCollection() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrLoadParameterMismatched)
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
// Test load partition while collection exists
@ -300,7 +299,7 @@ func (suite *JobSuite) TestLoadCollection() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrCollectionLoaded)
suite.NoError(err)
}
suite.meta.ResourceManager.AddResourceGroup("rg1")
@ -438,7 +437,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrLoadParameterMismatched)
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
}
@ -499,7 +498,7 @@ func (suite *JobSuite) TestLoadPartition() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrCollectionLoaded)
suite.NoError(err)
}
// Test load partition with different replica number
@ -526,7 +525,7 @@ func (suite *JobSuite) TestLoadPartition() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrLoadParameterMismatched)
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
// Test load partition with more partitions
@ -579,7 +578,7 @@ func (suite *JobSuite) TestLoadPartition() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrCollectionLoaded)
suite.NoError(err)
}
suite.meta.ResourceManager.AddResourceGroup("rg1")
@ -690,7 +689,7 @@ func (suite *JobSuite) TestDynamicLoad() {
job = newLoadPartJob(p0, p1, p2)
suite.scheduler.Add(job)
err = job.Wait()
suite.ErrorIs(err, ErrCollectionLoaded)
suite.NoError(err)
suite.assertPartitionLoaded(collection)
// loaded: p0, p1
@ -837,7 +836,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrLoadParameterMismatched)
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
}
@ -1336,7 +1335,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
return call.Method != "ReleasePartitions"
})
suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
Return(merr.Status(nil), nil)
}
func (suite *JobSuite) TestSyncNewCreatedPartition() {
@ -1376,7 +1375,7 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() {
)
suite.scheduler.Add(job)
err = job.Wait()
suite.ErrorIs(err, ErrPartitionNotInTarget)
suite.NoError(err)
// test collection loaded, but its loadType is loadPartition
req = &querypb.SyncNewCreatedPartitionRequest{
@ -1392,7 +1391,7 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() {
)
suite.scheduler.Add(job)
err = job.Wait()
suite.ErrorIs(err, ErrPartitionNotInTarget)
suite.NoError(err)
}
func (suite *JobSuite) loadAll() {

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -34,7 +35,6 @@ var (
ErrNodeAlreadyAssign = errors.New("node already assign to other resource group")
ErrRGIsFull = errors.New("resource group is full")
ErrRGIsEmpty = errors.New("resource group is empty")
ErrRGNotExist = errors.New("resource group doesn't exist")
ErrRGAlreadyExist = errors.New("resource group already exist")
ErrRGAssignNodeFailed = errors.New("failed to assign node to resource group")
ErrRGUnAssignNodeFailed = errors.New("failed to unassign node from resource group")
@ -45,7 +45,6 @@ var (
ErrRGNameIsEmpty = errors.New("resource group name couldn't be empty")
ErrDeleteDefaultRG = errors.New("delete default rg is not permitted")
ErrDeleteNonEmptyRG = errors.New("delete non-empty rg is not permitted")
ErrNodeNotExist = errors.New("node does not exist")
ErrNodeStopped = errors.New("node has been stopped")
ErrRGLimit = errors.New("resource group num reach limit 1024")
ErrNodeNotEnough = errors.New("nodes not enough")
@ -202,15 +201,15 @@ func (rm *ResourceManager) AssignNode(rgName string, node int64) error {
func (rm *ResourceManager) assignNode(rgName string, node int64) error {
if rm.groups[rgName] == nil {
return ErrRGNotExist
return merr.WrapErrResourceGroupNotFound(rgName)
}
if rm.nodeMgr.Get(node) == nil {
return ErrNodeNotExist
return merr.WrapErrNodeNotFound(node)
}
if ok, _ := rm.nodeMgr.IsStoppingNode(node); ok {
return ErrNodeStopped
return merr.WrapErrNodeNotAvailable(node)
}
rm.checkRGNodeStatus(rgName)
@ -271,7 +270,7 @@ func (rm *ResourceManager) UnassignNode(rgName string, node int64) error {
func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
if rm.groups[rgName] == nil {
return ErrRGNotExist
return merr.WrapErrResourceGroupNotFound(rgName)
}
if !rm.groups[rgName].containsNode(node) {
@ -324,7 +323,7 @@ func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[rgName] == nil {
return nil, ErrRGNotExist
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
rm.checkRGNodeStatus(rgName)
@ -397,7 +396,7 @@ func (rm *ResourceManager) GetResourceGroup(rgName string) (*ResourceGroup, erro
defer rm.rwmutex.RUnlock()
if rm.groups[rgName] == nil {
return nil, ErrRGNotExist
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
rm.checkRGNodeStatus(rgName)
@ -433,7 +432,7 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
defer rm.rwmutex.Unlock()
if rm.nodeMgr.Get(node) == nil {
return "", ErrNodeNotExist
return "", merr.WrapErrNodeNotFound(node)
}
if ok, _ := rm.nodeMgr.IsStoppingNode(node); ok {
@ -515,8 +514,11 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
if rm.groups[from] == nil || rm.groups[to] == nil {
return nil, ErrRGNotExist
if rm.groups[from] == nil {
return nil, merr.WrapErrResourceGroupNotFound(from)
}
if rm.groups[to] == nil {
return nil, merr.WrapErrResourceGroupNotFound(to)
}
rm.checkRGNodeStatus(from)
@ -614,7 +616,7 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) ([]int64, err
defer rm.rwmutex.Unlock()
if rm.groups[rgName] == nil {
return nil, ErrRGNotExist
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
ret := make([]int64, 0)

View File

@ -28,6 +28,7 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -91,11 +92,11 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
// test add non-exist node to rg
err = suite.manager.AssignNode("rg1", 2)
suite.ErrorIs(err, ErrNodeNotExist)
suite.ErrorIs(err, merr.ErrNodeNotFound)
// test add node to non-exist rg
err = suite.manager.AssignNode("rg2", 1)
suite.ErrorIs(err, ErrRGNotExist)
suite.ErrorIs(err, merr.ErrResourceGroupNotFound)
// test remove node from rg
err = suite.manager.UnassignNode("rg1", 1)
@ -107,7 +108,7 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
// test remove node from non-exist rg
err = suite.manager.UnassignNode("rg2", 1)
suite.ErrorIs(err, ErrRGNotExist)
suite.ErrorIs(err, merr.ErrResourceGroupNotFound)
// add a node already assigned to one rg to another rg
err = suite.manager.AddResourceGroup("rg2")
@ -123,7 +124,7 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
// transfer with non-exist rg
_, err = suite.manager.TransferNode("rgggg", "rg2", 1)
suite.ErrorIs(err, ErrRGNotExist)
suite.ErrorIs(err, merr.ErrResourceGroupNotFound)
_, err = suite.manager.TransferNode("rg1", "rg2", 5)
suite.ErrorIs(err, ErrNodeNotEnough)

View File

@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/etcd"
@ -411,8 +410,8 @@ func (suite *ServerSuite) expectGetRecoverInfo(collection int64) {
}
func (suite *ServerSuite) expectLoadAndReleasePartitions(querynode *mocks.MockQueryNode) {
querynode.EXPECT().LoadPartitions(mock.Anything, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil).Maybe()
querynode.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil).Maybe()
querynode.EXPECT().LoadPartitions(mock.Anything, mock.Anything).Return(merr.Status(nil), nil).Maybe()
querynode.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).Return(merr.Status(nil), nil).Maybe()
}
func (suite *ServerSuite) expectGetRecoverInfoByMockDataCoord(collection int64, dataCoord *coordMocks.MockDataCoord) {

View File

@ -45,25 +45,23 @@ import (
)
var (
ErrCreateResourceGroupFailed = errors.New("failed to create resource group")
ErrDropResourceGroupFailed = errors.New("failed to drop resource group")
ErrAddNodeToRGFailed = errors.New("failed to add node to resource group")
ErrRemoveNodeFromRGFailed = errors.New("failed to remove node from resource group")
ErrTransferNodeFailed = errors.New("failed to transfer node between resource group")
ErrTransferReplicaFailed = errors.New("failed to transfer replica between resource group")
ErrListResourceGroupsFailed = errors.New("failed to list resource group")
ErrDescribeResourceGroupFailed = errors.New("failed to describe resource group")
ErrLoadUseWrongRG = errors.New("load operation should use collection's resource group")
ErrLoadWithDefaultRG = errors.New("load operation can't use default resource group and other resource group together")
// ErrRemoveNodeFromRGFailed = errors.New("failed to remove node from resource group")
// ErrTransferNodeFailed = errors.New("failed to transfer node between resource group")
// ErrTransferReplicaFailed = errors.New("failed to transfer replica between resource group")
// ErrListResourceGroupsFailed = errors.New("failed to list resource group")
// ErrDescribeResourceGroupFailed = errors.New("failed to describe resource group")
// ErrLoadUseWrongRG = errors.New("load operation should use collection's resource group")
// ErrLoadWithDefaultRG = errors.New("load operation can't use default resource group and other resource group together")
)
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
log.Ctx(ctx).Info("show collections request received", zap.Int64s("collections", req.GetCollectionIDs()))
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn("failed to show collections", zap.Error(err))
msg := "failed to show collections"
log.Warn(msg, zap.Error(err))
return &querypb.ShowCollectionsResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
defer meta.GlobalFailedLoadCache.TryExpire()
@ -98,8 +96,9 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
}
err := meta.GlobalFailedLoadCache.Get(collectionID)
if err != nil {
log.Warn("show collection failed", zap.Error(err))
status := merr.Status(err)
msg := "show collection failed"
log.Warn(msg, zap.Error(err))
status := merr.Status(errors.Wrap(err, msg))
status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad
return &querypb.ShowCollectionsResponse{
Status: status,
@ -134,9 +133,10 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
log.Info("show partitions request received", zap.Int64s("partitions", req.GetPartitionIDs()))
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn("failed to show partitions", zap.Error(err))
msg := "failed to show partitions"
log.Warn(msg, zap.Error(err))
return &querypb.ShowPartitionsResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
defer meta.GlobalFailedLoadCache.TryExpire()
@ -162,10 +162,12 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
Status: status,
}, nil
}
err = merr.WrapErrPartitionNotLoaded(partitionID)
msg := fmt.Sprintf("partition %d has not been loaded to memory or load failed", partitionID)
log.Warn(msg)
return &querypb.ShowPartitionsResponse{
Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
percentages = append(percentages, int64(percentage))
@ -206,7 +208,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
msg := "failed to load collection"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
// If refresh mode is ON.
@ -222,7 +224,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
msg := "failed to load collection"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, msg, err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
loadJob := job.NewLoadCollectionJob(ctx,
@ -237,11 +239,11 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
)
s.jobScheduler.Add(loadJob)
err := loadJob.Wait()
if err != nil && !errors.Is(err, job.ErrCollectionLoaded) {
if err != nil {
msg := "failed to load collection"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(errCode(err), msg, err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
@ -260,7 +262,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
msg := "failed to release collection"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
releaseJob := job.NewReleaseCollectionJob(ctx,
@ -279,7 +281,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
msg := "failed to release collection"
log.Error(msg, zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
log.Info("collection released")
@ -306,7 +308,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
msg := "failed to load partitions"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
// If refresh mode is ON.
@ -320,9 +322,9 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
if err := s.checkResourceGroup(req.GetCollectionID(), req.GetResourceGroups()); err != nil {
msg := "failed to load partitions"
log.Warn(msg, zap.Error(ErrLoadUseWrongRG))
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, msg, ErrLoadUseWrongRG), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
loadJob := job.NewLoadPartitionJob(ctx,
@ -337,11 +339,11 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
)
s.jobScheduler.Add(loadJob)
err := loadJob.Wait()
if err != nil && !errors.Is(err, job.ErrCollectionLoaded) {
if err != nil {
msg := "failed to load partitions"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(errCode(err), msg, err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
@ -353,11 +355,11 @@ func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string)
collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(collectionID)
for _, rgName := range resourceGroups {
if len(collectionUsedRG) > 0 && !collectionUsedRG.Contain(rgName) {
return ErrLoadUseWrongRG
return merr.WrapErrParameterInvalid("created resource group(s)", rgName, "given resource group not found")
}
if len(resourceGroups) > 1 && rgName == meta.DefaultResourceGroupName {
return ErrLoadWithDefaultRG
return merr.WrapErrParameterInvalid("no default resource group mixed with the other resource group(s)", rgName)
}
}
}
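
The calls above suggest merr.WrapErrParameterInvalid follows an expected-then-actual convention, with optional trailing context. A hedged stdlib analogue of that call shape (the real signature is merr's, not this function):

package main

import (
	"errors"
	"fmt"
)

var errParameterInvalid = errors.New("invalid parameter")

// wrapErrParameterInvalid mirrors the expected/actual call shape used above;
// the real merr version is variadic over extra context strings.
func wrapErrParameterInvalid(expected, actual any, msg string) error {
	return fmt.Errorf("%w: expected=%v, actual=%v: %s", errParameterInvalid, expected, actual, msg)
}

func main() {
	err := wrapErrParameterInvalid("only 1 source node", "2 source nodes", "verify LoadBalance request")
	fmt.Println(errors.Is(err, errParameterInvalid)) // true
	fmt.Println(err)
}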
@ -377,14 +379,14 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
msg := "failed to release partitions"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
if len(req.GetPartitionIDs()) == 0 {
msg := "partitions is empty"
log.Warn(msg)
err := merr.WrapErrParameterInvalid("any parttiion", "empty partition list")
log.Warn("no partition to release", zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
return merr.Status(err), nil
}
tr := timerecord.NewTimeRecorder("release-partitions")
@ -404,7 +406,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
msg := "failed to release partitions"
log.Error(msg, zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.SuccessLabel).Inc()
@ -425,13 +427,13 @@ func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti
msg := "failed to get partition states"
log.Warn(msg, zap.Error(err))
return &querypb.GetPartitionStatesResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
msg := "partition not loaded"
notLoadResp := &querypb.GetPartitionStatesResponse{
Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg),
Status: merr.Status(merr.WrapErrPartitionNotLoaded(req.GetPartitionIDs())),
}
states := make([]*querypb.PartitionStates, 0, len(req.GetPartitionIDs()))
@ -493,7 +495,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
msg := "failed to get segment info"
log.Warn(msg, zap.Error(err))
return &querypb.GetSegmentInfoResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
@ -504,10 +506,11 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
for _, segmentID := range req.GetSegmentIDs() {
segments := s.dist.SegmentDistManager.Get(segmentID)
if len(segments) == 0 {
err := merr.WrapErrSegmentNotLoaded(segmentID)
msg := fmt.Sprintf("segment %v not found in any node", segmentID)
log.Warn(msg, zap.Int64("segment", segmentID))
return &querypb.GetSegmentInfoResponse{
Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
info := &querypb.SegmentInfo{}
@ -539,9 +542,9 @@ func (s *Server) SyncNewCreatedPartition(ctx context.Context, req *querypb.SyncN
syncJob := job.NewSyncNewCreatedPartitionJob(ctx, req, s.meta, s.cluster, s.broker)
s.jobScheduler.Add(syncJob)
err := syncJob.Wait()
if err != nil && !errors.Is(err, job.ErrPartitionNotInTarget) {
if err != nil {
log.Warn(failedMsg, zap.Error(err))
return utils.WrapStatus(errCode(err), failedMsg, err), nil
return merr.Status(err), nil
}
return merr.Status(nil), nil
@ -654,40 +657,44 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
if err := merr.CheckHealthy(s.State()); err != nil {
msg := "failed to load balance"
log.Warn(msg, zap.Error(err))
return merr.Status(err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
// Verify request
if len(req.GetSourceNodeIDs()) != 1 {
err := merr.WrapErrParameterInvalid("only 1 source node", fmt.Sprintf("%d source nodes", len(req.GetSourceNodeIDs())))
msg := "source nodes can only contain 1 node"
log.Warn(msg, zap.Int("source-nodes-num", len(req.GetSourceNodeIDs())))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
return merr.Status(err), nil
}
if s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID()) < 100 {
err := merr.WrapErrCollectionNotFullyLoaded(req.GetCollectionID())
msg := "can't balance segments of not fully loaded collection"
log.Warn(msg)
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
return merr.Status(err), nil
}
srcNode := req.GetSourceNodeIDs()[0]
replica := s.meta.ReplicaManager.GetByCollectionAndNode(req.GetCollectionID(), srcNode)
if replica == nil {
err := merr.WrapErrReplicaNotFound(-1, fmt.Sprintf("replica not found for collection %d and node %d", req.GetCollectionID(), srcNode))
msg := "source node not found in any replica"
log.Warn(msg)
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
return merr.Status(err), nil
}
if err := s.isStoppingNode(srcNode); err != nil {
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError,
fmt.Sprintf("can't balance, because the source node[%d] is invalid", srcNode), err), nil
return merr.Status(errors.Wrap(err,
fmt.Sprintf("can't balance, because the source node[%d] is invalid", srcNode))), nil
}
for _, dstNode := range req.GetDstNodeIDs() {
if !replica.Contains(dstNode) {
err := merr.WrapErrParameterInvalid("destination node in the same replica as source node", fmt.Sprintf("destination node %d not in replica %d", dstNode, replica.GetID()))
msg := "destination nodes have to be in the same replica of source node"
log.Warn(msg)
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
return merr.Status(err), nil
}
if err := s.isStoppingNode(dstNode); err != nil {
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError,
fmt.Sprintf("can't balance, because the destination node[%d] is invalid", dstNode), err), nil
return merr.Status(errors.Wrap(err,
fmt.Sprintf("can't balance, because the destination node[%d] is invalid", dstNode))), nil
}
}
@ -695,7 +702,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
if err != nil {
msg := "failed to balance segments"
log.Warn(msg, zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
return merr.Status(errors.Wrap(err, msg)), nil
}
return merr.Status(nil), nil
}
@ -709,7 +716,7 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
msg := "failed to show configurations"
log.Warn(msg, zap.Error(err))
return &internalpb.ShowConfigurationsResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
configList := make([]*commonpb.KeyValuePair, 0)
@ -740,7 +747,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
msg := "failed to get metrics"
log.Warn(msg, zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
@ -754,7 +761,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
if err != nil {
msg := "failed to parse metric type"
log.Warn(msg, zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err)
resp.Status = merr.Status(errors.Wrap(err, msg))
return resp, nil
}
@ -762,7 +769,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
msg := "invalid metric type"
err := errors.New(metricsinfo.MsgUnimplementedMetric)
log.Warn(msg, zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err)
resp.Status = merr.Status(errors.Wrap(err, msg))
return resp, nil
}
@ -770,7 +777,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
if err != nil {
msg := "failed to get system info metrics"
log.Warn(msg, zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err)
resp.Status = merr.Status(errors.Wrap(err, msg))
return resp, nil
}
@ -788,7 +795,7 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
msg := "failed to get replicas"
log.Warn(msg, zap.Error(err))
return &milvuspb.GetReplicasResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
@ -799,20 +806,21 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
replicas := s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
if len(replicas) == 0 {
err := merr.WrapErrReplicaNotFound(req.GetCollectionID(), "failed to get replicas by collection")
msg := "failed to get replicas, collection not loaded"
log.Warn(msg)
resp.Status = utils.WrapStatus(commonpb.ErrorCode_MetaFailed, msg)
resp.Status = merr.Status(err)
return resp, nil
}
for _, replica := range replicas {
msg := "failed to get replica info"
if len(replica.GetNodes()) == 0 {
err := merr.WrapErrNoAvailableNodeInReplica(replica.ID)
err := merr.WrapErrReplicaNotAvailable(replica.GetID(), "no available nodes in replica")
log.Warn(msg,
zap.Int64("replica", replica.GetID()),
zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_MetaFailed, msg, err)
resp.Status = merr.Status(err)
break
}
@ -821,7 +829,7 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
log.Warn(msg,
zap.Int64("replica", replica.GetID()),
zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_MetaFailed, msg, err)
resp.Status = merr.Status(err)
break
}
resp.Replicas = append(resp.Replicas, info)
@ -839,7 +847,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
msg := "failed to get shard leaders"
log.Warn(msg, zap.Error(err))
return &querypb.GetShardLeadersResponse{
Status: merr.Status(err),
Status: merr.Status(errors.Wrap(err, msg)),
}, nil
}
@ -860,18 +868,19 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
percentage = 100
}
if percentage < 100 {
err := merr.WrapErrCollectionNotFullyLoaded(req.GetCollectionID())
msg := fmt.Sprintf("collection %v is not fully loaded", req.GetCollectionID())
log.Warn(msg)
resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg)
resp.Status = merr.Status(err)
return resp, nil
}
channels := s.targetMgr.GetDmChannelsByCollection(req.GetCollectionID(), meta.CurrentTarget)
if len(channels) == 0 {
msg := "failed to get channels"
err := merr.WrapErrCollectionNotFound(req.GetCollectionID())
err := merr.WrapErrCollectionNotLoaded(req.GetCollectionID())
log.Warn(msg, zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_MetaFailed, msg, err)
resp.Status = merr.Status(err)
return resp, nil
}
@ -929,7 +938,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
_, exist := leader.Segments[segmentID]
if !exist {
log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
multierr.AppendInto(&channelErr, WrapErrLackSegment(segmentID))
multierr.AppendInto(&channelErr, merr.WrapErrSegmentLack(segmentID))
isAvailable = false
break
}
@ -945,7 +954,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
if len(ids) == 0 {
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
log.Warn(msg, zap.Error(channelErr))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg, channelErr)
resp.Status = merr.Status(
errors.Wrap(merr.WrapErrChannelNotAvailable(channel.GetChannelName()), channelErr.Error()))
resp.Shards = nil
return resp, nil
}
@ -1030,14 +1040,15 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
replicas := s.meta.ReplicaManager.GetByResourceGroup(req.GetResourceGroup())
if len(replicas) > 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("some replicas still loaded in resource group[%s], release it first", req.GetResourceGroup()), meta.ErrDeleteNonEmptyRG), nil
err := merr.WrapErrParameterInvalid("empty resource group", fmt.Sprintf("resource group %s has collection %d loaded", req.GetResourceGroup(), replicas[0].GetCollectionID()))
return merr.Status(errors.Wrap(err,
fmt.Sprintf("some replicas still loaded in resource group[%s], release it first", req.GetResourceGroup()))), nil
}
err := s.meta.ResourceManager.RemoveResourceGroup(req.GetResourceGroup())
if err != nil {
log.Warn("failed to drop resource group", zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to drop resource group", err), nil
return merr.Status(err), nil
}
return merr.Status(nil), nil
}
@ -1056,40 +1067,41 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
}
if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("the source resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
err := merr.WrapErrParameterInvalid("valid resource group", req.GetSourceResourceGroup(), "source resource group not found")
return merr.Status(err), nil
}
if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetTargetResourceGroup()); !ok {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
err := merr.WrapErrParameterInvalid("valid resource group", req.GetTargetResourceGroup(), "target resource group not found")
return merr.Status(err), nil
}
if req.GetNumNode() <= 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
err := merr.WrapErrParameterInvalid("NumNode > 0", fmt.Sprintf("invalid NumNode %d", req.GetNumNode()))
return merr.Status(err), nil
}
replicasInSource := s.meta.ReplicaManager.GetByResourceGroup(req.GetSourceResourceGroup())
replicasInTarget := s.meta.ReplicaManager.GetByResourceGroup(req.GetTargetResourceGroup())
loadSameCollection := false
sameCollectionID := int64(0)
for _, r1 := range replicasInSource {
for _, r2 := range replicasInTarget {
if r1.GetCollectionID() == r2.GetCollectionID() {
loadSameCollection = true
sameCollectionID = r1.GetCollectionID()
}
}
}
if loadSameCollection {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("can't transfer node, cause the resource group[%s] and the resource group[%s] loaded same collection",
req.GetSourceResourceGroup(), req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
err := merr.WrapErrParameterInvalid("resource groups load not the same collection", fmt.Sprintf("collection %d loaded for both", sameCollectionID))
return merr.Status(err), nil
}
nodes, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
if err != nil {
log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil
log.Warn("failed to transfer node", zap.Error(err))
return merr.Status(err), nil
}
utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...)
@ -1111,44 +1123,47 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
}
if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("the source resource group[%s] doesn't exist", req.GetSourceResourceGroup()), meta.ErrRGNotExist), nil
err := merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup())
return merr.Status(errors.Wrap(err,
fmt.Sprintf("the source resource group[%s] doesn't exist", req.GetSourceResourceGroup()))), nil
}
if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetTargetResourceGroup()); !ok {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
err := merr.WrapErrResourceGroupNotFound(req.GetTargetResourceGroup())
return merr.Status(errors.Wrap(err,
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()))), nil
}
if req.GetNumReplica() <= 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("transfer replica num can't be [%d]", req.GetNumReplica()), nil), nil
err := merr.WrapErrParameterInvalid("NumReplica > 0", fmt.Sprintf("invalid NumReplica %d", req.GetNumReplica()))
return merr.Status(err), nil
}
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
if len(replicas) < int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup())), nil
err := merr.WrapErrParameterInvalid("NumReplica not greater than the number of replica in source resource group", fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup()))
return merr.Status(err), nil
}
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
if len(replicas) > 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
len(replicas), req.GetTargetResourceGroup())), nil
err := merr.WrapErrParameterInvalid("no same collection in target resource group", fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
len(replicas), req.GetTargetResourceGroup()))
return merr.Status(err), nil
}
replicas = s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
if (req.GetSourceResourceGroup() == meta.DefaultResourceGroupName || req.GetTargetResourceGroup() == meta.DefaultResourceGroupName) &&
len(replicas) != int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
"transfer replica will cause replica loaded in both default rg and other rg", nil), nil
err := merr.WrapErrParameterInvalid("tranfer all replicas from/to default resource group",
fmt.Sprintf("try to transfer %d replicas from/to but %d replicas exist", req.GetNumReplica(), len(replicas)))
return merr.Status(err), nil
}
err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()])
if err != nil {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, ErrTransferReplicaFailed.Error(), err), nil
return merr.Status(err), nil
}
return merr.Status(nil), nil
@ -1197,14 +1212,14 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ
Status: merr.Status(nil),
}
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn(ErrDescribeResourceGroupFailed.Error(), zap.Error(err))
log.Warn("failed to describe resource group", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
rg, err := s.meta.ResourceManager.GetResourceGroup(req.GetResourceGroup())
if err != nil {
resp.Status = utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, ErrDescribeResourceGroupFailed.Error(), err)
resp.Status = merr.Status(err)
return resp, nil
}
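
Throughout this file, Go errors cross the RPC boundary as a commonpb.Status built by merr.Status, and the updated tests recover them with merr.Error (e.g. suite.ErrorIs(merr.Error(resp), merr.ErrResourceGroupNotFound)). A toy standalone round trip, under the assumption that statuses carry a stable error code:

package main

import (
	"errors"
	"fmt"
)

// status stands in for commonpb.Status.
type status struct {
	Code   int32
	Reason string
}

var errResourceGroupNotFound = errors.New("resource group not found")

// registry maps codes to sentinels so a client can rebuild a matchable error.
var registry = map[int32]error{1: errResourceGroupNotFound}

// toStatus plays the role of merr.Status: nil means success.
func toStatus(err error) *status {
	if err == nil {
		return &status{Code: 0}
	}
	for code, sentinel := range registry {
		if errors.Is(err, sentinel) {
			return &status{Code: code, Reason: err.Error()}
		}
	}
	return &status{Code: -1, Reason: err.Error()} // unexpected error
}

// fromStatus plays the role of merr.Error.
func fromStatus(s *status) error {
	if s.Code == 0 {
		return nil
	}
	if sentinel, ok := registry[s.Code]; ok {
		return fmt.Errorf("%w: %s", sentinel, s.Reason)
	}
	return errors.New(s.Reason)
}

func main() {
	s := toStatus(fmt.Errorf("describe failed: %w", errResourceGroupNotFound))
	fmt.Println(errors.Is(fromStatus(s), errResourceGroupNotFound)) // true
	fmt.Println(fromStatus(toStatus(nil)))                          // <nil>: merr.Status(nil) is success
}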

View File

@ -435,7 +435,7 @@ func (suite *ServiceSuite) TestResourceGroupFailed() {
}
resp, err := server.DescribeResourceGroup(ctx, describeRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.GetStatus().GetErrorCode())
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrResourceGroupNotFound)
// server unhealthy
server.UpdateStateCode(commonpb.StateCode_Abnormal)
@ -520,7 +520,6 @@ func (suite *ServiceSuite) TestTransferNode() {
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, "can't transfer node")
// test transfer node meet non-exist source rg
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
@@ -528,7 +527,6 @@
TargetResourceGroup: meta.DefaultResourceGroupName,
})
suite.NoError(err)
suite.Contains(resp.Reason, meta.ErrRGNotExist.Error())
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
// test transfer node meet non-exist target rg
@@ -537,7 +535,6 @@
TargetResourceGroup: "rgggg",
})
suite.NoError(err)
suite.Contains(resp.Reason, meta.ErrRGNotExist.Error())
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
err = server.meta.ResourceManager.AddResourceGroup("rg3")
@@ -611,7 +608,7 @@ func (suite *ServiceSuite) TestTransferReplica() {
NumReplica: 2,
})
suite.NoError(err)
suite.Contains(resp.Reason, "only found [0] replicas in source resource group")
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: "rgg",
@@ -620,7 +617,7 @@
NumReplica: 2,
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
suite.ErrorIs(merr.Error(resp), merr.ErrResourceGroupNotFound)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
@@ -629,7 +626,7 @@
NumReplica: 2,
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
suite.ErrorIs(merr.Error(resp), merr.ErrResourceGroupNotFound)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
@@ -638,7 +635,7 @@
NumReplica: 0,
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 1,
@@ -693,7 +690,7 @@
NumReplica: 1,
})
suite.NoError(err)
suite.Contains(resp.Reason, "transfer replica will cause replica loaded in both default rg and other rg")
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(1))
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
@@ -731,8 +728,7 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() {
}
resp, err := server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
}
req := &querypb.LoadCollectionRequest{
@@ -756,7 +752,6 @@
resp, err := server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
// Test load with wrong rg num
@@ -769,7 +764,6 @@
resp, err := server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, ErrLoadUseWrongRG.Error())
}
}
@@ -866,7 +860,6 @@ func (suite *ServiceSuite) TestLoadPartitionFailed() {
resp, err := server.LoadPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
}
@@ -916,7 +909,7 @@ func (suite *ServiceSuite) TestReleasePartition() {
// Test release all partitions
suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
Return(merr.Status(nil), nil)
for _, collection := range suite.collections {
req := &querypb.ReleasePartitionsRequest{
CollectionID: collection,
@@ -1199,8 +1192,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
}
resp, err := server.LoadBalance(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Contains(resp.Reason, "source nodes can only contain 1 node")
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
}
// Test load balance with not fully loaded
@@ -1219,8 +1211,7 @@
}
resp, err := server.LoadBalance(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Contains(resp.Reason, "can't balance segments of not fully loaded collection")
suite.ErrorIs(merr.Error(resp), merr.ErrCollectionNotFullyLoaded)
}
// Test load balance with source node and dest node not in the same replica
@@ -1243,8 +1234,7 @@
}
resp, err := server.LoadBalance(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Contains(resp.Reason, "destination nodes have to be in the same replica of source node")
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
}
// Test balance task failed
@@ -1424,8 +1414,7 @@ func (suite *ServiceSuite) TestGetReplicasFailed() {
}
resp, err := server.GetReplicas(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_MetaFailed, resp.GetStatus().GetErrorCode())
suite.EqualValues(resp.GetStatus().GetReason(), "failed to get replica info, err=replica=100001: no available node in replica")
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrReplicaNotAvailable)
}
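The test-side counterpart: instead of matching substrings of Reason, the suites decode the status back into a typed error and assert the sentinel. Note the two shapes used above: RPCs that return a bare *commonpb.Status are decoded with merr.Error(resp), while response structs go through merr.Error(resp.GetStatus()). A short sketch (resp and the suite come from the surrounding tests):

// Decode the wire status back into a Go error wrapping a merr sentinel.
err := merr.Error(resp.GetStatus())
suite.ErrorIs(err, merr.ErrReplicaNotAvailable)
// Outside a test suite, the stdlib check works the same way because
// the wrapped sentinels support errors.Is matching:
if errors.Is(err, merr.ErrReplicaNotAvailable) {
	// handle the unavailable replica, e.g. retry on another one
}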
func (suite *ServiceSuite) TestCheckHealth() {
@@ -1731,7 +1720,7 @@ func (suite *ServiceSuite) expectLoadPartitions() {
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
Return(nil, nil)
suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything).
Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
Return(merr.Status(nil), nil)
}
func (suite *ServiceSuite) getAllSegments(collection int64) []int64 {

View File

@@ -213,7 +213,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
},
},
}, nil)
suite.cluster.EXPECT().WatchDmChannels(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().WatchDmChannels(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
// Test subscribe channel task
tasks := []Task{}
@@ -308,7 +308,7 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() {
targetNode := int64(1)
// Expect
suite.cluster.EXPECT().UnsubDmChannel(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().UnsubDmChannel(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
// Test unsubscribe channel task
tasks := []Task{}
@@ -386,7 +386,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil)
}
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
// Test load segment task
suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
@@ -529,7 +529,7 @@ func (suite *TaskSuite) TestReleaseSegmentTask() {
}
// Expect
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
// Test load segment task
view := &meta.LeaderView{
@@ -590,7 +590,7 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
targetNode := int64(3)
// Expect
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
tasks := []Task{}
for _, segment := range suite.releaseSegments {
@@ -667,10 +667,8 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil)
}
suite.cluster.EXPECT().LoadSegments(mock.Anything, leader, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, leader, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
// Test move segment task
suite.cluster.EXPECT().LoadSegments(mock.Anything, leader, mock.Anything).Return(merr.Status(nil), nil)
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, leader, mock.Anything).Return(merr.Status(nil), nil)
vchannel := &datapb.VchannelInfo{
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
@@ -767,7 +765,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil)
}
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
// Test load segment task
suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
@@ -846,7 +844,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil)
}
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)
// Test load segment task
suite.meta.ReplicaManager.Put(createReplica(suite.collection, targetNode))

View File

@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
var (
@@ -221,7 +222,7 @@ func SpawnReplicasWithRG(m *meta.Meta, collection int64, resourceGroups []string
replicaSet := make([]*meta.Replica, 0)
for _, rgName := range resourceGroups {
if !m.ResourceManager.ContainResourceGroup(rgName) {
return nil, meta.ErrRGNotExist
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
replicas, err := m.ReplicaManager.Spawn(collection, 1, rgName)

View File

@@ -17,7 +17,6 @@
package utils
import (
"fmt"
"time"
"go.uber.org/zap"
@@ -31,25 +30,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
// WrapStatus wraps status with given error code, message and errors
func WrapStatus(code commonpb.ErrorCode, msg string, errs ...error) *commonpb.Status {
status := &commonpb.Status{
ErrorCode: code,
Reason: msg,
}
for _, err := range errs {
status.Reason = fmt.Sprintf("%s, err=%v", status.Reason, err)
}
return status
}
// WrapError wraps error with given message
func WrapError(msg string, err error) error {
return fmt.Errorf("%s[%w]", msg, err)
}
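With these two helpers deleted, every status in QueryCoord is built one way: wrap a sentinel, then let merr.Status derive both the legacy ErrorCode and the Reason string. A hedged before/after sketch (the exact Reason formatting is merr's own and not reproduced verbatim):

// Before: the error code and reason were assembled by hand.
//   status := utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, "bad request", err)
// After: the typed error carries the code; merr.Status derives the status fields.
status := merr.Status(merr.WrapErrParameterInvalid("positive nq", "nq = 0"))
// status.ErrorCode comes from the merr-to-legacy mapping (IllegalArgument here);
// status.Reason carries the wrapped message chain.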
func SegmentBinlogs2SegmentInfo(collectionID int64, partitionID int64, segmentBinlogs *datapb.SegmentBinlogs) *datapb.SegmentInfo {
return &datapb.SegmentInfo{
ID: segmentBinlogs.GetSegmentID(),

View File

@@ -76,7 +76,7 @@ func (c *averageCollector) Average(label string) (float64, error) {
average, ok := c.averages[label]
if !ok {
return 0, merr.WrapErrAverageLabelNotRegister(label)
return 0, merr.WrapErrMetricNotFound(label)
}
return average.Value(), nil
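Callers of Average can now branch on the shared metric sentinel rather than a bespoke "average" error. A small sketch (the collector variable and label are illustrative only):

if _, err := collector.Average("search_latency"); errors.Is(err, merr.ErrMetricNotFound) {
	// the label was never registered; treat the average as absent
}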

View File

@@ -1020,7 +1020,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {
req.GetReq().MetricType = "IP"
resp, err = suite.node.Search(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
suite.Contains(resp.GetStatus().GetReason(), merr.ErrParameterInvalid.Error())
req.GetReq().MetricType = "L2"

View File

@@ -59,22 +59,25 @@ var (
ErrCollectionNotFound = newMilvusError("collection not found", 100, false)
ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false)
ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false)
ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 202, false)
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 203, false)
ErrPartitionNotFound = newMilvusError("partition not found", 202, false)
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 203, false)
ErrPartitionNotFullyLoaded = newMilvusError("partition not fully loaded", 204, true)
// ResourceGroup related
ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false)
// Replica related
ErrReplicaNotFound = newMilvusError("replica not found", 400, false)
ErrNoAvailableNodeInReplica = newMilvusError("no available node in replica", 401, false)
ErrReplicaNotFound = newMilvusError("replica not found", 400, false)
ErrReplicaNotAvailable = newMilvusError("replica not available", 401, false)
// Channel related
ErrChannelNotFound = newMilvusError("channel not found", 500, false)
ErrChannelLack = newMilvusError("channel lacks", 501, false)
ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false)
ErrChannelNotFound = newMilvusError("channel not found", 500, false)
ErrChannelLack = newMilvusError("channel lacks", 501, false)
ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false)
ErrChannelNotAvailable = newMilvusError("channel not available", 503, false)
// Segment related
ErrSegmentNotFound = newMilvusError("segment not found", 600, false)
@@ -91,11 +94,11 @@ var (
ErrInvalidedDatabaseName = newMilvusError("invalided database name", 802, false)
// Node related
ErrNodeNotFound = newMilvusError("node not found", 901, false)
ErrNodeOffline = newMilvusError("node offline", 902, false)
ErrNodeLack = newMilvusError("node lacks", 903, false)
ErrNodeNotMatch = newMilvusError("node not match", 904, false)
ErrNoAvailableNode = newMilvusError("no available node", 905, false)
ErrNodeNotFound = newMilvusError("node not found", 901, false)
ErrNodeOffline = newMilvusError("node offline", 902, false)
ErrNodeLack = newMilvusError("node lacks", 903, false)
ErrNodeNotMatch = newMilvusError("node not match", 904, false)
ErrNodeNotAvailable = newMilvusError("node not available", 905, false)
// IO related
ErrIoKeyNotFound = newMilvusError("key not found", 1000, false)
@@ -111,9 +114,6 @@ var (
ErrTopicNotFound = newMilvusError("topic not found", 1300, false)
ErrTopicNotEmpty = newMilvusError("topic not empty", 1301, false)
// Average related
ErrAverageLabelNotRegister = newMilvusError("average label not register", 1400, false)
// shard delegator related
ErrShardDelegatorNotFound = newMilvusError("shard delegator not found", 1500, false)
ErrShardDelegatorAccessFailed = newMilvusError("fail to access shard delegator", 1501, true)
@@ -121,9 +121,6 @@ var (
ErrShardDelegatorQueryFailed = newMilvusError("fail to query on all shard leaders", 1503, true)
ErrShardDelegatorStatisticFailed = newMilvusError("get statistics on all shard leaders", 1504, true)
// task related
ErrTaskQueueFull = newMilvusError("task queue full", 1600, false)
// field related
ErrFieldNotFound = newMilvusError("field not found", 1700, false)

View File

@@ -70,16 +70,19 @@ func (s *ErrSuite) TestWrap() {
// Collection related
s.ErrorIs(WrapErrCollectionNotFound("test_collection", "failed to get collection"), ErrCollectionNotFound)
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to query"), ErrCollectionNotLoaded)
s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded)
// Partition related
s.ErrorIs(WrapErrPartitionNotFound("test_Partition", "failed to get Partition"), ErrPartitionNotFound)
s.ErrorIs(WrapErrPartitionNotLoaded("test_Partition", "failed to query"), ErrPartitionNotLoaded)
s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound)
s.ErrorIs(WrapErrPartitionNotLoaded("test_partition", "failed to query"), ErrPartitionNotLoaded)
s.ErrorIs(WrapErrPartitionNotFullyLoaded("test_partition", "failed to query"), ErrPartitionNotFullyLoaded)
// ResourceGroup related
s.ErrorIs(WrapErrResourceGroupNotFound("test_ResourceGroup", "failed to get ResourceGroup"), ErrResourceGroupNotFound)
// Replica related
s.ErrorIs(WrapErrReplicaNotFound(1, "failed to get Replica"), ErrReplicaNotFound)
s.ErrorIs(WrapErrReplicaNotFound(1, "failed to get replica"), ErrReplicaNotFound)
s.ErrorIs(WrapErrReplicaNotAvailable(1, "failed to get replica"), ErrReplicaNotAvailable)
// Channel related
s.ErrorIs(WrapErrChannelNotFound("test_Channel", "failed to get Channel"), ErrChannelNotFound)
@@ -115,15 +118,9 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrTopicNotFound("unknown", "failed to get topic"), ErrTopicNotFound)
s.ErrorIs(WrapErrTopicNotEmpty("unknown", "topic is not empty"), ErrTopicNotEmpty)
// average related
s.ErrorIs(WrapErrAverageLabelNotRegister("unknown", "average label not register"), ErrAverageLabelNotRegister)
// shard delegator related
s.ErrorIs(WrapErrShardDelegatorNotFound("unknown", "fail to get shard delegator"), ErrShardDelegatorNotFound)
// task related
s.ErrorIs(WrapErrTaskQueueFull("test_task_queue", "task queue is full"), ErrTaskQueueFull)
// field related
s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound)
}

View File

@@ -82,8 +82,14 @@ func oldCode(code int32) commonpb.ErrorCode {
return commonpb.ErrorCode_NotReadyServe
case ErrCollectionNotFound.code():
return commonpb.ErrorCode_CollectionNotExists
case ErrParameterInvalid.code():
return commonpb.ErrorCode_IllegalArgument
case ErrNodeNotMatch.code():
return commonpb.ErrorCode_NodeIDNotMatch
case ErrPartitionNotFound.code(), ErrReplicaNotFound.code():
return commonpb.ErrorCode_MetaFailed
case ErrReplicaNotAvailable.code(), ErrChannelNotAvailable.code(), ErrNodeNotAvailable.code():
return commonpb.ErrorCode_NoReplicaAvailable
default:
return commonpb.ErrorCode_UnexpectedError
}
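This mapping is what keeps legacy clients working: when a typed error is serialized, its merr code is translated to the nearest commonpb.ErrorCode, while merr-aware readers can still recover the sentinel. A sketch of the round trip, assuming merr.Status consults this table:

err := merr.WrapErrNodeNotAvailable(7)
status := merr.Status(err)
// Legacy readers see commonpb.ErrorCode_NoReplicaAvailable (per the case above);
// new readers decode the typed error instead:
recovered := merr.Error(status)
fmt.Println(errors.Is(recovered, merr.ErrNodeNotAvailable)) // true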
@@ -217,6 +223,14 @@ func WrapErrCollectionResourceLimitExceeded(msg ...string) error {
return err
}
func WrapErrCollectionNotFullyLoaded(collection any, msg ...string) error {
err := wrapWithField(ErrCollectionNotFullyLoaded, "collection", collection)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Partition related
func WrapErrPartitionNotFound(partition any, msg ...string) error {
err := wrapWithField(ErrPartitionNotFound, "partition", partition)
@@ -234,6 +248,14 @@ func WrapErrPartitionNotLoaded(partition any, msg ...string) error {
return err
}
func WrapErrPartitionNotFullyLoaded(partition any, msg ...string) error {
err := wrapWithField(ErrPartitionNotFullyLoaded, "partition", partition)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// ResourceGroup related
func WrapErrResourceGroupNotFound(rg any, msg ...string) error {
err := wrapWithField(ErrResourceGroupNotFound, "rg", rg)
@@ -252,8 +274,8 @@ func WrapErrReplicaNotFound(id int64, msg ...string) error {
return err
}
func WrapErrNoAvailableNodeInReplica(id int64, msg ...string) error {
err := wrapWithField(ErrNoAvailableNodeInReplica, "replica", id)
func WrapErrReplicaNotAvailable(id int64, msg ...string) error {
err := wrapWithField(ErrReplicaNotAvailable, "replica", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
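All of the Wrap* helpers follow the wrapWithField shape shown here: attach the identifying field to the sentinel, then optionally prepend caller-supplied context. Usage sketch (the replica ID and message are illustrative):

err := merr.WrapErrReplicaNotAvailable(42, "failed to search")
// The message reads roughly: "failed to search: replica=42: replica not available".
fmt.Println(errors.Is(err, merr.ErrReplicaNotAvailable)) // true: the sentinel survives wrapping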
@@ -285,6 +307,14 @@ func WrapErrChannelReduplicate(name string, msg ...string) error {
return err
}
func WrapErrChannelNotAvailable(name string, msg ...string) error {
err := wrapWithField(ErrChannelNotAvailable, "channel", name)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Segment related
func WrapErrSegmentNotFound(id int64, msg ...string) error {
err := wrapWithField(ErrSegmentNotFound, "segment", id)
@@ -352,8 +382,8 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error {
return err
}
func WrapErrNoAvailableNode(msg ...string) error {
err := error(ErrNoAvailableNode)
func WrapErrNodeNotAvailable(id int64, msg ...string) error {
err := wrapWithField(ErrNodeNotAvailable, "node", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
@@ -436,15 +466,6 @@ func WrapErrTopicNotEmpty(name string, msg ...string) error {
return err
}
// Average related
func WrapErrAverageLabelNotRegister(label string, msg ...string) error {
err := errors.Wrapf(ErrAverageLabelNotRegister, "averageLabel=%s", label)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// shard delegator related
func WrapErrShardDelegatorNotFound(channel string, msg ...string) error {
err := errors.Wrapf(ErrShardDelegatorNotFound, "channel=%s", channel)
@@ -486,15 +507,6 @@ func WrapErrShardDelegatorStatisticFailed(msg ...string) error {
return err
}
// task related
func WrapErrTaskQueueFull(msg ...string) error {
err := error(ErrTaskQueueFull)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// field related
func WrapErrFieldNotFound[T any](field T, msg ...string) error {
err := errors.Wrapf(ErrFieldNotFound, "field=%v", field)

View File

@@ -82,6 +82,7 @@ list_content = "list_content"
dict_content = "dict_content"
value_content = "value_content"
code = "code"
err_code = "err_code"
err_msg = "err_msg"
in_cluster_env = "IN_CLUSTER"

View File

@@ -269,7 +269,7 @@ class TestUtilityParams(TestcaseBase):
collection_w = self.init_collection_general(prefix)[0]
log.debug(collection_w.num_entities)
collection_w.load()
err_msg = {ct.err_code: 1, ct.err_msg: f"partitionID of partitionName:{ct.default_tag} can not be find"}
err_msg = {ct.err_code: 15, ct.err_msg: "partition not found"}
self.utility_wrap.loading_progress(collection_w.name, partition_names,
check_task=CheckTasks.err_res, check_items=err_msg)