Test cluster admin permission to delete data

Fix a bug that gets triggered when admin tries to delete data in a
cluster.
pull/215/head
John Shahid 2014-01-28 15:36:41 -05:00
parent a4e978f138
commit 4abec25949
6 changed files with 96 additions and 24 deletions

View File

@ -87,6 +87,7 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query
servers, replicationFactor := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
id := atomic.AddUint32(&self.requestId, uint32(1))
userName := user.GetName()
isDbUser := !user.IsClusterAdmin()
responseChannels := make([]chan *protocol.Response, 0, len(servers)+1)
queryString := query.GetQueryString()
var localServerToQuery *serverToQuery
@ -94,7 +95,7 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query
if server.server.Id == self.clusterConfiguration.localServerId {
localServerToQuery = server
} else {
request := &protocol.Request{Type: &queryRequest, Query: &queryString, Id: &id, Database: &db, UserName: &userName}
request := &protocol.Request{Type: &queryRequest, Query: &queryString, Id: &id, Database: &db, UserName: &userName, IsDbUser: &isDbUser}
if server.ringLocationsToQuery != replicationFactor {
r := server.ringLocationsToQuery
request.RingLocationsToQuery = &r
@ -1010,12 +1011,13 @@ func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*p
servers, replicationFactor := self.clusterConfiguration.GetServersToMakeQueryTo(&database)
id := atomic.AddUint32(&self.requestId, uint32(1))
userName := user.GetName()
isDbUser := !user.IsClusterAdmin()
responseChannels := make([]chan *protocol.Response, 0, len(servers)+1)
for _, server := range servers {
if server.server.Id == self.clusterConfiguration.localServerId {
continue
}
request := &protocol.Request{Type: &listSeriesRequest, Id: &id, Database: &database, UserName: &userName}
request := &protocol.Request{Type: &listSeriesRequest, Id: &id, Database: &database, UserName: &userName, IsDbUser: &isDbUser}
if server.ringLocationsToQuery != replicationFactor {
r := server.ringLocationsToQuery
request.RingLocationsToQuery = &r

View File

@ -3,9 +3,11 @@ package coordinator
import (
"bytes"
log "code.google.com/p/log4go"
"common"
"datastore"
"encoding/binary"
"errors"
"fmt"
"net"
"parser"
"protocol"
@ -266,15 +268,29 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
}
// the query should always parse correctly since it was parsed at the originating server.
query, _ := parser.ParseSelectQuery(*request.Query)
user := self.clusterConfig.GetDbUser(*request.Database, *request.UserName)
var user common.User
if *request.IsDbUser {
user = self.clusterConfig.GetDbUser(*request.Database, *request.UserName)
} else {
user = self.clusterConfig.GetClusterAdmin(*request.UserName)
}
var response *protocol.Response
var ringFilter func(database, series *string, time *int64) bool
if user == nil {
errorMsg := fmt.Sprintf("Cannot find user %s", *request.UserName)
response = &protocol.Response{ErrorMessage: &errorMsg}
goto response
}
if request.RingLocationsToQuery != nil {
ringFilter = self.clusterConfig.GetRingFilterFunction(*request.Database, *request.RingLocationsToQuery)
}
self.db.ExecuteQuery(user, *request.Database, query, assignNextPointTimesAndSend, ringFilter)
response := &protocol.Response{Type: &endStreamResponse, RequestId: request.Id}
response = &protocol.Response{Type: &endStreamResponse, RequestId: request.Id}
response:
self.WriteResponse(conn, response)
}

View File

@ -77,7 +77,15 @@ func (self *Server) WriteData(data interface{}, extraQueryParams ...string) erro
return nil
}
func (self *Server) RunQuery(query string, precision string) ([]byte, error) {
func (self *Server) RunQuery(query, precision string) ([]byte, error) {
return self.RunQueryAsUser(query, precision, "user", "pass")
}
func (self *Server) RunQueryAsRoot(query, precision string) ([]byte, error) {
return self.RunQueryAsUser(query, precision, "root", "root")
}
func (self *Server) RunQueryAsUser(query, precision, username, password string) ([]byte, error) {
encodedQuery := url.QueryEscape(query)
resp, err := http.Get(fmt.Sprintf("http://localhost:8086/db/db1/series?u=user&p=pass&q=%s&time_precision=%s", encodedQuery, precision))
if err != nil {
@ -229,6 +237,35 @@ func (self *IntegrationSuite) TestWriting(c *C) {
self.writeData(c)
}
// Reported by Alex in the following thread
// https://groups.google.com/forum/#!msg/influxdb/I_Ns6xYiMOc/XilTv6BDgHgJ
func (self *IntegrationSuite) TestAdminPermissionToDeleteData(c *C) {
data := `
[{
"points": [
["val1", 2]
],
"name": "test_delete_admin_permission",
"columns": ["val_1", "val_2"]
}]`
c.Assert(self.server.WriteData(data), IsNil)
bs, err := self.server.RunQueryAsRoot("select count(val_1) from test_delete_admin_permission", "s")
c.Assert(err, IsNil)
series := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0][1], Equals, float64(1))
_, err = self.server.RunQueryAsRoot("delete from test_delete_admin_permission", "s")
c.Assert(err, IsNil)
bs, err = self.server.RunQueryAsRoot("select count(val_1) from test_delete_admin_permission", "s")
c.Assert(err, IsNil)
err = json.Unmarshal(bs, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 0)
}
func (self *IntegrationSuite) TestMedians(c *C) {
for i := 0; i < 3; i++ {
err := self.server.WriteData(fmt.Sprintf(`

View File

@ -133,29 +133,16 @@ type Point struct {
}
func (self *ServerProcess) Query(database, query string, onlyLocal bool, c *C) *SeriesCollection {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=paul&p=pass&q=%s", self.apiPort, database, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
resp, err := http.Get(fullUrl)
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
var js []interface{}
err = json.Unmarshal(body, &js)
if err != nil {
fmt.Println("NOT JSON: ", string(body))
}
c.Assert(err, IsNil)
return ResultsToSeriesCollection(js)
return self.QueryWithUsername(database, query, onlyLocal, c, "paul", "pass")
}
func (self *ServerProcess) QueryAsRoot(database, query string, onlyLocal bool, c *C) *SeriesCollection {
return self.QueryWithUsername(database, query, onlyLocal, c, "root", "root")
}
func (self *ServerProcess) QueryWithUsername(database, query string, onlyLocal bool, c *C, username, password string) *SeriesCollection {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=root&p=root&q=%s", self.apiPort, database, encodedQuery)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.apiPort, database, username, password, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
@ -382,6 +369,27 @@ func (self *ServerSuite) TestDeleteReplication(c *C) {
c.Assert(collection.Members, HasLen, 0)
}
// Reported by Alex in the following thread
// https://groups.google.com/forum/#!msg/influxdb/I_Ns6xYiMOc/XilTv6BDgHgJ
func (self *ServerSuite) TestAdminPermissionToDeleteData(c *C) {
data := `
[{
"points": [
["val1", 2]
],
"name": "test_delete_admin_permission",
"columns": ["val_1", "val_2"]
}]`
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
collection := self.serverProcesses[0].QueryAsRoot("test_rep", "select count(val_1) from test_delete_admin_permission", false, c)
series := collection.GetSeries("test_delete_admin_permission", c)
c.Assert(series.GetValueForPointAndColumn(0, "count", c), Equals, float64(1))
self.serverProcesses[0].Query("test_rep", "delete from test_delete_admin_permission", false, c)
collection = self.serverProcesses[0].Query("test_rep", "select count(val_1) from test_delete_admin_permission", false, c)
c.Assert(collection.Members, HasLen, 0)
}
func (self *ServerSuite) TestListSeries(c *C) {
self.serverProcesses[0].Post("/db?u=root&p=root", `{"name": "list_series", "replicationFactor": 2}`, c)
self.serverProcesses[0].Post("/db/list_series/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c)

View File

@ -311,6 +311,7 @@ type Request struct {
ReplicationFactor *uint32 `protobuf:"varint,16,opt,name=replication_factor" json:"replication_factor,omitempty"`
OwnerServerId *uint32 `protobuf:"varint,17,opt,name=owner_server_id" json:"owner_server_id,omitempty"`
LastKnownSequenceNumber *uint64 `protobuf:"varint,18,opt,name=last_known_sequence_number" json:"last_known_sequence_number,omitempty"`
IsDbUser *bool `protobuf:"varint,19,opt,name=is_db_user" json:"is_db_user,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -409,6 +410,13 @@ func (m *Request) GetLastKnownSequenceNumber() uint64 {
return 0
}
func (m *Request) GetIsDbUser() bool {
if m != nil && m.IsDbUser != nil {
return *m.IsDbUser
}
return false
}
type Response struct {
Type *Response_Type `protobuf:"varint,1,req,name=type,enum=protocol.Response_Type" json:"type,omitempty"`
RequestId *uint32 `protobuf:"varint,2,req,name=request_id" json:"request_id,omitempty"`

View File

@ -64,6 +64,7 @@ message Request {
optional uint32 replication_factor = 16;
optional uint32 owner_server_id = 17;
optional uint64 last_known_sequence_number = 18;
optional bool is_db_user = 19;
}
message Response {