Fix the concurrent write issue when init privilege policy (#21073)

Signed-off-by: SimFG <bang.fu@zilliz.com>

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/21083/head
SimFG 2022-12-08 19:45:20 +08:00 committed by GitHub
parent a334853475
commit 7149b01c86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 25 deletions

View File

@ -338,6 +338,7 @@ func (s *Server) init() error {
}
s.etcdCli = etcdCli
s.proxy.SetEtcdClient(s.etcdCli)
proxy.InitPolicyModel()
errChan := make(chan error, 1)
{

View File

@ -6,20 +6,17 @@ import (
"reflect"
"strings"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/util"
"github.com/casbin/casbin/v2"
"github.com/casbin/casbin/v2/model"
jsonadapter "github.com/casbin/json-adapter/v2"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/funcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type PrivilegeFunc func(ctx context.Context, req interface{}) (context.Context, error)
@ -41,27 +38,22 @@ e = some(where (p.eft == allow))
[matchers]
m = r.sub == p.sub && globMatch(r.obj, p.obj) && globMatch(r.act, p.act) || r.sub == "admin" || (r.sub == p.sub && p.act == "PrivilegeAll")
`
ModelKey = "casbin"
)
var modelStore = make(map[string]model.Model, 1)
var (
casbinModel model.Model
)
func initPolicyModel() (model.Model, error) {
if policyModel, ok := modelStore[ModelStr]; ok {
return policyModel, nil
}
policyModel, err := model.NewModelFromString(ModelStr)
func InitPolicyModel() {
var err error
casbinModel, err = model.NewModelFromString(ModelStr)
if err != nil {
log.Error("NewModelFromString fail", zap.String("model", ModelStr), zap.Error(err))
return nil, err
log.Panic("NewModelFromString fail", zap.String("model", ModelStr), zap.Error(err))
}
modelStore[ModelKey] = policyModel
return modelStore[ModelKey], nil
}
// UnaryServerInterceptor returns a new unary server interceptors that performs per-request privilege access.
func UnaryServerInterceptor(privilegeFunc PrivilegeFunc) grpc.UnaryServerInterceptor {
initPolicyModel()
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
newCtx, err := privilegeFunc(ctx, req)
if err != nil {
@ -115,13 +107,10 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
policy := fmt.Sprintf("[%s]", policyInfo)
b := []byte(policy)
a := jsonadapter.NewAdapter(&b)
policyModel, err := initPolicyModel()
if err != nil {
errStr := "fail to get policy model"
log.Error(errStr, zap.Error(err))
return ctx, err
if casbinModel == nil {
log.Panic("fail to get policy model")
}
e, err := casbin.NewEnforcer(policyModel, a)
e, err := casbin.NewEnforcer(casbinModel, a)
if err != nil {
log.Error("NewEnforcer fail", zap.String("policy", policy), zap.Error(err))
return ctx, err

View File

@ -19,7 +19,9 @@ func TestUnaryServerInterceptor(t *testing.T) {
func TestPrivilegeInterceptor(t *testing.T) {
ctx := context.Background()
t.Run("Authorization Disabled", func(t *testing.T) {
InitPolicyModel()
Params.CommonCfg.AuthorizationEnabled = false
_, err := PrivilegeInterceptor(ctx, &milvuspb.LoadCollectionRequest{
DbName: "db_test",
@ -29,6 +31,7 @@ func TestPrivilegeInterceptor(t *testing.T) {
})
t.Run("Authorization Enabled", func(t *testing.T) {
InitPolicyModel()
Params.CommonCfg.AuthorizationEnabled = true
_, err := PrivilegeInterceptor(ctx, &milvuspb.HasCollectionRequest{})
@ -109,6 +112,14 @@ func TestPrivilegeInterceptor(t *testing.T) {
CollectionName: "col1",
})
assert.Nil(t, err)
casbinModel = nil
assert.Panics(t, func() {
PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.LoadCollectionRequest{
DbName: "db_test",
CollectionName: "col1",
})
})
})
}