Fix #495. Enforce write permissions correctly

pull/505/head
John Shahid 2014-05-02 17:52:51 -04:00
parent 75741e9248
commit 1641d26414
4 changed files with 83 additions and 15 deletions

View File

@ -11,6 +11,7 @@
- [Issue #431](https://github.com/influxdb/influxdb/issues/431). Don't log to standard output if a log file is specified in the config file - [Issue #431](https://github.com/influxdb/influxdb/issues/431). Don't log to standard output if a log file is specified in the config file
- [Issue #483](https://github.com/influxdb/influxdb/issues/483). Return 409 if a database already exist (Thanks, Edward Muller) - [Issue #483](https://github.com/influxdb/influxdb/issues/483). Return 409 if a database already exist (Thanks, Edward Muller)
- [Issue #490](https://github.com/influxdb/influxdb/issues/490). Database user password's cannot be changed (Thanks, Edward Muller) - [Issue #490](https://github.com/influxdb/influxdb/issues/490). Database user password's cannot be changed (Thanks, Edward Muller)
- [Issue #495](https://github.com/influxdb/influxdb/issues/495). Enforce write permissions properly
## v0.5.12 [2014-04-29] ## v0.5.12 [2014-04-29]

View File

@ -99,8 +99,8 @@ func (self *ClusterAdmin) HasReadAccess(_ string) bool {
type DbUser struct { type DbUser struct {
CommonUser `json:"common"` CommonUser `json:"common"`
Db string `json:"db"` Db string `json:"db"`
WriteTo []*Matcher `json:"write_matchers"`
ReadFrom []*Matcher `json:"read_matchers"` ReadFrom []*Matcher `json:"read_matchers"`
WriteTo []*Matcher `json:"write_matchers"`
IsAdmin bool `json:"is_admin"` IsAdmin bool `json:"is_admin"`
} }

View File

@ -467,8 +467,12 @@ func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
} }
func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series []*protocol.Series) error { func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series []*protocol.Series) error {
if !user.HasWriteAccess(db) { for _, s := range series {
return common.NewAuthorizationError("Insufficient permissions to write to %s", db) seriesName := s.GetName()
if user.HasWriteAccess(seriesName) {
continue
}
return common.NewAuthorizationError("User %s doesn't have write permissions for %s", user.GetName(), seriesName)
} }
err := self.CommitSeriesData(db, series, false) err := self.CommitSeriesData(db, series, false)
@ -879,7 +883,7 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, p
case 0: case 0:
case 2: case 2:
readMatcher[0].Name = permissions[0] readMatcher[0].Name = permissions[0]
writeMatcher[0].Name = permissions[0] writeMatcher[0].Name = permissions[1]
} }
log.Debug("(raft:%s) Creating user %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username) log.Debug("(raft:%s) Creating user %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
return self.raftServer.SaveDbUser(&cluster.DbUser{cluster.CommonUser{ return self.raftServer.SaveDbUser(&cluster.DbUser{cluster.CommonUser{

View File

@ -3,6 +3,7 @@ package integration
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/json"
"fmt" "fmt"
. "integration/helpers" . "integration/helpers"
"io/ioutil" "io/ioutil"
@ -139,10 +140,69 @@ func (self *SingleServerSuite) TestSingleServerHostnameChange(c *C) {
c.Assert(client.Ping(), IsNil) c.Assert(client.Ping(), IsNil)
} }
func (self *SingleServerSuite) TestUserPermissions(c *C) { func (self *SingleServerSuite) TestUserWritePermissions(c *C) {
client, err := influxdb.NewClient(&influxdb.ClientConfig{}) rootUser := self.server.GetClient("", c)
// create two users one that can only read and one that can only write. both can access test_should_read
// series only
c.Assert(rootUser.CreateDatabaseUser("db1", "limited_user", "pass", "^$", "^$"), IsNil)
config := &influxdb.ClientConfig{
Username: "limited_user",
Password: "pass",
Database: "db1",
}
user, err := influxdb.NewClient(config)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(client.CreateDatabaseUser("db1", "limited_user", "pass", "test_should_read", ".*"), IsNil)
data := `
[
{
"points": [
[1]
],
"name": "test_should_write",
"columns": ["value"]
}
]`
invalidData := `
[
{
"points": [
[2]
],
"name": "test_should_not_write",
"columns": ["value"]
}
]`
series := []*influxdb.Series{}
c.Assert(json.Unmarshal([]byte(data), &series), IsNil)
// readUser shouldn't be able to write
c.Assert(user.WriteSeries(series), NotNil)
content := self.server.RunQueryAsRoot("select * from test_should_write", "m", c)
c.Assert(content, HasLen, 0)
rootUser.ChangeDatabaseUser("db1", "limited_user", "pass", false, "^$", "test_should_write")
// write the data to test the write permissions
c.Assert(user.WriteSeries(series), IsNil)
invalidSeries := []*influxdb.Series{}
content = self.server.RunQueryAsRoot("select * from test_should_write", "m", c)
c.Assert(content, HasLen, 1)
c.Assert(json.Unmarshal([]byte(invalidData), &invalidSeries), IsNil)
c.Assert(user.WriteSeries(invalidSeries), NotNil)
content = self.server.RunQueryAsRoot("select * from test_should_not_write", "m", c)
c.Assert(content, HasLen, 0)
rootUser.ChangeDatabaseUser("db1", "limited_user", "pass", false, "^$", "test_.*")
c.Assert(user.WriteSeries(invalidSeries), IsNil)
content = self.server.RunQueryAsRoot("select * from test_should_not_write", "m", c)
c.Assert(content, HasLen, 1)
}
func (self *SingleServerSuite) TestUserReadPermissions(c *C) {
rootUser := self.server.GetClient("", c)
// create two users one that can only read and one that can only write. both can access test_should_read
// series only
c.Assert(rootUser.CreateDatabaseUser("db1", "limited_user2", "pass", "test_should_read", "^$"), IsNil)
data := ` data := `
[ [
@ -163,18 +223,21 @@ func (self *SingleServerSuite) TestUserPermissions(c *C) {
]` ]`
self.server.WriteData(data, c) self.server.WriteData(data, c)
series := self.server.RunQueryAsUser("select value from test_should_read", "s", "limited_user", "pass", true, c) // test all three cases, read from one series that the user has read access to, one that the user doesn't have
c.Assert(series[0].Points, HasLen, 1) // access to and a regex
c.Assert(series[0].Points[0][2], Equals, float64(1)) content := self.server.RunQueryAsUser("select value from test_should_read", "s", "limited_user2", "pass", true, c)
c.Assert(content, HasLen, 1)
_ = self.server.RunQueryAsUser("select value from test_should_not_read", "s", "limited_user", "pass", false, c) c.Assert(content[0].Points, HasLen, 1)
series = self.server.RunQueryAsUser("select value from /.*/", "s", "limited_user", "pass", true, c) c.Assert(content[0].Points[0][2], Equals, float64(1))
// the following query should return an error
_ = self.server.RunQueryAsUser("select value from test_should_not_read", "s", "limited_user2", "pass", false, c)
series := self.server.RunQueryAsUser("select value from /.*/", "s", "limited_user2", "pass", true, c)
c.Assert(series, HasLen, 1) c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "test_should_read") c.Assert(series[0].Name, Equals, "test_should_read")
client.UpdateDatabaseUserPermissions("db1", "limited_user", ".*", ".*") rootUser.UpdateDatabaseUserPermissions("db1", "limited_user2", ".*", ".*")
self.server.WaitForServerToSync() self.server.WaitForServerToSync()
series = self.server.RunQueryAsUser("select value from /.*/", "s", "limited_user", "pass", true, c) series = self.server.RunQueryAsUser("select value from /.*/", "s", "limited_user2", "pass", true, c)
c.Assert(series, HasLen, 2) c.Assert(series, HasLen, 2)
} }