diff --git a/raft/raft_test.go b/raft/raft_test.go index dd68ce2b57..67e9603be2 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -66,6 +66,53 @@ func Test_Simulate_SingleNode(t *testing.T) { } } +// Ensure that a cluster of multiple nodes can maintain consensus. +func Test_Simulate_MultiNode(t *testing.T) { + f := func(commands [][]byte) bool { + var fsm TestFSM + l := &raft.Log{ + FSM: &fsm, + Clock: clock.NewMockClock(), + Rand: nopRand, + } + l.URL, _ = url.Parse("//node") + if err := l.Open(tempfile()); err != nil { + log.Fatal("open: ", err) + } + defer os.RemoveAll(l.Path()) + defer l.Close() + + // HACK(benbjohnson): Initialize instead. + if err := l.Initialize(); err != nil { + t.Fatalf("initialize: %s", err) + } + + // Execute a series of commands. + for _, command := range commands { + if err := l.Apply(command); err != nil { + t.Fatalf("apply: %s", err) + } + } + + // Verify the configuration is set. + if fsm.config != `{"clusterID":"00000000","peers":["//node"]}` { + t.Fatalf("unexpected config: %s", fsm.config) + } + + // Verify the commands were executed against the FSM, in order. + for i, command := range commands { + if b := fsm.commands[i]; !bytes.Equal(command, b) { + t.Fatalf("%d. command:\n\nexp: %x\n\n got:%x\n\n", i, command, b) + } + } + + return true + } + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + // TestFSM represents a fake state machine that simple records all commands. type TestFSM struct { config string diff --git a/raft/transport.go b/raft/transport.go index ee24833421..6125c4fa21 100644 --- a/raft/transport.go +++ b/raft/transport.go @@ -1,8 +1,9 @@ package raft import ( - "bytes" + "errors" "fmt" + "io" "net/http" "net/url" "path" @@ -19,8 +20,9 @@ func init() { // Transport represents a handler for connecting the log to another node. // It uses URLs to direct requests over different protocols. type Transport interface { - AppendEntries(*url.URL, *AppendEntriesRequest) (term int64, err error) - RequestVote(*url.URL, *VoteRequest) (term int64, err error) + Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error) + ReadFrom(u *url.URL, term, index uint64) (io.Reader, error) + RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) } // DefaultTransport provides support for HTTP and TCP protocols. @@ -42,18 +44,26 @@ func (mux *TransportMux) Handle(scheme string, t Transport) { mux.m[scheme] = t } -// AppendEntries sends a list of entries to a follower. -func (mux *TransportMux) AppendEntries(u *url.URL, r *AppendEntriesRequest) (int64, error) { +// Heartbeat checks the status of a follower. +func (mux *TransportMux) Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error) { if t, ok := mux.m[u.Scheme]; ok { - return t.AppendEntries(u, r) + return t.Heartbeat(u, term, commitIndex, leaderID) } - return 0, fmt.Errorf("transport scheme not supported: %s", u.Scheme) + return 0, 0, fmt.Errorf("transport scheme not supported: %s", u.Scheme) +} + +// ReadFrom streams the log from a leader. +func (mux *TransportMux) ReadFrom(u *url.URL, term, index uint64) (io.Reader, error) { + if t, ok := mux.m[u.Scheme]; ok { + return t.ReadFrom(u, term, index) + } + return nil, fmt.Errorf("transport scheme not supported: %s", u.Scheme) } // RequestVote requests a vote for a candidate in a given term. -func (mux *TransportMux) RequestVote(u *url.URL, r *VoteRequest) (int64, error) { +func (mux *TransportMux) RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) { if t, ok := mux.m[u.Scheme]; ok { - return t.RequestVote(u, r) + return t.RequestVote(u, term, candidateID, lastLogIndex, lastLogTerm) } return 0, fmt.Errorf("transport scheme not supported: %s", u.Scheme) } @@ -61,59 +71,107 @@ func (mux *TransportMux) RequestVote(u *url.URL, r *VoteRequest) (int64, error) // HTTPTransport represents a transport for sending RPCs over the HTTP protocol. type HTTPTransport struct{} -// AppendEntries sends a list of entries to a follower. -func (t *HTTPTransport) AppendEntries(uri *url.URL, r *AppendEntriesRequest) (int64, error) { - // Copy URL and append path. - u := uri - u.Path = path.Join(u.Path, "append_entries") +// Heartbeat checks the status of a follower. +func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint64) (uint64, uint64, error) { + // Construct URL. + u := *uri + u.Path = path.Join(u.Path, "heartbeat") - // Create log entry encoder. - var buf bytes.Buffer - enc := NewLogEntryEncoder(&buf) + // Set URL parameters. + v := &url.Values{} + v.Set("term", strconv.FormatUint(term, 10)) + v.Set("commitIndex", strconv.FormatUint(commitIndex, 10)) + v.Set("leaderID", strconv.FormatUint(leaderID, 10)) + u.RawQuery = v.Encode() - // Create an HTTP request. - req, err := http.NewRequest("POST", u.String(), &buf) + // Send HTTP request. + resp, err := http.Get(u.String()) if err != nil { - return 0, err + return 0, 0, err + } + _ = resp.Body.Close() + + // Parse returned index. + newIndexString := resp.Header.Get("X-Raft-Index") + newIndex, err := strconv.ParseUint(newIndexString, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid index: %q", newIndexString) } - // Set headers. - req.Header = http.Header{ - "X-Raft-Term": {strconv.FormatInt(r.Term, 10)}, - "X-Raft-LeaderID": {strconv.FormatInt(r.LeaderID, 10)}, - "X-Raft-PrevLogIndex": {strconv.FormatInt(r.PrevLogIndex, 10)}, - "X-Raft-PrevLogTerm": {strconv.FormatInt(r.PrevLogTerm, 10)}, + // Parse returned term. + newTermString := resp.Header.Get("X-Raft-Term") + newTerm, err := strconv.ParseUint(newTermString, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid term: %q", newTermString) } - // Write log entries. - for _, e := range r.Entries { - if err := enc.Encode(e); err != nil { - return 0, err - } + // Parse returned error. + if s := resp.Header.Get("X-Raft-Error"); s != "" { + return newIndex, newTerm, errors.New(s) } - return 0, nil // TODO(benbjohnson) + return newIndex, newTerm, nil +} + +// ReadFrom streams the log from a leader. +func (t *HTTPTransport) ReadFrom(uri *url.URL, term, index uint64) (io.Reader, error) { + // Construct URL. + u := *uri + u.Path = path.Join(u.Path, "stream") + + // Set URL parameters. + v := &url.Values{} + v.Set("term", strconv.FormatUint(term, 10)) + v.Set("index", strconv.FormatUint(index, 10)) + u.RawQuery = v.Encode() + + // Send HTTP request. + resp, err := http.Get(u.String()) + if err != nil { + return nil, err + } + + // Parse returned error. + if s := resp.Header.Get("X-Raft-Error"); s != "" { + _ = resp.Body.Close() + return nil, errors.New(s) + } + + return resp.Body, nil } // RequestVote requests a vote for a candidate in a given term. -func (t *HTTPTransport) RequestVote(u *url.URL, r *VoteRequest) (int64, error) { - return 0, nil // TODO(benbjohnson) -} +func (t *HTTPTransport) RequestVote(uri *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) { + // Construct URL. + u := *uri + u.Path = path.Join(u.Path, "vote") -// AppendEntriesRequest represents the arguments for an AppendEntries RPC. -type AppendEntriesRequest struct { - Term int64 - LeaderID int64 - PrevLogIndex int64 - PrevLogTerm int64 - Entries []*LogEntry - LeaderCommit int64 -} + // Set URL parameters. + v := &url.Values{} + v.Set("term", strconv.FormatUint(term, 10)) + v.Set("candidateID", strconv.FormatUint(candidateID, 10)) + v.Set("lastLogIndex", strconv.FormatUint(lastLogIndex, 10)) + v.Set("lastLogTerm", strconv.FormatUint(lastLogTerm, 10)) + u.RawQuery = v.Encode() -// VoteRequest represents the arguments for a RequestVote RPC. -type VoteRequest struct { - Term int64 - CandidateID int64 - LastLogIndex int64 - LastLogTerm int64 + // Send HTTP request. + resp, err := http.Get(u.String()) + if err != nil { + return 0, err + } + _ = resp.Body.Close() + + // Parse returned term. + newTermString := resp.Header.Get("X-Raft-Term") + newTerm, err := strconv.ParseUint(newTermString, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid term: %q", newTermString) + } + + // Parse returned error. + if s := resp.Header.Get("X-Raft-Error"); s != "" { + return newTerm, errors.New(s) + } + + return newTerm, nil } diff --git a/raft/transport_test.go b/raft/transport_test.go index 8f650156c6..11bf1fa409 100644 --- a/raft/transport_test.go +++ b/raft/transport_test.go @@ -1 +1,256 @@ package raft_test + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/influxdb/influxdb/raft" +) + +// Ensure a heartbeat on an unsupported scheme returns an error. +func TestTransportMux_Heartbeat_ErrUnsupportedScheme(t *testing.T) { + u, _ := url.Parse("foo://bar") + _, _, err := raft.DefaultTransport.Heartbeat(u, 0, 0, 0) + if err == nil || err.Error() != `transport scheme not supported: foo` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure a stream on an unsupported scheme returns an error. +func TestTransportMux_ReadFrom_ErrUnsupportedScheme(t *testing.T) { + u, _ := url.Parse("foo://bar") + _, err := raft.DefaultTransport.ReadFrom(u, 0, 0) + if err == nil || err.Error() != `transport scheme not supported: foo` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure a stream on an unsupported scheme returns an error. +func TestTransportMux_RequestVote_ErrUnsupportedScheme(t *testing.T) { + u, _ := url.Parse("foo://bar") + _, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0) + if err == nil || err.Error() != `transport scheme not supported: foo` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure a heartbeat over HTTP can be read and responded to. +func TestHTTPTransport_Heartbeat(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if path := r.URL.Path; path != `/heartbeat` { + t.Fatalf("unexpected path: %q", path) + } + if term := r.FormValue("term"); term != `1` { + t.Fatalf("unexpected term: %q", term) + } + if commitIndex := r.FormValue("commitIndex"); commitIndex != `2` { + t.Fatalf("unexpected commit index: %q", commitIndex) + } + if leaderID := r.FormValue("leaderID"); leaderID != `3` { + t.Fatalf("unexpected leader id: %q", leaderID) + } + w.Header().Set("X-Raft-Index", "4") + w.Header().Set("X-Raft-Term", "5") + w.WriteHeader(http.StatusOK) + })) + defer s.Close() + + // Execute heartbeat against test server. + u, _ := url.Parse(s.URL) + newIndex, newTerm, err := raft.DefaultTransport.Heartbeat(u, 1, 2, 3) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } else if newIndex != 4 { + t.Fatalf("unexpected new index: %d", newIndex) + } else if newTerm != 5 { + t.Fatalf("unexpected new term: %d", newTerm) + } +} + +// Ensure HTTP heartbeats return correct errors. +func TestHTTPTransport_Heartbeat_Err(t *testing.T) { + var tests = []struct { + index string + term string + errstr string + err string + }{ + {index: "", term: "", err: `invalid index: ""`}, + {index: "1000", term: "", err: `invalid term: ""`}, + {index: "1", term: "2", errstr: "bad heartbeat", err: `bad heartbeat`}, + } + for i, tt := range tests { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Raft-Index", tt.index) + w.Header().Set("X-Raft-Term", tt.term) + w.Header().Set("X-Raft-Error", tt.errstr) + w.WriteHeader(http.StatusOK) + })) + + u, _ := url.Parse(s.URL) + _, _, err := raft.DefaultTransport.Heartbeat(u, 1, 2, 3) + if err == nil { + t.Errorf("%d. expected error") + } else if tt.err != err.Error() { + t.Errorf("%d. error:\n\nexp: %s\n\ngot: %s", i, tt.err, err.Error()) + } + s.Close() + } +} + +// Ensure an HTTP heartbeat to a stopped server returns an error. +func TestHTTPTransport_Heartbeat_ErrConnectionRefused(t *testing.T) { + u, _ := url.Parse("http://localhost:41932") + _, _, err := raft.DefaultTransport.Heartbeat(u, 0, 0, 0) + if err == nil { + t.Fatal("expected error") + } else if !strings.Contains(err.Error(), `connection refused`) { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure the log can be streamed over HTTP. +func TestHTTPTransport_ReadFrom(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if path := r.URL.Path; path != `/stream` { + t.Fatalf("unexpected path: %q", path) + } + if term := r.FormValue("term"); term != `1` { + t.Fatalf("unexpected term: %q", term) + } + if index := r.FormValue("index"); index != `2` { + t.Fatalf("unexpected index: %q", index) + } + w.Write([]byte("test123")) + })) + defer s.Close() + + // Execute stream against test server. + u, _ := url.Parse(s.URL) + r, err := raft.DefaultTransport.ReadFrom(u, 1, 2) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + b, _ := ioutil.ReadAll(r) + if string(b) != `test123` { + t.Fatalf("unexpected stream: %q", string(b)) + } +} + +// Ensure a stream can return an error. +func TestHTTPTransport_ReadFrom_Err(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Raft-Error", `bad stream`) + })) + defer s.Close() + + // Execute stream against test server. + u, _ := url.Parse(s.URL) + r, err := raft.DefaultTransport.ReadFrom(u, 0, 0) + if err == nil { + t.Fatalf("expected error") + } else if err.Error() != `bad stream` { + t.Fatalf("unexpected error: %s", err) + } else if r != nil { + t.Fatal("unexpected reader") + } +} + +// Ensure an streaming over HTTP to a stopped server returns an error. +func TestHTTPTransport_ReadFrom_ErrConnectionRefused(t *testing.T) { + u, _ := url.Parse("http://localhost:41932") + _, err := raft.DefaultTransport.ReadFrom(u, 0, 0) + if err == nil { + t.Fatal("expected error") + } else if !strings.Contains(err.Error(), `connection refused`) { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that requesting over HTTP can be read and responded to. +func TestHTTPTransport_RequestVote(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if path := r.URL.Path; path != `/vote` { + t.Fatalf("unexpected path: %s", path) + } + if term := r.FormValue("term"); term != `1` { + t.Fatalf("unexpected term: %v", term) + } + if candidateID := r.FormValue("candidateID"); candidateID != `2` { + t.Fatalf("unexpected candidate id: %v", candidateID) + } + if lastLogIndex := r.FormValue("lastLogIndex"); lastLogIndex != `3` { + t.Fatalf("unexpected last log index: %v", lastLogIndex) + } + if lastLogTerm := r.FormValue("lastLogTerm"); lastLogTerm != `4` { + t.Fatalf("unexpected last log term: %v", lastLogTerm) + } + w.Header().Set("X-Raft-Term", "5") + w.WriteHeader(http.StatusOK) + })) + defer s.Close() + + // Execute heartbeat against test server. + u, _ := url.Parse(s.URL) + newTerm, err := raft.DefaultTransport.RequestVote(u, 1, 2, 3, 4) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } else if newTerm != 5 { + t.Fatalf("unexpected new term: %d", newTerm) + } +} + +// Ensure that a returned vote with an invalid term returns an error. +func TestHTTPTransport_RequestVote_ErrInvalidTerm(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Raft-Term", `xxx`) + })) + defer s.Close() + + u, _ := url.Parse(s.URL) + _, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0) + if err == nil { + t.Errorf("expected error") + } else if err.Error() != `invalid term: "xxx"` { + t.Errorf("unexpected error: %s", err) + } +} + +// Ensure that a returned vote with an error message returns that error. +func TestHTTPTransport_RequestVote_Error(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Raft-Term", `1`) + w.Header().Set("X-Raft-Error", `already voted`) + })) + defer s.Close() + + u, _ := url.Parse(s.URL) + _, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0) + if err == nil { + t.Errorf("expected error") + } else if err.Error() != `already voted` { + t.Errorf("unexpected error: %s", err) + } +} + +// Ensure that requesting a vote over HTTP to a stopped server returns an error. +func TestHTTPTransport_RequestVote_ErrConnectionRefused(t *testing.T) { + u, _ := url.Parse("http://localhost:41932") + _, err := raft.DefaultTransport.RequestVote(u, 0, 0, 0, 0) + if err == nil { + t.Fatal("expected error") + } else if !strings.Contains(err.Error(), `connection refused`) { + t.Fatalf("unexpected error: %s", err) + } +}