test auto replace raft node

pull/4685/head
Cory LaNou 2015-11-10 09:20:19 -06:00
parent 3912c71ccf
commit b2ed141136
3 changed files with 110 additions and 7 deletions

View File

@ -35,6 +35,7 @@ type raftState interface {
lastIndex() uint64
apply(b []byte) error
snapshot() error
isLocal() bool
}
// localRaft is a consensus strategy that uses a local raft implementation for
@ -351,6 +352,10 @@ func (r *localRaft) isLeader() bool {
return r.raft.State() == raft.Leader
}
func (r *localRaft) isLocal() bool {
return true
}
// remoteRaft is a consensus strategy that uses a remote raft cluster for
// consensus operations.
type remoteRaft struct {
@ -469,6 +474,10 @@ func (r *remoteRaft) isLeader() bool {
return false
}
func (r *remoteRaft) isLocal() bool {
return false
}
func (r *remoteRaft) lastIndex() uint64 {
return r.store.cachedData().Index
}

View File

@ -267,6 +267,7 @@ func (s *Store) Open() error {
if s.raftPromotionEnabled {
s.wg.Add(1)
s.Logger.Printf("spun up monitoring for %d", s.NodeID())
go s.monitorPeerHealth()
}
@ -454,6 +455,7 @@ func (s *Store) monitorPeerHealth() {
func (s *Store) promoteNodeToPeer() error {
// Only do this if you are the leader
if !s.IsLeader() {
return nil
}
@ -689,6 +691,13 @@ func (s *Store) IsLeader() bool {
return s.raftState.isLeader()
}
// IsLocal returns true if the store is currently participating in local raft.
func (s *Store) IsLocal() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.raftState.isLocal()
}
// Leader returns what the store thinks is the current leader. An empty
// string indicates no leader exists.
func (s *Store) Leader() string {

View File

@ -1056,6 +1056,79 @@ func TestCluster_Restart(t *testing.T) {
wg.Wait()
}
// Ensure a multi-node cluster can start, join the cluster, and the first three members are raft nodes., then add a 4th non raft
// Remove a raft node, ensure the 4th promotes to raft
func TestCluster_ReplaceRaft(t *testing.T) {
t.Parallel()
// Start a single node.
c := MustOpenCluster(1)
defer c.Close()
// Check that the node becomes leader.
if s := c.Leader(); s == nil {
t.Fatal("no leader found")
}
// Add 2 more nodes.
for i := 0; i < 2; i++ {
if err := c.Join(); err != nil {
t.Fatalf("failed to join cluster: %v", err)
}
}
// sleep to let them become raft
time.Sleep(time.Second)
// ensure we have 3 raft nodes
for _, s := range c.Stores {
if !s.IsLocal() {
t.Fatalf("node %d is not a local raft instance.", s.NodeID())
}
}
// ensure all the nodes see the same metastore data
assertDatabaseReplicated(t, c)
// Add another node
if err := c.Join(); err != nil {
t.Fatalf("failed to join cluster: %v", err)
}
var leader, follower *Store
// find a non-leader node
for _, s := range c.Stores {
if s.IsLeader() {
leader = s
}
// Find any follower to remove
if !s.IsLeader() && s.IsLocal() {
follower = s
}
if leader != nil && follower != nil {
break
}
}
// drop the node
if err := leader.DeleteNode(follower.NodeID(), true); err != nil {
t.Fatal(err)
}
if err := c.Remove(follower.NodeID()); err != nil {
t.Fatal(err)
}
// sleep to let them become raft
time.Sleep(1 * time.Second)
// ensure we have 3 raft nodes
for _, s := range c.Stores {
if !s.IsLocal() {
t.Fatalf("node %d is not a local raft instance.", s.NodeID())
}
}
}
// Store is a test wrapper for meta.Store.
type Store struct {
*meta.Store
@ -1149,13 +1222,14 @@ func (s *Store) Close() error {
// NewConfig returns the default test configuration.
func NewConfig(path string) *meta.Config {
return &meta.Config{
Dir: path,
Hostname: "localhost",
BindAddress: "127.0.0.1:0",
HeartbeatTimeout: toml.Duration(500 * time.Millisecond),
ElectionTimeout: toml.Duration(500 * time.Millisecond),
LeaderLeaseTimeout: toml.Duration(500 * time.Millisecond),
CommitTimeout: toml.Duration(5 * time.Millisecond),
Dir: path,
Hostname: "localhost",
BindAddress: "127.0.0.1:0",
HeartbeatTimeout: toml.Duration(500 * time.Millisecond),
ElectionTimeout: toml.Duration(500 * time.Millisecond),
LeaderLeaseTimeout: toml.Duration(500 * time.Millisecond),
CommitTimeout: toml.Duration(5 * time.Millisecond),
RaftPromotionEnabled: true,
}
}
@ -1210,6 +1284,17 @@ func (c *Cluster) Join() error {
return nil
}
func (c *Cluster) Remove(nodeID uint64) error {
for i, s := range c.Stores {
if s.NodeID() == nodeID {
// This could hang for a variety of reasons, so don't wait for it
go s.Close()
c.Stores = append(c.Stores[:i], c.Stores[i+1:]...)
}
}
return nil
}
// Open opens and initializes all stores in the cluster.
func (c *Cluster) Open() error {
if err := func() error {