Fix #217. Make sure permissions error messages aren't getting swallowed.

pull/269/head
Todd Persen 2014-02-20 15:02:40 -05:00
parent 5d68db9f7f
commit 2b9181f122
4 changed files with 48 additions and 17 deletions

View File

@ -413,10 +413,12 @@ func convertToDataStoreSeries(s *SerializedSeries, precision TimePrecision) (*pr
func errorToStatusCode(err error) int {
switch err.(type) {
case common.AuthenticationError:
return libhttp.StatusUnauthorized // HTTP 401
case common.AuthorizationError:
return libhttp.StatusUnauthorized
return libhttp.StatusForbidden // HTTP 403
default:
return libhttp.StatusBadRequest
return libhttp.StatusBadRequest // HTTP 400
}
}

View File

@ -23,6 +23,16 @@ func NewQueryError(code int, msg string, args ...interface{}) *QueryError {
return &QueryError{code, fmt.Sprintf(msg, args...)}
}
type AuthenticationError string
func (self AuthenticationError) Error() string {
return string(self)
}
func NewAuthenticationError(formatStr string, args ...interface{}) AuthenticationError {
return AuthenticationError(fmt.Sprintf(formatStr, args...))
}
type AuthorizationError string
func (self AuthorizationError) Error() string {

View File

@ -80,7 +80,7 @@ func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsens
func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
if !user.IsClusterAdmin() {
return fmt.Errorf("Insufficient permission to force a log compaction")
return fmt.Errorf("Insufficient permissions to force a log compaction")
}
return self.raftServer.ForceLogCompaction()
@ -597,7 +597,7 @@ func (self *CoordinatorImpl) handleReplayRequest(r *protocol.Request, replicatio
}
func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error {
if !user.HasWriteAccess(db) {
return common.NewAuthorizationError("Insufficient permission to write to %s", db)
return common.NewAuthorizationError("Insufficient permissions to write to %s", db)
}
if len(series.Points) == 0 {
return fmt.Errorf("Can't write series with zero points.")
@ -713,7 +713,7 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series
func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error {
if !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to write to %s", db)
return common.NewAuthorizationError("Insufficient permissions to write to %s", db)
}
if self.clusterConfiguration.IsSingleServer() || localOnly {
@ -915,7 +915,7 @@ func (self *CoordinatorImpl) proxyWrite(clusterServer *ClusterServer, request *p
func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to create continuous query")
return common.NewAuthorizationError("Insufficient permissions to create continuous query")
}
err := self.raftServer.CreateContinuousQuery(db, query)
@ -927,7 +927,7 @@ func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string,
func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to delete continuous query")
return common.NewAuthorizationError("Insufficient permissions to delete continuous query")
}
err := self.raftServer.DeleteContinuousQuery(db, id)
@ -939,7 +939,7 @@ func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string,
func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) {
return nil, common.NewAuthorizationError("Insufficient permission to list continuous queries")
return nil, common.NewAuthorizationError("Insufficient permissions to list continuous queries")
}
queries := self.clusterConfiguration.GetContinuousQueries(db)
@ -970,7 +970,7 @@ func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string)
func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error {
if !user.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permission to create database")
return common.NewAuthorizationError("Insufficient permissions to create database")
}
if !isValidName(db) {
@ -986,7 +986,7 @@ func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replica
func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error) {
if !user.IsClusterAdmin() {
return nil, common.NewAuthorizationError("Insufficient permission to list databases")
return nil, common.NewAuthorizationError("Insufficient permissions to list databases")
}
dbs := self.clusterConfiguration.GetDatabases()
@ -1062,7 +1062,7 @@ func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*p
func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
if !user.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permission to drop database")
return common.NewAuthorizationError("Insufficient permissions to drop database")
}
if self.clusterConfiguration.IsSingleServer() {
@ -1089,7 +1089,7 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) {
return common.NewAuthorizationError("Insufficient permission to drop series")
return common.NewAuthorizationError("Insufficient permissions to drop series")
}
if self.clusterConfiguration.IsSingleServer() {
@ -1110,25 +1110,25 @@ func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (
log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
dbUsers := self.clusterConfiguration.dbUsers[db]
if dbUsers == nil || dbUsers[username] == nil {
return nil, common.NewAuthorizationError("Invalid username/password")
return nil, common.NewAuthenticationError("Invalid username/password")
}
user := dbUsers[username]
if user.isValidPwd(password) {
log.Debug("(raft:%s) User %s authenticated succesfuly", self.raftServer.(*RaftServer).raftServer.Name(), username)
return user, nil
}
return nil, common.NewAuthorizationError("Invalid username/password")
return nil, common.NewAuthenticationError("Invalid username/password")
}
func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error) {
user := self.clusterConfiguration.clusterAdmins[username]
if user == nil {
return nil, common.NewAuthorizationError("Invalid username/password")
return nil, common.NewAuthenticationError("Invalid username/password")
}
if user.isValidPwd(password) {
return user, nil
}
return nil, common.NewAuthorizationError("Invalid username/password")
return nil, common.NewAuthenticationError("Invalid username/password")
}
func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error) {
@ -1206,7 +1206,7 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username st
return fmt.Errorf("User %s already exists", username)
}
matchers := []*Matcher{&Matcher{true, ".*"}}
log.Debug("(raft:%s) Creating uesr %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
log.Debug("(raft:%s) Creating user %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
return self.raftServer.SaveDbUser(&dbUser{CommonUser{Name: username, CacheKey: db + "%" + username}, db, matchers, matchers, false})
}

View File

@ -161,6 +161,22 @@ func (self *ServerProcess) QueryWithUsername(database, query string, onlyLocal b
return ResultsToSeriesCollection(js)
}
func (self *ServerProcess) VerifyForbiddenQuery(database, query string, onlyLocal bool, c *C, username, password string) string {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.apiPort, database, username, password, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
resp, err := http.Get(fullUrl)
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusForbidden)
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
return string(body)
}
func (self *ServerProcess) Post(url, data string, c *C) *http.Response {
return self.Request("POST", url, data, c)
}
@ -191,6 +207,7 @@ func (self *ServerSuite) SetUpSuite(c *C) {
self.serverProcesses[0].Post("/db/test_rep/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c)
self.serverProcesses[0].Post("/db/single_rep/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c)
self.serverProcesses[0].Post("/db/test_cq/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c)
self.serverProcesses[0].Post("/db/test_cq/users?u=root&p=root", "{\"name\":\"weakpaul\", \"password\":\"pass\", \"isAdmin\": false}", c)
time.Sleep(time.Second)
}
@ -669,6 +686,8 @@ func (self *ServerSuite) TestContinuousQueryManagement(c *C) {
c.Assert(series.Points, HasLen, 0)
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from foo into bar;", false, c)
response := self.serverProcesses[0].VerifyForbiddenQuery("test_cq", "select * from foo into bar;", false, c, "weakpaul", "pass")
c.Assert(response, Equals, "Insufficient permissions to create continuous query")
collection = self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c)
series = collection.GetSeries("continuous queries", c)