pvt #59568110. add the drop database command.

pull/17/head
John Shahid 2013-10-25 11:22:31 -04:00
parent e06cb8736b
commit 5331297c68
7 changed files with 83 additions and 4 deletions

View File

@ -116,6 +116,18 @@ func (self *ClusterConfiguration) CreateDatabase(name string) error {
return nil
}
func (self *ClusterConfiguration) DropDatabase(name string) error {
self.createDatabaseLock.Lock()
defer self.createDatabaseLock.Unlock()
if _, ok := self.databaseNames[name]; !ok {
return fmt.Errorf("Database %s doesn't exist", name)
}
delete(self.databaseNames, name)
return nil
}
func (self *ClusterConfiguration) NextDatabaseId() string {
self.nextDatabaseIdLock.Lock()
self.nextDatabaseId += 1

View File

@ -112,6 +112,24 @@ func (c *NextDatabaseIdCommand) Apply(server raft.Server) (interface{}, error) {
return id, nil
}
type DropDatabaseCommand struct {
Name string `json:"name"`
}
func NewDropDatabaseCommand(name string) *DropDatabaseCommand {
return &DropDatabaseCommand{name}
}
func (c *DropDatabaseCommand) CommandName() string {
return "drop_db"
}
func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*ClusterConfiguration)
err := config.DropDatabase(c.Name)
return nil, err
}
type CreateDatabaseCommand struct {
Name string `json:"name"`
}

View File

@ -63,6 +63,10 @@ func (self *CoordinatorImpl) CreateDatabase(db, initialApiKey, requestingApiKey
return err
}
func (self *CoordinatorImpl) DropDatabase(db string) error {
return self.raftServer.DropDatabase(db)
}
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (User, error) {
dbUsers := self.clusterConfiguration.dbUsers[db]
if dbUsers == nil || dbUsers[username] == nil {

View File

@ -565,15 +565,20 @@ func (self *CoordinatorSuite) TestUserDataReplication(c *C) {
}
}
func (self *CoordinatorSuite) TestCanCreateDatabaseWithName(c *C) {
servers := startAndVerifyCluster(3, c)
defer clean(servers)
func (self *CoordinatorSuite) createDatabases(servers []*RaftServer, c *C) {
err := servers[0].CreateDatabase("db1")
c.Assert(err, IsNil)
err = servers[1].CreateDatabase("db2")
c.Assert(err, IsNil)
err = servers[2].CreateDatabase("db3")
c.Assert(err, IsNil)
}
func (self *CoordinatorSuite) TestCanCreateDatabaseWithName(c *C) {
servers := startAndVerifyCluster(3, c)
defer clean(servers)
self.createDatabases(servers, c)
time.Sleep(REPLICATION_LAG)
@ -582,12 +587,38 @@ func (self *CoordinatorSuite) TestCanCreateDatabaseWithName(c *C) {
c.Assert(databases, DeepEquals, map[string]bool{"db1": true, "db2": true, "db3": true})
}
err = servers[0].CreateDatabase("db3")
err := servers[0].CreateDatabase("db3")
c.Assert(err, ErrorMatches, ".*db3 exists.*")
err = servers[2].CreateDatabase("db3")
c.Assert(err, ErrorMatches, ".*db3 exists.*")
}
func (self *CoordinatorSuite) TestCanDropDatabaseWithName(c *C) {
servers := startAndVerifyCluster(3, c)
defer clean(servers)
self.createDatabases(servers, c)
err := servers[0].DropDatabase("db1")
c.Assert(err, IsNil)
err = servers[1].DropDatabase("db2")
c.Assert(err, IsNil)
err = servers[2].DropDatabase("db3")
c.Assert(err, IsNil)
time.Sleep(REPLICATION_LAG)
for i := 0; i < 3; i++ {
databases := servers[i].clusterConfig.GetDatabases()
c.Assert(databases, HasLen, 0)
}
err = servers[0].DropDatabase("db3")
c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*")
err = servers[2].DropDatabase("db3")
c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*")
}
func (self *CoordinatorSuite) TestCanCreateDatabase(c *C) {
servers := startAndVerifyCluster(3, c)
defer clean(servers)

View File

@ -17,6 +17,7 @@ type Coordinator interface {
DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error
WriteSeriesData(db string, series *protocol.Series) error
CreateDatabase(db, initialApiKey, requestingApiKey string) error
DropDatabase(db string) error
}
type UserManager interface {

View File

@ -47,6 +47,7 @@ func NewRaftServer(path string, host string, port int, clusterConfig *ClusterCon
raft.RegisterCommand(&RemoveServerFromLocationCommand{})
raft.RegisterCommand(&NextDatabaseIdCommand{})
raft.RegisterCommand(&CreateDatabaseCommand{})
raft.RegisterCommand(&DropDatabaseCommand{})
raft.RegisterCommand(&SaveDbUserCommand{})
raft.RegisterCommand(&SaveClusterAdminCommand{})
}
@ -146,6 +147,12 @@ func (s *RaftServer) CreateDatabase(name string) error {
return err
}
func (s *RaftServer) DropDatabase(name string) error {
command := NewDropDatabaseCommand(name)
_, err := s.doOrProxyCommand(command, "drop_db")
return err
}
func (s *RaftServer) GetNextDatabaseId() (string, error) {
command := NewNextDatabaseIdCommand(s.clusterConfig.nextDatabaseId)
id, err := s.doOrProxyCommand(command, "next_db")
@ -367,6 +374,8 @@ func (s *RaftServer) processCommandHandler(w http.ResponseWriter, req *http.Requ
command = &NextDatabaseIdCommand{}
} else if value == "create_db" {
command = &CreateDatabaseCommand{}
} else if value == "drop_db" {
command = &DropDatabaseCommand{}
} else if value == "save_db_user" {
command = &SaveDbUserCommand{}
} else if value == "save_cluster_admin_user" {

View File

@ -49,6 +49,10 @@ func (self *MockCoordinator) CreateDatabase(db, initialApiKey, requestingApiKey
return nil
}
func (self *MockCoordinator) DropDatabase(db string) error {
return nil
}
func createEngine(c *C, seriesString string) EngineI {
series, err := common.StringToSeriesArray(seriesString)
c.Assert(err, IsNil)