Merge pull request #1212 from otoolep/add_raft_endpoints
Add Raft endpoints and fix cluster init bugspull/1214/head
commit
3c6e84858d
|
@ -13,7 +13,7 @@ func execCreateCluster(args []string) {
|
|||
// Parse command flags.
|
||||
fs := flag.NewFlagSet("", flag.ExitOnError)
|
||||
var (
|
||||
configPath = fs.String("config", "", "")
|
||||
configPath = fs.String("config", configDefaultPath, "")
|
||||
role = fs.String("role", "combined", "")
|
||||
)
|
||||
fs.Usage = printCreateClusterUsage
|
||||
|
@ -48,7 +48,7 @@ func execCreateCluster(args []string) {
|
|||
}
|
||||
|
||||
// Now create the storage directory.
|
||||
if err := os.MkdirAll(config.Cluster.Dir, 0744); err != nil {
|
||||
if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -57,15 +57,18 @@ func execCreateCluster(args []string) {
|
|||
}
|
||||
|
||||
func printCreateClusterUsage() {
|
||||
log.Println(`usage: create-cluster [flags]
|
||||
log.Printf(`usage: create-cluster [flags]
|
||||
|
||||
create-cluster creates a completely new node that can act as the first node of a new
|
||||
cluster. This node must be created as a 'combined' or 'broker' node.
|
||||
|
||||
-config <path>
|
||||
Set the path to the configuration file. Defaults to %s.
|
||||
|
||||
-role <role>
|
||||
Set the role to be 'combined' or 'broker'. 'broker' means it will take part
|
||||
in Raft Distributed Consensus. 'combined' means it take part in Raft and
|
||||
store time-series data. The default role is 'combined'. Any other role other
|
||||
than these two is invalid.
|
||||
`)
|
||||
\n`, configDefaultPath)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/messaging"
|
||||
)
|
||||
|
||||
// Handler represents an HTTP handler for InfluxDB node. Depending on its role, it
|
||||
// will serve many different endpoints.
|
||||
type Handler struct {
|
||||
brokerHandler *messaging.Handler
|
||||
serverHandler *influxdb.Handler
|
||||
}
|
||||
|
||||
// NewHandler returns a new instance of Handler.
|
||||
func NewHandler(b *messaging.Handler, s *influxdb.Handler) *Handler {
|
||||
return &Handler{
|
||||
brokerHandler: b,
|
||||
serverHandler: s,
|
||||
}
|
||||
}
|
||||
|
||||
// ServeHTTP responds to HTTP request to the handler.
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasPrefix(r.URL.Path, "/raft") || strings.HasPrefix(r.URL.Path, "/messages") {
|
||||
h.brokerHandler.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
h.serverHandler.ServeHTTP(w, r)
|
||||
}
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/influxdb/influxdb/messaging"
|
||||
)
|
||||
|
@ -12,7 +13,7 @@ func execJoinCluster(args []string) {
|
|||
// Parse command flags.
|
||||
fs := flag.NewFlagSet("", flag.ExitOnError)
|
||||
var (
|
||||
configPath = fs.String("config", "", "")
|
||||
configPath = fs.String("config", configDefaultPath, "")
|
||||
role = fs.String("role", "combined", "")
|
||||
seedServers = fs.String("seed-servers", "", "")
|
||||
)
|
||||
|
@ -44,16 +45,20 @@ func execJoinCluster(args []string) {
|
|||
|
||||
// If joining as a data node then create a data directory.
|
||||
if *role == "combined" || *role == "data" {
|
||||
// TODO: do any required data-node stuff.
|
||||
if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("joined cluster at %s", *seedServers)
|
||||
}
|
||||
|
||||
func printJoinClusterUsage() {
|
||||
log.Println(`usage: join-cluster [flags]
|
||||
log.Printf(`usage: join-cluster [flags]
|
||||
|
||||
join-cluster creates a completely new node that will attempt to join an existing cluster.
|
||||
-config <path>
|
||||
Set the path to the configuration file. Defaults to %s.
|
||||
|
||||
-role <role>
|
||||
Set the role to be 'combined', 'broker' or 'data'. broker' means it will take
|
||||
|
@ -65,5 +70,5 @@ join-cluster creates a completely new node that will attempt to join an existing
|
|||
Set the list of servers the node should contact, to join the cluster. This
|
||||
should be comma-delimited list of servers, in the form host:port. This option
|
||||
is REQUIRED.
|
||||
`)
|
||||
\n`, configDefaultPath)
|
||||
}
|
||||
|
|
|
@ -25,6 +25,11 @@ var (
|
|||
commit string
|
||||
)
|
||||
|
||||
// Various constants used by the main package.
|
||||
const (
|
||||
configDefaultPath string = "/etc/influxdb.conf"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.SetFlags(0)
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ func execRun(args []string) {
|
|||
// Parse command flags.
|
||||
fs := flag.NewFlagSet("", flag.ExitOnError)
|
||||
var (
|
||||
configPath = fs.String("config", "config.sample.toml", "")
|
||||
configPath = fs.String("config", configDefaultPath, "")
|
||||
pidPath = fs.String("pidfile", "", "")
|
||||
hostname = fs.String("hostname", "", "")
|
||||
)
|
||||
|
@ -67,9 +67,12 @@ func execRun(args []string) {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Open
|
||||
// Start up the node.
|
||||
var client influxdb.MessagingClient
|
||||
var server *influxdb.Server
|
||||
var brokerHandler *messaging.Handler
|
||||
var serverHandler *influxdb.Handler
|
||||
|
||||
if state.Mode == "local" {
|
||||
client = messaging.NewLoopbackClient()
|
||||
log.Printf("Local messaging client created")
|
||||
|
@ -78,6 +81,7 @@ func execRun(args []string) {
|
|||
if err != nil {
|
||||
log.Fatalf("failed to open local Server", err.Error())
|
||||
}
|
||||
serverHandler = influxdb.NewHandler(server)
|
||||
} else {
|
||||
// If the Broker directory exists, open a Broker on this node.
|
||||
if _, err := os.Stat(config.Raft.Dir); err == nil {
|
||||
|
@ -85,9 +89,11 @@ func execRun(args []string) {
|
|||
if err := b.Open(config.Raft.Dir); err != nil {
|
||||
log.Fatalf("failed to open Broker", err.Error())
|
||||
}
|
||||
brokerHandler = messaging.NewHandler(b)
|
||||
} else {
|
||||
log.Fatalf("failed to check for Broker directory", err.Error())
|
||||
}
|
||||
|
||||
// If a Data directory exists, open a Data node.
|
||||
if _, err := os.Stat(config.Storage.Dir); err == nil {
|
||||
// Create correct client here for connecting to Broker.
|
||||
|
@ -104,6 +110,7 @@ func execRun(args []string) {
|
|||
if err != nil {
|
||||
log.Fatalf("failed to open data Server", err.Error())
|
||||
}
|
||||
serverHandler = influxdb.NewHandler(server)
|
||||
} else {
|
||||
log.Fatalf("failed to check for Broker directory", err.Error())
|
||||
}
|
||||
|
@ -112,8 +119,8 @@ func execRun(args []string) {
|
|||
// TODO: startProfiler()
|
||||
// TODO: -reset-root
|
||||
|
||||
// Start up HTTP server with correct endpoints.
|
||||
h := influxdb.NewHandler(server)
|
||||
// Start up HTTP server
|
||||
h := NewHandler(brokerHandler, serverHandler)
|
||||
func() { log.Fatal(http.ListenAndServe(":8086", h)) }()
|
||||
|
||||
// Wait indefinitely.
|
||||
|
@ -121,20 +128,20 @@ func execRun(args []string) {
|
|||
}
|
||||
|
||||
func printRunUsage() {
|
||||
log.Println(`usage: run [flags]
|
||||
log.Printf(`usage: run [flags]
|
||||
|
||||
run starts the node with any existing cluster configuration. If no cluster configuration is
|
||||
found, then the node runs in "local" mode. "Local" mode
|
||||
|
||||
-config <path>
|
||||
Set the path to the configuration file.
|
||||
Set the path to the configuration file. Defaults to %s.
|
||||
|
||||
-hostname <name>
|
||||
Override the hostname, the 'hostname' configuration option will be overridden.
|
||||
|
||||
-pidfile <path>
|
||||
Write process ID to a file.
|
||||
`)
|
||||
\n`, configDefaultPath)
|
||||
}
|
||||
|
||||
// createStateIfNotExists returns the cluster state, from the file at path.
|
||||
|
|
Loading…
Reference in New Issue