diff --git a/src/cluster/user.go b/src/cluster/user.go index c6f7a3b272..0063487e56 100644 --- a/src/cluster/user.go +++ b/src/cluster/user.go @@ -88,6 +88,10 @@ func (self *ClusterAdmin) IsClusterAdmin() bool { return true } +func (self *ClusterAdmin) IsDbAdmin(_ string) bool { + return true +} + func (self *ClusterAdmin) HasWriteAccess(_ string) bool { return true } diff --git a/src/cluster/user_test.go b/src/cluster/user_test.go index 41b031f7e8..4b15831409 100644 --- a/src/cluster/user_test.go +++ b/src/cluster/user_test.go @@ -26,6 +26,7 @@ func (self *UserSuite) SetUpSuite(c *C) { func (self *UserSuite) TestProperties(c *C) { u := ClusterAdmin{CommonUser{Name: "root"}} c.Assert(u.IsClusterAdmin(), Equals, true) + c.Assert(u.IsDbAdmin("db"), Equals, true) c.Assert(u.GetName(), Equals, "root") hash, err := HashPassword("foobar") c.Assert(err, IsNil) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 731e4c1b49..61f8e4fb7f 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -21,6 +21,7 @@ type CoordinatorImpl struct { clusterConfiguration *cluster.ClusterConfiguration raftServer ClusterConsensus config *configuration.Configuration + permissions Permissions } const ( @@ -60,6 +61,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC config: config, clusterConfiguration: clusterConfiguration, raftServer: raftServer, + permissions: Permissions{}, } return coordinator @@ -200,8 +202,8 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { user := querySpec.User() db := querySpec.Database() - if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permission to write to %s", db) + if ok, err := self.permissions.AuthorizeDeleteQuery(user, db); !ok { + return err } querySpec.RunAgainstAllServersInShard = true return self.runQuerySpec(querySpec, seriesWriter) @@ -211,8 +213,8 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser user := querySpec.User() db := querySpec.Database() series := querySpec.Query().DropSeriesQuery.GetTableName() - if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) { - return common.NewAuthorizationError("Insufficient permissions to drop series") + if ok, err := self.permissions.AuthorizeDropSeries(user, db, series); !ok { + return err } querySpec.RunAgainstAllServersInShard = true return self.runQuerySpec(querySpec, seriesWriter) @@ -727,8 +729,8 @@ func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard c } func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error { - if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permissions to create continuous query") + if ok, err := self.permissions.AuthorizeCreateContinuousQuery(user, db); !ok { + return err } err := self.raftServer.CreateContinuousQuery(db, query) @@ -739,8 +741,8 @@ func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, } func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error { - if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permissions to delete continuous query") + if ok, err := self.permissions.AuthorizeDeleteContinuousQuery(user, db); !ok { + return err } err := self.raftServer.DeleteContinuousQuery(db, id) @@ -751,8 +753,8 @@ func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, } func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) { - if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { - return nil, common.NewAuthorizationError("Insufficient permissions to list continuous queries") + if ok, err := self.permissions.AuthorizeListContinuousQueries(user, db); !ok { + return nil, err } queries := self.clusterConfiguration.GetContinuousQueries(db) @@ -778,8 +780,8 @@ func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) } func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error { - if !user.IsClusterAdmin() { - return common.NewAuthorizationError("Insufficient permissions to create database") + if ok, err := self.permissions.AuthorizeCreateDatabase(user); !ok { + return err } if !isValidName(db) { @@ -794,8 +796,8 @@ func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error { } func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Database, error) { - if !user.IsClusterAdmin() { - return nil, common.NewAuthorizationError("Insufficient permissions to list databases") + if ok, err := self.permissions.AuthorizeListDatabases(user); !ok { + return nil, err } dbs := self.clusterConfiguration.GetDatabases() @@ -803,8 +805,8 @@ func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Databas } func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error { - if !user.IsClusterAdmin() { - return common.NewAuthorizationError("Insufficient permissions to drop database") + if ok, err := self.permissions.AuthorizeDropDatabase(user); !ok { + return err } if err := self.clusterConfiguration.CreateCheckpoint(); err != nil { @@ -841,16 +843,16 @@ func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) } func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error) { - if !requester.IsClusterAdmin() { - return nil, common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeListClusterAdmins(requester); !ok { + return nil, err } return self.clusterConfiguration.GetClusterAdmins(), nil } func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username, password string) error { - if !requester.IsClusterAdmin() { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeCreateClusterAdmin(requester); !ok { + return err } if !isValidName(username) { @@ -870,8 +872,8 @@ func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, usern } func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error { - if !requester.IsClusterAdmin() { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeDeleteClusterAdmin(requester); !ok { + return err } user := self.clusterConfiguration.GetClusterAdmin(username) @@ -884,8 +886,8 @@ func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, usern } func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error { - if !requester.IsClusterAdmin() { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeChangeClusterAdminPassword(requester); !ok { + return err } user := self.clusterConfiguration.GetClusterAdmin(username) @@ -902,8 +904,8 @@ func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, u } func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, password string, permissions ...string) error { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeCreateDbUser(requester, db); !ok { + return err } if username == "" { @@ -943,8 +945,8 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, p } func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeDeleteDbUser(requester, db); !ok { + return err } user := self.clusterConfiguration.GetDbUser(db, username) @@ -956,16 +958,16 @@ func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username st } func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]common.User, error) { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) { - return nil, common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeListDbUsers(requester, db); !ok { + return nil, err } return self.clusterConfiguration.GetDbUsers(db), nil } func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, username string) (common.User, error) { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) { - return nil, common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeGetDbUser(requester, db); !ok { + return nil, err } dbUser := self.clusterConfiguration.GetDbUser(db, username) @@ -977,8 +979,8 @@ func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, usernam } func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) && !(requester.GetDb() == db && requester.GetName() == username) { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeChangeDbUserPassword(requester, db, username); !ok { + return err } hash, err := cluster.HashPassword(password) @@ -989,16 +991,16 @@ func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, use } func (self *CoordinatorImpl) ChangeDbUserPermissions(requester common.User, db, username, readPermissions, writePermissions string) error { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeChangeDbUserPermissions(requester, db); !ok { + return err } return self.raftServer.ChangeDbUserPermissions(db, username, readPermissions, writePermissions) } func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error { - if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permissions") + if ok, err := self.permissions.AuthorizeGrantDbUserAdmin(requester, db); !ok { + return err } user := self.clusterConfiguration.GetDbUser(db, username) diff --git a/src/coordinator/permissions.go b/src/coordinator/permissions.go new file mode 100644 index 0000000000..e78d4d3b83 --- /dev/null +++ b/src/coordinator/permissions.go @@ -0,0 +1,159 @@ +package coordinator + +import ( + "common" +) + +type Permissions struct{} + +func (self *Permissions) AuthorizeDeleteQuery(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permission to write to %s", db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeDropSeries(user common.User, db string, seriesName string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) && !user.HasWriteAccess(seriesName) { + return false, common.NewAuthorizationError("Insufficient permissions to drop series") + } + + return true, "" +} + +func (self *Permissions) AuthorizeCreateContinuousQuery(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to create continuous query") + } + + return true, "" +} + +func (self *Permissions) AuthorizeDeleteContinuousQuery(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to delete continuous query") + } + + return true, "" +} + +func (self *Permissions) AuthorizeListContinuousQueries(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to list continuous queries") + } + + return true, "" +} + +func (self *Permissions) AuthorizeCreateDatabase(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to create database") + } + + return true, "" +} + +func (self *Permissions) AuthorizeListDatabases(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to list databases") + } + + return true, "" +} + +func (self *Permissions) AuthorizeDropDatabase(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to drop database") + } + + return true, "" +} + +func (self *Permissions) AuthorizeListClusterAdmins(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to list cluster admins") + } + + return true, "" +} + +func (self *Permissions) AuthorizeCreateClusterAdmin(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to create cluster admin") + } + + return true, "" +} + +func (self *Permissions) AuthorizeDeleteClusterAdmin(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to delete cluster admin") + } + + return true, "" +} + +func (self *Permissions) AuthorizeChangeClusterAdminPassword(user common.User) (ok bool, err common.AuthorizationError) { + if !user.IsClusterAdmin() { + return false, common.NewAuthorizationError("Insufficient permissions to change cluster admin password") + } + + return true, "" +} + +func (self *Permissions) AuthorizeCreateDbUser(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to create db user on %s", db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeDeleteDbUser(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to delete db user on %s", db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeListDbUsers(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to list db users on %s", db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeGetDbUser(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to get db user on %s", db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeChangeDbUserPassword(user common.User, db string, targetUsername string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) && !(user.GetDb() == db && user.GetName() == targetUsername) { + return false, common.NewAuthorizationError("Insufficient permissions to change db user password for %s on %s", targetUsername, db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeChangeDbUserPermissions(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to change db user permissions on %s", db) + } + + return true, "" +} + +func (self *Permissions) AuthorizeGrantDbUserAdmin(user common.User, db string) (ok bool, err common.AuthorizationError) { + if !user.IsDbAdmin(db) { + return false, common.NewAuthorizationError("Insufficient permissions to grant db user admin privileges on %s", db) + } + + return true, "" +} diff --git a/src/coordinator/permissions_test.go b/src/coordinator/permissions_test.go new file mode 100644 index 0000000000..ffbcc57499 --- /dev/null +++ b/src/coordinator/permissions_test.go @@ -0,0 +1,365 @@ +package coordinator + +import ( + "cluster" + "common" + + . "launchpad.net/gocheck" +) + +type PermissionsSuite struct { + permissions Permissions + commonUser *cluster.DbUser + commonUserNoWrite *cluster.DbUser + dbAdmin *cluster.DbUser + clusterAdmin *cluster.ClusterAdmin +} + +var _ = Suite(&PermissionsSuite{}) + +func (self *PermissionsSuite) SetUpSuite(c *C) { + self.permissions = Permissions{} + self.dbAdmin = &cluster.DbUser{cluster.CommonUser{"db_admin", "", false, "db_admin"}, "db", nil, nil, true} + self.commonUser = &cluster.DbUser{cluster.CommonUser{"common_user", "", false, "common_user"}, "db", []*cluster.Matcher{{true, ".*"}}, []*cluster.Matcher{{true, ".*"}}, false} + self.commonUserNoWrite = &cluster.DbUser{cluster.CommonUser{"no_write_user", "", false, "no_write_user"}, "db", nil, nil, false} + self.clusterAdmin = &cluster.ClusterAdmin{cluster.CommonUser{"root", "", false, "root"}} +} + +func (self *PermissionsSuite) TestAuthorizeDeleteQuery(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permission to write to db") + + ok, err = self.permissions.AuthorizeDeleteQuery(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDeleteQuery(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeDeleteQuery(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeDropSeries(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to drop series") + + ok, _ = self.permissions.AuthorizeDropSeries(self.dbAdmin, "db", "series") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeDropSeries(self.clusterAdmin, "db", "series") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeDropSeries(self.commonUser, "db", "series") + c.Assert(ok, Equals, true) + + ok, err = self.permissions.AuthorizeDropSeries(self.commonUserNoWrite, "db", "series") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) +} + +func (self *PermissionsSuite) TestAuthorizeCreateContinuousQuery(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to create continuous query") + + ok, err = self.permissions.AuthorizeCreateContinuousQuery(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeCreateContinuousQuery(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeCreateContinuousQuery(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeDeleteContinuousQuery(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to delete continuous query") + + ok, err = self.permissions.AuthorizeDeleteContinuousQuery(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDeleteContinuousQuery(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeDeleteContinuousQuery(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeListContinuousQueries(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to list continuous queries") + + ok, err = self.permissions.AuthorizeListContinuousQueries(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeListContinuousQueries(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeListContinuousQueries(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeCreateDatabase(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to create database") + + ok, err = self.permissions.AuthorizeCreateDatabase(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeCreateDatabase(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeCreateDatabase(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeListDatabases(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to list databases") + + ok, err = self.permissions.AuthorizeListDatabases(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeListDatabases(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeListDatabases(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeDropDatabase(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to drop database") + + ok, err = self.permissions.AuthorizeDropDatabase(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDropDatabase(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDropDatabase(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeListClusterAdmins(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to list cluster admins") + + ok, err = self.permissions.AuthorizeListClusterAdmins(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeListClusterAdmins(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeListClusterAdmins(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeCreateClusterAdmin(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to create cluster admin") + + ok, err = self.permissions.AuthorizeCreateClusterAdmin(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeCreateClusterAdmin(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeCreateClusterAdmin(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeDeleteClusterAdmin(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to delete cluster admin") + + ok, err = self.permissions.AuthorizeDeleteClusterAdmin(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDeleteClusterAdmin(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDeleteClusterAdmin(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeChangeClusterAdminPassword(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to change cluster admin password") + + ok, err = self.permissions.AuthorizeChangeClusterAdminPassword(self.commonUser) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeChangeClusterAdminPassword(self.dbAdmin) + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeChangeClusterAdminPassword(self.clusterAdmin) + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeCreateDbUser(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to create db user on db") + + ok, err = self.permissions.AuthorizeCreateDbUser(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeCreateDbUser(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeCreateDbUser(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeDeleteDbUser(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to delete db user on db") + + ok, err = self.permissions.AuthorizeDeleteDbUser(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeDeleteDbUser(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeDeleteDbUser(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeListDbUsers(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to list db users on db") + + ok, err = self.permissions.AuthorizeListDbUsers(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeListDbUsers(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeListDbUsers(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeGetDbUser(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to get db user on db") + + ok, err = self.permissions.AuthorizeGetDbUser(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeGetDbUser(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeGetDbUser(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeChangeDbUserPassword(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to change db user password for someone on db") + + ok, err = self.permissions.AuthorizeChangeDbUserPassword(self.commonUser, "db", "someone") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.dbAdmin, "db", "someone") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.clusterAdmin, "db", "someone") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.commonUser, "db", "common_user") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeChangeDbUserPassword(self.commonUser, "db", "no_write_user") + c.Assert(ok, Equals, false) +} + +func (self *PermissionsSuite) TestAuthorizeChangeDbUserPermissions(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to change db user permissions on db") + + ok, err = self.permissions.AuthorizeChangeDbUserPermissions(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeChangeDbUserPermissions(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeChangeDbUserPermissions(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +} + +func (self *PermissionsSuite) TestAuthorizeGrantDbUserAdmin(c *C) { + var ok bool + var err common.AuthorizationError + + authErr := common.NewAuthorizationError("Insufficient permissions to grant db user admin privileges on db") + + ok, err = self.permissions.AuthorizeGrantDbUserAdmin(self.commonUser, "db") + c.Assert(ok, Equals, false) + c.Assert(err, Equals, authErr) + + ok, _ = self.permissions.AuthorizeGrantDbUserAdmin(self.dbAdmin, "db") + c.Assert(ok, Equals, true) + + ok, _ = self.permissions.AuthorizeGrantDbUserAdmin(self.clusterAdmin, "db") + c.Assert(ok, Equals, true) +}