Switch to use Transporter interface.

pull/820/head
Ben Johnson 2013-05-28 15:57:38 -04:00
parent 9e8834448a
commit 147d3c5dd8
6 changed files with 106 additions and 95 deletions

20
peer.go
View File

@ -102,21 +102,21 @@ func (p *Peer) stop() {
func (p *Peer) internalFlush() (uint64, bool, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
req, handler := p.server.createInternalAppendEntriesRequest(p.prevLogIndex)
return p.sendFlushRequest(req, handler)
req := p.server.createInternalAppendEntriesRequest(p.prevLogIndex)
return p.sendFlushRequest(req)
}
// Flushes a request through a handler.
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest, handler func(*Server, *Peer, *AppendEntriesRequest) (*AppendEntriesResponse, error)) (uint64, bool, error) {
// Ignore any null requests/handlers.
if req == nil || handler == nil {
return 0, false, errors.New("raft.Peer: Request or handler required")
// Flushes a request through the server's transport.
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
}
// Generate an AppendEntries request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
resp, err := handler(p.server, p, req)
resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
p.heartbeatTimer.Reset()
if resp == nil {
return 0, false, err
@ -175,10 +175,10 @@ func (p *Peer) heartbeatTimeoutFunc() {
p.mutex.Unlock()
// Lock the server to create a request.
req, handler := server.createAppendEntriesRequest(prevLogIndex)
req := server.createAppendEntriesRequest(prevLogIndex)
p.mutex.Lock()
p.sendFlushRequest(req, handler)
p.sendFlushRequest(req)
p.mutex.Unlock()
} else {
break

View File

@ -43,8 +43,7 @@ var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
RequestVoteHandler func(*Server, *Peer, *RequestVoteRequest) (*RequestVoteResponse, error)
AppendEntriesHandler func(*Server, *Peer, *AppendEntriesRequest) (*AppendEntriesResponse, error)
transporter Transporter
name string
path string
state string
@ -65,13 +64,18 @@ type Server struct {
//------------------------------------------------------------------------------
// Creates a new server with a log at the given path.
func NewServer(name string, path string) (*Server, error) {
func NewServer(name string, path string, transporter Transporter) (*Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
if transporter == nil {
panic("raft.Server: Transporter required")
}
s := &Server{
name: name,
path: path,
transporter: transporter,
state: Stopped,
peers: make(map[string]*Peer),
log: NewLog(),
@ -108,6 +112,11 @@ func (s *Server) Path() string {
return s.path
}
// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
return s.transporter
}
// Retrieves the log path for the server.
func (s *Server) LogPath() string {
return fmt.Sprintf("%s/log", s.path)
@ -419,20 +428,20 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
}
// Creates an AppendEntries request.
func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) (*AppendEntriesRequest, func(*Server, *Peer, *AppendEntriesRequest) (*AppendEntriesResponse, error)) {
func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesRequest {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.createInternalAppendEntriesRequest(prevLogIndex)
}
// Creates an AppendEntries request without a lock.
func (s *Server) createInternalAppendEntriesRequest(prevLogIndex uint64) (*AppendEntriesRequest, func(*Server, *Peer, *AppendEntriesRequest) (*AppendEntriesResponse, error)) {
func (s *Server) createInternalAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesRequest {
if s.log == nil {
return nil, nil
return nil
}
entries, prevLogTerm := s.log.GetEntriesAfter(prevLogIndex)
req := NewAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.CommitIndex())
return req, s.AppendEntriesHandler
return req
}
//--------------------------------------
@ -455,7 +464,7 @@ func (s *Server) promote() (bool, error) {
go func() {
req := NewRequestVoteRequest(term, s.name, lastLogIndex, lastLogTerm)
req.peer = peer
if resp, _ := s.executeRequestVoteHandler(peer, req); resp != nil {
if resp, _ := s.transporter.SendVoteRequest(s, peer, req); resp != nil {
resp.peer = peer
c <- resp
}
@ -600,14 +609,6 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
return NewRequestVoteResponse(s.currentTerm, true), nil
}
// Executes the handler for sending a RequestVote RPC.
func (s *Server) executeRequestVoteHandler(peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
if s.RequestVoteHandler == nil {
panic("raft.Server: RequestVoteHandler not registered")
}
return s.RequestVoteHandler(s, peer, req)
}
// Updates the current term on the server if the term is greater than the
// server's current term. When the term is changed then the server's vote is
// cleared and its state is changed to be a follower.

View File

@ -19,7 +19,7 @@ import (
// Ensure that we can request a vote from a server that has not voted.
func TestServerRequestVote(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
@ -30,7 +30,7 @@ func TestServerRequestVote(t *testing.T) {
// Ensure that a vote request is denied if it comes from an old term.
func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.state = Leader
server.currentTerm = 2
server.Start()
@ -46,7 +46,7 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
// Ensure that a vote request is denied if we've already voted for a different candidate.
func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.currentTerm = 2
server.Start()
defer server.Stop()
@ -62,7 +62,7 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
// Ensure that a vote request is approved if vote occurs in a new term.
func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.currentTerm = 2
server.Start()
defer server.Stop()
@ -78,7 +78,7 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
// Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
server := newTestServerWithLog("1",
server := newTestServerWithLog("1", &testTransporter{},
`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n"+
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n"+
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n")
@ -111,7 +111,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
if success, err := server.promote(); !(success && err == nil && server.state == Leader) {
@ -121,10 +121,12 @@ func TestServerPromoteSelf(t *testing.T) {
// Ensure that we can promote a server within a cluster to a leader.
func TestServerPromote(t *testing.T) {
servers, lookup := newTestCluster([]string{"1", "2", "3"})
servers.SetRequestVoteHandler(func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
})
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
for _, server := range servers {
defer server.Stop()
}
@ -136,14 +138,16 @@ func TestServerPromote(t *testing.T) {
// Ensure that a server will restart election if not enough votes are obtained before timeout.
func TestServerPromoteDoubleElection(t *testing.T) {
servers, lookup := newTestCluster([]string{"1", "2", "3"})
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
lookup["3"].currentTerm, lookup["3"].votedFor = 1, "3"
lookup["2"].electionTimer.Stop()
lookup["3"].electionTimer.Stop()
servers.SetRequestVoteHandler(func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
})
for _, server := range servers {
defer server.Stop()
}
@ -165,7 +169,7 @@ func TestServerPromoteDoubleElection(t *testing.T) {
// Ensure we can append entries to a server.
func TestServerAppendEntries(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
@ -201,7 +205,7 @@ func TestServerAppendEntries(t *testing.T) {
// Ensure that entries with stale terms are rejected.
func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
server.currentTerm = 2
@ -219,7 +223,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
// Ensure that we reject entries if the commit log is different.
func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
@ -243,7 +247,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
// Ensure that we uncommitted entries are rolled back if new entries overwrite them.
func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
@ -272,7 +276,7 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
// Ensure that a follower cannot execute a command.
func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
if err := server.Do(&TestCommand1{"foo", 10}); err != NotLeaderError {
@ -286,7 +290,7 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
// Ensure that we can start a single server and append to its log.
func TestServerSingleNode(t *testing.T) {
server := newTestServer("1")
server := newTestServer("1", &testTransporter{})
if server.state != Stopped {
t.Fatalf("Unexpected server state: %v", server.state)
}
@ -325,24 +329,26 @@ func TestServerMultiNode(t *testing.T) {
for _, server := range servers {
defer server.Stop()
}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
resp, err := s.AppendEntries(req)
return resp, err
}
var leader *Server
for _, name := range names {
server := newTestServer(name)
server := newTestServer(name, transporter)
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.RequestVoteHandler = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
return s.RequestVote(req)
}
server.AppendEntriesHandler = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
resp, err := s.AppendEntries(req)
return resp, err
}
if err := server.Start(); err != nil {
t.Fatalf("Unable to start server[%s]: %v", name, err)
}

View File

@ -1,27 +0,0 @@
package raft
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A collection of servers.
type Servers []*Server
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
//--------------------------------------
// Handlers
//--------------------------------------
// Sets the RequestVoteHandler for a set of servers.
func (s Servers) SetRequestVoteHandler(f func(*Server, *Peer, *RequestVoteRequest) (*RequestVoteResponse, error)) {
for _, server := range s {
server.RequestVoteHandler = f
}
}

35
test.go
View File

@ -58,26 +58,25 @@ func setupLog(content string) (*Log, string) {
// Servers
//--------------------------------------
func newTestServer(name string) *Server {
func newTestServer(name string, transporter Transporter) *Server {
path, _ := ioutil.TempDir("", "raft-server-")
server, _ := NewServer(name, path)
server, _ := NewServer(name, path, transporter)
return server
}
func newTestServerWithLog(name string, content string) *Server {
server := newTestServer(name)
func newTestServerWithLog(name string, transporter Transporter, content string) *Server {
server := newTestServer(name, transporter)
ioutil.WriteFile(server.LogPath(), []byte(content), 0644)
return server
}
func newTestCluster(names []string) (Servers, map[string]*Server) {
servers := make(Servers, 0)
lookup := make(map[string]*Server, 0)
func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
servers := []*Server{}
for _, name := range names {
if lookup[name] != nil {
panic(fmt.Sprintf("Duplicate server in test cluster! %v", name))
}
server := newTestServer(name)
server := newTestServer(name, transporter)
server.SetElectionTimeout(testElectionTimeout)
servers = append(servers, server)
lookup[name] = server
@ -89,9 +88,27 @@ func newTestCluster(names []string) (Servers, map[string]*Server) {
}
server.Start()
}
return servers, lookup
return servers
}
//--------------------------------------
// Transporter
//--------------------------------------
type testTransporter struct {
sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
}
func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return t.sendVoteRequestFunc(server, peer, req)
}
func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return t.sendAppendEntriesRequestFunc(server, peer, req)
}
//--------------------------------------
// Join Command
//--------------------------------------

14
transporter.go Normal file
View File

@ -0,0 +1,14 @@
package raft
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// Transporter is the interface for allowing the host application to transport
// requests to other nodes.
type Transporter interface {
SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
}