2014-12-06 01:02:30 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"flag"
|
|
|
|
"io/ioutil"
|
|
|
|
"log"
|
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
|
|
|
"strconv"
|
2014-12-16 07:03:09 +00:00
|
|
|
"strings"
|
2014-12-06 01:02:30 +00:00
|
|
|
|
|
|
|
"github.com/influxdb/influxdb"
|
|
|
|
"github.com/influxdb/influxdb/messaging"
|
|
|
|
)
|
|
|
|
|
|
|
|
// execRun runs the "run" command.
|
|
|
|
func execRun(args []string) {
|
|
|
|
// Parse command flags.
|
|
|
|
fs := flag.NewFlagSet("", flag.ExitOnError)
|
|
|
|
var (
|
2014-12-16 07:03:09 +00:00
|
|
|
configPath = fs.String("config", configDefaultPath, "")
|
|
|
|
pidPath = fs.String("pidfile", "", "")
|
|
|
|
hostname = fs.String("hostname", "", "")
|
|
|
|
seedServers = fs.String("seed-servers", "", "")
|
2014-12-06 01:02:30 +00:00
|
|
|
)
|
|
|
|
fs.Usage = printRunUsage
|
|
|
|
fs.Parse(args)
|
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// Parse broker urls from seed servers.
|
|
|
|
brokerURLs := parseSeedServers(*seedServers)
|
|
|
|
|
2014-12-30 22:46:50 +00:00
|
|
|
// Print sweet InfluxDB logo and write the process id to file.
|
|
|
|
log.Print(logo)
|
|
|
|
writePIDFile(*pidPath)
|
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// Parse the configuration and determine if a broker and/or server exist.
|
2014-12-30 22:46:50 +00:00
|
|
|
config := parseConfig(*configPath, *hostname)
|
2014-12-31 19:42:53 +00:00
|
|
|
hasBroker := fileExists(config.Broker.Dir)
|
|
|
|
hasServer := fileExists(config.Data.Dir)
|
|
|
|
initializing := !hasBroker && !hasServer
|
|
|
|
|
|
|
|
// Open broker if it exists or if we're initializing for the first time.
|
|
|
|
var b *messaging.Broker
|
|
|
|
var h *Handler
|
|
|
|
if hasBroker || initializing {
|
|
|
|
b = openBroker(config.Broker.Dir, config.BrokerConnectionString())
|
|
|
|
|
|
|
|
// If this is the first time running then initialize a broker.
|
|
|
|
// Update the seed server so the server can connect locally.
|
|
|
|
if initializing {
|
|
|
|
if err := b.Initialize(); err != nil {
|
|
|
|
log.Fatalf("initialize: %s", err)
|
|
|
|
}
|
|
|
|
}
|
2014-12-30 22:46:50 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// Start the broker handler.
|
|
|
|
h = &Handler{brokerHandler: messaging.NewHandler(b)}
|
|
|
|
go func() { log.Fatal(http.ListenAndServe(config.BrokerListenAddr(), h)) }()
|
|
|
|
log.Printf("Broker running on %s", config.BrokerListenAddr())
|
|
|
|
}
|
2014-12-30 22:46:50 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// Open server if it exists or we're initializing for the first time.
|
|
|
|
var s *influxdb.Server
|
|
|
|
if hasServer || initializing {
|
|
|
|
s = openServer(config.Data.Dir)
|
|
|
|
|
|
|
|
// If the server is uninitialized then initialize it with the broker.
|
|
|
|
// Otherwise simply create a messaging client with the server id.
|
|
|
|
if s.ID() == 0 {
|
|
|
|
initServer(s, b)
|
|
|
|
} else {
|
|
|
|
openServerClient(s, brokerURLs)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start the server handler.
|
|
|
|
// If it uses the same port as the broker then simply attach it.
|
|
|
|
sh := influxdb.NewHandler(s)
|
|
|
|
if config.BrokerListenAddr() == config.ApiHTTPListenAddr() {
|
|
|
|
h.serverHandler = sh
|
|
|
|
} else {
|
|
|
|
go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), sh)) }()
|
|
|
|
}
|
|
|
|
log.Printf("DataNode#%d running on %s", s.ID(), config.ApiHTTPListenAddr())
|
|
|
|
}
|
2014-12-30 22:46:50 +00:00
|
|
|
|
|
|
|
// Wait indefinitely.
|
|
|
|
<-(chan struct{})(nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
// writePIDFile stores the current process id, formatted as decimal text, in
// the file named by path. An empty path disables the feature and nothing is
// written. A write failure is fatal.
func writePIDFile(path string) {
	if len(path) == 0 {
		return
	}

	// Render the PID as decimal text and persist it with mode 0644.
	contents := []byte(strconv.Itoa(os.Getpid()))
	err := ioutil.WriteFile(path, contents, 0644)
	if err != nil {
		log.Fatal(err)
	}
}
|
|
|
|
|
|
|
|
// parses the configuration from a given path. Sets overrides as needed.
|
|
|
|
func parseConfig(path, hostname string) *Config {
|
2014-12-06 01:02:30 +00:00
|
|
|
// Parse configuration.
|
2014-12-30 22:46:50 +00:00
|
|
|
config, err := ParseConfigFile(path)
|
2014-12-31 19:42:53 +00:00
|
|
|
if os.IsNotExist(err) {
|
|
|
|
config = NewConfig()
|
|
|
|
} else if err != nil {
|
2014-12-06 01:02:30 +00:00
|
|
|
log.Fatalf("config: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Override config properties.
|
2014-12-30 22:46:50 +00:00
|
|
|
if hostname != "" {
|
|
|
|
config.Hostname = hostname
|
2014-12-06 01:02:30 +00:00
|
|
|
}
|
|
|
|
|
2014-12-30 22:46:50 +00:00
|
|
|
return config
|
|
|
|
}
|
2014-12-06 01:02:30 +00:00
|
|
|
|
2014-12-30 22:46:50 +00:00
|
|
|
// creates and initializes a broker at a given path.
|
|
|
|
func openBroker(path, addr string) *messaging.Broker {
|
|
|
|
b := messaging.NewBroker()
|
|
|
|
if err := b.Open(path, addr); err != nil {
|
2014-12-31 19:42:53 +00:00
|
|
|
log.Fatalf("failed to open broker: %s", err)
|
2014-12-16 03:35:26 +00:00
|
|
|
}
|
2014-12-30 22:46:50 +00:00
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
// creates and initializes a server at a given path.
|
2014-12-31 19:42:53 +00:00
|
|
|
func openServer(path string) *influxdb.Server {
|
2014-12-30 22:46:50 +00:00
|
|
|
s := influxdb.NewServer()
|
|
|
|
if err := s.Open(path); err != nil {
|
|
|
|
log.Fatalf("failed to open data server", err.Error())
|
2014-12-16 03:35:26 +00:00
|
|
|
}
|
2014-12-31 19:42:53 +00:00
|
|
|
return s
|
|
|
|
}
|
2014-12-16 03:35:26 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// initializes a new server that does not yet have an ID.
|
|
|
|
func initServer(s *influxdb.Server, b *messaging.Broker) {
|
|
|
|
// Create replica on broker.
|
|
|
|
if err := b.CreateReplica(1); err != nil {
|
|
|
|
log.Fatalf("replica creation error: %d", err)
|
2014-12-16 03:35:26 +00:00
|
|
|
}
|
2014-12-11 21:46:14 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// Initialize messaging client.
|
|
|
|
c := messaging.NewClient(1)
|
|
|
|
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil {
|
|
|
|
log.Fatalf("messaging client error: %s", err)
|
2014-12-30 22:46:50 +00:00
|
|
|
}
|
|
|
|
if err := s.SetClient(c); err != nil {
|
2014-12-31 19:42:53 +00:00
|
|
|
log.Fatalf("set client error: %s", err)
|
2014-12-06 01:02:30 +00:00
|
|
|
}
|
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// Initialize the server.
|
|
|
|
if err := s.Initialize(b.URL()); err != nil {
|
|
|
|
log.Fatalf("server initialization error: %s", err)
|
|
|
|
}
|
2014-12-30 22:46:50 +00:00
|
|
|
}
|
2014-12-06 01:02:30 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// opens the messaging client and attaches it to the server.
|
|
|
|
func openServerClient(s *influxdb.Server, brokerURLs []*url.URL) {
|
|
|
|
c := messaging.NewClient(s.ID())
|
|
|
|
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), brokerURLs); err != nil {
|
|
|
|
log.Fatalf("messaging client error: %s", err)
|
2014-12-30 22:46:50 +00:00
|
|
|
}
|
2014-12-31 19:42:53 +00:00
|
|
|
if err := s.SetClient(c); err != nil {
|
|
|
|
log.Fatalf("set client error: %s", err)
|
|
|
|
}
|
|
|
|
}
|
2014-12-30 22:46:50 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// parseSeedServers parses a comma-delimited list of URLs.
//
// Surrounding whitespace is trimmed from each entry and blank entries are
// skipped, so an empty string (the flag's default) or a trailing comma yields
// no URL rather than an empty one. The result is nil when no URLs are given.
// An entry that cannot be parsed is fatal.
func parseSeedServers(s string) (a []*url.URL) {
	for _, raw := range strings.Split(s, ",") {
		// strings.Split returns [""] for empty input and url.Parse accepts
		// "", so blank entries must be filtered out explicitly.
		raw = strings.TrimSpace(raw)
		if raw == "" {
			continue
		}

		u, err := url.Parse(raw)
		if err != nil {
			log.Fatalf("cannot parse seed servers: %s", err)
		}
		a = append(a, u)
	}
	return a
}
|
2014-12-06 01:02:30 +00:00
|
|
|
|
2014-12-31 19:42:53 +00:00
|
|
|
// fileExists reports whether path exists. Only a "does not exist" error from
// os.Stat yields false; success and every other stat error yield true.
func fileExists(path string) bool {
	_, err := os.Stat(path)
	return !os.IsNotExist(err)
}
|
|
|
|
|
|
|
|
func printRunUsage() {
|
2014-12-11 21:49:42 +00:00
|
|
|
log.Printf(`usage: run [flags]
|
2014-12-06 01:02:30 +00:00
|
|
|
|
|
|
|
run starts the node with any existing cluster configuration. If no cluster configuration is
|
2014-12-16 07:03:09 +00:00
|
|
|
found, then the node runs in "local" mode. "Local" mode is a single-node mode that does not
|
|
|
|
use Distributed Consensus, but is otherwise fully-functional.
|
2014-12-06 01:02:30 +00:00
|
|
|
|
|
|
|
-config <path>
|
2014-12-11 21:49:42 +00:00
|
|
|
Set the path to the configuration file. Defaults to %s.
|
2014-12-06 01:02:30 +00:00
|
|
|
|
|
|
|
-hostname <name>
|
|
|
|
Override the hostname, the 'hostname' configuration option will be overridden.
|
|
|
|
|
2014-12-18 19:35:03 +00:00
|
|
|
-seed-servers <servers>
|
2014-12-16 07:03:09 +00:00
|
|
|
If joining a cluster, overrides any previously configured or discovered
|
|
|
|
Data node seed servers.
|
|
|
|
|
2014-12-06 01:02:30 +00:00
|
|
|
-pidfile <path>
|
|
|
|
Write process ID to a file.
|
2014-12-16 07:03:09 +00:00
|
|
|
`, configDefaultPath)
|
2014-12-06 01:02:30 +00:00
|
|
|
}
|