diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 0e18640b88..5a9f23eb5d 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -3,6 +3,8 @@ package main import ( "fmt" "os" + "os/user" + "path/filepath" "strconv" "time" @@ -20,9 +22,14 @@ const ( // can be queried concurrently at one time. DefaultConcurrentShardQueryLimit = 10 - // DefaultAPIReadTimeout represents the amount time before an API request - // times out. + // DefaultAPIReadTimeout represents the duration before an API request times out. DefaultAPIReadTimeout = 5 * time.Second + + // DefaultBrokerPort represents the default port the broker runs on. + DefaultBrokerPort = 8086 + + // DefaultHTTPAPIPort represents the default port the HTTP API runs on. + DefaultHTTPAPIPort = 8086 ) // Config represents the configuration format for the influxd binary. @@ -100,14 +107,20 @@ type Config struct { // NewConfig returns an instance of Config with reasonable defaults. func NewConfig() *Config { + u, _ := user.Current() + c := &Config{} c.Data.RetentionSweepPeriod = Duration(10 * time.Minute) c.Cluster.ConcurrentShardQueryLimit = DefaultConcurrentShardQueryLimit + c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker") + c.Broker.Port = DefaultBrokerPort c.Broker.Timeout = Duration(1 * time.Second) + c.HTTPAPI.Port = DefaultHTTPAPIPort c.HTTPAPI.ReadTimeout = Duration(DefaultAPIReadTimeout) c.Cluster.MinBackoff = Duration(1 * time.Second) c.Cluster.MaxBackoff = Duration(10 * time.Second) c.Cluster.ProtobufHeartbeatInterval = Duration(10 * time.Millisecond) + c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data") c.Data.WriteBufferSize = 1000 c.Cluster.WriteBufferSize = 1000 c.Cluster.MaxResponseBufferSize = 100 diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 9fb266ce2d..c01122e7db 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -53,7 +53,7 @@ func TestParseConfig(t *testing.T) { t.Fatalf("admin assets mismatch: %v", c.Admin.Assets) } - if c.HTTPAPI.Port != 0 { + if c.HTTPAPI.Port != 8086 { t.Fatalf("http api port mismatch: %v", c.HTTPAPI.Port) } else if c.HTTPAPI.SSLPort != 8087 { t.Fatalf("http api ssl port mismatch: %v", c.HTTPAPI.SSLPort) diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index a8f37d8153..4b8cdce1f0 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -56,8 +56,6 @@ func main() { // Extract name from args. switch cmd { - case "create-cluster": - execCreateCluster(args[1:]) case "join-cluster": execJoinCluster(args[1:]) case "run": @@ -103,7 +101,6 @@ Usage: The commands are: - create-cluster create a new node that other nodes can join to form a new cluster join-cluster create a new node that will join an existing cluster run run node with existing configuration version displays the InfluxDB version diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index ff36700518..a001982b5f 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -28,19 +28,62 @@ func execRun(args []string) { fs.Usage = printRunUsage fs.Parse(args) + // Parse broker urls from seed servers. + brokerURLs := parseSeedServers(*seedServers) + // Print sweet InfluxDB logo and write the process id to file. log.Print(logo) writePIDFile(*pidPath) - // Parse the configuration and open the broker & server, if applicable. + // Parse the configuration and determine if a broker and/or server exist. config := parseConfig(*configPath, *hostname) - b := openBroker(config.Broker.Dir, config.BrokerConnectionString()) - s := openServer(config.Data.Dir, strings.Split(*seedServers, ",")) + hasBroker := fileExists(config.Broker.Dir) + hasServer := fileExists(config.Data.Dir) + initializing := !hasBroker && !hasServer - // Start the HTTP service(s). - listenAndServe(b, s, config.BrokerListenAddr(), config.ApiHTTPListenAddr()) + // Open broker if it exists or if we're initializing for the first time. + var b *messaging.Broker + var h *Handler + if hasBroker || initializing { + b = openBroker(config.Broker.Dir, config.BrokerConnectionString()) - // TODO: Initialize, if necessary. + // If this is the first time running then initialize a broker. + // Update the seed server so the server can connect locally. + if initializing { + if err := b.Initialize(); err != nil { + log.Fatalf("initialize: %s", err) + } + } + + // Start the broker handler. + h = &Handler{brokerHandler: messaging.NewHandler(b)} + go func() { log.Fatal(http.ListenAndServe(config.BrokerListenAddr(), h)) }() + log.Printf("Broker running on %s", config.BrokerListenAddr()) + } + + // Open server if it exists or we're initializing for the first time. + var s *influxdb.Server + if hasServer || initializing { + s = openServer(config.Data.Dir) + + // If the server is uninitialized then initialize it with the broker. + // Otherwise simply create a messaging client with the server id. + if s.ID() == 0 { + initServer(s, b) + } else { + openServerClient(s, brokerURLs) + } + + // Start the server handler. + // If it uses the same port as the broker then simply attach it. + sh := influxdb.NewHandler(s) + if config.BrokerListenAddr() == config.ApiHTTPListenAddr() { + h.serverHandler = sh + } else { + go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), sh)) }() + } + log.Printf("DataNode#%d running on %s", s.ID(), config.ApiHTTPListenAddr()) + } // Wait indefinitely. <-(chan struct{})(nil) @@ -63,7 +106,9 @@ func writePIDFile(path string) { func parseConfig(path, hostname string) *Config { // Parse configuration. config, err := ParseConfigFile(path) - if err != nil { + if os.IsNotExist(err) { + config = NewConfig() + } else if err != nil { log.Fatalf("config: %s", err) } @@ -76,91 +121,74 @@ func parseConfig(path, hostname string) *Config { } // creates and initializes a broker at a given path. -// Ignored if there is no broker directory. func openBroker(path, addr string) *messaging.Broker { - // Ignore if there's no broker directory. - if _, err := os.Stat(path); os.IsNotExist(err) { - return nil - } - - // If the Broker directory exists, open a Broker on this node. b := messaging.NewBroker() if err := b.Open(path, addr); err != nil { - log.Fatalf("failed to open Broker", err.Error()) + log.Fatalf("failed to open broker: %s", err) } return b } // creates and initializes a server at a given path. -// Ignored if there is no data directory. -func openServer(path string, seedServers []string) *influxdb.Server { - // Ignore if the data directory doesn't exists. - if _, err := os.Stat(path); os.IsNotExist(err) { - return nil - } - - // Create and open server +func openServer(path string) *influxdb.Server { s := influxdb.NewServer() if err := s.Open(path); err != nil { log.Fatalf("failed to open data server", err.Error()) } - - // Open messaging client to communicate with the brokers. - var brokerURLs []*url.URL - for _, s := range seedServers { - u, err := url.Parse(s) - if err != nil { - log.Fatalf("cannot parse seed server: %s", err) - } - brokerURLs = append(brokerURLs, u) - } - - // Initialize the messaging client. - c := messaging.NewClient(s.ID()) - if err := c.Open(filepath.Join(path, messagingClientFile), brokerURLs); err != nil { - log.Fatalf("error opening messaging client: %s", err) - } - - // Assign the client to the server. - if err := s.SetClient(c); err != nil { - log.Fatalf("set messaging client: %s", err) - } - return s } -// starts handlers for the broker and server. -// If the broker and server are running on the same port then combine them. -func listenAndServe(b *messaging.Broker, s *influxdb.Server, brokerAddr, serverAddr string) { - // Initialize handlers. - var bh, sh http.Handler - if b != nil { - bh = messaging.NewHandler(b) - } - if s != nil { - sh = influxdb.NewHandler(s) +// initializes a new server that does not yet have an ID. +func initServer(s *influxdb.Server, b *messaging.Broker) { + // Create replica on broker. + if err := b.CreateReplica(1); err != nil { + log.Fatalf("replica creation error: %d", err) } - // Combine handlers if they are using the same bind address. - if brokerAddr == serverAddr { - go func() { log.Fatal(http.ListenAndServe(brokerAddr, NewHandler(bh, sh))) }() - } else { - // Otherwise start the handlers on separate ports. - if sh != nil { - go func() { log.Fatal(http.ListenAndServe(serverAddr, sh)) }() - } - if bh != nil { - go func() { log.Fatal(http.ListenAndServe(brokerAddr, bh)) }() + // Initialize messaging client. + c := messaging.NewClient(1) + if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil { + log.Fatalf("messaging client error: %s", err) + } + if err := s.SetClient(c); err != nil { + log.Fatalf("set client error: %s", err) + } + + // Initialize the server. + if err := s.Initialize(b.URL()); err != nil { + log.Fatalf("server initialization error: %s", err) + } +} + +// opens the messaging client and attaches it to the server. +func openServerClient(s *influxdb.Server, brokerURLs []*url.URL) { + c := messaging.NewClient(s.ID()) + if err := c.Open(filepath.Join(s.Path(), messagingClientFile), brokerURLs); err != nil { + log.Fatalf("messaging client error: %s", err) + } + if err := s.SetClient(c); err != nil { + log.Fatalf("set client error: %s", err) + } +} + +// parses a comma-delimited list of URLs. +func parseSeedServers(s string) (a []*url.URL) { + for _, s := range strings.Split(s, ",") { + u, err := url.Parse(s) + if err != nil { + log.Fatalf("cannot parse seed servers: %s", err) } + a = append(a, u) } + return +} - // Log the handlers starting up. - if serverAddr == "" { - log.Printf("Starting Influx Server %s...", version) - } else { - log.Printf("Starting Influx Server %s bound to %s...", version, serverAddr) +// returns true if the file exists. +func fileExists(path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false } - + return true } func printRunUsage() { diff --git a/messaging/broker.go b/messaging/broker.go index 1e9bb5a961..401fde8e9d 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -101,6 +101,11 @@ func (b *Broker) Close() error { return nil } +// URL returns the connection url for the broker. +func (b *Broker) URL() *url.URL { + return b.log.URL +} + // Initialize creates a new cluster. func (b *Broker) Initialize() error { if err := b.log.Initialize(); err != nil { diff --git a/server.go b/server.go index 689933ba60..856c826ed6 100644 --- a/server.go +++ b/server.go @@ -292,6 +292,32 @@ func (s *Server) sync(index uint64) error { } } +// Initialize creates a new data node and initializes the server's id to 1. +func (s *Server) Initialize(u *url.URL) error { + // Create a new data node. + if err := s.CreateDataNode(u); err != nil { + return err + } + + // Ensure the data node returns with an ID of 1. + // If it doesn't then something went really wrong. We have to panic because + // the messaging client relies on the first server being assigned ID 1. + n := s.DataNodeByURL(u) + assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID) + + // Set the ID on the metastore. + if err := s.meta.mustUpdate(func(tx *metatx) error { + return tx.setID(n.ID) + }); err != nil { + return err + } + + // Set the ID on the server. + s.id = 1 + + return nil +} + // DataNode returns a data node by id. func (s *Server) DataNode(id uint64) *DataNode { s.mu.RLock()