Fix cluster-wide restart issue.

pull/2283/head
Ben Johnson 2015-04-14 13:43:25 -06:00
parent 1da5bc3746
commit c5bdb5af86
2 changed files with 18 additions and 13 deletions

View File

@ -499,9 +499,9 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
log.Fatalf("raft: %s", err)
}
index, _ := l.LastLogIndexTerm()
// Checks to see if the raft index is 0. If it's 0, it might be the first
// node in the cluster and must initialize or join
index, _ := l.LastLogIndexTerm()
if index == 0 {
// If we have join URLs, then attemp to join the cluster
if len(brokerURLs) > 0 {
@ -524,7 +524,9 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
func joinLog(l *raft.Log, brokerURLs []url.URL) {
// Attempts to join each server until successful.
for _, u := range brokerURLs {
if err := l.Join(u); err != nil {
if err := l.Join(u); err == raft.ErrInitialized {
return
} else if err != nil {
log.Printf("join: failed to connect to raft cluster: %s: %s", (&u).String(), err)
} else {
log.Printf("join: connected raft log to %s", (&u).String())
@ -567,17 +569,19 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
if err := s.Open(cmd.config.Data.Dir, c); err != nil {
log.Fatalf("failed to open data node: %v", err.Error())
}
log.Printf("data node opened at %s", cmd.config.Data.Dir)
log.Printf("data node(%d) opened at %s", s.ID(), cmd.config.Data.Dir)
dataNodeIndex := s.Index()
if dataNodeIndex == 0 {
// Give brokers time to elect a leader if entire cluster is being restarted.
time.Sleep(1 * time.Second)
if s.ID() == 0 && s.Index() == 0 {
if len(joinURLs) > 0 {
joinServer(s, cmd.config.ClusterURL(), joinURLs)
return s
}
if err := s.Initialize(cmd.config.ClusterURL()); err != nil {
log.Fatalf("server initialization error: %s", err)
log.Fatalf("server initialization error(0): %s", err)
}
u := cmd.config.ClusterURL()
@ -596,15 +600,14 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(&u, &joinURL); err != nil {
if err := s.Join(&u, &joinURL); err == influxdb.ErrDataNodeNotFound {
// No data nodes could be found to join. We're the first.
if err == influxdb.ErrDataNodeNotFound {
if err := s.Initialize(u); err != nil {
log.Fatalf("server initialization error: %s", err)
}
log.Printf("initialized data node: %s\n", (&u).String())
return
if err := s.Initialize(u); err != nil {
log.Fatalf("server initialization error(1): %s", err)
}
log.Printf("initialized data node: %s\n", (&u).String())
return
} else if err != nil {
log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err)
} else {
log.Printf("join: connected data node to %s", u)

View File

@ -700,6 +700,7 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
// If we get a service unavailable, the other data nodes may still be booting
// so retry again
if resp.StatusCode == http.StatusServiceUnavailable {
s.Logger.Printf("join unavailable, retrying")
retries += 1
time.Sleep(1 * time.Second)
continue
@ -709,6 +710,7 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
// has given us the address of a known data node to join instead.
if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
s.Logger.Printf("redirect join: %s", redirectURL)
// if we happen to get redirected back to ourselves then we'll never join. This
// may because the heartbeater could have already fired once, registering our endpoints