fixes #258, fixes #250, fixes #216, fixes #166, fixes #165, fixes #164, fixes #132, fixes #103, fixes #65

Merge branch 'master' into create-cluster-interfaces-and-refactor

Conflicts:
	src/api/http/api.go
	src/api/http/api_test.go
	src/coordinator/cluster_configuration.go
pull/269/head
John Shahid 2014-02-25 16:42:37 -05:00
commit d0839e2b17
14 changed files with 287 additions and 31 deletions

View File

@ -225,8 +225,19 @@
- Make the leveldb max open files configurable in the toml file
## v0.4.5 [unreleased]
## v0.5.0 [unreleased]
### Bugfixes
- Ensure large deletes don't take too much memory
- [Issue #240](https://github.com/influxdb/influxdb/pull/240). Unable to query against columns with `.` in the name.
- [Issue #189](https://github.com/influxdb/influxdb/issues/189). Deprecate more field names that were missed in 0.4.0
### Features
- [Issue #243](https://github.com/influxdb/influxdb/
issues/243). Should have endpoint to GET a user's attributes.
### Deprecated
- `/cluster_admins` and `/db/:db/users` return usernames in a `name` key instead of `username` key.

View File

@ -161,7 +161,7 @@ endif
timeout = 10m
GOTEST_OPTS += -test.timeout=$(timeout)
test: test_dependencies parser
test: test_dependencies parser protobuf
$(GO) test $(packages) $(GOTEST_OPTS)
coverage: test_dependencies

View File

@ -1 +0,0 @@
0.0.2.dev

View File

@ -111,6 +111,7 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint(p, "get", "/db/:db/authenticate", self.authenticateDbUser)
self.registerEndpoint(p, "get", "/db/:db/users", self.listDbUsers)
self.registerEndpoint(p, "post", "/db/:db/users", self.createDbUser)
self.registerEndpoint(p, "get", "/db/:db/users/:user", self.showDbUser)
self.registerEndpoint(p, "del", "/db/:db/users/:user", self.deleteDbUser)
self.registerEndpoint(p, "post", "/db/:db/users/:user", self.updateDbUser)
@ -533,6 +534,11 @@ type ApiUser struct {
Name string `json:"username"`
}
type UserDetail struct {
Name string `json:"name"`
IsAdmin bool `json:"isAdmin"`
}
type NewContinuousQuery struct {
Query string `json:"query"`
}
@ -682,19 +688,35 @@ func (self *HttpServer) listDbUsers(w libhttp.ResponseWriter, r *libhttp.Request
db := r.URL.Query().Get(":db")
self.tryAsDbUserAndClusterAdmin(w, r, func(u User) (int, interface{}) {
names, err := self.userManager.ListDbUsers(u, db)
dbUsers, err := self.userManager.ListDbUsers(u, db)
if err != nil {
return errorToStatusCode(err), err.Error()
}
users := make([]*ApiUser, 0, len(names))
for _, name := range names {
users = append(users, &ApiUser{name})
users := make([]*UserDetail, 0, len(dbUsers))
for _, dbUser := range dbUsers {
users = append(users, &UserDetail{dbUser.GetName(), dbUser.IsDbAdmin(db)})
}
return libhttp.StatusOK, users
})
}
func (self *HttpServer) showDbUser(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")
username := r.URL.Query().Get(":user")
self.tryAsDbUserAndClusterAdmin(w, r, func(u User) (int, interface{}) {
user, err := self.userManager.GetDbUser(u, db, username)
if err != nil {
return errorToStatusCode(err), err.Error()
}
userDetail := &UserDetail{user.GetName(), user.IsDbAdmin(db)}
return libhttp.StatusOK, userDetail
})
}
func (self *HttpServer) createDbUser(w libhttp.ResponseWriter, r *libhttp.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {

View File

@ -175,9 +175,10 @@ func (self *ApiSuite) SetUpSuite(c *C) {
},
},
}
self.manager = &MockUserManager{
clusterAdmins: []string{"root"},
dbUsers: map[string][]string{"db1": []string{"db_user1"}},
dbUsers: map[string]map[string]MockDbUser{"db1": map[string]MockDbUser{"db_user1": {Name: "db_user1", IsAdmin: false}}},
}
dir := c.MkDir()
self.server = NewHttpServer("", dir, self.coordinator, self.manager, nil, nil)
@ -734,10 +735,25 @@ func (self *ApiSuite) TestDbUsersIndex(c *C) {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
users := []*ApiUser{}
users := []*UserDetail{}
err = json.Unmarshal(body, &users)
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*ApiUser{&ApiUser{"db_user1"}})
c.Assert(users, HasLen, 1)
c.Assert(users[0], DeepEquals, &UserDetail{"db_user1", false})
}
func (self *ApiSuite) TestDbUserShow(c *C) {
url := self.formatUrl("/db/db1/users/db_user1?u=root&p=root")
resp, err := libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
userDetail := &UserDetail{}
err = json.Unmarshal(body, &userDetail)
c.Assert(err, IsNil)
c.Assert(userDetail, DeepEquals, &UserDetail{"db_user1", false})
}
func (self *ApiSuite) TestDatabasesIndex(c *C) {
@ -749,10 +765,12 @@ func (self *ApiSuite) TestDatabasesIndex(c *C) {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
users := []*cluster.Database{}
err = json.Unmarshal(body, &users)
databases := []*cluster.Database{}
err = json.Unmarshal(body, &databases)
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*cluster.Database{&cluster.Database{"db1", uint8(1)}, &cluster.Database{"db2", uint8(1)}})
err = json.Unmarshal(body, &databases)
c.Assert(err, IsNil)
c.Assert(databases, DeepEquals, []*cluster.Database{&cluster.Database{"db1", uint8(1)}, &cluster.Database{"db2", uint8(1)}})
}
}
@ -767,10 +785,11 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) {
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
users := []*cluster.Database{}
err = json.Unmarshal(body, &users)
databases := []*cluster.Database{}
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}})
err = json.Unmarshal(body, &databases)
c.Assert(err, IsNil)
c.Assert(databases, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}})
}
func (self *ApiSuite) TestContinuousQueryOperations(c *C) {

View File

@ -12,8 +12,41 @@ type Operation struct {
isAdmin bool
}
type MockDbUser struct {
Name string
IsAdmin bool
}
func (self MockDbUser) GetName() string {
return self.Name
}
func (self MockDbUser) IsDeleted() bool {
return false
}
func (self MockDbUser) IsClusterAdmin() bool {
return false
}
func (self MockDbUser) IsDbAdmin(_ string) bool {
return self.IsAdmin
}
func (self MockDbUser) GetDb() string {
return ""
}
func (self MockDbUser) HasWriteAccess(_ string) bool {
return true
}
func (self MockDbUser) HasReadAccess(_ string) bool {
return true
}
type MockUserManager struct {
dbUsers map[string][]string
dbUsers map[string]map[string]MockDbUser
clusterAdmins []string
ops []*Operation
}
@ -29,6 +62,7 @@ func (self *MockUserManager) AuthenticateDbUser(db, username, password string) (
return nil, nil
}
func (self *MockUserManager) AuthenticateClusterAdmin(username, password string) (common.User, error) {
if username == "fail_auth" {
return nil, fmt.Errorf("Invalid username/password")
@ -40,6 +74,7 @@ func (self *MockUserManager) AuthenticateClusterAdmin(username, password string)
return nil, nil
}
func (self *MockUserManager) CreateClusterAdminUser(request common.User, username string) error {
if username == "" {
return fmt.Errorf("Invalid empty username")
@ -48,14 +83,17 @@ func (self *MockUserManager) CreateClusterAdminUser(request common.User, usernam
self.ops = append(self.ops, &Operation{"cluster_admin_add", username, "", false})
return nil
}
func (self *MockUserManager) DeleteClusterAdminUser(requester common.User, username string) error {
self.ops = append(self.ops, &Operation{"cluster_admin_del", username, "", false})
return nil
}
func (self *MockUserManager) ChangeClusterAdminPassword(requester common.User, username, password string) error {
self.ops = append(self.ops, &Operation{"cluster_admin_passwd", username, password, false})
return nil
}
func (self *MockUserManager) CreateDbUser(request common.User, db, username string) error {
if username == "" {
return fmt.Errorf("Invalid empty username")
@ -64,21 +102,41 @@ func (self *MockUserManager) CreateDbUser(request common.User, db, username stri
self.ops = append(self.ops, &Operation{"db_user_add", username, "", false})
return nil
}
func (self *MockUserManager) DeleteDbUser(requester common.User, db, username string) error {
self.ops = append(self.ops, &Operation{"db_user_del", username, "", false})
return nil
}
func (self *MockUserManager) ChangeDbUserPassword(requester common.User, db, username, password string) error {
self.ops = append(self.ops, &Operation{"db_user_passwd", username, password, false})
return nil
}
func (self *MockUserManager) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error {
self.ops = append(self.ops, &Operation{"db_user_admin", username, "", isAdmin})
return nil
}
func (self *MockUserManager) ListClusterAdmins(requester common.User) ([]string, error) {
return self.clusterAdmins, nil
}
func (self *MockUserManager) ListDbUsers(requester common.User, db string) ([]string, error) {
return self.dbUsers[db], nil
func (self *MockUserManager) ListDbUsers(requester common.User, db string) ([]common.User, error) {
dbUsers := self.dbUsers[db]
users := make([]common.User, 0, len(dbUsers))
for _, user := range dbUsers {
users = append(users, user)
}
return users, nil
}
func (self *MockUserManager) GetDbUser(requester common.User, db, username string) (common.User, error) {
dbUsers := self.dbUsers[db]
if dbUser, ok := dbUsers[username]; ok {
return MockDbUser{Name: dbUser.GetName(), IsAdmin: dbUser.IsDbAdmin(db)}, nil
} else {
return nil, fmt.Errorf("'%s' is not a valid username for database '%s'", username, db)
}
}

View File

@ -328,15 +328,17 @@ func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQ
return self.continuousQueries[db]
}
func (self *ClusterConfiguration) GetDbUsers(db string) (names []string) {
func (self *ClusterConfiguration) GetDbUsers(db string) []common.User {
self.usersLock.RLock()
defer self.usersLock.RUnlock()
dbUsers := self.dbUsers[db]
users := make([]common.User, 0, len(dbUsers))
for name, _ := range dbUsers {
names = append(names, name)
dbUser := dbUsers[name]
users = append(users, dbUser)
}
return
return users
}
func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser {

View File

@ -626,13 +626,13 @@ func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username st
user := self.clusterConfiguration.GetDbUser(db, username)
if user == nil {
return fmt.Errorf("User %s doesn't exists", username)
return fmt.Errorf("User %s doesn't exist", username)
}
user.CommonUser.IsUserDeleted = true
return self.raftServer.SaveDbUser(user)
}
func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]string, error) {
func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]common.User, error) {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return nil, common.NewAuthorizationError("Insufficient permissions")
}
@ -640,6 +640,19 @@ func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]st
return self.clusterConfiguration.GetDbUsers(db), nil
}
func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, username string) (common.User, error) {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) {
return nil, common.NewAuthorizationError("Insufficient permissions")
}
dbUser := self.clusterConfiguration.GetDbUser(db, username)
if dbUser == nil {
return nil, fmt.Errorf("Invalid username %s", username)
}
return dbUser, nil
}
func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error {
if !requester.IsClusterAdmin() && !requester.IsDbAdmin(db) && !(requester.GetDb() == db && requester.GetName() == username) {
return common.NewAuthorizationError("Insufficient permissions")

View File

@ -377,6 +377,17 @@ func (self *CoordinatorSuite) TestAdminOperations(c *C) {
c.Assert(u.IsClusterAdmin(), Equals, false)
c.Assert(u.IsDbAdmin("db1"), Equals, false)
// can get properties of db users
dbUser, err := coordinator.GetDbUser(root, "db1", "db_user")
c.Assert(err, IsNil)
c.Assert(dbUser, NotNil)
c.Assert(dbUser.GetName(), Equals, "db_user")
c.Assert(dbUser.IsDbAdmin("db1"), Equals, false)
dbUser, err = coordinator.GetDbUser(root, "db1", "invalid_user")
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, "Invalid username invalid_user")
// can make db users db admins
c.Assert(coordinator.SetDbAdmin(root, "db1", "db_user", true), IsNil)
u, err = coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
@ -386,7 +397,9 @@ func (self *CoordinatorSuite) TestAdminOperations(c *C) {
// can list db users
dbUsers, err := coordinator.ListDbUsers(root, "db1")
c.Assert(err, IsNil)
c.Assert(dbUsers, DeepEquals, []string{"db_user"})
c.Assert(dbUsers, HasLen, 1)
c.Assert(dbUsers[0].GetName(), Equals, "db_user")
c.Assert(dbUsers[0].IsDbAdmin("db1"), Equals, true)
// can delete cluster admins and db users
c.Assert(coordinator.DeleteDbUser(root, "db1", "db_user"), IsNil)
@ -486,11 +499,10 @@ func (self *CoordinatorSuite) TestDbAdminOperations(c *C) {
// can get db users
admins, err := coordinator.ListDbUsers(dbUser, "db1")
c.Assert(err, IsNil)
adminsSet := map[string]bool{}
for _, admin := range admins {
adminsSet[admin] = true
}
c.Assert(adminsSet, DeepEquals, map[string]bool{"db_user": true, "db_user2": true})
c.Assert(admins[0].GetName(), Equals, "db_user")
c.Assert(admins[0].IsDbAdmin("db1"), Equals, true)
c.Assert(admins[1].GetName(), Equals, "db_user2")
c.Assert(admins[1].IsDbAdmin("db1"), Equals, false)
// cannot create db users for a different db
c.Assert(coordinator.CreateDbUser(dbUser, "db2", "db_user"), NotNil)

View File

@ -50,7 +50,8 @@ type UserManager interface {
// Change db user's password. It's an error if requester isn't a cluster admin or db admin
ChangeDbUserPassword(requester common.User, db, username, password string) error
// list cluster admins. only a cluster admin or the db admin can list the db users
ListDbUsers(requester common.User, db string) ([]string, error)
ListDbUsers(requester common.User, db string) ([]common.User, error)
GetDbUser(requester common.User, db, username string) (common.User, error)
// make user a db admin for 'db'. It's an error if the requester
// isn't a db admin or cluster admin or if user isn't a db user
// for the given db

View File

@ -621,12 +621,23 @@ func (self *LevelDbDatastore) deleteRangeOfSeriesCommon(database, series string,
}
}
}
count := 0
for it = it; it.Valid(); it.Next() {
k := it.Key()
if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
break
}
wb.Delete(k)
count++
// delete every one million keys which is approximately 24 megabytes
if count == ONE_MEGABYTE {
err = self.db.Write(self.writeOptions, wb)
if err != nil {
return err
}
wb.Clear()
count = 0
}
endKey = k
}
err = self.db.Write(self.writeOptions, wb)

View File

@ -906,6 +906,43 @@ func (self *IntegrationSuite) TestDeleteQuery(c *C) {
}
}
func (self *IntegrationSuite) TestLargeDeletes(c *C) {
numberOfPoints := 2 * 1024 * 1024
points := []interface{}{}
for i := 0; i < numberOfPoints; i++ {
points = append(points, []interface{}{i})
}
pointsString, _ := json.Marshal(points)
err := self.server.WriteData(fmt.Sprintf(`
[
{
"name": "test_large_deletes",
"columns": ["val1"],
"points":%s
}
]`, string(pointsString)))
c.Assert(err, IsNil)
bs, err := self.server.RunQuery("select count(val1) from test_large_deletes", "m")
c.Assert(err, IsNil)
data := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(data, HasLen, 1)
c.Assert(data[0].Points, HasLen, 1)
c.Assert(data[0].Points[0][1], Equals, float64(numberOfPoints))
query := "delete from test_large_deletes"
_, err = self.server.RunQuery(query, "m")
c.Assert(err, IsNil)
// this shouldn't return any data
bs, err = self.server.RunQuery("select count(val1) from test_large_deletes", "m")
c.Assert(err, IsNil)
data = []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(err, IsNil)
c.Assert(data, HasLen, 0)
}
func (self *IntegrationSuite) TestReading(c *C) {
if !*benchmark {
c.Skip("Benchmarking is disabled")
@ -938,6 +975,41 @@ func (self *IntegrationSuite) TestReading(c *C) {
}
}
func (self *IntegrationSuite) TestReadingWhenColumnHasDot(c *C) {
err := self.server.WriteData(`
[
{
"name": "test_column_names_with_dots",
"columns": ["first.name", "last.name"],
"points": [["paul", "dix"], ["john", "shahid"]]
}
]`)
c.Assert(err, IsNil)
for name, expected := range map[string]map[string]bool{
"first.name": map[string]bool{"paul": true, "john": true},
"last.name": map[string]bool{"dix": true, "shahid": true},
} {
q := fmt.Sprintf("select %s from test_column_names_with_dots", name)
bs, err := self.server.RunQuery(q, "m")
c.Assert(err, IsNil)
data := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(err, IsNil)
c.Assert(data, HasLen, 1)
c.Assert(data[0].Columns, HasLen, 3) // time, sequence number and the requested columns
c.Assert(data[0].Columns[2], Equals, name)
names := map[string]bool{}
for _, p := range data[0].Points {
names[p[2].(string)] = true
}
c.Assert(names, DeepEquals, expected)
}
}
func (self *IntegrationSuite) TestSinglePointSelect(c *C) {
err := self.server.WriteData(`
[

View File

@ -73,6 +73,24 @@ func (self *QueryParserSuite) TestParseDeleteQueryWithEndTime(c *C) {
c.Assert(q.GetEndTime(), Equals, time.Unix(1389040522, 0).UTC())
}
func (self *QueryParserSuite) TestParseSelectQueryWithDotInColumnName(c *C) {
query := "select patient.first.name from foo"
queries, err := ParseQuery(query)
c.Assert(err, IsNil)
c.Assert(queries, HasLen, 1)
_q := queries[0]
c.Assert(_q.SelectQuery, NotNil)
q := _q.SelectQuery
for _, columns := range q.GetReferencedColumns() {
c.Assert(columns, DeepEquals, []string{"patient.first.name"})
}
}
func (self *QueryParserSuite) TestParseDropSeries(c *C) {
query := "drop series foobar"
queries, err := ParseQuery(query)

View File

@ -168,6 +168,24 @@ func (self *SelectQuery) GetReferencedColumns() map[*Value][]string {
delete(mapping, name)
}
if len(mapping) == 0 {
return returnedMapping
}
// if `mapping` still have some mappings, then we have mistaken a
// column name with dots with a prefix.column, see issue #240
for prefix, columnNames := range mapping {
for _, columnName := range columnNames {
for table, columns := range returnedMapping {
if len(returnedMapping[table]) > 1 && returnedMapping[table][0] == "*" {
continue
}
returnedMapping[table] = append(columns, prefix+"."+columnName)
}
}
delete(mapping, prefix)
}
return returnedMapping
}