Merge pull request #2201 from influxdb/jw-join-urls

Bring back config join URLs
pull/2209/head
Jason Wilder 2015-04-08 20:56:44 -06:00
commit 019110c9c0
8 changed files with 70 additions and 35 deletions

View File

@ -5,6 +5,7 @@
- [#2180](https://github.com/influxdb/influxdb/pull/2180): Allow http write handler to decode gzipped body - [#2180](https://github.com/influxdb/influxdb/pull/2180): Allow http write handler to decode gzipped body
- [#2175](https://github.com/influxdb/influxdb/pull/2175): Separate broker and data nodes - [#2175](https://github.com/influxdb/influxdb/pull/2175): Separate broker and data nodes
- [#2158](https://github.com/influxdb/influxdb/pull/2158): Allow user password to be changed. Thanks @n1tr0g - [#2158](https://github.com/influxdb/influxdb/pull/2158): Allow user password to be changed. Thanks @n1tr0g
- [#2201](https://github.com/influxdb/influxdb/pull/2201): Bring back config join URLs
### Bugfixes ### Bugfixes
- [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS". - [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS".

View File

@ -122,6 +122,14 @@ type Data struct {
RetentionCreatePeriod Duration `toml:"retention-create-period"` RetentionCreatePeriod Duration `toml:"retention-create-period"`
} }
// Initialization contains configuration options for the first time a node boots
type Initialization struct {
// JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After,
// a node is joined to a cluster, these URLS are ignored. These will be overriden at runtime if
// the node is started with the `-join` flag.
JoinURLs string `toml:"join-urls"`
}
// Config represents the configuration format for the influxd binary. // Config represents the configuration format for the influxd binary.
type Config struct { type Config struct {
Hostname string `toml:"hostname"` Hostname string `toml:"hostname"`
@ -131,6 +139,8 @@ type Config struct {
Version string `toml:"-"` Version string `toml:"-"`
InfluxDBVersion string `toml:"-"` InfluxDBVersion string `toml:"-"`
Initialization Initialization `toml:"initialization"`
Authentication struct { Authentication struct {
Enabled bool `toml:"enabled"` Enabled bool `toml:"enabled"`
} `toml:"authentication"` } `toml:"authentication"`

View File

@ -39,6 +39,11 @@ write-interval = "1m"
enabled = true enabled = true
port = 8083 port = 8083
# Controls certain parameters that only take effect until an initial successful
# start-up has occurred.
[initialization]
join-urls = "http://127.0.0.1:8086"
# Configure the http api # Configure the http api
[api] [api]
bind-address = "10.1.2.3" bind-address = "10.1.2.3"
@ -152,6 +157,10 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("port mismatch. got %v, exp %v", c.Port, exp) t.Fatalf("port mismatch. got %v, exp %v", c.Port, exp)
} }
if c.Initialization.JoinURLs != "http://127.0.0.1:8086" {
t.Fatalf("JoinURLs mistmatch: %v", c.Initialization.JoinURLs)
}
if !c.Authentication.Enabled { if !c.Authentication.Enabled {
t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled) t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled)
} }

View File

@ -2,6 +2,7 @@ package main
import ( import (
"log" "log"
"math/rand"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -73,7 +74,7 @@ func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) {
// serveMetadata responds to broker requests // serveMetadata responds to broker requests
func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil { if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints") log.Println("no broker or server configured to handle metadata endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable) http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return return
} }
@ -98,7 +99,7 @@ func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) {
// serveRaft responds to raft requests. // serveRaft responds to raft requests.
func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) {
if h.Log == nil && h.Server == nil { if h.Log == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints") log.Println("no broker or server configured to handle raft endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable) http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return return
} }
@ -116,7 +117,7 @@ func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) {
// serveData responds to data requests // serveData responds to data requests
func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil { if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints") log.Println("no broker or server configured to handle data endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable) http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return return
} }
@ -151,5 +152,5 @@ func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request)
// this is happening frequently, the clients are using a suboptimal endpoint // this is happening frequently, the clients are using a suboptimal endpoint
// Redirect the client to a valid data node that can handle the request // Redirect the client to a valid data node that can handle the request
http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusTemporaryRedirect) http.Redirect(w, r, u[rand.Intn(len(u))].String()+r.RequestURI, http.StatusTemporaryRedirect)
} }

View File

@ -114,8 +114,15 @@ func (cmd *RunCommand) Run(args ...string) error {
cmd.Logger.Println("No config provided, using default settings") cmd.Logger.Println("No config provided, using default settings")
} }
// Use the config JoinURLs by default
joinURLs := cmd.config.Initialization.JoinURLs
// If a -join flag was passed, these should override the config
if join != "" {
joinURLs = join
}
cmd.CheckConfig() cmd.CheckConfig()
cmd.Open(cmd.config, join) cmd.Open(cmd.config, joinURLs)
// Wait indefinitely. // Wait indefinitely.
<-(chan struct{})(nil) <-(chan struct{})(nil)
@ -151,6 +158,9 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
// Open broker & raft log, initialize or join as necessary. // Open broker & raft log, initialize or join as necessary.
if cmd.config.Broker.Enabled { if cmd.config.Broker.Enabled {
cmd.openBroker(joinURLs) cmd.openBroker(joinURLs)
// If were running as a broker locally, always connect to it since it must
// be ready before we can start the data node.
joinURLs = []url.URL{cmd.node.broker.URL()}
} }
// Start the broker handler. // Start the broker handler.
@ -423,25 +433,23 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
} }
index, _ := l.LastLogIndexTerm() index, _ := l.LastLogIndexTerm()
// Checks to see if the raft index is 0. If it's 0, it might be the first
// If we have join URLs and log is not initialized, attempt to join an existing cluster // node in the cluster and must initialize or join
if len(brokerURLs) > 0 { if index == 0 {
if index == 0 { // If we have join URLs, then attemp to join the cluster
if len(brokerURLs) > 0 {
joinLog(l, brokerURLs) joinLog(l, brokerURLs)
return return
} }
log.Printf("Node previously joined to a cluster. Ignoring -join urls flag")
}
// Checks to see if the raft index is 0. If it's 0, it's the first
// node in the cluster and must initialize
if index == 0 {
if err := l.Initialize(); err != nil { if err := l.Initialize(); err != nil {
log.Fatalf("initialize raft log: %s", err) log.Fatalf("initialize raft log: %s", err)
} }
u := b.Broker.URL() u := b.Broker.URL()
log.Printf("initialized broker: %s\n", (&u).String()) log.Printf("initialized broker: %s\n", (&u).String())
return } else {
log.Printf("broker already member of cluster. Using existing state and ignoring join URLs")
} }
} }
@ -450,9 +458,9 @@ func joinLog(l *raft.Log, brokerURLs []url.URL) {
// Attempts to join each server until successful. // Attempts to join each server until successful.
for _, u := range brokerURLs { for _, u := range brokerURLs {
if err := l.Join(u); err != nil { if err := l.Join(u); err != nil {
log.Printf("join: failed to connect to raft cluster: %s: %s", u, err) log.Printf("join: failed to connect to raft cluster: %s: %s", (&u).String(), err)
} else { } else {
log.Printf("join: connected raft log to %s", u) log.Printf("join: connected raft log to %s", (&u).String())
return return
} }
} }
@ -464,12 +472,7 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
// Create messaging client to the brokers. // Create messaging client to the brokers.
c := influxdb.NewMessagingClient(cmd.config.ClusterURL()) c := influxdb.NewMessagingClient(cmd.config.ClusterURL())
// If join URLs were passed in then use them to override the client's URLs. c.SetURLs(joinURLs)
if len(joinURLs) > 0 {
c.SetURLs(joinURLs)
} else if cmd.node.broker != nil {
c.SetURLs([]url.URL{cmd.node.broker.URL()})
}
if err := c.Open(filepath.Join(cmd.config.Data.Dir, messagingClientFile)); err != nil { if err := c.Open(filepath.Join(cmd.config.Data.Dir, messagingClientFile)); err != nil {
log.Fatalf("messaging client error: %s", err) log.Fatalf("messaging client error: %s", err)
@ -495,9 +498,9 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
// Open server with data directory and broker client. // Open server with data directory and broker client.
if err := s.Open(cmd.config.Data.Dir, c); err != nil { if err := s.Open(cmd.config.Data.Dir, c); err != nil {
log.Fatalf("failed to open data server: %v", err.Error()) log.Fatalf("failed to open data node: %v", err.Error())
} }
log.Printf("data server opened at %s", cmd.config.Data.Dir) log.Printf("data node opened at %s", cmd.config.Data.Dir)
dataNodeIndex := s.Index() dataNodeIndex := s.Index()
if dataNodeIndex == 0 { if dataNodeIndex == 0 {
@ -505,17 +508,16 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
joinServer(s, cmd.config.ClusterURL(), joinURLs) joinServer(s, cmd.config.ClusterURL(), joinURLs)
return s return s
} }
log.Printf("Node previously joined to a cluster. Ignoring -join urls flag")
}
if dataNodeIndex == 0 {
if err := s.Initialize(cmd.config.ClusterURL()); err != nil { if err := s.Initialize(cmd.config.ClusterURL()); err != nil {
log.Fatalf("server initialization error: %s", err) log.Fatalf("server initialization error: %s", err)
} }
u := cmd.config.ClusterURL() u := cmd.config.ClusterURL()
log.Printf("initialized data node: %s\n", (&u).String()) log.Printf("initialized data node: %s\n", (&u).String())
return s return s
} else {
log.Printf("data node already member of cluster. Using existing state and ignoring join URLs")
} }
return s return s
@ -536,7 +538,7 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
log.Printf("initialized data node: %s\n", (&u).String()) log.Printf("initialized data node: %s\n", (&u).String())
return return
} }
log.Printf("join: failed to connect data node: %s: %s", u, err) log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err)
} else { } else {
log.Printf("join: connected data node to %s", u) log.Printf("join: connected data node to %s", u)
return return

View File

@ -1860,6 +1860,7 @@ func TestSeparateBrokerDataNode(t *testing.T) {
brokerConfig.ReportingDisabled = true brokerConfig.ReportingDisabled = true
dataConfig := main.NewConfig() dataConfig := main.NewConfig()
dataConfig.Port = 9001
dataConfig.Broker.Enabled = false dataConfig.Broker.Enabled = false
dataConfig.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig.Port)) dataConfig.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig.Port))
dataConfig.ReportingDisabled = true dataConfig.ReportingDisabled = true

View File

@ -17,6 +17,10 @@
# In the third case we have to define our own functions which are very dumb # In the third case we have to define our own functions which are very dumb
# and expect the args to be positioned correctly. # and expect the args to be positioned correctly.
# Command-line options that can be set in /etc/default/influxdb. These will override
# any config file values. Example: "-join http://1.2.3.4:8086"
INFLUXD_OPTS=
if [ -r /lib/lsb/init-functions ]; then if [ -r /lib/lsb/init-functions ]; then
source /lib/lsb/init-functions source /lib/lsb/init-functions
fi fi
@ -117,9 +121,9 @@ case $1 in
log_success_msg "Starting the process" "$name" log_success_msg "Starting the process" "$name"
if which start-stop-daemon > /dev/null 2>&1; then if which start-stop-daemon > /dev/null 2>&1; then
start-stop-daemon --chuid influxdb:influxdb --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config >>$STDOUT 2>>$STDERR & start-stop-daemon --chuid influxdb:influxdb --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config $INFLUXD_OPTS >>$STDOUT 2>>$STDERR &
else else
nohup $daemon -pidfile $pidfile -config $config >>$STDOUT 2>>$STDERR & nohup $daemon -pidfile $pidfile -config $config $INFLUXD_OPTS >>$STDOUT 2>>$STDERR &
fi fi
log_success_msg "$name process was started" log_success_msg "$name process was started"
;; ;;

View File

@ -677,9 +677,8 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
// the Location header is where we should resend the POST. We also need to re-encode // the Location header is where we should resend the POST. We also need to re-encode
// body since the buf was already read. // body since the buf was already read.
for { for {
// Should never get here but bail to avoid a infinite redirect loop to be safe // Should never get here but bail to avoid a infinite redirect loop to be safe
if retries >= 3 { if retries >= 60 {
return ErrUnableToJoin return ErrUnableToJoin
} }
@ -706,7 +705,15 @@ func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
// We likely tried to join onto a broker which cannot handle this request. It // We likely tried to join onto a broker which cannot handle this request. It
// has given us the address of a known data node to join instead. // has given us the address of a known data node to join instead.
if resp.StatusCode == http.StatusTemporaryRedirect { if resp.StatusCode == http.StatusTemporaryRedirect {
joinURL, err = url.Parse(resp.Header.Get("Location")) redirectURL, err := url.Parse(resp.Header.Get("Location"))
// if we happen to get redirected back to ourselves then we'll never join. This
// may because the heartbeater could have already fired once, registering our endpoints
// as a data node and the broker is redirecting data node requests back to us. In
// this case, just re-request the original URL again util we get a different node.
if redirectURL.Host != u.Host {
joinURL = redirectURL
}
if err != nil { if err != nil {
return err return err
} }