From 6e124f3ab6fb87e2eeda3f950a60c220ea2981ff Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 15 Apr 2015 11:18:44 -0600 Subject: [PATCH] Start cluster before broker. This commit fixes an issue where the second node joins but the first node cannot commit because it doesn't have the HTTP endpoint running yet. This is a side effect of streaming raft since we don't synchronize the quorum set with the heartbeats. We should cache the config per term in the future. --- cmd/influxd/run.go | 28 +++++++++++++--------------- raft/log.go | 5 +++++ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 9eff5b1710..1a24aa7443 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -254,27 +254,21 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node { // Parse join urls from the --join flag. joinURLs := parseURLs(join) + // Start the broker handler. + h := &Handler{Config: config} + if err := cmd.node.openClusterListener(cmd.config.ClusterAddr(), h); err != nil { + log.Fatalf("Cluster server failed to listen on %s. %s ", cmd.config.ClusterAddr(), err) + } + log.Printf("Cluster server listening on %s", cmd.config.ClusterAddr()) + // Open broker & raft log, initialize or join as necessary. if cmd.config.Broker.Enabled { - cmd.openBroker(joinURLs) + cmd.openBroker(joinURLs, h) // If were running as a broker locally, always connect to it since it must // be ready before we can start the data node. joinURLs = []url.URL{cmd.node.Broker.URL()} } - // Start the broker handler. - h := &Handler{ - Config: config, - Broker: cmd.node.Broker, - Log: cmd.node.raftLog, - } - - err := cmd.node.openClusterListener(cmd.config.ClusterAddr(), h) - if err != nil { - log.Fatalf("Cluster server failed to listen on %s. %s ", cmd.config.ClusterAddr(), err) - } - log.Printf("Cluster server listening on %s", cmd.config.ClusterAddr()) - var s *influxdb.Server // Open server, initialize or join as necessary. if cmd.config.Data.Enabled { @@ -469,7 +463,7 @@ func writePIDFile(path string) { } // creates and initializes a broker. -func (cmd *RunCommand) openBroker(brokerURLs []url.URL) { +func (cmd *RunCommand) openBroker(brokerURLs []url.URL, h *Handler) { path := cmd.config.BrokerDir() u := cmd.config.ClusterURL() raftTracing := cmd.config.Logging.RaftTracing @@ -499,6 +493,10 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) { log.Fatalf("raft: %s", err) } + // Attached broker and log to handler. + h.Broker = b + h.Log = l + // Checks to see if the raft index is 0. If it's 0, it might be the first // node in the cluster and must initialize or join index, _ := l.LastLogIndexTerm() diff --git a/raft/log.go b/raft/log.go index 643a81fe1c..26be8cec0d 100644 --- a/raft/log.go +++ b/raft/log.go @@ -886,8 +886,11 @@ func (l *Log) followerLoop(closing <-chan struct{}) State { close(ch) // Ignore timeout if we are snapshotting. + // Or if we haven't received confirmation of join. if l.isSnapshotting() { continue + } else if l.FSM.Index() == 0 { + continue } // TODO: Prevote before becoming candidate. @@ -1349,6 +1352,7 @@ func (l *Log) appendToWriters(buf []byte) { // If an error occurs then remove the writer and close it. if _, err := w.Write(buf); err != nil { + l.Logger.Printf("append to writers error: %s", err) l.removeWriter(w) i-- continue @@ -1797,6 +1801,7 @@ func (l *Log) removeWriter(writer *logWriter) { l.writers[len(l.writers)-1] = nil l.writers = l.writers[:len(l.writers)-1] _ = w.Close() + l.Logger.Printf("writer removed: %#v", w) break } }