Move openBroker to RunCommand.openBroker
parent
c63866ea62
commit
6fa0ea01dd
|
@ -10,24 +10,17 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/cmd/influxd"
|
||||
main "github.com/influxdb/influxdb/cmd/influxd"
|
||||
)
|
||||
|
||||
func newConfig(path string, brokerPort, dataPort, snapshotPort int) main.Config {
|
||||
return main.Config{
|
||||
Broker: main.Broker{
|
||||
Port: brokerPort,
|
||||
Dir: filepath.Join(path, "broker"),
|
||||
},
|
||||
Data: main.Data{
|
||||
Port: dataPort,
|
||||
Dir: filepath.Join(path, "data"),
|
||||
RetentionAutoCreate: true,
|
||||
},
|
||||
Snapshot: main.Snapshot{
|
||||
Port: snapshotPort,
|
||||
},
|
||||
}
|
||||
config := main.NewConfig()
|
||||
config.Broker.Port = brokerPort
|
||||
config.Broker.Dir = filepath.Join(path, "broker")
|
||||
config.Data.Port = dataPort
|
||||
config.Data.Dir = filepath.Join(path, "data")
|
||||
config.Snapshot.Port = snapshotPort
|
||||
return *config
|
||||
}
|
||||
|
||||
// Ensure the restore command can expand a snapshot and bootstrap a broker.
|
||||
|
|
|
@ -118,7 +118,7 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
|
|||
}
|
||||
|
||||
// Open broker & raft log, initialize or join as necessary.
|
||||
b, l := openBroker(cmd.config.BrokerDir(), cmd.config.BrokerURL(), initBroker, joinURLs, cmd.config.Logging.RaftTracing)
|
||||
b, l := cmd.openBroker(joinURLs)
|
||||
|
||||
// Start the broker handler.
|
||||
h := &Handler{
|
||||
|
@ -341,7 +341,11 @@ func parseConfig(path, hostname string) (*Config, error) {
|
|||
}
|
||||
|
||||
// creates and initializes a broker.
|
||||
func openBroker(path string, u url.URL, initializing bool, joinURLs []url.URL, raftTracing bool) (*influxdb.Broker, *raft.Log) {
|
||||
func (cmd *RunCommand) openBroker(joinURLs []url.URL) (*influxdb.Broker, *raft.Log) {
|
||||
path := cmd.config.BrokerDir()
|
||||
u := cmd.config.BrokerURL()
|
||||
raftTracing := cmd.config.Logging.RaftTracing
|
||||
|
||||
// Create raft log.
|
||||
l := raft.NewLog()
|
||||
l.SetURL(u)
|
||||
|
@ -365,17 +369,18 @@ func openBroker(path string, u url.URL, initializing bool, joinURLs []url.URL, r
|
|||
log.Fatalf("raft: %s", err)
|
||||
}
|
||||
|
||||
// If this is a new broker then we can initialize two ways:
|
||||
// 1) Start a brand new cluster.
|
||||
// 2) Join an existing cluster.
|
||||
if initializing {
|
||||
if len(joinURLs) == 0 {
|
||||
if err := l.Initialize(); err != nil {
|
||||
log.Fatalf("initialize raft log: %s", err)
|
||||
}
|
||||
} else {
|
||||
joinLog(l, joinURLs)
|
||||
// Checks to see if the raft index is 0. If it's 0, it's the first
|
||||
// node in the cluster and must initialize
|
||||
if i, _ := l.LastLogIndexTerm(); i == 0 {
|
||||
if err := l.Initialize(); err != nil {
|
||||
log.Fatalf("initialize raft log: %s", err)
|
||||
}
|
||||
return b, l
|
||||
}
|
||||
|
||||
// If we have join URLs, attemp to join an existing cluster
|
||||
if len(joinURLs) > 0 {
|
||||
joinLog(l, joinURLs)
|
||||
}
|
||||
|
||||
return b, l
|
||||
|
|
Loading…
Reference in New Issue