diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 8ade01cd67..957bc0c91c 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -162,6 +162,11 @@ func (c *Config) RaftListenAddr() string { return fmt.Sprintf("%s:%d", c.BindAddress, c.Raft.Port) } +// RaftConnectionString returns the address required to contact the Raft server +func (c *Config) RaftConnectionString() string { + return fmt.Sprintf("%s:%d", c.Hostname, c.Raft.Port) +} + // Size represents a TOML parseable file size. // Users can specify size using "m" for megabytes and "g" for gigabytes. type Size int @@ -263,10 +268,6 @@ func (c *Config) ProtobufConnectionString() string { return fmt.Sprintf("%s:%d", c.Hostname, c.ProtobufPort) } -func (c *Config) RaftConnectionString() string { - return fmt.Sprintf("http://%s:%d", c.Hostname, c.RaftServerPort) -} - func (c *Config) ProtobufListenString() string { return fmt.Sprintf("%s:%d", c.BindAddress, c.ProtobufPort) } diff --git a/cmd/influxd/create_cluster.go b/cmd/influxd/create_cluster.go index bfa12a3d86..cb2e5f2c7e 100644 --- a/cmd/influxd/create_cluster.go +++ b/cmd/influxd/create_cluster.go @@ -34,7 +34,7 @@ func execCreateCluster(args []string) { // Create the broker. b := messaging.NewBroker() - if err := b.Open(config.Raft.Dir); err != nil { + if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil { log.Fatalf("broker: %s", err.Error()) } diff --git a/cmd/influxd/join_cluster.go b/cmd/influxd/join_cluster.go index a2679ea807..f4d794d4b7 100644 --- a/cmd/influxd/join_cluster.go +++ b/cmd/influxd/join_cluster.go @@ -50,19 +50,21 @@ func execJoinCluster(args []string) { // Broker required -- but don't initialize it. // Joining a cluster will do that. b := messaging.NewBroker() - if err := b.Open(config.Raft.Dir); err != nil { + if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil { log.Fatalf("join: %s", err) } // Loop through each, connecting to one must succeed. joined := false for _, s := range seedURLs { - if err := b.Join(s); err == nil { + err := b.Join(s) + if err != nil { + log.Println("error: join failed to connect to", s, err) + } else { log.Println("join: connected successfully to", s) joined = true break } - log.Println("error: join failed to connect to", s) } if !joined { @@ -92,7 +94,7 @@ func execJoinCluster(args []string) { } - log.Printf("joined cluster at %s", *seedServers) + log.Printf("joined cluster as '%s' at %s", *role, *seedServers) } func printJoinClusterUsage() { diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 67238aa109..c4374784ba 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -81,7 +81,7 @@ func execRun(args []string) { // If the Broker directory exists, open a Broker on this node. if brokerDirExists { b := messaging.NewBroker() - if err := b.Open(config.Raft.Dir); err != nil { + if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil { log.Fatalf("failed to open Broker", err.Error()) } brokerHandler = messaging.NewHandler(b) diff --git a/messaging/broker.go b/messaging/broker.go index c4f004dbd8..16aa051144 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -51,7 +51,7 @@ func (b *Broker) opened() bool { return b.path != "" } // Open initializes the log. // The broker then must be initialized or join a cluster before it can be used. -func (b *Broker) Open(path string) error { +func (b *Broker) Open(path string, connectionAddr string) error { b.mu.Lock() defer b.mu.Unlock() @@ -61,10 +61,15 @@ func (b *Broker) Open(path string) error { } b.path = path - // Open underlying raft log. + // Open underlying raft log and set its connection URL. if err := b.log.Open(filepath.Join(path, "raft")); err != nil { return fmt.Errorf("raft: %s", err) } + u, err := url.Parse(connectionAddr) + if err != nil { + return fmt.Errorf("broker: %s", err) + } + b.log.URL = u return nil }