diff --git a/CHANGELOG.md b/CHANGELOG.md index f88aad6873..da86aca9e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#2180](https://github.com/influxdb/influxdb/pull/2180): Allow http write handler to decode gzipped body - [#2175](https://github.com/influxdb/influxdb/pull/2175): Separate broker and data nodes - [#2158](https://github.com/influxdb/influxdb/pull/2158): Allow user password to be changed. Thanks @n1tr0g +- [#2201](https://github.com/influxdb/influxdb/pull/2201): Bring back config join URLs ### Bugfixes - [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS". diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 1f1573dc5c..7ccd8fc82c 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -122,6 +122,14 @@ type Data struct { RetentionCreatePeriod Duration `toml:"retention-create-period"` } +// Initialization contains configuration options for the first time a node boots +type Initialization struct { + // JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After + // a node is joined to a cluster, these URLs are ignored. These will be overridden at runtime if + // the node is started with the `-join` flag. + JoinURLs string `toml:"join-urls"` +} + // Config represents the configuration format for the influxd binary. type Config struct { Hostname string `toml:"hostname"` @@ -131,6 +139,8 @@ type Config struct { Version string `toml:"-"` InfluxDBVersion string `toml:"-"` + Initialization Initialization `toml:"initialization"` + Authentication struct { Enabled bool `toml:"enabled"` } `toml:"authentication"` diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 5e70680e38..9013a8bacd 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -39,6 +39,11 @@ write-interval = "1m" enabled = true port = 8083 +# Controls certain parameters that only take effect until an initial successful +# start-up has occurred.
+[initialization] +join-urls = "http://127.0.0.1:8086" + # Configure the http api [api] bind-address = "10.1.2.3" @@ -152,6 +157,10 @@ func TestParseConfig(t *testing.T) { t.Fatalf("port mismatch. got %v, exp %v", c.Port, exp) } + if c.Initialization.JoinURLs != "http://127.0.0.1:8086" { + t.Fatalf("JoinURLs mistmatch: %v", c.Initialization.JoinURLs) + } + if !c.Authentication.Enabled { t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled) } diff --git a/cmd/influxd/handler.go b/cmd/influxd/handler.go index 89b656905a..e9bd987fdf 100644 --- a/cmd/influxd/handler.go +++ b/cmd/influxd/handler.go @@ -2,6 +2,7 @@ package main import ( "log" + "math/rand" "net/http" "net/url" "strings" @@ -73,7 +74,7 @@ func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) { // serveMetadata responds to broker requests func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) { if h.Broker == nil && h.Server == nil { - log.Println("no broker or server configured to handle messaging endpoints") + log.Println("no broker or server configured to handle metadata endpoints") http.Error(w, "server unavailable", http.StatusServiceUnavailable) return } @@ -98,7 +99,7 @@ func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) { // serveRaft responds to raft requests. 
func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) { if h.Log == nil && h.Server == nil { - log.Println("no broker or server configured to handle messaging endpoints") + log.Println("no broker or server configured to handle raft endpoints") http.Error(w, "server unavailable", http.StatusServiceUnavailable) return } @@ -116,7 +117,7 @@ func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) { // serveData responds to data requests func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) { if h.Broker == nil && h.Server == nil { - log.Println("no broker or server configured to handle messaging endpoints") + log.Println("no broker or server configured to handle data endpoints") http.Error(w, "server unavailable", http.StatusServiceUnavailable) return } @@ -151,5 +152,5 @@ func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request) // this is happening frequently, the clients are using a suboptimal endpoint // Redirect the client to a valid data node that can handle the request - http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusTemporaryRedirect) + http.Redirect(w, r, u[rand.Intn(len(u))].String()+r.RequestURI, http.StatusTemporaryRedirect) } diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index e695fa39b4..ce1287a8bf 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -114,8 +114,15 @@ func (cmd *RunCommand) Run(args ...string) error { cmd.Logger.Println("No config provided, using default settings") } + // Use the config JoinURLs by default + joinURLs := cmd.config.Initialization.JoinURLs + + // If a -join flag was passed, these should override the config + if join != "" { + joinURLs = join + } cmd.CheckConfig() - cmd.Open(cmd.config, join) + cmd.Open(cmd.config, joinURLs) // Wait indefinitely. <-(chan struct{})(nil) @@ -151,6 +158,9 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in // Open broker & raft log, initialize or join as necessary. 
if cmd.config.Broker.Enabled { cmd.openBroker(joinURLs) + // If we're running as a broker locally, always connect to it since it must + // be ready before we can start the data node. + joinURLs = []url.URL{cmd.node.broker.URL()} } // Start the broker handler. @@ -423,25 +433,23 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) { } index, _ := l.LastLogIndexTerm() - - // If we have join URLs and log is not initialized, attempt to join an existing cluster - if len(brokerURLs) > 0 { - if index == 0 { + // Checks to see if the raft index is 0. If it's 0, it might be the first + // node in the cluster and must initialize or join + if index == 0 { + // If we have join URLs, then attempt to join the cluster + if len(brokerURLs) > 0 { joinLog(l, brokerURLs) return } - log.Printf("Node previously joined to a cluster. Ignoring -join urls flag") - } - // Checks to see if the raft index is 0. If it's 0, it's the first - // node in the cluster and must initialize - if index == 0 { if err := l.Initialize(); err != nil { log.Fatalf("initialize raft log: %s", err) } + u := b.Broker.URL() log.Printf("initialized broker: %s\n", (&u).String()) - return + } else { + log.Printf("broker already member of cluster. Using existing state and ignoring join URLs") } } @@ -450,9 +458,9 @@ func joinLog(l *raft.Log, brokerURLs []url.URL) { // Attempts to join each server until successful. for _, u := range brokerURLs { if err := l.Join(u); err != nil { - log.Printf("join: failed to connect to raft cluster: %s: %s", u, err) + log.Printf("join: failed to connect to raft cluster: %s: %s", (&u).String(), err) } else { - log.Printf("join: connected raft log to %s", u) + log.Printf("join: connected raft log to %s", (&u).String()) return } } @@ -464,12 +472,7 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server { // Create messaging client to the brokers.
c := influxdb.NewMessagingClient(cmd.config.ClusterURL()) - // If join URLs were passed in then use them to override the client's URLs. - if len(joinURLs) > 0 { - c.SetURLs(joinURLs) - } else if cmd.node.broker != nil { - c.SetURLs([]url.URL{cmd.node.broker.URL()}) - } + c.SetURLs(joinURLs) if err := c.Open(filepath.Join(cmd.config.Data.Dir, messagingClientFile)); err != nil { log.Fatalf("messaging client error: %s", err) @@ -495,9 +498,9 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server { // Open server with data directory and broker client. if err := s.Open(cmd.config.Data.Dir, c); err != nil { - log.Fatalf("failed to open data server: %v", err.Error()) + log.Fatalf("failed to open data node: %v", err.Error()) } - log.Printf("data server opened at %s", cmd.config.Data.Dir) + log.Printf("data node opened at %s", cmd.config.Data.Dir) dataNodeIndex := s.Index() if dataNodeIndex == 0 { @@ -505,17 +508,16 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server { joinServer(s, cmd.config.ClusterURL(), joinURLs) return s } - log.Printf("Node previously joined to a cluster. Ignoring -join urls flag") - } - - if dataNodeIndex == 0 { if err := s.Initialize(cmd.config.ClusterURL()); err != nil { log.Fatalf("server initialization error: %s", err) } + u := cmd.config.ClusterURL() log.Printf("initialized data node: %s\n", (&u).String()) return s + } else { + log.Printf("data node already member of cluster. 
Using existing state and ignoring join URLs") } return s @@ -536,7 +538,7 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) { log.Printf("initialized data node: %s\n", (&u).String()) return } - log.Printf("join: failed to connect data node: %s: %s", u, err) + log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err) } else { log.Printf("join: connected data node to %s", u) return diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 6590cb2ab1..e03ea22d0b 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1860,6 +1860,7 @@ func TestSeparateBrokerDataNode(t *testing.T) { brokerConfig.ReportingDisabled = true dataConfig := main.NewConfig() + dataConfig.Port = 9001 dataConfig.Broker.Enabled = false dataConfig.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig.Port)) dataConfig.ReportingDisabled = true diff --git a/scripts/init.sh b/scripts/init.sh index 99d8029f3f..d20d40e657 100755 --- a/scripts/init.sh +++ b/scripts/init.sh @@ -17,6 +17,10 @@ # In the third case we have to define our own functions which are very dumb # and expect the args to be positioned correctly. +# Command-line options that can be set in /etc/default/influxdb. These will override +# any config file values. 
Example: "-join http://1.2.3.4:8086" +INFLUXD_OPTS= + if [ -r /lib/lsb/init-functions ]; then source /lib/lsb/init-functions fi @@ -117,9 +121,9 @@ case $1 in log_success_msg "Starting the process" "$name" if which start-stop-daemon > /dev/null 2>&1; then - start-stop-daemon --chuid influxdb:influxdb --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config >>$STDOUT 2>>$STDERR & + start-stop-daemon --chuid influxdb:influxdb --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config $INFLUXD_OPTS >>$STDOUT 2>>$STDERR & else - nohup $daemon -pidfile $pidfile -config $config >>$STDOUT 2>>$STDERR & + nohup $daemon -pidfile $pidfile -config $config $INFLUXD_OPTS >>$STDOUT 2>>$STDERR & fi log_success_msg "$name process was started" ;; diff --git a/server.go b/server.go index e0b7f254a8..e1b887e3a2 100644 --- a/server.go +++ b/server.go @@ -677,9 +677,8 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error { // the Location header is where we should resend the POST. We also need to re-encode // body since the buf was already read. for { - // Should never get here but bail to avoid a infinite redirect loop to be safe - if retries >= 3 { + if retries >= 60 { return ErrUnableToJoin } @@ -706,7 +705,15 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error { // We likely tried to join onto a broker which cannot handle this request. It // has given us the address of a known data node to join instead. if resp.StatusCode == http.StatusTemporaryRedirect { - joinURL, err = url.Parse(resp.Header.Get("Location")) + redirectURL, err := url.Parse(resp.Header.Get("Location")) + + // if we happen to get redirected back to ourselves then we'll never join. This + // may be because the heartbeater could have already fired once, registering our endpoints + // as a data node and the broker is redirecting data node requests back to us. In + // this case, just re-request the original URL again until we get a different node.
+ if redirectURL.Host != u.Host { + joinURL = redirectURL + } if err != nil { return err }