Add HTTP transport.

pull/903/head
Ben Johnson 2014-09-08 17:03:27 -06:00
parent 7d4c3e9c6f
commit 97f24d08a1
3 changed files with 410 additions and 50 deletions

View File

@ -66,6 +66,53 @@ func Test_Simulate_SingleNode(t *testing.T) {
}
}
// Ensure that a cluster of multiple nodes can maintain consensus.
func Test_Simulate_MultiNode(t *testing.T) {
f := func(commands [][]byte) bool {
var fsm TestFSM
l := &raft.Log{
FSM: &fsm,
Clock: clock.NewMockClock(),
Rand: nopRand,
}
l.URL, _ = url.Parse("//node")
if err := l.Open(tempfile()); err != nil {
log.Fatal("open: ", err)
}
defer os.RemoveAll(l.Path())
defer l.Close()
// HACK(benbjohnson): Initialize instead.
if err := l.Initialize(); err != nil {
t.Fatalf("initialize: %s", err)
}
// Execute a series of commands.
for _, command := range commands {
if err := l.Apply(command); err != nil {
t.Fatalf("apply: %s", err)
}
}
// Verify the configuration is set.
if fsm.config != `{"clusterID":"00000000","peers":["//node"]}` {
t.Fatalf("unexpected config: %s", fsm.config)
}
// Verify the commands were executed against the FSM, in order.
for i, command := range commands {
if b := fsm.commands[i]; !bytes.Equal(command, b) {
t.Fatalf("%d. command:\n\nexp: %x\n\n got:%x\n\n", i, command, b)
}
}
return true
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
// TestFSM represents a fake state machine that simple records all commands.
type TestFSM struct {
config string

View File

@ -1,8 +1,9 @@
package raft
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
@ -19,8 +20,9 @@ func init() {
// Transport represents a handler for connecting the log to another node.
// It uses URLs to direct requests over different protocols.
type Transport interface {
AppendEntries(*url.URL, *AppendEntriesRequest) (term int64, err error)
RequestVote(*url.URL, *VoteRequest) (term int64, err error)
Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error)
ReadFrom(u *url.URL, term, index uint64) (io.Reader, error)
RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error)
}
// DefaultTransport provides support for HTTP and TCP protocols.
@ -42,18 +44,26 @@ func (mux *TransportMux) Handle(scheme string, t Transport) {
mux.m[scheme] = t
}
// AppendEntries sends a list of entries to a follower.
func (mux *TransportMux) AppendEntries(u *url.URL, r *AppendEntriesRequest) (int64, error) {
// Heartbeat checks the status of a follower.
func (mux *TransportMux) Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error) {
if t, ok := mux.m[u.Scheme]; ok {
return t.AppendEntries(u, r)
return t.Heartbeat(u, term, commitIndex, leaderID)
}
return 0, fmt.Errorf("transport scheme not supported: %s", u.Scheme)
return 0, 0, fmt.Errorf("transport scheme not supported: %s", u.Scheme)
}
// ReadFrom streams the log from a leader.
func (mux *TransportMux) ReadFrom(u *url.URL, term, index uint64) (io.Reader, error) {
if t, ok := mux.m[u.Scheme]; ok {
return t.ReadFrom(u, term, index)
}
return nil, fmt.Errorf("transport scheme not supported: %s", u.Scheme)
}
// RequestVote requests a vote for a candidate in a given term.
func (mux *TransportMux) RequestVote(u *url.URL, r *VoteRequest) (int64, error) {
func (mux *TransportMux) RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
if t, ok := mux.m[u.Scheme]; ok {
return t.RequestVote(u, r)
return t.RequestVote(u, term, candidateID, lastLogIndex, lastLogTerm)
}
return 0, fmt.Errorf("transport scheme not supported: %s", u.Scheme)
}
@ -61,59 +71,107 @@ func (mux *TransportMux) RequestVote(u *url.URL, r *VoteRequest) (int64, error)
// HTTPTransport represents a transport for sending RPCs over the HTTP protocol.
type HTTPTransport struct{}
// AppendEntries sends a list of entries to a follower.
func (t *HTTPTransport) AppendEntries(uri *url.URL, r *AppendEntriesRequest) (int64, error) {
// Copy URL and append path.
u := uri
u.Path = path.Join(u.Path, "append_entries")
// Heartbeat checks the status of a follower.
func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "heartbeat")
// Create log entry encoder.
var buf bytes.Buffer
enc := NewLogEntryEncoder(&buf)
// Set URL parameters.
v := &url.Values{}
v.Set("term", strconv.FormatUint(term, 10))
v.Set("commitIndex", strconv.FormatUint(commitIndex, 10))
v.Set("leaderID", strconv.FormatUint(leaderID, 10))
u.RawQuery = v.Encode()
// Create an HTTP request.
req, err := http.NewRequest("POST", u.String(), &buf)
// Send HTTP request.
resp, err := http.Get(u.String())
if err != nil {
return 0, err
return 0, 0, err
}
_ = resp.Body.Close()
// Parse returned index.
newIndexString := resp.Header.Get("X-Raft-Index")
newIndex, err := strconv.ParseUint(newIndexString, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid index: %q", newIndexString)
}
// Set headers.
req.Header = http.Header{
"X-Raft-Term": {strconv.FormatInt(r.Term, 10)},
"X-Raft-LeaderID": {strconv.FormatInt(r.LeaderID, 10)},
"X-Raft-PrevLogIndex": {strconv.FormatInt(r.PrevLogIndex, 10)},
"X-Raft-PrevLogTerm": {strconv.FormatInt(r.PrevLogTerm, 10)},
// Parse returned term.
newTermString := resp.Header.Get("X-Raft-Term")
newTerm, err := strconv.ParseUint(newTermString, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid term: %q", newTermString)
}
// Write log entries.
for _, e := range r.Entries {
if err := enc.Encode(e); err != nil {
return 0, err
}
// Parse returned error.
if s := resp.Header.Get("X-Raft-Error"); s != "" {
return newIndex, newTerm, errors.New(s)
}
return 0, nil // TODO(benbjohnson)
return newIndex, newTerm, nil
}
// ReadFrom streams the log from a leader.
func (t *HTTPTransport) ReadFrom(uri *url.URL, term, index uint64) (io.Reader, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "stream")
// Set URL parameters.
v := &url.Values{}
v.Set("term", strconv.FormatUint(term, 10))
v.Set("index", strconv.FormatUint(index, 10))
u.RawQuery = v.Encode()
// Send HTTP request.
resp, err := http.Get(u.String())
if err != nil {
return nil, err
}
// Parse returned error.
if s := resp.Header.Get("X-Raft-Error"); s != "" {
_ = resp.Body.Close()
return nil, errors.New(s)
}
return resp.Body, nil
}
// RequestVote requests a vote for a candidate in a given term.
func (t *HTTPTransport) RequestVote(u *url.URL, r *VoteRequest) (int64, error) {
return 0, nil // TODO(benbjohnson)
}
func (t *HTTPTransport) RequestVote(uri *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
// Construct URL.
u := *uri
u.Path = path.Join(u.Path, "vote")
// AppendEntriesRequest represents the arguments for an AppendEntries RPC.
type AppendEntriesRequest struct {
Term int64
LeaderID int64
PrevLogIndex int64
PrevLogTerm int64
Entries []*LogEntry
LeaderCommit int64
}
// Set URL parameters.
v := &url.Values{}
v.Set("term", strconv.FormatUint(term, 10))
v.Set("candidateID", strconv.FormatUint(candidateID, 10))
v.Set("lastLogIndex", strconv.FormatUint(lastLogIndex, 10))
v.Set("lastLogTerm", strconv.FormatUint(lastLogTerm, 10))
u.RawQuery = v.Encode()
// VoteRequest represents the arguments for a RequestVote RPC.
type VoteRequest struct {
Term int64
CandidateID int64
LastLogIndex int64
LastLogTerm int64
// Send HTTP request.
resp, err := http.Get(u.String())
if err != nil {
return 0, err
}
_ = resp.Body.Close()
// Parse returned term.
newTermString := resp.Header.Get("X-Raft-Term")
newTerm, err := strconv.ParseUint(newTermString, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid term: %q", newTermString)
}
// Parse returned error.
if s := resp.Header.Get("X-Raft-Error"); s != "" {
return newTerm, errors.New(s)
}
return newTerm, nil
}

View File

@ -1 +1,256 @@
package raft_test
import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"github.com/influxdb/influxdb/raft"
)
// Ensure a heartbeat on an unsupported scheme returns an error.
func TestTransportMux_Heartbeat_ErrUnsupportedScheme(t *testing.T) {
u, _ := url.Parse("foo://bar")
_, _, err := raft.DefaultTransport.Heartbeat(u, 0, 0, 0)
if err == nil || err.Error() != `transport scheme not supported: foo` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure a stream on an unsupported scheme returns an error.
func TestTransportMux_ReadFrom_ErrUnsupportedScheme(t *testing.T) {
u, _ := url.Parse("foo://bar")
_, err := raft.DefaultTransport.ReadFrom(u, 0, 0)
if err == nil || err.Error() != `transport scheme not supported: foo` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure a stream on an unsupported scheme returns an error.
func TestTransportMux_RequestVote_ErrUnsupportedScheme(t *testing.T) {
u, _ := url.Parse("foo://bar")
_, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0)
if err == nil || err.Error() != `transport scheme not supported: foo` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure a heartbeat over HTTP can be read and responded to.
func TestHTTPTransport_Heartbeat(t *testing.T) {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if path := r.URL.Path; path != `/heartbeat` {
t.Fatalf("unexpected path: %q", path)
}
if term := r.FormValue("term"); term != `1` {
t.Fatalf("unexpected term: %q", term)
}
if commitIndex := r.FormValue("commitIndex"); commitIndex != `2` {
t.Fatalf("unexpected commit index: %q", commitIndex)
}
if leaderID := r.FormValue("leaderID"); leaderID != `3` {
t.Fatalf("unexpected leader id: %q", leaderID)
}
w.Header().Set("X-Raft-Index", "4")
w.Header().Set("X-Raft-Term", "5")
w.WriteHeader(http.StatusOK)
}))
defer s.Close()
// Execute heartbeat against test server.
u, _ := url.Parse(s.URL)
newIndex, newTerm, err := raft.DefaultTransport.Heartbeat(u, 1, 2, 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if newIndex != 4 {
t.Fatalf("unexpected new index: %d", newIndex)
} else if newTerm != 5 {
t.Fatalf("unexpected new term: %d", newTerm)
}
}
// Ensure HTTP heartbeats return correct errors.
func TestHTTPTransport_Heartbeat_Err(t *testing.T) {
var tests = []struct {
index string
term string
errstr string
err string
}{
{index: "", term: "", err: `invalid index: ""`},
{index: "1000", term: "", err: `invalid term: ""`},
{index: "1", term: "2", errstr: "bad heartbeat", err: `bad heartbeat`},
}
for i, tt := range tests {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Raft-Index", tt.index)
w.Header().Set("X-Raft-Term", tt.term)
w.Header().Set("X-Raft-Error", tt.errstr)
w.WriteHeader(http.StatusOK)
}))
u, _ := url.Parse(s.URL)
_, _, err := raft.DefaultTransport.Heartbeat(u, 1, 2, 3)
if err == nil {
t.Errorf("%d. expected error")
} else if tt.err != err.Error() {
t.Errorf("%d. error:\n\nexp: %s\n\ngot: %s", i, tt.err, err.Error())
}
s.Close()
}
}
// Ensure an HTTP heartbeat to a stopped server returns an error.
func TestHTTPTransport_Heartbeat_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
_, _, err := raft.DefaultTransport.Heartbeat(u, 0, 0, 0)
if err == nil {
t.Fatal("expected error")
} else if !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the log can be streamed over HTTP.
func TestHTTPTransport_ReadFrom(t *testing.T) {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if path := r.URL.Path; path != `/stream` {
t.Fatalf("unexpected path: %q", path)
}
if term := r.FormValue("term"); term != `1` {
t.Fatalf("unexpected term: %q", term)
}
if index := r.FormValue("index"); index != `2` {
t.Fatalf("unexpected index: %q", index)
}
w.Write([]byte("test123"))
}))
defer s.Close()
// Execute stream against test server.
u, _ := url.Parse(s.URL)
r, err := raft.DefaultTransport.ReadFrom(u, 1, 2)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
b, _ := ioutil.ReadAll(r)
if string(b) != `test123` {
t.Fatalf("unexpected stream: %q", string(b))
}
}
// Ensure a stream can return an error.
func TestHTTPTransport_ReadFrom_Err(t *testing.T) {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Raft-Error", `bad stream`)
}))
defer s.Close()
// Execute stream against test server.
u, _ := url.Parse(s.URL)
r, err := raft.DefaultTransport.ReadFrom(u, 0, 0)
if err == nil {
t.Fatalf("expected error")
} else if err.Error() != `bad stream` {
t.Fatalf("unexpected error: %s", err)
} else if r != nil {
t.Fatal("unexpected reader")
}
}
// Ensure an streaming over HTTP to a stopped server returns an error.
func TestHTTPTransport_ReadFrom_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
_, err := raft.DefaultTransport.ReadFrom(u, 0, 0)
if err == nil {
t.Fatal("expected error")
} else if !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that requesting over HTTP can be read and responded to.
func TestHTTPTransport_RequestVote(t *testing.T) {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if path := r.URL.Path; path != `/vote` {
t.Fatalf("unexpected path: %s", path)
}
if term := r.FormValue("term"); term != `1` {
t.Fatalf("unexpected term: %v", term)
}
if candidateID := r.FormValue("candidateID"); candidateID != `2` {
t.Fatalf("unexpected candidate id: %v", candidateID)
}
if lastLogIndex := r.FormValue("lastLogIndex"); lastLogIndex != `3` {
t.Fatalf("unexpected last log index: %v", lastLogIndex)
}
if lastLogTerm := r.FormValue("lastLogTerm"); lastLogTerm != `4` {
t.Fatalf("unexpected last log term: %v", lastLogTerm)
}
w.Header().Set("X-Raft-Term", "5")
w.WriteHeader(http.StatusOK)
}))
defer s.Close()
// Execute heartbeat against test server.
u, _ := url.Parse(s.URL)
newTerm, err := raft.DefaultTransport.RequestVote(u, 1, 2, 3, 4)
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if newTerm != 5 {
t.Fatalf("unexpected new term: %d", newTerm)
}
}
// Ensure that a returned vote with an invalid term returns an error.
func TestHTTPTransport_RequestVote_ErrInvalidTerm(t *testing.T) {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Raft-Term", `xxx`)
}))
defer s.Close()
u, _ := url.Parse(s.URL)
_, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0)
if err == nil {
t.Errorf("expected error")
} else if err.Error() != `invalid term: "xxx"` {
t.Errorf("unexpected error: %s", err)
}
}
// Ensure that a returned vote with an error message returns that error.
func TestHTTPTransport_RequestVote_Error(t *testing.T) {
// Start mock HTTP server.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Raft-Term", `1`)
w.Header().Set("X-Raft-Error", `already voted`)
}))
defer s.Close()
u, _ := url.Parse(s.URL)
_, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0)
if err == nil {
t.Errorf("expected error")
} else if err.Error() != `already voted` {
t.Errorf("unexpected error: %s", err)
}
}
// Ensure that requesting a vote over HTTP to a stopped server returns an error.
func TestHTTPTransport_RequestVote_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
_, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0)
if err == nil {
t.Fatal("expected error")
} else if !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
}
}