Close #677. Extract coordinator.Permissions

All ClusterAdmins now qualify as DbAdmins too.
pull/687/head
Mark Rushakoff 2014-06-22 13:36:42 -07:00 committed by John Shahid
parent 574646bb2d
commit 083c30d050
5 changed files with 569 additions and 38 deletions

View File

@ -88,6 +88,10 @@ func (self *ClusterAdmin) IsClusterAdmin() bool {
return true
}
func (self *ClusterAdmin) IsDbAdmin(_ string) bool {
return true
}
func (self *ClusterAdmin) HasWriteAccess(_ string) bool {
return true
}

View File

@ -26,6 +26,7 @@ func (self *UserSuite) SetUpSuite(c *C) {
func (self *UserSuite) TestProperties(c *C) {
u := ClusterAdmin{CommonUser{Name: "root"}}
c.Assert(u.IsClusterAdmin(), Equals, true)
c.Assert(u.IsDbAdmin("db"), Equals, true)
c.Assert(u.GetName(), Equals, "root")
hash, err := HashPassword("foobar")
c.Assert(err, IsNil)

View File

@ -21,6 +21,7 @@ type CoordinatorImpl struct {
clusterConfiguration *cluster.ClusterConfiguration
raftServer ClusterConsensus
config *configuration.Configuration
permissions Permissions
}
const (
@ -60,6 +61,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC
config: config,
clusterConfiguration: clusterConfiguration,
raftServer: raftServer,
permissions: Permissions{},
}
return coordinator
@ -200,8 +202,8 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser
func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
user := querySpec.User()
db := querySpec.Database()
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to write to %s", db)
if ok, err := self.permissions.AuthorizeDeleteQuery(user, db); !ok {
return err
}
querySpec.RunAgainstAllServersInShard = true
return self.runQuerySpec(querySpec, seriesWriter)
@ -211,8 +213,8 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser
user := querySpec.User()
db := querySpec.Database()
series := querySpec.Query().DropSeriesQuery.GetTableName()
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) {
return common.NewAuthorizationError("Insufficient permissions to drop series")
if ok, err := self.permissions.AuthorizeDropSeries(user, db, series); !ok {
return err
}
querySpec.RunAgainstAllServersInShard = true
return self.runQuerySpec(querySpec, seriesWriter)
@ -727,8 +729,8 @@ func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard c
}
func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permissions to create continuous query")
if ok, err := self.permissions.AuthorizeCreateContinuousQuery(user, db); !ok {
return err
}
err := self.raftServer.CreateContinuousQuery(db, query)
@ -739,8 +741,8 @@ func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string,
}
func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permissions to delete continuous query")
if ok, err := self.permissions.AuthorizeDeleteContinuousQuery(user, db); !ok {
return err
}
err := self.raftServer.DeleteContinuousQuery(db, id)
@ -751,8 +753,8 @@ func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string,
}
func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return nil, common.NewAuthorizationError("Insufficient permissions to list continuous queries")
if ok, err := self.permissions.AuthorizeListContinuousQueries(user, db); !ok {
return nil, err
}
queries := self.clusterConfiguration.GetContinuousQueries(db)
@ -778,8 +780,8 @@ func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string)
}
func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error {
if !user.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permissions to create database")
if ok, err := self.permissions.AuthorizeCreateDatabase(user); !ok {
return err
}
if !isValidName(db) {
@ -794,8 +796,8 @@ func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error {
}
func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Database, error) {
if !user.IsClusterAdmin() {
return nil, common.NewAuthorizationError("Insufficient permissions to list databases")
if ok, err := self.permissions.AuthorizeListDatabases(user); !ok {
return nil, err
}
dbs := self.clusterConfiguration.GetDatabases()
@ -803,8 +805,8 @@ func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Databas
}
func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
if !user.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permissions to drop database")
if ok, err := self.permissions.AuthorizeDropDatabase(user); !ok {
return err
}
if err := self.clusterConfiguration.CreateCheckpoint(); err != nil {
@ -841,16 +843,16 @@ func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string)
}
func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error) {
if !requester.IsClusterAdmin() {
return nil, common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeListClusterAdmins(requester); !ok {
return nil, err
}
return self.clusterConfiguration.GetClusterAdmins(), nil
}
func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username, password string) error {
if !requester.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeCreateClusterAdmin(requester); !ok {
return err
}
if !isValidName(username) {
@ -870,8 +872,8 @@ func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, usern
}
func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error {
if !requester.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeDeleteClusterAdmin(requester); !ok {
return err
}
user := self.clusterConfiguration.GetClusterAdmin(username)
@ -884,8 +886,8 @@ func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, usern
}
func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error {
if !requester.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeChangeClusterAdminPassword(requester); !ok {
return err
}
user := self.clusterConfiguration.GetClusterAdmin(username)
@ -902,8 +904,8 @@ func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, u
}
func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, password string, permissions ...string) error {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeCreateDbUser(requester, db); !ok {
return err
}
if username == "" {
@ -943,8 +945,8 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, p
}
func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeDeleteDbUser(requester, db); !ok {
return err
}
user := self.clusterConfiguration.GetDbUser(db, username)
@ -956,16 +958,16 @@ func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username st
}
func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]common.User, error) {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return nil, common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeListDbUsers(requester, db); !ok {
return nil, err
}
return self.clusterConfiguration.GetDbUsers(db), nil
}
func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, username string) (common.User, error) {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return nil, common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeGetDbUser(requester, db); !ok {
return nil, err
}
dbUser := self.clusterConfiguration.GetDbUser(db, username)
@ -977,8 +979,8 @@ func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, usernam
}
func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) && !(requester.GetDb() == db && requester.GetName() == username) {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeChangeDbUserPassword(requester, db, username); !ok {
return err
}
hash, err := cluster.HashPassword(password)
@ -989,16 +991,16 @@ func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, use
}
func (self *CoordinatorImpl) ChangeDbUserPermissions(requester common.User, db, username, readPermissions, writePermissions string) error {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeChangeDbUserPermissions(requester, db); !ok {
return err
}
return self.raftServer.ChangeDbUserPermissions(db, username, readPermissions, writePermissions)
}
func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permissions")
if ok, err := self.permissions.AuthorizeGrantDbUserAdmin(requester, db); !ok {
return err
}
user := self.clusterConfiguration.GetDbUser(db, username)

View File

@ -0,0 +1,159 @@
package coordinator
import (
"common"
)
type Permissions struct{}
func (self *Permissions) AuthorizeDeleteQuery(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permission to write to %s", db)
}
return true, ""
}
func (self *Permissions) AuthorizeDropSeries(user common.User, db string, seriesName string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) && !user.HasWriteAccess(seriesName) {
return false, common.NewAuthorizationError("Insufficient permissions to drop series")
}
return true, ""
}
func (self *Permissions) AuthorizeCreateContinuousQuery(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to create continuous query")
}
return true, ""
}
func (self *Permissions) AuthorizeDeleteContinuousQuery(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to delete continuous query")
}
return true, ""
}
func (self *Permissions) AuthorizeListContinuousQueries(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to list continuous queries")
}
return true, ""
}
func (self *Permissions) AuthorizeCreateDatabase(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to create database")
}
return true, ""
}
func (self *Permissions) AuthorizeListDatabases(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to list databases")
}
return true, ""
}
func (self *Permissions) AuthorizeDropDatabase(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to drop database")
}
return true, ""
}
func (self *Permissions) AuthorizeListClusterAdmins(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to list cluster admins")
}
return true, ""
}
func (self *Permissions) AuthorizeCreateClusterAdmin(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to create cluster admin")
}
return true, ""
}
func (self *Permissions) AuthorizeDeleteClusterAdmin(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to delete cluster admin")
}
return true, ""
}
func (self *Permissions) AuthorizeChangeClusterAdminPassword(user common.User) (ok bool, err common.AuthorizationError) {
if !user.IsClusterAdmin() {
return false, common.NewAuthorizationError("Insufficient permissions to change cluster admin password")
}
return true, ""
}
func (self *Permissions) AuthorizeCreateDbUser(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to create db user on %s", db)
}
return true, ""
}
func (self *Permissions) AuthorizeDeleteDbUser(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to delete db user on %s", db)
}
return true, ""
}
func (self *Permissions) AuthorizeListDbUsers(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to list db users on %s", db)
}
return true, ""
}
func (self *Permissions) AuthorizeGetDbUser(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to get db user on %s", db)
}
return true, ""
}
func (self *Permissions) AuthorizeChangeDbUserPassword(user common.User, db string, targetUsername string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) && !(user.GetDb() == db && user.GetName() == targetUsername) {
return false, common.NewAuthorizationError("Insufficient permissions to change db user password for %s on %s", targetUsername, db)
}
return true, ""
}
func (self *Permissions) AuthorizeChangeDbUserPermissions(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to change db user permissions on %s", db)
}
return true, ""
}
func (self *Permissions) AuthorizeGrantDbUserAdmin(user common.User, db string) (ok bool, err common.AuthorizationError) {
if !user.IsDbAdmin(db) {
return false, common.NewAuthorizationError("Insufficient permissions to grant db user admin privileges on %s", db)
}
return true, ""
}

View File

@ -0,0 +1,365 @@
package coordinator
import (
"cluster"
"common"
. "launchpad.net/gocheck"
)
type PermissionsSuite struct {
permissions Permissions
commonUser *cluster.DbUser
commonUserNoWrite *cluster.DbUser
dbAdmin *cluster.DbUser
clusterAdmin *cluster.ClusterAdmin
}
var _ = Suite(&PermissionsSuite{})
func (self *PermissionsSuite) SetUpSuite(c *C) {
self.permissions = Permissions{}
self.dbAdmin = &cluster.DbUser{cluster.CommonUser{"db_admin", "", false, "db_admin"}, "db", nil, nil, true}
self.commonUser = &cluster.DbUser{cluster.CommonUser{"common_user", "", false, "common_user"}, "db", []*cluster.Matcher{{true, ".*"}}, []*cluster.Matcher{{true, ".*"}}, false}
self.commonUserNoWrite = &cluster.DbUser{cluster.CommonUser{"no_write_user", "", false, "no_write_user"}, "db", nil, nil, false}
self.clusterAdmin = &cluster.ClusterAdmin{cluster.CommonUser{"root", "", false, "root"}}
}
func (self *PermissionsSuite) TestAuthorizeDeleteQuery(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permission to write to db")
ok, err = self.permissions.AuthorizeDeleteQuery(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDeleteQuery(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeDeleteQuery(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeDropSeries(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to drop series")
ok, _ = self.permissions.AuthorizeDropSeries(self.dbAdmin, "db", "series")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeDropSeries(self.clusterAdmin, "db", "series")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeDropSeries(self.commonUser, "db", "series")
c.Assert(ok, Equals, true)
ok, err = self.permissions.AuthorizeDropSeries(self.commonUserNoWrite, "db", "series")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
}
func (self *PermissionsSuite) TestAuthorizeCreateContinuousQuery(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to create continuous query")
ok, err = self.permissions.AuthorizeCreateContinuousQuery(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeCreateContinuousQuery(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeCreateContinuousQuery(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeDeleteContinuousQuery(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to delete continuous query")
ok, err = self.permissions.AuthorizeDeleteContinuousQuery(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDeleteContinuousQuery(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeDeleteContinuousQuery(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeListContinuousQueries(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to list continuous queries")
ok, err = self.permissions.AuthorizeListContinuousQueries(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeListContinuousQueries(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeListContinuousQueries(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeCreateDatabase(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to create database")
ok, err = self.permissions.AuthorizeCreateDatabase(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeCreateDatabase(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeCreateDatabase(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeListDatabases(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to list databases")
ok, err = self.permissions.AuthorizeListDatabases(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeListDatabases(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeListDatabases(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeDropDatabase(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to drop database")
ok, err = self.permissions.AuthorizeDropDatabase(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDropDatabase(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDropDatabase(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeListClusterAdmins(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to list cluster admins")
ok, err = self.permissions.AuthorizeListClusterAdmins(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeListClusterAdmins(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeListClusterAdmins(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeCreateClusterAdmin(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to create cluster admin")
ok, err = self.permissions.AuthorizeCreateClusterAdmin(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeCreateClusterAdmin(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeCreateClusterAdmin(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeDeleteClusterAdmin(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to delete cluster admin")
ok, err = self.permissions.AuthorizeDeleteClusterAdmin(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDeleteClusterAdmin(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDeleteClusterAdmin(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeChangeClusterAdminPassword(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to change cluster admin password")
ok, err = self.permissions.AuthorizeChangeClusterAdminPassword(self.commonUser)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeChangeClusterAdminPassword(self.dbAdmin)
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeChangeClusterAdminPassword(self.clusterAdmin)
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeCreateDbUser(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to create db user on db")
ok, err = self.permissions.AuthorizeCreateDbUser(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeCreateDbUser(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeCreateDbUser(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeDeleteDbUser(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to delete db user on db")
ok, err = self.permissions.AuthorizeDeleteDbUser(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeDeleteDbUser(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeDeleteDbUser(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeListDbUsers(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to list db users on db")
ok, err = self.permissions.AuthorizeListDbUsers(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeListDbUsers(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeListDbUsers(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeGetDbUser(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to get db user on db")
ok, err = self.permissions.AuthorizeGetDbUser(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeGetDbUser(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeGetDbUser(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeChangeDbUserPassword(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to change db user password for someone on db")
ok, err = self.permissions.AuthorizeChangeDbUserPassword(self.commonUser, "db", "someone")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.dbAdmin, "db", "someone")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.clusterAdmin, "db", "someone")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.commonUser, "db", "common_user")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.commonUser, "db", "no_write_user")
c.Assert(ok, Equals, false)
}
func (self *PermissionsSuite) TestAuthorizeChangeDbUserPermissions(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to change db user permissions on db")
ok, err = self.permissions.AuthorizeChangeDbUserPermissions(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeChangeDbUserPermissions(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeChangeDbUserPermissions(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}
func (self *PermissionsSuite) TestAuthorizeGrantDbUserAdmin(c *C) {
var ok bool
var err common.AuthorizationError
authErr := common.NewAuthorizationError("Insufficient permissions to grant db user admin privileges on db")
ok, err = self.permissions.AuthorizeGrantDbUserAdmin(self.commonUser, "db")
c.Assert(ok, Equals, false)
c.Assert(err, Equals, authErr)
ok, _ = self.permissions.AuthorizeGrantDbUserAdmin(self.dbAdmin, "db")
c.Assert(ok, Equals, true)
ok, _ = self.permissions.AuthorizeGrantDbUserAdmin(self.clusterAdmin, "db")
c.Assert(ok, Equals, true)
}