Add logic to allow server to be pointed at any server in the cluster and find the leader in order to joiN

pull/17/head
Paul Dix 2013-10-10 11:32:21 -04:00
parent 5679b338eb
commit 4a299ce5d7
2 changed files with 82 additions and 10 deletions

View File

@ -304,3 +304,52 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
assertConfigContains(port2, "sdf", true, c)
assertConfigContains(port3, "sdf", true, c)
}
func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) {
logDir1 := nextDir()
port1 := nextPort()
logDir2 := nextDir()
port2 := nextPort()
logDir3 := nextDir()
port3 := nextPort()
defer clearPath(logDir1)
defer clearPath(logDir2)
defer clearPath(logDir3)
_, server := newConfigAndServer(logDir1, port1)
var err error
go func() {
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, false)
}()
_, server2 := newConfigAndServer(logDir2, port2)
defer server2.Close()
var err2 error
go func() {
err2 = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1), fmt.Sprintf("localhost:%d", port3)}, true)
}()
_, server3 := newConfigAndServer(logDir3, port3)
defer server3.Close()
var err3 error
go func() {
err3 = server3.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, true)
}()
time.Sleep(time.Second)
c.Assert(err, Equals, nil)
c.Assert(err2, Equals, nil)
c.Assert(err3, Equals, nil)
leader, _ := server2.leaderConnectString()
c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1))
err = server.AddReadApiKey("db", "key1")
c.Assert(err, Equals, nil)
time.Sleep(time.Millisecond * 200)
assertConfigContains(port1, "key1", true, c)
assertConfigContains(port2, "key1", true, c)
assertConfigContains(port3, "key1", true, c)
}

View File

@ -13,6 +13,7 @@ import (
"net"
"net/http"
"path/filepath"
"strings"
"sync"
"time"
)
@ -212,28 +213,50 @@ func (s *RaftServer) Join(leader string) error {
Name: s.raftServer.Name(),
ConnectionString: s.connectionString(),
}
connectUrl := leader
if !strings.HasPrefix(connectUrl, "http://") {
connectUrl = "http://" + connectUrl
}
if !strings.HasSuffix(connectUrl, "/join") {
connectUrl = connectUrl + "/join"
}
var b bytes.Buffer
json.NewEncoder(&b).Encode(command)
resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
resp, err := http.Post(connectUrl, "application/json", &b)
if err != nil {
log.Println("ERROR: ", err)
return err
}
resp.Body.Close()
defer resp.Body.Close()
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Printf("Redirected to %s to join leader\n", address)
return s.Join(address)
}
return nil
}
func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
command := &raft.DefaultJoinCommand{}
if s.raftServer.State() == raft.Leader {
command := &raft.DefaultJoinCommand{}
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := s.raftServer.Do(command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := s.raftServer.Do(command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
} else {
if leader, ok := s.leaderConnectString(); ok {
log.Println("redirecting to leader to join...")
http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect)
} else {
http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError)
}
}
}