wip/ move write/read to server.go and add test

pull/820/head
Xiang Li 2013-08-15 12:25:00 -07:00
parent ce1bd81d08
commit ff29c6d17d
4 changed files with 185 additions and 85 deletions

View File

@ -1,84 +1,7 @@
package raft
import (
"encoding/json"
"io/ioutil"
"os"
"path"
)
type RaftConfig struct {
KnownCommitIndex uint64 `json:"KnownCommitIndex"`
type Config struct {
CommitIndex uint64 `json:"commitIndex"`
// TODO decide what we need to store in peer struct
Peers []string `json:"Peers"`
}
func (s *Server) writeConf() {
peers := make([]string, len(s.peers))
i := 0
for peer := range s.peers {
peers[i] = peer
i++
}
r := &RaftConfig{
KnownCommitIndex: s.log.commitIndex,
Peers: peers,
}
b, _ := json.Marshal(r)
confBakPath := path.Join(s.path, "conf.bak")
confPath := path.Join(s.path, "conf")
confFile, err := os.OpenFile(confBakPath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
confFile.Write(b)
os.Remove(confPath)
os.Rename(confBakPath, confPath)
}
// Read the configuration for the server.
func (s *Server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
confFile, err := os.OpenFile(confPath, os.O_RDWR, 0600)
if err != nil {
if os.IsNotExist(err) {
_, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
debugln("readConf.create ", confPath)
if err != nil {
return err
}
}
return err
}
raftConf := &RaftConfig{}
b, err := ioutil.ReadAll(confFile)
if err != nil {
return err
}
confFile.Close()
err = json.Unmarshal(b, raftConf)
if err != nil {
return err
}
s.log.commitIndex = raftConf.KnownCommitIndex
return nil
Peers []string `json:"peers"`
}

View File

@ -927,7 +927,7 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
defer s.writeConf()
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
@ -946,7 +946,6 @@ func (s *Server) AddPeer(name string) error {
s.peers[peer.name] = peer
s.writeConf()
s.debugln("server.peer.conf.write: ", name)
return nil
@ -956,11 +955,12 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
defer s.writeConf()
if name == s.Name() {
// when the removed node restart, it should be able
// to know it has been removed before. So we need
// to update knownCommitIndex
s.writeConf()
return nil
}
// Return error if peer doesn't exist.
@ -976,8 +976,6 @@ func (s *Server) RemovePeer(name string) error {
delete(s.peers, name)
s.writeConf()
return nil
}
@ -1208,6 +1206,62 @@ func (s *Server) LoadSnapshot() error {
return err
}
//--------------------------------------
// Config File
//--------------------------------------
func (s *Server) writeConf() {
peers := make([]string, len(s.peers))
i := 0
for peer, _ := range s.peers {
peers[i] = peer
i++
}
r := &Config{
CommitIndex: s.log.commitIndex,
Peers: peers,
}
b, _ := json.Marshal(r)
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")
err := ioutil.WriteFile(tmpConfPath, b, 0600)
if err != nil {
panic(err)
}
os.Rename(tmpConfPath, confPath)
}
// Read the configuration for the server.
func (s *Server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
b, err := ioutil.ReadFile(confPath)
if err != nil {
return nil
}
conf := &Config{}
if err = json.Unmarshal(b, conf); err != nil {
return err
}
s.log.commitIndex = conf.CommitIndex
return nil
}
//--------------------------------------
// Debugging
//--------------------------------------

View File

@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
}
}
//--------------------------------------
// Recovery
//--------------------------------------
// Ensure that a follower cannot execute a command.
func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
mutex.RUnlock()
return s.AppendEntries(req)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
var names []string
var paths = make(map[string]string)
n := 5
// add n servers
for i := 1; i <= n; i++ {
names = append(names, strconv.Itoa(i))
}
var leader *Server
for _, name := range names {
server := newTestServer(name, transporter)
servers[name] = server
paths[name] = server.Path()
if name == "1" {
leader = server
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
time.Sleep(testHeartbeatTimeout)
} else {
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
time.Sleep(testHeartbeatTimeout)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
}
}
// commit some commands
for i := 0; i < 10; i++ {
if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
t.Fatalf("cannot commit command:", err.Error())
}
}
time.Sleep(2 * testHeartbeatTimeout)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 16 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
}
server.Stop()
}
for _, name := range names {
// with old path and disable transportation
server := newTestServerWithPath(name, disTransporter, paths[name])
servers[name] = server
server.Start()
// should only commit to the last join command
if server.CommitIndex() != 6 {
t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
}
// peer conf should be recovered
if len(server.Peers()) != 4 {
t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
}
}
// let nodes talk to each other
for _, name := range names {
servers[name].SetTransporter(transporter)
}
time.Sleep(2 * testElectionTimeout)
// should commit to the previous index + 1(nop command when new leader elected)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
}
server.Stop()
}
}
//--------------------------------------
// Membership
//--------------------------------------

View File

@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server {
return server
}
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
server, _ := NewServer(name, p, transporter, nil, nil)
return server
}
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
server := newTestServer(name, transporter)
f, err := os.Create(server.LogPath())