diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 1d4cdc10d5..efae6509b5 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -304,3 +304,52 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) { assertConfigContains(port2, "sdf", true, c) assertConfigContains(port3, "sdf", true, c) } + +func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) { + logDir1 := nextDir() + port1 := nextPort() + logDir2 := nextDir() + port2 := nextPort() + logDir3 := nextDir() + port3 := nextPort() + defer clearPath(logDir1) + defer clearPath(logDir2) + defer clearPath(logDir3) + + _, server := newConfigAndServer(logDir1, port1) + + var err error + go func() { + err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, false) + }() + + _, server2 := newConfigAndServer(logDir2, port2) + defer server2.Close() + + var err2 error + go func() { + err2 = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1), fmt.Sprintf("localhost:%d", port3)}, true) + }() + + _, server3 := newConfigAndServer(logDir3, port3) + defer server3.Close() + + var err3 error + go func() { + err3 = server3.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, true) + }() + time.Sleep(time.Second) + + c.Assert(err, Equals, nil) + c.Assert(err2, Equals, nil) + c.Assert(err3, Equals, nil) + leader, _ := server2.leaderConnectString() + c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1)) + + err = server.AddReadApiKey("db", "key1") + c.Assert(err, Equals, nil) + time.Sleep(time.Millisecond * 200) + assertConfigContains(port1, "key1", true, c) + assertConfigContains(port2, "key1", true, c) + assertConfigContains(port3, "key1", true, c) +} diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index d3abffc583..25b545de1b 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -13,6 +13,7 @@ import ( "net" "net/http" "path/filepath" + "strings" "sync" "time" ) @@ -212,28 +213,50 @@ func (s *RaftServer) Join(leader string) error { Name: s.raftServer.Name(), ConnectionString: s.connectionString(), } + connectUrl := leader + if !strings.HasPrefix(connectUrl, "http://") { + connectUrl = "http://" + connectUrl + } + if !strings.HasSuffix(connectUrl, "/join") { + connectUrl = connectUrl + "/join" + } var b bytes.Buffer json.NewEncoder(&b).Encode(command) - resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b) + resp, err := http.Post(connectUrl, "application/json", &b) if err != nil { + log.Println("ERROR: ", err) return err } - resp.Body.Close() + defer resp.Body.Close() + if resp.StatusCode == http.StatusTemporaryRedirect { + address := resp.Header.Get("Location") + log.Printf("Redirected to %s to join leader\n", address) + return s.Join(address) + } return nil } func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { - command := &raft.DefaultJoinCommand{} + if s.raftServer.State() == raft.Leader { + command := &raft.DefaultJoinCommand{} - if err := json.NewDecoder(req.Body).Decode(&command); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if _, err := s.raftServer.Do(command); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + if err := json.NewDecoder(req.Body).Decode(&command); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if _, err := s.raftServer.Do(command); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } else { + if leader, ok := s.leaderConnectString(); ok { + log.Println("redirecting to leader to join...") + http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect) + } else { + http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError) + } } }