diff --git a/meta/rpc.go b/meta/rpc.go
index 2e8fad99af..c8c01165df 100644
--- a/meta/rpc.go
+++ b/meta/rpc.go
@@ -11,6 +11,7 @@ import (
 	"time"

 	"github.com/gogo/protobuf/proto"
+	"github.com/hashicorp/raft"
 	"github.com/influxdb/influxdb/meta/internal"
 )

@@ -29,7 +30,7 @@ type rpc struct {
 		cachedData() *Data
 		IsLeader() bool
 		Leader() string
-		Peers() []string
+		Peers() ([]string, error)
 		AddPeer(host string) error
 		CreateNode(host string) (*NodeInfo, error)
 		NodeByHost(host string) (*NodeInfo, error)
@@ -215,26 +216,33 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
 	r.traceCluster("join request from: %v", *req.Addr)

 	node, err := func() (*NodeInfo, error) {
+		// attempt to create the node
 		node, err := r.store.CreateNode(*req.Addr)
 		// if it exists, return the existing node
 		if err == ErrNodeExists {
-			return r.store.NodeByHost(*req.Addr)
+			node, err = r.store.NodeByHost(*req.Addr)
+			if err != nil {
+				return node, err
+			}
+			r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
 		} else if err != nil {
 			return nil, fmt.Errorf("create node: %v", err)
 		}

-		// FIXME: jwilder: adding raft nodes is tricky since going
-		// from 1 node (leader) to two kills the cluster because
-		// quorum is lost after adding the second node. For now,
-		// can only add non-raft enabled nodes
+		peers, err := r.store.Peers()
+		if err != nil {
+			return nil, fmt.Errorf("list peers: %v", err)
+		}

-		// If we have less than 3 nodes, add them as raft peers
-		// if len(r.store.Peers()) < MaxRaftNodes {
-		// 	if err = r.store.AddPeer(*req.Addr); err != nil {
-		// 		return node, fmt.Errorf("add peer: %v", err)
-		// 	}
-		// }
+		// If we have fewer than MaxRaftNodes peers, add the joining node as a
+		// raft peer, unless it is already one.
+		if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
+			r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
+			if err = r.store.AddPeer(*req.Addr); err != nil {
+				return node, fmt.Errorf("add peer: %v", err)
+			}
+		}
 		return node, err
 	}()

@@ -247,13 +255,18 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
 		return nil, err
 	}

+	// get the current raft peers
+	peers, err := r.store.Peers()
+	if err != nil {
+		return nil, fmt.Errorf("list peers: %v", err)
+	}
+
 	return &internal.JoinResponse{
 		Header: &internal.ResponseHeader{
 			OK: proto.Bool(true),
 		},
-		//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
-		EnableRaft: proto.Bool(false),
-		RaftNodes:  r.store.Peers(),
+		EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
+		RaftNodes:  peers,
 		NodeID:     proto.Uint64(nodeID),
 	}, err

@@ -355,7 +368,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
 	// Create a connection to the leader.
 	conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("rpc dial: %v", err)
 	}
 	defer conn.Close()

@@ -421,7 +434,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {

 func (r *rpc) traceCluster(msg string, args ...interface{}) {
 	if r.tracingEnabled {
-		r.logger.Printf("rpc error: "+msg, args...)
+		r.logger.Printf("rpc: "+msg, args...)
 	}
 }
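For reference, here is a minimal standalone sketch of the admission rule that handleJoinRequest now applies. peerContained stands in for raft.PeerContained from hashicorp/raft, and the host addresses are made up; this is an illustration of the rule, not the package's API.

	package main

	import "fmt"

	// MaxRaftNodes mirrors the meta package's limit of three raft peers.
	const MaxRaftNodes = 3

	// peerContained reports whether addr is already in peers, as
	// raft.PeerContained does.
	func peerContained(peers []string, addr string) bool {
		for _, p := range peers {
			if p == addr {
				return true
			}
		}
		return false
	}

	// admitPeer applies the join rule: grow the quorum while there is room,
	// then report membership, which becomes EnableRaft in the response.
	func admitPeer(peers []string, addr string) ([]string, bool) {
		if len(peers) < MaxRaftNodes && !peerContained(peers, addr) {
			peers = append(peers, addr)
		}
		return peers, peerContained(peers, addr)
	}

	func main() {
		peers := []string{"host1:8088", "host2:8088"}
		peers, enabled := admitPeer(peers, "host3:8088")
		fmt.Println(peers, enabled) // [host1:8088 host2:8088 host3:8088] true

		_, enabled = admitPeer(peers, "host4:8088") // quorum is full
		fmt.Println(enabled)                        // false
	}

Note that an existing peer that re-joins also gets EnableRaft=true, since membership is checked after the (skipped) add.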
diff --git a/meta/rpc_test.go b/meta/rpc_test.go
index 2fb5886950..3f60c6bd05 100644
--- a/meta/rpc_test.go
+++ b/meta/rpc_test.go
@@ -159,7 +159,7 @@ func TestRPCJoin(t *testing.T) {
 		t.Fatalf("failed to join: %v", err)
 	}

-	if exp := false; res.RaftEnabled != false {
+	if exp := true; res.RaftEnabled != exp {
 		t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
 	}

@@ -230,7 +230,7 @@ func (f *fakeStore) cachedData() *Data {

 func (f *fakeStore) IsLeader() bool { return true }
 func (f *fakeStore) Leader() string { return f.leader }
-func (f *fakeStore) Peers() []string { return []string{f.leader} }
+func (f *fakeStore) Peers() ([]string, error) { return []string{f.leader}, nil }
 func (f *fakeStore) AddPeer(host string) error { return nil }
 func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) {
 	return &NodeInfo{ID: f.newNodeID, Host: host}, nil
diff --git a/meta/state.go b/meta/state.go
index dc37edef26..732ca355b0 100644
--- a/meta/state.go
+++ b/meta/state.go
@@ -1,8 +1,11 @@
 package meta

 import (
+	"bytes"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"math/rand"
 	"net"
 	"os"
@@ -18,12 +21,14 @@ import (
 // the meta.Store to change its behavior with the raft layer at runtime.
 type raftState interface {
 	open() error
+	remove() error
 	initialize() error
 	leader() string
 	isLeader() bool
 	sync(index uint64, timeout time.Duration) error
 	setPeers(addrs []string) error
 	addPeer(addr string) error
+	peers() ([]string, error)
 	invalidate() error
 	close() error
 	lastIndex() uint64
@@ -42,6 +47,19 @@ type localRaft struct {
 	raftLayer *raftLayer
 }

+func (r *localRaft) remove() error {
+	if err := os.RemoveAll(filepath.Join(r.store.path, "raft.db")); err != nil {
+		return err
+	}
+	if err := os.RemoveAll(filepath.Join(r.store.path, "peers.json")); err != nil {
+		return err
+	}
+	if err := os.RemoveAll(filepath.Join(r.store.path, "snapshots")); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (r *localRaft) updateMetaData(ms *Data) {
 	if ms == nil {
 		return
@@ -89,11 +107,6 @@ func (r *localRaft) open() error {
 	// If no peers are set in the config then start as a single server.
 	config.EnableSingleNode = (len(s.peers) == 0)

-	// Ensure our addr is in the peer list
-	if config.EnableSingleNode {
-		s.peers = append(s.peers, s.Addr.String())
-	}
-
 	// Build raft layer to multiplex listener.
 	r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)

@@ -246,6 +259,10 @@ func (r *localRaft) setPeers(addrs []string) error {
 	return r.raft.SetPeers(a).Error()
 }

+func (r *localRaft) peers() ([]string, error) {
+	return r.peerStore.Peers()
+}
+
 func (r *localRaft) leader() string {
 	if r.raft == nil {
 		return ""
@@ -269,6 +286,10 @@ type remoteRaft struct {
 	store *Store
 }

+func (r *remoteRaft) remove() error {
+	return nil
+}
+
 func (r *remoteRaft) updateMetaData(ms *Data) {
 	if ms == nil {
 		return
@@ -300,7 +321,15 @@ func (r *remoteRaft) invalidate() error {
 }

 func (r *remoteRaft) setPeers(addrs []string) error {
-	return nil
+	// Convert the peer list to JSON
+	var buf bytes.Buffer
+	enc := json.NewEncoder(&buf)
+	if err := enc.Encode(addrs); err != nil {
+		return err
+	}
+
+	// Write it out as peers.json
+	return ioutil.WriteFile(filepath.Join(r.store.path, "peers.json"), buf.Bytes(), 0755)
 }

 // addPeer adds addr to the list of peers in the cluster.
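As a sanity check on the on-disk format, here is a self-contained sketch of what that setPeers writes; writePeersJSON and the temp-dir setup are hypothetical, and only the encoding mirrors the diff. Note that json.Encoder.Encode appends a trailing newline, and the 0755 mode is kept for fidelity even though 0644 would be more conventional for a data file.

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
		"io/ioutil"
		"os"
		"path/filepath"
	)

	// writePeersJSON mirrors remoteRaft.setPeers: encode the addresses as a
	// JSON array and write the buffer to peers.json.
	func writePeersJSON(path string, addrs []string) error {
		var buf bytes.Buffer
		if err := json.NewEncoder(&buf).Encode(addrs); err != nil {
			return err
		}
		return ioutil.WriteFile(path, buf.Bytes(), 0755)
	}

	func main() {
		dir, _ := ioutil.TempDir("", "meta")
		defer os.RemoveAll(dir)
		path := filepath.Join(dir, "peers.json")

		if err := writePeersJSON(path, []string{"host1:8088", "host2:8088"}); err != nil {
			panic(err)
		}
		out, _ := ioutil.ReadFile(path)
		fmt.Printf("%s", out) // ["host1:8088","host2:8088"]
	}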
@@ -308,7 +337,15 @@ func (r *remoteRaft) addPeer(addr string) error {
 	return fmt.Errorf("cannot add peer using remote raft")
 }

+func (r *remoteRaft) peers() ([]string, error) {
+	return readPeersJSON(filepath.Join(r.store.path, "peers.json"))
+}
+
 func (r *remoteRaft) open() error {
+	if err := r.setPeers(r.store.peers); err != nil {
+		return err
+	}
+
 	go func() {
 		for {
 			select {
@@ -366,3 +403,25 @@ func (r *remoteRaft) sync(index uint64, timeout time.Duration) error {
 func (r *remoteRaft) snapshot() error {
 	return fmt.Errorf("cannot snapshot while in remote raft state")
 }
+
+func readPeersJSON(path string) ([]string, error) {
+	// Read the file
+	buf, err := ioutil.ReadFile(path)
+	if err != nil && !os.IsNotExist(err) {
+		return nil, err
+	}
+
+	// Check for no peers
+	if len(buf) == 0 {
+		return nil, nil
+	}
+
+	// Decode the peers
+	var peers []string
+	dec := json.NewDecoder(bytes.NewReader(buf))
+	if err := dec.Decode(&peers); err != nil {
+		return nil, err
+	}
+
+	return peers, nil
+}
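The edge cases of readPeersJSON are worth spelling out: a missing or empty peers.json yields (nil, nil) rather than an error, which is what lets a fresh node boot with no peers. A standalone sketch follows; the function body is copied from the diff, while the paths and hosts are illustrative only.

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
		"io/ioutil"
		"os"
		"path/filepath"
	)

	// readPeersJSON is copied from the diff above for demonstration.
	func readPeersJSON(path string) ([]string, error) {
		buf, err := ioutil.ReadFile(path)
		if err != nil && !os.IsNotExist(err) {
			return nil, err
		}
		if len(buf) == 0 {
			return nil, nil
		}
		var peers []string
		dec := json.NewDecoder(bytes.NewReader(buf))
		if err := dec.Decode(&peers); err != nil {
			return nil, err
		}
		return peers, nil
	}

	func main() {
		dir, _ := ioutil.TempDir("", "meta")
		defer os.RemoveAll(dir)
		path := filepath.Join(dir, "peers.json")

		// Missing file: no peers, no error.
		peers, err := readPeersJSON(path)
		fmt.Println(peers, err) // [] <nil>

		// Populated file: decodes the JSON array written by setPeers.
		ioutil.WriteFile(path, []byte(`["host1:8088","host2:8088"]`), 0644)
		peers, _ = readPeersJSON(path)
		fmt.Println(peers) // [host1:8088 host2:8088]
	}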
diff --git a/meta/store.go b/meta/store.go
index 034ebd9d5b..5abd072dd3 100644
--- a/meta/store.go
+++ b/meta/store.go
@@ -171,41 +171,6 @@ func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }

 // Open opens and initializes the raft store.
 func (s *Store) Open() error {
-	// If we have a join addr, attempt to join
-	if s.join != "" {
-
-		joined := false
-		for _, join := range strings.Split(s.join, ",") {
-			res, err := s.rpc.join(s.Addr.String(), join)
-			if err != nil {
-				s.Logger.Printf("join failed: %v", err)
-				continue
-			}
-			joined = true
-
-			s.Logger.Printf("joined remote node %v", join)
-			s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes)
-
-			s.peers = res.RaftNodes
-			s.id = res.NodeID
-
-			if err := s.writeNodeID(res.NodeID); err != nil {
-				return err
-			}
-
-			if !res.RaftEnabled {
-				s.raftState = &remoteRaft{s}
-				if err := s.invalidate(); err != nil {
-					return err
-				}
-			}
-		}
-
-		if !joined {
-			return fmt.Errorf("failed to join existing cluster at %v", s.join)
-		}
-	}
-
 	// Verify that no more than 3 peers.
 	// https://github.com/influxdb/influxdb/issues/2750
 	if len(s.peers) > MaxRaftNodes {
@@ -231,6 +196,11 @@ func (s *Store) Open() error {
 	}
 	s.opened = true

+	// load our raft state
+	if err := s.loadState(); err != nil {
+		return err
+	}
+
 	// Create the root directory if it doesn't already exist.
 	if err := s.createRootDir(); err != nil {
 		return fmt.Errorf("mkdir all: %s", err)
@@ -264,6 +234,11 @@ func (s *Store) Open() error {
 	s.wg.Add(1)
 	go s.serveRPCListener()

+	// Join an existing cluster if needed
+	if err := s.joinCluster(); err != nil {
+		return fmt.Errorf("join: %v", err)
+	}
+
 	// If the ID doesn't exist then create a new node.
 	if s.id == 0 {
 		go s.init()
@@ -274,6 +249,121 @@ func (s *Store) Open() error {
 	return nil
 }

+// loadState sets the appropriate raftState from our persistent storage
+func (s *Store) loadState() error {
+	peers, err := readPeersJSON(filepath.Join(s.path, "peers.json"))
+	if err != nil {
+		return err
+	}
+
+	// If we have existing peers, use those. This will override what's in the
+	// config.
+	if len(peers) > 0 {
+		s.peers = peers
+	}
+
+	// if there are no peers on disk, we need to start raft in order to
+	// initialize a new cluster or join an existing one.
+	if len(peers) == 0 {
+		s.raftState = &localRaft{store: s}
+		// if we have a raft database (possibly restored), we should start raft locally
+	} else if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err == nil {
+		s.raftState = &localRaft{store: s}
+		// otherwise, we should use remote raft
+	} else {
+		s.raftState = &remoteRaft{store: s}
+	}
+	return nil
+}
+
+func (s *Store) joinCluster() error {
+	// No join options, so nothing to do
+	if s.join == "" {
+		return nil
+	}
+
+	// We already have a node ID, so we're already part of a cluster;
+	// don't join again so we can use our existing state.
+	if s.id != 0 {
+		s.Logger.Printf("skipping join: already member of cluster: nodeId=%v raftEnabled=%v raftNodes=%v",
+			s.id, raft.PeerContained(s.peers, s.Addr.String()), s.peers)
+		return nil
+	}
+
+	s.Logger.Printf("joining cluster at: %v", s.join)
+	for {
+		for _, join := range strings.Split(s.join, ",") {
+			res, err := s.rpc.join(s.Addr.String(), join)
+			if err != nil {
+				s.Logger.Printf("join failed: %v", err)
+				continue
+			}

+			s.Logger.Printf("joined remote node %v", join)
+			s.Logger.Printf("nodeId=%v raftEnabled=%v raftNodes=%v", res.NodeID, res.RaftEnabled, res.RaftNodes)
+
+			s.peers = res.RaftNodes
+			s.id = res.NodeID
+
+			if err := s.writeNodeID(res.NodeID); err != nil {
+				s.Logger.Printf("write node id failed: %v", err)
+				break
+			}
+
+			if !res.RaftEnabled {
+				// Shut down our local raft and transition to a remote raft state
+				if err := s.enableRemoteRaft(); err != nil {
+					s.Logger.Printf("enable remote raft failed: %v", err)
+					break
+				}
+			}
+			return nil
+		}
+
+		s.Logger.Printf("join failed: retrying...")
+		time.Sleep(time.Second)
+	}
+}
+
+func (s *Store) enableLocalRaft() error {
+	if _, ok := s.raftState.(*localRaft); ok {
+		return nil
+	}
+	s.Logger.Printf("switching to local raft")
+
+	lr := &localRaft{store: s}
+	return s.changeState(lr)
+}
+
+func (s *Store) enableRemoteRaft() error {
+	if _, ok := s.raftState.(*remoteRaft); ok {
+		return nil
+	}
+
+	s.Logger.Printf("switching to remote raft")
+	rr := &remoteRaft{store: s}
+	return s.changeState(rr)
+}
+
+func (s *Store) changeState(state raftState) error {
+	if err := s.raftState.close(); err != nil {
+		return err
+	}
+
+	// Clear out any persistent state
+	if err := s.raftState.remove(); err != nil {
+		return err
+	}
+
+	s.raftState = state
+
+	if err := s.raftState.open(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // openRaft initializes the raft store.
 func (s *Store) openRaft() error {
 	return s.raftState.open()
@@ -404,10 +494,6 @@ func (s *Store) Snapshot() error {
 // WaitForLeader sleeps until a leader is found or a timeout occurs.
 // timeout == 0 means to wait forever.
 func (s *Store) WaitForLeader(timeout time.Duration) error {
-	if s.Leader() != "" {
-		return nil
-	}
-
 	// Begin timeout timer.
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
@@ -452,6 +538,9 @@ func (s *Store) IsLeader() bool {

 func (s *Store) Leader() string {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
+	if s.raftState == nil {
+		return ""
+	}
 	return s.raftState.leader()
 }

@@ -466,10 +555,10 @@ func (s *Store) AddPeer(addr string) error {
 }

 // Peers returns the list of peers in the cluster.
-func (s *Store) Peers() []string {
+func (s *Store) Peers() ([]string, error) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	return s.peers
+	return s.raftState.peers()
 }

 // serveExecListener processes remote exec connections.
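Putting the store changes together, here is a condensed sketch of the loadState decision table. chooseState and pathExists are hypothetical names introduced for illustration; the real method assigns s.raftState directly rather than returning a string.

	package main

	import (
		"fmt"
		"os"
		"path/filepath"
	)

	// pathExists reports whether the file at p exists.
	func pathExists(p string) bool {
		_, err := os.Stat(p)
		return err == nil
	}

	// chooseState mirrors Store.loadState: no peers on disk means raft must
	// start locally to bootstrap or join; an existing raft.db (e.g. restored
	// from backup) also means local raft; otherwise follow the cluster via
	// remote raft.
	func chooseState(dir string, peers []string) string {
		switch {
		case len(peers) == 0:
			return "localRaft"
		case pathExists(filepath.Join(dir, "raft.db")):
			return "localRaft"
		default:
			return "remoteRaft"
		}
	}

	func main() {
		dir := os.TempDir()
		fmt.Println(chooseState(dir, nil))                    // localRaft: fresh node
		fmt.Println(chooseState(dir, []string{"host1:8088"})) // remoteRaft, assuming no raft.db in dir
	}

Together with joinCluster's retry loop, this means a restarted node resumes its previous role from disk instead of re-running the join handshake that Open used to perform inline.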