Add HTTPTransporter.

pull/820/head
Ben Johnson 2013-07-08 20:55:00 -06:00
parent d309819cfa
commit 5aa494ddf9
5 changed files with 319 additions and 12 deletions

View File

@ -16,7 +16,7 @@ const (
Trace = 2
)
var LogLevel int = 0
var LogLevel int = Trace //0
var logger *log.Logger
func init() {

191
http_transporter.go Normal file
View File

@ -0,0 +1,191 @@
package raft
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)
// Parts from this transporter were heavily influenced by Peter Bougon's
// raft implementation: https://github.com/peterbourgon/raft
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// An HTTPTransporter is a default transport layer used to communicate between
// multiple servers.
type HTTPTransporter struct {
DisableKeepAlives bool
prefix string
appendEntriesPath string
requestVotePath string
}
type HTTPMuxer interface {
HandleFunc(string, func(http.ResponseWriter, *http.Request))
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
// Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter {
return &HTTPTransporter{
prefix: prefix,
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
}
}
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
// Retrieves the path prefix used by the transporter.
func (t *HTTPTransporter) Prefix() string {
return t.prefix
}
// Retrieves the AppendEntries path.
func (t *HTTPTransporter) AppendEntriesPath() string {
return t.appendEntriesPath
}
// Retrieves the RequestVote path.
func (t *HTTPTransporter) RequestVotePath() string {
return t.requestVotePath
}
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
//--------------------------------------
// Installation
//--------------------------------------
// Applies Raft routes to an HTTP router for a given server.
func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
}
//--------------------------------------
// Outgoing
//--------------------------------------
// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
traceln("REQ:", b.String())
url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
httpReq, _ := http.NewRequest("POST", url, &b)
httpReq.Header.Add("Content-Type", "application/json")
httpResp, err := client.Do(httpReq)
if httpResp == nil || err != nil {
return nil
}
defer httpResp.Body.Close()
resp := &AppendEntriesResponse{}
if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil && err != io.EOF {
return nil
}
return resp
}
// Sends a RequestVote RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
httpReq, _ := http.NewRequest("POST", url, &b)
httpReq.Header.Add("Content-Type", "application/json")
httpResp, err := client.Do(httpReq)
if httpResp == nil || err != nil {
return nil
}
defer httpResp.Body.Close()
resp := &RequestVoteResponse{}
if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil && err != io.EOF {
return nil
}
return resp
}
// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
// TODO
return nil
}
//--------------------------------------
// Incoming
//--------------------------------------
// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /appendEntries")
defer r.Body.Close()
req := &AppendEntriesRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
resp := server.AppendEntries(req)
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}
// Handles incoming RequestVote requests.
func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /requestVote")
defer r.Body.Close()
req := &RequestVoteRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
resp := server.RequestVote(req)
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}

112
http_transporter_test.go Normal file
View File

@ -0,0 +1,112 @@
package raft
import (
"fmt"
"net"
"net/http"
"sync"
"time"
"testing"
)
//------------------------------------------------------------------------------
//
// Tests
//
//------------------------------------------------------------------------------
//--------------------------------------
// Membership
//--------------------------------------
// Ensure that we can start several servers and have them communicate.
func TestHTTPTransporter(t *testing.T) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true
servers := []*Server{}
f0 := func(server *Server, httpServer *http.Server) {
// Stop the leader and wait for an election.
server.Stop()
time.Sleep(testElectionTimeout * 2)
debugln("STATES: ", servers[1].State(), servers[2].State())
if servers[1].State() != Leader && servers[2].State() != Leader {
t.Fatal("Expected re-election:", servers[1].State(), servers[2].State())
}
server.StartFollower()
}
f1 := func(server *Server, httpServer *http.Server) {
}
f2 := func(server *Server, httpServer *http.Server) {
}
runTestHttpServers(t, &servers, transporter, f0, f1, f2)
}
//------------------------------------------------------------------------------
//
// Helper Functions
//
//------------------------------------------------------------------------------
// Starts multiple independent Raft servers wrapped with HTTP servers.
func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
var wg sync.WaitGroup
httpServers := []*http.Server{}
listeners := []net.Listener{}
for i, _ := range callbacks {
wg.Add(1)
port := 9000+i
// Create raft server.
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.SetElectionTimeout(testElectionTimeout)
server.Initialize()
if i == 0 {
server.StartLeader()
} else {
server.StartFollower()
}
defer server.Stop()
*servers = append(*servers, server)
// Create listener for HTTP server and start it.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
defer listener.Close()
listeners = append(listeners, listener)
// Create wrapping HTTP server.
mux := http.NewServeMux()
transporter.Install(server, mux)
httpServer := &http.Server{Addr:fmt.Sprintf(":%d", port), Handler: mux}
httpServers = append(httpServers, httpServer)
go func() { httpServer.Serve(listener) }()
}
// Setup configuration.
for _, server := range *servers {
if _, err := (*servers)[0].Do(&joinCommand{Name:server.Name()}); err != nil {
t.Fatal("Server unable to join: %v", err)
}
}
// Wait for configuration to propagate.
time.Sleep(testHeartbeatTimeout * 2)
// Execute all the callbacks at the same time.
for _i, _f := range callbacks {
i, f := _i, _f
go func() {
defer wg.Done()
f((*servers)[i], httpServers[i])
}()
}
// Wait until everything is done.
wg.Wait()
}

View File

@ -126,12 +126,16 @@ func (p *Peer) clone() *Peer {
func (p *Peer) heartbeat(c chan bool) {
c <- true
debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
for {
select {
case <-p.stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return
case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex)

View File

@ -560,8 +560,6 @@ func (s *Server) leaderLoop() {
var err error
select {
case e := <-s.c:
s.debugln("server.leader.select")
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(Command); ok {
@ -646,6 +644,8 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) *AppendEntriesResponse {
s.traceln("server.ae.process")
if req.Term < s.currentTerm {
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex())
@ -765,18 +765,17 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) *RequestVote
// Membership
//--------------------------------------
// Adds a peer to the server. This should be called by a system's join command
// within the context so that it is within the context of the server lock.
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
// Do not allow peers to be added twice.
s.debugln("server.peer.add: ", name, len(s.peers))
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return DuplicatePeerError
}
// Only add the peer if it doesn't have the same name.
if s.name != name {
//s.debugln("Add peer ", name)
peer := newPeer(s, name, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
@ -787,9 +786,10 @@ func (s *Server) AddPeer(name string) error {
return nil
}
// Removes a peer from the server. This should be called by a system's join command
// within the context so that it is within the context of the server lock.
// Removes a peer from the server.
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Ignore removal of the server itself.
if s.name == name {
return nil