Refactor user commands.
parent
3a0e1b817c
commit
ec0835c256
32
TODO
32
TODO
|
@ -3,7 +3,37 @@ TODO
|
|||
|
||||
### Uncompleted
|
||||
|
||||
- [ ]
|
||||
- [ ] Server.CreateClusterAdmin()
|
||||
- [ ] Server.DeleteClusterAdmin()
|
||||
- [ ] Server.ClusterAdmin()
|
||||
- [ ] Server.ClusterAdmins()
|
||||
|
||||
- [ ] ClusterAdmin.SetPassword()
|
||||
|
||||
- [ ] DBUser.SetPassword()
|
||||
- [ ] DBUser.SetPermissions()
|
||||
- [ ] DBUser.SetDBAdmin()
|
||||
|
||||
- [ ] Database.CreateShardSpace()
|
||||
- [ ] Database.DeleteShardSpace()
|
||||
- [ ] Database.UpdateShardSpace()
|
||||
|
||||
- [ ] Database.CreateSeries() (?)
|
||||
- [ ] Database.DropSeries()
|
||||
|
||||
- [ ] Database.CreateShard()
|
||||
- [ ] Database.DeleteShard()
|
||||
|
||||
- [ ] Server.WriteSeriesData()
|
||||
|
||||
- [ ] Server.Query()
|
||||
|
||||
- [ ] Server.CreateContinuousQuery()
|
||||
- [ ] Server.DeleteContinuousQuery()
|
||||
- [ ] Server.ContinuousQueries()
|
||||
|
||||
- [ ] QueryAuthorizer.AuthorizeQuery(q)
|
||||
- [ ] DB.AuthorizeDropSeries(q)
|
||||
|
||||
|
||||
### Completed
|
||||
|
|
|
@ -1,92 +0,0 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/protocol"
|
||||
. "launchpad.net/gocheck"
|
||||
)
|
||||
|
||||
type ClientServerSuite struct{}
|
||||
|
||||
var _ = Suite(&ClientServerSuite{})
|
||||
|
||||
const DB_DIR = "/tmp/influxdb/datastore_test"
|
||||
|
||||
type MockRequestHandler struct {
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
TestingT(t)
|
||||
}
|
||||
|
||||
func stringToSeries(seriesString string, c *C) *protocol.Series {
|
||||
series := &protocol.Series{}
|
||||
err := json.Unmarshal([]byte(seriesString), &series)
|
||||
c.Assert(err, IsNil)
|
||||
return series
|
||||
}
|
||||
|
||||
func (self *MockRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
|
||||
response := &protocol.Response{RequestId: request.Id, Type: protocol.Response_END_STREAM.Enum()}
|
||||
data, _ := response.Encode()
|
||||
binary.Write(conn, binary.LittleEndian, uint32(len(data)))
|
||||
conn.Write(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) {
|
||||
requestHandler := &MockRequestHandler{}
|
||||
protobufServer := NewProtobufServer(":8091", requestHandler)
|
||||
go protobufServer.ListenAndServe()
|
||||
c.Assert(protobufServer, Not(IsNil))
|
||||
protobufClient := NewProtobufClient("localhost:8091", 0)
|
||||
protobufClient.Connect()
|
||||
responseStream := make(chan *protocol.Response, 1)
|
||||
|
||||
mock := `
|
||||
{
|
||||
"points": [
|
||||
{ "values": [{"int64_value": 3}]}
|
||||
],
|
||||
"name": "foo",
|
||||
"fields": ["val"]
|
||||
}`
|
||||
fmt.Println("creating series")
|
||||
series := stringToSeries(mock, c)
|
||||
t := time.Now().Unix()
|
||||
s := uint64(1)
|
||||
series.Points[0].Timestamp = &t
|
||||
series.Points[0].SequenceNumber = &s
|
||||
id := uint32(1)
|
||||
database := "pauldb"
|
||||
proxyWrite := protocol.Request_WRITE
|
||||
request := &protocol.Request{Id: &id, Type: &proxyWrite, Database: &database, MultiSeries: []*protocol.Series{series}}
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
err := protobufClient.MakeRequest(request, cluster.NewResponseChannelWrapper(responseStream))
|
||||
c.Assert(err, IsNil)
|
||||
timer := time.NewTimer(time.Second)
|
||||
select {
|
||||
case <-timer.C:
|
||||
c.Error("Timed out waiting for response")
|
||||
case response := <-responseStream:
|
||||
c.Assert(*response.Type, Equals, protocol.Response_END_STREAM)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *ClientServerSuite) TestClientReconnectsIfDisconnected(c *C) {
|
||||
}
|
||||
|
||||
func (self *ClientServerSuite) TestServerExecutesReplayRequestIfWriteIsOutOfSequence(c *C) {
|
||||
}
|
||||
|
||||
func (self *ClientServerSuite) TestServerKillsOldHandlerWhenClientReconnects(c *C) {
|
||||
|
||||
}
|
|
@ -1,270 +0,0 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
log "code.google.com/p/log4go"
|
||||
"github.com/influxdb/influxdb/_vendor/raft"
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/protocol"
|
||||
)
|
||||
|
||||
const (
|
||||
SetContinuousQueryTimestampCommandID int = iota + 1
|
||||
)
|
||||
|
||||
type SetContinuousQueryTimestampCommand struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func (c *SetContinuousQueryTimestampCommand) Apply() (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.SetContinuousQueryTimestamp(c.Timestamp)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type CreateContinuousQueryCommand struct {
|
||||
Database string `json:"database"`
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
||||
func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.CreateContinuousQuery(c.Database, c.Query)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type DeleteContinuousQueryCommand struct {
|
||||
Database string `json:"database"`
|
||||
Id uint32 `json:"id"`
|
||||
}
|
||||
|
||||
func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.DeleteContinuousQuery(c.Database, c.Id)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type DropDatabaseCommand struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.DropDatabase(c.Name)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type SaveDbUserCommand struct {
|
||||
User *cluster.DbUser `json:"user"`
|
||||
}
|
||||
|
||||
func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
config.SaveDbUser(c.User)
|
||||
log.Debug("(raft:%s) Created user %s:%s", server.Name(), c.User.Db, c.User.Name)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type ChangeDbUserPassword struct {
|
||||
Database string
|
||||
Username string
|
||||
Hash string
|
||||
}
|
||||
|
||||
func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error) {
|
||||
log.Debug("(raft:%s) changing db user password for %s:%s", server.Name(), c.Database, c.Username)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
return nil, config.ChangeDbUserPassword(c.Database, c.Username, c.Hash)
|
||||
}
|
||||
|
||||
type ChangeDbUserPermissions struct {
|
||||
Database string
|
||||
Username string
|
||||
ReadPermissions string
|
||||
WritePermissions string
|
||||
}
|
||||
|
||||
func (c *ChangeDbUserPermissions) Apply(server raft.Server) (interface{}, error) {
|
||||
log.Debug("(raft:%s) changing db user permissions for %s:%s", server.Name(), c.Database, c.Username)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
return nil, config.ChangeDbUserPermissions(c.Database, c.Username, c.ReadPermissions, c.WritePermissions)
|
||||
}
|
||||
|
||||
type SaveClusterAdminCommand struct {
|
||||
User *cluster.ClusterAdmin `json:"user"`
|
||||
}
|
||||
|
||||
func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
config.SaveClusterAdmin(c.User)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type InfluxJoinCommand struct {
|
||||
Name string `json:"name"`
|
||||
ConnectionString string `json:"connectionString"`
|
||||
ProtobufConnectionString string `json:"protobufConnectionString"`
|
||||
}
|
||||
|
||||
func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
err := server.AddPeer(c.Name, c.ConnectionString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clusterConfig := server.Context().(*cluster.ClusterConfiguration)
|
||||
|
||||
newServer := clusterConfig.GetServerByRaftName(c.Name)
|
||||
// it's a new server the cluster has never seen, make it a potential
|
||||
if newServer != nil {
|
||||
return nil, fmt.Errorf("Server %s already exist", c.Name)
|
||||
}
|
||||
|
||||
log.Info("Adding new server to the cluster config %s", c.Name)
|
||||
clusterServer := cluster.NewClusterServer(c.Name,
|
||||
c.ConnectionString,
|
||||
c.ProtobufConnectionString,
|
||||
nil,
|
||||
clusterConfig.GetLocalConfiguration())
|
||||
clusterConfig.AddPotentialServer(clusterServer)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type InfluxForceLeaveCommand struct {
|
||||
Id uint32 `json:"id"`
|
||||
}
|
||||
|
||||
func (c *InfluxForceLeaveCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
clusterConfig := server.Context().(*cluster.ClusterConfiguration)
|
||||
s := clusterConfig.GetServerById(&c.Id)
|
||||
|
||||
if s == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := server.RemovePeer(s.RaftName); err != nil {
|
||||
log.Warn("Cannot remove peer: %s", err)
|
||||
}
|
||||
|
||||
if err := clusterConfig.RemoveServer(s); err != nil {
|
||||
log.Warn("Cannot remove peer from cluster config: %s", err)
|
||||
}
|
||||
server.FlushCommitIndex()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type InfluxChangeConnectionStringCommand struct {
|
||||
Name string `json:"name"`
|
||||
Force bool `json:"force"`
|
||||
ConnectionString string `json:"connectionString"`
|
||||
ProtobufConnectionString string `json:"protobufConnectionString"`
|
||||
}
|
||||
|
||||
func (c *InfluxChangeConnectionStringCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
if c.Name == server.Name() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
server.RemovePeer(c.Name)
|
||||
server.AddPeer(c.Name, c.ConnectionString)
|
||||
|
||||
clusterConfig := server.Context().(*cluster.ClusterConfiguration)
|
||||
|
||||
newServer := clusterConfig.GetServerByRaftName(c.Name)
|
||||
// it's a new server the cluster has never seen, make it a potential
|
||||
if newServer == nil {
|
||||
return nil, fmt.Errorf("Server %s doesn't exist", c.Name)
|
||||
}
|
||||
|
||||
newServer.RaftConnectionString = c.ConnectionString
|
||||
newServer.ProtobufConnectionString = c.ProtobufConnectionString
|
||||
server.Context().(*cluster.ClusterConfiguration).ChangeProtobufConnectionString(newServer)
|
||||
server.FlushCommitIndex()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type CreateShardsCommand struct {
|
||||
Shards []*cluster.NewShardData
|
||||
SpaceName string
|
||||
}
|
||||
|
||||
func (c *CreateShardsCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
createdShards, err := config.AddShards(c.Shards)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
createdShardData := make([]*cluster.NewShardData, 0)
|
||||
for _, s := range createdShards {
|
||||
createdShardData = append(createdShardData, s.ToNewShardData())
|
||||
}
|
||||
return createdShardData, nil
|
||||
}
|
||||
|
||||
type DropShardCommand struct {
|
||||
ShardId uint32
|
||||
ServerIds []uint32
|
||||
}
|
||||
|
||||
func (c *DropShardCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.DropShard(c.ShardId, c.ServerIds)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type CreateSeriesFieldIdsCommand struct {
|
||||
Database string
|
||||
Series []*protocol.Series
|
||||
}
|
||||
|
||||
func (c *CreateSeriesFieldIdsCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.MetaStore.GetOrSetFieldIds(c.Database, c.Series)
|
||||
return c.Series, err
|
||||
}
|
||||
|
||||
type DropSeriesCommand struct {
|
||||
Database string
|
||||
Series string
|
||||
}
|
||||
|
||||
func (c *DropSeriesCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.DropSeries(c.Database, c.Series)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type CreateShardSpaceCommand struct {
|
||||
ShardSpace *cluster.ShardSpace
|
||||
}
|
||||
|
||||
func (c *CreateShardSpaceCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.AddShardSpace(c.ShardSpace)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type DropShardSpaceCommand struct {
|
||||
Database string
|
||||
Name string
|
||||
}
|
||||
|
||||
func (c *DropShardSpaceCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.RemoveShardSpace(c.Database, c.Name)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type UpdateShardSpaceCommand struct {
|
||||
ShardSpace *cluster.ShardSpace
|
||||
}
|
||||
|
||||
func (c *UpdateShardSpaceCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.UpdateShardSpace(c.ShardSpace)
|
||||
return nil, err
|
||||
}
|
|
@ -304,40 +304,6 @@ func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, p engine.Proc
|
|||
return processor.Close()
|
||||
}
|
||||
|
||||
func (self *Coordinator) ForceCompaction(user common.User) error {
|
||||
if !user.IsClusterAdmin() {
|
||||
return fmt.Errorf("Insufficient permissions to force a log compaction")
|
||||
}
|
||||
|
||||
return self.raftServer.ForceLogCompaction()
|
||||
}
|
||||
|
||||
func (self *Coordinator) WriteSeriesData(user common.User, db string, series []*protocol.Series) error {
|
||||
// make sure that the db exist
|
||||
if !self.clusterConfiguration.DatabasesExists(db) {
|
||||
return fmt.Errorf("Database %s doesn't exist", db)
|
||||
}
|
||||
|
||||
for _, s := range series {
|
||||
seriesName := s.GetName()
|
||||
if user.HasWriteAccess(seriesName) {
|
||||
continue
|
||||
}
|
||||
return common.NewAuthorizationError("User %s doesn't have write permissions for %s", user.GetName(), seriesName)
|
||||
}
|
||||
|
||||
err := self.CommitSeriesData(db, series, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, s := range series {
|
||||
self.ProcessContinuousQueries(db, s)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (self *Coordinator) ProcessContinuousQueries(db string, series *protocol.Series) {
|
||||
if self.clusterConfiguration.ParsedContinuousQueries != nil {
|
||||
incomingSeriesName := *series.Name
|
||||
|
@ -613,166 +579,3 @@ func (self *Coordinator) runDropContinuousQuery(user common.User, db string, id
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Coordinator) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) {
|
||||
if ok, err := self.permissions.AuthorizeListContinuousQueries(user, db); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queries := self.clusterConfiguration.GetContinuousQueries(db)
|
||||
points := []*protocol.Point{}
|
||||
|
||||
for _, query := range queries {
|
||||
queryId := int64(query.Id)
|
||||
queryString := query.Query
|
||||
points = append(points, &protocol.Point{
|
||||
Values: []*protocol.FieldValue{
|
||||
{Int64Value: &queryId},
|
||||
{StringValue: &queryString},
|
||||
},
|
||||
})
|
||||
}
|
||||
seriesName := "continuous queries"
|
||||
series := []*protocol.Series{{
|
||||
Name: &seriesName,
|
||||
Fields: []string{"id", "query"},
|
||||
Points: points,
|
||||
}}
|
||||
return series, nil
|
||||
}
|
||||
|
||||
func (self *Coordinator) CreateDatabase(user common.User, db string) error {
|
||||
if ok, err := self.permissions.AuthorizeCreateDatabase(user); !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
if !isValidName(db) {
|
||||
return fmt.Errorf("%s isn't a valid db name", db)
|
||||
}
|
||||
|
||||
err := self.raftServer.CreateDatabase(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Coordinator) ListDatabases(user common.User) ([]*cluster.Database, error) {
|
||||
if ok, err := self.permissions.AuthorizeListDatabases(user); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbs := self.clusterConfiguration.GetDatabases()
|
||||
return dbs, nil
|
||||
}
|
||||
|
||||
func (self *Coordinator) AuthenticateDbUser(db, username, password string) (common.User, error) {
|
||||
log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.raftServer.Name(), db, username)
|
||||
user, err := self.clusterConfiguration.AuthenticateDbUser(db, username, password)
|
||||
if user != nil {
|
||||
log.Debug("(raft:%s) User %s authenticated succesfully", self.raftServer.raftServer.Name(), username)
|
||||
}
|
||||
return user, err
|
||||
}
|
||||
|
||||
func (self *Coordinator) AuthenticateClusterAdmin(username, password string) (common.User, error) {
|
||||
return self.clusterConfiguration.AuthenticateClusterAdmin(username, password)
|
||||
}
|
||||
|
||||
func (self *Coordinator) ListClusterAdmins(requester common.User) ([]string, error) {
|
||||
if ok, err := self.permissions.AuthorizeListClusterAdmins(requester); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return self.clusterConfiguration.GetClusterAdmins(), nil
|
||||
}
|
||||
|
||||
func (self *Coordinator) CreateClusterAdminUser(requester common.User, username, password string) error {
|
||||
if ok, err := self.permissions.AuthorizeCreateClusterAdmin(requester); !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
if !isValidName(username) {
|
||||
return fmt.Errorf("%s isn't a valid username", username)
|
||||
}
|
||||
|
||||
hash, err := cluster.HashPassword(password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if self.clusterConfiguration.GetClusterAdmin(username) != nil {
|
||||
return fmt.Errorf("User %s already exists", username)
|
||||
}
|
||||
|
||||
return self.raftServer.SaveClusterAdminUser(&cluster.ClusterAdmin{cluster.CommonUser{Name: username, CacheKey: username, Hash: string(hash)}})
|
||||
}
|
||||
|
||||
func (self *Coordinator) DeleteClusterAdminUser(requester common.User, username string) error {
|
||||
if ok, err := self.permissions.AuthorizeDeleteClusterAdmin(requester); !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
user := self.clusterConfiguration.GetClusterAdmin(username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("User %s doesn't exists", username)
|
||||
}
|
||||
|
||||
user.CommonUser.IsUserDeleted = true
|
||||
return self.raftServer.SaveClusterAdminUser(user)
|
||||
}
|
||||
|
||||
func (self *Coordinator) ChangeClusterAdminPassword(requester common.User, username, password string) error {
|
||||
if ok, err := self.permissions.AuthorizeChangeClusterAdminPassword(requester); !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
user := self.clusterConfiguration.GetClusterAdmin(username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("Invalid user name %s", username)
|
||||
}
|
||||
|
||||
hash, err := cluster.HashPassword(password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
user.ChangePassword(string(hash))
|
||||
return self.raftServer.SaveClusterAdminUser(user)
|
||||
}
|
||||
|
||||
func (self *Coordinator) ChangeDbUserPermissions(requester common.User, db, username, readPermissions, writePermissions string) error {
|
||||
if ok, err := self.permissions.AuthorizeChangeDbUserPermissions(requester, db); !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
return self.raftServer.ChangeDbUserPermissions(db, username, readPermissions, writePermissions)
|
||||
}
|
||||
|
||||
func (self *Coordinator) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error {
|
||||
if ok, err := self.permissions.AuthorizeGrantDbUserAdmin(requester, db); !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
user := self.clusterConfiguration.GetDbUser(db, username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("Invalid username %s", username)
|
||||
}
|
||||
user.IsAdmin = isAdmin
|
||||
self.raftServer.SaveDbUser(user)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Coordinator) ConnectToProtobufServers(localRaftName string) error {
|
||||
log.Info("Connecting to other nodes in the cluster")
|
||||
|
||||
for _, server := range self.clusterConfiguration.Servers() {
|
||||
if server.RaftName != localRaftName {
|
||||
server.Connect()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isValidName(name string) bool {
|
||||
return !strings.Contains(name, "%")
|
||||
}
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/influxdb/influxdb/protocol"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
HandleRequest(*protocol.Request, net.Conn) error
|
||||
}
|
134
server.go
134
server.go
|
@ -15,12 +15,14 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
createDatabaseMessageType = messaging.MessageType(0x00)
|
||||
deleteDatabaseMessageType = messaging.MessageType(0x01)
|
||||
|
||||
createDBUserMessageType = messaging.MessageType(0x11)
|
||||
deleteDBUserMessageType = messaging.MessageType(0x12)
|
||||
changePasswordMessageType = messaging.MessageType(0x13)
|
||||
createDatabaseMessageType = messaging.MessageType(0x00)
|
||||
deleteDatabaseMessageType = messaging.MessageType(0x01)
|
||||
createClusterAdminMessageType = messaging.MessageType(0x02)
|
||||
deleteClusterAdminMessageType = messaging.MessageType(0x03)
|
||||
createDBUserMessageType = messaging.MessageType(0x04)
|
||||
deleteDBUserMessageType = messaging.MessageType(0x05)
|
||||
dbUserSetPasswordMessageType = messaging.MessageType(0x06)
|
||||
clusterAdminSetPasswordMessageType = messaging.MessageType(0x06)
|
||||
)
|
||||
|
||||
// Server represents a collection of metadata and raw metric data.
|
||||
|
@ -34,6 +36,7 @@ type Server struct {
|
|||
errors map[uint64]error // message errors
|
||||
|
||||
databases map[string]*Database
|
||||
admins map[string]*ClusterAdmin
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
|
@ -220,47 +223,13 @@ func (s *Server) applyCreateDBUser(m *messaging.Message) error {
|
|||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Validate user.
|
||||
if c.Username == "" {
|
||||
return ErrUsernameRequired
|
||||
} else if !isValidName(c.Username) {
|
||||
return ErrInvalidUsername
|
||||
}
|
||||
|
||||
// Retrieve the database.
|
||||
db := s.databases[c.Database]
|
||||
if s.databases[c.Database] == nil {
|
||||
return ErrDatabaseNotFound
|
||||
} else if db.User(c.Username) != nil {
|
||||
return ErrUserExists
|
||||
}
|
||||
|
||||
// Generate the hash of the password.
|
||||
hash, err := HashPassword(c.Password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup matchers.
|
||||
rmatcher := []*Matcher{{true, ".*"}}
|
||||
wmatcher := []*Matcher{{true, ".*"}}
|
||||
if len(c.Permissions) == 2 {
|
||||
rmatcher[0].Name = c.Permissions[0]
|
||||
wmatcher[0].Name = c.Permissions[1]
|
||||
}
|
||||
|
||||
// Create the user.
|
||||
u := &DBUser{
|
||||
CommonUser: CommonUser{
|
||||
Name: c.Username,
|
||||
Hash: string(hash),
|
||||
},
|
||||
DB: c.Database,
|
||||
ReadFrom: rmatcher,
|
||||
WriteTo: wmatcher,
|
||||
IsAdmin: false,
|
||||
}
|
||||
return db.saveUser(u)
|
||||
return db.applyCreateUser(c.Username, c.Password, c.Permissions)
|
||||
}
|
||||
|
||||
type createDBUserCommand struct {
|
||||
|
@ -277,18 +246,13 @@ func (s *Server) applyDeleteDBUser(m *messaging.Message) error {
|
|||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Validate command.
|
||||
if c.Username == "" {
|
||||
return ErrUsernameRequired
|
||||
}
|
||||
|
||||
// Retrieve the database.
|
||||
db := s.databases[c.Database]
|
||||
if s.databases[c.Database] == nil {
|
||||
return ErrDatabaseNotFound
|
||||
}
|
||||
|
||||
return db.deleteUser(c.Username)
|
||||
return db.applyDeleteUser(c.Username)
|
||||
}
|
||||
|
||||
type deleteDBUserCommand struct {
|
||||
|
@ -296,18 +260,13 @@ type deleteDBUserCommand struct {
|
|||
Username string `json:"username"`
|
||||
}
|
||||
|
||||
func (s *Server) applyChangePassword(m *messaging.Message) error {
|
||||
var c changePasswordCommand
|
||||
func (s *Server) applyDBUserSetPassword(m *messaging.Message) error {
|
||||
var c dbUserSetPasswordCommand
|
||||
mustUnmarshal(m.Data, &c)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Validate user.
|
||||
if c.Username == "" {
|
||||
return ErrUsernameRequired
|
||||
}
|
||||
|
||||
// Retrieve the database.
|
||||
db := s.databases[c.Database]
|
||||
if s.databases[c.Database] == nil {
|
||||
|
@ -317,7 +276,7 @@ func (s *Server) applyChangePassword(m *messaging.Message) error {
|
|||
return db.changePassword(c.Username, c.Password)
|
||||
}
|
||||
|
||||
type changePasswordCommand struct {
|
||||
type dbUserSetPasswordCommand struct {
|
||||
Database string `json:"database"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
|
@ -352,8 +311,8 @@ func (s *Server) processor(done chan struct{}) {
|
|||
err = s.applyCreateDBUser(m)
|
||||
case deleteDBUserMessageType:
|
||||
err = s.applyDeleteDBUser(m)
|
||||
case changePasswordMessageType:
|
||||
err = s.applyChangePassword(m)
|
||||
case dbUserSetPasswordMessageType:
|
||||
err = s.applyDBUserSetPassword(m)
|
||||
}
|
||||
|
||||
// Sync high water mark and errors for broadcast topic.
|
||||
|
@ -435,10 +394,45 @@ func (db *Database) CreateUser(username, password string, permissions []string)
|
|||
return err
|
||||
}
|
||||
|
||||
func (db *Database) saveUser(u *DBUser) error {
|
||||
func (db *Database) applyCreateUser(username, password string, permissions []string) error {
|
||||
db.mu.Lock()
|
||||
db.users[u.Name] = u
|
||||
db.mu.Unlock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
// Validate user.
|
||||
if username == "" {
|
||||
return ErrUsernameRequired
|
||||
} else if !isValidName(username) {
|
||||
return ErrInvalidUsername
|
||||
} else if db.users[username] != nil {
|
||||
return ErrUserExists
|
||||
}
|
||||
|
||||
// Generate the hash of the password.
|
||||
hash, err := HashPassword(password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup matchers.
|
||||
rmatcher := []*Matcher{{true, ".*"}}
|
||||
wmatcher := []*Matcher{{true, ".*"}}
|
||||
if len(permissions) == 2 {
|
||||
rmatcher[0].Name = permissions[0]
|
||||
wmatcher[0].Name = permissions[1]
|
||||
}
|
||||
|
||||
// Create the user.
|
||||
db.users[username] = &DBUser{
|
||||
CommonUser: CommonUser{
|
||||
Name: username,
|
||||
Hash: string(hash),
|
||||
},
|
||||
DB: db.name,
|
||||
ReadFrom: rmatcher,
|
||||
WriteTo: wmatcher,
|
||||
IsAdmin: false,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -452,12 +446,14 @@ func (db *Database) DeleteUser(username string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (db *Database) deleteUser(username string) error {
|
||||
func (db *Database) applyDeleteUser(username string) error {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
// Check if user exists.
|
||||
if db.users[username] == nil {
|
||||
// Validate user.
|
||||
if username == "" {
|
||||
return ErrUsernameRequired
|
||||
} else if db.users[username] == nil {
|
||||
return ErrUserNotFound
|
||||
}
|
||||
|
||||
|
@ -466,14 +462,14 @@ func (db *Database) deleteUser(username string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ChangePassword changes the password for a user in the database.
|
||||
// ChangePassword changes the password for a user in the database
|
||||
func (db *Database) ChangePassword(username, newPassword string) error {
|
||||
c := &changePasswordCommand{
|
||||
c := &dbUserSetPasswordCommand{
|
||||
Database: db.Name(),
|
||||
Username: username,
|
||||
Password: newPassword,
|
||||
}
|
||||
_, err := db.server.broadcast(changePasswordMessageType, c)
|
||||
_, err := db.server.broadcast(dbUserSetPasswordMessageType, c)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -481,9 +477,11 @@ func (db *Database) changePassword(username, newPassword string) error {
|
|||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
// Check that user exists.
|
||||
// Validate user.
|
||||
u := db.users[username]
|
||||
if u == nil {
|
||||
if username == "" {
|
||||
return ErrUsernameRequired
|
||||
} else if u == nil {
|
||||
return ErrUserNotFound
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue