Merge pull request #110 from coreos/conf

Conf
Ben Johnson 2013-08-19 08:48:33 -07:00
commit ad4053e8b4
12 changed files with 327 additions and 142 deletions

config.go (new file, 7 additions)

@@ -0,0 +1,7 @@
+ package raft
+
+ type Config struct {
+ CommitIndex uint64 `json:"commitIndex"`
+ // TODO decide what we need to store in peer struct
+ Peers []*Peer `json:"peers"`
+ }
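The conf file this PR introduces is simply this struct marshaled as JSON. For orientation, a minimal standalone sketch of the file's shape, using mirror types and made-up values (not part of the diff):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors raft.Config / raft.Peer just closely enough to show the JSON shape.
type peer struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}

type config struct {
	CommitIndex uint64  `json:"commitIndex"`
	Peers       []*peer `json:"peers"`
}

func main() {
	b, _ := json.Marshal(&config{
		CommitIndex: 16,
		Peers:       []*peer{{Name: "2", ConnectionString: "http://localhost:4002"}},
	})
	fmt.Println(string(b))
	// {"commitIndex":16,"peers":[{"name":"2","connectionString":"http://localhost:4002"}]}
}
```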

http_transporter.go

@@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
return nil
}
- url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
+ url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
return nil
}
- url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
+ url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}

join_command.go

@@ -9,7 +9,8 @@ type JoinCommand interface {
// Join command
type DefaultJoinCommand struct {
- Name string `json:"name"`
+ Name             string `json:"name"`
+ ConnectionString string `json:"connectionString"`
}
// The name of the Join command in the log
@@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string {
}
func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
- err := server.AddPeer(c.Name)
+ err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}
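A caller constructing a join now supplies the connection string alongside the name. A hedged sketch of issuing the command against a leader — the import path and address are assumptions, not part of the diff:

```go
package example

import (
	"log"

	raft "github.com/coreos/go-raft" // import path of the era; an assumption
)

func join(leader *raft.Server) {
	// Name identifies the joining node; ConnectionString tells peers how
	// to reach it. The address below is illustrative.
	if _, err := leader.Do(&raft.DefaultJoinCommand{
		Name:             "node2",
		ConnectionString: "http://localhost:4002",
	}); err != nil {
		log.Fatalf("join failed: %v", err)
	}
}
```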

log.go (9 changes)

@@ -183,6 +183,15 @@ func (l *Log) open(path string) error {
// Append entry.
l.entries = append(l.entries, entry)
+ if entry.Index <= l.commitIndex {
+ command, err := newCommand(entry.CommandName, entry.Command)
+ if err != nil {
+ continue
+ }
+ l.ApplyFunc(command)
+ }
debugln("open.log.append log index ", entry.Index)
readBytes += int64(n)
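The new block means a restarted node re-applies already-committed entries to its state machine as the log is read back from disk, while entries above commitIndex are loaded but left unapplied. A self-contained sketch of that replay-on-open pattern, with hypothetical names:

```go
package main

import "fmt"

type entry struct {
	Index   uint64
	Command string
}

// replayOnOpen mirrors the pattern added to Log.open: while re-reading the
// log, immediately apply any entry at or below the recorded commit index;
// later entries stay in the log unapplied.
func replayOnOpen(entries []entry, commitIndex uint64, apply func(string)) {
	for _, e := range entries {
		if e.Index <= commitIndex {
			apply(e.Command)
		}
	}
}

func main() {
	log := []entry{{1, "set x=1"}, {2, "set x=2"}, {3, "set x=3"}}
	replayOnOpen(log, 2, func(c string) { fmt.Println("apply:", c) })
	// apply: set x=1
	// apply: set x=2
}
```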

peer.go (58 changes)

@@ -14,7 +14,8 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
- name string
+ Name string `json:"name"`
+ ConnectionString string `json:"connectionString"`
prevLogIndex uint64
mutex sync.RWMutex
stopChan chan bool
@@ -28,10 +29,11 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
- func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
+ func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
- name: name,
+ Name: name,
+ ConnectionString: connectionString,
heartbeatTimeout: heartbeatTimeout,
}
}
@@ -42,11 +44,6 @@ func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
//
//------------------------------------------------------------------------------
- // Retrieves the name of the peer.
- func (p *Peer) Name() string {
- return p.name
- }
// Sets the heartbeat timeout.
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimeout = duration
@@ -99,7 +96,7 @@ func (p *Peer) stopHeartbeat(flush bool) {
case p.stopChan <- flush:
default:
- panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
+ panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
}
}
@@ -113,8 +110,9 @@ func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
- name: p.name,
- prevLogIndex: p.prevLogIndex,
+ Name: p.Name,
+ ConnectionString: p.ConnectionString,
+ prevLogIndex: p.prevLogIndex,
}
}
@@ -128,19 +126,19 @@ func (p *Peer) heartbeat(c chan bool) {
c <- true
- debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
+ debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
for {
select {
case flush := <-stopChan:
if !flush {
- debugln("peer.heartbeat.stop: ", p.Name())
+ debugln("peer.heartbeat.stop: ", p.Name)
return
} else {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flush()
- debugln("peer.heartbeat.stop: ", p.Name())
+ debugln("peer.heartbeat.stop: ", p.Name)
return
}
@@ -151,7 +149,7 @@ func (p *Peer) heartbeat(c chan bool) {
}
func (p *Peer) flush() {
- debugln("peer.heartbeat.run: ", p.Name())
+ debugln("peer.heartbeat.run: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
@@ -172,14 +170,14 @@ func (p *Peer) flush() {
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
- traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
+ traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
- debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
+ debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
return
}
- traceln("peer.flush.recv: ", p.Name())
+ traceln("peer.flush.recv: ", p.Name)
// If successful then update the previous log index.
p.mutex.Lock()
@@ -193,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp.append = true
}
}
- traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
+ traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
@@ -207,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.CommitIndex
- debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
+ debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
@@ -218,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.Index
}
- debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
+ debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
}
}
p.mutex.Unlock()
// Attach the peer to resp, thus server can know where it comes from
- resp.peer = p.Name()
+ resp.peer = p.Name
// Send response to server for processing.
p.server.send(resp)
}
// Sends an Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
- debugln("peer.snap.send: ", p.name)
+ debugln("peer.snap.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
if resp == nil {
- debugln("peer.snap.timeout: ", p.name)
+ debugln("peer.snap.timeout: ", p.Name)
return
}
- debugln("peer.snap.recv: ", p.name)
+ debugln("peer.snap.recv: ", p.Name)
// If successful, the peer should have been to snapshot state
// Send it the snapshot!
if resp.Success {
p.sendSnapshotRecoveryRequest()
} else {
- debugln("peer.snap.failed: ", p.name)
+ debugln("peer.snap.failed: ", p.Name)
return
}
@@ -255,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
- debugln("peer.snap.recovery.send: ", p.name)
+ debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
- debugln("peer.snap.recovery.failed: ", p.name)
+ debugln("peer.snap.recovery.failed: ", p.Name)
return
}
// Send response to server for processing.
@@ -273,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
// send VoteRequest Request
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
- debugln("peer.vote: ", p.server.Name(), "->", p.Name())
+ debugln("peer.vote: ", p.server.Name(), "->", p.Name)
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
- debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
+ debugln("peer.vote: recv", p.server.Name(), "<-", p.Name)
resp.peer = p
c <- resp
}
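With Name and ConnectionString now exported, a Peer can be marshaled directly, and clone() hands callers (writeConf, TakeSnapshot) a race-free copy to serialize while the heartbeat goroutine keeps mutating prevLogIndex. The copy-under-lock pattern, as a standalone sketch with hypothetical names:

```go
package example

import "sync"

type peer struct {
	mutex            sync.RWMutex
	Name             string
	ConnectionString string
	prevLogIndex     uint64
}

// clone copies the serializable fields under the lock so the caller can
// marshal the copy without holding the mutex, and without copying the
// mutex itself into the clone's zero value.
func (p *peer) clone() *peer {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	return &peer{
		Name:             p.Name,
		ConnectionString: p.ConnectionString,
		prevLogIndex:     p.prevLogIndex,
	}
}
```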

protobuf/snapshot_recovery_request.pb.go

@@ -14,12 +14,12 @@ var _ = &json.SyntaxError{}
var _ = math.Inf
type ProtoSnapshotRecoveryRequest struct {
- LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
- LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
- LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
- Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
- State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
- XXX_unrecognized []byte `json:"-"`
+ LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
+ LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
+ LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
+ Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
+ State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest) Reset() { *m = ProtoSnapshotRecoveryRequest{} }
@@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 {
return 0
}
- func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string {
+ func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer {
if m != nil {
return m.Peers
}
@@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte {
return nil
}
+ type ProtoSnapshotRecoveryRequest_ProtoPeer struct {
+ Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
+ ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+ }
+
+ func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() {
+ *m = ProtoSnapshotRecoveryRequest_ProtoPeer{}
+ }
+
+ func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) }
+ func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {}
+
+ func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string {
+ if m != nil && m.Name != nil {
+ return *m.Name
+ }
+ return ""
+ }
+
+ func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string {
+ if m != nil && m.ConnectionString != nil {
+ return *m.ConnectionString
+ }
+ return ""
+ }
func init() {
}

protobuf/snapshot_recovery_request.proto

@@ -3,7 +3,13 @@ package protobuf;
message ProtoSnapshotRecoveryRequest {
required string LeaderName=1;
required uint64 LastIndex=2;
- required uint64 LastTerm=3;
- repeated string Peers=4;
+ required uint64 LastTerm=3;
+
+ message ProtoPeer {
+ required string Name=1;
+ required string ConnectionString=2;
+ }
+ repeated ProtoPeer Peers=4;
required bytes State=5;
}
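After regenerating the Go bindings, nested peers are built with the goprotobuf pointer helpers, as the encode() change further down shows. A sketch of constructing the new request shape — the import paths are assumptions and the values are made up:

```go
package example

import (
	proto "code.google.com/p/goprotobuf/proto" // goprotobuf of the era; path is an assumption

	"github.com/coreos/go-raft/protobuf" // assumption
)

// buildRequest assembles the new nested-message form of the request.
func buildRequest() *protobuf.ProtoSnapshotRecoveryRequest {
	return &protobuf.ProtoSnapshotRecoveryRequest{
		LeaderName: proto.String("1"),
		LastIndex:  proto.Uint64(16),
		LastTerm:   proto.Uint64(2),
		Peers: []*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
			{
				Name:             proto.String("2"),
				ConnectionString: proto.String("http://localhost:4002"),
			},
		},
		State: []byte{0},
	}
}
```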

server.go (171 changes)

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path"
@@ -81,8 +80,6 @@ type Server struct {
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
- confFile *os.File
}
// An event to be processed by the server's event loop.
@@ -338,14 +335,14 @@ func (s *Server) Start() error {
// Create snapshot directory if not exist
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
- // Initialize the log and load it up.
- if err := s.log.open(s.LogPath()); err != nil {
- s.debugln("raft: Log error: ", err)
+ if err := s.readConf(); err != nil {
+ s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
- if err := s.readConf(); err != nil {
- s.debugln("raft: Conf file error: ", err)
+ // Initialize the log and load it up.
+ if err := s.log.open(s.LogPath()); err != nil {
+ s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
@@ -374,53 +371,6 @@ func (s *Server) Start() error {
return nil
}
- // Read the configuration for the server.
- func (s *Server) readConf() error {
- var err error
- confPath := path.Join(s.path, "conf")
- s.debugln("readConf.open ", confPath)
-
- // open conf file
- s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600)
- if err != nil {
- if os.IsNotExist(err) {
- s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
- debugln("readConf.create ", confPath)
- if err != nil {
- return err
- }
- }
- return err
- }
-
- peerNames := make([]string, 0)
-
- for {
- var peerName string
- _, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
- if err != nil {
- if err == io.EOF {
- s.debugln("server.peer.conf: finish")
- break
- }
- return err
- }
- s.debugln("server.peer.conf.read: ", peerName)
- peerNames = append(peerNames, peerName)
- }
-
- s.confFile.Truncate(0)
- s.confFile.Seek(0, os.SEEK_SET)
-
- for _, peerName := range peerNames {
- s.AddPeer(peerName)
- }
-
- return nil
- }
// Shuts down the server.
func (s *Server) Stop() {
s.send(&stopValue)
@@ -975,9 +925,9 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
//--------------------------------------
// Adds a peer to the server.
- func (s *Server) AddPeer(name string) error {
+ func (s *Server) AddPeer(name string, connectiongString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
+ defer s.writeConf()
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
@@ -988,19 +938,15 @@ func (s *Server) AddPeer(name string) error {
return nil
}
- // when loading snapshot s.confFile should be nil
- if s.confFile != nil {
- _, err := fmt.Fprintln(s.confFile, name)
- s.debugln("server.peer.conf.write: ", name)
- if err != nil {
- return err
- }
- }
-
- peer := newPeer(s, name, s.heartbeatTimeout)
+ peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
- s.peers[peer.name] = peer
+ s.peers[peer.Name] = peer
+ s.debugln("server.peer.conf.write: ", name)
return nil
}
@@ -1009,14 +955,20 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
+ defer s.writeConf()
if name == s.Name() {
+ // when the removed node restart, it should be able
+ // to know it has been removed before. So we need
+ // to update knownCommitIndex
return nil
}
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
return fmt.Errorf("raft: Peer not found: %s", name)
}
// TODO: Flush entries to the peer first.
// Stop peer and remove it.
if s.State() == Leader {
peer.stopHeartbeat(true)
@@ -1024,16 +976,6 @@ func (s *Server) RemovePeer(name string) error {
delete(s.peers, name)
- s.confFile.Truncate(0)
- s.confFile.Seek(0, os.SEEK_SET)
-
- for peer := range s.peers {
- _, err := fmt.Fprintln(s.confFile, peer)
- if err != nil {
- return err
- }
- }
return nil
}
@@ -1070,14 +1012,13 @@ func (s *Server) TakeSnapshot() error {
state = []byte{0}
}
- var peerNames []string
+ var peers []*Peer
for _, peer := range s.peers {
- peerNames = append(peerNames, peer.Name())
+ peers = append(peers, peer.clone())
}
- peerNames = append(peerNames, s.Name())
- s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
+ s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
s.saveSnapshot()
@@ -1160,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
s.peers = make(map[string]*Peer)
// recovery the cluster configuration
- for _, peerName := range req.Peers {
- s.AddPeer(peerName)
+ for _, peer := range req.Peers {
+ s.AddPeer(peer.Name, peer.ConnectionString)
}
//update term and index
@@ -1253,8 +1194,8 @@ func (s *Server) LoadSnapshot() error {
return err
}
- for _, peerName := range s.lastSnapshot.Peers {
- s.AddPeer(peerName)
+ for _, peer := range s.lastSnapshot.Peers {
+ s.AddPeer(peer.Name, peer.ConnectionString)
}
s.log.startTerm = s.lastSnapshot.LastTerm
@@ -1264,6 +1205,62 @@ func (s *Server) LoadSnapshot() error {
return err
}
+ //--------------------------------------
+ // Config File
+ //--------------------------------------
+
+ func (s *Server) writeConf() {
+ peers := make([]*Peer, len(s.peers))
+
+ i := 0
+ for _, peer := range s.peers {
+ peers[i] = peer.clone()
+ i++
+ }
+
+ r := &Config{
+ CommitIndex: s.log.commitIndex,
+ Peers: peers,
+ }
+
+ b, _ := json.Marshal(r)
+
+ confPath := path.Join(s.path, "conf")
+ tmpConfPath := path.Join(s.path, "conf.tmp")
+
+ err := ioutil.WriteFile(tmpConfPath, b, 0600)
+ if err != nil {
+ panic(err)
+ }
+
+ os.Rename(tmpConfPath, confPath)
+ }
+
+ // Read the configuration for the server.
+ func (s *Server) readConf() error {
+ confPath := path.Join(s.path, "conf")
+ s.debugln("readConf.open ", confPath)
+
+ // open conf file
+ b, err := ioutil.ReadFile(confPath)
+ if err != nil {
+ return nil
+ }
+
+ conf := &Config{}
+ if err = json.Unmarshal(b, conf); err != nil {
+ return err
+ }
+
+ s.log.commitIndex = conf.CommitIndex
+
+ return nil
+ }
//--------------------------------------
// Debugging
//--------------------------------------
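Note the pattern in the new writeConf: marshal, write to conf.tmp, then rename over conf. Because rename is atomic on POSIX filesystems, a crash mid-write leaves either the old conf or the new one, never a torn file. The same pattern as a standalone sketch:

```go
package main

import (
	"io/ioutil"
	"os"
	"path"
)

// writeFileAtomic writes data via a temp file plus rename — the same
// pattern writeConf uses — so a crash mid-write cannot corrupt the file.
func writeFileAtomic(dir, name string, data []byte) error {
	tmp := path.Join(dir, name+".tmp")
	if err := ioutil.WriteFile(tmp, data, 0600); err != nil {
		return err
	}
	return os.Rename(tmp, path.Join(dir, name))
}

func main() {
	if err := writeFileAtomic(os.TempDir(), "conf", []byte(`{"commitIndex":0,"peers":null}`)); err != nil {
		panic(err)
	}
}
```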

server_test.go

@@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- return lookup[peer.Name()].RequestVote(req)
+ return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- return lookup[peer.Name()].AppendEntries(req)
+ return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
}
}
+ //--------------------------------------
+ // Recovery
+ //--------------------------------------
+
+ // Ensure that a follower cannot execute a command.
+ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
+ // Initialize the servers.
+ var mutex sync.RWMutex
+ servers := map[string]*Server{}
+
+ transporter := &testTransporter{}
+ transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+ mutex.RLock()
+ s := servers[peer.Name]
+ mutex.RUnlock()
+ return s.RequestVote(req)
+ }
+ transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+ mutex.RLock()
+ s := servers[peer.Name]
+ mutex.RUnlock()
+ return s.AppendEntries(req)
+ }
+
+ disTransporter := &testTransporter{}
+ disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+ return nil
+ }
+ disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+ return nil
+ }
+
+ var names []string
+ var paths = make(map[string]string)
+
+ n := 5
+
+ // add n servers
+ for i := 1; i <= n; i++ {
+ names = append(names, strconv.Itoa(i))
+ }
+
+ var leader *Server
+ for _, name := range names {
+ server := newTestServer(name, transporter)
+
+ servers[name] = server
+ paths[name] = server.Path()
+
+ if name == "1" {
+ leader = server
+ server.SetHeartbeatTimeout(testHeartbeatTimeout)
+ server.Start()
+ time.Sleep(testHeartbeatTimeout)
+ } else {
+ server.SetElectionTimeout(testElectionTimeout)
+ server.SetHeartbeatTimeout(testHeartbeatTimeout)
+ server.Start()
+ time.Sleep(testHeartbeatTimeout)
+ }
+ if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
+ t.Fatalf("Unable to join server[%s]: %v", name, err)
+ }
+ }
+
+ // commit some commands
+ for i := 0; i < 10; i++ {
+ if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
+ t.Fatalf("cannot commit command:", err.Error())
+ }
+ }
+
+ time.Sleep(2 * testHeartbeatTimeout)
+
+ for _, name := range names {
+ server := servers[name]
+ if server.CommitIndex() != 16 {
+ t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
+ }
+ server.Stop()
+ }
+
+ for _, name := range names {
+ // with old path and disable transportation
+ server := newTestServerWithPath(name, disTransporter, paths[name])
+ servers[name] = server
+
+ server.Start()
+
+ // should only commit to the last join command
+ if server.CommitIndex() != 6 {
+ t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
+ }
+
+ // peer conf should be recovered
+ if len(server.Peers()) != 4 {
+ t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
+ }
+ }
+
+ // let nodes talk to each other
+ for _, name := range names {
+ servers[name].SetTransporter(transporter)
+ }
+
+ time.Sleep(2 * testElectionTimeout)
+
+ // should commit to the previous index + 1(nop command when new leader elected)
+ for _, name := range names {
+ server := servers[name]
+ if server.CommitIndex() != 17 {
+ t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
+ }
+ server.Stop()
+ }
+ }
//--------------------------------------
// Membership
//--------------------------------------
@@ -357,13 +475,13 @@ func TestServerMultiNode(t *testing.T) {
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
- s := servers[peer.name]
+ s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}

snapshot.go

@@ -20,9 +20,9 @@ type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// cluster configuration.
- Peers []string `json: "peers"`
- State []byte `json: "state"`
- Path string `json: "path"`
+ Peers []*Peer `json: "peers"`
+ State []byte `json: "state"`
+ Path string `json: "path"`
}
// Save the snapshot to a file

snapshot_recovery_request.go

@@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
- Peers []string
+ Peers []*Peer
State []byte
}
@@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
+ protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
+
+ for i, peer := range req.Peers {
+ protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
+ Name: proto.String(peer.Name),
+ ConnectionString: proto.String(peer.ConnectionString),
+ }
+ }
pb := &protobuf.ProtoSnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
- Peers: req.Peers,
+ Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
@@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
totalBytes := len(data)
- pb := &protobuf.ProtoSnapshotRequest{}
+ pb := &protobuf.ProtoSnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
@@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
- req.Peers = req.Peers
- req.State = req.State
+ req.Peers = make([]*Peer, len(pb.Peers))
+
+ for i, peer := range pb.Peers {
+ req.Peers[i] = &Peer{
+ Name: peer.GetName(),
+ ConnectionString: peer.GetConnectionString(),
+ }
+ }
return totalBytes, nil
}
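Since encode and decode are unexported, they can only be exercised from inside the raft package. A test-style round-trip sketch (values are made up), showing that peers now survive the trip through the nested proto message:

```go
package raft

import "bytes"

// roundTripSnapshotRecovery encodes a request into a buffer and decodes
// it back, mirroring what the transporter does over the wire.
func roundTripSnapshotRecovery() error {
	req := &SnapshotRecoveryRequest{
		LeaderName: "1",
		LastIndex:  16,
		LastTerm:   2,
		Peers:      []*Peer{{Name: "2", ConnectionString: "http://localhost:4002"}},
		State:      []byte{0},
	}

	var buf bytes.Buffer
	if _, err := req.encode(&buf); err != nil {
		return err
	}

	got := &SnapshotRecoveryRequest{}
	_, err := got.decode(&buf)
	return err
}
```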

test.go

@@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server {
return server
}
+ func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
+ server, _ := NewServer(name, p, transporter, nil, nil)
+ return server
+ }
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
server := newTestServer(name, transporter)
f, err := os.Create(server.LogPath())
@@ -100,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
for _, peer := range servers {
- server.AddPeer(peer.Name())
+ server.AddPeer(peer.Name(), "")
}
}
return servers