Merge pull request #1280 from influxdb/combine-exec

Create cluster on initial run
pull/1281/head
Ben Johnson 2014-12-31 13:55:04 -07:00
commit 80977aa375
6 changed files with 161 additions and 77 deletions

View File

@ -3,6 +3,8 @@ package main
import (
"fmt"
"os"
"os/user"
"path/filepath"
"strconv"
"time"
@ -20,9 +22,14 @@ const (
// can be queried concurrently at one time.
DefaultConcurrentShardQueryLimit = 10
// DefaultAPIReadTimeout represents the amount time before an API request
// times out.
// DefaultAPIReadTimeout represents the duration before an API request times out.
DefaultAPIReadTimeout = 5 * time.Second
// DefaultBrokerPort represents the default port the broker runs on.
DefaultBrokerPort = 8086
// DefaultHTTPAPIPort represents the default port the HTTP API runs on.
DefaultHTTPAPIPort = 8086
)
// Config represents the configuration format for the influxd binary.
@ -100,14 +107,20 @@ type Config struct {
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
u, _ := user.Current()
c := &Config{}
c.Data.RetentionSweepPeriod = Duration(10 * time.Minute)
c.Cluster.ConcurrentShardQueryLimit = DefaultConcurrentShardQueryLimit
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")
c.Broker.Port = DefaultBrokerPort
c.Broker.Timeout = Duration(1 * time.Second)
c.HTTPAPI.Port = DefaultHTTPAPIPort
c.HTTPAPI.ReadTimeout = Duration(DefaultAPIReadTimeout)
c.Cluster.MinBackoff = Duration(1 * time.Second)
c.Cluster.MaxBackoff = Duration(10 * time.Second)
c.Cluster.ProtobufHeartbeatInterval = Duration(10 * time.Millisecond)
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
c.Data.WriteBufferSize = 1000
c.Cluster.WriteBufferSize = 1000
c.Cluster.MaxResponseBufferSize = 100

View File

@ -53,7 +53,7 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("admin assets mismatch: %v", c.Admin.Assets)
}
if c.HTTPAPI.Port != 0 {
if c.HTTPAPI.Port != main.DefaultBrokerPort {
t.Fatalf("http api port mismatch: %v", c.HTTPAPI.Port)
} else if c.HTTPAPI.SSLPort != 8087 {
t.Fatalf("http api ssl port mismatch: %v", c.HTTPAPI.SSLPort)

View File

@ -56,8 +56,6 @@ func main() {
// Extract name from args.
switch cmd {
case "create-cluster":
execCreateCluster(args[1:])
case "join-cluster":
execJoinCluster(args[1:])
case "run":
@ -103,7 +101,6 @@ Usage:
The commands are:
create-cluster create a new node that other nodes can join to form a new cluster
join-cluster create a new node that will join an existing cluster
run run node with existing configuration
version displays the InfluxDB version

View File

@ -22,25 +22,74 @@ func execRun(args []string) {
var (
configPath = fs.String("config", configDefaultPath, "")
pidPath = fs.String("pidfile", "", "")
role = fs.String("role", "combined", "")
hostname = fs.String("hostname", "", "")
seedServers = fs.String("seed-servers", "", "")
)
fs.Usage = printRunUsage
fs.Parse(args)
// Validate CLI flags.
if *role != "combined" && *role != "broker" && *role != "data" {
log.Fatalf("role must be 'combined', 'broker', or 'data'")
}
// Parse broker urls from seed servers.
brokerURLs := parseSeedServers(*seedServers)
// Print sweet InfluxDB logo and write the process id to file.
log.Print(logo)
writePIDFile(*pidPath)
// Parse the configuration and open the broker & server, if applicable.
// Parse the configuration and determine if a broker and/or server exist.
config := parseConfig(*configPath, *hostname)
b := openBroker(config.Broker.Dir, config.BrokerConnectionString())
s := openServer(config.Data.Dir, strings.Split(*seedServers, ","))
hasBroker := fileExists(config.Broker.Dir)
hasServer := fileExists(config.Data.Dir)
initializing := !hasBroker && !hasServer
// Start the HTTP service(s).
listenAndServe(b, s, config.BrokerListenAddr(), config.ApiHTTPListenAddr())
// Open broker if it exists or if we're initializing for the first time.
var b *messaging.Broker
var h *Handler
if hasBroker || (initializing && (*role == "combined" || *role == "broker")) {
b = openBroker(config.Broker.Dir, config.BrokerConnectionString())
// TODO: Initialize, if necessary.
// If this is the first time running then initialize a broker.
// Update the seed server so the server can connect locally.
if initializing {
if err := b.Initialize(); err != nil {
log.Fatalf("initialize: %s", err)
}
}
// Start the broker handler.
h = &Handler{brokerHandler: messaging.NewHandler(b)}
go func() { log.Fatal(http.ListenAndServe(config.BrokerListenAddr(), h)) }()
log.Printf("Broker running on %s", config.BrokerListenAddr())
}
// Open server if it exists or we're initializing for the first time.
var s *influxdb.Server
if hasServer || (initializing && (*role == "combined" || *role == "data")) {
s = openServer(config.Data.Dir)
// If the server is uninitialized then initialize it with the broker.
// Otherwise simply create a messaging client with the server id.
if s.ID() == 0 {
initServer(s, b)
} else {
openServerClient(s, brokerURLs)
}
// Start the server handler.
// If it uses the same port as the broker then simply attach it.
sh := influxdb.NewHandler(s)
if config.BrokerListenAddr() == config.ApiHTTPListenAddr() {
h.serverHandler = sh
} else {
go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), sh)) }()
}
log.Printf("DataNode#%d running on %s", s.ID(), config.ApiHTTPListenAddr())
}
// Wait indefinitely.
<-(chan struct{})(nil)
@ -63,7 +112,9 @@ func writePIDFile(path string) {
func parseConfig(path, hostname string) *Config {
// Parse configuration.
config, err := ParseConfigFile(path)
if err != nil {
if os.IsNotExist(err) {
config = NewConfig()
} else if err != nil {
log.Fatalf("config: %s", err)
}
@ -76,91 +127,77 @@ func parseConfig(path, hostname string) *Config {
}
// creates and initializes a broker at a given path.
// Ignored if there is no broker directory.
func openBroker(path, addr string) *messaging.Broker {
// Ignore if there's no broker directory.
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
}
// If the Broker directory exists, open a Broker on this node.
b := messaging.NewBroker()
if err := b.Open(path, addr); err != nil {
log.Fatalf("failed to open Broker", err.Error())
log.Fatalf("failed to open broker: %s", err)
}
return b
}
// creates and initializes a server at a given path.
// Ignored if there is no data directory.
func openServer(path string, seedServers []string) *influxdb.Server {
// Ignore if the data directory doesn't exists.
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
}
// Create and open server
func openServer(path string) *influxdb.Server {
s := influxdb.NewServer()
if err := s.Open(path); err != nil {
log.Fatalf("failed to open data server", err.Error())
}
// Open messaging client to communicate with the brokers.
var brokerURLs []*url.URL
for _, s := range seedServers {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("cannot parse seed server: %s", err)
}
brokerURLs = append(brokerURLs, u)
}
// Initialize the messaging client.
c := messaging.NewClient(s.ID())
if err := c.Open(filepath.Join(path, messagingClientFile), brokerURLs); err != nil {
log.Fatalf("error opening messaging client: %s", err)
}
// Assign the client to the server.
if err := s.SetClient(c); err != nil {
log.Fatalf("set messaging client: %s", err)
}
return s
}
// starts handlers for the broker and server.
// If the broker and server are running on the same port then combine them.
func listenAndServe(b *messaging.Broker, s *influxdb.Server, brokerAddr, serverAddr string) {
// Initialize handlers.
var bh, sh http.Handler
if b != nil {
bh = messaging.NewHandler(b)
}
if s != nil {
sh = influxdb.NewHandler(s)
// initializes a new server that does not yet have an ID.
func initServer(s *influxdb.Server, b *messaging.Broker) {
// TODO: Change messaging client to not require a ReplicaID so we can create
// a replica without already being a replica.
// Create replica on broker.
if err := b.CreateReplica(1); err != nil {
log.Fatalf("replica creation error: %d", err)
}
// Combine handlers if they are using the same bind address.
if brokerAddr == serverAddr {
go func() { log.Fatal(http.ListenAndServe(brokerAddr, NewHandler(bh, sh))) }()
} else {
// Otherwise start the handlers on separate ports.
if sh != nil {
go func() { log.Fatal(http.ListenAndServe(serverAddr, sh)) }()
}
if bh != nil {
go func() { log.Fatal(http.ListenAndServe(brokerAddr, bh)) }()
// Initialize messaging client.
c := messaging.NewClient(1)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}
// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
// opens the messaging client and attaches it to the server.
func openServerClient(s *influxdb.Server, brokerURLs []*url.URL) {
c := messaging.NewClient(s.ID())
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), brokerURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}
}
// parses a comma-delimited list of URLs.
func parseSeedServers(s string) (a []*url.URL) {
for _, s := range strings.Split(s, ",") {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("cannot parse seed servers: %s", err)
}
a = append(a, u)
}
return
}
// Log the handlers starting up.
if serverAddr == "" {
log.Printf("Starting Influx Server %s...", version)
} else {
log.Printf("Starting Influx Server %s bound to %s...", version, serverAddr)
// returns true if the file exists.
func fileExists(path string) bool {
if _, err := os.Stat(path); os.IsNotExist(err) {
return false
}
return true
}
func printRunUsage() {
@ -173,6 +210,12 @@ use Distributed Consensus, but is otherwise fully-functional.
-config <path>
Set the path to the configuration file. Defaults to %s.
-role <role>
Set the role to be 'combined', 'broker' or 'data'. broker' means it will take
part in Raft Distributed Consensus. 'data' means it will store time-series data.
'combined' means it will do both. The default is 'combined'. In role other than
these three is invalid.
-hostname <name>
Override the hostname, the 'hostname' configuration option will be overridden.

View File

@ -101,6 +101,11 @@ func (b *Broker) Close() error {
return nil
}
// URL returns the connection url for the broker.
func (b *Broker) URL() *url.URL {
return b.log.URL
}
// Initialize creates a new cluster.
func (b *Broker) Initialize() error {
if err := b.log.Initialize(); err != nil {

View File

@ -292,6 +292,32 @@ func (s *Server) sync(index uint64) error {
}
}
// Initialize creates a new data node and initializes the server's id to 1.
func (s *Server) Initialize(u *url.URL) error {
// Create a new data node.
if err := s.CreateDataNode(u); err != nil {
return err
}
// Ensure the data node returns with an ID of 1.
// If it doesn't then something went really wrong. We have to panic because
// the messaging client relies on the first server being assigned ID 1.
n := s.DataNodeByURL(u)
assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID)
// Set the ID on the metastore.
if err := s.meta.mustUpdate(func(tx *metatx) error {
return tx.setID(n.ID)
}); err != nil {
return err
}
// Set the ID on the server.
s.id = 1
return nil
}
// DataNode returns a data node by id.
func (s *Server) DataNode(id uint64) *DataNode {
s.mu.RLock()