diff --git a/http_transporter.go b/http_transporter.go new file mode 100644 index 0000000000..3fa84d2c8a --- /dev/null +++ b/http_transporter.go @@ -0,0 +1,183 @@ +package raft + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" +) + +// Parts from this transporter were heavily influenced by Peter Bougon's +// raft implementation: https://github.com/peterbourgon/raft + +//------------------------------------------------------------------------------ +// +// Typedefs +// +//------------------------------------------------------------------------------ + +// An HTTPTransporter is a default transport layer used to communicate between +// multiple servers. +type HTTPTransporter struct { + DisableKeepAlives bool + prefix string + appendEntriesPath string + requestVotePath string +} + +type HTTPMuxer interface { + HandleFunc(string, func(http.ResponseWriter, *http.Request)) +} + +//------------------------------------------------------------------------------ +// +// Constructor +// +//------------------------------------------------------------------------------ + +// Creates a new HTTP transporter with the given path prefix. +func NewHTTPTransporter(prefix string) *HTTPTransporter { + return &HTTPTransporter{ + prefix: prefix, + appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), + requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), + } +} + +//------------------------------------------------------------------------------ +// +// Accessors +// +//------------------------------------------------------------------------------ + +// Retrieves the path prefix used by the transporter. +func (t *HTTPTransporter) Prefix() string { + return t.prefix +} + +// Retrieves the AppendEntries path. +func (t *HTTPTransporter) AppendEntriesPath() string { + return t.appendEntriesPath +} + +// Retrieves the RequestVote path. +func (t *HTTPTransporter) RequestVotePath() string { + return t.requestVotePath +} + +//------------------------------------------------------------------------------ +// +// Methods +// +//------------------------------------------------------------------------------ + +//-------------------------------------- +// Installation +//-------------------------------------- + +// Applies Raft routes to an HTTP router for a given server. +func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) { + mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server)) + mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server)) +} + +//-------------------------------------- +// Outgoing +//-------------------------------------- + +// Sends an AppendEntries RPC to a peer. +func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + + url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath()) + traceln(server.Name(), "POST", url) + + client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} + httpResp, err := client.Post(url, "application/json", &b) + if httpResp == nil || err != nil { + return nil + } + defer httpResp.Body.Close() + + resp := &AppendEntriesResponse{} + if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil && err != io.EOF { + return nil + } + + return resp +} + +// Sends a RequestVote RPC to a peer. +func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse { + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + + url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath()) + traceln(server.Name(), "POST", url) + + client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} + httpResp, err := client.Post(url, "application/json", &b) + if httpResp == nil || err != nil { + return nil + } + defer httpResp.Body.Close() + + resp := &RequestVoteResponse{} + if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil && err != io.EOF { + return nil + } + + return resp +} + +// Sends a SnapshotRequest RPC to a peer. +func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse { + // TODO + return nil +} + +//-------------------------------------- +// Incoming +//-------------------------------------- + +// Handles incoming AppendEntries requests. +func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + traceln(server.Name(), "RECV /appendEntries") + + defer r.Body.Close() + req := &AppendEntriesRequest{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + + resp := server.AppendEntries(req) + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, "", http.StatusInternalServerError) + return + } + } +} + +// Handles incoming RequestVote requests. +func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + traceln(server.Name(), "RECV /requestVote") + + defer r.Body.Close() + req := &RequestVoteRequest{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + + resp := server.RequestVote(req) + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, "", http.StatusInternalServerError) + return + } + } +} diff --git a/http_transporter_test.go b/http_transporter_test.go new file mode 100644 index 0000000000..ca349c1fed --- /dev/null +++ b/http_transporter_test.go @@ -0,0 +1,111 @@ +package raft + +import ( + "fmt" + "net" + "net/http" + "sync" + "testing" + "time" +) + +//------------------------------------------------------------------------------ +// +// Tests +// +//------------------------------------------------------------------------------ + +//-------------------------------------- +// Membership +//-------------------------------------- + +// Ensure that we can start several servers and have them communicate. +func TestHTTPTransporter(t *testing.T) { + transporter := NewHTTPTransporter("/raft") + transporter.DisableKeepAlives = true + + servers := []*Server{} + f0 := func(server *Server, httpServer *http.Server) { + // Stop the leader and wait for an election. + server.Stop() + time.Sleep(testElectionTimeout * 2) + + if servers[1].State() != Leader && servers[2].State() != Leader { + t.Fatal("Expected re-election:", servers[1].State(), servers[2].State()) + } + server.Initialize() + server.StartFollower() + } + f1 := func(server *Server, httpServer *http.Server) { + } + f2 := func(server *Server, httpServer *http.Server) { + } + runTestHttpServers(t, &servers, transporter, f0, f1, f2) +} + +//------------------------------------------------------------------------------ +// +// Helper Functions +// +//------------------------------------------------------------------------------ + +// Starts multiple independent Raft servers wrapped with HTTP servers. +func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) { + var wg sync.WaitGroup + httpServers := []*http.Server{} + listeners := []net.Listener{} + for i, _ := range callbacks { + wg.Add(1) + port := 9000 + i + + // Create raft server. + server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter) + server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.SetElectionTimeout(testElectionTimeout) + server.Initialize() + if i == 0 { + server.StartLeader() + } else { + server.StartFollower() + } + defer server.Stop() + *servers = append(*servers, server) + + // Create listener for HTTP server and start it. + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + panic(err) + } + defer listener.Close() + listeners = append(listeners, listener) + + // Create wrapping HTTP server. + mux := http.NewServeMux() + transporter.Install(server, mux) + httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux} + httpServers = append(httpServers, httpServer) + go func() { httpServer.Serve(listener) }() + } + + // Setup configuration. + for _, server := range *servers { + if _, err := (*servers)[0].Do(&joinCommand{Name: server.Name()}); err != nil { + t.Fatal("Server unable to join: %v", err) + } + } + + // Wait for configuration to propagate. + time.Sleep(testHeartbeatTimeout * 2) + + // Execute all the callbacks at the same time. + for _i, _f := range callbacks { + i, f := _i, _f + go func() { + defer wg.Done() + f((*servers)[i], httpServers[i]) + }() + } + + // Wait until everything is done. + wg.Wait() +} diff --git a/peer.go b/peer.go index 33ac4b09c3..5204c4d82c 100644 --- a/peer.go +++ b/peer.go @@ -96,10 +96,10 @@ func (p *Peer) stopHeartbeat() { // I make the channel with 1 buffer // and try to panic here select { - case p.stopChan <- true: + case p.stopChan <- true: - default: - panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat") + default: + panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat") } } @@ -128,12 +128,16 @@ func (p *Peer) heartbeat(c chan bool) { c <- true + debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout) + for { select { case <-stopChan: + debugln("peer.heartbeat.stop: ", p.Name()) return case <-time.After(p.heartbeatTimeout): + debugln("peer.heartbeat.run: ", p.Name()) prevLogIndex := p.getPrevLogIndex() entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex) diff --git a/server.go b/server.go index 7d67576a5d..139a2b6b5b 100644 --- a/server.go +++ b/server.go @@ -461,7 +461,7 @@ func (s *Server) followerLoop() { // 1.Receiving valid AppendEntries RPC, or // 2.Granting vote to candidate if update { - timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) + timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) } // Exit loop on state change. @@ -568,8 +568,6 @@ func (s *Server) leaderLoop() { var err error select { case e := <-s.c: - s.debugln("server.leader.select") - if e.target == &stopValue { s.setState(Stopped) } else if command, ok := e.target.(Command); ok { @@ -654,6 +652,8 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse // Processes the "append entries" request. func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) { + s.traceln("server.ae.process") + if req.Term < s.currentTerm { s.debugln("server.ae.error: stale term") return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), false @@ -786,18 +786,17 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot // Membership //-------------------------------------- -// Adds a peer to the server. This should be called by a system's join command -// within the context so that it is within the context of the server lock. +// Adds a peer to the server. func (s *Server) AddPeer(name string) error { - // Do not allow peers to be added twice. + s.debugln("server.peer.add: ", name, len(s.peers)) + // Do not allow peers to be added twice. if s.peers[name] != nil { return DuplicatePeerError } // Only add the peer if it doesn't have the same name. if s.name != name { - //s.debugln("Add peer ", name) peer := newPeer(s, name, s.heartbeatTimeout) if s.State() == Leader { peer.startHeartbeat() @@ -808,9 +807,10 @@ func (s *Server) AddPeer(name string) error { return nil } -// Removes a peer from the server. This should be called by a system's join command -// within the context so that it is within the context of the server lock. +// Removes a peer from the server. func (s *Server) RemovePeer(name string) error { + s.debugln("server.peer.remove: ", name, len(s.peers)) + // Ignore removal of the server itself. if s.name == name { return nil diff --git a/server_test.go b/server_test.go index bc6b7e15f7..ac152193af 100644 --- a/server_test.go +++ b/server_test.go @@ -396,7 +396,7 @@ func TestServerMultiNode(t *testing.T) { } mutex.RUnlock() - for i := 0; i < 200000; i++ { + for i := 0; i < 20; i++ { retry := 0 fmt.Println("Round ", i)