From 9d4527071eb9f5f265e048b2bc7a1214ec9645b5 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 29 May 2015 13:50:05 -0600 Subject: [PATCH] Refactor run command. --- admin/admin.go | 67 ---- admin/admin_test.go | 31 -- admin/config.go | 1 + admin/config_test.go | 26 ++ admin/service.go | 82 +++-- admin/service_test.go | 33 ++ cluster/config.go | 3 + cluster/service.go | 11 +- cmd/influxd/help/help.go | 2 +- cmd/influxd/main.go | 94 ++---- cmd/influxd/run/command.go | 195 ++++++++++++ cmd/influxd/run/config.go | 408 ++++--------------------- cmd/influxd/run/config_command.go | 70 +++++ cmd/influxd/run/config_test.go | 393 +++++------------------- cmd/influxd/run/run.go | 44 --- cmd/influxd/run/server.go | 162 +--------- cmd/influxd/run/server_test.go | 22 +- collectd/config.go | 8 + collectd/config_test.go | 32 ++ graphite/config.go | 34 +++ graphite/config_test.go | 35 +++ graphite/graphite.go | 5 +- httpd/config.go | 16 + httpd/config_test.go | 35 +++ httpd/service.go | 27 +- meta/config.go | 39 +++ meta/config_test.go | 36 +++ meta/continuous_querier/config.go | 61 ++++ meta/continuous_querier/config_test.go | 36 +++ meta/continuous_querier/service.go | 1 + meta/store.go | 14 - monitor/config.go | 25 ++ monitor/monitor.go | 83 +++++ opentsdb/config.go | 8 + opentsdb/config_test.go | 29 ++ opentsdb/opentsdb.go | 8 +- toml/toml.go | 41 ++- toml/toml_test.go | 47 +++ tsdb/config.go | 39 ++- 39 files changed, 1187 insertions(+), 1116 deletions(-) delete mode 100644 admin/admin.go delete mode 100644 admin/admin_test.go create mode 100644 admin/config.go create mode 100644 admin/config_test.go create mode 100644 admin/service_test.go create mode 100644 cluster/config.go create mode 100644 cmd/influxd/run/command.go create mode 100644 cmd/influxd/run/config_command.go delete mode 100644 cmd/influxd/run/run.go create mode 100644 collectd/config.go create mode 100644 collectd/config_test.go create mode 100644 graphite/config.go create mode 100644 graphite/config_test.go create mode 100644 httpd/config.go create mode 100644 httpd/config_test.go create mode 100644 meta/config.go create mode 100644 meta/config_test.go create mode 100644 meta/continuous_querier/config.go create mode 100644 meta/continuous_querier/config_test.go create mode 100644 meta/continuous_querier/service.go create mode 100644 monitor/config.go create mode 100644 monitor/monitor.go create mode 100644 opentsdb/config.go create mode 100644 opentsdb/config_test.go create mode 100644 toml/toml_test.go diff --git a/admin/admin.go b/admin/admin.go deleted file mode 100644 index 48ad8042bf..0000000000 --- a/admin/admin.go +++ /dev/null @@ -1,67 +0,0 @@ -package admin - -import ( - "log" - "net" - "net/http" - "strings" - "sync" - - "github.com/rakyll/statik/fs" - - // Register static assets via statik. - _ "github.com/influxdb/influxdb/statik" -) - -// Server manages InfluxDB's admin web server. -type Server struct { - mu sync.Mutex - addr string - listener net.Listener - closed bool -} - -// NewServer constructs a new admin web server. The "addr" argument should be a -// string that looks like ":8083" or whatever addr to serve on. -func NewServer(addr string) *Server { - return &Server{addr: addr, closed: true} -} - -// ListenAndServe starts the admin web server and serves requests until -// s.Close() is called. -func (s *Server) ListenAndServe() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.addr == "" { - return nil - } - - var err error - s.listener, err = net.Listen("tcp", s.addr) - if err != nil { - return err - } - - s.closed = false - statikFS, _ := fs.New() - - go func() { - err = http.Serve(s.listener, http.FileServer(statikFS)) - if !strings.Contains(err.Error(), "closed") { - log.Fatalf("admin server failed to server on %s: %s", s.addr, err) - } - }() - return err -} - -// Close stops the admin web server. -func (s *Server) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { - return nil - } - - s.closed = true - return s.listener.Close() -} diff --git a/admin/admin_test.go b/admin/admin_test.go deleted file mode 100644 index d42054c9c0..0000000000 --- a/admin/admin_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package admin_test - -import ( - "io/ioutil" - "net/http" - "testing" - - "github.com/influxdb/influxdb/admin" -) - -func Test_ServesIndexByDefault(t *testing.T) { - s := admin.NewServer(":8083") - go func() { s.ListenAndServe() }() - defer s.Close() - - resp, err := http.Get("http://localhost:8083/") - if err != nil { - t.Fatalf("couldn't complete GET to / on port 8083") - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - t.Fatalf("didn't get a 200 OK response from server, got %d instead", resp.StatusCode) - } - - _, err = ioutil.ReadAll(resp.Body) - - if err != nil { - t.Fatalf("couldn't read body") - } -} diff --git a/admin/config.go b/admin/config.go new file mode 100644 index 0000000000..d78da5da34 --- /dev/null +++ b/admin/config.go @@ -0,0 +1 @@ +package admin diff --git a/admin/config_test.go b/admin/config_test.go new file mode 100644 index 0000000000..33d043d0fd --- /dev/null +++ b/admin/config_test.go @@ -0,0 +1,26 @@ +package admin_test + +import ( + "testing" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/admin" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c admin.Config + if _, err := toml.Decode(` +enabled = true +bind-address = ":8083" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.Enabled != true { + t.Fatalf("unexpected enabled: %v", c.Enabled) + } else if c.BindAddress != ":8083" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } +} diff --git a/admin/service.go b/admin/service.go index 4c8fcd16bf..7eb6c766bf 100644 --- a/admin/service.go +++ b/admin/service.go @@ -1,32 +1,80 @@ package admin -import "log" +import ( + "fmt" + "net" + "net/http" + "strings" + // Register static assets via statik. + _ "github.com/influxdb/influxdb/statik" + "github.com/rakyll/statik/fs" +) + +// Service manages the listener for an admin endpoint. type Service struct { + listener net.Listener + addr string + err chan error } -func NewService(c *Config) *Service { - return &Service{} -} - -func (s *Service) Open() error { - - if err := cmd.node.openAdminServer(cmd.config.Admin.Port); err != nil { - log.Fatalf("admin server failed to listen on :%d: %s", cmd.config.Admin.Port, err) +// NewService returns a new instance of Service. +func NewService(c Config) *Service { + return &Service{ + addr: c.BindAddress, + err: make(chan error), } - log.Printf("admin server listening on :%d", cmd.config.Admin.Port) } -func (s *Service) Close() error { return nil } +// Open starts the service +func (s *Service) Open() error { + // Open listener. + listener, err := net.Listen("tcp", s.addr) + if err != nil { + return err + } + s.listener = listener -type Config struct { - Enabled bool `toml:"enabled"` - Port int `toml:"port"` + // Begin listening for requests in a separate goroutine. + go s.serve() + return nil } -func (s *Node) closeAdminServer() error { - if s.adminServer != nil { - return s.adminServer.Close() +// Close closes the underlying listener. +func (s *Service) Close() error { + if s.listener != nil { + return s.listener.Close() } return nil } + +// Err returns a channel for fatal errors that occur on the listener. +func (s *Service) Err() <-chan error { return s.err } + +// Addr returns the listener's address. Returns nil if listener is closed. +func (s *Service) Addr() net.Addr { + if s.listener != nil { + return s.listener.Addr() + } + return nil +} + +// serve serves the handler from the listener. +func (s *Service) serve() { + // Instantiate file system from embedded admin. + statikFS, err := fs.New() + if err != nil { + panic(err) + } + + // Run file system handler on listener. + err = http.Serve(s.listener, http.FileServer(statikFS)) + if err != nil && !strings.Contains(err.Error(), "closed") { + s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err) + } +} + +type Config struct { + Enabled bool `toml:"enabled"` + BindAddress string `toml:"bind-address"` +} diff --git a/admin/service_test.go b/admin/service_test.go new file mode 100644 index 0000000000..214b0afacf --- /dev/null +++ b/admin/service_test.go @@ -0,0 +1,33 @@ +package admin_test + +import ( + "io/ioutil" + "net/http" + "testing" + + "github.com/influxdb/influxdb/admin" +) + +// Ensure service can serve the root index page of the admin. +func TestService_Index(t *testing.T) { + // Start service on random port. + s := admin.NewService(admin.Config{BindAddress: ":0"}) + if err := s.Open(); err != nil { + t.Fatal(err) + } + defer s.Close() + + // Request root index page. + resp, err := http.Get("http://" + s.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + // Validate status code and body. + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status: %d", resp.StatusCode) + } else if _, err := ioutil.ReadAll(resp.Body); err != nil { + t.Fatalf("unable to read body: %s", err) + } +} diff --git a/cluster/config.go b/cluster/config.go new file mode 100644 index 0000000000..8bf8a34628 --- /dev/null +++ b/cluster/config.go @@ -0,0 +1,3 @@ +package cluster + +type Config struct{} diff --git a/cluster/service.go b/cluster/service.go index cc9ba8b12d..80c0ecb9b0 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -1,13 +1,14 @@ package cluster +import "net" + type Service struct { } -func NewService(c *Config) *Service { +func NewService(c Config) *Service { return &Service{} } -func (s *Service) Open() error { return nil } -func (s *Service) Close() error { return nil } - -type Config struct{} +func (s *Service) Open() error { return nil } +func (s *Service) Close() error { return nil } +func (s *Service) Addr() net.Addr { return nil } diff --git a/cmd/influxd/help/help.go b/cmd/influxd/help/help.go index bc00806136..104d5a29c6 100644 --- a/cmd/influxd/help/help.go +++ b/cmd/influxd/help/help.go @@ -1,4 +1,4 @@ -package main +package help import "fmt" diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index ed0a8e48a0..5c31a96608 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -11,20 +11,10 @@ import ( "runtime/pprof" "strings" "time" + + "github.com/influxdb/influxdb/cmd/influxd/run" ) -const logo = ` - 8888888 .d888 888 8888888b. 888888b. - 888 d88P" 888 888 "Y88b 888 "88b - 888 888 888 888 888 888 .88P - 888 88888b. 888888 888 888 888 888 888 888 888 8888888K. - 888 888 "88b 888 888 888 888 Y8bd8P' 888 888 888 "Y88b - 888 888 888 888 888 888 888 X88K 888 888 888 888 - 888 888 888 888 888 Y88b 888 .d8""8b. 888 .d88P 888 d88P - 8888888 888 888 888 888 "Y88888 888 888 8888888P" 8888888P" - -` - // These variables are populated via the Go linker. var ( version string = "0.9" @@ -69,37 +59,38 @@ func main() { cmd = args[0] } + // FIXME(benbjohnson): Parse profiling args & start profiling. + // Extract name from args. switch cmd { - case "run": - cmd := NewRunCommand() - if err := cmd.Run(args[1:]...); err != nil { - log.Fatalf("run: %s", err) - } case "": - cmd := NewRunCommand() - if err := cmd.Run(args...); err != nil { + if err := run.NewCommand().Run(args...); err != nil { log.Fatalf("run: %s", err) } - case "backup": - cmd := NewBackupCommand() - if err := cmd.Run(args[1:]...); err != nil { - log.Fatalf("backup: %s", err) - } - case "restore": - cmd := NewRestoreCommand() - if err := cmd.Run(args[1:]...); err != nil { - log.Fatalf("restore: %s", err) + case "run": + if err := run.NewCommand().Run(args[1:]...); err != nil { + log.Fatalf("run: %s", err) } + // case "backup": + // cmd := NewBackupCommand() + // if err := cmd.Run(args[1:]...); err != nil { + // log.Fatalf("backup: %s", err) + // } + // case "restore": + // cmd := NewRestoreCommand() + // if err := cmd.Run(args[1:]...); err != nil { + // log.Fatalf("restore: %s", err) + // } case "version": execVersion(args[1:]) case "config": - execConfig(args[1:]) - case "help": - cmd := NewHelpCommand() - if err := cmd.Run(args[1:]...); err != nil { - log.Fatalf("help: %s", err) + if err := run.NewPrintConfigCommand().Run(args[1:]...); err != nil { + log.Fatalf("config: %s", err) } + // case "help": + // if err := help.NewCommand().Run(args[1:]...); err != nil { + // log.Fatalf("help: %s", err) + // } default: log.Fatalf(`influxd: unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", cmd) } @@ -124,43 +115,6 @@ func execVersion(args []string) { log.Print(s) } -// execConfig parses and prints the current config loaded. -func execConfig(args []string) { - // Parse command flags. - fs := flag.NewFlagSet("", flag.ExitOnError) - fs.Usage = func() { - fmt.Println(`usage: config - - config displays the default configuration - `) - } - - var ( - configPath string - hostname string - ) - fs.StringVar(&configPath, "config", "", "") - fs.StringVar(&hostname, "hostname", "", "") - fs.Parse(args) - - var config *Config - var err error - if configPath == "" { - config, err = NewTestConfig() - } else { - config, err = ParseConfigFile(configPath) - } - if err != nil { - log.Fatalf("parse config: %s", err) - } - // Override config properties. - if hostname != "" { - config.Hostname = hostname - } - - config.Write(os.Stdout) -} - type Stopper interface { Stop() } diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go new file mode 100644 index 0000000000..ae132fb306 --- /dev/null +++ b/cmd/influxd/run/command.go @@ -0,0 +1,195 @@ +package run + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strconv" + + "github.com/BurntSushi/toml" +) + +const logo = ` + 8888888 .d888 888 8888888b. 888888b. + 888 d88P" 888 888 "Y88b 888 "88b + 888 888 888 888 888 888 .88P + 888 88888b. 888888 888 888 888 888 888 888 888 8888888K. + 888 888 "88b 888 888 888 888 Y8bd8P' 888 888 888 "Y88b + 888 888 888 888 888 888 888 X88K 888 888 888 888 + 888 888 888 888 888 Y88b 888 .d8""8b. 888 .d88P 888 d88P + 8888888 888 888 888 888 "Y88888 888 888 8888888P" 8888888P" + +` + +// Command represents the command executed by "influxd run". +type Command struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + + Server *Server +} + +// NewCommand return a new instance of Command. +func NewCommand() *Command { + return &Command{ + Stdin: os.Stdin, + Stdout: os.Stdout, + Stderr: os.Stderr, + } +} + +// Run parses the config from args and runs the server. +func (cmd *Command) Run(args ...string) error { + // Parse the command line flags. + options, err := cmd.ParseFlags(args...) + if err != nil { + return err + } + + // Print sweet InfluxDB logo. + fmt.Print(logo) + + // Write the PID file. + if err := cmd.writePIDFile(options.PIDFile); err != nil { + return fmt.Errorf("write pid file: %s", err) + } + + // Set parallelism. + runtime.GOMAXPROCS(runtime.NumCPU()) + fmt.Fprintf(cmd.Stderr, "GOMAXPROCS set to %d", runtime.GOMAXPROCS(0)) + + // Parse config + config, err := cmd.ParseConfig(options.ConfigPath) + if err != nil { + return fmt.Errorf("parse config: %s", err) + } + + // Override config hostname if specified in the command line args. + if options.Hostname != "" { + config.Hostname = options.Hostname + } + // FIXME(benbjohnson): cmd.node.hostname = cmd.config.Hostname + + // Use the config JoinURLs by default + // If a -join flag was passed, these should override the config + joinURLs := config.Initialization.JoinURLs + if options.Join != "" { + joinURLs = options.Join + } + + // Normalize and validate the configuration. + config.Normalize() + if err := config.Validate(); err != nil { + return fmt.Errorf("%s. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.", err) + } + + // Create server from config and start it. + s := NewServer(config, joinURLs) + if err := s.Open(); err != nil { + return fmt.Errorf("open server: %s", err) + } + cmd.Server = s + + return nil +} + +// Close shuts down the server. +func (cmd *Command) Close() error { + if cmd.Server != nil { + return cmd.Server.Close() + } + return nil +} + +// ParseFlags parses the command line flags from args and returns an options set. +func (cmd *Command) ParseFlags(args ...string) (Options, error) { + var options Options + fs := flag.NewFlagSet("", flag.ContinueOnError) + fs.StringVar(&options.ConfigPath, "config", "", "") + fs.StringVar(&options.PIDFile, "pidfile", "", "") + fs.StringVar(&options.Hostname, "hostname", "", "") + fs.StringVar(&options.Join, "join", "", "") + fs.StringVar(&options.CPUProfile, "cpuprofile", "", "") + fs.StringVar(&options.MemProfile, "memprofile", "", "") + fs.Usage = func() { fmt.Fprintln(cmd.Stderr, usage) } + if err := fs.Parse(args); err != nil { + return Options{}, err + } + return options, nil +} + +// writePIDFile writes the process ID to path. +func (cmd *Command) writePIDFile(path string) error { + // Ignore if path is not set. + if path == "" { + return nil + } + + // Ensure the required directory structure exists. + err := os.MkdirAll(filepath.Dir(path), 0777) + if err != nil { + return fmt.Errorf("mkdir: %s", err) + } + + // Retrieve the PID and write it. + pid := strconv.Itoa(os.Getpid()) + if err := ioutil.WriteFile(path, []byte(pid), 0666); err != nil { + return fmt.Errorf("write file: %s", err) + } + + return nil +} + +// ParseConfig parses the config at path. +// Returns a demo configuration if path is blank. +func (cmd *Command) ParseConfig(path string) (*Config, error) { + // Use demo configuration if no config path is specified. + if path == "" { + fmt.Fprintln(cmd.Stdout, "no configuration provided, using default settings") + return NewTestConfig() + } + + fmt.Fprintf(cmd.Stdout, "using configuration at: %s\n", path) + + config := NewConfig() + if _, err := toml.DecodeFile(path, &config); err != nil { + return nil, err + } + + return config, nil +} + +var usage = `usage: run [flags] + +run starts the broker and data node server. If this is the first time running +the command then a new cluster will be initialized unless the -join argument +is used. + + -config + Set the path to the configuration file. + + -hostname + Override the hostname, the 'hostname' configuration + option will be overridden. + + -join + Joins the server to an existing cluster. + + -pidfile + Write process ID to a file. +` + +// Options represents the command line options that can be parsed. +type Options struct { + ConfigPath string + PIDFile string + Hostname string + Join string + CPUProfile string + MemProfile string +} diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 5f6ae1e85b..a3476d8c13 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -1,6 +1,7 @@ package run import ( + "errors" "fmt" "io" "log" @@ -9,14 +10,18 @@ import ( "os/user" "path/filepath" "strconv" - "strings" "time" "github.com/BurntSushi/toml" "github.com/influxdb/influxdb/admin" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/collectd" "github.com/influxdb/influxdb/graphite" "github.com/influxdb/influxdb/httpd" + "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/meta/continuous_querier" + "github.com/influxdb/influxdb/monitor" + "github.com/influxdb/influxdb/opentsdb" "github.com/influxdb/influxdb/tsdb" ) @@ -43,64 +48,9 @@ const ( // DefaultClusterPort represents the default port the cluster runs ons. DefaultClusterPort = 8086 - // DefaultBrokerEnabled is the default for starting a node as a broker - DefaultBrokerEnabled = true - - // DefaultDataEnabled is the default for starting a node as a data node - DefaultDataEnabled = true - - // DefaultRetentionCreatePeriod represents how often the server will check to see if new - // shard groups need to be created in advance for writing - DefaultRetentionCreatePeriod = 45 * time.Minute - - // DefaultBrokerTruncationInterval is the default period between truncating topics. - DefaultBrokerTruncationInterval = 10 * time.Minute - - // DefaultMaxTopicSize is the default maximum size in bytes a segment can consume on disk of a broker. - DefaultBrokerMaxSegmentSize = 10 * 1024 * 1024 - - // DefaultMaxTopicSize is the default maximum size in bytes a topic can consume on disk of a broker. - DefaultBrokerMaxTopicSize = 5 * DefaultBrokerMaxSegmentSize - - // DefaultRaftApplyInterval is the period between applying commited Raft log entries. - DefaultRaftApplyInterval = 10 * time.Millisecond - - // DefaultRaftElectionTimeout is the default Leader Election timeout. - DefaultRaftElectionTimeout = 1 * time.Second - - // DefaultRaftHeartbeatInterval is the interval between leader heartbeats. - DefaultRaftHeartbeatInterval = 100 * time.Millisecond - - // DefaultRaftReconnectTimeout is the default wait time between reconnections. - DefaultRaftReconnectTimeout = 10 * time.Millisecond - - // DefaultGraphiteDatabaseName is the default Graphite database if none is specified - DefaultGraphiteDatabaseName = "graphite" - // DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified DefaultOpenTSDBDatabaseName = "opentsdb" - // DefaultRetentionAutoCreate is the default for auto-creating retention policies - DefaultRetentionAutoCreate = true - - // DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement - DefaultRetentionCheckEnabled = true - - // DefaultRetentionCheckPeriod is the period of time between retention policy checks are run - DefaultRetentionCheckPeriod = 10 * time.Minute - - // DefaultRecomputePreviousN is ??? - DefaultContinuousQueryRecomputePreviousN = 2 - - // DefaultContinuousQueryRecomputeNoOlderThan is ??? - DefaultContinuousQueryRecomputeNoOlderThan = 10 * time.Minute - - // DefaultContinuousQueryComputeRunsPerInterval is ??? - DefaultContinuousQueryComputeRunsPerInterval = 10 - - // DefaultContinousQueryComputeNoMoreThan is ??? - DefaultContinousQueryComputeNoMoreThan = 2 * time.Minute - // DefaultStatisticsEnabled is the default setting for whether internal statistics are collected DefaultStatisticsEnabled = false @@ -109,9 +59,6 @@ const ( // DefaultStatisticsRetentionPolicy is he default internal statistics rentention policy name DefaultStatisticsRetentionPolicy = "default" - - // DefaultStatisticsWriteInterval is the interval of time between internal stats are written - DefaultStatisticsWriteInterval = 1 * time.Minute ) var DefaultSnapshotURL = url.URL{ @@ -124,87 +71,34 @@ var DefaultSnapshotURL = url.URL{ // Enabled bool `toml:"enabled"` // } -// Initialization contains configuration options for the first time a node boots -type Initialization struct { - // JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After, - // a node is joined to a cluster, these URLS are ignored. These will be overriden at runtime if - // the node is started with the `-join` flag. - JoinURLs string `toml:"join-urls"` -} - // Config represents the configuration format for the influxd binary. type Config struct { - Hostname string `toml:"hostname"` - BindAddress string `toml:"bind-address"` - Port int `toml:"port"` - ReportingDisabled bool `toml:"reporting-disabled"` - Version string `toml:"-"` - InfluxDBVersion string `toml:"-"` + Hostname string `toml:"hostname"` + BindAddress string `toml:"bind-address"` + ReportingEnabled bool `toml:"reporting-enabled"` + // Version string `toml:"-"` + // InfluxDBVersion string `toml:"-"` - Initialization Initialization `toml:"initialization"` + Initialization struct { + // JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After, + // a node is joined to a cluster, these URLS are ignored. These will be overriden at runtime if + // the node is started with the `-join` flag. + JoinURLs string `toml:"join-urls"` + } `toml:"initialization"` - Authentication struct { - Enabled bool `toml:"enabled"` - } `toml:"authentication"` + Meta meta.Config `toml:"meta"` + Data tsdb.Config `toml:"data"` + Cluster cluster.Config `toml:"cluster"` - Admin admin.Config `toml:"admin"` - - HTTPAPI httpd.Config `toml:"api"` - - Graphites []Graphite `toml:"graphite"` - Collectd Collectd `toml:"collectd"` - OpenTSDB OpenTSDB `toml:"opentsdb"` - - UDP struct { - Enabled bool `toml:"enabled"` - BindAddress string `toml:"bind-address"` - Port int `toml:"port"` - } `toml:"udp"` - - Data tsdb.Config `toml:"data"` + Admin admin.Config `toml:"admin"` + HTTPD httpd.Config `toml:"api"` + Graphites []graphite.Config `toml:"graphite"` + Collectd collectd.Config `toml:"collectd"` + OpenTSDB opentsdb.Config `toml:"opentsdb"` // Snapshot SnapshotConfig `toml:"snapshot"` - - Monitoring struct { - Enabled bool `toml:"enabled"` - WriteInterval Duration `toml:"write-interval"` - } `toml:"monitoring"` - - Debugging struct { - PprofEnabled bool `toml:"pprof-enabled"` - } `toml:"debugging"` - - ContinuousQuery struct { - // when continuous queries are run we'll automatically recompute previous intervals - // in case lagged data came in. Set to zero if you never have lagged data. We do - // it this way because invalidating previously computed intervals would be insanely hard - // and expensive. - RecomputePreviousN int `toml:"recompute-previous-n"` - - // The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan - // setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN - // and have this set to 10m, then we'd only compute the previous two intervals for any - // CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window - RecomputeNoOlderThan Duration `toml:"recompute-no-older-than"` - - // ComputeRunsPerInterval will determine how many times the current and previous N intervals - // will be computed. The group by time will be divided by this and it will get computed this many times: - // group by time seconds / runs per interval - // This will give partial results for current group by intervals and will determine how long it will - // be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it - // will be a minute past the previous 10m bucket of time before lagged data is picked up - ComputeRunsPerInterval int `toml:"compute-runs-per-interval"` - - // ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller - // group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting - // to 1m. Then for a group by time(1m) will actually only get computed once per interval (and once per PreviousN). - // If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger - // than 10m will get computed 10 times for each interval. - ComputeNoMoreThan Duration `toml:"compute-no-more-than"` - - // If this flag is set to true, both the brokers and data nodes should ignore any CQ processing. - Disabled bool `toml:"disabled"` - } `toml:"continuous_queries"` + Monitoring monitor.Config `toml:"monitoring"` + ContinuousQuery continuous_querier.Config `toml:"continuous_queries"` } // NewConfig returns an instance of Config with reasonable defaults. @@ -212,35 +106,12 @@ func NewConfig() *Config { c := &Config{} c.Hostname = DefaultHostName c.BindAddress = DefaultBindAddress - c.Port = DefaultClusterPort - c.Data.Enabled = DefaultDataEnabled - c.Broker.Enabled = DefaultBrokerEnabled - - c.Data.RetentionAutoCreate = DefaultRetentionAutoCreate - c.Data.RetentionCheckEnabled = DefaultRetentionCheckEnabled - c.Data.RetentionCheckPeriod = Duration(DefaultRetentionCheckPeriod) - c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod) - - c.Logging.HTTPAccess = true - c.Logging.WriteTracing = false - c.Logging.RaftTracing = false - - c.Monitoring.Enabled = false - c.Monitoring.WriteInterval = Duration(DefaultStatisticsWriteInterval) - c.ContinuousQuery.RecomputePreviousN = DefaultContinuousQueryRecomputePreviousN - c.ContinuousQuery.RecomputeNoOlderThan = Duration(DefaultContinuousQueryRecomputeNoOlderThan) - c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval - c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan) - - c.Broker.TruncationInterval = Duration(DefaultBrokerTruncationInterval) - c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize - c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize - - c.Raft.ApplyInterval = Duration(DefaultRaftApplyInterval) - c.Raft.ElectionTimeout = Duration(DefaultRaftElectionTimeout) - c.Raft.HeartbeatInterval = Duration(DefaultRaftHeartbeatInterval) - c.Raft.ReconnectTimeout = Duration(DefaultRaftReconnectTimeout) + c.Meta = meta.NewConfig() + c.Data = tsdb.NewConfig() + c.HTTPD = httpd.NewConfig() + c.Monitoring = monitor.NewConfig() + c.ContinuousQuery = continuous_querier.NewConfig() return c } @@ -250,73 +121,47 @@ func NewConfig() *Config { func NewTestConfig() (*Config, error) { c := NewConfig() - // By default, store broker and data files in current users home directory + // By default, store meta and data files in current users home directory u, err := user.Current() if err != nil { return nil, fmt.Errorf("failed to determine current user for storage") } - c.Broker.Enabled = true - c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker") - - c.Raft.ApplyInterval = Duration(DefaultRaftApplyInterval) - c.Raft.ElectionTimeout = Duration(DefaultRaftElectionTimeout) - c.Raft.HeartbeatInterval = Duration(DefaultRaftHeartbeatInterval) - c.Raft.ReconnectTimeout = Duration(DefaultRaftReconnectTimeout) - - c.Data.Enabled = true + c.Meta.Dir = filepath.Join(u.HomeDir, ".influxdb/meta") c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data") c.Admin.Enabled = true - c.Admin.Port = 8083 + c.Admin.BindAddress = ":8083" c.Monitoring.Enabled = false - c.Snapshot.Enabled = true + //c.Snapshot.Enabled = true return c, nil } -// APIAddr returns the TCP binding address for the API server. -func (c *Config) APIAddr() string { - // Default to cluster bind address if not overriden - ba := c.BindAddress - if c.HTTPAPI.BindAddress != "" { - ba = c.HTTPAPI.BindAddress +// Normalize sets default values on config. +func (c *Config) Normalize() { + // Normalize Graphite configs. + for i, _ := range c.Graphites { + if c.Graphites[i].BindAddress == "" { + c.Graphites[i].BindAddress = c.BindAddress + } } - // Default to cluster port if not overridden - bp := c.Port - if c.HTTPAPI.Port != 0 { - bp = c.HTTPAPI.Port - } - return net.JoinHostPort(ba, strconv.Itoa(bp)) -} - -// APIAddrUDP returns the UDP address for the series listener. -func (c *Config) APIAddrUDP() string { - return net.JoinHostPort(c.UDP.BindAddress, strconv.Itoa(c.UDP.Port)) -} - -// ClusterAddr returns the binding address for the cluster -func (c *Config) ClusterAddr() string { - return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Port)) -} - -// ClusterURL returns the URL required to contact the server cluster endpoints. -func (c *Config) ClusterURL() url.URL { - return url.URL{ - Scheme: "http", - Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Port)), + // Normalize OpenTSDB config. + if c.OpenTSDB.BindAddress == "" { + c.OpenTSDB.BindAddress = c.BindAddress } } -// BrokerDir returns the data directory to start up in and does home directory expansion if necessary. -func (c *Config) BrokerDir() string { - p, e := filepath.Abs(c.Broker.Dir) - if e != nil { - log.Fatalf("Unable to get absolute path for Broker Directory: %q", c.Broker.Dir) +// Validate returns an error if the config is invalid. +func (c *Config) Validate() error { + if c.Meta.Dir == "" { + return errors.New("Meta.Dir must be specified") + } else if c.Data.Dir == "" { + return errors.New("Data.Dir must be specified") } - return p + return nil } // DataDir returns the data directory to start up in and does home directory expansion if necessary. @@ -328,154 +173,7 @@ func (c *Config) DataDir() string { return p } -// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups. -// If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod -func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration { - if c.Data.RetentionCreatePeriod != 0 { - return time.Duration(c.Data.RetentionCreatePeriod) - } - return DefaultRetentionCreatePeriod -} - // WriteConfigFile writes the config to the specified writer func (c *Config) Write(w io.Writer) error { return toml.NewEncoder(w).Encode(c) } - -// Size represents a TOML parseable file size. -// Users can specify size using "m" for megabytes and "g" for gigabytes. -type Size int - -// UnmarshalText parses a byte size from text. -func (s *Size) UnmarshalText(text []byte) error { - // Parse numeric portion of value. - length := len(string(text)) - size, err := strconv.ParseInt(string(text[:length-1]), 10, 64) - if err != nil { - return err - } - - // Parse unit of measure ("m", "g", etc). - switch suffix := text[len(text)-1]; suffix { - case 'm': - size *= 1 << 20 // MB - case 'g': - size *= 1 << 30 // GB - default: - return fmt.Errorf("unknown size suffix: %c", suffix) - } - - // Check for overflow. - if size > maxInt { - return fmt.Errorf("size %d cannot be represented by an int", size) - } - - *s = Size(size) - return nil -} - -// ParseConfigFile parses a configuration file at a given path. -func ParseConfigFile(path string) (*Config, error) { - c := NewConfig() - - if _, err := toml.DecodeFile(path, &c); err != nil { - return nil, err - } - return c, nil -} - -// ParseConfig parses a configuration string into a config object. -func ParseConfig(s string) (*Config, error) { - c := NewConfig() - - if _, err := toml.Decode(s, &c); err != nil { - return nil, err - } - return c, nil -} - -type Collectd struct { - BindAddress string `toml:"bind-address"` - Port uint16 `toml:"port"` - - Database string `toml:"database"` - Enabled bool `toml:"enabled"` - TypesDB string `toml:"typesdb"` -} - -// ConnnectionString returns the connection string for this collectd config in the form host:port. -func (c *Collectd) ConnectionString(defaultBindAddr string) string { - addr := c.BindAddress - // If no address specified, use default. - if addr == "" { - addr = defaultBindAddr - } - - port := c.Port - // If no port specified, use default. - if port == 0 { - port = collectd.DefaultPort - } - - return fmt.Sprintf("%s:%d", addr, port) -} - -type Graphite struct { - BindAddress string `toml:"bind-address"` - Port int `toml:"port"` - - Database string `toml:"database"` - Enabled bool `toml:"enabled"` - Protocol string `toml:"protocol"` - NamePosition string `toml:"name-position"` - NameSeparator string `toml:"name-separator"` -} - -// ConnnectionString returns the connection string for this Graphite config in the form host:port. -func (g *Graphite) ConnectionString() string { - return net.JoinHostPort(g.BindAddress, strconv.Itoa(g.Port)) -} - -// NameSeparatorString returns the character separating fields for Graphite data, or the default -// if no separator is set. -func (g *Graphite) NameSeparatorString() string { - if g.NameSeparator == "" { - return graphite.DefaultGraphiteNameSeparator - } - return g.NameSeparator -} - -func (g *Graphite) DatabaseString() string { - if g.Database == "" { - return DefaultGraphiteDatabaseName - } - return g.Database -} - -// LastEnabled returns whether the Graphite Server shoudl intepret the last field as "name". -func (g *Graphite) LastEnabled() bool { - return g.NamePosition == strings.ToLower("last") -} - -// maxInt is the largest integer representable by a word (architeture dependent). -const maxInt = int64(^uint(0) >> 1) - -type OpenTSDB struct { - Addr string `toml:"address"` - Port int `toml:"port"` - - Enabled bool `toml:"enabled"` - Database string `toml:"database"` - RetentionPolicy string `toml:"retention-policy"` -} - -func (o OpenTSDB) DatabaseString() string { - if o.Database == "" { - return DefaultOpenTSDBDatabaseName - } - return o.Database -} - -func (o OpenTSDB) ListenAddress() string { - return net.JoinHostPort(o.Addr, strconv.Itoa(o.Port)) -} diff --git a/cmd/influxd/run/config_command.go b/cmd/influxd/run/config_command.go new file mode 100644 index 0000000000..2611f116e2 --- /dev/null +++ b/cmd/influxd/run/config_command.go @@ -0,0 +1,70 @@ +package run + +import ( + "flag" + "fmt" + "io" + "os" + + "github.com/BurntSushi/toml" +) + +// PrintConfigCommand represents the command executed by "influxd config". +type PrintConfigCommand struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +// NewPrintConfigCommand return a new instance of PrintConfigCommand. +func NewPrintConfigCommand() *PrintConfigCommand { + return &PrintConfigCommand{ + Stdin: os.Stdin, + Stdout: os.Stdout, + Stderr: os.Stderr, + } +} + +// Run parses and prints the current config loaded. +func (cmd *PrintConfigCommand) Run(args ...string) error { + // Parse command flags. + fs := flag.NewFlagSet("", flag.ContinueOnError) + configPath := fs.String("config", "", "") + hostname := fs.String("hostname", "", "") + fs.Usage = func() { fmt.Fprintln(cmd.Stderr, printConfigUsage) } + if err := fs.Parse(args); err != nil { + return err + } + + // Parse config from path. + config, err := cmd.parseConfig(*configPath) + if err != nil { + return fmt.Errorf("parse config: %s", err) + } + + // Override config properties. + if *hostname != "" { + config.Hostname = *hostname + } + + config.Write(cmd.Stdout) + return nil +} + +// ParseConfig parses the config at path. +// Returns a demo configuration if path is blank. +func (cmd *PrintConfigCommand) parseConfig(path string) (*Config, error) { + if path == "" { + return NewTestConfig() + } + config := NewConfig() + if _, err := toml.DecodeFile(path, &config); err != nil { + return nil, err + } + return config, nil +} + +var printConfigUsage = `usage: config + + config displays the default configuration +` diff --git a/cmd/influxd/run/config_test.go b/cmd/influxd/run/config_test.go index 5fe9e56ea1..1c06e32a0f 100644 --- a/cmd/influxd/run/config_test.go +++ b/cmd/influxd/run/config_test.go @@ -1,346 +1,89 @@ package run_test import ( - "bytes" - "reflect" - "strings" "testing" - "time" "github.com/BurntSushi/toml" - "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/cmd/influxd/run" ) -// Testing configuration file. -const testFile = ` -# Welcome to the InfluxDB configuration file. +// Ensure the configuration can be parsed. +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c run.Config + if _, err := toml.Decode(` +hostname = "localhost" +bind-address = ":8086" +reporting-enabled = true -# If hostname (on the OS) doesn't return a name that can be resolved by the other -# systems in the cluster, you'll have to set the hostname to an IP or something -# that can be resolved here. -hostname = "myserver.com" -port = 8086 +[initialization] +join-urls = "serverA,serverB" +[meta] +dir = "/tmp/meta" -# Control authentication -[authentication] -enabled = true +[data] +dir = "/tmp/data" -[logging] -write-tracing = true -raft-tracing = true +[cluster] + +[admin] +bind-address = ":8083" + +[api] +bind-address = ":8087" + +[[graphite]] +protocol = "udp" + +[[graphite]] +protocol = "tcp" + +[collectd] +bind-address = ":1000" + +[opentsdb] +bind-address = ":2000" [monitoring] enabled = true -write-interval = "1m" - -# Configure the admin server -[admin] -enabled = true -port = 8083 - -# Controls certain parameters that only take effect until an initial successful -# start-up has occurred. -[initialization] -join-urls = "http://127.0.0.1:8086" - -# Configure the http api -[api] -bind-address = "10.1.2.3" -ssl-port = 8087 # Ssl support is enabled if you set a port and cert -ssl-cert = "../cert.pem" - -# connections will timeout after this amount of time. Ensures that clients that misbehave -# and keep alive connections they don't use won't end up connection a million times. -# However, if a request is taking longer than this to complete, could be a problem. -read-timeout = "5s" - -[input_plugins] - - [input_plugins.udp] - enabled = true - port = 4444 - database = "test" - -# Configure the Graphite servers -[[graphite]] -protocol = "TCP" -enabled = true -bind-address = "192.168.0.1" -port = 2003 -database = "graphite_tcp" # store graphite data in this database -name-position = "last" -name-separator = "-" - -[[graphite]] -protocol = "udP" -enabled = true -bind-address = "192.168.0.2" -port = 2005 - -# Configure collectd server -[collectd] -enabled = true -bind-address = "192.168.0.3" -port = 25827 -database = "collectd_database" -typesdb = "foo-db-type" - -# Configure OpenTSDB server -[opentsdb] -enabled = true -address = "192.168.0.3" -port = 4242 -database = "opentsdb_database" -retention-policy = "raw" - -# Broker configuration -[broker] -# The broker port should be open between all servers in a cluster. -# However, this port shouldn't be accessible from the internet. -enabled = false - -# Where the broker logs are stored. The user running InfluxDB will need read/write access. -dir = "/tmp/influxdb/development/broker" - -# Raft distributed consensus -[raft] -apply-interval = "10ms" -election-timeout = "1s" - -[data] -dir = "/tmp/influxdb/development/db" -retention-auto-create = false -retention-check-enabled = true -retention-check-period = "5m" -enabled = false [continuous_queries] -disabled = true - -[snapshot] enabled = true -` +`, &c); err != nil { + t.Fatal(err) + } -// Ensure that megabyte sizes can be parsed. -func TestSize_UnmarshalText_MB(t *testing.T) { - var s influxdb.Size - if err := s.UnmarshalText([]byte("200m")); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if s != 200*(1<<20) { - t.Fatalf("unexpected size: %d", s) - } -} - -// Ensure that gigabyte sizes can be parsed. -func TestSize_UnmarshalText_GB(t *testing.T) { - if typ := reflect.TypeOf(0); typ.Size() != 8 { - t.Skip("large gigabyte parsing on 64-bit arch only") - } - - var s influxdb.Size - if err := s.UnmarshalText([]byte("10g")); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if s != 10*(1<<30) { - t.Fatalf("unexpected size: %d", s) - } -} - -// Ensure that a TOML configuration file can be parsed into a Config. -func TestParseConfig(t *testing.T) { - c, err := influxdb.ParseConfig(testFile) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } else if c.Hostname != "myserver.com" { - t.Fatalf("hostname mismatch: %v", c.Hostname) - } - - if exp := 8086; c.Port != exp { - t.Fatalf("port mismatch. got %v, exp %v", c.Port, exp) - } - - if c.Initialization.JoinURLs != "http://127.0.0.1:8086" { - t.Fatalf("JoinURLs mistmatch: %v", c.Initialization.JoinURLs) - } - - if !c.Authentication.Enabled { - t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled) - } - - if exp := "10.1.2.3"; c.HTTPAPI.BindAddress != exp { - t.Fatalf("http api bind-address mismatch: got %v, exp %v", c.HTTPAPI.BindAddress, exp) - } - - if c.UDP.Enabled { - t.Fatalf("udp enabled mismatch: %v", c.UDP.Enabled) - } - - if c.Admin.Enabled != true { - t.Fatalf("admin enabled mismatch: %v", c.Admin.Enabled) - } - - if c.Admin.Port != 8083 { - t.Fatalf("admin port mismatch: %v", c.Admin.Port) - } - - if c.ContinuousQuery.Disabled != true { - t.Fatalf("continuous query disable mismatch: %v", c.ContinuousQuery.Disabled) - } - - if len(c.Graphites) != 2 { - t.Fatalf("graphites mismatch. expected %v, got: %v", 2, len(c.Graphites)) - } - - tcpGraphite := c.Graphites[0] - switch { - case tcpGraphite.Enabled != true: - t.Fatalf("graphite tcp enabled mismatch: expected: %v, got %v", true, tcpGraphite.Enabled) - case tcpGraphite.BindAddress != "192.168.0.1": - t.Fatalf("graphite tcp address mismatch: expected %v, got %v", "192.168.0.1", tcpGraphite.BindAddress) - case tcpGraphite.Port != 2003: - t.Fatalf("graphite tcp port mismatch: expected %v, got %v", 2003, tcpGraphite.Port) - case tcpGraphite.Database != "graphite_tcp": - t.Fatalf("graphite tcp database mismatch: expected %v, got %v", "graphite_tcp", tcpGraphite.Database) - case strings.ToLower(tcpGraphite.Protocol) != "tcp": - t.Fatalf("graphite tcp protocol mismatch: expected %v, got %v", "tcp", strings.ToLower(tcpGraphite.Protocol)) - case tcpGraphite.LastEnabled() != true: - t.Fatalf("graphite tcp name-position mismatch: expected %v, got %v", "last", tcpGraphite.NamePosition) - case tcpGraphite.NameSeparatorString() != "-": - t.Fatalf("graphite tcp name-separator mismatch: expected %v, got %v", "-", tcpGraphite.NameSeparatorString()) - } - - udpGraphite := c.Graphites[1] - switch { - case udpGraphite.Enabled != true: - t.Fatalf("graphite udp enabled mismatch: expected: %v, got %v", true, udpGraphite.Enabled) - case udpGraphite.BindAddress != "192.168.0.2": - t.Fatalf("graphite udp address mismatch: expected %v, got %v", "192.168.0.2", udpGraphite.BindAddress) - case udpGraphite.Port != 2005: - t.Fatalf("graphite udp port mismatch: expected %v, got %v", 2005, udpGraphite.Port) - case udpGraphite.DatabaseString() != "graphite": - t.Fatalf("graphite database mismatch: expected %v, got %v", "graphite", udpGraphite.Database) - case strings.ToLower(udpGraphite.Protocol) != "udp": - t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol)) - } - - switch { - case c.Collectd.Enabled != true: - t.Errorf("collectd enabled mismatch: expected: %v, got %v", true, c.Collectd.Enabled) - case c.Collectd.BindAddress != "192.168.0.3": - t.Errorf("collectd address mismatch: expected %v, got %v", "192.168.0.3", c.Collectd.BindAddress) - case c.Collectd.Port != 25827: - t.Errorf("collectd port mismatch: expected %v, got %v", 2005, c.Collectd.Port) - case c.Collectd.Database != "collectd_database": - t.Errorf("collectdabase mismatch: expected %v, got %v", "collectd_database", c.Collectd.Database) - case c.Collectd.TypesDB != "foo-db-type": - t.Errorf("collectd typesdb mismatch: expected %v, got %v", "foo-db-type", c.Collectd.TypesDB) - } - - switch { - case c.OpenTSDB.Enabled != true: - t.Errorf("opentsdb enabled mismatch: expected: %v, got %v", true, c.OpenTSDB.Enabled) - case c.OpenTSDB.ListenAddress() != "192.168.0.3:4242": - t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress()) - case c.OpenTSDB.DatabaseString() != "opentsdb_database": - t.Errorf("opentsdb database mismatch: expected %v, got %v", "opentsdb_database", c.OpenTSDB.DatabaseString()) - case c.OpenTSDB.RetentionPolicy != "raw": - t.Errorf("collectd retention-policy mismatch: expected %v, got %v", "foo-db-type", c.OpenTSDB.RetentionPolicy) - } - - if c.Broker.Dir != "/tmp/influxdb/development/broker" { - t.Fatalf("broker dir mismatch: %v", c.Broker.Dir) - } - - if c.Broker.Enabled != false { - t.Fatalf("broker disabled mismatch: %v, got: %v", false, c.Broker.Enabled) - } - - if c.Raft.ApplyInterval != influxdb.Duration(10*time.Millisecond) { - t.Fatalf("Raft apply interval mismatch: %v, got %v", 10*time.Millisecond, c.Raft.ApplyInterval) - } - - if c.Data.Dir != "/tmp/influxdb/development/db" { - t.Fatalf("data dir mismatch: %v", c.Data.Dir) - } - if c.Data.RetentionCheckEnabled != true { - t.Fatalf("Retention check enabled mismatch: %v", c.Data.RetentionCheckEnabled) - } - if c.Data.RetentionCheckPeriod != influxdb.Duration(5*time.Minute) { - t.Fatalf("Retention check period mismatch: %v", c.Data.RetentionCheckPeriod) - } - - if c.Data.Enabled != false { - t.Fatalf("data disabled mismatch: %v, got: %v", false, c.Data.Enabled) - } - - if c.Monitoring.WriteInterval.String() != "1m0s" { - t.Fatalf("Monitoring.WriteInterval mismatch: %v", c.Monitoring.WriteInterval) - } - - if !c.Snapshot.Enabled { - t.Fatalf("snapshot enabled mismatch: %v, got %v", true, c.Snapshot.Enabled) - } - - // TODO: UDP Servers testing. - /* - c.Assert(config.UdpServers, HasLen, 1) - c.Assert(config.UdpServers[0].Enabled, Equals, true) - c.Assert(config.UdpServers[0].Port, Equals, 4444) - c.Assert(config.UdpServers[0].Database, Equals, "test") - */ -} - -func TestEncodeConfig(t *testing.T) { - c := influxdb.Config{} - c.Monitoring.WriteInterval = influxdb.Duration(time.Minute) - buf := new(bytes.Buffer) - if err := toml.NewEncoder(buf).Encode(&c); err != nil { - t.Fatal("Failed to encode: ", err) - } - got, search := buf.String(), `write-interval = "1m0s"` - if !strings.Contains(got, search) { - t.Fatalf("Encoding config failed.\nfailed to find %s in:\n%s\n", search, got) - } -} - -func TestCollectd_ConnectionString(t *testing.T) { - var tests = []struct { - name string - defaultBindAddr string - connectionString string - config influxdb.Collectd - }{ - { - name: "No address or port provided from config", - defaultBindAddr: "192.168.0.1", - connectionString: "192.168.0.1:25826", - config: influxdb.Collectd{}, - }, - { - name: "address provided, no port provided from config", - defaultBindAddr: "192.168.0.1", - connectionString: "192.168.0.2:25826", - config: influxdb.Collectd{BindAddress: "192.168.0.2"}, - }, - { - name: "no address provided, port provided from config", - defaultBindAddr: "192.168.0.1", - connectionString: "192.168.0.1:25827", - config: influxdb.Collectd{Port: 25827}, - }, - { - name: "both address and port provided from config", - defaultBindAddr: "192.168.0.1", - connectionString: "192.168.0.2:25827", - config: influxdb.Collectd{BindAddress: "192.168.0.2", Port: 25827}, - }, - } - - for _, test := range tests { - t.Logf("test: %q", test.name) - s := test.config.ConnectionString(test.defaultBindAddr) - if s != test.connectionString { - t.Errorf("connection string mismatch, expected: %q, got: %q", test.connectionString, s) - } + // Validate configuration. + if c.Hostname != "localhost" { + t.Fatalf("unexpected hostname: %v", c.Hostname) + } else if c.BindAddress != ":8086" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } else if c.ReportingEnabled != true { + t.Fatalf("unexpected reporting enabled: %v", c.ReportingEnabled) + } else if c.Initialization.JoinURLs != "serverA,serverB" { + t.Fatalf("unexpected join urls: %s", c.Initialization.JoinURLs) + } else if c.Meta.Dir != "/tmp/meta" { + t.Fatalf("unexpected meta dir: %s", c.Meta.Dir) + } else if c.Data.Dir != "/tmp/data" { + t.Fatalf("unexpected data dir: %s", c.Data.Dir) + } else if c.Admin.BindAddress != ":8083" { + t.Fatalf("unexpected admin bind address: %s", c.Admin.BindAddress) + } else if c.HTTPD.BindAddress != ":8087" { + t.Fatalf("unexpected api bind address: %s", c.HTTPD.BindAddress) + } else if len(c.Graphites) != 2 { + t.Fatalf("unexpected graphites count: %d", len(c.Graphites)) + } else if c.Graphites[0].Protocol != "udp" { + t.Fatalf("unexpected graphite protocol(0): %s", c.Graphites[0].Protocol) + } else if c.Graphites[1].Protocol != "tcp" { + t.Fatalf("unexpected graphite protocol(1): %s", c.Graphites[1].Protocol) + } else if c.Collectd.BindAddress != ":1000" { + t.Fatalf("unexpected collectd bind address: %s", c.Collectd.BindAddress) + } else if c.OpenTSDB.BindAddress != ":2000" { + t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress) + } else if c.Monitoring.Enabled != true { + t.Fatalf("unexpected monitoring enabled: %v", c.Monitoring.Enabled) + } else if c.ContinuousQuery.Enabled != true { + t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled) } } diff --git a/cmd/influxd/run/run.go b/cmd/influxd/run/run.go deleted file mode 100644 index d415f54d34..0000000000 --- a/cmd/influxd/run/run.go +++ /dev/null @@ -1,44 +0,0 @@ -package run - -import ( - "log" - "os" - - "github.com/influxdb/influxdb" -) - -type RunCommand struct { - // The logger passed to the ticker during execution. - logWriter *os.File - config *influxdb.Config - hostname string - node *Node -} - -func NewRunCommand() *RunCommand { - return &RunCommand{ - node: &Node{}, - } -} - -func printRunUsage() { - log.Printf(`usage: run [flags] - -run starts the broker and data node server. If this is the first time running -the command then a new cluster will be initialized unless the -join argument -is used. - - -config - Set the path to the configuration file. - - -hostname - Override the hostname, the 'hostname' configuration - option will be overridden. - - -join - Joins the server to an existing cluster. - - -pidfile - Write process ID to a file. -`) -} diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index fffd67f501..9a51079073 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -1,27 +1,12 @@ package run import ( - "flag" "fmt" - "io/ioutil" - "log" "net" - "net/http" - "net/url" - "os" - "path/filepath" - "runtime" - "strconv" - "strings" - "time" - "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/admin" "github.com/influxdb/influxdb/cluster" - "github.com/influxdb/influxdb/collectd" - "github.com/influxdb/influxdb/graphite" "github.com/influxdb/influxdb/meta" - "github.com/influxdb/influxdb/opentsdb" "github.com/influxdb/influxdb/tsdb" ) @@ -33,11 +18,11 @@ type Server struct { } // NewServer returns a new instance of Server built from a config. -func NewServer(c *influxdb.Config) (*Server, error) { +func NewServer(c *Config, joinURLs string) *Server { // Construct base meta store and data store. s := &Server{ - MetaStore: meta.NewStore(filepath.Join(path, "meta")), - TSDBStore: tsdb.NewStore(filepath.Join(path, "data")), + MetaStore: meta.NewStore(c.Meta.Dir), + TSDBStore: tsdb.NewStore(c.Data.Dir), } // Add cluster Service @@ -100,6 +85,14 @@ func (s *Server) Close() error { return nil } +// Service represents a service attached to the server. +type Service interface { + Open() error + Close() error + Addr() net.Addr +} + +/* type Node struct { Server *influxdb.Server @@ -232,119 +225,7 @@ func (s *Node) closeClusterListener() error { return err } -func (cmd *RunCommand) ParseConfig(path string) error { - // Parse configuration file from disk. - if path != "" { - var err error - cmd.config, err = ParseConfigFile(path) - if err != nil { - return fmt.Errorf("error parsing configuration %s - %s\n", path, err) - } - log.Printf("using configuration at: %s\n", path) - } else { - var err error - cmd.config, err = NewTestConfig() - if err != nil { - return fmt.Errorf("error parsing default config: %s\n", err) - } - - log.Println("no configuration provided, using default settings") - } - - // Override config properties. - if cmd.hostname != "" { - cmd.config.Hostname = cmd.hostname - } - cmd.node.hostname = cmd.config.Hostname - return nil -} - -func (cmd *RunCommand) Run(args ...string) error { - // Parse command flags. - fs := flag.NewFlagSet("", flag.ExitOnError) - var configPath, pidfile, hostname, join, cpuprofile, memprofile string - - fs.StringVar(&configPath, "config", "", "") - fs.StringVar(&pidfile, "pidfile", "", "") - fs.StringVar(&hostname, "hostname", "", "") - fs.StringVar(&join, "join", "", "") - fs.StringVar(&cpuprofile, "cpuprofile", "", "") - fs.StringVar(&memprofile, "memprofile", "", "") - - fs.Usage = printRunUsage - fs.Parse(args) - cmd.hostname = hostname - - // Start profiling, if set. - startProfiling(cpuprofile, memprofile) - defer stopProfiling() - - // Print sweet InfluxDB logo and write the process id to file. - fmt.Print(logo) - writePIDFile(pidfile) - - // Set parallelism. - runtime.GOMAXPROCS(runtime.NumCPU()) - log.Printf("GOMAXPROCS set to %d", runtime.GOMAXPROCS(0)) - - // Parse config - if err := cmd.ParseConfig(configPath); err != nil { - log.Fatal(err) - } - - // Use the config JoinURLs by default - joinURLs := cmd.config.Initialization.JoinURLs - - // If a -join flag was passed, these should override the config - if join != "" { - joinURLs = join - } - cmd.CheckConfig() - cmd.Open(cmd.config, joinURLs) - - // Wait indefinitely. - <-(chan struct{})(nil) - return nil -} - -// CheckConfig validates the configuration -func (cmd *RunCommand) CheckConfig() { - // Set any defaults that aren't set - // TODO: bring more defaults in here instead of letting helpers do it - - // Normalize Graphite configs - for i, _ := range cmd.config.Graphites { - if cmd.config.Graphites[i].BindAddress == "" { - cmd.config.Graphites[i].BindAddress = cmd.config.BindAddress - } - if cmd.config.Graphites[i].Port == 0 { - cmd.config.Graphites[i].Port = graphite.DefaultGraphitePort - } - } - - // Normalize openTSDB config - if cmd.config.OpenTSDB.Addr == "" { - cmd.config.OpenTSDB.Addr = cmd.config.BindAddress - } - - if cmd.config.OpenTSDB.Port == 0 { - cmd.config.OpenTSDB.Port = opentsdb.DefaultPort - } - - // Validate that we have a sane config - if !(cmd.config.Data.Enabled || cmd.config.Broker.Enabled) { - log.Fatal("Node must be configured as a broker node, data node, or as both. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.") - } - - if cmd.config.Broker.Enabled && cmd.config.Broker.Dir == "" { - log.Fatal("Broker.Dir must be specified. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.") - } - - if cmd.config.Data.Enabled && cmd.config.Data.Dir == "" { - log.Fatal("Data.Dir must be specified. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.") - } -} func (cmd *RunCommand) Open(config *Config, join string) *Node { if config != nil { @@ -496,7 +377,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node { } // unless disabled, start the loop to report anonymous usage stats every 24h - if !cmd.config.ReportingDisabled { + if cmd.config.ReportingEnabled { if cmd.config.Broker.Enabled && cmd.config.Data.Enabled { // Make sure we have a config object b4 we try to use it. if clusterID := cmd.node.Broker.Broker.ClusterID(); clusterID != 0 { @@ -531,24 +412,6 @@ func (cmd *RunCommand) Close() { cmd.node.Close() } -// write the current process id to a file specified by path. -func writePIDFile(path string) { - if path == "" { - return - } - - // Ensure the required directory structure exists. - err := os.MkdirAll(filepath.Dir(path), 0755) - if err != nil { - log.Fatal(err) - } - - // Retrieve the PID and write it. - pid := strconv.Itoa(os.Getpid()) - if err := ioutil.WriteFile(path, []byte(pid), 0644); err != nil { - log.Fatal(err) - } -} // creates and initializes a broker. func (cmd *RunCommand) openBroker(brokerURLs []url.URL, h *Handler) { @@ -734,3 +597,4 @@ func fileExists(path string) bool { } return true } +*/ diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 4ab7074859..779e26b7bf 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -1,5 +1,6 @@ package run_test +/* import ( "bytes" "encoding/json" @@ -113,7 +114,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes int c.Port = basePort c.Admin.Port = 0 c.Admin.Enabled = false - c.ReportingDisabled = true + c.ReportingEnabled = false c.Snapshot.Enabled = false c.Logging.HTTPAccess = false @@ -1229,7 +1230,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{"series":[{"name":"cpu","columns":["_id","host","region"],"values":[[2,"server01","uswest"]]}]}]}`, }, { - query: "SHOW SERIES WHERE region =~ /ca.*/", + query: "SHOW SERIES WHERE region =~ /ca.*./", queryDb: "%DB%", expected: `{"results":[{"series":[{"name":"gpu","columns":["_id","host","region"],"values":[[6,"server03","caeast"]]}]}]}`, }, @@ -1260,12 +1261,12 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"],["gpu"]]}]}]}`, }, { - query: "SHOW MEASUREMENTS WHERE region =~ /ca.*/", + query: "SHOW MEASUREMENTS WHERE region =~ /ca.*./", queryDb: "%DB%", expected: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["gpu"],["other"]]}]}]}`, }, { - query: "SHOW MEASUREMENTS WHERE region !~ /ca.*/", + query: "SHOW MEASUREMENTS WHERE region !~ /ca.*./", queryDb: "%DB%", expected: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]}`, }, @@ -1320,7 +1321,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"]]}]}]}`, }, { - query: `SHOW TAG VALUES WITH KEY = host WHERE region =~ /ca.*/`, + query: `SHOW TAG VALUES WITH KEY = host WHERE region =~ /ca.*./`, queryDb: "%DB%", expected: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server03"]]}]}]}`, }, @@ -2341,14 +2342,14 @@ func TestSeparateBrokerDataNode(t *testing.T) { brokerConfig.Admin.Enabled = false brokerConfig.Data.Enabled = false brokerConfig.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(brokerConfig.Port)) - brokerConfig.ReportingDisabled = true + brokerConfig.ReportingEnabled = false dataConfig := main.NewConfig() dataConfig.Port = 0 dataConfig.Admin.Enabled = false dataConfig.Broker.Enabled = false dataConfig.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig.Port)) - dataConfig.ReportingDisabled = true + dataConfig.ReportingEnabled = false brokerCmd := main.NewRunCommand() broker := brokerCmd.Open(brokerConfig, "") @@ -2394,7 +2395,7 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) { brokerConfig.Admin.Enabled = false brokerConfig.Data.Enabled = false brokerConfig.Broker.Dir = filepath.Join(tmpBrokerDir, "1") - brokerConfig.ReportingDisabled = true + brokerConfig.ReportingEnabled = false brokerCmd := main.NewRunCommand() broker := brokerCmd.Open(brokerConfig, "") @@ -2413,7 +2414,7 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) { dataConfig1.Admin.Enabled = false dataConfig1.Broker.Enabled = false dataConfig1.Data.Dir = filepath.Join(tmpDataDir, "1") - dataConfig1.ReportingDisabled = true + dataConfig1.ReportingEnabled = false dataCmd1 := main.NewRunCommand() @@ -2430,7 +2431,7 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) { dataConfig2.Admin.Enabled = false dataConfig2.Broker.Enabled = false dataConfig2.Data.Dir = filepath.Join(tmpDataDir, "2") - dataConfig2.ReportingDisabled = true + dataConfig2.ReportingEnabled = false dataCmd2 := main.NewRunCommand() @@ -2462,3 +2463,4 @@ func mustMarshalJSON(v interface{}) string { func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } +*/ diff --git a/collectd/config.go b/collectd/config.go new file mode 100644 index 0000000000..dfa30ba5b7 --- /dev/null +++ b/collectd/config.go @@ -0,0 +1,8 @@ +package collectd + +type Config struct { + Enabled bool `toml:"enabled"` + BindAddress string `toml:"bind-address"` + Database string `toml:"database"` + TypesDB string `toml:"typesdb"` +} diff --git a/collectd/config_test.go b/collectd/config_test.go new file mode 100644 index 0000000000..4f4d6f5766 --- /dev/null +++ b/collectd/config_test.go @@ -0,0 +1,32 @@ +package collectd_test + +import ( + "testing" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/collectd" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c collectd.Config + if _, err := toml.Decode(` +enabled = true +bind-address = ":9000" +database = "xxx" +typesdb = "yyy" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.Enabled != true { + t.Fatalf("unexpected enabled: %v", c.Enabled) + } else if c.BindAddress != ":9000" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } else if c.Database != "xxx" { + t.Fatalf("unexpected database: %s", c.Database) + } else if c.TypesDB != "yyy" { + t.Fatalf("unexpected types db: %s", c.TypesDB) + } +} diff --git a/graphite/config.go b/graphite/config.go new file mode 100644 index 0000000000..dede73562e --- /dev/null +++ b/graphite/config.go @@ -0,0 +1,34 @@ +package graphite + +import "strings" + +const ( + // DefaultDatabase is the default database if none is specified. + DefaultDatabase = "graphite" + + // DefaultNameSeparator represents the default field separator. + DefaultNameSeparator = "." +) + +// Config represents the configuration for Graphite endpoints. +type Config struct { + BindAddress string `toml:"bind-address"` + Database string `toml:"database"` + Enabled bool `toml:"enabled"` + Protocol string `toml:"protocol"` + NamePosition string `toml:"name-position"` + NameSeparator string `toml:"name-separator"` +} + +// NewConfig returns a new Config with defaults. +func NewConfig() Config { + return Config{ + Database: DefaultDatabase, + NameSeparator: DefaultNameSeparator, + } +} + +// LastEnabled returns whether the server should interpret the last field as "name". +func (c *Config) LastEnabled() bool { + return c.NamePosition == strings.ToLower("last") +} diff --git a/graphite/config_test.go b/graphite/config_test.go new file mode 100644 index 0000000000..63f1cb8d87 --- /dev/null +++ b/graphite/config_test.go @@ -0,0 +1,35 @@ +package graphite_test + +import ( + "testing" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/httpd" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c httpd.Config + if _, err := toml.Decode(` +bind-address = ":8080" +auth-enabled = true +log-enabled = true +write-tracing = true +pprof-enabled = true +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.BindAddress != ":8080" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } else if c.AuthEnabled != true { + t.Fatalf("unexpected auth enabled: %v", c.AuthEnabled) + } else if c.LogEnabled != true { + t.Fatalf("unexpected log enabled: %v", c.LogEnabled) + } else if c.WriteTracing != true { + t.Fatalf("unexpected write tracing: %v", c.WriteTracing) + } else if c.PprofEnabled != true { + t.Fatalf("unexpected pprof enabled: %v", c.PprofEnabled) + } +} diff --git a/graphite/graphite.go b/graphite/graphite.go index 996e6301ae..694b4f0912 100644 --- a/graphite/graphite.go +++ b/graphite/graphite.go @@ -14,9 +14,6 @@ import ( const ( // DefaultGraphitePort represents the default Graphite (Carbon) plaintext port. DefaultGraphitePort = 2003 - - // DefaultGraphiteNameSeparator represents the default Graphite field separator. - DefaultGraphiteNameSeparator = "." ) var ( @@ -63,7 +60,7 @@ type Parser struct { // NewParser returns a GraphiteParser instance. func NewParser() *Parser { - return &Parser{Separator: DefaultGraphiteNameSeparator} + return &Parser{Separator: DefaultNameSeparator} } // Parse performs Graphite parsing of a single line. diff --git a/httpd/config.go b/httpd/config.go new file mode 100644 index 0000000000..fa2c961d73 --- /dev/null +++ b/httpd/config.go @@ -0,0 +1,16 @@ +package httpd + +type Config struct { + BindAddress string `toml:"bind-address"` + AuthEnabled bool `toml:"auth-enabled"` + LogEnabled bool `toml:"log-enabled"` + WriteTracing bool `toml:"write-tracing"` + PprofEnabled bool `toml:"pprof-enabled"` +} + +func NewConfig() Config { + return Config{ + LogEnabled: true, + WriteTracing: false, + } +} diff --git a/httpd/config_test.go b/httpd/config_test.go new file mode 100644 index 0000000000..a856f98128 --- /dev/null +++ b/httpd/config_test.go @@ -0,0 +1,35 @@ +package httpd_test + +import ( + "testing" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/httpd" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c httpd.Config + if _, err := toml.Decode(` +bind-address = ":8080" +auth-enabled = true +log-enabled = true +write-tracing = true +pprof-enabled = true +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.BindAddress != ":8080" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } else if c.AuthEnabled != true { + t.Fatalf("unexpected auth enabled: %v", c.AuthEnabled) + } else if c.LogEnabled != true { + t.Fatalf("unexpected log enabled: %v", c.LogEnabled) + } else if c.WriteTracing != true { + t.Fatalf("unexpected write tracing: %v", c.WriteTracing) + } else if c.PprofEnabled != true { + t.Fatalf("unexpected pprof enabled: %v", c.PprofEnabled) + } +} diff --git a/httpd/service.go b/httpd/service.go index 5b4d23d9be..8d8e26b03c 100644 --- a/httpd/service.go +++ b/httpd/service.go @@ -7,8 +7,8 @@ import ( "strings" ) -// HTTPService manages the listener and handler for an HTTP endpoint. -type HTTPService struct { +// Service manages the listener and handler for an HTTP endpoint. +type Service struct { listener net.Listener addr string err chan error @@ -16,16 +16,16 @@ type HTTPService struct { Handler Handler } -// NewHTTPService returns a new instance of HTTPService. -func NewHTTPService(c *Config) *HTTPService { - return &HTTPService{ +// NewService returns a new instance of Service. +func NewService(c *Config) *Service { + return &Service{ addr: c.BindAddress, err: make(chan error), } } // Open starts the service -func (s *HTTPService) Open() error { +func (s *Service) Open() error { // Open listener. listener, err := net.Listen("tcp", s.addr) if err != nil { @@ -39,7 +39,7 @@ func (s *HTTPService) Open() error { } // Close closes the underlying listener. -func (s *HTTPService) Close() error { +func (s *Service) Close() error { if s.listener != nil { return s.listener.Close() } @@ -47,10 +47,10 @@ func (s *HTTPService) Close() error { } // Err returns a channel for fatal errors that occur on the listener. -func (s *HTTPService) Err() <-chan error { return s.err } +func (s *Service) Err() <-chan error { return s.err } // Addr returns the listener's address. Returns nil if listener is closed. -func (s *HTTPService) Addr() net.Addr { +func (s *Service) Addr() net.Addr { if s.listener != nil { return s.listener.Addr() } @@ -58,7 +58,7 @@ func (s *HTTPService) Addr() net.Addr { } // serve serves the handler from the listener. -func (s *HTTPService) serve() { +func (s *Service) serve() { // The listener was closed so exit // See https://github.com/golang/go/issues/4373 err := http.Serve(s.listener, &s.Handler) @@ -66,10 +66,3 @@ func (s *HTTPService) serve() { s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err) } } - -type Config struct { - BindAddress string `toml:"bind-address"` - Port int `toml:"port"` - LogEnabled bool `toml:"log-enabled"` - WriteTracing bool `toml:"write-tracing"` -} diff --git a/meta/config.go b/meta/config.go new file mode 100644 index 0000000000..1c2cf1d4cf --- /dev/null +++ b/meta/config.go @@ -0,0 +1,39 @@ +package meta + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultHeartbeatTimeout is the default heartbeat timeout for the store. + DefaultHeartbeatTimeout = 1000 * time.Millisecond + + // DefaultElectionTimeout is the default election timeout for the store. + DefaultElectionTimeout = 1000 * time.Millisecond + + // DefaultLeaderLeaseTimeout is the default leader lease for the store. + DefaultLeaderLeaseTimeout = 500 * time.Millisecond + + // DefaultCommitTimeout is the default commit timeout for the store. + DefaultCommitTimeout = 50 * time.Millisecond +) + +// Config represents the meta configuration. +type Config struct { + Dir string `toml:"dir"` + ElectionTimeout toml.Duration `toml:"election-timeout"` + HeartbeatTimeout toml.Duration `toml:"heartbeat-timeout"` + LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"` + CommitTimeout toml.Duration `toml:"commit-timeout"` +} + +func NewConfig() Config { + return Config{ + ElectionTimeout: toml.Duration(DefaultElectionTimeout), + HeartbeatTimeout: toml.Duration(DefaultHeartbeatTimeout), + LeaderLeaseTimeout: toml.Duration(DefaultLeaderLeaseTimeout), + CommitTimeout: toml.Duration(DefaultCommitTimeout), + } +} diff --git a/meta/config_test.go b/meta/config_test.go new file mode 100644 index 0000000000..dc4337e5e3 --- /dev/null +++ b/meta/config_test.go @@ -0,0 +1,36 @@ +package meta_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/meta" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c meta.Config + if _, err := toml.Decode(` +dir = "/tmp/foo" +election-timeout = "10s" +heartbeat-timeout = "20s" +leader-lease-timeout = "30h" +commit-timeout = "40m" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.Dir != "/tmp/foo" { + t.Fatalf("unexpected dir: %s", c.Dir) + } else if time.Duration(c.ElectionTimeout) != 10*time.Second { + t.Fatalf("unexpected election timeout: %v", c.ElectionTimeout) + } else if time.Duration(c.HeartbeatTimeout) != 20*time.Second { + t.Fatalf("unexpected heartbeat timeout: %v", c.HeartbeatTimeout) + } else if time.Duration(c.LeaderLeaseTimeout) != 30*time.Hour { + t.Fatalf("unexpected leader lease timeout: %v", c.LeaderLeaseTimeout) + } else if time.Duration(c.CommitTimeout) != 40*time.Minute { + t.Fatalf("unexpected commit timeout: %v", c.CommitTimeout) + } +} diff --git a/meta/continuous_querier/config.go b/meta/continuous_querier/config.go new file mode 100644 index 0000000000..d6f08aea77 --- /dev/null +++ b/meta/continuous_querier/config.go @@ -0,0 +1,61 @@ +package continuous_querier + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + DefaultRecomputePreviousN = 2 + + DefaultRecomputeNoOlderThan = 10 * time.Minute + + DefaultComputeRunsPerInterval = 10 + + DefaultComputeNoMoreThan = 2 * time.Minute +) + +// Config represents a configuration for the continuous query service. +type Config struct { + // If this flag is set to false, both the brokers and data nodes should ignore any CQ processing. + Enabled bool `toml:"enabled"` + + // when continuous queries are run we'll automatically recompute previous intervals + // in case lagged data came in. Set to zero if you never have lagged data. We do + // it this way because invalidating previously computed intervals would be insanely hard + // and expensive. + RecomputePreviousN int `toml:"recompute-previous-n"` + + // The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan + // setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN + // and have this set to 10m, then we'd only compute the previous two intervals for any + // CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window + RecomputeNoOlderThan toml.Duration `toml:"recompute-no-older-than"` + + // ComputeRunsPerInterval will determine how many times the current and previous N intervals + // will be computed. The group by time will be divided by this and it will get computed this many times: + // group by time seconds / runs per interval + // This will give partial results for current group by intervals and will determine how long it will + // be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it + // will be a minute past the previous 10m bucket of time before lagged data is picked up + ComputeRunsPerInterval int `toml:"compute-runs-per-interval"` + + // ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller + // group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting + // to 1m. Then for a group by time(1m) will actually only get computed once per interval (and once per PreviousN). + // If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger + // than 10m will get computed 10 times for each interval. + ComputeNoMoreThan toml.Duration `toml:"compute-no-more-than"` +} + +// NewConfig returns a new instance of Config with defaults. +func NewConfig() Config { + return Config{ + Enabled: true, + RecomputePreviousN: DefaultRecomputePreviousN, + RecomputeNoOlderThan: toml.Duration(DefaultRecomputeNoOlderThan), + ComputeRunsPerInterval: DefaultComputeRunsPerInterval, + ComputeNoMoreThan: toml.Duration(DefaultComputeNoMoreThan), + } +} diff --git a/meta/continuous_querier/config_test.go b/meta/continuous_querier/config_test.go new file mode 100644 index 0000000000..d1a18d9ec7 --- /dev/null +++ b/meta/continuous_querier/config_test.go @@ -0,0 +1,36 @@ +package continuous_querier_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/meta/continuous_querier" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c continuous_querier.Config + if _, err := toml.Decode(` +recompute-previous-n = 1 +recompute-no-older-than = "10s" +compute-runs-per-interval = 2 +compute-no-more-than = "20s" +disabled = true +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.RecomputePreviousN != 1 { + t.Fatalf("unexpected recompute previous n: %d", c.RecomputePreviousN) + } else if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second { + t.Fatalf("unexpected recompute no older than: %v", c.RecomputeNoOlderThan) + } else if c.ComputeRunsPerInterval != 2 { + t.Fatalf("unexpected compute runs per interval: %d", c.ComputeRunsPerInterval) + } else if time.Duration(c.ComputeNoMoreThan) != 20*time.Second { + t.Fatalf("unexpected compute no more than: %v", c.ComputeNoMoreThan) + } else if c.Disabled != true { + t.Fatalf("unexpected disabled: %v", c.Disabled) + } +} diff --git a/meta/continuous_querier/service.go b/meta/continuous_querier/service.go new file mode 100644 index 0000000000..488744f681 --- /dev/null +++ b/meta/continuous_querier/service.go @@ -0,0 +1 @@ +package continuous_querier diff --git a/meta/store.go b/meta/store.go index bffacc542e..d0df1a8735 100644 --- a/meta/store.go +++ b/meta/store.go @@ -19,20 +19,6 @@ import ( "golang.org/x/crypto/bcrypt" ) -const ( - // DefaultHeartbeatTimeout is the default heartbeat timeout for the store. - DefaultHeartbeatTimeout = 1000 * time.Millisecond - - // DefaultElectionTimeout is the default election timeout for the store. - DefaultElectionTimeout = 1000 * time.Millisecond - - // DefaultLeaderLeaseTimeout is the default leader lease for the store. - DefaultLeaderLeaseTimeout = 500 * time.Millisecond - - // DefaultCommitTimeout is the default commit timeout for the store. - DefaultCommitTimeout = 50 * time.Millisecond -) - // Raft configuration. const ( raftLogCacheSize = 512 diff --git a/monitor/config.go b/monitor/config.go new file mode 100644 index 0000000000..1e3798700c --- /dev/null +++ b/monitor/config.go @@ -0,0 +1,25 @@ +package monitor + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultStatisticsWriteInterval is the interval of time between internal stats are written + DefaultStatisticsWriteInterval = 1 * time.Minute +) + +// Config represents a configuration for the monitor. +type Config struct { + Enabled bool `toml:"enabled"` + WriteInterval toml.Duration `toml:"write-interval"` +} + +func NewConfig() Config { + return Config{ + Enabled: false, + WriteInterval: toml.Duration(DefaultStatisticsWriteInterval), + } +} diff --git a/monitor/monitor.go b/monitor/monitor.go new file mode 100644 index 0000000000..2568e6eb17 --- /dev/null +++ b/monitor/monitor.go @@ -0,0 +1,83 @@ +package monitor + +// Monitor represents a TSDB monitoring service. +type Monitor struct { + Store interface{} +} + +func (m *Monitor) Open() error { return nil } +func (m *Monitor) Close() error { return nil } + +// StartSelfMonitoring starts a goroutine which monitors the InfluxDB server +// itself and stores the results in the specified database at a given interval. +/* +func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error { + if interval == 0 { + return fmt.Errorf("statistics check interval must be non-zero") + } + + go func() { + tick := time.NewTicker(interval) + for { + <-tick.C + + // Create the batch and tags + tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)} + if h, err := os.Hostname(); err == nil { + tags["host"] = h + } + batch := pointsFromStats(s.stats, tags) + + // Shard-level stats. + tags["shardID"] = strconv.FormatUint(s.id, 10) + s.mu.RLock() + for _, sh := range s.shards { + if !sh.HasDataNodeID(s.id) { + // No stats for non-local shards. + continue + } + batch = append(batch, pointsFromStats(sh.stats, tags)...) + } + s.mu.RUnlock() + + // Server diagnostics. + for _, row := range s.DiagnosticsAsRows() { + points, err := s.convertRowToPoints(row.Name, row) + if err != nil { + s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error()) + continue + } + for _, p := range points { + p.AddTag("serverID", strconv.FormatUint(s.ID(), 10)) + } + batch = append(batch, points...) + } + + s.WriteSeries(database, retention, batch) + } + }() + return nil +} + +// Function for local use turns stats into a slice of points +func pointsFromStats(st *Stats, tags map[string]string) []tsdb.Point { + var points []tsdb.Point + now := time.Now() + st.Walk(func(k string, v int64) { + point := tsdb.NewPoint( + st.name+"_"+k, + make(map[string]string), + map[string]interface{}{"value": int(v)}, + now, + ) + // Specifically create a new map. + for k, v := range tags { + tags[k] = v + point.AddTag(k, v) + } + points = append(points, point) + }) + + return points +} +*/ diff --git a/opentsdb/config.go b/opentsdb/config.go new file mode 100644 index 0000000000..af474c4c98 --- /dev/null +++ b/opentsdb/config.go @@ -0,0 +1,8 @@ +package opentsdb + +type Config struct { + Enabled bool `toml:"enabled"` + BindAddress string `toml:"bind-address"` + Database string `toml:"database"` + RetentionPolicy string `toml:"retention-policy"` +} diff --git a/opentsdb/config_test.go b/opentsdb/config_test.go new file mode 100644 index 0000000000..8ce2cac08a --- /dev/null +++ b/opentsdb/config_test.go @@ -0,0 +1,29 @@ +package opentsdb_test + +import ( + "testing" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/opentsdb" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c opentsdb.Config + if _, err := toml.Decode(` +enabled = true +bind-address = ":9000" +database = "xxx" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.Enabled != true { + t.Fatalf("unexpected enabled: %v", c.Enabled) + } else if c.BindAddress != ":9000" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } else if c.Database != "xxx" { + t.Fatalf("unexpected database: %s", c.Database) + } +} diff --git a/opentsdb/opentsdb.go b/opentsdb/opentsdb.go index 134255de1b..ed45645d3f 100644 --- a/opentsdb/opentsdb.go +++ b/opentsdb/opentsdb.go @@ -35,8 +35,7 @@ type SeriesWriter interface { type Server struct { writer SeriesWriter - database string - retentionpolicy string + database string listener *net.TCPListener tsdbhttp *tsdbHTTPListener @@ -50,7 +49,6 @@ func NewServer(w SeriesWriter, retpol string, db string) *Server { s := &Server{} s.writer = w - s.retentionpolicy = retpol s.database = db s.tsdbhttp = makeTSDBHTTPListener() @@ -225,7 +223,7 @@ func (s *Server) HandleTelnet(conn net.Conn) { p := tsdb.NewPoint(name, tags, fields, t) - _, err = s.writer.WriteSeries(s.database, s.retentionpolicy, []tsdb.Point{p}) + _, err = s.writer.WriteSeries(s.database, "", []tsdb.Point{p}) if err != nil { log.Println("TSDB cannot write data: ", err) continue @@ -331,7 +329,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { idps = append(idps, p) } - _, err = s.writer.WriteSeries(s.database, s.retentionpolicy, idps) + _, err = s.writer.WriteSeries(s.database, "", idps) if err != nil { log.Println("TSDB cannot write data: ", err) } diff --git a/toml/toml.go b/toml/toml.go index 4a8571f84a..1505e7ff85 100644 --- a/toml/toml.go +++ b/toml/toml.go @@ -1,6 +1,13 @@ package toml -import "time" +import ( + "fmt" + "strconv" + "time" +) + +// maxInt is the largest integer representable by a word (architeture dependent). +const maxInt = int64(^uint(0) >> 1) // Duration is a TOML wrapper type for time.Duration. type Duration time.Duration @@ -31,3 +38,35 @@ func (d *Duration) UnmarshalText(text []byte) error { func (d Duration) MarshalText() (text []byte, err error) { return []byte(d.String()), nil } + +// Size represents a TOML parseable file size. +// Users can specify size using "m" for megabytes and "g" for gigabytes. +type Size int + +// UnmarshalText parses a byte size from text. +func (s *Size) UnmarshalText(text []byte) error { + // Parse numeric portion of value. + length := len(string(text)) + size, err := strconv.ParseInt(string(text[:length-1]), 10, 64) + if err != nil { + return err + } + + // Parse unit of measure ("m", "g", etc). + switch suffix := text[len(text)-1]; suffix { + case 'm': + size *= 1 << 20 // MB + case 'g': + size *= 1 << 30 // GB + default: + return fmt.Errorf("unknown size suffix: %c", suffix) + } + + // Check for overflow. + if size > maxInt { + return fmt.Errorf("size %d cannot be represented by an int", size) + } + + *s = Size(size) + return nil +} diff --git a/toml/toml_test.go b/toml/toml_test.go new file mode 100644 index 0000000000..2890aa3871 --- /dev/null +++ b/toml/toml_test.go @@ -0,0 +1,47 @@ +package toml_test + +import ( + "reflect" + "testing" + + "github.com/influxdb/influxdb/toml" +) + +// Ensure that megabyte sizes can be parsed. +func TestSize_UnmarshalText_MB(t *testing.T) { + var s toml.Size + if err := s.UnmarshalText([]byte("200m")); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if s != 200*(1<<20) { + t.Fatalf("unexpected size: %d", s) + } +} + +// Ensure that gigabyte sizes can be parsed. +func TestSize_UnmarshalText_GB(t *testing.T) { + if typ := reflect.TypeOf(0); typ.Size() != 8 { + t.Skip("large gigabyte parsing on 64-bit arch only") + } + + var s toml.Size + if err := s.UnmarshalText([]byte("10g")); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if s != 10*(1<<30) { + t.Fatalf("unexpected size: %d", s) + } +} + +/* +func TestConfig_Encode(t *testing.T) { + var c influxdb.Config + c.Monitoring.WriteInterval = influxdb.Duration(time.Minute) + buf := new(bytes.Buffer) + if err := toml.NewEncoder(buf).Encode(&c); err != nil { + t.Fatal("Failed to encode: ", err) + } + got, search := buf.String(), `write-interval = "1m0s"` + if !strings.Contains(got, search) { + t.Fatalf("Encoding config failed.\nfailed to find %s in:\n%s\n", search, got) + } +} +*/ diff --git a/tsdb/config.go b/tsdb/config.go index 213d2b7ff6..8648698c38 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -1,6 +1,25 @@ package tsdb -import "github.com/influxdb/influxdb/toml" +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultRetentionAutoCreate is the default for auto-creating retention policies + DefaultRetentionAutoCreate = true + + // DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement + DefaultRetentionCheckEnabled = true + + // DefaultRetentionCreatePeriod represents how often the server will check to see if new + // shard groups need to be created in advance for writing + DefaultRetentionCreatePeriod = 45 * time.Minute + + // DefaultRetentionCheckPeriod is the period of time between retention policy checks are run + DefaultRetentionCheckPeriod = 10 * time.Minute +) type Config struct { Dir string `toml:"dir"` @@ -9,3 +28,21 @@ type Config struct { RetentionCheckPeriod toml.Duration `toml:"retention-check-period"` RetentionCreatePeriod toml.Duration `toml:"retention-create-period"` } + +func NewConfig() Config { + return Config{ + RetentionAutoCreate: DefaultRetentionAutoCreate, + RetentionCheckEnabled: DefaultRetentionCheckEnabled, + RetentionCheckPeriod: toml.Duration(DefaultRetentionCheckPeriod), + RetentionCreatePeriod: toml.Duration(DefaultRetentionCreatePeriod), + } +} + +// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups. +// If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod +func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration { + if c.RetentionCreatePeriod != 0 { + return time.Duration(c.RetentionCreatePeriod) + } + return DefaultRetentionCreatePeriod +}