Simplify 'run' control

This change refactors the existing 'run' command such that repeated code
is removed. It uses -- in a much clearer manner -- the existence of the
Broker and Storage directories to work out in which mode it should run.
The only extra piece of state required on a disk is an empty file,
whose simple existence indicates that a local (loopback) client is
needed.

With this change in place the 'join-cluster' and 'create-cluster'
commands are much closer to being fully functional, since they simply
don't create the empty file. Therefore it all just works.
pull/1233/head
Philip O'Toole 2014-12-15 19:35:26 -08:00
parent 8fb8e72c33
commit e70f00723c
1 changed files with 41 additions and 65 deletions

View File

@ -1,7 +1,6 @@
package main
import (
"encoding/json"
"flag"
"io/ioutil"
"log"
@ -15,9 +14,6 @@ import (
"github.com/influxdb/influxdb/messaging"
)
// stateFilename represents the name of the file to store node state.
const stateFilename = "state"
// execRun runs the "run" command.
func execRun(args []string) {
// Parse command flags.
@ -58,44 +54,48 @@ func execRun(args []string) {
log.Printf("Starting Influx Server %s bound to %s...", version, config.BindAddress)
}
// Bring up the node in the state as is on disk.
var brokerURLs []*url.URL
// Read state from disk.
state, err := createStateIfNotExists(filepath.Join(config.Cluster.Dir, stateFilename))
if err != nil {
log.Fatal(err)
}
// Start up the node.
var client influxdb.MessagingClient
var server *influxdb.Server
var brokerHandler *messaging.Handler
var serverHandler *influxdb.Handler
var brokerDirExists bool
var storageDirExists bool
if state.Mode == "local" {
client = messaging.NewLoopbackClient()
log.Printf("Local messaging client created")
server = influxdb.NewServer(client)
err := server.Open(config.Storage.Dir)
if err != nil {
log.Fatalf("failed to open local Server", err.Error())
if _, err := os.Stat(config.Raft.Dir); err == nil {
brokerDirExists = true
}
if _, err := os.Stat(config.Storage.Dir); err == nil {
storageDirExists = true
}
if !brokerDirExists && !storageDirExists {
// Node is completely new, so create the minimum needed, which
// is a storage directory.
if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil {
log.Fatal(err)
}
serverHandler = influxdb.NewHandler(server)
} else {
// If the Broker directory exists, open a Broker on this node.
if _, err := os.Stat(config.Raft.Dir); err == nil {
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir); err != nil {
log.Fatalf("failed to open Broker", err.Error())
}
brokerHandler = messaging.NewHandler(b)
// Flag that local mode is required.
ioutil.WriteFile(filepath.Join(config.Storage.Dir, "local"), nil, 0644)
}
// If the Broker directory exists, open a Broker on this node.
if brokerDirExists {
b := messaging.NewBroker()
if err := b.Open(config.Raft.Dir); err != nil {
log.Fatalf("failed to open Broker", err.Error())
}
brokerHandler = messaging.NewHandler(b)
}
// If the storage directory exists, open a Data node.
if storageDirExists {
var client influxdb.MessagingClient
var server *influxdb.Server
if _, err := os.Stat(filepath.Join(config.Storage.Dir, "local")); err == nil {
client = messaging.NewLoopbackClient()
log.Printf("Local messaging client created")
} else {
log.Fatalf("failed to check for Broker directory", err.Error())
}
// If a Data directory exists, open a Data node.
if _, err := os.Stat(config.Storage.Dir); err == nil {
// Create correct client here for connecting to Broker.
c := messaging.NewClient("XXX-CHANGEME-XXX")
if err := c.Open(brokerURLs); err != nil {
@ -104,16 +104,14 @@ func execRun(args []string) {
defer c.Close()
client = c
log.Printf("Cluster messaging client created")
server = influxdb.NewServer(client)
err = server.Open(config.Storage.Dir)
if err != nil {
log.Fatalf("failed to open data Server", err.Error())
}
serverHandler = influxdb.NewHandler(server)
} else {
log.Fatalf("failed to check for Broker directory", err.Error())
}
server = influxdb.NewServer(client)
err = server.Open(config.Storage.Dir)
if err != nil {
log.Fatalf("failed to open data Server", err.Error())
}
serverHandler = influxdb.NewHandler(server)
}
// TODO: startProfiler()
@ -143,25 +141,3 @@ found, then the node runs in "local" mode. "Local" mode
Write process ID to a file.
\n`, configDefaultPath)
}
// createStateIfNotExists returns the cluster state, from the file at path.
// If no file exists at path, the default state is created, and written to the path.
func createStateIfNotExists(path string) (*State, error) {
// Read state from path.
// If state doesn't exist then return a "local" state.
f, err := os.Open(path)
if os.IsNotExist(err) {
return &State{Mode: "local"}, nil
} else if err != nil {
return nil, err
}
defer f.Close()
// Decode state from file and return.
s := &State{}
if err := json.NewDecoder(f).Decode(&s); err != nil {
return nil, err
}
return s, nil
}