From 73f304ca1ef8e72e02e975325057492f2fb9c652 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 11 Dec 2014 13:46:14 -0800 Subject: [PATCH 1/4] Bring up broker endpoints If a Broker is running on the node, we need ensure the handler will service its endpoints. --- cmd/influxd/handler.go | 34 ++++++++++++++++++++++++++++++++++ cmd/influxd/run.go | 13 ++++++++++--- 2 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 cmd/influxd/handler.go diff --git a/cmd/influxd/handler.go b/cmd/influxd/handler.go new file mode 100644 index 0000000000..06693e8b6c --- /dev/null +++ b/cmd/influxd/handler.go @@ -0,0 +1,34 @@ +package main + +import ( + "net/http" + "strings" + + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/messaging" +) + +// Handler represents an HTTP handler for InfluxDB node. Depending on its role, it +// will serve many different endpoints. +type Handler struct { + brokerHandler *messaging.Handler + serverHandler *influxdb.Handler +} + +// NewHandler returns a new instance of Handler. +func NewHandler(b *messaging.Handler, s *influxdb.Handler) *Handler { + return &Handler{ + brokerHandler: b, + serverHandler: s, + } +} + +// ServeHTTP responds to HTTP request to the handler. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/raft") || strings.HasPrefix(r.URL.Path, "/messages") { + h.brokerHandler.ServeHTTP(w, r) + return + } + + h.serverHandler.ServeHTTP(w, r) +} diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 4b68eef52a..b9d844768e 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -67,9 +67,12 @@ func execRun(args []string) { log.Fatal(err) } - // Open + // Start up the node. var client influxdb.MessagingClient var server *influxdb.Server + var brokerHandler *messaging.Handler + var serverHandler *influxdb.Handler + if state.Mode == "local" { client = messaging.NewLoopbackClient() log.Printf("Local messaging client created") @@ -78,6 +81,7 @@ func execRun(args []string) { if err != nil { log.Fatalf("failed to open local Server", err.Error()) } + serverHandler = influxdb.NewHandler(server) } else { // If the Broker directory exists, open a Broker on this node. if _, err := os.Stat(config.Raft.Dir); err == nil { @@ -85,9 +89,11 @@ func execRun(args []string) { if err := b.Open(config.Raft.Dir); err != nil { log.Fatalf("failed to open Broker", err.Error()) } + brokerHandler = messaging.NewHandler(b) } else { log.Fatalf("failed to check for Broker directory", err.Error()) } + // If a Data directory exists, open a Data node. if _, err := os.Stat(config.Storage.Dir); err == nil { // Create correct client here for connecting to Broker. @@ -104,6 +110,7 @@ func execRun(args []string) { if err != nil { log.Fatalf("failed to open data Server", err.Error()) } + serverHandler = influxdb.NewHandler(server) } else { log.Fatalf("failed to check for Broker directory", err.Error()) } @@ -112,8 +119,8 @@ func execRun(args []string) { // TODO: startProfiler() // TODO: -reset-root - // Start up HTTP server with correct endpoints. - h := influxdb.NewHandler(server) + // Start up HTTP server + h := NewHandler(brokerHandler, serverHandler) func() { log.Fatal(http.ListenAndServe(":8086", h)) }() // Wait indefinitely. From dbd9dcbe0f4828334c265678b281115b51d14b55 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 11 Dec 2014 13:49:42 -0800 Subject: [PATCH 2/4] Add missing default config paths and help They were missing for 'create-cluster' and 'join-cluster'. --- cmd/influxd/create_cluster.go | 9 ++++++--- cmd/influxd/join_cluster.go | 8 +++++--- cmd/influxd/main.go | 5 +++++ cmd/influxd/run.go | 8 ++++---- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cmd/influxd/create_cluster.go b/cmd/influxd/create_cluster.go index de2c79b2c1..00e125b8eb 100644 --- a/cmd/influxd/create_cluster.go +++ b/cmd/influxd/create_cluster.go @@ -13,7 +13,7 @@ func execCreateCluster(args []string) { // Parse command flags. fs := flag.NewFlagSet("", flag.ExitOnError) var ( - configPath = fs.String("config", "", "") + configPath = fs.String("config", configDefaultPath, "") role = fs.String("role", "combined", "") ) fs.Usage = printCreateClusterUsage @@ -57,15 +57,18 @@ func execCreateCluster(args []string) { } func printCreateClusterUsage() { - log.Println(`usage: create-cluster [flags] + log.Printf(`usage: create-cluster [flags] create-cluster creates a completely new node that can act as the first node of a new cluster. This node must be created as a 'combined' or 'broker' node. + -config + Set the path to the configuration file. Defaults to %s. + -role Set the role to be 'combined' or 'broker'. 'broker' means it will take part in Raft Distributed Consensus. 'combined' means it take part in Raft and store time-series data. The default role is 'combined'. Any other role other than these two is invalid. -`) +\n`, configDefaultPath) } diff --git a/cmd/influxd/join_cluster.go b/cmd/influxd/join_cluster.go index a2adc2c0cd..75148c8757 100644 --- a/cmd/influxd/join_cluster.go +++ b/cmd/influxd/join_cluster.go @@ -12,7 +12,7 @@ func execJoinCluster(args []string) { // Parse command flags. fs := flag.NewFlagSet("", flag.ExitOnError) var ( - configPath = fs.String("config", "", "") + configPath = fs.String("config", configDefaultPath, "") role = fs.String("role", "combined", "") seedServers = fs.String("seed-servers", "", "") ) @@ -51,9 +51,11 @@ func execJoinCluster(args []string) { } func printJoinClusterUsage() { - log.Println(`usage: join-cluster [flags] + log.Printf(`usage: join-cluster [flags] join-cluster creates a completely new node that will attempt to join an existing cluster. + -config + Set the path to the configuration file. Defaults to %s. -role Set the role to be 'combined', 'broker' or 'data'. broker' means it will take @@ -65,5 +67,5 @@ join-cluster creates a completely new node that will attempt to join an existing Set the list of servers the node should contact, to join the cluster. This should be comma-delimited list of servers, in the form host:port. This option is REQUIRED. -`) +\n`, configDefaultPath) } diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index b8ba3b9ad7..4e3b56cf73 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -25,6 +25,11 @@ var ( commit string ) +// Various constants used by the main package. +const ( + configDefaultPath string = "/etc/influxdb.conf" +) + func main() { log.SetFlags(0) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index b9d844768e..31bf43f89e 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -23,7 +23,7 @@ func execRun(args []string) { // Parse command flags. fs := flag.NewFlagSet("", flag.ExitOnError) var ( - configPath = fs.String("config", "config.sample.toml", "") + configPath = fs.String("config", configDefaultPath, "") pidPath = fs.String("pidfile", "", "") hostname = fs.String("hostname", "", "") ) @@ -128,20 +128,20 @@ func execRun(args []string) { } func printRunUsage() { - log.Println(`usage: run [flags] + log.Printf(`usage: run [flags] run starts the node with any existing cluster configuration. If no cluster configuration is found, then the node runs in "local" mode. "Local" mode -config - Set the path to the configuration file. + Set the path to the configuration file. Defaults to %s. -hostname Override the hostname, the 'hostname' configuration option will be overridden. -pidfile Write process ID to a file. -`) +\n`, configDefaultPath) } // createStateIfNotExists returns the cluster state, from the file at path. From 71da5c4a4a7dfc6f32be33db6c8fda84ae769a34 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 11 Dec 2014 14:18:48 -0800 Subject: [PATCH 3/4] Correct path of storage dir for 'create-cluster' --- cmd/influxd/create_cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/influxd/create_cluster.go b/cmd/influxd/create_cluster.go index 00e125b8eb..226e673de3 100644 --- a/cmd/influxd/create_cluster.go +++ b/cmd/influxd/create_cluster.go @@ -48,7 +48,7 @@ func execCreateCluster(args []string) { } // Now create the storage directory. - if err := os.MkdirAll(config.Cluster.Dir, 0744); err != nil { + if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil { log.Fatal(err) } } From 918c0562be052cf14207d9178b3a4c2b915d2f87 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 11 Dec 2014 14:20:25 -0800 Subject: [PATCH 4/4] Create storage dir for 'join-cluster' if needed --- cmd/influxd/join_cluster.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/influxd/join_cluster.go b/cmd/influxd/join_cluster.go index 75148c8757..613f05c74c 100644 --- a/cmd/influxd/join_cluster.go +++ b/cmd/influxd/join_cluster.go @@ -3,6 +3,7 @@ package main import ( "flag" "log" + "os" "github.com/influxdb/influxdb/messaging" ) @@ -44,7 +45,9 @@ func execJoinCluster(args []string) { // If joining as a data node then create a data directory. if *role == "combined" || *role == "data" { - // TODO: do any required data-node stuff. + if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil { + log.Fatal(err) + } } log.Printf("joined cluster at %s", *seedServers)