add connectionstring

pull/820/head
Xiang Li 2013-08-15 16:35:01 -07:00
parent ff29c6d17d
commit 851ca9ceea
11 changed files with 120 additions and 72 deletions

View File

@ -3,5 +3,5 @@ package raft
type Config struct {
CommitIndex uint64 `json:"commitIndex"`
// TODO decide what we need to store in peer struct
Peers []string `json:"peers"`
Peers []*Peer `json:"peers"`
}

View File

@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}

View File

@ -10,6 +10,7 @@ type JoinCommand interface {
// Join command
type DefaultJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// The name of the Join command in the log
@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string {
}
func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
err := server.AddPeer(c.Name)
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}

56
peer.go
View File

@ -14,7 +14,8 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
name string
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
mutex sync.RWMutex
stopChan chan bool
@ -28,10 +29,11 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
name: name,
Name: name,
ConnectionString: connectionString,
heartbeatTimeout: heartbeatTimeout,
}
}
@ -42,11 +44,6 @@ func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
//
//------------------------------------------------------------------------------
// Retrieves the name of the peer.
func (p *Peer) Name() string {
return p.name
}
// Sets the heartbeat timeout.
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimeout = duration
@ -99,7 +96,7 @@ func (p *Peer) stopHeartbeat(flush bool) {
case p.stopChan <- flush:
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
}
}
@ -113,7 +110,8 @@ func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
name: p.name,
Name: p.Name,
ConnectionString: p.ConnectionString,
prevLogIndex: p.prevLogIndex,
}
}
@ -128,19 +126,19 @@ func (p *Peer) heartbeat(c chan bool) {
c <- true
debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
for {
select {
case flush := <-stopChan:
if !flush {
debugln("peer.heartbeat.stop: ", p.Name())
debugln("peer.heartbeat.stop: ", p.Name)
return
} else {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flush()
debugln("peer.heartbeat.stop: ", p.Name())
debugln("peer.heartbeat.stop: ", p.Name)
return
}
@ -151,7 +149,7 @@ func (p *Peer) heartbeat(c chan bool) {
}
func (p *Peer) flush() {
debugln("peer.heartbeat.run: ", p.Name())
debugln("peer.heartbeat.run: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
@ -172,14 +170,14 @@ func (p *Peer) flush() {
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
return
}
traceln("peer.flush.recv: ", p.Name())
traceln("peer.flush.recv: ", p.Name)
// If successful then update the previous log index.
p.mutex.Lock()
@ -193,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp.append = true
}
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
@ -207,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.CommitIndex
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
@ -218,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.Index
}
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
}
}
p.mutex.Unlock()
// Attach the peer to resp, thus server can know where it comes from
resp.peer = p.Name()
resp.peer = p.Name
// Send response to server for processing.
p.server.send(resp)
}
// Sends an Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
debugln("peer.snap.send: ", p.name)
debugln("peer.snap.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
if resp == nil {
debugln("peer.snap.timeout: ", p.name)
debugln("peer.snap.timeout: ", p.Name)
return
}
debugln("peer.snap.recv: ", p.name)
debugln("peer.snap.recv: ", p.Name)
// If successful, the peer should have been to snapshot state
// Send it the snapshot!
if resp.Success {
p.sendSnapshotRecoveryRequest()
} else {
debugln("peer.snap.failed: ", p.name)
debugln("peer.snap.failed: ", p.Name)
return
}
@ -255,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
debugln("peer.snap.recovery.send: ", p.name)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
debugln("peer.snap.recovery.failed: ", p.name)
debugln("peer.snap.recovery.failed: ", p.Name)
return
}
// Send response to server for processing.
@ -273,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
// send VoteRequest Request
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
debugln("peer.vote: ", p.server.Name(), "->", p.Name)
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name)
resp.peer = p
c <- resp
}

View File

@ -17,7 +17,7 @@ type ProtoSnapshotRecoveryRequest struct {
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 {
return 0
}
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string {
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer {
if m != nil {
return m.Peers
}
@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte {
return nil
}
type ProtoSnapshotRecoveryRequest_ProtoPeer struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() {
*m = ProtoSnapshotRecoveryRequest_ProtoPeer{}
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) }
func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string {
if m != nil && m.ConnectionString != nil {
return *m.ConnectionString
}
return ""
}
func init() {
}

View File

@ -4,6 +4,12 @@ message ProtoSnapshotRecoveryRequest {
required string LeaderName=1;
required uint64 LastIndex=2;
required uint64 LastTerm=3;
repeated string Peers=4;
message ProtoPeer {
required string Name=1;
required string ConnectionString=2;
}
repeated ProtoPeer Peers=4;
required bytes State=5;
}

View File

@ -925,7 +925,7 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
//--------------------------------------
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
func (s *Server) AddPeer(name string, connectiongString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
defer s.writeConf()
// Do not allow peers to be added twice.
@ -938,13 +938,13 @@ func (s *Server) AddPeer(name string) error {
return nil
}
peer := newPeer(s, name, s.heartbeatTimeout)
peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.name] = peer
s.peers[peer.Name] = peer
s.debugln("server.peer.conf.write: ", name)
@ -1012,14 +1012,13 @@ func (s *Server) TakeSnapshot() error {
state = []byte{0}
}
var peerNames []string
var peers []*Peer
for _, peer := range s.peers {
peerNames = append(peerNames, peer.Name())
peers = append(peers, peer.clone())
}
peerNames = append(peerNames, s.Name())
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
s.saveSnapshot()
@ -1102,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
s.peers = make(map[string]*Peer)
// recovery the cluster configuration
for _, peerName := range req.Peers {
s.AddPeer(peerName)
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
//update term and index
@ -1195,8 +1194,8 @@ func (s *Server) LoadSnapshot() error {
return err
}
for _, peerName := range s.lastSnapshot.Peers {
s.AddPeer(peerName)
for _, peer := range s.lastSnapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
s.log.startTerm = s.lastSnapshot.LastTerm
@ -1212,11 +1211,11 @@ func (s *Server) LoadSnapshot() error {
func (s *Server) writeConf() {
peers := make([]string, len(s.peers))
peers := make([]*Peer, len(s.peers))
i := 0
for peer, _ := range s.peers {
peers[i] = peer
for _, peer := range s.peers {
peers[i] = peer.clone()
i++
}

View File

@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name()].RequestVote(req)
return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name()].AppendEntries(req)
return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@ -329,13 +329,13 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}
@ -475,13 +475,13 @@ func TestServerMultiNode(t *testing.T) {
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}

View File

@ -21,7 +21,7 @@ type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// cluster configuration.
Peers []string `json: "peers"`
Peers []*Peer `json: "peers"`
State []byte `json: "state"`
Path string `json: "path"`
}

View File

@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []string
Peers []*Peer
State []byte
}
@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
for i, peer := range req.Peers {
protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
Name: proto.String(peer.Name),
ConnectionString: proto.String(peer.ConnectionString),
}
}
pb := &protobuf.ProtoSnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: req.Peers,
Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
totalBytes := len(data)
pb := &protobuf.ProtoSnapshotRequest{}
pb := &protobuf.ProtoSnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.Peers = req.Peers
req.State = req.State
req.Peers = make([]*Peer, len(pb.Peers))
for i, peer := range pb.Peers {
req.Peers[i] = &Peer{
Name: peer.GetName(),
ConnectionString: peer.GetConnectionString(),
}
}
return totalBytes, nil
}

View File

@ -105,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
for _, peer := range servers {
server.AddPeer(peer.Name())
server.AddPeer(peer.Name(), "")
}
}
return servers