Use merr in proxy (#22904)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/22977/head
smellthemoon 2023-03-24 15:27:58 +08:00 committed by GitHub
parent 8b3e5189e1
commit 3dae84f065
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 30 additions and 227 deletions

View File

@ -2,12 +2,14 @@ package proxy
import (
"context"
"fmt"
"strings"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/crypto"
"github.com/milvus-io/milvus/internal/util/merr"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)
@ -51,10 +53,10 @@ func AuthenticationInterceptor(ctx context.Context) (context.Context, error) {
// See: https://godoc.org/google.golang.org/grpc/metadata#New
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, ErrMissingMetadata()
return nil, merr.WrapErrIoKeyNotFound("metadata", "auth check failure, due to occurs inner error: missing metadata")
}
if globalMetaCache == nil {
return nil, ErrProxyNotReady()
return nil, merr.WrapErrServiceUnavailable("internal: Milvus Proxy is not ready yet. please wait")
}
// check:
// 1. if rpc call from a member (like index/query/data component)
@ -63,7 +65,8 @@ func AuthenticationInterceptor(ctx context.Context) (context.Context, error) {
if !validSourceID(ctx, md[strings.ToLower(util.HeaderSourceID)]) {
username, password := parseMD(md[strings.ToLower(util.HeaderAuthorize)])
if !passwordVerify(ctx, username, password, globalMetaCache) {
return nil, ErrUnauthenticated()
msg := fmt.Sprintf("username: %s, password: %s", username, password)
return nil, merr.WrapErrParameterInvalid("vaild username and password", msg, "auth check failure, please check username and password are correct")
}
metrics.UserRPCCounter.WithLabelValues(username).Inc()
}

View File

@ -21,14 +21,14 @@ import (
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
)
// TODO(dragondriver): add more common error type
// Keep this error temporarily
// this error belongs to ErrServiceMemoryLimitExceeded
// but in the error returned by querycoord,the collection id is given
// which can not be thrown out
// the error will be deleted after reaching an agreement on collection name and id in qn
// ErrInsufficientMemory returns insufficient memory error.
var ErrInsufficientMemory = errors.New("InsufficientMemoryToLoad")
@ -40,66 +40,3 @@ func InSufficientMemoryStatus(collectionName string) *commonpb.Status {
Reason: fmt.Sprintf("deny to load, insufficient memory, please allocate more resources, collectionName: %s", collectionName),
}
}
func errInvalidNumRows(numRows uint32) error {
return fmt.Errorf("invalid num_rows: %d", numRows)
}
func errNumRowsLessThanOrEqualToZero(numRows uint32) error {
return fmt.Errorf("num_rows(%d) should be greater than 0", numRows)
}
func errNumRowsOfFieldDataMismatchPassed(idx int, fieldNumRows, passedNumRows uint32) error {
return fmt.Errorf("the num_rows(%d) of %dth field is not equal to passed NumRows(%d)", fieldNumRows, idx, passedNumRows)
}
var errEmptyFieldData = errors.New("empty field data")
func errFieldsLessThanNeeded(fieldsNum, needed int) error {
return fmt.Errorf("the length(%d) of passed fields is less than needed(%d)", fieldsNum, needed)
}
func errUnsupportedDataType(dType schemapb.DataType) error {
return fmt.Errorf("%v is not supported now", dType)
}
func errUnsupportedDType(dType string) error {
return fmt.Errorf("%s is not supported now", dType)
}
func errInvalidDim(dim int) error {
return fmt.Errorf("invalid dim: %d", dim)
}
func errDimLessThanOrEqualToZero(dim int) error {
return fmt.Errorf("dim(%d) should be greater than 0", dim)
}
func errDimShouldDivide8(dim int) error {
return fmt.Errorf("dim(%d) should divide 8", dim)
}
func msgProxyIsUnhealthy(id UniqueID) string {
return fmt.Sprintf("proxy %d is unhealthy", id)
}
// errProxyIsUnhealthy returns an error represent proxy is unhealthy
func errProxyIsUnhealthy(id UniqueID) error {
return errors.New(msgProxyIsUnhealthy(id))
}
func ErrMissingMetadata() error {
return fmt.Errorf("auth check failure, due to occurs inner error: missing metadata")
}
func ErrUnauthenticated() error {
return fmt.Errorf("auth check failure, please check username and password are correct")
}
func ErrProxyNotReady() error {
return status.Errorf(codes.Unavailable, "internal: Milvus Proxy is not ready yet. please wait")
}
func ErrPartitionNotExist(partitionName string) error {
return fmt.Errorf("partition is not exist: %s", partitionName)
}

View File

@ -21,141 +21,10 @@ import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
)
func Test_errInvalidNumRows(t *testing.T) {
invalidNumRowsList := []uint32{
0,
16384,
}
for _, invalidNumRows := range invalidNumRowsList {
log.Info("Test_errInvalidNumRows",
zap.Error(errInvalidNumRows(invalidNumRows)))
}
}
func Test_errNumRowsLessThanOrEqualToZero(t *testing.T) {
invalidNumRowsList := []uint32{
0,
16384,
}
for _, invalidNumRows := range invalidNumRowsList {
log.Info("Test_errNumRowsLessThanOrEqualToZero",
zap.Error(errNumRowsLessThanOrEqualToZero(invalidNumRows)))
}
}
func Test_errEmptyFieldData(t *testing.T) {
log.Info("Test_errEmptyFieldData",
zap.Error(errEmptyFieldData))
}
func Test_errFieldsLessThanNeeded(t *testing.T) {
cases := []struct {
fieldsNum int
neededNum int
}{
{0, 1},
{1, 2},
}
for _, test := range cases {
log.Info("Test_errFieldsLessThanNeeded",
zap.Error(errFieldsLessThanNeeded(test.fieldsNum, test.neededNum)))
}
}
func Test_errUnsupportedDataType(t *testing.T) {
unsupportedDTypes := []schemapb.DataType{
schemapb.DataType_None,
}
for _, dType := range unsupportedDTypes {
log.Info("Test_errUnsupportedDataType",
zap.Error(errUnsupportedDataType(dType)))
}
}
func Test_errUnsupportedDType(t *testing.T) {
unsupportedDTypes := []string{
"bytes",
"None",
}
for _, dType := range unsupportedDTypes {
log.Info("Test_errUnsupportedDType",
zap.Error(errUnsupportedDType(dType)))
}
}
func Test_errInvalidDim(t *testing.T) {
invalidDimList := []int{
0,
-1,
}
for _, invalidDim := range invalidDimList {
log.Info("Test_errInvalidDim",
zap.Error(errInvalidDim(invalidDim)))
}
}
func Test_errDimLessThanOrEqualToZero(t *testing.T) {
invalidDimList := []int{
0,
-1,
}
for _, invalidDim := range invalidDimList {
log.Info("Test_errDimLessThanOrEqualToZero",
zap.Error(errDimLessThanOrEqualToZero(invalidDim)))
}
}
func Test_errDimShouldDivide8(t *testing.T) {
invalidDimList := []int{
0,
1,
7,
}
for _, invalidDim := range invalidDimList {
log.Info("Test_errDimShouldDivide8",
zap.Error(errDimShouldDivide8(invalidDim)))
}
}
func Test_msgProxyIsUnhealthy(t *testing.T) {
ids := []UniqueID{
1,
}
for _, id := range ids {
log.Info("Test_msgProxyIsUnhealthy",
zap.String("msg", msgProxyIsUnhealthy(id)))
}
}
func Test_errProxyIsUnhealthy(t *testing.T) {
ids := []UniqueID{
1,
}
for _, id := range ids {
log.Info("Test_errProxyIsUnhealthy",
zap.Error(errProxyIsUnhealthy(id)))
}
}
func Test_ErrInsufficientMemory(t *testing.T) {
err := fmt.Errorf("%w, mock insufficient memory error", ErrInsufficientMemory)
assert.True(t, errors.Is(err, ErrInsufficientMemory))

View File

@ -48,6 +48,7 @@ import (
"github.com/milvus-io/milvus/internal/util/errorutil"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
@ -1458,10 +1459,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
}
}
return &milvuspb.GetLoadingProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}
}
if err := validateCollectionName(request.CollectionName); err != nil {
@ -1532,10 +1530,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt
zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
return &milvuspb.GetLoadStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}
}
@ -3244,16 +3239,14 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque
zap.String("req", req.Request))
if !node.checkHealthy() {
err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID()))
log.Warn("Proxy.GetMetrics failed",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgProxyIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
Response: "",
}, nil
}
@ -3322,14 +3315,12 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
zap.String("req", req.Request))
if !node.checkHealthy() {
err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID()))
log.Warn("Proxy.GetProxyMetrics failed",
zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgProxyIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
}, nil
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
@ -470,7 +471,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string,
log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
if !ok {
return nil, ErrPartitionNotExist(partitionName)
return nil, merr.WrapErrPartitionNotFound(partitionName)
}
}
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()

View File

@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/util/distance"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
@ -1139,7 +1140,7 @@ func TestDropPartitionTask(t *testing.T) {
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 0, ErrPartitionNotExist(task.PartitionName)
return 0, merr.WrapErrPartitionNotFound(partitionName)
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 1, nil

View File

@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -733,7 +734,7 @@ func GetCurUserFromContext(ctx context.Context) (string, error) {
func GetRole(username string) ([]string, error) {
if globalMetaCache == nil {
return []string{}, ErrProxyNotReady()
return []string{}, merr.WrapErrServiceUnavailable("internal: Milvus Proxy is not ready yet. please wait")
}
return globalMetaCache.GetUserRole(username), nil
}
@ -912,7 +913,7 @@ func checkLengthOfFieldsData(schema *schemapb.CollectionSchema, insertMsg *msgst
}
if len(insertMsg.FieldsData) < neededFieldsNum {
return errFieldsLessThanNeeded(len(insertMsg.FieldsData), neededFieldsNum)
return merr.WrapErrParameterInvalid(neededFieldsNum, len(insertMsg.FieldsData), "the length of passed fields is less than needed")
}
return nil
@ -922,7 +923,7 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
rowNums := uint32(insertMsg.NRows())
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
if insertMsg.NRows() <= 0 {
return nil, errNumRowsLessThanOrEqualToZero(rowNums)
return nil, merr.WrapErrParameterInvalid("invalid num_rows", fmt.Sprint(rowNums), "num_rows should be greater than 0")
}
if err := checkLengthOfFieldsData(schema, insertMsg); err != nil {

View File

@ -96,7 +96,7 @@ var (
ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false)
// Metrics related
ErrMetricNotFound = newMilvusError("MetricNotFound", 1200, false)
ErrMetricNotFound = newMilvusError("metric not found", 1200, false)
// Do NOT export this,
// never allow programmer using this, keep only for converting unknown error to milvusError