mirror of https://github.com/milvus-io/milvus.git
Refine the rbac cache update process (#26150)
Signed-off-by: SimFG <bang.fu@zilliz.com>pull/26166/head
parent
ae3a3d148c
commit
4e1b65d38f
|
@ -950,11 +950,12 @@ func (m *MetaCache) GetUserRole(user string) []string {
|
|||
}
|
||||
|
||||
func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if op.OpKey == "" {
|
||||
return errors.New("empty op key")
|
||||
if op.OpType != typeutil.CacheRefresh {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if op.OpKey == "" {
|
||||
return errors.New("empty op key")
|
||||
}
|
||||
}
|
||||
switch op.OpType {
|
||||
case typeutil.CacheGrantPrivilege:
|
||||
|
@ -990,6 +991,9 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error {
|
|||
log.Error("fail to init meta cache", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.userToRoles = make(map[string]map[string]struct{})
|
||||
m.privilegeInfos = make(map[string]struct{})
|
||||
m.unsafeInitPolicyInfo(resp.PolicyInfos, resp.UserRoles)
|
||||
|
|
|
@ -721,6 +721,49 @@ func TestMetaCache_PolicyInfo(t *testing.T) {
|
|||
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: 100, OpKey: "policyX"})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("Delete user or drop role", func(t *testing.T) {
|
||||
client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
|
||||
return &internalpb.ListPolicyResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
PolicyInfos: []string{"policy1", "policy2", "policy3"},
|
||||
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")},
|
||||
}, nil
|
||||
}
|
||||
err := InitMetaCache(context.Background(), client, qc, mgr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDeleteUser, OpKey: "foo"})
|
||||
assert.NoError(t, err)
|
||||
|
||||
roles := globalMetaCache.GetUserRole("foo")
|
||||
assert.Len(t, roles, 0)
|
||||
|
||||
roles = globalMetaCache.GetUserRole("foo2")
|
||||
assert.Len(t, roles, 2)
|
||||
|
||||
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDropRole, OpKey: "role2"})
|
||||
assert.NoError(t, err)
|
||||
roles = globalMetaCache.GetUserRole("foo2")
|
||||
assert.Len(t, roles, 1)
|
||||
assert.Equal(t, "role3", roles[0])
|
||||
|
||||
client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
|
||||
return &internalpb.ListPolicyResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
PolicyInfos: []string{"policy1", "policy2", "policy3"},
|
||||
UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")},
|
||||
}, nil
|
||||
}
|
||||
err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheRefresh})
|
||||
assert.NoError(t, err)
|
||||
roles = globalMetaCache.GetUserRole("foo")
|
||||
assert.Len(t, roles, 2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMetaCache_RemoveCollection(t *testing.T) {
|
||||
|
|
|
@ -636,21 +636,23 @@ func (c *Core) startInternal() error {
|
|||
|
||||
c.scheduler.Start()
|
||||
c.stepExecutor.Start()
|
||||
go func() {
|
||||
// refresh rbac cache
|
||||
if err := retry.Do(c.ctx, func() error {
|
||||
if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
|
||||
OpType: int32(typeutil.CacheRefresh),
|
||||
}); err != nil {
|
||||
log.Warn("fail to refresh policy info cache", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
|
||||
log.Panic("fail to refresh policy info cache", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
c.startServerLoop()
|
||||
c.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
// refresh rbac cache
|
||||
if err := retry.Do(c.ctx, func() error {
|
||||
if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
|
||||
OpType: int32(typeutil.CacheRefresh),
|
||||
}); err != nil {
|
||||
log.Warn("fail to refresh policy info cache", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
|
||||
log.Panic("fail to refresh policy info cache", zap.Error(err))
|
||||
}
|
||||
logutil.Logger(c.ctx).Info("rootcoord startup successfully")
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue