diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 4d111fcd8c..9b20e5464c 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -4,6 +4,8 @@ import ( "fmt" "net" "os" + "os/user" + "path/filepath" "strconv" "strings" "time" @@ -22,9 +24,14 @@ const ( // can be queried concurrently at one time. DefaultConcurrentShardQueryLimit = 10 - // DefaultAPIReadTimeout represents the amount time before an API request - // times out. + // DefaultAPIReadTimeout represents the duration before an API request times out. DefaultAPIReadTimeout = 5 * time.Second + + // DefaultBrokerPort represents the default port the broker runs on. + DefaultBrokerPort = 8086 + + // DefaultHTTPAPIPort represents the default port the HTTP API runs on. + DefaultHTTPAPIPort = 8086 ) // Config represents the configuration format for the influxd binary. @@ -61,13 +68,13 @@ type Config struct { } `toml:"udp_servers"` } `toml:"input_plugins"` - Raft struct { + Broker struct { Port int `toml:"port"` Dir string `toml:"dir"` Timeout Duration `toml:"election-timeout"` - } `toml:"raft"` + } `toml:"broker"` - Storage struct { + Data struct { Dir string `toml:"dir"` WriteBufferSize int `toml:"write-buffer-size"` MaxOpenShards int `toml:"max-open-shards"` @@ -75,7 +82,7 @@ type Config struct { WriteBatchSize int `toml:"write-batch-size"` Engines map[string]toml.Primitive `toml:"engines"` RetentionSweepPeriod Duration `toml:"retention-sweep-period"` - } `toml:"storage"` + } `toml:"data"` Cluster struct { Dir string `toml:"dir"` @@ -97,15 +104,21 @@ type Config struct { // NewConfig returns an instance of Config with reasonable defaults. func NewConfig() *Config { + u, _ := user.Current() + c := &Config{} - c.Storage.RetentionSweepPeriod = Duration(10 * time.Minute) + c.Data.RetentionSweepPeriod = Duration(10 * time.Minute) c.Cluster.ConcurrentShardQueryLimit = DefaultConcurrentShardQueryLimit - c.Raft.Timeout = Duration(1 * time.Second) + c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker") + c.Broker.Port = DefaultBrokerPort + c.Broker.Timeout = Duration(1 * time.Second) + c.HTTPAPI.Port = DefaultHTTPAPIPort c.HTTPAPI.ReadTimeout = Duration(DefaultAPIReadTimeout) c.Cluster.MinBackoff = Duration(1 * time.Second) c.Cluster.MaxBackoff = Duration(10 * time.Second) c.Cluster.ProtobufHeartbeatInterval = Duration(10 * time.Millisecond) - c.Storage.WriteBufferSize = 1000 + c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data") + c.Data.WriteBufferSize = 1000 c.Cluster.WriteBufferSize = 1000 c.Cluster.MaxResponseBufferSize = 100 @@ -124,29 +137,29 @@ func NewConfig() *Config { return c } -// PointBatchSize returns the storage point batch size, if set. +// PointBatchSize returns the data point batch size, if set. // If not set, the LevelDB point batch size is returned. // If that is not set then the default point batch size is returned. func (c *Config) PointBatchSize() int { - if c.Storage.PointBatchSize != 0 { - return c.Storage.PointBatchSize + if c.Data.PointBatchSize != 0 { + return c.Data.PointBatchSize } return DefaultPointBatchSize } -// WriteBatchSize returns the storage write batch size, if set. +// WriteBatchSize returns the data write batch size, if set. // If not set, the LevelDB write batch size is returned. // If that is not set then the default write batch size is returned. func (c *Config) WriteBatchSize() int { - if c.Storage.WriteBatchSize != 0 { - return c.Storage.WriteBatchSize + if c.Data.WriteBatchSize != 0 { + return c.Data.WriteBatchSize } return DefaultWriteBatchSize } // MaxOpenShards returns the maximum number of shards to keep open at once. func (c *Config) MaxOpenShards() int { - return c.Storage.MaxOpenShards + return c.Data.MaxOpenShards } // ApiHTTPListenAddr returns the binding address the API HTTP server @@ -154,14 +167,14 @@ func (c *Config) ApiHTTPListenAddr() string { return fmt.Sprintf("%s:%d", c.BindAddress, c.HTTPAPI.Port) } -// RaftListenAddr returns the binding address the Raft server -func (c *Config) RaftListenAddr() string { - return fmt.Sprintf("%s:%d", c.BindAddress, c.Raft.Port) +// BrokerListenAddr returns the binding address the Broker server +func (c *Config) BrokerListenAddr() string { + return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port) } -// RaftConnectionString returns the address required to contact the Raft server -func (c *Config) RaftConnectionString() string { - return fmt.Sprintf("http://%s:%d", c.Hostname, c.Raft.Port) +// BrokerConnectionString returns the address required to contact the Broker server +func (c *Config) BrokerConnectionString() string { + return fmt.Sprintf("http://%s:%d", c.Hostname, c.Broker.Port) } // Size represents a TOML parseable file size. diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 44ce173b3c..ff7abea47c 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -54,7 +54,7 @@ func TestParseConfig(t *testing.T) { t.Fatalf("admin assets mismatch: %v", c.Admin.Assets) } - if c.HTTPAPI.Port != 0 { + if c.HTTPAPI.Port != main.DefaultBrokerPort { t.Fatalf("http api port mismatch: %v", c.HTTPAPI.Port) } else if c.HTTPAPI.SSLPort != 8087 { t.Fatalf("http api ssl port mismatch: %v", c.HTTPAPI.SSLPort) @@ -94,16 +94,16 @@ func TestParseConfig(t *testing.T) { t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol)) } - if c.Raft.Port != 8090 { - t.Fatalf("raft port mismatch: %v", c.Raft.Port) - } else if c.Raft.Dir != "/tmp/influxdb/development/raft" { - t.Fatalf("raft dir mismatch: %v", c.Raft.Dir) - } else if time.Duration(c.Raft.Timeout) != time.Second { - t.Fatalf("raft duration mismatch: %v", c.Raft.Timeout) + if c.Broker.Port != 8090 { + t.Fatalf("broker port mismatch: %v", c.Broker.Port) + } else if c.Broker.Dir != "/tmp/influxdb/development/broker" { + t.Fatalf("broker dir mismatch: %v", c.Broker.Dir) + } else if time.Duration(c.Broker.Timeout) != time.Second { + t.Fatalf("broker duration mismatch: %v", c.Broker.Timeout) } - if c.Storage.Dir != "/tmp/influxdb/development/db" { - t.Fatalf("data dir mismatch: %v", c.Storage.Dir) + if c.Data.Dir != "/tmp/influxdb/development/db" { + t.Fatalf("data dir mismatch: %v", c.Data.Dir) } if c.Cluster.ProtobufPort != 8099 { @@ -184,17 +184,21 @@ database = "graphite_udp" # store graphite data in this database # Raft configuration [raft] # The raft port should be open between all servers in a cluster. +# Broker configuration +[broker] +# The broker port should be open between all servers in a cluster. # However, this port shouldn't be accessible from the internet. port = 8090 -# Where the raft logs are stored. The user running InfluxDB will need read/write access. -dir = "/tmp/influxdb/development/raft" +# Where the broker logs are stored. The user running InfluxDB will need read/write access. +dir = "/tmp/influxdb/development/broker" # election-timeout = "2s" -[storage] +[data] dir = "/tmp/influxdb/development/db" + # How many requests to potentially buffer in memory. If the buffer gets filled then writes # will still be logged and once the local storage has caught up (or compacted) the writes # will be replayed from the WAL @@ -211,7 +215,7 @@ retention-sweep-period = "10m" # prior to shutting down. Any server can be pointed to # as a seed. It will find the Raft leader automatically. -# Here's an example. Note that the port on the host is the same as the raft port. +# Here's an example. Note that the port on the host is the same as the broker port. seed-servers = ["hosta:8090", "hostb:8090"] # Replication happens over a TCP connection with a Protobuf protocol. diff --git a/cmd/influxd/create_cluster.go b/cmd/influxd/create_cluster.go index 1bed6b9bec..4de2ffaada 100644 --- a/cmd/influxd/create_cluster.go +++ b/cmd/influxd/create_cluster.go @@ -34,7 +34,7 @@ func execCreateCluster(args []string) { // Create the broker. b := messaging.NewBroker() - if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil { + if err := b.Open(config.Broker.Dir, config.BrokerConnectionString()); err != nil { log.Fatalf("broker: %s", err.Error()) } @@ -49,22 +49,22 @@ func execCreateCluster(args []string) { log.Fatalf("create-cluster: data directory already exists") } - // Now create the storage directory. - if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil { - log.Fatalf("create-cluster storage: %s", err.Error()) + // Now create the data directory. + if err := os.MkdirAll(config.Data.Dir, 0744); err != nil { + log.Fatalf("create-cluster data dir: %s", err.Error()) } // Configure the Messaging Client such that this node connects to itself. var seedUrls []*url.URL - u, err := url.Parse(config.RaftListenAddr()) + u, err := url.Parse(config.BrokerListenAddr()) if err != nil { log.Fatalf("create-cluster seed URLs: %s", err.Error()) } seedUrls = append(seedUrls, u) c := messaging.NewClient(0) // TODO: Set replica id. - if err := c.Open(filepath.Join(config.Storage.Dir, messagingClientFile), seedUrls); err != nil { + if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), seedUrls); err != nil { log.Fatalf("create-cluster open client: %s", err.Error()) } if err := c.Close(); err != nil { @@ -73,7 +73,7 @@ func execCreateCluster(args []string) { } - log.Println("new cluster node created as", *role, "in", config.Raft.Dir) + log.Println("new cluster node created as", *role, "in", config.Broker.Dir) } func printCreateClusterUsage() { diff --git a/cmd/influxd/handler.go b/cmd/influxd/handler.go index 06693e8b6c..ba333c76cd 100644 --- a/cmd/influxd/handler.go +++ b/cmd/influxd/handler.go @@ -3,32 +3,40 @@ package main import ( "net/http" "strings" - - "github.com/influxdb/influxdb" - "github.com/influxdb/influxdb/messaging" ) -// Handler represents an HTTP handler for InfluxDB node. Depending on its role, it -// will serve many different endpoints. +// Handler represents an HTTP handler for InfluxDB node. +// Depending on its role, it will serve many different endpoints. type Handler struct { - brokerHandler *messaging.Handler - serverHandler *influxdb.Handler + brokerHandler http.Handler + serverHandler http.Handler } // NewHandler returns a new instance of Handler. -func NewHandler(b *messaging.Handler, s *influxdb.Handler) *Handler { +func NewHandler(bh, sh http.Handler) *Handler { return &Handler{ - brokerHandler: b, - serverHandler: s, + brokerHandler: bh, + serverHandler: sh, } } // ServeHTTP responds to HTTP request to the handler. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Route raft and messaging paths to the broker. if strings.HasPrefix(r.URL.Path, "/raft") || strings.HasPrefix(r.URL.Path, "/messages") { + if h.brokerHandler == nil { + http.NotFound(w, r) + return + } + h.brokerHandler.ServeHTTP(w, r) return } + // Route all other paths to the server. + if h.serverHandler == nil { + http.NotFound(w, r) + return + } h.serverHandler.ServeHTTP(w, r) } diff --git a/cmd/influxd/join_cluster.go b/cmd/influxd/join_cluster.go index 26498b63e6..7679d9cdca 100644 --- a/cmd/influxd/join_cluster.go +++ b/cmd/influxd/join_cluster.go @@ -50,7 +50,7 @@ func execJoinCluster(args []string) { // Broker required -- but don't initialize it. // Joining a cluster will do that. b := messaging.NewBroker() - if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil { + if err := b.Open(config.Broker.Dir, config.BrokerConnectionString()); err != nil { log.Fatalf("join: %s", err) } @@ -74,17 +74,17 @@ func execJoinCluster(args []string) { // If joining as a data node then create a data directory. if *role == "combined" || *role == "data" { - if _, err := os.Stat(config.Storage.Dir); err == nil { - log.Fatalf("join-cluster: storage directory already exists") + if _, err := os.Stat(config.Data.Dir); err == nil { + log.Fatalf("join-cluster: data directory already exists") } - if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil { - log.Fatalf("join-cluster storage: %s", err.Error()) + if err := os.MkdirAll(config.Data.Dir, 0744); err != nil { + log.Fatalf("join-cluster data dir: %s", err.Error()) } // Configure the Messaging Client. c := messaging.NewClient(0) // TODO: Set replica id. - if err := c.Open(filepath.Join(config.Storage.Dir, messagingClientFile), seedURLs); err != nil { + if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), seedURLs); err != nil { log.Fatalf("join-cluster open client: %s", err.Error()) } if err := c.Close(); err != nil { diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index a8f37d8153..4b8cdce1f0 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -56,8 +56,6 @@ func main() { // Extract name from args. switch cmd { - case "create-cluster": - execCreateCluster(args[1:]) case "join-cluster": execJoinCluster(args[1:]) case "run": @@ -103,7 +101,6 @@ Usage: The commands are: - create-cluster create a new node that other nodes can join to form a new cluster join-cluster create a new node that will join an existing cluster run run node with existing configuration version displays the InfluxDB version diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 6c07a7b6df..7aca9c6143 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -12,7 +12,6 @@ import ( "strings" "github.com/influxdb/influxdb" - "github.com/influxdb/influxdb/graphite" "github.com/influxdb/influxdb/messaging" ) @@ -23,144 +22,184 @@ func execRun(args []string) { var ( configPath = fs.String("config", configDefaultPath, "") pidPath = fs.String("pidfile", "", "") + role = fs.String("role", "combined", "") hostname = fs.String("hostname", "", "") seedServers = fs.String("seed-servers", "", "") ) fs.Usage = printRunUsage fs.Parse(args) - // Write pid file. - if *pidPath != "" { - pid := strconv.Itoa(os.Getpid()) - if err := ioutil.WriteFile(*pidPath, []byte(pid), 0644); err != nil { - log.Fatal(err) - } + // Validate CLI flags. + if *role != "combined" && *role != "broker" && *role != "data" { + log.Fatalf("role must be 'combined', 'broker', or 'data'") } - // Parse configuration. - config, err := ParseConfigFile(*configPath) - if err != nil { - log.Fatalf("config: %s", err) - } - - // Override config properties. - if *hostname != "" { - config.Hostname = *hostname - } - - // TODO(benbjohnson): Start admin server. + // Parse broker urls from seed servers. + brokerURLs := parseSeedServers(*seedServers) + // Print sweet InfluxDB logo and write the process id to file. log.Print(logo) - if config.BindAddress == "" { - log.Printf("Starting Influx Server %s...", version) - } else { - log.Printf("Starting Influx Server %s bound to %s...", version, config.BindAddress) - } + writePIDFile(*pidPath) - // Start up the node. - var brokerHandler *messaging.Handler - var serverHandler *influxdb.Handler - var brokerDirExists bool - var storageDirExists bool + // Parse the configuration and determine if a broker and/or server exist. + config := parseConfig(*configPath, *hostname) + hasBroker := fileExists(config.Broker.Dir) + hasServer := fileExists(config.Data.Dir) + initializing := !hasBroker && !hasServer - if _, err := os.Stat(config.Raft.Dir); err == nil { - brokerDirExists = true - } - if _, err := os.Stat(config.Storage.Dir); err == nil { - storageDirExists = true - } + // Open broker if it exists or if we're initializing for the first time. + var b *messaging.Broker + var h *Handler + if hasBroker || (initializing && (*role == "combined" || *role == "broker")) { + b = openBroker(config.Broker.Dir, config.BrokerConnectionString()) - if !brokerDirExists && !storageDirExists { - // Node is completely new, so create the minimum needed, which - // is a storage directory. - if err := os.MkdirAll(config.Storage.Dir, 0744); err != nil { - log.Fatal(err) - } - storageDirExists = true - } - - // If the Broker directory exists, open a Broker on this node. - if brokerDirExists { - b := messaging.NewBroker() - if err := b.Open(config.Raft.Dir, config.RaftConnectionString()); err != nil { - log.Fatalf("failed to open Broker: %v", err.Error()) - } - brokerHandler = messaging.NewHandler(b) - } - - // If the storage directory exists, open a Data node. - if storageDirExists { - var client influxdb.MessagingClient - var server *influxdb.Server - - clientFilePath := filepath.Join(config.Storage.Dir, messagingClientFile) - - if _, err := os.Stat(clientFilePath); err == nil { - var brokerURLs []*url.URL - for _, s := range strings.Split(*seedServers, ",") { - u, err := url.Parse(s) - if err != nil { - log.Fatalf("seed server %v", err) - } - brokerURLs = append(brokerURLs, u) + // If this is the first time running then initialize a broker. + // Update the seed server so the server can connect locally. + if initializing { + if err := b.Initialize(); err != nil { + log.Fatalf("initialize: %s", err) } + } - c := messaging.NewClient(0) // TODO: Set replica id. - if err := c.Open(clientFilePath, brokerURLs); err != nil { - log.Fatalf("Error opening Messaging Client: %s", err.Error()) - } - defer c.Close() - client = c + // Start the broker handler. + h = &Handler{brokerHandler: messaging.NewHandler(b)} + go func() { log.Fatal(http.ListenAndServe(config.BrokerListenAddr(), h)) }() + log.Printf("Broker running on %s", config.BrokerListenAddr()) + } + + // Open server if it exists or we're initializing for the first time. + var s *influxdb.Server + if hasServer || (initializing && (*role == "combined" || *role == "data")) { + s = openServer(config.Data.Dir) + + // If the server is uninitialized then initialize it with the broker. + // Otherwise simply create a messaging client with the server id. + if s.ID() == 0 { + initServer(s, b) } else { - client = messaging.NewLoopbackClient() + openServerClient(s, brokerURLs) } - server = influxdb.NewServer(client) - err = server.Open(config.Storage.Dir) - if err != nil { - log.Fatalf("failed to open data Server %v", err.Error()) + // Start the server handler. + // If it uses the same port as the broker then simply attach it. + sh := influxdb.NewHandler(s) + if config.BrokerListenAddr() == config.ApiHTTPListenAddr() { + h.serverHandler = sh + } else { + go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), sh)) }() } - serverHandler = influxdb.NewHandler(server) - - // Spin up any grahite servers - for _, g := range config.Graphite { - // Get a new server - s := graphite.Server{Server: server} - - // Set database - s.Database = g.Database - - // Set the addresses up - s.TCPAddr = g.TCPAddr(config.BindAddress) - s.UDPAddr = g.UDPAddr(config.BindAddress) - - // Spin it up - go func() { log.Fatal(s.ListenAndServe) }() - - } - - } - - // TODO: startProfiler() - // TODO: -reset-root - - // Start up HTTP server(s) - if config.ApiHTTPListenAddr() != config.RaftListenAddr() { - if serverHandler != nil { - go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), serverHandler)) }() - } - if brokerHandler != nil { - go func() { log.Fatal(http.ListenAndServe(config.RaftListenAddr(), brokerHandler)) }() - } - } else { - h := NewHandler(brokerHandler, serverHandler) - go func() { log.Fatal(http.ListenAndServe(config.ApiHTTPListenAddr(), h)) }() + log.Printf("DataNode#%d running on %s", s.ID(), config.ApiHTTPListenAddr()) } // Wait indefinitely. <-(chan struct{})(nil) } +// write the current process id to a file specified by path. +func writePIDFile(path string) { + if path == "" { + return + } + + // Retrieve the PID and write it. + pid := strconv.Itoa(os.Getpid()) + if err := ioutil.WriteFile(path, []byte(pid), 0644); err != nil { + log.Fatal(err) + } +} + +// parses the configuration from a given path. Sets overrides as needed. +func parseConfig(path, hostname string) *Config { + // Parse configuration. + config, err := ParseConfigFile(path) + if os.IsNotExist(err) { + config = NewConfig() + } else if err != nil { + log.Fatalf("config: %s", err) + } + + // Override config properties. + if hostname != "" { + config.Hostname = hostname + } + + return config +} + +// creates and initializes a broker at a given path. +func openBroker(path, addr string) *messaging.Broker { + b := messaging.NewBroker() + if err := b.Open(path, addr); err != nil { + log.Fatalf("failed to open broker: %s", err) + } + return b +} + +// creates and initializes a server at a given path. +func openServer(path string) *influxdb.Server { + s := influxdb.NewServer() + if err := s.Open(path); err != nil { + log.Fatalf("failed to open data server: %v", err.Error()) + } + return s +} + +// initializes a new server that does not yet have an ID. +func initServer(s *influxdb.Server, b *messaging.Broker) { + // TODO: Change messaging client to not require a ReplicaID so we can create + // a replica without already being a replica. + + // Create replica on broker. + if err := b.CreateReplica(1); err != nil { + log.Fatalf("replica creation error: %s", err) + } + + // Initialize messaging client. + c := messaging.NewClient(1) + if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil { + log.Fatalf("messaging client error: %s", err) + } + if err := s.SetClient(c); err != nil { + log.Fatalf("set client error: %s", err) + } + + // Initialize the server. + if err := s.Initialize(b.URL()); err != nil { + log.Fatalf("server initialization error: %s", err) + } +} + +// opens the messaging client and attaches it to the server. +func openServerClient(s *influxdb.Server, brokerURLs []*url.URL) { + c := messaging.NewClient(s.ID()) + if err := c.Open(filepath.Join(s.Path(), messagingClientFile), brokerURLs); err != nil { + log.Fatalf("messaging client error: %s", err) + } + if err := s.SetClient(c); err != nil { + log.Fatalf("set client error: %s", err) + } +} + +// parses a comma-delimited list of URLs. +func parseSeedServers(s string) (a []*url.URL) { + for _, s := range strings.Split(s, ",") { + u, err := url.Parse(s) + if err != nil { + log.Fatalf("cannot parse seed servers: %s", err) + } + a = append(a, u) + } + return +} + +// returns true if the file exists. +func fileExists(path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + return true +} + func printRunUsage() { log.Printf(`usage: run [flags] @@ -171,6 +210,12 @@ use Distributed Consensus, but is otherwise fully-functional. -config Set the path to the configuration file. Defaults to %s. + -role + Set the role to be 'combined', 'broker' or 'data'. broker' means it will take + part in Raft Distributed Consensus. 'data' means it will store time-series data. + 'combined' means it will do both. The default is 'combined'. In role other than + these three is invalid. + -hostname Override the hostname, the 'hostname' configuration option will be overridden. diff --git a/influxql/ast_test.go b/influxql/ast_test.go index 5ea0e8083b..3180843dee 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -67,9 +67,9 @@ func TestSelectStatement_Substatement(t *testing.T) { // 5. 4 with different condition order { - stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE (bb.host = "serverb" OR bb.host = "serverc") AND aa.host = "servera" AND 1 = 2`, + stmt: `SELECT sum(aa.value) + sum(bb.value) FROM join(aa, bb) WHERE ((bb.host = "serverb" OR bb.host = "serverc") AND aa.host = "servera") AND 1 = 2`, expr: &influxql.VarRef{Val: "bb.value"}, - sub: `SELECT bb.value FROM bb WHERE (bb.host = "serverb" OR bb.host = "serverc") AND 1.000 = 2.000`, + sub: `SELECT bb.value FROM bb WHERE ((bb.host = "serverb" OR bb.host = "serverc")) AND 1.000 = 2.000`, }, } diff --git a/messaging/broker.go b/messaging/broker.go index 1e9bb5a961..401fde8e9d 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -101,6 +101,11 @@ func (b *Broker) Close() error { return nil } +// URL returns the connection url for the broker. +func (b *Broker) URL() *url.URL { + return b.log.URL +} + // Initialize creates a new cluster. func (b *Broker) Initialize() error { if err := b.log.Initialize(); err != nil { diff --git a/server.go b/server.go index 50ae69d9e5..1c8a28aeb3 100644 --- a/server.go +++ b/server.go @@ -88,11 +88,8 @@ type Server struct { } // NewServer returns a new instance of Server. -// The server requires a client to the messaging broker to be passed in. -func NewServer(client MessagingClient) *Server { - assert(client != nil, "messaging client required") +func NewServer() *Server { return &Server{ - client: client, meta: &metastore{}, dataNodes: make(map[uint64]*DataNode), databases: make(map[string]*database), @@ -102,9 +99,21 @@ func NewServer(client MessagingClient) *Server { } } +// ID returns the data node id for the server. +// Returns zero if the server is closed or the server has not joined a cluster. +func (s *Server) ID() uint64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.id +} + // Path returns the path used when opening the server. // Returns an empty string when the server is closed. -func (s *Server) Path() string { return s.path } +func (s *Server) Path() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.path +} // shardPath returns the path for a shard. func (s *Server) shardPath(id uint64) string { @@ -144,10 +153,6 @@ func (s *Server) Open(path string) error { // Set the server path. s.path = path - // Start goroutine to read messages from the broker. - s.done = make(chan struct{}, 0) - go s.processor(s.done) - return nil } @@ -163,9 +168,8 @@ func (s *Server) Close() error { return ErrServerClosed } - // Close notification. - close(s.done) - s.done = nil + // Close message processing. + s.setClient(nil) // Close metastore. _ = s.meta.close() @@ -202,6 +206,44 @@ func (s *Server) load() error { }) } +// Client retrieves the current messaging client. +func (s *Server) Client() MessagingClient { + s.mu.RLock() + defer s.mu.RUnlock() + return s.client +} + +// SetClient sets the messaging client on the server. +func (s *Server) SetClient(client MessagingClient) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.setClient(client) +} + +func (s *Server) setClient(client MessagingClient) error { + // Ensure the server is open. + if !s.opened() { + return ErrServerClosed + } + + // Stop previous processor, if running. + if s.done != nil { + close(s.done) + s.done = nil + } + + // Set the messaging client. + s.client = client + + // Start goroutine to read messages from the broker. + if client != nil { + s.done = make(chan struct{}, 0) + go s.processor(client, s.done) + } + + return nil +} + // broadcast encodes a message as JSON and send it to the broker's broadcast topic. // This function waits until the message has been processed by the server. // Returns the broker log index of the message or an error. @@ -250,6 +292,32 @@ func (s *Server) sync(index uint64) error { } } +// Initialize creates a new data node and initializes the server's id to 1. +func (s *Server) Initialize(u *url.URL) error { + // Create a new data node. + if err := s.CreateDataNode(u); err != nil { + return err + } + + // Ensure the data node returns with an ID of 1. + // If it doesn't then something went really wrong. We have to panic because + // the messaging client relies on the first server being assigned ID 1. + n := s.DataNodeByURL(u) + assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID) + + // Set the ID on the metastore. + if err := s.meta.mustUpdate(func(tx *metatx) error { + return tx.setID(n.ID) + }); err != nil { + return err + } + + // Set the ID on the server. + s.id = 1 + + return nil +} + // DataNode returns a data node by id. func (s *Server) DataNode(id uint64) *DataNode { s.mu.RLock() @@ -1059,8 +1127,7 @@ func (s *Server) Measurements(database string) (a Measurements) { } // processor runs in a separate goroutine and processes all incoming broker messages. -func (s *Server) processor(done chan struct{}) { - client := s.client +func (s *Server) processor(client MessagingClient, done chan struct{}) { for { // Read incoming message. var m *messaging.Message diff --git a/server_test.go b/server_test.go index 95cd463bfe..d6aa4967e2 100644 --- a/server_test.go +++ b/server_test.go @@ -16,8 +16,7 @@ import ( // Ensure the server can be successfully opened and closed. func TestServer_Open(t *testing.T) { - c := NewMessagingClient() - s := NewServer(c) + s := NewServer() defer s.Close() if err := s.Server.Open(tempfile()); err != nil { t.Fatal(err) @@ -527,28 +526,38 @@ type Server struct { } // NewServer returns a new test server instance. -func NewServer(client influxdb.MessagingClient) *Server { - return &Server{influxdb.NewServer(client)} +func NewServer() *Server { + return &Server{influxdb.NewServer()} } // OpenServer returns a new, open test server instance. func OpenServer(client influxdb.MessagingClient) *Server { - s := NewServer(client) + s := NewServer() if err := s.Open(tempfile()); err != nil { panic(err.Error()) } + if err := s.SetClient(client); err != nil { + panic(err.Error()) + } return s } // Restart stops and restarts the server. func (s *Server) Restart() { - path := s.Path() + path, client := s.Path(), s.Client() + + // Stop the server. if err := s.Server.Close(); err != nil { panic("close: " + err.Error()) } + + // Open and reset the client. if err := s.Server.Open(path); err != nil { panic("open: " + err.Error()) } + if err := s.Server.SetClient(client); err != nil { + panic("client: " + err.Error()) + } } // Close shuts down the server and removes all temporary files.