Refactor run command.

pull/2709/head
Ben Johnson 2015-05-29 13:50:05 -06:00 committed by Cory LaNou
parent 1f294ce8de
commit 9d4527071e
39 changed files with 1187 additions and 1116 deletions

View File

@ -1,67 +0,0 @@
package admin
import (
"log"
"net"
"net/http"
"strings"
"sync"
"github.com/rakyll/statik/fs"
// Register static assets via statik.
_ "github.com/influxdb/influxdb/statik"
)
// Server manages InfluxDB's admin web server.
type Server struct {
	mu       sync.Mutex   // guards listener and closed across ListenAndServe/Close
	addr     string       // TCP bind address, e.g. ":8083"; empty disables the server
	listener net.Listener // active listener; set by ListenAndServe
	closed   bool         // true when the server is not serving
}
// NewServer constructs a new admin web server. The "addr" argument should be
// a string that looks like ":8083" or whatever addr to serve on. The returned
// server starts out closed; call ListenAndServe to begin serving.
func NewServer(addr string) *Server {
	s := &Server{
		addr:   addr,
		closed: true,
	}
	return s
}
// ListenAndServe starts the admin web server and serves requests until
// s.Close() is called. It is a no-op (and returns nil) when no bind address
// was configured.
func (s *Server) ListenAndServe() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.addr == "" {
		return nil
	}

	var err error
	s.listener, err = net.Listen("tcp", s.addr)
	if err != nil {
		return err
	}
	s.closed = false

	// Previously this error was discarded; a failed statik FS would have
	// handed a nil FileSystem to http.FileServer.
	statikFS, err := fs.New()
	if err != nil {
		return err
	}

	go func() {
		// Use a goroutine-local error: assigning to the enclosing
		// function's err here would race with the caller reading the
		// return value. Errors containing "closed" are the expected
		// result of Close() and are not fatal.
		if err := http.Serve(s.listener, http.FileServer(statikFS)); err != nil && !strings.Contains(err.Error(), "closed") {
			log.Fatalf("admin server failed to serve on %s: %s", s.addr, err)
		}
	}()
	return nil
}
// Close stops the admin web server. Closing an already-closed server is a
// no-op and returns nil.
func (s *Server) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.closed {
		s.closed = true
		return s.listener.Close()
	}
	return nil
}

View File

@ -1,31 +0,0 @@
package admin_test
import (
"io/ioutil"
"net/http"
"testing"
"github.com/influxdb/influxdb/admin"
)
// Test_ServesIndexByDefault ensures the admin server responds with a 200 OK
// and a readable body for a GET to "/".
//
// NOTE(review): ListenAndServe runs in a goroutine with no synchronization
// before the http.Get below, so this test races server startup — presumably
// it passes because the listener binds quickly; confirm, or poll until the
// port accepts connections.
func Test_ServesIndexByDefault(t *testing.T) {
	s := admin.NewServer(":8083")
	go func() { s.ListenAndServe() }()
	defer s.Close()
	// NOTE(review): hard-coded port 8083 will collide with a locally
	// running InfluxDB instance.
	resp, err := http.Get("http://localhost:8083/")
	if err != nil {
		t.Fatalf("couldn't complete GET to / on port 8083")
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Fatalf("didn't get a 200 OK response from server, got %d instead", resp.StatusCode)
	}
	_, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		t.Fatalf("couldn't read body")
	}
}

1
admin/config.go Normal file
View File

@ -0,0 +1 @@
package admin

26
admin/config_test.go Normal file
View File

@ -0,0 +1,26 @@
package admin_test
import (
"testing"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/admin"
)
// TestConfig_Parse ensures the admin configuration decodes from TOML and
// that both fields survive the round trip.
func TestConfig_Parse(t *testing.T) {
	// Decode a minimal TOML document into the config.
	var c admin.Config
	_, err := toml.Decode(`
enabled = true
bind-address = ":8083"
`, &c)
	if err != nil {
		t.Fatal(err)
	}

	// Verify each decoded field.
	if !c.Enabled {
		t.Fatalf("unexpected enabled: %v", c.Enabled)
	}
	if c.BindAddress != ":8083" {
		t.Fatalf("unexpected bind address: %s", c.BindAddress)
	}
}

View File

@ -1,32 +1,80 @@
package admin
import "log"
import (
"fmt"
"net"
"net/http"
"strings"
// Register static assets via statik.
_ "github.com/influxdb/influxdb/statik"
"github.com/rakyll/statik/fs"
)
// Service manages the listener for an admin endpoint.
type Service struct {
listener net.Listener
addr string
err chan error
}
func NewService(c *Config) *Service {
return &Service{}
}
func (s *Service) Open() error {
if err := cmd.node.openAdminServer(cmd.config.Admin.Port); err != nil {
log.Fatalf("admin server failed to listen on :%d: %s", cmd.config.Admin.Port, err)
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
return &Service{
addr: c.BindAddress,
err: make(chan error),
}
log.Printf("admin server listening on :%d", cmd.config.Admin.Port)
}
func (s *Service) Close() error { return nil }
// Open starts the service
func (s *Service) Open() error {
// Open listener.
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.listener = listener
type Config struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
// Begin listening for requests in a separate goroutine.
go s.serve()
return nil
}
func (s *Node) closeAdminServer() error {
if s.adminServer != nil {
return s.adminServer.Close()
// Close closes the underlying listener.
func (s *Service) Close() error {
if s.listener != nil {
return s.listener.Close()
}
return nil
}
// Err returns a channel for fatal errors that occur on the listener.
// The channel is unbuffered, so a consumer must be draining it for serve
// to be able to report a failure.
func (s *Service) Err() <-chan error { return s.err }
// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
	if s.listener == nil {
		return nil
	}
	return s.listener.Addr()
}
// serve serves the handler from the listener.
func (s *Service) serve() {
	// Build the file server from the embedded admin assets; a failure
	// here is a programmer/build error, so panic.
	statikFS, err := fs.New()
	if err != nil {
		panic(err)
	}

	// Serve until the listener is closed. Errors mentioning "closed" are
	// the expected result of Close() and are suppressed.
	if err := http.Serve(s.listener, http.FileServer(statikFS)); err != nil && !strings.Contains(err.Error(), "closed") {
		s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
	}
}
// Config represents the configuration for the admin service.
type Config struct {
	Enabled     bool   `toml:"enabled"`      // whether the admin UI is served
	BindAddress string `toml:"bind-address"` // TCP bind address, e.g. ":8083"
}

33
admin/service_test.go Normal file
View File

@ -0,0 +1,33 @@
package admin_test
import (
"io/ioutil"
"net/http"
"testing"
"github.com/influxdb/influxdb/admin"
)
// Ensure service can serve the root index page of the admin.
func TestService_Index(t *testing.T) {
	// Open the service on an OS-assigned port so parallel runs don't collide.
	s := admin.NewService(admin.Config{BindAddress: ":0"})
	if err := s.Open(); err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	// Fetch the index page from the bound address.
	res, err := http.Get("http://" + s.Addr().String())
	if err != nil {
		t.Fatal(err)
	}
	defer res.Body.Close()

	// The page must return 200 OK with a readable body.
	if res.StatusCode != http.StatusOK {
		t.Fatalf("unexpected status: %d", res.StatusCode)
	}
	if _, err := ioutil.ReadAll(res.Body); err != nil {
		t.Fatalf("unable to read body: %s", err)
	}
}

3
cluster/config.go Normal file
View File

@ -0,0 +1,3 @@
package cluster

// Config represents the configuration for the cluster service.
// It is currently a placeholder with no tunable options.
type Config struct{}

View File

@ -1,13 +1,14 @@
package cluster
import "net"
type Service struct {
}
func NewService(c *Config) *Service {
func NewService(c Config) *Service {
return &Service{}
}
func (s *Service) Open() error { return nil }
func (s *Service) Close() error { return nil }
type Config struct{}
func (s *Service) Open() error { return nil }
func (s *Service) Close() error { return nil }
func (s *Service) Addr() net.Addr { return nil }

View File

@ -1,4 +1,4 @@
package main
package help
import "fmt"

View File

@ -11,20 +11,10 @@ import (
"runtime/pprof"
"strings"
"time"
"github.com/influxdb/influxdb/cmd/influxd/run"
)
const logo = `
8888888 .d888 888 8888888b. 888888b.
888 d88P" 888 888 "Y88b 888 "88b
888 888 888 888 888 888 .88P
888 88888b. 888888 888 888 888 888 888 888 888 8888888K.
888 888 "88b 888 888 888 888 Y8bd8P' 888 888 888 "Y88b
888 888 888 888 888 888 888 X88K 888 888 888 888
888 888 888 888 888 Y88b 888 .d8""8b. 888 .d88P 888 d88P
8888888 888 888 888 888 "Y88888 888 888 8888888P" 8888888P"
`
// These variables are populated via the Go linker.
var (
version string = "0.9"
@ -69,37 +59,38 @@ func main() {
cmd = args[0]
}
// FIXME(benbjohnson): Parse profiling args & start profiling.
// Extract name from args.
switch cmd {
case "run":
cmd := NewRunCommand()
if err := cmd.Run(args[1:]...); err != nil {
log.Fatalf("run: %s", err)
}
case "":
cmd := NewRunCommand()
if err := cmd.Run(args...); err != nil {
if err := run.NewCommand().Run(args...); err != nil {
log.Fatalf("run: %s", err)
}
case "backup":
cmd := NewBackupCommand()
if err := cmd.Run(args[1:]...); err != nil {
log.Fatalf("backup: %s", err)
}
case "restore":
cmd := NewRestoreCommand()
if err := cmd.Run(args[1:]...); err != nil {
log.Fatalf("restore: %s", err)
case "run":
if err := run.NewCommand().Run(args[1:]...); err != nil {
log.Fatalf("run: %s", err)
}
// case "backup":
// cmd := NewBackupCommand()
// if err := cmd.Run(args[1:]...); err != nil {
// log.Fatalf("backup: %s", err)
// }
// case "restore":
// cmd := NewRestoreCommand()
// if err := cmd.Run(args[1:]...); err != nil {
// log.Fatalf("restore: %s", err)
// }
case "version":
execVersion(args[1:])
case "config":
execConfig(args[1:])
case "help":
cmd := NewHelpCommand()
if err := cmd.Run(args[1:]...); err != nil {
log.Fatalf("help: %s", err)
if err := run.NewPrintConfigCommand().Run(args[1:]...); err != nil {
log.Fatalf("config: %s", err)
}
// case "help":
// if err := help.NewCommand().Run(args[1:]...); err != nil {
// log.Fatalf("help: %s", err)
// }
default:
log.Fatalf(`influxd: unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", cmd)
}
@ -124,43 +115,6 @@ func execVersion(args []string) {
log.Print(s)
}
// execConfig parses and prints the current config loaded.
func execConfig(args []string) {
// Parse command flags.
fs := flag.NewFlagSet("", flag.ExitOnError)
fs.Usage = func() {
fmt.Println(`usage: config
config displays the default configuration
`)
}
var (
configPath string
hostname string
)
fs.StringVar(&configPath, "config", "", "")
fs.StringVar(&hostname, "hostname", "", "")
fs.Parse(args)
var config *Config
var err error
if configPath == "" {
config, err = NewTestConfig()
} else {
config, err = ParseConfigFile(configPath)
}
if err != nil {
log.Fatalf("parse config: %s", err)
}
// Override config properties.
if hostname != "" {
config.Hostname = hostname
}
config.Write(os.Stdout)
}
type Stopper interface {
Stop()
}

195
cmd/influxd/run/command.go Normal file
View File

@ -0,0 +1,195 @@
package run
import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"github.com/BurntSushi/toml"
)
const logo = `
8888888 .d888 888 8888888b. 888888b.
888 d88P" 888 888 "Y88b 888 "88b
888 888 888 888 888 888 .88P
888 88888b. 888888 888 888 888 888 888 888 888 8888888K.
888 888 "88b 888 888 888 888 Y8bd8P' 888 888 888 "Y88b
888 888 888 888 888 888 888 X88K 888 888 888 888
888 888 888 888 888 Y88b 888 .d8""8b. 888 .d88P 888 d88P
8888888 888 888 888 888 "Y88888 888 888 8888888P" 8888888P"
`
// Command represents the command executed by "influxd run".
type Command struct {
	Stdin  io.Reader // standard input stream for the command
	Stdout io.Writer // destination for informational output
	Stderr io.Writer // destination for diagnostics and usage text
	Server *Server   // server started by Run; nil until Run succeeds
}
// NewCommand returns a new instance of Command wired to the process's
// standard streams.
func NewCommand() *Command {
	cmd := &Command{}
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd
}
// Run parses the config from args and runs the server. On success the
// started server is stored in cmd.Server for later Close().
func (cmd *Command) Run(args ...string) error {
	// Parse the command line flags.
	options, err := cmd.ParseFlags(args...)
	if err != nil {
		return err
	}

	// Print sweet InfluxDB logo. Write to cmd.Stdout (not the process
	// stdout) so output honors the command's configured streams.
	fmt.Fprint(cmd.Stdout, logo)

	// Write the PID file.
	if err := cmd.writePIDFile(options.PIDFile); err != nil {
		return fmt.Errorf("write pid file: %s", err)
	}

	// Set parallelism. The message previously lacked a trailing newline.
	runtime.GOMAXPROCS(runtime.NumCPU())
	fmt.Fprintf(cmd.Stderr, "GOMAXPROCS set to %d\n", runtime.GOMAXPROCS(0))

	// Parse config.
	config, err := cmd.ParseConfig(options.ConfigPath)
	if err != nil {
		return fmt.Errorf("parse config: %s", err)
	}

	// Override config hostname if specified in the command line args.
	if options.Hostname != "" {
		config.Hostname = options.Hostname
	}
	// FIXME(benbjohnson): cmd.node.hostname = cmd.config.Hostname

	// Use the config JoinURLs by default.
	// If a -join flag was passed, these should override the config.
	joinURLs := config.Initialization.JoinURLs
	if options.Join != "" {
		joinURLs = options.Join
	}

	// Normalize and validate the configuration.
	config.Normalize()
	if err := config.Validate(); err != nil {
		return fmt.Errorf("%s. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.", err)
	}

	// Create server from config and start it.
	s := NewServer(config, joinURLs)
	if err := s.Open(); err != nil {
		return fmt.Errorf("open server: %s", err)
	}
	cmd.Server = s

	return nil
}
// Close shuts down the server. It is safe to call even when Run never
// started one.
func (cmd *Command) Close() error {
	if cmd.Server == nil {
		return nil
	}
	return cmd.Server.Close()
}
// ParseFlags parses the command line flags from args and returns an options set.
// All flags are string-valued; unset flags remain empty strings.
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
	fs := flag.NewFlagSet("", flag.ContinueOnError)
	fs.Usage = func() { fmt.Fprintln(cmd.Stderr, usage) }

	var options Options
	fs.StringVar(&options.ConfigPath, "config", "", "")
	fs.StringVar(&options.PIDFile, "pidfile", "", "")
	fs.StringVar(&options.Hostname, "hostname", "", "")
	fs.StringVar(&options.Join, "join", "", "")
	fs.StringVar(&options.CPUProfile, "cpuprofile", "", "")
	fs.StringVar(&options.MemProfile, "memprofile", "", "")

	if err := fs.Parse(args); err != nil {
		return Options{}, err
	}
	return options, nil
}
// writePIDFile writes the process ID to path. An empty path disables
// PID-file handling and returns nil.
func (cmd *Command) writePIDFile(path string) error {
	if path == "" {
		return nil
	}

	// Ensure the required directory structure exists.
	if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
		return fmt.Errorf("mkdir: %s", err)
	}

	// Retrieve the PID and write it.
	pid := []byte(strconv.Itoa(os.Getpid()))
	if err := ioutil.WriteFile(path, pid, 0666); err != nil {
		return fmt.Errorf("write file: %s", err)
	}
	return nil
}
// ParseConfig parses the config at path.
// Returns a demo configuration if path is blank.
func (cmd *Command) ParseConfig(path string) (*Config, error) {
	// Use demo configuration if no config path is specified.
	if path == "" {
		fmt.Fprintln(cmd.Stdout, "no configuration provided, using default settings")
		return NewTestConfig()
	}

	fmt.Fprintf(cmd.Stdout, "using configuration at: %s\n", path)

	// NewConfig already returns a *Config, so decode directly into it
	// rather than handing the decoder a **Config via &config.
	config := NewConfig()
	if _, err := toml.DecodeFile(path, config); err != nil {
		return nil, err
	}
	return config, nil
}
// usage is the help text printed when "influxd run" is invoked with -h or
// invalid flags.
var usage = `usage: run [flags]
run starts the broker and data node server. If this is the first time running
the command then a new cluster will be initialized unless the -join argument
is used.
-config <path>
Set the path to the configuration file.
-hostname <name>
Override the hostname, the 'hostname' configuration
option will be overridden.
-join <url>
Joins the server to an existing cluster.
-pidfile <path>
Write process ID to a file.
`
// Options represents the command line options that can be parsed.
type Options struct {
	ConfigPath string // path to the configuration file (-config)
	PIDFile    string // path to write the process ID to (-pidfile)
	Hostname   string // hostname override (-hostname)
	Join       string // cluster URLs to join (-join)
	CPUProfile string // path for a CPU profile (-cpuprofile)
	MemProfile string // path for a memory profile (-memprofile)
}

View File

@ -1,6 +1,7 @@
package run
import (
"errors"
"fmt"
"io"
"log"
@ -9,14 +10,18 @@ import (
"os/user"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/admin"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/meta/continuous_querier"
"github.com/influxdb/influxdb/monitor"
"github.com/influxdb/influxdb/opentsdb"
"github.com/influxdb/influxdb/tsdb"
)
@ -43,64 +48,9 @@ const (
// DefaultClusterPort represents the default port the cluster runs on.
DefaultClusterPort = 8086
// DefaultBrokerEnabled is the default for starting a node as a broker
DefaultBrokerEnabled = true
// DefaultDataEnabled is the default for starting a node as a data node
DefaultDataEnabled = true
// DefaultRetentionCreatePeriod represents how often the server will check to see if new
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute
// DefaultBrokerTruncationInterval is the default period between truncating topics.
DefaultBrokerTruncationInterval = 10 * time.Minute
// DefaultMaxTopicSize is the default maximum size in bytes a segment can consume on disk of a broker.
DefaultBrokerMaxSegmentSize = 10 * 1024 * 1024
// DefaultMaxTopicSize is the default maximum size in bytes a topic can consume on disk of a broker.
DefaultBrokerMaxTopicSize = 5 * DefaultBrokerMaxSegmentSize
// DefaultRaftApplyInterval is the period between applying committed Raft log entries.
DefaultRaftApplyInterval = 10 * time.Millisecond
// DefaultRaftElectionTimeout is the default Leader Election timeout.
DefaultRaftElectionTimeout = 1 * time.Second
// DefaultRaftHeartbeatInterval is the interval between leader heartbeats.
DefaultRaftHeartbeatInterval = 100 * time.Millisecond
// DefaultRaftReconnectTimeout is the default wait time between reconnections.
DefaultRaftReconnectTimeout = 10 * time.Millisecond
// DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite"
// DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified
DefaultOpenTSDBDatabaseName = "opentsdb"
// DefaultRetentionAutoCreate is the default for auto-creating retention policies
DefaultRetentionAutoCreate = true
// DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement
DefaultRetentionCheckEnabled = true
// DefaultRetentionCheckPeriod is the period of time between retention policy checks are run
DefaultRetentionCheckPeriod = 10 * time.Minute
// DefaultRecomputePreviousN is ???
DefaultContinuousQueryRecomputePreviousN = 2
// DefaultContinuousQueryRecomputeNoOlderThan is ???
DefaultContinuousQueryRecomputeNoOlderThan = 10 * time.Minute
// DefaultContinuousQueryComputeRunsPerInterval is ???
DefaultContinuousQueryComputeRunsPerInterval = 10
// DefaultContinousQueryComputeNoMoreThan is ???
DefaultContinousQueryComputeNoMoreThan = 2 * time.Minute
// DefaultStatisticsEnabled is the default setting for whether internal statistics are collected
DefaultStatisticsEnabled = false
@ -109,9 +59,6 @@ const (
// DefaultStatisticsRetentionPolicy is the default internal statistics retention policy name
DefaultStatisticsRetentionPolicy = "default"
// DefaultStatisticsWriteInterval is the interval of time between internal stats are written
DefaultStatisticsWriteInterval = 1 * time.Minute
)
var DefaultSnapshotURL = url.URL{
@ -124,87 +71,34 @@ var DefaultSnapshotURL = url.URL{
// Enabled bool `toml:"enabled"`
// }
// Initialization contains configuration options for the first time a node boots
type Initialization struct {
// JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After,
// a node is joined to a cluster, these URLS are ignored. These will be overriden at runtime if
// the node is started with the `-join` flag.
JoinURLs string `toml:"join-urls"`
}
// Config represents the configuration format for the influxd binary.
type Config struct {
Hostname string `toml:"hostname"`
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
ReportingDisabled bool `toml:"reporting-disabled"`
Version string `toml:"-"`
InfluxDBVersion string `toml:"-"`
Hostname string `toml:"hostname"`
BindAddress string `toml:"bind-address"`
ReportingEnabled bool `toml:"reporting-enabled"`
// Version string `toml:"-"`
// InfluxDBVersion string `toml:"-"`
Initialization Initialization `toml:"initialization"`
Initialization struct {
// JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After,
// a node is joined to a cluster, these URLS are ignored. These will be overriden at runtime if
// the node is started with the `-join` flag.
JoinURLs string `toml:"join-urls"`
} `toml:"initialization"`
Authentication struct {
Enabled bool `toml:"enabled"`
} `toml:"authentication"`
Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Admin admin.Config `toml:"admin"`
HTTPAPI httpd.Config `toml:"api"`
Graphites []Graphite `toml:"graphite"`
Collectd Collectd `toml:"collectd"`
OpenTSDB OpenTSDB `toml:"opentsdb"`
UDP struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
} `toml:"udp"`
Data tsdb.Config `toml:"data"`
Admin admin.Config `toml:"admin"`
HTTPD httpd.Config `toml:"api"`
Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
// Snapshot SnapshotConfig `toml:"snapshot"`
Monitoring struct {
Enabled bool `toml:"enabled"`
WriteInterval Duration `toml:"write-interval"`
} `toml:"monitoring"`
Debugging struct {
PprofEnabled bool `toml:"pprof-enabled"`
} `toml:"debugging"`
ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
// it this way because invalidating previously computed intervals would be insanely hard
// and expensive.
RecomputePreviousN int `toml:"recompute-previous-n"`
// The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan
// setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN
// and have this set to 10m, then we'd only compute the previous two intervals for any
// CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window
RecomputeNoOlderThan Duration `toml:"recompute-no-older-than"`
// ComputeRunsPerInterval will determine how many times the current and previous N intervals
// will be computed. The group by time will be divided by this and it will get computed this many times:
// group by time seconds / runs per interval
// This will give partial results for current group by intervals and will determine how long it will
// be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it
// will be a minute past the previous 10m bucket of time before lagged data is picked up
ComputeRunsPerInterval int `toml:"compute-runs-per-interval"`
// ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller
// group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting
// to 1m. Then for a group by time(1m) will actually only get computed once per interval (and once per PreviousN).
// If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger
// than 10m will get computed 10 times for each interval.
ComputeNoMoreThan Duration `toml:"compute-no-more-than"`
// If this flag is set to true, both the brokers and data nodes should ignore any CQ processing.
Disabled bool `toml:"disabled"`
} `toml:"continuous_queries"`
Monitoring monitor.Config `toml:"monitoring"`
ContinuousQuery continuous_querier.Config `toml:"continuous_queries"`
}
// NewConfig returns an instance of Config with reasonable defaults.
@ -212,35 +106,12 @@ func NewConfig() *Config {
c := &Config{}
c.Hostname = DefaultHostName
c.BindAddress = DefaultBindAddress
c.Port = DefaultClusterPort
c.Data.Enabled = DefaultDataEnabled
c.Broker.Enabled = DefaultBrokerEnabled
c.Data.RetentionAutoCreate = DefaultRetentionAutoCreate
c.Data.RetentionCheckEnabled = DefaultRetentionCheckEnabled
c.Data.RetentionCheckPeriod = Duration(DefaultRetentionCheckPeriod)
c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod)
c.Logging.HTTPAccess = true
c.Logging.WriteTracing = false
c.Logging.RaftTracing = false
c.Monitoring.Enabled = false
c.Monitoring.WriteInterval = Duration(DefaultStatisticsWriteInterval)
c.ContinuousQuery.RecomputePreviousN = DefaultContinuousQueryRecomputePreviousN
c.ContinuousQuery.RecomputeNoOlderThan = Duration(DefaultContinuousQueryRecomputeNoOlderThan)
c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval
c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan)
c.Broker.TruncationInterval = Duration(DefaultBrokerTruncationInterval)
c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize
c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize
c.Raft.ApplyInterval = Duration(DefaultRaftApplyInterval)
c.Raft.ElectionTimeout = Duration(DefaultRaftElectionTimeout)
c.Raft.HeartbeatInterval = Duration(DefaultRaftHeartbeatInterval)
c.Raft.ReconnectTimeout = Duration(DefaultRaftReconnectTimeout)
c.Meta = meta.NewConfig()
c.Data = tsdb.NewConfig()
c.HTTPD = httpd.NewConfig()
c.Monitoring = monitor.NewConfig()
c.ContinuousQuery = continuous_querier.NewConfig()
return c
}
@ -250,73 +121,47 @@ func NewConfig() *Config {
func NewTestConfig() (*Config, error) {
c := NewConfig()
// By default, store broker and data files in current users home directory
// By default, store meta and data files in current users home directory
u, err := user.Current()
if err != nil {
return nil, fmt.Errorf("failed to determine current user for storage")
}
c.Broker.Enabled = true
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")
c.Raft.ApplyInterval = Duration(DefaultRaftApplyInterval)
c.Raft.ElectionTimeout = Duration(DefaultRaftElectionTimeout)
c.Raft.HeartbeatInterval = Duration(DefaultRaftHeartbeatInterval)
c.Raft.ReconnectTimeout = Duration(DefaultRaftReconnectTimeout)
c.Data.Enabled = true
c.Meta.Dir = filepath.Join(u.HomeDir, ".influxdb/meta")
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
c.Admin.Enabled = true
c.Admin.Port = 8083
c.Admin.BindAddress = ":8083"
c.Monitoring.Enabled = false
c.Snapshot.Enabled = true
//c.Snapshot.Enabled = true
return c, nil
}
// APIAddr returns the TCP binding address for the API server.
func (c *Config) APIAddr() string {
// Default to cluster bind address if not overriden
ba := c.BindAddress
if c.HTTPAPI.BindAddress != "" {
ba = c.HTTPAPI.BindAddress
// Normalize sets default values on config.
func (c *Config) Normalize() {
// Normalize Graphite configs.
for i, _ := range c.Graphites {
if c.Graphites[i].BindAddress == "" {
c.Graphites[i].BindAddress = c.BindAddress
}
}
// Default to cluster port if not overridden
bp := c.Port
if c.HTTPAPI.Port != 0 {
bp = c.HTTPAPI.Port
}
return net.JoinHostPort(ba, strconv.Itoa(bp))
}
// APIAddrUDP returns the UDP address for the series listener.
func (c *Config) APIAddrUDP() string {
return net.JoinHostPort(c.UDP.BindAddress, strconv.Itoa(c.UDP.Port))
}
// ClusterAddr returns the binding address for the cluster
func (c *Config) ClusterAddr() string {
return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Port))
}
// ClusterURL returns the URL required to contact the server cluster endpoints.
func (c *Config) ClusterURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Port)),
// Normalize OpenTSDB config.
if c.OpenTSDB.BindAddress == "" {
c.OpenTSDB.BindAddress = c.BindAddress
}
}
// BrokerDir returns the data directory to start up in and does home directory expansion if necessary.
func (c *Config) BrokerDir() string {
p, e := filepath.Abs(c.Broker.Dir)
if e != nil {
log.Fatalf("Unable to get absolute path for Broker Directory: %q", c.Broker.Dir)
// Validate returns an error if the config is invalid.
func (c *Config) Validate() error {
if c.Meta.Dir == "" {
return errors.New("Meta.Dir must be specified")
} else if c.Data.Dir == "" {
return errors.New("Data.Dir must be specified")
}
return p
return nil
}
// DataDir returns the data directory to start up in and does home directory expansion if necessary.
@ -328,154 +173,7 @@ func (c *Config) DataDir() string {
return p
}
// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups.
// If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod
func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration {
if c.Data.RetentionCreatePeriod != 0 {
return time.Duration(c.Data.RetentionCreatePeriod)
}
return DefaultRetentionCreatePeriod
}
// WriteConfigFile writes the config to the specified writer
func (c *Config) Write(w io.Writer) error {
return toml.NewEncoder(w).Encode(c)
}
// Size represents a TOML parseable file size.
// Users can specify size using "m" for megabytes and "g" for gigabytes.
type Size int
// UnmarshalText parses a byte size from text.
func (s *Size) UnmarshalText(text []byte) error {
// Parse numeric portion of value.
length := len(string(text))
size, err := strconv.ParseInt(string(text[:length-1]), 10, 64)
if err != nil {
return err
}
// Parse unit of measure ("m", "g", etc).
switch suffix := text[len(text)-1]; suffix {
case 'm':
size *= 1 << 20 // MB
case 'g':
size *= 1 << 30 // GB
default:
return fmt.Errorf("unknown size suffix: %c", suffix)
}
// Check for overflow.
if size > maxInt {
return fmt.Errorf("size %d cannot be represented by an int", size)
}
*s = Size(size)
return nil
}
// ParseConfigFile parses a configuration file at a given path.
func ParseConfigFile(path string) (*Config, error) {
c := NewConfig()
if _, err := toml.DecodeFile(path, &c); err != nil {
return nil, err
}
return c, nil
}
// ParseConfig parses a configuration string into a config object.
func ParseConfig(s string) (*Config, error) {
c := NewConfig()
if _, err := toml.Decode(s, &c); err != nil {
return nil, err
}
return c, nil
}
type Collectd struct {
BindAddress string `toml:"bind-address"`
Port uint16 `toml:"port"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
TypesDB string `toml:"typesdb"`
}
// ConnnectionString returns the connection string for this collectd config in the form host:port.
func (c *Collectd) ConnectionString(defaultBindAddr string) string {
addr := c.BindAddress
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
}
port := c.Port
// If no port specified, use default.
if port == 0 {
port = collectd.DefaultPort
}
return fmt.Sprintf("%s:%d", addr, port)
}
type Graphite struct {
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Protocol string `toml:"protocol"`
NamePosition string `toml:"name-position"`
NameSeparator string `toml:"name-separator"`
}
// ConnnectionString returns the connection string for this Graphite config in the form host:port.
func (g *Graphite) ConnectionString() string {
return net.JoinHostPort(g.BindAddress, strconv.Itoa(g.Port))
}
// NameSeparatorString returns the character separating fields for Graphite data, or the default
// if no separator is set.
func (g *Graphite) NameSeparatorString() string {
if g.NameSeparator == "" {
return graphite.DefaultGraphiteNameSeparator
}
return g.NameSeparator
}
func (g *Graphite) DatabaseString() string {
if g.Database == "" {
return DefaultGraphiteDatabaseName
}
return g.Database
}
// LastEnabled returns whether the Graphite Server should interpret the last field as "name".
func (g *Graphite) LastEnabled() bool {
return g.NamePosition == strings.ToLower("last")
}
// maxInt is the largest integer representable by a word (architecture dependent).
const maxInt = int64(^uint(0) >> 1)
type OpenTSDB struct {
Addr string `toml:"address"`
Port int `toml:"port"`
Enabled bool `toml:"enabled"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
}
func (o OpenTSDB) DatabaseString() string {
if o.Database == "" {
return DefaultOpenTSDBDatabaseName
}
return o.Database
}
func (o OpenTSDB) ListenAddress() string {
return net.JoinHostPort(o.Addr, strconv.Itoa(o.Port))
}

View File

@ -0,0 +1,70 @@
package run
import (
"flag"
"fmt"
"io"
"os"
"github.com/BurntSushi/toml"
)
// PrintConfigCommand represents the command executed by "influxd config".
type PrintConfigCommand struct {
	Stdin  io.Reader // standard input stream for the command
	Stdout io.Writer // destination for the printed configuration
	Stderr io.Writer // destination for diagnostics and usage text
}
// NewPrintConfigCommand returns a new PrintConfigCommand wired to the
// process's standard streams.
func NewPrintConfigCommand() *PrintConfigCommand {
	cmd := &PrintConfigCommand{}
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd
}
// Run parses and prints the current config loaded.
func (cmd *PrintConfigCommand) Run(args ...string) error {
	// Parse command flags.
	fs := flag.NewFlagSet("", flag.ContinueOnError)
	configPath := fs.String("config", "", "")
	hostname := fs.String("hostname", "", "")
	fs.Usage = func() { fmt.Fprintln(cmd.Stderr, printConfigUsage) }
	if err := fs.Parse(args); err != nil {
		return err
	}

	// Parse config from path.
	config, err := cmd.parseConfig(*configPath)
	if err != nil {
		return fmt.Errorf("parse config: %s", err)
	}

	// Override config properties.
	if *hostname != "" {
		config.Hostname = *hostname
	}

	// Surface encoding failures instead of silently discarding the error
	// returned by Config.Write.
	if err := config.Write(cmd.Stdout); err != nil {
		return fmt.Errorf("write config: %s", err)
	}
	return nil
}
// parseConfig parses the config at path.
// Returns a demo configuration if path is blank.
func (cmd *PrintConfigCommand) parseConfig(path string) (*Config, error) {
	if path == "" {
		return NewTestConfig()
	}

	// NewConfig already returns a *Config; decode directly into it rather
	// than handing the decoder a **Config via &config.
	config := NewConfig()
	if _, err := toml.DecodeFile(path, config); err != nil {
		return nil, err
	}
	return config, nil
}
var printConfigUsage = `usage: config
config displays the default configuration
`

View File

@ -1,346 +1,89 @@
package run_test
import (
"bytes"
"reflect"
"strings"
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cmd/influxd/run"
)
// Testing configuration file.
const testFile = `
# Welcome to the InfluxDB configuration file.
// Ensure the configuration can be parsed.
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c run.Config
if _, err := toml.Decode(`
hostname = "localhost"
bind-address = ":8086"
reporting-enabled = true
# If hostname (on the OS) doesn't return a name that can be resolved by the other
# systems in the cluster, you'll have to set the hostname to an IP or something
# that can be resolved here.
hostname = "myserver.com"
port = 8086
[initialization]
join-urls = "serverA,serverB"
[meta]
dir = "/tmp/meta"
# Control authentication
[authentication]
enabled = true
[data]
dir = "/tmp/data"
[logging]
write-tracing = true
raft-tracing = true
[cluster]
[admin]
bind-address = ":8083"
[api]
bind-address = ":8087"
[[graphite]]
protocol = "udp"
[[graphite]]
protocol = "tcp"
[collectd]
bind-address = ":1000"
[opentsdb]
bind-address = ":2000"
[monitoring]
enabled = true
write-interval = "1m"
# Configure the admin server
[admin]
enabled = true
port = 8083
# Controls certain parameters that only take effect until an initial successful
# start-up has occurred.
[initialization]
join-urls = "http://127.0.0.1:8086"
# Configure the http api
[api]
bind-address = "10.1.2.3"
ssl-port = 8087 # Ssl support is enabled if you set a port and cert
ssl-cert = "../cert.pem"
# connections will timeout after this amount of time. Ensures that clients that misbehave
# and keep alive connections they don't use won't end up connection a million times.
# However, if a request is taking longer than this to complete, could be a problem.
read-timeout = "5s"
[input_plugins]
[input_plugins.udp]
enabled = true
port = 4444
database = "test"
# Configure the Graphite servers
[[graphite]]
protocol = "TCP"
enabled = true
bind-address = "192.168.0.1"
port = 2003
database = "graphite_tcp" # store graphite data in this database
name-position = "last"
name-separator = "-"
[[graphite]]
protocol = "udP"
enabled = true
bind-address = "192.168.0.2"
port = 2005
# Configure collectd server
[collectd]
enabled = true
bind-address = "192.168.0.3"
port = 25827
database = "collectd_database"
typesdb = "foo-db-type"
# Configure OpenTSDB server
[opentsdb]
enabled = true
address = "192.168.0.3"
port = 4242
database = "opentsdb_database"
retention-policy = "raw"
# Broker configuration
[broker]
# The broker port should be open between all servers in a cluster.
# However, this port shouldn't be accessible from the internet.
enabled = false
# Where the broker logs are stored. The user running InfluxDB will need read/write access.
dir = "/tmp/influxdb/development/broker"
# Raft distributed consensus
[raft]
apply-interval = "10ms"
election-timeout = "1s"
[data]
dir = "/tmp/influxdb/development/db"
retention-auto-create = false
retention-check-enabled = true
retention-check-period = "5m"
enabled = false
[continuous_queries]
disabled = true
[snapshot]
enabled = true
`
`, &c); err != nil {
t.Fatal(err)
}
// Ensure that megabyte sizes can be parsed.
func TestSize_UnmarshalText_MB(t *testing.T) {
var s influxdb.Size
if err := s.UnmarshalText([]byte("200m")); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if s != 200*(1<<20) {
t.Fatalf("unexpected size: %d", s)
}
}
// Ensure that gigabyte sizes can be parsed.
func TestSize_UnmarshalText_GB(t *testing.T) {
if typ := reflect.TypeOf(0); typ.Size() != 8 {
t.Skip("large gigabyte parsing on 64-bit arch only")
}
var s influxdb.Size
if err := s.UnmarshalText([]byte("10g")); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if s != 10*(1<<30) {
t.Fatalf("unexpected size: %d", s)
}
}
// Ensure that a TOML configuration file can be parsed into a Config.
func TestParseConfig(t *testing.T) {
c, err := influxdb.ParseConfig(testFile)
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if c.Hostname != "myserver.com" {
t.Fatalf("hostname mismatch: %v", c.Hostname)
}
if exp := 8086; c.Port != exp {
t.Fatalf("port mismatch. got %v, exp %v", c.Port, exp)
}
if c.Initialization.JoinURLs != "http://127.0.0.1:8086" {
t.Fatalf("JoinURLs mistmatch: %v", c.Initialization.JoinURLs)
}
if !c.Authentication.Enabled {
t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled)
}
if exp := "10.1.2.3"; c.HTTPAPI.BindAddress != exp {
t.Fatalf("http api bind-address mismatch: got %v, exp %v", c.HTTPAPI.BindAddress, exp)
}
if c.UDP.Enabled {
t.Fatalf("udp enabled mismatch: %v", c.UDP.Enabled)
}
if c.Admin.Enabled != true {
t.Fatalf("admin enabled mismatch: %v", c.Admin.Enabled)
}
if c.Admin.Port != 8083 {
t.Fatalf("admin port mismatch: %v", c.Admin.Port)
}
if c.ContinuousQuery.Disabled != true {
t.Fatalf("continuous query disable mismatch: %v", c.ContinuousQuery.Disabled)
}
if len(c.Graphites) != 2 {
t.Fatalf("graphites mismatch. expected %v, got: %v", 2, len(c.Graphites))
}
tcpGraphite := c.Graphites[0]
switch {
case tcpGraphite.Enabled != true:
t.Fatalf("graphite tcp enabled mismatch: expected: %v, got %v", true, tcpGraphite.Enabled)
case tcpGraphite.BindAddress != "192.168.0.1":
t.Fatalf("graphite tcp address mismatch: expected %v, got %v", "192.168.0.1", tcpGraphite.BindAddress)
case tcpGraphite.Port != 2003:
t.Fatalf("graphite tcp port mismatch: expected %v, got %v", 2003, tcpGraphite.Port)
case tcpGraphite.Database != "graphite_tcp":
t.Fatalf("graphite tcp database mismatch: expected %v, got %v", "graphite_tcp", tcpGraphite.Database)
case strings.ToLower(tcpGraphite.Protocol) != "tcp":
t.Fatalf("graphite tcp protocol mismatch: expected %v, got %v", "tcp", strings.ToLower(tcpGraphite.Protocol))
case tcpGraphite.LastEnabled() != true:
t.Fatalf("graphite tcp name-position mismatch: expected %v, got %v", "last", tcpGraphite.NamePosition)
case tcpGraphite.NameSeparatorString() != "-":
t.Fatalf("graphite tcp name-separator mismatch: expected %v, got %v", "-", tcpGraphite.NameSeparatorString())
}
udpGraphite := c.Graphites[1]
switch {
case udpGraphite.Enabled != true:
t.Fatalf("graphite udp enabled mismatch: expected: %v, got %v", true, udpGraphite.Enabled)
case udpGraphite.BindAddress != "192.168.0.2":
t.Fatalf("graphite udp address mismatch: expected %v, got %v", "192.168.0.2", udpGraphite.BindAddress)
case udpGraphite.Port != 2005:
t.Fatalf("graphite udp port mismatch: expected %v, got %v", 2005, udpGraphite.Port)
case udpGraphite.DatabaseString() != "graphite":
t.Fatalf("graphite database mismatch: expected %v, got %v", "graphite", udpGraphite.Database)
case strings.ToLower(udpGraphite.Protocol) != "udp":
t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol))
}
switch {
case c.Collectd.Enabled != true:
t.Errorf("collectd enabled mismatch: expected: %v, got %v", true, c.Collectd.Enabled)
case c.Collectd.BindAddress != "192.168.0.3":
t.Errorf("collectd address mismatch: expected %v, got %v", "192.168.0.3", c.Collectd.BindAddress)
case c.Collectd.Port != 25827:
t.Errorf("collectd port mismatch: expected %v, got %v", 2005, c.Collectd.Port)
case c.Collectd.Database != "collectd_database":
t.Errorf("collectdabase mismatch: expected %v, got %v", "collectd_database", c.Collectd.Database)
case c.Collectd.TypesDB != "foo-db-type":
t.Errorf("collectd typesdb mismatch: expected %v, got %v", "foo-db-type", c.Collectd.TypesDB)
}
switch {
case c.OpenTSDB.Enabled != true:
t.Errorf("opentsdb enabled mismatch: expected: %v, got %v", true, c.OpenTSDB.Enabled)
case c.OpenTSDB.ListenAddress() != "192.168.0.3:4242":
t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress())
case c.OpenTSDB.DatabaseString() != "opentsdb_database":
t.Errorf("opentsdb database mismatch: expected %v, got %v", "opentsdb_database", c.OpenTSDB.DatabaseString())
case c.OpenTSDB.RetentionPolicy != "raw":
t.Errorf("collectd retention-policy mismatch: expected %v, got %v", "foo-db-type", c.OpenTSDB.RetentionPolicy)
}
if c.Broker.Dir != "/tmp/influxdb/development/broker" {
t.Fatalf("broker dir mismatch: %v", c.Broker.Dir)
}
if c.Broker.Enabled != false {
t.Fatalf("broker disabled mismatch: %v, got: %v", false, c.Broker.Enabled)
}
if c.Raft.ApplyInterval != influxdb.Duration(10*time.Millisecond) {
t.Fatalf("Raft apply interval mismatch: %v, got %v", 10*time.Millisecond, c.Raft.ApplyInterval)
}
if c.Data.Dir != "/tmp/influxdb/development/db" {
t.Fatalf("data dir mismatch: %v", c.Data.Dir)
}
if c.Data.RetentionCheckEnabled != true {
t.Fatalf("Retention check enabled mismatch: %v", c.Data.RetentionCheckEnabled)
}
if c.Data.RetentionCheckPeriod != influxdb.Duration(5*time.Minute) {
t.Fatalf("Retention check period mismatch: %v", c.Data.RetentionCheckPeriod)
}
if c.Data.Enabled != false {
t.Fatalf("data disabled mismatch: %v, got: %v", false, c.Data.Enabled)
}
if c.Monitoring.WriteInterval.String() != "1m0s" {
t.Fatalf("Monitoring.WriteInterval mismatch: %v", c.Monitoring.WriteInterval)
}
if !c.Snapshot.Enabled {
t.Fatalf("snapshot enabled mismatch: %v, got %v", true, c.Snapshot.Enabled)
}
// TODO: UDP Servers testing.
/*
c.Assert(config.UdpServers, HasLen, 1)
c.Assert(config.UdpServers[0].Enabled, Equals, true)
c.Assert(config.UdpServers[0].Port, Equals, 4444)
c.Assert(config.UdpServers[0].Database, Equals, "test")
*/
}
func TestEncodeConfig(t *testing.T) {
c := influxdb.Config{}
c.Monitoring.WriteInterval = influxdb.Duration(time.Minute)
buf := new(bytes.Buffer)
if err := toml.NewEncoder(buf).Encode(&c); err != nil {
t.Fatal("Failed to encode: ", err)
}
got, search := buf.String(), `write-interval = "1m0s"`
if !strings.Contains(got, search) {
t.Fatalf("Encoding config failed.\nfailed to find %s in:\n%s\n", search, got)
}
}
func TestCollectd_ConnectionString(t *testing.T) {
var tests = []struct {
name string
defaultBindAddr string
connectionString string
config influxdb.Collectd
}{
{
name: "No address or port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.1:25826",
config: influxdb.Collectd{},
},
{
name: "address provided, no port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.2:25826",
config: influxdb.Collectd{BindAddress: "192.168.0.2"},
},
{
name: "no address provided, port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.1:25827",
config: influxdb.Collectd{Port: 25827},
},
{
name: "both address and port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.2:25827",
config: influxdb.Collectd{BindAddress: "192.168.0.2", Port: 25827},
},
}
for _, test := range tests {
t.Logf("test: %q", test.name)
s := test.config.ConnectionString(test.defaultBindAddr)
if s != test.connectionString {
t.Errorf("connection string mismatch, expected: %q, got: %q", test.connectionString, s)
}
// Validate configuration.
if c.Hostname != "localhost" {
t.Fatalf("unexpected hostname: %v", c.Hostname)
} else if c.BindAddress != ":8086" {
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if c.ReportingEnabled != true {
t.Fatalf("unexpected reporting enabled: %v", c.ReportingEnabled)
} else if c.Initialization.JoinURLs != "serverA,serverB" {
t.Fatalf("unexpected join urls: %s", c.Initialization.JoinURLs)
} else if c.Meta.Dir != "/tmp/meta" {
t.Fatalf("unexpected meta dir: %s", c.Meta.Dir)
} else if c.Data.Dir != "/tmp/data" {
t.Fatalf("unexpected data dir: %s", c.Data.Dir)
} else if c.Admin.BindAddress != ":8083" {
t.Fatalf("unexpected admin bind address: %s", c.Admin.BindAddress)
} else if c.HTTPD.BindAddress != ":8087" {
t.Fatalf("unexpected api bind address: %s", c.HTTPD.BindAddress)
} else if len(c.Graphites) != 2 {
t.Fatalf("unexpected graphites count: %d", len(c.Graphites))
} else if c.Graphites[0].Protocol != "udp" {
t.Fatalf("unexpected graphite protocol(0): %s", c.Graphites[0].Protocol)
} else if c.Graphites[1].Protocol != "tcp" {
t.Fatalf("unexpected graphite protocol(1): %s", c.Graphites[1].Protocol)
} else if c.Collectd.BindAddress != ":1000" {
t.Fatalf("unexpected collectd bind address: %s", c.Collectd.BindAddress)
} else if c.OpenTSDB.BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress)
} else if c.Monitoring.Enabled != true {
t.Fatalf("unexpected monitoring enabled: %v", c.Monitoring.Enabled)
} else if c.ContinuousQuery.Enabled != true {
t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled)
}
}

View File

@ -1,44 +0,0 @@
package run
import (
"log"
"os"
"github.com/influxdb/influxdb"
)
// RunCommand represents the command executed by "influxd run".
type RunCommand struct {
// The logger passed to the ticker during execution.
logWriter *os.File
config *influxdb.Config // parsed server configuration
hostname string // -hostname flag value; overrides config.Hostname when non-empty
node *Node // the node (broker and/or data) managed by this command
}
// NewRunCommand returns a RunCommand with an empty node pre-allocated.
func NewRunCommand() *RunCommand {
	cmd := new(RunCommand)
	cmd.node = &Node{}
	return cmd
}
// printRunUsage writes the usage text for "influxd run" to the standard logger.
func printRunUsage() {
log.Printf(`usage: run [flags]
run starts the broker and data node server. If this is the first time running
the command then a new cluster will be initialized unless the -join argument
is used.
-config <path>
Set the path to the configuration file.
-hostname <name>
Override the hostname, the 'hostname' configuration
option will be overridden.
-join <url>
Joins the server to an existing cluster.
-pidfile <path>
Write process ID to a file.
`)
}

View File

@ -1,27 +1,12 @@
package run
import (
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/admin"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/opentsdb"
"github.com/influxdb/influxdb/tsdb"
)
@ -33,11 +18,11 @@ type Server struct {
}
// NewServer returns a new instance of Server built from a config.
func NewServer(c *influxdb.Config) (*Server, error) {
func NewServer(c *Config, joinURLs string) *Server {
// Construct base meta store and data store.
s := &Server{
MetaStore: meta.NewStore(filepath.Join(path, "meta")),
TSDBStore: tsdb.NewStore(filepath.Join(path, "data")),
MetaStore: meta.NewStore(c.Meta.Dir),
TSDBStore: tsdb.NewStore(c.Data.Dir),
}
// Add cluster Service
@ -100,6 +85,14 @@ func (s *Server) Close() error {
return nil
}
// Service represents a service attached to the server.
// A Service can be opened, closed, and reports the network address it is
// bound to via Addr.
type Service interface {
Open() error
Close() error
Addr() net.Addr
}
/*
type Node struct {
Server *influxdb.Server
@ -232,119 +225,7 @@ func (s *Node) closeClusterListener() error {
return err
}
func (cmd *RunCommand) ParseConfig(path string) error {
// Parse configuration file from disk.
if path != "" {
var err error
cmd.config, err = ParseConfigFile(path)
if err != nil {
return fmt.Errorf("error parsing configuration %s - %s\n", path, err)
}
log.Printf("using configuration at: %s\n", path)
} else {
var err error
cmd.config, err = NewTestConfig()
if err != nil {
return fmt.Errorf("error parsing default config: %s\n", err)
}
log.Println("no configuration provided, using default settings")
}
// Override config properties.
if cmd.hostname != "" {
cmd.config.Hostname = cmd.hostname
}
cmd.node.hostname = cmd.config.Hostname
return nil
}
func (cmd *RunCommand) Run(args ...string) error {
// Parse command flags.
fs := flag.NewFlagSet("", flag.ExitOnError)
var configPath, pidfile, hostname, join, cpuprofile, memprofile string
fs.StringVar(&configPath, "config", "", "")
fs.StringVar(&pidfile, "pidfile", "", "")
fs.StringVar(&hostname, "hostname", "", "")
fs.StringVar(&join, "join", "", "")
fs.StringVar(&cpuprofile, "cpuprofile", "", "")
fs.StringVar(&memprofile, "memprofile", "", "")
fs.Usage = printRunUsage
fs.Parse(args)
cmd.hostname = hostname
// Start profiling, if set.
startProfiling(cpuprofile, memprofile)
defer stopProfiling()
// Print sweet InfluxDB logo and write the process id to file.
fmt.Print(logo)
writePIDFile(pidfile)
// Set parallelism.
runtime.GOMAXPROCS(runtime.NumCPU())
log.Printf("GOMAXPROCS set to %d", runtime.GOMAXPROCS(0))
// Parse config
if err := cmd.ParseConfig(configPath); err != nil {
log.Fatal(err)
}
// Use the config JoinURLs by default
joinURLs := cmd.config.Initialization.JoinURLs
// If a -join flag was passed, these should override the config
if join != "" {
joinURLs = join
}
cmd.CheckConfig()
cmd.Open(cmd.config, joinURLs)
// Wait indefinitely.
<-(chan struct{})(nil)
return nil
}
// CheckConfig validates the configuration
func (cmd *RunCommand) CheckConfig() {
// Set any defaults that aren't set
// TODO: bring more defaults in here instead of letting helpers do it
// Normalize Graphite configs
for i, _ := range cmd.config.Graphites {
if cmd.config.Graphites[i].BindAddress == "" {
cmd.config.Graphites[i].BindAddress = cmd.config.BindAddress
}
if cmd.config.Graphites[i].Port == 0 {
cmd.config.Graphites[i].Port = graphite.DefaultGraphitePort
}
}
// Normalize openTSDB config
if cmd.config.OpenTSDB.Addr == "" {
cmd.config.OpenTSDB.Addr = cmd.config.BindAddress
}
if cmd.config.OpenTSDB.Port == 0 {
cmd.config.OpenTSDB.Port = opentsdb.DefaultPort
}
// Validate that we have a sane config
if !(cmd.config.Data.Enabled || cmd.config.Broker.Enabled) {
log.Fatal("Node must be configured as a broker node, data node, or as both. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.")
}
if cmd.config.Broker.Enabled && cmd.config.Broker.Dir == "" {
log.Fatal("Broker.Dir must be specified. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.")
}
if cmd.config.Data.Enabled && cmd.config.Data.Dir == "" {
log.Fatal("Data.Dir must be specified. To generate a valid configuration file run `influxd config > influxdb.generated.conf`.")
}
}
func (cmd *RunCommand) Open(config *Config, join string) *Node {
if config != nil {
@ -496,7 +377,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {
}
// unless disabled, start the loop to report anonymous usage stats every 24h
if !cmd.config.ReportingDisabled {
if cmd.config.ReportingEnabled {
if cmd.config.Broker.Enabled && cmd.config.Data.Enabled {
// Make sure we have a config object b4 we try to use it.
if clusterID := cmd.node.Broker.Broker.ClusterID(); clusterID != 0 {
@ -531,24 +412,6 @@ func (cmd *RunCommand) Close() {
cmd.node.Close()
}
// write the current process id to a file specified by path.
func writePIDFile(path string) {
if path == "" {
return
}
// Ensure the required directory structure exists.
err := os.MkdirAll(filepath.Dir(path), 0755)
if err != nil {
log.Fatal(err)
}
// Retrieve the PID and write it.
pid := strconv.Itoa(os.Getpid())
if err := ioutil.WriteFile(path, []byte(pid), 0644); err != nil {
log.Fatal(err)
}
}
// creates and initializes a broker.
func (cmd *RunCommand) openBroker(brokerURLs []url.URL, h *Handler) {
@ -734,3 +597,4 @@ func fileExists(path string) bool {
}
return true
}
*/

View File

@ -1,5 +1,6 @@
package run_test
/*
import (
"bytes"
"encoding/json"
@ -113,7 +114,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes int
c.Port = basePort
c.Admin.Port = 0
c.Admin.Enabled = false
c.ReportingDisabled = true
c.ReportingEnabled = false
c.Snapshot.Enabled = false
c.Logging.HTTPAccess = false
@ -1229,7 +1230,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"cpu","columns":["_id","host","region"],"values":[[2,"server01","uswest"]]}]}]}`,
},
{
query: "SHOW SERIES WHERE region =~ /ca.*/",
query: "SHOW SERIES WHERE region =~ /ca.*./",
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"gpu","columns":["_id","host","region"],"values":[[6,"server03","caeast"]]}]}]}`,
},
@ -1260,12 +1261,12 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"],["gpu"]]}]}]}`,
},
{
query: "SHOW MEASUREMENTS WHERE region =~ /ca.*/",
query: "SHOW MEASUREMENTS WHERE region =~ /ca.*./",
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["gpu"],["other"]]}]}]}`,
},
{
query: "SHOW MEASUREMENTS WHERE region !~ /ca.*/",
query: "SHOW MEASUREMENTS WHERE region !~ /ca.*./",
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]}`,
},
@ -1320,7 +1321,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"]]}]}]}`,
},
{
query: `SHOW TAG VALUES WITH KEY = host WHERE region =~ /ca.*/`,
query: `SHOW TAG VALUES WITH KEY = host WHERE region =~ /ca.*./`,
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server03"]]}]}]}`,
},
@ -2341,14 +2342,14 @@ func TestSeparateBrokerDataNode(t *testing.T) {
brokerConfig.Admin.Enabled = false
brokerConfig.Data.Enabled = false
brokerConfig.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(brokerConfig.Port))
brokerConfig.ReportingDisabled = true
brokerConfig.ReportingEnabled = false
dataConfig := main.NewConfig()
dataConfig.Port = 0
dataConfig.Admin.Enabled = false
dataConfig.Broker.Enabled = false
dataConfig.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(dataConfig.Port))
dataConfig.ReportingDisabled = true
dataConfig.ReportingEnabled = false
brokerCmd := main.NewRunCommand()
broker := brokerCmd.Open(brokerConfig, "")
@ -2394,7 +2395,7 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) {
brokerConfig.Admin.Enabled = false
brokerConfig.Data.Enabled = false
brokerConfig.Broker.Dir = filepath.Join(tmpBrokerDir, "1")
brokerConfig.ReportingDisabled = true
brokerConfig.ReportingEnabled = false
brokerCmd := main.NewRunCommand()
broker := brokerCmd.Open(brokerConfig, "")
@ -2413,7 +2414,7 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) {
dataConfig1.Admin.Enabled = false
dataConfig1.Broker.Enabled = false
dataConfig1.Data.Dir = filepath.Join(tmpDataDir, "1")
dataConfig1.ReportingDisabled = true
dataConfig1.ReportingEnabled = false
dataCmd1 := main.NewRunCommand()
@ -2430,7 +2431,7 @@ func TestSeparateBrokerTwoDataNodes(t *testing.T) {
dataConfig2.Admin.Enabled = false
dataConfig2.Broker.Enabled = false
dataConfig2.Data.Dir = filepath.Join(tmpDataDir, "2")
dataConfig2.ReportingDisabled = true
dataConfig2.ReportingEnabled = false
dataCmd2 := main.NewRunCommand()
@ -2462,3 +2463,4 @@ func mustMarshalJSON(v interface{}) string {
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
*/

8
collectd/config.go Normal file
View File

@ -0,0 +1,8 @@
package collectd
// Config represents the configuration for the collectd input.
type Config struct {
Enabled bool `toml:"enabled"` // whether the collectd input is active
BindAddress string `toml:"bind-address"` // address the collectd listener binds to
Database string `toml:"database"` // database that collectd points are written to
TypesDB string `toml:"typesdb"` // collectd types database — presumably a path; TODO confirm
}

32
collectd/config_test.go Normal file
View File

@ -0,0 +1,32 @@
package collectd_test
import (
"testing"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/collectd"
)
// TestConfig_Parse ensures every collectd.Config field decodes from TOML.
func TestConfig_Parse(t *testing.T) {
	// Decode a configuration that sets every field.
	input := `
enabled = true
bind-address = ":9000"
database = "xxx"
typesdb = "yyy"
`
	var c collectd.Config
	if _, err := toml.Decode(input, &c); err != nil {
		t.Fatal(err)
	}

	// Validate that each field round-tripped.
	switch {
	case !c.Enabled:
		t.Fatalf("unexpected enabled: %v", c.Enabled)
	case c.BindAddress != ":9000":
		t.Fatalf("unexpected bind address: %s", c.BindAddress)
	case c.Database != "xxx":
		t.Fatalf("unexpected database: %s", c.Database)
	case c.TypesDB != "yyy":
		t.Fatalf("unexpected types db: %s", c.TypesDB)
	}
}

34
graphite/config.go Normal file
View File

@ -0,0 +1,34 @@
package graphite
import "strings"
const (
	// DefaultDatabase is the default database if none is specified.
	DefaultDatabase = "graphite"

	// DefaultNameSeparator represents the default field separator.
	DefaultNameSeparator = "."
)

// Config represents the configuration for Graphite endpoints.
type Config struct {
	BindAddress   string `toml:"bind-address"`   // address the Graphite listener binds to
	Database      string `toml:"database"`       // target database (DefaultDatabase when unset via NewConfig)
	Enabled       bool   `toml:"enabled"`        // whether this Graphite endpoint is active
	Protocol      string `toml:"protocol"`       // "tcp" or "udp" — TODO confirm accepted values
	NamePosition  string `toml:"name-position"`  // "last" means the final field is the metric name
	NameSeparator string `toml:"name-separator"` // separator between fields of a metric name
}

// NewConfig returns a new Config with defaults.
func NewConfig() Config {
	return Config{
		Database:      DefaultDatabase,
		NameSeparator: DefaultNameSeparator,
	}
}

// LastEnabled returns whether the server should interpret the last field as "name".
// The check is case-insensitive. (The original compared NamePosition against
// strings.ToLower("last") — lowercasing the constant is a no-op, so values such
// as "LAST" or "Last" were incorrectly rejected.)
func (c *Config) LastEnabled() bool {
	return strings.EqualFold(c.NamePosition, "last")
}

35
graphite/config_test.go Normal file
View File

@ -0,0 +1,35 @@
package graphite_test
import (
"testing"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/httpd"
)
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c httpd.Config
if _, err := toml.Decode(`
bind-address = ":8080"
auth-enabled = true
log-enabled = true
write-tracing = true
pprof-enabled = true
`, &c); err != nil {
t.Fatal(err)
}
// Validate configuration.
if c.BindAddress != ":8080" {
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if c.AuthEnabled != true {
t.Fatalf("unexpected auth enabled: %v", c.AuthEnabled)
} else if c.LogEnabled != true {
t.Fatalf("unexpected log enabled: %v", c.LogEnabled)
} else if c.WriteTracing != true {
t.Fatalf("unexpected write tracing: %v", c.WriteTracing)
} else if c.PprofEnabled != true {
t.Fatalf("unexpected pprof enabled: %v", c.PprofEnabled)
}
}

View File

@ -14,9 +14,6 @@ import (
const (
// DefaultGraphitePort represents the default Graphite (Carbon) plaintext port.
DefaultGraphitePort = 2003
// DefaultGraphiteNameSeparator represents the default Graphite field separator.
DefaultGraphiteNameSeparator = "."
)
var (
@ -63,7 +60,7 @@ type Parser struct {
// NewParser returns a GraphiteParser instance.
func NewParser() *Parser {
return &Parser{Separator: DefaultGraphiteNameSeparator}
return &Parser{Separator: DefaultNameSeparator}
}
// Parse performs Graphite parsing of a single line.

16
httpd/config.go Normal file
View File

@ -0,0 +1,16 @@
package httpd
// Config represents the configuration for the HTTP API service.
type Config struct {
BindAddress string `toml:"bind-address"` // address the HTTP listener binds to
AuthEnabled bool `toml:"auth-enabled"` // require authentication when true
LogEnabled bool `toml:"log-enabled"` // request logging; on by default (see NewConfig)
WriteTracing bool `toml:"write-tracing"` // extra tracing of writes — TODO confirm semantics
PprofEnabled bool `toml:"pprof-enabled"` // presumably exposes pprof endpoints; verify in handler
}

// NewConfig returns a Config with request logging enabled and write tracing off.
func NewConfig() Config {
return Config{
LogEnabled: true,
WriteTracing: false,
}
}

35
httpd/config_test.go Normal file
View File

@ -0,0 +1,35 @@
package httpd_test
import (
"testing"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/httpd"
)
// TestConfig_Parse ensures every httpd.Config field decodes from TOML.
func TestConfig_Parse(t *testing.T) {
	// Decode a configuration that flips every boolean on.
	input := `
bind-address = ":8080"
auth-enabled = true
log-enabled = true
write-tracing = true
pprof-enabled = true
`
	var c httpd.Config
	if _, err := toml.Decode(input, &c); err != nil {
		t.Fatal(err)
	}

	// Validate that each field round-tripped.
	switch {
	case c.BindAddress != ":8080":
		t.Fatalf("unexpected bind address: %s", c.BindAddress)
	case !c.AuthEnabled:
		t.Fatalf("unexpected auth enabled: %v", c.AuthEnabled)
	case !c.LogEnabled:
		t.Fatalf("unexpected log enabled: %v", c.LogEnabled)
	case !c.WriteTracing:
		t.Fatalf("unexpected write tracing: %v", c.WriteTracing)
	case !c.PprofEnabled:
		t.Fatalf("unexpected pprof enabled: %v", c.PprofEnabled)
	}
}

View File

@ -7,8 +7,8 @@ import (
"strings"
)
// HTTPService manages the listener and handler for an HTTP endpoint.
type HTTPService struct {
// Service manages the listener and handler for an HTTP endpoint.
type Service struct {
listener net.Listener
addr string
err chan error
@ -16,16 +16,16 @@ type HTTPService struct {
Handler Handler
}
// NewHTTPService returns a new instance of HTTPService.
func NewHTTPService(c *Config) *HTTPService {
return &HTTPService{
// NewService returns a new instance of Service.
func NewService(c *Config) *Service {
return &Service{
addr: c.BindAddress,
err: make(chan error),
}
}
// Open starts the service
func (s *HTTPService) Open() error {
func (s *Service) Open() error {
// Open listener.
listener, err := net.Listen("tcp", s.addr)
if err != nil {
@ -39,7 +39,7 @@ func (s *HTTPService) Open() error {
}
// Close closes the underlying listener.
func (s *HTTPService) Close() error {
func (s *Service) Close() error {
if s.listener != nil {
return s.listener.Close()
}
@ -47,10 +47,10 @@ func (s *HTTPService) Close() error {
}
// Err returns a channel for fatal errors that occur on the listener.
func (s *HTTPService) Err() <-chan error { return s.err }
func (s *Service) Err() <-chan error { return s.err }
// Addr returns the listener's address. Returns nil if listener is closed.
func (s *HTTPService) Addr() net.Addr {
func (s *Service) Addr() net.Addr {
if s.listener != nil {
return s.listener.Addr()
}
@ -58,7 +58,7 @@ func (s *HTTPService) Addr() net.Addr {
}
// serve serves the handler from the listener.
func (s *HTTPService) serve() {
func (s *Service) serve() {
// The listener was closed so exit
// See https://github.com/golang/go/issues/4373
err := http.Serve(s.listener, &s.Handler)
@ -66,10 +66,3 @@ func (s *HTTPService) serve() {
s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
}
}
type Config struct {
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
LogEnabled bool `toml:"log-enabled"`
WriteTracing bool `toml:"write-tracing"`
}

39
meta/config.go Normal file
View File

@ -0,0 +1,39 @@
package meta
import (
"time"
"github.com/influxdb/influxdb/toml"
)
const (
// DefaultHeartbeatTimeout is the default heartbeat timeout for the store.
DefaultHeartbeatTimeout = 1000 * time.Millisecond
// DefaultElectionTimeout is the default election timeout for the store.
DefaultElectionTimeout = 1000 * time.Millisecond
// DefaultLeaderLeaseTimeout is the default leader lease for the store.
DefaultLeaderLeaseTimeout = 500 * time.Millisecond
// DefaultCommitTimeout is the default commit timeout for the store.
DefaultCommitTimeout = 50 * time.Millisecond
)
// Config represents the meta configuration.
type Config struct {
Dir string `toml:"dir"` // directory where meta data is stored
ElectionTimeout toml.Duration `toml:"election-timeout"`
HeartbeatTimeout toml.Duration `toml:"heartbeat-timeout"`
LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"`
CommitTimeout toml.Duration `toml:"commit-timeout"`
}
// NewConfig returns a Config initialized with the Default* timeout values above.
// Dir is left empty and must be supplied by the caller.
func NewConfig() Config {
return Config{
ElectionTimeout: toml.Duration(DefaultElectionTimeout),
HeartbeatTimeout: toml.Duration(DefaultHeartbeatTimeout),
LeaderLeaseTimeout: toml.Duration(DefaultLeaderLeaseTimeout),
CommitTimeout: toml.Duration(DefaultCommitTimeout),
}
}

36
meta/config_test.go Normal file
View File

@ -0,0 +1,36 @@
package meta_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/meta"
)
// TestConfig_Parse ensures every meta.Config field decodes from TOML.
func TestConfig_Parse(t *testing.T) {
	// Decode a configuration giving every field a distinct value.
	input := `
dir = "/tmp/foo"
election-timeout = "10s"
heartbeat-timeout = "20s"
leader-lease-timeout = "30h"
commit-timeout = "40m"
`
	var c meta.Config
	if _, err := toml.Decode(input, &c); err != nil {
		t.Fatal(err)
	}

	// Validate that each field round-tripped.
	switch {
	case c.Dir != "/tmp/foo":
		t.Fatalf("unexpected dir: %s", c.Dir)
	case time.Duration(c.ElectionTimeout) != 10*time.Second:
		t.Fatalf("unexpected election timeout: %v", c.ElectionTimeout)
	case time.Duration(c.HeartbeatTimeout) != 20*time.Second:
		t.Fatalf("unexpected heartbeat timeout: %v", c.HeartbeatTimeout)
	case time.Duration(c.LeaderLeaseTimeout) != 30*time.Hour:
		t.Fatalf("unexpected leader lease timeout: %v", c.LeaderLeaseTimeout)
	case time.Duration(c.CommitTimeout) != 40*time.Minute:
		t.Fatalf("unexpected commit timeout: %v", c.CommitTimeout)
	}
}

View File

@ -0,0 +1,61 @@
package continuous_querier
import (
"time"
"github.com/influxdb/influxdb/toml"
)
const (
// DefaultRecomputePreviousN is the default number of previous intervals to recompute.
DefaultRecomputePreviousN = 2
// DefaultRecomputeNoOlderThan is the default ceiling on how far back recomputation goes.
DefaultRecomputeNoOlderThan = 10 * time.Minute
// DefaultComputeRunsPerInterval is the default number of computations per group-by interval.
DefaultComputeRunsPerInterval = 10
// DefaultComputeNoMoreThan is the default floor on how often smaller intervals are computed.
DefaultComputeNoMoreThan = 2 * time.Minute
)
// Config represents a configuration for the continuous query service.
type Config struct {
	// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
	Enabled bool `toml:"enabled"`

	// When continuous queries are run we'll automatically recompute previous intervals
	// in case lagged data came in. Set to zero if you never have lagged data. We do
	// it this way because invalidating previously computed intervals would be insanely hard
	// and expensive.
	RecomputePreviousN int `toml:"recompute-previous-n"`

	// While the RecomputePreviousN setting provides guidance for how far back to recompute,
	// the RecomputeNoOlderThan setting sets a ceiling on how far back in time it will go.
	// For example, if you have 2 PreviousN and have this set to 10m, then we'd only compute
	// the previous two intervals for any CQs that have a group by time <= 5m. For all others,
	// we'd only recompute the previous window.
	RecomputeNoOlderThan toml.Duration `toml:"recompute-no-older-than"`

	// ComputeRunsPerInterval will determine how many times the current and previous N intervals
	// will be computed. The group by time will be divided by this and it will get computed this many times:
	// group by time seconds / runs per interval
	// This will give partial results for current group by intervals and will determine how long it will
	// be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it
	// will be a minute past the previous 10m bucket of time before lagged data is picked up.
	ComputeRunsPerInterval int `toml:"compute-runs-per-interval"`

	// ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller
	// group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting
	// to 1m. Then for a group by time(1m) will actually only get computed once per interval (and once per PreviousN).
	// If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger
	// than 10m will get computed 10 times for each interval.
	ComputeNoMoreThan toml.Duration `toml:"compute-no-more-than"`
}
// NewConfig returns a new instance of Config with defaults: the service is
// enabled and every tuning knob is set to its package default constant.
func NewConfig() Config {
	var c Config
	c.Enabled = true
	c.RecomputePreviousN = DefaultRecomputePreviousN
	c.RecomputeNoOlderThan = toml.Duration(DefaultRecomputeNoOlderThan)
	c.ComputeRunsPerInterval = DefaultComputeRunsPerInterval
	c.ComputeNoMoreThan = toml.Duration(DefaultComputeNoMoreThan)
	return c
}

View File

@ -0,0 +1,36 @@
package continuous_querier_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/meta/continuous_querier"
)
// TestConfig_Parse ensures a continuous querier configuration decodes from
// TOML and that every declared field receives the decoded value.
func TestConfig_Parse(t *testing.T) {
	// Parse configuration.
	// NOTE(review): the original test set a "disabled" key and asserted on a
	// non-existent c.Disabled field; Config only declares Enabled
	// (toml:"enabled"), so the test could not compile. It now exercises
	// Enabled instead.
	var c continuous_querier.Config
	if _, err := toml.Decode(`
enabled = true
recompute-previous-n = 1
recompute-no-older-than = "10s"
compute-runs-per-interval = 2
compute-no-more-than = "20s"
`, &c); err != nil {
		t.Fatal(err)
	}

	// Validate configuration.
	if c.Enabled != true {
		t.Fatalf("unexpected enabled: %v", c.Enabled)
	} else if c.RecomputePreviousN != 1 {
		t.Fatalf("unexpected recompute previous n: %d", c.RecomputePreviousN)
	} else if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second {
		t.Fatalf("unexpected recompute no older than: %v", c.RecomputeNoOlderThan)
	} else if c.ComputeRunsPerInterval != 2 {
		t.Fatalf("unexpected compute runs per interval: %d", c.ComputeRunsPerInterval)
	} else if time.Duration(c.ComputeNoMoreThan) != 20*time.Second {
		t.Fatalf("unexpected compute no more than: %v", c.ComputeNoMoreThan)
	}
}

View File

@ -0,0 +1 @@
package continuous_querier

View File

@ -19,20 +19,6 @@ import (
"golang.org/x/crypto/bcrypt"
)
const (
// DefaultHeartbeatTimeout is the default heartbeat timeout for the store.
DefaultHeartbeatTimeout = 1000 * time.Millisecond
// DefaultElectionTimeout is the default election timeout for the store.
DefaultElectionTimeout = 1000 * time.Millisecond
// DefaultLeaderLeaseTimeout is the default leader lease for the store.
DefaultLeaderLeaseTimeout = 500 * time.Millisecond
// DefaultCommitTimeout is the default commit timeout for the store.
DefaultCommitTimeout = 50 * time.Millisecond
)
// Raft configuration.
const (
raftLogCacheSize = 512

25
monitor/config.go Normal file
View File

@ -0,0 +1,25 @@
package monitor
import (
"time"
"github.com/influxdb/influxdb/toml"
)
const (
	// DefaultStatisticsWriteInterval is the default interval at which internal
	// statistics are written.
	DefaultStatisticsWriteInterval = 1 * time.Minute
)
// Config represents a configuration for the monitor.
type Config struct {
	// Enabled turns the monitor service on or off.
	Enabled bool `toml:"enabled"`
	// WriteInterval is how often statistics are written out.
	WriteInterval toml.Duration `toml:"write-interval"`
}
// NewConfig returns a monitor configuration with the service disabled and
// the write interval set to DefaultStatisticsWriteInterval.
func NewConfig() Config {
	var c Config
	c.Enabled = false
	c.WriteInterval = toml.Duration(DefaultStatisticsWriteInterval)
	return c
}

83
monitor/monitor.go Normal file
View File

@ -0,0 +1,83 @@
package monitor
// Monitor represents a TSDB monitoring service.
type Monitor struct {
	// Store is the backing store the monitor writes into.
	// NOTE(review): typed as interface{}; the concrete type expected here is
	// not visible in this file — confirm against whatever wires it up.
	Store interface{}
}
// Open starts the monitor. It is currently a no-op stub that always returns nil.
func (m *Monitor) Open() error { return nil }
// Close stops the monitor. It is currently a no-op stub that always returns nil.
func (m *Monitor) Close() error { return nil }
// StartSelfMonitoring starts a goroutine which monitors the InfluxDB server
// itself and stores the results in the specified database at a given interval.
/*
func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error {
if interval == 0 {
return fmt.Errorf("statistics check interval must be non-zero")
}
go func() {
tick := time.NewTicker(interval)
for {
<-tick.C
// Create the batch and tags
tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)}
if h, err := os.Hostname(); err == nil {
tags["host"] = h
}
batch := pointsFromStats(s.stats, tags)
// Shard-level stats.
tags["shardID"] = strconv.FormatUint(s.id, 10)
s.mu.RLock()
for _, sh := range s.shards {
if !sh.HasDataNodeID(s.id) {
// No stats for non-local shards.
continue
}
batch = append(batch, pointsFromStats(sh.stats, tags)...)
}
s.mu.RUnlock()
// Server diagnostics.
for _, row := range s.DiagnosticsAsRows() {
points, err := s.convertRowToPoints(row.Name, row)
if err != nil {
s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error())
continue
}
for _, p := range points {
p.AddTag("serverID", strconv.FormatUint(s.ID(), 10))
}
batch = append(batch, points...)
}
s.WriteSeries(database, retention, batch)
}
}()
return nil
}
// Function for local use turns stats into a slice of points
func pointsFromStats(st *Stats, tags map[string]string) []tsdb.Point {
var points []tsdb.Point
now := time.Now()
st.Walk(func(k string, v int64) {
point := tsdb.NewPoint(
st.name+"_"+k,
make(map[string]string),
map[string]interface{}{"value": int(v)},
now,
)
// Specifically create a new map.
for k, v := range tags {
tags[k] = v
point.AddTag(k, v)
}
points = append(points, point)
})
return points
}
*/

8
opentsdb/config.go Normal file
View File

@ -0,0 +1,8 @@
package opentsdb
// Config represents a configuration for the openTSDB service.
type Config struct {
	// Enabled turns the openTSDB input service on or off.
	Enabled bool `toml:"enabled"`
	// BindAddress is the address the service listens on (e.g. ":4242").
	BindAddress string `toml:"bind-address"`
	// Database is the database that incoming points are written to.
	Database string `toml:"database"`
	// RetentionPolicy is the retention policy points are written under.
	RetentionPolicy string `toml:"retention-policy"`
}

29
opentsdb/config_test.go Normal file
View File

@ -0,0 +1,29 @@
package opentsdb_test
import (
"testing"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/opentsdb"
)
// TestConfig_Parse ensures an openTSDB configuration decodes from TOML.
func TestConfig_Parse(t *testing.T) {
	// Decode the sample document into c.
	var c opentsdb.Config
	if _, err := toml.Decode(`
enabled = true
bind-address = ":9000"
database = "xxx"
`, &c); err != nil {
		t.Fatal(err)
	}

	// Each field should carry the decoded value.
	if !c.Enabled {
		t.Fatalf("unexpected enabled: %v", c.Enabled)
	}
	if c.BindAddress != ":9000" {
		t.Fatalf("unexpected bind address: %s", c.BindAddress)
	}
	if c.Database != "xxx" {
		t.Fatalf("unexpected database: %s", c.Database)
	}
}

View File

@ -35,8 +35,7 @@ type SeriesWriter interface {
type Server struct {
writer SeriesWriter
database string
retentionpolicy string
database string
listener *net.TCPListener
tsdbhttp *tsdbHTTPListener
@ -50,7 +49,6 @@ func NewServer(w SeriesWriter, retpol string, db string) *Server {
s := &Server{}
s.writer = w
s.retentionpolicy = retpol
s.database = db
s.tsdbhttp = makeTSDBHTTPListener()
@ -225,7 +223,7 @@ func (s *Server) HandleTelnet(conn net.Conn) {
p := tsdb.NewPoint(name, tags, fields, t)
_, err = s.writer.WriteSeries(s.database, s.retentionpolicy, []tsdb.Point{p})
_, err = s.writer.WriteSeries(s.database, "", []tsdb.Point{p})
if err != nil {
log.Println("TSDB cannot write data: ", err)
continue
@ -331,7 +329,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
idps = append(idps, p)
}
_, err = s.writer.WriteSeries(s.database, s.retentionpolicy, idps)
_, err = s.writer.WriteSeries(s.database, "", idps)
if err != nil {
log.Println("TSDB cannot write data: ", err)
}

View File

@ -1,6 +1,13 @@
package toml
import "time"
import (
"fmt"
"strconv"
"time"
)
// maxInt is the largest integer representable by a word (architeture dependent).
const maxInt = int64(^uint(0) >> 1)
// Duration is a TOML wrapper type for time.Duration.
type Duration time.Duration
@ -31,3 +38,35 @@ func (d *Duration) UnmarshalText(text []byte) error {
func (d Duration) MarshalText() (text []byte, err error) {
return []byte(d.String()), nil
}
// Size represents a TOML parseable file size.
// Users can specify size using "m" for megabytes and "g" for gigabytes.
type Size int

// UnmarshalText parses a byte size from text.
// Accepted input is a base-10 integer immediately followed by a one-letter
// unit suffix, e.g. "512m" or "10g". It returns an error for malformed
// input, an unknown suffix, or a size that cannot be represented by an int.
func (s *Size) UnmarshalText(text []byte) error {
	// Largest value representable by this platform's int.
	const maxInt = int64(^uint(0) >> 1)

	// Need at least one digit plus the unit suffix. The original indexed
	// text[:len-1] and text[len-1] unconditionally, which panics on empty
	// input.
	if len(text) < 2 {
		return fmt.Errorf("invalid size: %q", text)
	}

	// Parse numeric portion of value.
	size, err := strconv.ParseInt(string(text[:len(text)-1]), 10, 64)
	if err != nil {
		return err
	}

	// Map the unit of measure ("m", "g") to a power-of-two shift.
	var shift uint
	switch suffix := text[len(text)-1]; suffix {
	case 'm':
		shift = 20 // MB
	case 'g':
		shift = 30 // GB
	default:
		return fmt.Errorf("unknown size suffix: %c", suffix)
	}

	// Check for overflow BEFORE scaling; the original multiplied first, so a
	// very large value could wrap around and slip past the check.
	if size > maxInt>>shift {
		return fmt.Errorf("size %s cannot be represented by an int", text)
	}

	*s = Size(size << shift)
	return nil
}

47
toml/toml_test.go Normal file
View File

@ -0,0 +1,47 @@
package toml_test
import (
"reflect"
"testing"
"github.com/influxdb/influxdb/toml"
)
// TestSize_UnmarshalText_MB verifies that a megabyte ("m") suffix parses.
func TestSize_UnmarshalText_MB(t *testing.T) {
	var s toml.Size
	err := s.UnmarshalText([]byte("200m"))
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if want := toml.Size(200 * (1 << 20)); s != want {
		t.Fatalf("unexpected size: %d", s)
	}
}
// TestSize_UnmarshalText_GB verifies that a gigabyte ("g") suffix parses.
// Skipped on 32-bit platforms where 10g does not fit in an int.
func TestSize_UnmarshalText_GB(t *testing.T) {
	if typ := reflect.TypeOf(0); typ.Size() != 8 {
		t.Skip("large gigabyte parsing on 64-bit arch only")
	}

	var s toml.Size
	err := s.UnmarshalText([]byte("10g"))
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if want := toml.Size(10 * (1 << 30)); s != want {
		t.Fatalf("unexpected size: %d", s)
	}
}
/*
func TestConfig_Encode(t *testing.T) {
var c influxdb.Config
c.Monitoring.WriteInterval = influxdb.Duration(time.Minute)
buf := new(bytes.Buffer)
if err := toml.NewEncoder(buf).Encode(&c); err != nil {
t.Fatal("Failed to encode: ", err)
}
got, search := buf.String(), `write-interval = "1m0s"`
if !strings.Contains(got, search) {
t.Fatalf("Encoding config failed.\nfailed to find %s in:\n%s\n", search, got)
}
}
*/

View File

@ -1,6 +1,25 @@
package tsdb
import "github.com/influxdb/influxdb/toml"
import (
"time"
"github.com/influxdb/influxdb/toml"
)
const (
	// DefaultRetentionAutoCreate is the default for auto-creating retention policies.
	DefaultRetentionAutoCreate = true

	// DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement.
	DefaultRetentionCheckEnabled = true

	// DefaultRetentionCreatePeriod represents how often the server will check to see if new
	// shard groups need to be created in advance for writing.
	DefaultRetentionCreatePeriod = 45 * time.Minute

	// DefaultRetentionCheckPeriod is the period of time between retention policy checks.
	DefaultRetentionCheckPeriod = 10 * time.Minute
)
type Config struct {
Dir string `toml:"dir"`
@ -9,3 +28,21 @@ type Config struct {
RetentionCheckPeriod toml.Duration `toml:"retention-check-period"`
RetentionCreatePeriod toml.Duration `toml:"retention-create-period"`
}
// NewConfig returns a tsdb configuration with every retention setting at its
// package default. Dir is left empty and must be supplied by the caller.
func NewConfig() Config {
	var c Config
	c.RetentionAutoCreate = DefaultRetentionAutoCreate
	c.RetentionCheckEnabled = DefaultRetentionCheckEnabled
	c.RetentionCheckPeriod = toml.Duration(DefaultRetentionCheckPeriod)
	c.RetentionCreatePeriod = toml.Duration(DefaultRetentionCreatePeriod)
	return c
}
// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups.
// If RetentionCreatePeriod was not set in the config (zero value), it falls back to
// DefaultRetentionCreatePeriod.
// NOTE(review): the original comment referenced DefaultShardGroupPreCreatePeriod,
// which is not the constant this code returns.
func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration {
	if c.RetentionCreatePeriod != 0 {
		return time.Duration(c.RetentionCreatePeriod)
	}
	return DefaultRetentionCreatePeriod
}