Support add new raft nodes
This change adds the first 3 nodes to the cluster as raft peers. Other nodes are data-only.pull/3478/head
parent
f5705aebe1
commit
c93e46d569
47
meta/rpc.go
47
meta/rpc.go
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/influxdb/influxdb/meta/internal"
|
||||
)
|
||||
|
||||
|
@ -29,7 +30,7 @@ type rpc struct {
|
|||
cachedData() *Data
|
||||
IsLeader() bool
|
||||
Leader() string
|
||||
Peers() []string
|
||||
Peers() ([]string, error)
|
||||
AddPeer(host string) error
|
||||
CreateNode(host string) (*NodeInfo, error)
|
||||
NodeByHost(host string) (*NodeInfo, error)
|
||||
|
@ -215,26 +216,33 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
|
|||
r.traceCluster("join request from: %v", *req.Addr)
|
||||
|
||||
node, err := func() (*NodeInfo, error) {
|
||||
|
||||
// attempt to create the node
|
||||
node, err := r.store.CreateNode(*req.Addr)
|
||||
// if it exists, return the existing node
|
||||
if err == ErrNodeExists {
|
||||
return r.store.NodeByHost(*req.Addr)
|
||||
node, err = r.store.NodeByHost(*req.Addr)
|
||||
if err != nil {
|
||||
return node, err
|
||||
}
|
||||
r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("create node: %v", err)
|
||||
}
|
||||
|
||||
// FIXME: jwilder: adding raft nodes is tricky since going
|
||||
// from 1 node (leader) to two kills the cluster because
|
||||
// quorum is lost after adding the second node. For now,
|
||||
// can only add non-raft enabled nodes
|
||||
peers, err := r.store.Peers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list peers: %v", err)
|
||||
}
|
||||
|
||||
// If we have less than 3 nodes, add them as raft peers
|
||||
// if len(r.store.Peers()) < MaxRaftNodes {
|
||||
// if err = r.store.AddPeer(*req.Addr); err != nil {
|
||||
// return node, fmt.Errorf("add peer: %v", err)
|
||||
// }
|
||||
// }
|
||||
// If we have less than 3 nodes, add them as raft peers if they are not
|
||||
// already a peer
|
||||
if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
|
||||
r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
|
||||
if err = r.store.AddPeer(*req.Addr); err != nil {
|
||||
return node, fmt.Errorf("add peer: %v", err)
|
||||
}
|
||||
}
|
||||
return node, err
|
||||
}()
|
||||
|
||||
|
@ -247,13 +255,18 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// get the current raft peers
|
||||
peers, err := r.store.Peers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list peers: %v", err)
|
||||
}
|
||||
|
||||
return &internal.JoinResponse{
|
||||
Header: &internal.ResponseHeader{
|
||||
OK: proto.Bool(true),
|
||||
},
|
||||
//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
|
||||
EnableRaft: proto.Bool(false),
|
||||
RaftNodes: r.store.Peers(),
|
||||
EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
|
||||
RaftNodes: peers,
|
||||
NodeID: proto.Uint64(nodeID),
|
||||
}, err
|
||||
|
||||
|
@ -355,7 +368,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
|
|||
// Create a connection to the leader.
|
||||
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("rpc dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
|
@ -421,7 +434,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
|
|||
|
||||
func (r *rpc) traceCluster(msg string, args ...interface{}) {
|
||||
if r.tracingEnabled {
|
||||
r.logger.Printf("rpc error: "+msg, args...)
|
||||
r.logger.Printf("rpc: "+msg, args...)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -159,7 +159,7 @@ func TestRPCJoin(t *testing.T) {
|
|||
t.Fatalf("failed to join: %v", err)
|
||||
}
|
||||
|
||||
if exp := false; res.RaftEnabled != false {
|
||||
if exp := true; res.RaftEnabled != true {
|
||||
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
|
||||
}
|
||||
|
||||
|
@ -230,7 +230,7 @@ func (f *fakeStore) cachedData() *Data {
|
|||
|
||||
func (f *fakeStore) IsLeader() bool { return true }
|
||||
func (f *fakeStore) Leader() string { return f.leader }
|
||||
func (f *fakeStore) Peers() []string { return []string{f.leader} }
|
||||
func (f *fakeStore) Peers() ([]string, error) { return []string{f.leader}, nil }
|
||||
func (f *fakeStore) AddPeer(host string) error { return nil }
|
||||
func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) {
|
||||
return &NodeInfo{ID: f.newNodeID, Host: host}, nil
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -18,12 +21,14 @@ import (
|
|||
// the meta.Store to change its behavior with the raft layer at runtime.
|
||||
type raftState interface {
|
||||
open() error
|
||||
remove() error
|
||||
initialize() error
|
||||
leader() string
|
||||
isLeader() bool
|
||||
sync(index uint64, timeout time.Duration) error
|
||||
setPeers(addrs []string) error
|
||||
addPeer(addr string) error
|
||||
peers() ([]string, error)
|
||||
invalidate() error
|
||||
close() error
|
||||
lastIndex() uint64
|
||||
|
@ -42,6 +47,19 @@ type localRaft struct {
|
|||
raftLayer *raftLayer
|
||||
}
|
||||
|
||||
func (r *localRaft) remove() error {
|
||||
if err := os.RemoveAll(filepath.Join(r.store.path, "raft.db")); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.RemoveAll(filepath.Join(r.store.path, "peers.json")); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.RemoveAll(filepath.Join(r.store.path, "snapshots")); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *localRaft) updateMetaData(ms *Data) {
|
||||
if ms == nil {
|
||||
return
|
||||
|
@ -89,11 +107,6 @@ func (r *localRaft) open() error {
|
|||
// If no peers are set in the config then start as a single server.
|
||||
config.EnableSingleNode = (len(s.peers) == 0)
|
||||
|
||||
// Ensure our addr is in the peer list
|
||||
if config.EnableSingleNode {
|
||||
s.peers = append(s.peers, s.Addr.String())
|
||||
}
|
||||
|
||||
// Build raft layer to multiplex listener.
|
||||
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)
|
||||
|
||||
|
@ -246,6 +259,10 @@ func (r *localRaft) setPeers(addrs []string) error {
|
|||
return r.raft.SetPeers(a).Error()
|
||||
}
|
||||
|
||||
func (r *localRaft) peers() ([]string, error) {
|
||||
return r.peerStore.Peers()
|
||||
}
|
||||
|
||||
func (r *localRaft) leader() string {
|
||||
if r.raft == nil {
|
||||
return ""
|
||||
|
@ -269,6 +286,10 @@ type remoteRaft struct {
|
|||
store *Store
|
||||
}
|
||||
|
||||
func (r *remoteRaft) remove() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *remoteRaft) updateMetaData(ms *Data) {
|
||||
if ms == nil {
|
||||
return
|
||||
|
@ -300,7 +321,15 @@ func (r *remoteRaft) invalidate() error {
|
|||
}
|
||||
|
||||
func (r *remoteRaft) setPeers(addrs []string) error {
|
||||
return nil
|
||||
// Convert to JSON
|
||||
var buf bytes.Buffer
|
||||
enc := json.NewEncoder(&buf)
|
||||
if err := enc.Encode(addrs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write out as JSON
|
||||
return ioutil.WriteFile(filepath.Join(r.store.path, "peers.json"), buf.Bytes(), 0755)
|
||||
}
|
||||
|
||||
// addPeer adds addr to the list of peers in the cluster.
|
||||
|
@ -308,7 +337,15 @@ func (r *remoteRaft) addPeer(addr string) error {
|
|||
return fmt.Errorf("cannot add peer using remote raft")
|
||||
}
|
||||
|
||||
func (r *remoteRaft) peers() ([]string, error) {
|
||||
return readPeersJSON(filepath.Join(r.store.path, "peers.json"))
|
||||
}
|
||||
|
||||
func (r *remoteRaft) open() error {
|
||||
if err := r.setPeers(r.store.peers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
@ -366,3 +403,25 @@ func (r *remoteRaft) sync(index uint64, timeout time.Duration) error {
|
|||
func (r *remoteRaft) snapshot() error {
|
||||
return fmt.Errorf("cannot snapshot while in remote raft state")
|
||||
}
|
||||
|
||||
func readPeersJSON(path string) ([]string, error) {
|
||||
// Read the file
|
||||
buf, err := ioutil.ReadFile(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check for no peers
|
||||
if len(buf) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Decode the peers
|
||||
var peers []string
|
||||
dec := json.NewDecoder(bytes.NewReader(buf))
|
||||
if err := dec.Decode(&peers); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return peers, nil
|
||||
}
|
||||
|
|
171
meta/store.go
171
meta/store.go
|
@ -171,41 +171,6 @@ func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }
|
|||
|
||||
// Open opens and initializes the raft store.
|
||||
func (s *Store) Open() error {
|
||||
// If we have a join addr, attempt to join
|
||||
if s.join != "" {
|
||||
|
||||
joined := false
|
||||
for _, join := range strings.Split(s.join, ",") {
|
||||
res, err := s.rpc.join(s.Addr.String(), join)
|
||||
if err != nil {
|
||||
s.Logger.Printf("join failed: %v", err)
|
||||
continue
|
||||
}
|
||||
joined = true
|
||||
|
||||
s.Logger.Printf("joined remote node %v", join)
|
||||
s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes)
|
||||
|
||||
s.peers = res.RaftNodes
|
||||
s.id = res.NodeID
|
||||
|
||||
if err := s.writeNodeID(res.NodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !res.RaftEnabled {
|
||||
s.raftState = &remoteRaft{s}
|
||||
if err := s.invalidate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !joined {
|
||||
return fmt.Errorf("failed to join existing cluster at %v", s.join)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that no more than 3 peers.
|
||||
// https://github.com/influxdb/influxdb/issues/2750
|
||||
if len(s.peers) > MaxRaftNodes {
|
||||
|
@ -231,6 +196,11 @@ func (s *Store) Open() error {
|
|||
}
|
||||
s.opened = true
|
||||
|
||||
// load our raft state
|
||||
if err := s.loadState(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the root directory if it doesn't already exist.
|
||||
if err := s.createRootDir(); err != nil {
|
||||
return fmt.Errorf("mkdir all: %s", err)
|
||||
|
@ -264,6 +234,11 @@ func (s *Store) Open() error {
|
|||
s.wg.Add(1)
|
||||
go s.serveRPCListener()
|
||||
|
||||
// Join an existing cluster if we needed
|
||||
if err := s.joinCluster(); err != nil {
|
||||
return fmt.Errorf("join: %v", err)
|
||||
}
|
||||
|
||||
// If the ID doesn't exist then create a new node.
|
||||
if s.id == 0 {
|
||||
go s.init()
|
||||
|
@ -274,6 +249,121 @@ func (s *Store) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// loadState sets the appropriate raftState from our persistent storage
|
||||
func (s *Store) loadState() error {
|
||||
peers, err := readPeersJSON(filepath.Join(s.path, "peers.json"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If we have existing peers, use those. This will override what's in the
|
||||
// config.
|
||||
if len(peers) > 0 {
|
||||
s.peers = peers
|
||||
}
|
||||
|
||||
// if no peers on disk, we need to start raft in order to initialize a new
|
||||
// cluster or join an existing one.
|
||||
if len(peers) == 0 {
|
||||
s.raftState = &localRaft{store: s}
|
||||
// if we have a raft database, (maybe restored), we should start raft locally
|
||||
} else if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err == nil {
|
||||
s.raftState = &localRaft{store: s}
|
||||
// otherwise, we should use remote raft
|
||||
} else {
|
||||
s.raftState = &remoteRaft{store: s}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) joinCluster() error {
|
||||
// No join options, so nothing to do
|
||||
if s.join == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We already have a node ID so were already part of a cluster,
|
||||
// don't join again so we can use our existing state.
|
||||
if s.id != 0 {
|
||||
s.Logger.Printf("skipping join: already member of cluster: nodeId=%v raftEnabled=%v raftNodes=%v",
|
||||
s.id, raft.PeerContained(s.peers, s.Addr.String()), s.peers)
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Logger.Printf("joining cluster at: %v", s.join)
|
||||
for {
|
||||
for _, join := range strings.Split(s.join, ",") {
|
||||
res, err := s.rpc.join(s.Addr.String(), join)
|
||||
if err != nil {
|
||||
s.Logger.Printf("join failed: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
s.Logger.Printf("joined remote node %v", join)
|
||||
s.Logger.Printf("nodeId=%v raftEnabled=%v raftNodes=%v", res.NodeID, res.RaftEnabled, res.RaftNodes)
|
||||
|
||||
s.peers = res.RaftNodes
|
||||
s.id = res.NodeID
|
||||
|
||||
if err := s.writeNodeID(res.NodeID); err != nil {
|
||||
s.Logger.Printf("write node id failed: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
if !res.RaftEnabled {
|
||||
// Shutdown our local raft and transition to a remote raft state
|
||||
if err := s.enableRemoteRaft(); err != nil {
|
||||
s.Logger.Printf("enable remote raft failed: %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Logger.Printf("join failed: retrying...")
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) enableLocalRaft() error {
|
||||
if _, ok := s.raftState.(*localRaft); ok {
|
||||
return nil
|
||||
}
|
||||
s.Logger.Printf("switching to local raft")
|
||||
|
||||
lr := &localRaft{store: s}
|
||||
return s.changeState(lr)
|
||||
}
|
||||
|
||||
func (s *Store) enableRemoteRaft() error {
|
||||
if _, ok := s.raftState.(*remoteRaft); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Logger.Printf("switching to remote raft")
|
||||
rr := &remoteRaft{store: s}
|
||||
return s.changeState(rr)
|
||||
}
|
||||
|
||||
func (s *Store) changeState(state raftState) error {
|
||||
if err := s.raftState.close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clear out any persistent state
|
||||
if err := s.raftState.remove(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.raftState = state
|
||||
|
||||
if err := s.raftState.open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// openRaft initializes the raft store.
|
||||
func (s *Store) openRaft() error {
|
||||
return s.raftState.open()
|
||||
|
@ -404,10 +494,6 @@ func (s *Store) Snapshot() error {
|
|||
// WaitForLeader sleeps until a leader is found or a timeout occurs.
|
||||
// timeout == 0 means to wait forever.
|
||||
func (s *Store) WaitForLeader(timeout time.Duration) error {
|
||||
if s.Leader() != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Begin timeout timer.
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
@ -452,6 +538,9 @@ func (s *Store) IsLeader() bool {
|
|||
func (s *Store) Leader() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if s.raftState == nil {
|
||||
return ""
|
||||
}
|
||||
return s.raftState.leader()
|
||||
}
|
||||
|
||||
|
@ -466,10 +555,10 @@ func (s *Store) AddPeer(addr string) error {
|
|||
}
|
||||
|
||||
// Peers returns the list of peers in the cluster.
|
||||
func (s *Store) Peers() []string {
|
||||
func (s *Store) Peers() ([]string, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.peers
|
||||
return s.raftState.peers()
|
||||
}
|
||||
|
||||
// serveExecListener processes remote exec connections.
|
||||
|
|
Loading…
Reference in New Issue