Merge pull request #122 from goraft/interfaces

Server Interface
pull/820/head
Xiang Li 2013-10-14 11:09:37 -07:00
commit 9765b9cdeb
11 changed files with 273 additions and 245 deletions

View File

@ -29,7 +29,7 @@ func init() {
// A command represents an action to be taken on the replicated state machine.
type Command interface {
CommandName() string
Apply(server *Server) (interface{}, error)
Apply(server Server) (interface{}, error)
}
type CommandEncoder interface {

View File

@ -77,7 +77,7 @@ func (t *HTTPTransporter) RequestVotePath() string {
//--------------------------------------
// Applies Raft routes to an HTTP router for a given server.
func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
}
@ -87,7 +87,7 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
//--------------------------------------
// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.ae.encoding.error:", err)
@ -115,7 +115,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
}
// Sends a RequestVote RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
@ -143,12 +143,12 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
}
// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
return nil
}
// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
return nil
}
@ -157,7 +157,7 @@ func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer
//--------------------------------------
// Handles incoming AppendEntries requests.
func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc {
func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /appendEntries")
@ -176,7 +176,7 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc
}
// Handles incoming RequestVote requests.
func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /requestVote")

View File

@ -14,8 +14,8 @@ func TestHTTPTransporter(t *testing.T) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true
servers := []*Server{}
f0 := func(server *Server, httpServer *http.Server) {
servers := []Server{}
f0 := func(server Server, httpServer *http.Server) {
// Stop the leader and wait for an election.
server.Stop()
time.Sleep(testElectionTimeout * 2)
@ -25,15 +25,15 @@ func TestHTTPTransporter(t *testing.T) {
}
server.Start()
}
f1 := func(server *Server, httpServer *http.Server) {
f1 := func(server Server, httpServer *http.Server) {
}
f2 := func(server *Server, httpServer *http.Server) {
f2 := func(server Server, httpServer *http.Server) {
}
runTestHttpServers(t, &servers, transporter, f0, f1, f2)
}
// Starts multiple independent Raft servers wrapped with HTTP servers.
func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
func runTestHttpServers(t *testing.T, servers *[]Server, transporter *HTTPTransporter, callbacks ...func(Server, *http.Server)) {
var wg sync.WaitGroup
httpServers := []*http.Server{}
listeners := []net.Listener{}
@ -94,7 +94,7 @@ func BenchmarkSpeed(b *testing.B) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true
servers := []*Server{}
servers := []Server{}
for i := 0; i < 3; i++ {
port := 9000 + i
@ -145,7 +145,7 @@ func BenchmarkSpeed(b *testing.B) {
}
}
func send(c chan bool, s *Server) {
func send(c chan bool, s Server) {
for i := 0; i < 20; i++ {
s.Do(&NOPCommand{})
}

View File

@ -3,7 +3,7 @@ package raft
// Join command interface
type JoinCommand interface {
CommandName() string
Apply(server *Server) (interface{}, error)
Apply(server Server) (interface{}, error)
NodeName() string
}
@ -18,7 +18,7 @@ func (c *DefaultJoinCommand) CommandName() string {
return "raft:join"
}
func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err

View File

@ -3,7 +3,7 @@ package raft
// Leave command interface
type LeaveCommand interface {
CommandName() string
Apply(server *Server) (interface{}, error)
Apply(server Server) (interface{}, error)
NodeName() string
}
@ -17,7 +17,7 @@ func (c *DefaultLeaveCommand) CommandName() string {
return "raft:leave"
}
func (c *DefaultLeaveCommand) Apply(server *Server) (interface{}, error) {
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
err := server.RemovePeer(c.Name)
return []byte("leave"), err

View File

@ -13,7 +13,7 @@ func (c NOPCommand) CommandName() string {
return "raft:nop"
}
func (c NOPCommand) Apply(server *Server) (interface{}, error) {
func (c NOPCommand) Apply(server Server) (interface{}, error) {
return nil, nil
}

View File

@ -13,7 +13,7 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
server *server
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
@ -29,7 +29,7 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
func newPeer(server *server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
Name: name,

151
server.go
View File

@ -57,7 +57,38 @@ var CommandTimeoutError = errors.New("raft: Command timeout")
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
type Server interface {
Name() string
Context() interface{}
StateMachine() StateMachine
Leader() string
State() string
Path() string
LogPath() string
Term() uint64
CommitIndex() uint64
VotedFor() string
MemberCount() int
QuorumSize() int
IsLogEmpty() bool
ElectionTimeout() time.Duration
SetElectionTimeout(duration time.Duration)
HeartbeatTimeout() time.Duration
SetHeartbeatTimeout(duration time.Duration)
Transporter() Transporter
SetTransporter(t Transporter)
AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
RequestVote(req *RequestVoteRequest) *RequestVoteResponse
AddPeer(name string, connectiongString string) error
RemovePeer(name string) error
Peers() map[string]*Peer
Start() error
Stop()
Running() bool
Do(command Command) (interface{}, error)
}
type server struct {
name string
path string
state string
@ -98,7 +129,7 @@ type event struct {
//------------------------------------------------------------------------------
// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
@ -106,7 +137,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
panic("raft: Transporter required")
}
s := &Server{
s := &server{
name: name,
path: path,
transporter: transporter,
@ -142,22 +173,22 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
//--------------------------------------
// Retrieves the name of the server.
func (s *Server) Name() string {
func (s *server) Name() string {
return s.name
}
// Retrieves the storage path for the server.
func (s *Server) Path() string {
func (s *server) Path() string {
return s.path
}
// The name of the current leader.
func (s *Server) Leader() string {
func (s *server) Leader() string {
return s.leader
}
// Retrieves a copy of the peer data.
func (s *Server) Peers() map[string]*Peer {
func (s *server) Peers() map[string]*Peer {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -169,42 +200,42 @@ func (s *Server) Peers() map[string]*Peer {
}
// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
func (s *server) Transporter() Transporter {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.transporter
}
func (s *Server) SetTransporter(t Transporter) {
func (s *server) SetTransporter(t Transporter) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.transporter = t
}
// Retrieves the context passed into the constructor.
func (s *Server) Context() interface{} {
func (s *server) Context() interface{} {
return s.context
}
// Retrieves the state machine passed into the constructor.
func (s *Server) StateMachine() StateMachine {
func (s *server) StateMachine() StateMachine {
return s.stateMachine
}
// Retrieves the log path for the server.
func (s *Server) LogPath() string {
func (s *server) LogPath() string {
return path.Join(s.path, "log")
}
// Retrieves the current state of the server.
func (s *Server) State() string {
func (s *server) State() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.state
}
// Sets the state of the server.
func (s *Server) setState(state string) {
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.state = state
@ -214,44 +245,44 @@ func (s *Server) setState(state string) {
}
// Retrieves the current term of the server.
func (s *Server) Term() uint64 {
func (s *server) Term() uint64 {
return s.currentTerm
}
// Retrieves the current commit index of the server.
func (s *Server) CommitIndex() uint64 {
func (s *server) CommitIndex() uint64 {
return s.log.commitIndex
}
// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
func (s *server) VotedFor() string {
return s.votedFor
}
// Retrieves whether the server's log has no entries.
func (s *Server) IsLogEmpty() bool {
func (s *server) IsLogEmpty() bool {
return s.log.isEmpty()
}
// A list of all the log entries. This should only be used for debugging purposes.
func (s *Server) LogEntries() []*LogEntry {
func (s *server) LogEntries() []*LogEntry {
return s.log.entries
}
// A reference to the command name of the last entry.
func (s *Server) LastCommandName() string {
func (s *server) LastCommandName() string {
return s.log.lastCommandName()
}
// Get the state of the server for debugging
func (s *Server) GetState() string {
func (s *server) GetState() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}
// Check if the server is promotable
func (s *Server) promotable() bool {
func (s *server) promotable() bool {
return s.log.currentIndex() > 0
}
@ -260,14 +291,14 @@ func (s *Server) promotable() bool {
//--------------------------------------
// Retrieves the number of member servers in the consensus.
func (s *Server) MemberCount() int {
func (s *server) MemberCount() int {
s.mutex.Lock()
defer s.mutex.Unlock()
return len(s.peers) + 1
}
// Retrieves the number of servers required to make a quorum.
func (s *Server) QuorumSize() int {
func (s *server) QuorumSize() int {
return (s.MemberCount() / 2) + 1
}
@ -276,14 +307,14 @@ func (s *Server) QuorumSize() int {
//--------------------------------------
// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
func (s *server) ElectionTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.electionTimeout
}
// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
func (s *server) SetElectionTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.electionTimeout = duration
@ -294,14 +325,14 @@ func (s *Server) SetElectionTimeout(duration time.Duration) {
//--------------------------------------
// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
func (s *server) HeartbeatTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.heartbeatTimeout
}
// Sets the heartbeat timeout.
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
func (s *server) SetHeartbeatTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -334,7 +365,7 @@ func init() {
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit entry.
func (s *Server) Start() error {
func (s *server) Start() error {
// Exit if the server is already running.
if s.state != Stopped {
return errors.New("raft.Server: Server already running")
@ -380,7 +411,7 @@ func (s *Server) Start() error {
}
// Shuts down the server.
func (s *Server) Stop() {
func (s *server) Stop() {
s.send(&stopValue)
s.mutex.Lock()
defer s.mutex.Unlock()
@ -388,7 +419,7 @@ func (s *Server) Stop() {
}
// Checks if the server is currently running.
func (s *Server) Running() bool {
func (s *server) Running() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.state != Stopped
@ -400,7 +431,7 @@ func (s *Server) Running() bool {
// Sets the current term for the server. This is only used when an external
// current term is found.
func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -439,7 +470,7 @@ func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
// | new leader | |
// |_______________________|____________________________________ |
// The main event loop for the server
func (s *Server) loop() {
func (s *server) loop() {
defer s.debugln("server.loop.end")
for {
@ -467,13 +498,13 @@ func (s *Server) loop() {
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *Server) send(value interface{}) (interface{}, error) {
func (s *server) send(value interface{}) (interface{}, error) {
event := s.sendAsync(value)
err := <-event.c
return event.returnValue, err
}
func (s *Server) sendAsync(value interface{}) *event {
func (s *server) sendAsync(value interface{}) *event {
event := &event{target: value, c: make(chan error, 1)}
s.c <- event
return event
@ -484,7 +515,7 @@ func (s *Server) sendAsync(value interface{}) *event {
// Converts to candidate if election timeout elapses without either:
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
func (s *Server) followerLoop() {
func (s *server) followerLoop() {
s.setState(Follower)
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
@ -547,7 +578,7 @@ func (s *Server) followerLoop() {
}
// The event loop that is run when the server is in a Candidate state.
func (s *Server) candidateLoop() {
func (s *server) candidateLoop() {
lastLogIndex, lastLogTerm := s.log.lastInfo()
s.leader = ""
@ -630,7 +661,7 @@ func (s *Server) candidateLoop() {
}
// The event loop that is run when the server is in a Leader state.
func (s *Server) leaderLoop() {
func (s *server) leaderLoop() {
s.setState(Leader)
s.syncedPeer = make(map[string]bool)
logIndex, _ := s.log.lastInfo()
@ -682,7 +713,7 @@ func (s *Server) leaderLoop() {
s.syncedPeer = nil
}
func (s *Server) snapshotLoop() {
func (s *server) snapshotLoop() {
s.setState(Snapshotting)
for {
@ -721,12 +752,12 @@ func (s *Server) snapshotLoop() {
// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
func (s *server) Do(command Command) (interface{}, error) {
return s.send(command)
}
// Processes a command.
func (s *Server) processCommand(command Command, e *event) {
func (s *server) processCommand(command Command, e *event) {
s.debugln("server.command.process")
// Create an entry for the command in the log.
@ -779,14 +810,14 @@ func (s *Server) processCommand(command Command, e *event) {
//--------------------------------------
// Appends zero or more log entry from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
ret, _ := s.send(req)
resp, _ := ret.(*AppendEntriesResponse)
return resp
}
// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
s.traceln("server.ae.process")
@ -824,7 +855,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
// If we find a higher term then change to a follower and exit.
if resp.Term > s.currentTerm {
@ -888,14 +919,14 @@ func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not made a vote yet. A vote
// can also be obtained if the term is greater than the server's current term.
func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
ret, _ := s.send(req)
resp, _ := ret.(*RequestVoteResponse)
return resp
}
// Processes a "request vote" request.
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
@ -933,7 +964,7 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
//--------------------------------------
// Adds a peer to the server.
func (s *Server) AddPeer(name string, connectiongString string) error {
func (s *server) AddPeer(name string, connectiongString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
// Do not allow peers to be added twice.
@ -959,7 +990,7 @@ func (s *Server) AddPeer(name string, connectiongString string) error {
}
// Removes a peer from the server.
func (s *Server) RemovePeer(name string) error {
func (s *server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Skip the Peer if it has the same name as the Server
@ -988,7 +1019,7 @@ func (s *Server) RemovePeer(name string) error {
// Log compaction
//--------------------------------------
func (s *Server) TakeSnapshot() error {
func (s *server) TakeSnapshot() error {
//TODO put a snapshot mutex
s.debugln("take Snapshot")
if s.currentSnapshot != nil {
@ -1047,7 +1078,7 @@ func (s *Server) TakeSnapshot() error {
}
// Retrieves the log path for the server.
func (s *Server) saveSnapshot() error {
func (s *server) saveSnapshot() error {
if s.currentSnapshot == nil {
return errors.New("no snapshot to save")
@ -1071,17 +1102,17 @@ func (s *Server) saveSnapshot() error {
}
// Retrieves the log path for the server.
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
ret, _ := s.send(req)
resp, _ := ret.(*SnapshotResponse)
return resp
}
func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
// If the followers log contains an entry at the snapshots last index with a term
// that matches the snapshots last term
@ -1099,13 +1130,13 @@ func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse
return newSnapshotResponse(true)
}
func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
ret, _ := s.send(req)
resp, _ := ret.(*SnapshotRecoveryResponse)
return resp
}
func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
s.stateMachine.Recovery(req.State)
@ -1136,7 +1167,7 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
}
// Load a snapshot at restart
func (s *Server) LoadSnapshot() error {
func (s *server) LoadSnapshot() error {
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
if err != nil {
@ -1221,7 +1252,7 @@ func (s *Server) LoadSnapshot() error {
// Config File
//--------------------------------------
func (s *Server) writeConf() {
func (s *server) writeConf() {
peers := make([]*Peer, len(s.peers))
@ -1251,7 +1282,7 @@ func (s *Server) writeConf() {
}
// Read the configuration for the server.
func (s *Server) readConf() error {
func (s *server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
@ -1277,10 +1308,10 @@ func (s *Server) readConf() error {
// Debugging
//--------------------------------------
func (s *Server) debugln(v ...interface{}) {
func (s *server) debugln(v ...interface{}) {
debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
}
func (s *Server) traceln(v ...interface{}) {
func (s *server) traceln(v ...interface{}) {
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}

View File

@ -37,40 +37,40 @@ func TestServerRequestVote(t *testing.T) {
// // Ensure that a vote request is denied if it comes from an old term.
func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
server := newTestServer("1", &testTransporter{})
s := newTestServer("1", &testTransporter{})
server.Start()
if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
s.Start()
if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", s.Name(), err)
}
server.currentTerm = 2
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
s.(*server).currentTerm = 2
defer s.Stop()
resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
}
if server.currentTerm != 2 && server.State() != Follower {
t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.State())
if s.Term() != 2 && s.State() != Follower {
t.Fatalf("Server did not update term and demote: %v / %v", s.Term(), s.State())
}
}
// Ensure that a vote request is denied if we've already voted for a different candidate.
func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
server := newTestServer("1", &testTransporter{})
s := newTestServer("1", &testTransporter{})
server.Start()
if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
s.Start()
if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", s.Name(), err)
}
server.currentTerm = 2
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
s.(*server).currentTerm = 2
defer s.Stop()
resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
if resp.Term != 2 || !resp.VoteGranted {
t.Fatalf("First vote should not have been denied")
}
resp = server.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
resp = s.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Second vote should have been denied")
}
@ -78,24 +78,24 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
// Ensure that a vote request is approved if vote occurs in a new term.
func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
server := newTestServer("1", &testTransporter{})
s := newTestServer("1", &testTransporter{})
server.Start()
if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
s.Start()
if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", s.Name(), err)
}
time.Sleep(time.Millisecond * 100)
server.currentTerm = 2
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" {
s.(*server).currentTerm = 2
defer s.Stop()
resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
t.Fatalf("First vote should not have been denied")
}
resp = server.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
resp = s.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" {
if resp.Term != 3 || !resp.VoteGranted || s.VotedFor() != "bar" {
t.Fatalf("Second vote should have been approved")
}
}
@ -106,33 +106,32 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
// start as a follower with term 2 and index 3
server.Start()
defer server.Stop()
s.Start()
defer s.Stop()
// request vote from term 3 with last log entry 2, 2
resp := server.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
resp := s.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
// request vote from term 2 with last log entry 2, 3
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
resp = s.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
// request vote from term 3 with last log entry 2, 3
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
resp = s.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
if resp.Term != 3 || !resp.VoteGranted {
t.Fatalf("Matching log vote should have been granted")
}
// request vote from term 3 with last log entry 2, 4
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
resp = s.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
if resp.Term != 3 || !resp.VoteGranted {
t.Fatalf("Ahead-of-log vote should have been granted")
}
@ -145,28 +144,27 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
// start as a follower
server.Start()
defer server.Stop()
s.Start()
defer s.Stop()
time.Sleep(2 * testElectionTimeout)
if server.State() != Leader {
t.Fatalf("Server self-promotion failed: %v", server.State())
if s.State() != Leader {
t.Fatalf("Server self-promotion failed: %v", s.State())
}
}
//Ensure that we can promote a server within a cluster to a leader.
func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
lookup := map[string]Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@ -180,8 +178,8 @@ func TestServerPromote(t *testing.T) {
if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
}
for _, server := range servers {
server.Stop()
for _, s := range servers {
s.Stop()
}
}
@ -191,20 +189,20 @@ func TestServerPromote(t *testing.T) {
// Ensure we can append entries to a server.
func TestServerAppendEntries(t *testing.T) {
server := newTestServer("1", &testTransporter{})
s := newTestServer("1", &testTransporter{})
server.SetHeartbeatTimeout(time.Second * 10)
server.Start()
defer server.Stop()
s.SetHeartbeatTimeout(time.Second * 10)
s.Start()
defer s.Stop()
// Append single entry.
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entries := []*LogEntry{e}
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
if resp.Term != 1 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); index != 0 || term != 0 {
if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
@ -212,57 +210,56 @@ func TestServerAppendEntries(t *testing.T) {
e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
entries = []*LogEntry{e1, e2}
resp = server.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
if resp.Term != 1 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); index != 1 || term != 1 {
if index, term := s.(*server).log.commitInfo(); index != 1 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
// Send zero entries and commit everything.
resp = server.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
resp = s.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
if resp.Term != 2 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); index != 3 || term != 1 {
if index, term := s.(*server).log.commitInfo(); index != 3 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}
//Ensure that entries with stale terms are rejected.
func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
server := newTestServer("1", &testTransporter{})
s := newTestServer("1", &testTransporter{})
server.Start()
s.Start()
defer server.Stop()
server.currentTerm = 2
defer s.Stop()
s.(*server).currentTerm = 2
// Append single entry.
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entries := []*LogEntry{e}
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
if resp.Term != 2 || resp.Success {
t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); index != 0 || term != 0 {
if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}
// Ensure that we reject entries if the commit log is different.
func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
// Append single entry + commit.
e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
entries := []*LogEntry{e1, e2}
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
if resp.Term != 1 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
@ -270,7 +267,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
// Append entry again (post-commit).
e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
entries = []*LogEntry{e}
resp = server.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
if resp.Term != 1 || resp.Success {
t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
}
@ -278,9 +275,9 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
// Ensure that we uncommitted entries are rolled back if new entries overwrite them.
func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
@ -288,15 +285,15 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
// Append single entry + commit.
entries := []*LogEntry{entry1, entry2}
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
if resp.Term != 1 || !resp.Success || server.log.commitIndex != 1 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2}) {
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
if resp.Term != 1 || !resp.Success || s.(*server).log.commitIndex != 1 || !reflect.DeepEqual(s.(*server).log.entries, []*LogEntry{entry1, entry2}) {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
// Append entry that overwrites the second (uncommitted) entry.
entries = []*LogEntry{entry3}
resp = server.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
if resp.Term != 2 || !resp.Success || server.log.commitIndex != 2 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3}) {
resp = s.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
if resp.Term != 2 || !resp.Success || s.(*server).log.commitIndex != 2 || !reflect.DeepEqual(s.(*server).log.entries, []*LogEntry{entry1, entry3}) {
t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
}
}
@ -307,11 +304,11 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
// Ensure that a follower cannot execute a command.
func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Start()
defer server.Stop()
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
var err error
if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
if _, err = s.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
}
}
@ -324,27 +321,27 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]*Server{}
servers := map[string]Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.Name]
target := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
return target.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.Name]
target := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
return target.AppendEntries(req)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
@ -358,22 +355,22 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
names = append(names, strconv.Itoa(i))
}
var leader *Server
var leader Server
for _, name := range names {
server := newTestServer(name, transporter)
s := newTestServer(name, transporter)
servers[name] = server
paths[name] = server.Path()
servers[name] = s
paths[name] = s.Path()
if name == "1" {
leader = server
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
leader = s
s.SetHeartbeatTimeout(testHeartbeatTimeout)
s.Start()
time.Sleep(testHeartbeatTimeout)
} else {
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
s.SetElectionTimeout(testElectionTimeout)
s.SetHeartbeatTimeout(testHeartbeatTimeout)
s.Start()
time.Sleep(testHeartbeatTimeout)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
@ -392,28 +389,28 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
time.Sleep(2 * testHeartbeatTimeout)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 16 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
s := servers[name]
if s.CommitIndex() != 16 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 16)
}
server.Stop()
s.Stop()
}
for _, name := range names {
// with old path and disable transportation
server := newTestServerWithPath(name, disTransporter, paths[name])
servers[name] = server
s := newTestServerWithPath(name, disTransporter, paths[name])
servers[name] = s
server.Start()
s.Start()
// should only commit to the last join command
if server.CommitIndex() != 6 {
t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
if s.CommitIndex() != 6 {
t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 6)
}
// peer conf should be recovered
if len(server.Peers()) != 4 {
t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
if len(s.Peers()) != 4 {
t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(s.Peers()), 4)
}
}
@ -426,11 +423,11 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// should commit to the previous index + 1(nop command when new leader elected)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
s := servers[name]
if s.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 17)
}
server.Stop()
s.Stop()
}
}
@ -440,29 +437,29 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// Ensure that we can start a single server and append to its log.
func TestServerSingleNode(t *testing.T) {
server := newTestServer("1", &testTransporter{})
if server.State() != Stopped {
t.Fatalf("Unexpected server state: %v", server.State())
s := newTestServer("1", &testTransporter{})
if s.State() != Stopped {
t.Fatalf("Unexpected server state: %v", s.State())
}
server.Start()
s.Start()
time.Sleep(testHeartbeatTimeout)
// Join the server to itself.
if _, err := server.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
t.Fatalf("Unable to join: %v", err)
}
debugln("finish command")
if server.State() != Leader {
t.Fatalf("Unexpected server state: %v", server.State())
if s.State() != Leader {
t.Fatalf("Unexpected server state: %v", s.State())
}
server.Stop()
s.Stop()
if server.State() != Stopped {
t.Fatalf("Unexpected server state: %v", server.State())
if s.State() != Stopped {
t.Fatalf("Unexpected server state: %v", s.State())
}
}
@ -470,27 +467,27 @@ func TestServerSingleNode(t *testing.T) {
func TestServerMultiNode(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]*Server{}
servers := map[string]Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.Name]
target := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
return target.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.Name]
target := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
return target.AppendEntries(req)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
@ -503,24 +500,24 @@ func TestServerMultiNode(t *testing.T) {
names = append(names, strconv.Itoa(i))
}
var leader *Server
var leader Server
for _, name := range names {
server := newTestServer(name, transporter)
defer server.Stop()
s := newTestServer(name, transporter)
defer s.Stop()
mutex.Lock()
servers[name] = server
servers[name] = s
mutex.Unlock()
if name == "1" {
leader = server
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
leader = s
s.SetHeartbeatTimeout(testHeartbeatTimeout)
s.Start()
time.Sleep(testHeartbeatTimeout)
} else {
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
s.SetElectionTimeout(testElectionTimeout)
s.SetHeartbeatTimeout(testHeartbeatTimeout)
s.Start()
time.Sleep(testHeartbeatTimeout)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
@ -536,7 +533,7 @@ func TestServerMultiNode(t *testing.T) {
t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
}
if servers["2"].State() == Leader || servers["3"].State() == Leader {
t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].state, servers["3"].state)
t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].State(), servers["3"].State())
}
mutex.RUnlock()
@ -573,7 +570,7 @@ func TestServerMultiNode(t *testing.T) {
}
debugln("[Test] Done")
}
debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex)
debugln("Leader is ", value.Name(), " Index ", value.(*server).log.commitIndex)
}
debugln("Not Found leader")
}
@ -584,7 +581,7 @@ func TestServerMultiNode(t *testing.T) {
if value.State() == Leader {
leader++
}
debugln(value.Name(), " ", value.currentTerm, " ", value.state)
debugln(value.Name(), " ", value.(*server).Term(), " ", value.State())
}
}

28
test.go
View File

@ -60,7 +60,7 @@ func setupLog(entries []*LogEntry) (*Log, string) {
// Servers
//--------------------------------------
func newTestServer(name string, transporter Transporter) *Server {
func newTestServer(name string, transporter Transporter) Server {
p, _ := ioutil.TempDir("", "raft-server-")
if err := os.MkdirAll(p, 0644); err != nil {
panic(err.Error())
@ -69,12 +69,12 @@ func newTestServer(name string, transporter Transporter) *Server {
return server
}
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
func newTestServerWithPath(name string, transporter Transporter, p string) Server {
server, _ := NewServer(name, p, transporter, nil, nil, "")
return server
}
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server {
server := newTestServer(name, transporter)
f, err := os.Create(server.LogPath())
if err != nil {
@ -88,8 +88,8 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
return server
}
func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
servers := []*Server{}
func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
servers := []Server{}
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
for _, name := range names {
@ -116,24 +116,24 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
//--------------------------------------
type testTransporter struct {
sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
sendVoteRequestFunc func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
sendAppendEntriesRequestFunc func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
sendSnapshotRequestFunc func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
}
func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return t.sendVoteRequestFunc(server, peer, req)
}
func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return t.sendAppendEntriesRequestFunc(server, peer, req)
}
func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
return t.sendSnapshotRequestFunc(server, peer, req)
}
func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
return t.SendSnapshotRecoveryRequest(server, peer, req)
}
@ -163,7 +163,7 @@ func (c *testCommand1) CommandName() string {
return "cmd_1"
}
func (c *testCommand1) Apply(server *Server) (interface{}, error) {
func (c *testCommand1) Apply(server Server) (interface{}, error) {
return nil, nil
}
@ -179,6 +179,6 @@ func (c *testCommand2) CommandName() string {
return "cmd_2"
}
func (c *testCommand2) Apply(server *Server) (interface{}, error) {
func (c *testCommand2) Apply(server Server) (interface{}, error) {
return nil, nil
}

View File

@ -9,8 +9,8 @@ package raft
// Transporter is the interface for allowing the host application to transport
// requests to other nodes.
type Transporter interface {
SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
}