From 4e1b65d38f7a51762a56b01e15ef0fb32b0e4a84 Mon Sep 17 00:00:00 2001 From: SimFG Date: Mon, 7 Aug 2023 11:59:07 +0800 Subject: [PATCH] Refine the rbac cache update process (#26150) Signed-off-by: SimFG --- internal/proxy/meta_cache.go | 14 ++++++---- internal/proxy/meta_cache_test.go | 43 +++++++++++++++++++++++++++++++ internal/rootcoord/root_coord.go | 26 ++++++++++--------- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 6f5cada7f8..148f60c3cc 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -950,11 +950,12 @@ func (m *MetaCache) GetUserRole(user string) []string { } func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error { - m.mu.Lock() - defer m.mu.Unlock() - - if op.OpKey == "" { - return errors.New("empty op key") + if op.OpType != typeutil.CacheRefresh { + m.mu.Lock() + defer m.mu.Unlock() + if op.OpKey == "" { + return errors.New("empty op key") + } } switch op.OpType { case typeutil.CacheGrantPrivilege: @@ -990,6 +991,9 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error { log.Error("fail to init meta cache", zap.Error(err)) return err } + + m.mu.Lock() + defer m.mu.Unlock() m.userToRoles = make(map[string]map[string]struct{}) m.privilegeInfos = make(map[string]struct{}) m.unsafeInitPolicyInfo(resp.PolicyInfos, resp.UserRoles) diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index ab4974e343..ac658c3d2e 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -721,6 +721,49 @@ func TestMetaCache_PolicyInfo(t *testing.T) { err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: 100, OpKey: "policyX"}) assert.Error(t, err) }) + + t.Run("Delete user or drop role", func(t *testing.T) { + client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { + return &internalpb.ListPolicyResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + PolicyInfos: []string{"policy1", "policy2", "policy3"}, + UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")}, + }, nil + } + err := InitMetaCache(context.Background(), client, qc, mgr) + assert.NoError(t, err) + + err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDeleteUser, OpKey: "foo"}) + assert.NoError(t, err) + + roles := globalMetaCache.GetUserRole("foo") + assert.Len(t, roles, 0) + + roles = globalMetaCache.GetUserRole("foo2") + assert.Len(t, roles, 2) + + err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDropRole, OpKey: "role2"}) + assert.NoError(t, err) + roles = globalMetaCache.GetUserRole("foo2") + assert.Len(t, roles, 1) + assert.Equal(t, "role3", roles[0]) + + client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { + return &internalpb.ListPolicyResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + PolicyInfos: []string{"policy1", "policy2", "policy3"}, + UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")}, + }, nil + } + err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheRefresh}) + assert.NoError(t, err) + roles = globalMetaCache.GetUserRole("foo") + assert.Len(t, roles, 2) + }) } func TestMetaCache_RemoveCollection(t *testing.T) { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a99d78adb8..e89a4634df 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -636,21 +636,23 @@ func (c *Core) startInternal() error { c.scheduler.Start() c.stepExecutor.Start() + go func() { + // refresh rbac cache + if err := retry.Do(c.ctx, func() error { + if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{ + OpType: int32(typeutil.CacheRefresh), + }); err != nil { + log.Warn("fail to refresh policy info cache", zap.Error(err)) + return err + } + return nil + }, retry.Attempts(100), retry.Sleep(time.Second)); err != nil { + log.Panic("fail to refresh policy info cache", zap.Error(err)) + } + }() c.startServerLoop() c.UpdateStateCode(commonpb.StateCode_Healthy) - // refresh rbac cache - if err := retry.Do(c.ctx, func() error { - if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{ - OpType: int32(typeutil.CacheRefresh), - }); err != nil { - log.Warn("fail to refresh policy info cache", zap.Error(err)) - return err - } - return nil - }, retry.Attempts(100), retry.Sleep(time.Second)); err != nil { - log.Panic("fail to refresh policy info cache", zap.Error(err)) - } logutil.Logger(c.ctx).Info("rootcoord startup successfully") return nil