enhance: Avoid initializing casbin enforcer for each request (#29117)

See also #29113

This patch:
- Replace plain Enforcer with `casbin.SyncedEnforcer`
- Add implementation of persist.Adapter with `MetaCacheCasbinAdapter`
- Invoke enforcer.LoadPolicy when policy updated

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/29110/head
congqixia 2023-12-12 10:36:43 +08:00 committed by GitHub
parent 994b239161
commit d0bac9d0bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 203 additions and 18 deletions

View File

@ -899,6 +899,12 @@ func (m *MetaCache) expireShardLeaderCache(ctx context.Context) {
}
func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string) {
defer func() {
err := getEnforcer().LoadPolicy()
if err != nil {
log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(err))
}
}()
m.mu.Lock()
defer m.mu.Unlock()
m.unsafeInitPolicyInfo(info, userRoles)
@ -933,7 +939,15 @@ func (m *MetaCache) GetUserRole(user string) []string {
return util.StringList(m.userToRoles[user])
}
func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error {
func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {
defer func() {
if err == nil {
le := getEnforcer().LoadPolicy()
if le != nil {
log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(le))
}
}
}()
if op.OpType != typeutil.CacheRefresh {
m.mu.Lock()
defer m.mu.Unlock()
@ -941,6 +955,7 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error {
return errors.New("empty op key")
}
}
switch op.OpType {
case typeutil.CacheGrantPrivilege:
m.privilegeInfos[op.OpKey] = struct{}{}

View File

@ -0,0 +1,85 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"fmt"
"strings"
"github.com/casbin/casbin/v2/model"
jsonadapter "github.com/casbin/json-adapter/v2"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// MetaCacheCasbinAdapter is the implementation of `persist.Adapter` with Cache
// Since the usage shall be read-only, it implements only `LoadPolicy` for now.
type MetaCacheCasbinAdapter struct {
cacheSource func() Cache
}
func NewMetaCacheCasbinAdapter(cacheSource func() Cache) *MetaCacheCasbinAdapter {
return &MetaCacheCasbinAdapter{
cacheSource: cacheSource,
}
}
// LoadPolicy loads all policy rules from the storage.
// Implementing `persist.Adapter`.
func (a *MetaCacheCasbinAdapter) LoadPolicy(model model.Model) error {
cache := a.cacheSource()
if cache == nil {
return merr.WrapErrServiceInternal("cache source return nil cache")
}
policyInfo := strings.Join(cache.GetPrivilegeInfo(context.Background()), ",")
policy := fmt.Sprintf("[%s]", policyInfo)
log.Ctx(context.Background()).Info("LoddPolicy update policyinfo", zap.String("policyInfo", policy))
byteSource := []byte(policy)
jAdapter := jsonadapter.NewAdapter(&byteSource)
return jAdapter.LoadPolicy(model)
}
// SavePolicy saves all policy rules to the storage.
// Implementing `persist.Adapter`.
// MetaCacheCasbinAdapter is read-only, always returns error
func (a *MetaCacheCasbinAdapter) SavePolicy(model model.Model) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received SavePolicy call")
}
// AddPolicy adds a policy rule to the storage.
// Implementing `persist.Adapter`.
// MetaCacheCasbinAdapter is read-only, always returns error
func (a *MetaCacheCasbinAdapter) AddPolicy(sec string, ptype string, rule []string) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received AddPolicy call")
}
// RemovePolicy removes a policy rule from the storage.
// Implementing `persist.Adapter`.
// MetaCacheCasbinAdapter is read-only, always returns error
func (a *MetaCacheCasbinAdapter) RemovePolicy(sec string, ptype string, rule []string) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received RemovePolicy call")
}
// RemoveFilteredPolicy removes policy rules that match the filter from the storage.
// This is part of the Auto-Save feature.
func (a *MetaCacheCasbinAdapter) RemoveFilteredPolicy(sec string, ptype string, fieldIndex int, fieldValues ...string) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received RemoveFilteredPolicy call")
}

View File

@ -0,0 +1,76 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
type MetaCacheCasbinAdapterSuite struct {
suite.Suite
cache *MockCache
adapter *MetaCacheCasbinAdapter
}
func (s *MetaCacheCasbinAdapterSuite) SetupTest() {
s.cache = NewMockCache(s.T())
s.adapter = NewMetaCacheCasbinAdapter(func() Cache { return s.cache })
}
func (s *MetaCacheCasbinAdapterSuite) TestLoadPolicy() {
s.Run("normal_load", func() {
s.cache.EXPECT().GetPrivilegeInfo(mock.Anything).Return([]string{})
m := getPolicyModel(ModelStr)
err := s.adapter.LoadPolicy(m)
s.NoError(err)
})
s.Run("source_return_nil", func() {
adapter := NewMetaCacheCasbinAdapter(func() Cache { return nil })
m := getPolicyModel(ModelStr)
err := adapter.LoadPolicy(m)
s.Error(err)
})
}
func (s *MetaCacheCasbinAdapterSuite) TestSavePolicy() {
m := getPolicyModel(ModelStr)
s.Error(s.adapter.SavePolicy(m))
}
func (s *MetaCacheCasbinAdapterSuite) TestAddPolicy() {
s.Error(s.adapter.AddPolicy("", "", []string{}))
}
func (s *MetaCacheCasbinAdapterSuite) TestRemovePolicy() {
s.Error(s.adapter.RemovePolicy("", "", []string{}))
}
func (s *MetaCacheCasbinAdapterSuite) TestRemoveFiltererPolicy() {
s.Error(s.adapter.RemoveFilteredPolicy("", "", 0))
}
func TestMetaCacheCasbinAdapter(t *testing.T) {
suite.Run(t, new(MetaCacheCasbinAdapterSuite))
}

View File

@ -5,10 +5,10 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"github.com/casbin/casbin/v2"
"github.com/casbin/casbin/v2/model"
jsonadapter "github.com/casbin/json-adapter/v2"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -43,6 +43,26 @@ m = r.sub == p.sub && globMatch(r.obj, p.obj) && globMatch(r.act, p.act) || r.su
var templateModel = getPolicyModel(ModelStr)
var (
enforcer *casbin.SyncedEnforcer
initOnce sync.Once
)
func getEnforcer() *casbin.SyncedEnforcer {
initOnce.Do(func() {
e, err := casbin.NewSyncedEnforcer()
if err != nil {
log.Panic("failed to create casbin enforcer", zap.Error(err))
}
casbinModel := getPolicyModel(ModelStr)
adapter := NewMetaCacheCasbinAdapter(func() Cache { return globalMetaCache })
e.InitWithModelAndAdapter(casbinModel, adapter)
e.AddFunction("dbMatch", DBMatchFunc)
enforcer = e
})
return enforcer
}
func getPolicyModel(modelString string) model.Model {
m, err := model.NewModelFromString(modelString)
if err != nil {
@ -66,6 +86,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
if !Params.CommonCfg.AuthorizationEnabled.GetAsBool() {
return ctx, nil
}
log := log.Ctx(ctx)
log.Debug("PrivilegeInterceptor", zap.String("type", reflect.TypeOf(req).String()))
privilegeExt, err := funcutil.GetPrivilegeExtObj(req)
if err != nil {
@ -96,26 +117,14 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
objectNames := funcutil.GetObjectNames(req, objectNameIndexs)
objectPrivilege := privilegeExt.ObjectPrivilege.String()
dbName := GetCurDBNameFromContextOrDefault(ctx)
policyInfo := strings.Join(globalMetaCache.GetPrivilegeInfo(ctx), ",")
log := log.With(zap.String("username", username), zap.Strings("role_names", roleNames),
log = log.With(zap.String("username", username), zap.Strings("role_names", roleNames),
zap.String("object_type", objectType), zap.String("object_privilege", objectPrivilege),
zap.String("db_name", dbName),
zap.Int32("object_index", objectNameIndex), zap.String("object_name", objectName),
zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames),
zap.String("policy_info", policyInfo))
zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames))
policy := fmt.Sprintf("[%s]", policyInfo)
b := []byte(policy)
a := jsonadapter.NewAdapter(&b)
// the `templateModel` object isn't safe in the concurrent situation
casbinModel := templateModel.Copy()
e, err := casbin.NewEnforcer(casbinModel, a)
if err != nil {
log.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err))
return ctx, err
}
e.AddFunction("dbMatch", DBMatchFunc)
e := getEnforcer()
for _, roleName := range roleNames {
permitFunc := func(resName string) (bool, error) {
object := funcutil.PolicyForResource(dbName, objectType, resName)
@ -158,7 +167,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
}
}
log.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames))
log.Info("permission deny", zap.Strings("roles", roleNames))
return ctx, status.Error(codes.PermissionDenied, fmt.Sprintf("%s: permission deny", objectPrivilege))
}