solve conflicts

pull/820/head
Xiang Li 2013-06-04 17:22:09 -07:00
commit 32729ef871
10 changed files with 137 additions and 47 deletions

20
LICENSE Normal file
View File

@ -0,0 +1,20 @@
Copyright 2013 go-raft contributors
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -40,7 +40,7 @@ type Command interface {
//--------------------------------------
// Creates a new instance of a command by name.
func newCommand(name string) (Command, error) {
func NewCommand(name string) (Command, error) {
// Find the registered command.
command := commandTypes[name]
if command == nil {

14
log.go
View File

@ -94,6 +94,18 @@ func (l *Log) IsEmpty() bool {
return (len(l.entries) == 0)
}
// The name of the last command in the log.
func (l *Log) LastCommandName() string {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(l.entries) > 0 {
if command := l.entries[len(l.entries)-1].Command; command != nil {
return command.CommandName()
}
}
return ""
}
//--------------------------------------
// Log Terms
//--------------------------------------
@ -146,8 +158,6 @@ func (l *Log) Open(path string) error {
entry := NewLogEntry(l, 0, 0, nil)
n, err := entry.Decode(reader)
if err != nil {
warn("raft.Log: %v", err)
warn("raft.Log: Recovering (%d)", lastIndex)
file.Close()
if err = os.Truncate(path, int64(lastIndex)); err != nil {
return fmt.Errorf("raft.Log: Unable to recover: %v", err)

View File

@ -134,7 +134,7 @@ func (e *LogEntry) Decode(r io.Reader) (pos int, err error) {
}
// Instantiate command by name.
command, err := newCommand(commandName)
command, err := NewCommand(commandName)
if err != nil {
err = fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err)
return
@ -184,7 +184,7 @@ func (e *LogEntry) UnmarshalJSON(data []byte) error {
// Create a command based on the name.
var err error
if e.Command, err = newCommand(obj.Name); err != nil {
if e.Command, err = NewCommand(obj.Name); err != nil {
return err
}
json.Unmarshal(obj.Command, e.Command)

View File

@ -120,8 +120,6 @@ func TestLogContainsEntries(t *testing.T) {
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
func TestLogRecovery(t *testing.T) {
warn("")
warn("--- BEGIN RECOVERY TEST")
path := setupLogFile(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" +
`6ac5807c 0000000000000003 00000000000`)
@ -172,7 +170,6 @@ func TestLogRecovery(t *testing.T) {
if string(actual) != expected {
t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
}
warn("--- END RECOVERY TEST\n")
}
//--------------------------------------

13
peer.go
View File

@ -36,9 +36,11 @@ func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
}
// Start the heartbeat timeout.
go p.heartbeatTimeoutFunc()
// Start the heartbeat timeout and wait for the goroutine to start.
c := make(chan bool)
go p.heartbeatTimeoutFunc(c)
<-c
return p
}
@ -188,8 +190,9 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
//--------------------------------------
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeatTimeoutFunc() {
fmt.Println("heart beat")
func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
startChannel <- true
for {
// Grab the current timer channel.
p.mutex.Lock()

View File

@ -46,10 +46,11 @@ var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
transporter Transporter
name string
path string
state string
transporter Transporter
context interface{}
currentTerm uint64
votedFor string
log *Log
@ -71,7 +72,7 @@ type Server struct {
//------------------------------------------------------------------------------
// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter) (*Server, error) {
func NewServer(name string, path string, transporter Transporter, context interface{}) (*Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
@ -83,6 +84,7 @@ func NewServer(name string, path string, transporter Transporter) (*Server, erro
name: name,
path: path,
transporter: transporter,
context: context,
state: Stopped,
peers: make(map[string]*Peer),
log: NewLog(),
@ -119,7 +121,7 @@ func (s *Server) Path() string {
return s.path
}
func (s *Server) GetLeader() string {
func (s *Server) Leader() string {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.leader
@ -129,6 +131,11 @@ func (s *Server) Transporter() Transporter {
return s.transporter
}
// Retrieves the context passed into the constructor.
func (s *Server) Context() interface{} {
return s.context
}
// Retrieves the log path for the server.
func (s *Server) LogPath() string {
return fmt.Sprintf("%s/log", s.path)
@ -150,17 +157,31 @@ func (s *Server) VotedFor() string {
// Retrieves whether the server's log has no entries.
func (s *Server) IsLogEmpty() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.log.IsEmpty()
}
// A list of all the log entries. This should only be used for debugging purposes.
func (s *Server) LogEntries() []*LogEntry {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.log != nil {
return s.log.entries
}
return nil
}
// A reference to the command name of the last entry.
func (s *Server) LastCommandName() string {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.log != nil {
return s.log.LastCommandName()
}
return ""
}
//--------------------------------------
// Membership
//--------------------------------------
@ -256,8 +277,9 @@ func (s *Server) Start() error {
}
// Start the election timeout.
go s.electionTimeoutFunc()
s.electionTimer.Reset()
c := make(chan bool)
go s.electionTimeoutFunc(c)
<- c
return nil
}
@ -272,7 +294,10 @@ func (s *Server) Stop() {
// Unloads the server.
func (s *Server) unload() {
// Kill the election timer.
s.electionTimer.Stop()
if s.electionTimer != nil {
s.electionTimer.Stop()
s.electionTimer = nil
}
// Remove peers.
for _, peer := range s.peers {
@ -307,9 +332,9 @@ func (s *Server) Initialize() error {
// Exit if the server is not running.
if !s.Running() {
return errors.New("raft.Server: Cannot join while stopped")
return errors.New("raft.Server: Cannot initialize while stopped")
} else if s.MemberCount() > 1 {
return errors.New("raft.Server: Cannot join; already in membership")
return errors.New("raft.Server: Cannot initialize; already in membership")
}
// Promote to leader.
@ -367,9 +392,15 @@ func (s *Server) do(command Command) error {
fmt.Println("DO: error")
return
} else if term > currentTerm {
s.mutex.Lock()
s.setCurrentTerm(term)
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
fmt.Println("DO: term")
s.mutex.Unlock()
return
}
@ -437,16 +468,15 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term")
}
s.setCurrentTerm(req.Term)
s.state = Follower
// Update the current leader.
s.leader = req.LeaderName
for _, peer := range s.peers {
peer.pause()
}
fmt.Println("2")
// Reset election timeout.
s.electionTimer.Reset()
fmt.Println("3")
if s.electionTimer != nil {
s.electionTimer.Reset()
}
// Reject if log doesn't contain a matching previous entry.
if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
@ -495,7 +525,10 @@ func (s *Server) createInternalAppendEntriesRequest(prevLogIndex uint64) *Append
func (s *Server) promote() (bool, error) {
for {
// Start a new election.
term, lastLogIndex, lastLogTerm := s.promoteToCandidate()
term, lastLogIndex, lastLogTerm, err := s.promoteToCandidate()
if err != nil {
return false, err
}
// Request votes from each of our peers.
c := make(chan *RequestVoteResponse, len(s.peers))
@ -535,8 +568,12 @@ func (s *Server) promote() (bool, error) {
if resp != nil {
// Step down if we discover a higher term.
if resp.Term > term {
s.mutex.Lock()
s.setCurrentTerm(term)
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
s.mutex.Unlock()
return false, fmt.Errorf("raft.Server: Higher term discovered, stepping down: (%v > %v)", resp.Term, term)
}
votes[resp.peer.Name()] = resp.VoteGranted
@ -565,10 +602,15 @@ func (s *Server) promote() (bool, error) {
// Promotes the server to a candidate and increases the election term. The
// term and log state are returned for use in the RPCs.
func (s *Server) promoteToCandidate() (term uint64, lastLogIndex uint64, lastLogTerm uint64) {
func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Ignore promotion if the server is not a follower.
if s.state != Follower && s.state != Candidate {
return 0, 0, 0, fmt.Errorf("raft: Invalid promotion state: %s", s.state)
}
// Move server to become a candidate, increase our term & vote for ourself.
s.state = Candidate
s.currentTerm++
@ -578,8 +620,8 @@ func (s *Server) promoteToCandidate() (term uint64, lastLogIndex uint64, lastLog
s.electionTimer.Pause()
// Return server state so we can check for it during leader promotion.
lastLogIndex, lastLogTerm = s.log.CommitInfo()
return s.currentTerm, lastLogIndex, lastLogTerm
lastLogIndex, lastLogTerm := s.log.CommitInfo()
return s.currentTerm, lastLogIndex, lastLogTerm, nil
}
// Promotes the server from a candidate to a leader. This can only occur if
@ -646,7 +688,9 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
// If we made it this far then cast a vote and reset our election time out.
s.votedFor = req.CandidateName
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
return NewRequestVoteResponse(s.currentTerm, true), nil
}
@ -665,7 +709,8 @@ func (s *Server) setCurrentTerm(term uint64) {
}
// Listens to the election timeout and kicks off a new election.
func (s *Server) electionTimeoutFunc() {
func (s *Server) electionTimeoutFunc(startChannel chan bool) {
startChannel <- true
for {
// Grab the current timer channel.
s.mutex.Lock()
@ -706,6 +751,9 @@ func (s *Server) AddPeer(name string) error {
// Only add the peer if it doesn't have the same name.
if s.name != name {
peer := NewPeer(s, name, s.heartbeatTimeout)
if s.state == Leader {
peer.resume()
}
s.peers[peer.name] = peer
peer.resume()
if peer.heartbeatTimer.Running() {

View File

@ -127,6 +127,9 @@ func TestServerPromote(t *testing.T) {
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return lookup[peer.Name()].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
for _, server := range servers {
defer server.Stop()
@ -142,7 +145,12 @@ func TestServerPromoteDoubleElection(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
resp, err := lookup[peer.Name()].RequestVote(req)
return resp, err
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
resp, err := lookup[peer.Name()].AppendEntries(req)
return resp, err
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
@ -156,11 +164,12 @@ func TestServerPromoteDoubleElection(t *testing.T) {
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader && leader.currentTerm == 2) {
t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err)
}
if lookup["2"].VotedFor() != "1" {
t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].VotedFor())
time.Sleep(50 * time.Millisecond)
if lookup["2"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].votedFor)
}
if lookup["3"].VotedFor() != "1" {
t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].VotedFor())
if lookup["3"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].votedFor)
}
}
@ -335,7 +344,8 @@ func TestServerMultiNode(t *testing.T) {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
return s.RequestVote(req)
resp, err := s.RequestVote(req)
return resp, err
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
mutex.Lock()
@ -367,7 +377,7 @@ func TestServerMultiNode(t *testing.T) {
servers[name] = server
mutex.Unlock()
}
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
// Check that two peers exist on leader.
mutex.Lock()
@ -377,9 +387,9 @@ func TestServerMultiNode(t *testing.T) {
mutex.Unlock()
// Stop the first server and wait for a re-election.
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
leader.Stop()
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
// Check that either server 2 or 3 is the leader now.
mutex.Lock()

View File

@ -60,7 +60,7 @@ func setupLog(content string) (*Log, string) {
func newTestServer(name string, transporter Transporter) *Server {
path, _ := ioutil.TempDir("", "raft-server-")
server, _ := NewServer(name, path, transporter)
server, _ := NewServer(name, path, transporter, nil)
return server
}

View File

@ -162,7 +162,9 @@ func (t *Timer) Reset() {
case v, ok := <-internalTimer.C:
if ok {
t.mutex.Lock()
t.c <- v
if t.c != nil {
t.c <- v
}
t.mutex.Unlock()
}
case <-resetChannel: