mirror of https://github.com/milvus-io/milvus.git
Add a specific `KeyNotExistError` err for the metastore `load` interface (#18582)
Signed-off-by: SimFG <bang.fu@zilliz.com>pull/18325/head
parent
f0a336a3e2
commit
1366f130a2
|
@ -16,6 +16,8 @@
|
|||
|
||||
package common
|
||||
|
||||
import "fmt"
|
||||
|
||||
type IgnorableError struct {
|
||||
msg string
|
||||
}
|
||||
|
@ -34,3 +36,22 @@ func IsIgnorableError(err error) bool {
|
|||
_, ok := err.(*IgnorableError)
|
||||
return ok
|
||||
}
|
||||
|
||||
var _ error = &KeyNotExistError{}
|
||||
|
||||
func NewKeyNotExistError(key string) error {
|
||||
return &KeyNotExistError{key: key}
|
||||
}
|
||||
|
||||
func IsKeyNotExistError(err error) bool {
|
||||
_, ok := err.(*KeyNotExistError)
|
||||
return ok
|
||||
}
|
||||
|
||||
type KeyNotExistError struct {
|
||||
key string
|
||||
}
|
||||
|
||||
func (k *KeyNotExistError) Error() string {
|
||||
return fmt.Sprintf("there is no value on key = %s", k.key)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
|
@ -29,3 +30,9 @@ func TestIgnorableError(t *testing.T) {
|
|||
assert.True(t, IsIgnorableError(iErr))
|
||||
assert.False(t, IsIgnorableError(err))
|
||||
}
|
||||
|
||||
func TestNotExistError(t *testing.T) {
|
||||
err := errors.New("err")
|
||||
assert.Equal(t, false, IsKeyNotExistError(err))
|
||||
assert.Equal(t, true, IsKeyNotExistError(NewKeyNotExistError("foo")))
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
||||
|
@ -199,7 +201,7 @@ func (kv *EmbedEtcdKV) Load(key string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
if resp.Count <= 0 {
|
||||
return "", fmt.Errorf("there is no value on key = %s", key)
|
||||
return "", common.NewKeyNotExistError(key)
|
||||
}
|
||||
|
||||
return string(resp.Kvs[0].Value), nil
|
||||
|
@ -215,7 +217,7 @@ func (kv *EmbedEtcdKV) LoadBytes(key string) ([]byte, error) {
|
|||
return nil, err
|
||||
}
|
||||
if resp.Count <= 0 {
|
||||
return nil, fmt.Errorf("there is no value on key = %s", key)
|
||||
return nil, common.NewKeyNotExistError(key)
|
||||
}
|
||||
|
||||
return resp.Kvs[0].Value, nil
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
||||
|
@ -179,7 +181,7 @@ func (kv *EtcdKV) Load(key string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
if resp.Count <= 0 {
|
||||
return "", fmt.Errorf("there is no value on key = %s", key)
|
||||
return "", common.NewKeyNotExistError(key)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load", zap.String("key", key))
|
||||
return string(resp.Kvs[0].Value), nil
|
||||
|
@ -196,7 +198,7 @@ func (kv *EtcdKV) LoadBytes(key string) ([]byte, error) {
|
|||
return []byte{}, err
|
||||
}
|
||||
if resp.Count <= 0 {
|
||||
return []byte{}, fmt.Errorf("there is no value on key = %s", key)
|
||||
return []byte{}, common.NewKeyNotExistError(key)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load", zap.String("key", key))
|
||||
return resp.Kvs[0].Value, nil
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
package memkv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
"github.com/google/btree"
|
||||
)
|
||||
|
||||
|
@ -81,7 +82,7 @@ func (kv *MemoryKV) Load(key string) (string, error) {
|
|||
defer kv.RUnlock()
|
||||
item := kv.tree.Get(memoryKVItem{key: key})
|
||||
if item == nil {
|
||||
return "", fmt.Errorf("invalid key: %s", key)
|
||||
return "", common.NewKeyNotExistError(key)
|
||||
}
|
||||
return item.(memoryKVItem).value.String(), nil
|
||||
}
|
||||
|
@ -92,7 +93,7 @@ func (kv *MemoryKV) LoadBytes(key string) ([]byte, error) {
|
|||
defer kv.RUnlock()
|
||||
item := kv.tree.Get(memoryKVItem{key: key})
|
||||
if item == nil {
|
||||
return []byte{}, fmt.Errorf("invalid key: %s", key)
|
||||
return []byte{}, common.NewKeyNotExistError(key)
|
||||
}
|
||||
return item.(memoryKVItem).value.ByteSlice(), nil
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ import (
|
|||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
|
@ -980,7 +982,7 @@ func (kc *Catalog) OperatePrivilege(ctx context.Context, tenant string, entity *
|
|||
if funcutil.IsRevoke(operateType) {
|
||||
return err
|
||||
}
|
||||
if !funcutil.IsKeyNotExistError(err) {
|
||||
if !common.IsKeyNotExistError(err) {
|
||||
return err
|
||||
}
|
||||
curGrantPrivilegeEntity.Entities = append(curGrantPrivilegeEntity.Entities, &milvuspb.GrantorEntity{
|
||||
|
|
|
@ -3279,6 +3279,10 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) {
|
|||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
roleNum := len(resp.Results)
|
||||
|
||||
resp, _ = proxy.SelectRole(ctx, &milvuspb.SelectRoleRequest{Role: &milvuspb.RoleEntity{Name: "not_existed"}})
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 0, len(resp.Results))
|
||||
|
||||
roleName := "unit_test"
|
||||
roleResp, _ := proxy.CreateRole(ctx, &milvuspb.CreateRoleRequest{Entity: &milvuspb.RoleEntity{Name: roleName}})
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, roleResp.ErrorCode)
|
||||
|
@ -3306,7 +3310,8 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) {
|
|||
|
||||
entity.Name = "not_existed"
|
||||
resp, _ = proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{User: entity})
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 0, len(resp.Results))
|
||||
|
||||
entity.Name = "root"
|
||||
resp, _ = proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{User: entity})
|
||||
|
|
|
@ -1351,7 +1351,7 @@ func TestRbacOperatePrivilege(t *testing.T) {
|
|||
assert.NotNil(t, err)
|
||||
|
||||
mockTxnKV.load = func(key string) (string, error) {
|
||||
return "fail", fmt.Errorf("there is no value on key = %s", key)
|
||||
return "fail", common.NewKeyNotExistError(key)
|
||||
}
|
||||
err = mt.OperatePrivilege(util.DefaultTenant, entity, milvuspb.OperatePrivilegeType_Grant)
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -3086,7 +3086,7 @@ func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (
|
|||
errMsg := "role already exists:" + entity.Name
|
||||
return failStatus(commonpb.ErrorCode_CreateRoleFailure, errMsg), errors.New(errMsg)
|
||||
}
|
||||
if !funcutil.IsKeyNotExistError(err) {
|
||||
if !common.IsKeyNotExistError(err) {
|
||||
return failStatus(commonpb.ErrorCode_CreateRoleFailure, err.Error()), err
|
||||
}
|
||||
|
||||
|
@ -3241,6 +3241,11 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (
|
|||
if _, err := c.MetaTable.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, false); err != nil {
|
||||
errMsg := "fail to select the role to check the role name"
|
||||
logger.Error(errMsg, zap.String("role_name", in.Role.Name), zap.Error(err))
|
||||
if common.IsKeyNotExistError(err) {
|
||||
return &milvuspb.SelectRoleResponse{
|
||||
Status: succStatus(),
|
||||
}, nil
|
||||
}
|
||||
return &milvuspb.SelectRoleResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_SelectRoleFailure, errMsg),
|
||||
}, err
|
||||
|
@ -3282,6 +3287,11 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (
|
|||
if _, err := c.MetaTable.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil {
|
||||
errMsg := "fail to select the user to check the username"
|
||||
logger.Error(errMsg, zap.String("username", in.User.Name), zap.Error(err))
|
||||
if common.IsKeyNotExistError(err) {
|
||||
return &milvuspb.SelectUserResponse{
|
||||
Status: succStatus(),
|
||||
}, nil
|
||||
}
|
||||
return &milvuspb.SelectUserResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_SelectUserFailure, errMsg),
|
||||
}, err
|
||||
|
|
|
@ -385,11 +385,6 @@ func IsGrant(operateType milvuspb.OperatePrivilegeType) bool {
|
|||
return operateType == milvuspb.OperatePrivilegeType_Grant
|
||||
}
|
||||
|
||||
// IsKeyNotExistError Judging by the error message whether the key does not exist or not for the ectd server
|
||||
func IsKeyNotExistError(err error) bool {
|
||||
return strings.Contains(err.Error(), "there is no value on key")
|
||||
}
|
||||
|
||||
func EncodeUserRoleCache(user string, role string) string {
|
||||
return fmt.Sprintf("%s/%s", user, role)
|
||||
}
|
||||
|
|
|
@ -530,11 +530,6 @@ func TestIsGrant(t *testing.T) {
|
|||
assert.Equal(t, false, IsGrant(milvuspb.OperatePrivilegeType_Revoke))
|
||||
}
|
||||
|
||||
func TestIsKeyNotExistError(t *testing.T) {
|
||||
assert.Equal(t, true, IsKeyNotExistError(fmt.Errorf("there is no value on key = %s", "foo")))
|
||||
assert.Equal(t, false, IsKeyNotExistError(fmt.Errorf("err: key = %s", "fooo")))
|
||||
}
|
||||
|
||||
func TestUserRoleCache(t *testing.T) {
|
||||
user, role := "foo", "root"
|
||||
cache := EncodeUserRoleCache(user, role)
|
||||
|
|
Loading…
Reference in New Issue