Add connection URL to Broker
parent
e1ef61aafc
commit
021604a6de
|
@ -162,6 +162,11 @@ func (c *Config) RaftListenAddr() string {
|
|||
return fmt.Sprintf("%s:%d", c.BindAddress, c.Raft.Port)
|
||||
}
|
||||
|
||||
// RaftConnectionString returns the address required to contact the Raft server
|
||||
func (c *Config) RaftConnectionString() string {
|
||||
return fmt.Sprintf("%s:%d", c.Hostname, c.Raft.Port)
|
||||
}
|
||||
|
||||
// Size represents a TOML parseable file size.
|
||||
// Users can specify size using "m" for megabytes and "g" for gigabytes.
|
||||
type Size int
|
||||
|
@ -263,10 +268,6 @@ func (c *Config) ProtobufConnectionString() string {
|
|||
return fmt.Sprintf("%s:%d", c.Hostname, c.ProtobufPort)
|
||||
}
|
||||
|
||||
func (c *Config) RaftConnectionString() string {
|
||||
return fmt.Sprintf("http://%s:%d", c.Hostname, c.RaftServerPort)
|
||||
}
|
||||
|
||||
func (c *Config) ProtobufListenString() string {
|
||||
return fmt.Sprintf("%s:%d", c.BindAddress, c.ProtobufPort)
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ func execCreateCluster(args []string) {
|
|||
|
||||
// Create the broker.
|
||||
b := messaging.NewBroker()
|
||||
if err := b.Open(config.Raft.Dir); err != nil {
|
||||
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
|
||||
log.Fatalf("broker: %s", err.Error())
|
||||
}
|
||||
|
||||
|
|
|
@ -50,19 +50,21 @@ func execJoinCluster(args []string) {
|
|||
// Broker required -- but don't initialize it.
|
||||
// Joining a cluster will do that.
|
||||
b := messaging.NewBroker()
|
||||
if err := b.Open(config.Raft.Dir); err != nil {
|
||||
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
|
||||
log.Fatalf("join: %s", err)
|
||||
}
|
||||
|
||||
// Loop through each, connecting to one must succeed.
|
||||
joined := false
|
||||
for _, s := range seedURLs {
|
||||
if err := b.Join(s); err == nil {
|
||||
err := b.Join(s)
|
||||
if err != nil {
|
||||
log.Println("error: join failed to connect to", s, err)
|
||||
} else {
|
||||
log.Println("join: connected successfully to", s)
|
||||
joined = true
|
||||
break
|
||||
}
|
||||
log.Println("error: join failed to connect to", s)
|
||||
}
|
||||
|
||||
if !joined {
|
||||
|
@ -92,7 +94,7 @@ func execJoinCluster(args []string) {
|
|||
|
||||
}
|
||||
|
||||
log.Printf("joined cluster at %s", *seedServers)
|
||||
log.Printf("joined cluster as '%s' at %s", *role, *seedServers)
|
||||
}
|
||||
|
||||
func printJoinClusterUsage() {
|
||||
|
|
|
@ -81,7 +81,7 @@ func execRun(args []string) {
|
|||
// If the Broker directory exists, open a Broker on this node.
|
||||
if brokerDirExists {
|
||||
b := messaging.NewBroker()
|
||||
if err := b.Open(config.Raft.Dir); err != nil {
|
||||
if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil {
|
||||
log.Fatalf("failed to open Broker", err.Error())
|
||||
}
|
||||
brokerHandler = messaging.NewHandler(b)
|
||||
|
|
|
@ -51,7 +51,7 @@ func (b *Broker) opened() bool { return b.path != "" }
|
|||
|
||||
// Open initializes the log.
|
||||
// The broker then must be initialized or join a cluster before it can be used.
|
||||
func (b *Broker) Open(path string) error {
|
||||
func (b *Broker) Open(path string, connectionAddr string) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
|
@ -61,10 +61,15 @@ func (b *Broker) Open(path string) error {
|
|||
}
|
||||
b.path = path
|
||||
|
||||
// Open underlying raft log.
|
||||
// Open underlying raft log and set its connection URL.
|
||||
if err := b.log.Open(filepath.Join(path, "raft")); err != nil {
|
||||
return fmt.Errorf("raft: %s", err)
|
||||
}
|
||||
u, err := url.Parse(connectionAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("broker: %s", err)
|
||||
}
|
||||
b.log.URL = u
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue