diff --git a/meta/state.go b/meta/state.go
index 72d2619ed0..77c941b676 100644
--- a/meta/state.go
+++ b/meta/state.go
@@ -35,6 +35,7 @@ type raftState interface {
 	lastIndex() uint64
 	apply(b []byte) error
 	snapshot() error
+	isLocal() bool
 }
 
 // localRaft is a consensus strategy that uses a local raft implementation for
@@ -351,6 +352,11 @@ func (r *localRaft) isLeader() bool {
 	return r.raft.State() == raft.Leader
 }
 
+// isLocal always returns true: localRaft is the local raft strategy.
+func (r *localRaft) isLocal() bool {
+	return true
+}
+
 // remoteRaft is a consensus strategy that uses a remote raft cluster for
 // consensus operations.
 type remoteRaft struct {
@@ -469,6 +475,11 @@ func (r *remoteRaft) isLeader() bool {
 	return false
 }
 
+// isLocal always returns false: remoteRaft is the remote raft strategy.
+func (r *remoteRaft) isLocal() bool {
+	return false
+}
+
 func (r *remoteRaft) lastIndex() uint64 {
 	return r.store.cachedData().Index
 }
diff --git a/meta/store.go b/meta/store.go
index a8e2c380a0..be2ac0a2ae 100644
--- a/meta/store.go
+++ b/meta/store.go
@@ -267,6 +267,7 @@ func (s *Store) Open() error {
 
 	if s.raftPromotionEnabled {
 		s.wg.Add(1)
+		s.Logger.Printf("spun up monitoring for %d", s.NodeID())
 		go s.monitorPeerHealth()
 	}
 
@@ -689,5 +690,12 @@ func (s *Store) IsLeader() bool {
 	return s.raftState.isLeader()
 }
 
+// IsLocal returns true if the store is currently participating in local raft.
+func (s *Store) IsLocal() bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.raftState.isLocal()
+}
+
 // Leader returns what the store thinks is the current leader. An empty
 // string indicates no leader exists.
func (s *Store) Leader() string {
diff --git a/meta/store_test.go b/meta/store_test.go
index 6c7244ea1d..7a56ec3038 100644
--- a/meta/store_test.go
+++ b/meta/store_test.go
@@ -1056,4 +1056,77 @@ func TestCluster_Restart(t *testing.T) {
 	wg.Wait()
 }
 
+// Ensure the first three members of a cluster become raft nodes, that a 4th joins as a
+// non-raft node, and that the 4th is promoted to raft once a raft follower is removed.
+func TestCluster_ReplaceRaft(t *testing.T) {
+	t.Parallel()
+	// Start a single node.
+	c := MustOpenCluster(1)
+	defer c.Close()
+
+	// Check that the node becomes leader.
+	if s := c.Leader(); s == nil {
+		t.Fatal("no leader found")
+	}
+
+	// Add 2 more nodes.
+	for i := 0; i < 2; i++ {
+		if err := c.Join(); err != nil {
+			t.Fatalf("failed to join cluster: %v", err)
+		}
+	}
+
+	// sleep to let them become raft
+	time.Sleep(time.Second)
+
+	// ensure we have 3 raft nodes
+	for _, s := range c.Stores {
+		if !s.IsLocal() {
+			t.Fatalf("node %d is not a local raft instance.", s.NodeID())
+		}
+	}
+
+	// ensure all the nodes see the same metastore data
+	assertDatabaseReplicated(t, c)
+
+	// Add another node
+	if err := c.Join(); err != nil {
+		t.Fatalf("failed to join cluster: %v", err)
+	}
+
+	var leader, follower *Store
+
+	// Find the leader and any raft follower to remove.
+	for _, s := range c.Stores {
+		switch {
+		case s.IsLeader():
+			leader = s
+		case s.IsLocal():
+			follower = s
+		}
+	}
+	// Fail cleanly rather than dereferencing nil below if either is missing.
+	if leader == nil || follower == nil {
+		t.Fatal("could not find both a leader and a raft follower")
+	}
+
+	// drop the node
+	if err := leader.DeleteNode(follower.NodeID(), true); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Remove(follower.NodeID()); err != nil {
+		t.Fatal(err)
+	}
+
+	// sleep to let the remaining non-raft node get promoted to raft
+	time.Sleep(1 * time.Second)
+
+	// ensure we have 3 raft nodes
+	for _, s := range c.Stores {
+		if !s.IsLocal() {
+			t.Fatalf("node %d is not a local raft instance.", s.NodeID())
+		}
+	}
+}
+
 // Store is a test wrapper for meta.Store.
type Store struct {
	*meta.Store
diff --git a/meta/store_test.go b/meta/store_test.go
--- a/meta/store_test.go
+++ b/meta/store_test.go
@@ -1149,13 +1222,14 @@ func (s *Store) Close() error {
 // NewConfig returns the default test configuration.
 func NewConfig(path string) *meta.Config {
 	return &meta.Config{
-		Dir:                path,
-		Hostname:           "localhost",
-		BindAddress:        "127.0.0.1:0",
-		HeartbeatTimeout:   toml.Duration(500 * time.Millisecond),
-		ElectionTimeout:    toml.Duration(500 * time.Millisecond),
-		LeaderLeaseTimeout: toml.Duration(500 * time.Millisecond),
-		CommitTimeout:      toml.Duration(5 * time.Millisecond),
+		Dir:                  path,
+		Hostname:             "localhost",
+		BindAddress:          "127.0.0.1:0",
+		HeartbeatTimeout:     toml.Duration(500 * time.Millisecond),
+		ElectionTimeout:      toml.Duration(500 * time.Millisecond),
+		LeaderLeaseTimeout:   toml.Duration(500 * time.Millisecond),
+		CommitTimeout:        toml.Duration(5 * time.Millisecond),
+		RaftPromotionEnabled: true,
 	}
 }
 
@@ -1210,6 +1284,25 @@ func (c *Cluster) Join() error {
 	return nil
 }
 
+// Remove closes the store with the given node ID in the background and drops
+// it from c.Stores. It is a no-op if no store has that ID; the returned error
+// is always nil.
+func (c *Cluster) Remove(nodeID uint64) error {
+	for i, s := range c.Stores {
+		if s.NodeID() != nodeID {
+			continue
+		}
+		// This could hang for a variety of reasons, so don't wait for it
+		go s.Close()
+		// Stop iterating once the slice has been spliced: the range loop
+		// still walks the old length, so continuing would skip the store
+		// that shifted into slot i and revisit the stale final slot.
+		c.Stores = append(c.Stores[:i], c.Stores[i+1:]...)
+		return nil
+	}
+	return nil
+}
+
 // Open opens and initializes all stores in the cluster.
 func (c *Cluster) Open() error {
 	if err := func() error {