diff --git a/meta/internal/meta.pb.go b/meta/internal/meta.pb.go index c82403880f..759a20f567 100644 --- a/meta/internal/meta.pb.go +++ b/meta/internal/meta.pb.go @@ -1255,12 +1255,15 @@ func (m *JoinRequest) GetAddr() string { } type JoinResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` - EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` + // Indicates that this node should take part in the raft cluster. + EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"` // The addresses of raft peers to use if joining as a raft member. If not joining // as a raft member, these are the nodes running raft. - Peers []string `protobuf:"bytes,3,rep" json:"Peers,omitempty"` - XXX_unrecognized []byte `json:"-"` + Peers []string `protobuf:"bytes,3,rep" json:"Peers,omitempty"` + // The node ID assigned to the requesting node. + NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *JoinResponse) Reset() { *m = JoinResponse{} } @@ -1288,6 +1291,13 @@ func (m *JoinResponse) GetPeers() []string { return nil } +func (m *JoinResponse) GetNodeID() uint64 { + if m != nil && m.NodeID != nil { + return *m.NodeID + } + return 0 +} + func init() { proto.RegisterEnum("internal.RPCType", RPCType_name, RPCType_value) proto.RegisterEnum("internal.Command_Type", Command_Type_name, Command_Type_value) diff --git a/meta/internal/meta.proto b/meta/internal/meta.proto index cbf3e72bce..22ac8589f0 100644 --- a/meta/internal/meta.proto +++ b/meta/internal/meta.proto @@ -290,8 +290,12 @@ message JoinResponse { required ResponseHeader Header = 1; + // Indicates that this node should take part in the raft cluster. optional bool EnableRaft = 2; // The addresses of raft peers to use if joining as a raft member. If not joining // as a raft member, these are the nodes running raft. repeated string Peers = 3; + + // The node ID assigned to the requesting node. + optional uint64 NodeID = 4; } diff --git a/meta/rpc.go b/meta/rpc.go index 97803fcabb..bb8c1cb3eb 100644 --- a/meta/rpc.go +++ b/meta/rpc.go @@ -23,12 +23,15 @@ type RPC struct { Leader() string Peers() []string AddPeer(host string) error + CreateNode(host string) (*NodeInfo, error) + NodeByHost(host string) (*NodeInfo, error) } } type JoinResult struct { RaftEnabled bool Peers []string + NodeID uint64 } type Reply interface { @@ -138,17 +141,44 @@ func (r *RPC) handleFetchData() (*internal.FetchDataResponse, error) { // handleJoinRequest handles a request to join the cluster func (r *RPC) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinResponse, error) { - if err := r.store.AddPeer(*req.Addr); err != nil { - r.Logger.Printf("join request failed: %v", err) + + node, err := func() (*NodeInfo, error) { + // attempt to create the node + node, err := r.store.CreateNode(*req.Addr) + // if it exists, return the exting node + if err == ErrNodeExists { + return r.store.NodeByHost(*req.Addr) + } else if err != nil { + return nil, fmt.Errorf("create node: %v", err) + } + + // If we have less than 3 nodes, add them as raft peers + if len(r.store.Peers()) < MaxRaftNodes { + if err = r.store.AddPeer(*req.Addr); err != nil { + return node, fmt.Errorf("add peer: %v", err) + } + } + return node, err + }() + + nodeID := uint64(0) + if node != nil { + nodeID = node.ID } + + if err != nil { + r.Logger.Printf("join request failed: create node: %v", err) + } + r.Logger.Printf("recv join request from: %v", *req.Addr) return &internal.JoinResponse{ Header: &internal.ResponseHeader{ OK: proto.Bool(true), }, - EnableRaft: proto.Bool(false), + EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)), Peers: r.store.Peers(), - }, nil + NodeID: proto.Uint64(nodeID), + }, err } @@ -219,6 +249,7 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) { return &JoinResult{ RaftEnabled: resp.GetEnableRaft(), Peers: resp.GetPeers(), + NodeID: resp.GetNodeID(), }, nil } @@ -267,3 +298,12 @@ func u64tob(v uint64) []byte { func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} diff --git a/meta/store.go b/meta/store.go index d18a110de2..9a2d9a2e94 100644 --- a/meta/store.go +++ b/meta/store.go @@ -54,6 +54,7 @@ const ( raftSnapshotsRetained = 2 raftTransportMaxPool = 3 raftTransportTimeout = 10 * time.Second + MaxRaftNodes = 3 ) // Store represents a raft-backed metastore. @@ -326,6 +327,12 @@ func (s *Store) Open() error { s.Logger.Printf("Joined remote node %v", s.join) s.Logger.Printf("raftEnabled=%v peers=%v", res.RaftEnabled, res.Peers) s.peers = res.Peers + s.id = res.NodeID + + if err := s.writeNodeID(res.NodeID); err != nil { + return err + } + if !res.RaftEnabled { s.raftState = &remoteRaft{s} if err := s.invalidate(); err != nil { @@ -336,7 +343,7 @@ func (s *Store) Open() error { // Verify that no more than 3 peers. // https://github.com/influxdb/influxdb/issues/2750 - if len(s.peers) > 3 { + if len(s.peers) > MaxRaftNodes { return ErrTooManyPeers } @@ -360,7 +367,7 @@ func (s *Store) Open() error { s.opened = true // Create the root directory if it doesn't already exist. - if err := os.MkdirAll(s.path, 0777); err != nil { + if err := s.createRootDir(); err != nil { return fmt.Errorf("mkdir all: %s", err) } @@ -497,7 +504,7 @@ func (s *Store) createLocalNode() error { } // Write node id to file. - if err := ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(ni.ID, 10)), 0666); err != nil { + if err := s.writeNodeID(ni.ID); err != nil { return fmt.Errorf("write file: %s", err) } @@ -509,6 +516,17 @@ func (s *Store) createLocalNode() error { return nil } +func (s *Store) createRootDir() error { + return os.MkdirAll(s.path, 0777) +} + +func (s *Store) writeNodeID(id uint64) error { + if err := s.createRootDir(); err != nil { + return err + } + return ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(id, 10)), 0666) +} + // Snapshot saves a snapshot of the current state. func (s *Store) Snapshot() error { future := s.raft.Snapshot()