Fix #544. Add a way to force remove a node

This patch adds an endpoint as well as interfaces to RaftServer and
ClusterConfiguration to make it possible to forcibly remove a node from
the cluster. The way it works is similar to changing the connection
string. An InfluxJoinCommand command is created applied to local node as
well as other nodes in the cluster directly, then the command runs
through raft to make sure the change is permanent.
pull/544/head
John Shahid 2014-05-20 17:50:26 -04:00
parent 062de961c9
commit 9d91b41984
5 changed files with 135 additions and 0 deletions

View File

@ -140,6 +140,7 @@ func (self *HttpServer) Serve(listener net.Listener) {
// cluster config endpoints
self.registerEndpoint(p, "get", "/cluster/servers", self.listServers)
self.registerEndpoint(p, "del", "/cluster/servers/:id", self.removeServers)
self.registerEndpoint(p, "post", "/cluster/shards", self.createShard)
self.registerEndpoint(p, "get", "/cluster/shards", self.getShards)
self.registerEndpoint(p, "del", "/cluster/shards/:id", self.dropShard)
@ -954,6 +955,21 @@ func (self *HttpServer) listServers(w libhttp.ResponseWriter, r *libhttp.Request
})
}
func (self *HttpServer) removeServers(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
id, err := strconv.ParseInt(r.URL.Query().Get(":id"), 10, 32)
if err != nil {
return errorToStatusCode(err), err.Error()
}
err = self.raftServer.RemoveServer(uint32(id))
if err != nil {
return errorToStatusCode(err), err.Error()
}
return libhttp.StatusOK, nil
})
}
type newShardInfo struct {
StartTime int64 `json:"startTime"`
EndTime int64 `json:"endTime"`

View File

@ -217,6 +217,24 @@ func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *Cluster
server.Connect()
}
func (self *ClusterConfiguration) RemoveServer(server *ClusterServer) error {
server.connection.Close()
i := 0
l := len(self.servers)
for i = 0; i < l; i++ {
if self.servers[i].Id == server.Id {
log.Debug("Found server %d at index %d", server.Id, i)
break
}
}
if i == l {
return fmt.Errorf("Cannot find server %d", server.Id)
}
self.servers[i], self.servers = self.servers[l-1], self.servers[:l-1]
log.Debug("Removed server %d", server.Id)
return nil
}
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
self.serversLock.Lock()
defer self.serversLock.Unlock()

View File

@ -17,6 +17,7 @@ func init() {
internalRaftCommands = map[string]raft.Command{}
for _, command := range []raft.Command{
&InfluxJoinCommand{},
&InfluxForceLeaveCommand{},
&InfluxChangeConnectionStringCommand{},
&CreateDatabaseCommand{},
&DropDatabaseCommand{},
@ -256,6 +257,34 @@ func (c *InfluxJoinCommand) NodeName() string {
return c.Name
}
type InfluxForceLeaveCommand struct {
Id uint32 `json:"id"`
}
// The name of the ForceLeave command in the log
func (c *InfluxForceLeaveCommand) CommandName() string {
return "force_leave"
}
func (c *InfluxForceLeaveCommand) Apply(server raft.Server) (interface{}, error) {
clusterConfig := server.Context().(*cluster.ClusterConfiguration)
s := clusterConfig.GetServerById(&c.Id)
if s == nil {
return nil, nil
}
if err := server.RemovePeer(s.RaftName); err != nil {
log.Warn("Cannot remove peer: %s", err)
}
if err := clusterConfig.RemoveServer(s); err != nil {
log.Warn("Cannot remove peer from cluster config: %s", err)
}
server.FlushCommitIndex()
return nil, nil
}
type InfluxChangeConnectionStringCommand struct {
Name string `json:"name"`
Force bool `json:"force"`

View File

@ -552,6 +552,26 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
s.router.HandleFunc(pattern, handler)
}
// Joins to the leader of an existing cluster.
func (s *RaftServer) RemoveServer(id uint32) error {
command := &InfluxForceLeaveCommand{
Id: id,
}
for _, s := range s.raftServer.Peers() {
// send the command and ignore errors in case a server is down
SendCommandToServer(s.ConnectionString, command)
}
if _, err := command.Apply(s.raftServer); err != nil {
return err
}
// make the change permament
log.Info("Running the actual command")
_, err := s.doOrProxyCommand(command)
return err
}
// Joins to the leader of an existing cluster.
func (s *RaftServer) Join(leader string) error {
command := &InfluxJoinCommand{

View File

@ -0,0 +1,52 @@
package integration
import (
. "integration/helpers"
"os"
. "launchpad.net/gocheck"
)
type RemoveNodeSuite struct {
serverProcesses []*Server
}
var _ = Suite(&RemoveNodeSuite{})
func (self *RemoveNodeSuite) SetUpSuite(c *C) {
}
func (self *RemoveNodeSuite) TearDownSuite(c *C) {
for _, s := range self.serverProcesses {
s.Stop()
}
}
func (self *RemoveNodeSuite) TestRemovingNode(c *C) {
err := os.RemoveAll("/tmp/influxdb/test")
c.Assert(err, IsNil)
s1 := NewServer("src/integration/test_rf_1.toml", c)
defer s1.Stop()
s1.WaitForServerToStart()
s2 := NewServer("src/integration/test_rf_2.toml", c)
s2.WaitForServerToStart()
client := s1.GetClient("", c)
servers, err := client.Servers()
c.Assert(err, IsNil)
c.Assert(servers, HasLen, 2)
s2.Stop()
c.Assert(client.RemoveServer(2), IsNil)
servers, err = client.Servers()
c.Assert(err, IsNil)
c.Assert(servers, HasLen, 1)
c.Assert(client.CreateDatabase("test"), IsNil)
series := CreatePoints("test_replication_factor_1", 1, 1)
c.Assert(client.WriteSeries(series), IsNil)
s1.WaitForServerToSync()
}