diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 25f9e9a5b5..e3f0ee2f54 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -85,17 +85,9 @@ func NewServer(c *Config, version string) (*Server, error) { MetaStore: meta.NewStore(c.Meta), TSDBStore: tsdbStore, - reportingDisabled: c.ReportingDisabled, - } + Monitor: monitor.New(c.Monitor), - // Start the monitor service. - clusterID, err := s.MetaStore.ClusterID() - if err != nil { - return nil, err - } - s.Monitor = monitor.New(c.Monitor) - if err := s.Monitor.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil { - return nil, err + reportingDisabled: c.ReportingDisabled, } // Copy TSDB configuration. @@ -113,7 +105,7 @@ func NewServer(c *Config, version string) (*Server, error) { s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore) s.QueryExecutor.MetaStore = s.MetaStore s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore} - s.QueryExecutor.MonitorStatementExecutor = s.Monitor + s.QueryExecutor.MonitorStatementExecutor = &monitor.StatementExecutor{Monitor: s.Monitor} s.QueryExecutor.ShardMapper = s.ShardMapper // Set the shard writer @@ -131,6 +123,13 @@ func NewServer(c *Config, version string) (*Server, error) { s.PointsWriter.ShardWriter = s.ShardWriter s.PointsWriter.HintedHandoff = s.HintedHandoff + // Initialize the monitor + s.Monitor.MetaStore = s.MetaStore + s.Monitor.PointsWriter = s.PointsWriter + if err := s.Monitor.Open(); err != nil { + return nil, err + } + // Append services. s.appendClusterService(c.Cluster) s.appendPrecreatorService(c.Precreator) @@ -383,6 +382,9 @@ func (s *Server) Close() error { if s.HintedHandoff != nil { s.HintedHandoff.Close() } + if s.Monitor != nil { + s.Monitor.Close() + } for _, service := range s.Services { service.Close() } diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index f00f536def..f8e6c5b1c6 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -164,6 +164,8 @@ func NewConfig() *run.Config { c.HTTPD.BindAddress = "127.0.0.1:0" c.HTTPD.LogEnabled = testing.Verbose() + c.Monitor.StoreEnabled = false + return c } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index af1398be17..6240c0bf98 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -89,14 +89,13 @@ reporting-disabled = false check-interval = "10m" ### -### Controls the system self-monitoring, statistics, diagnostics, and expvar data. +### Controls the system self-monitoring, statistics and diagnostics. ### [monitor] - store-enabled = false # Whether to record statistics in an InfluxDB system + store-enabled = true # Whether to record statistics internally. store-database = "_internal" # The destination database for recorded statistics store-interval = "1m" # The interval at which to record statistics - store-address = "http://127.0.0.1:8086" # The protocol and host for the recorded data ### ### [admin] diff --git a/monitor/config.go b/monitor/config.go index 061a472d5a..c452a99738 100644 --- a/monitor/config.go +++ b/monitor/config.go @@ -16,9 +16,6 @@ const ( // DefaultStoreInterval is the period between storing gathered information. DefaultStoreInterval = time.Minute - - // DefaultStoreAddress is the destination system for gathered information. - DefaultStoreAddress = "127.0.0.1:8086" ) // Config represents the configuration for the monitor service. @@ -26,15 +23,13 @@ type Config struct { StoreEnabled bool `toml:"store-enabled"` StoreDatabase string `toml:"store-database"` StoreInterval toml.Duration `toml:"store-interval"` - StoreAddress string `toml:"store-address"` } // NewConfig returns an instance of Config with defaults. func NewConfig() Config { return Config{ - StoreEnabled: false, + StoreEnabled: true, StoreDatabase: DefaultStoreDatabase, StoreInterval: toml.Duration(DefaultStoreInterval), - StoreAddress: DefaultStoreAddress, } } diff --git a/monitor/config_test.go b/monitor/config_test.go index 0626a7cfca..ee62e73466 100644 --- a/monitor/config_test.go +++ b/monitor/config_test.go @@ -15,7 +15,6 @@ func TestConfig_Parse(t *testing.T) { store-enabled=true store-database="the_db" store-interval="10m" -store-address="server1" `, &c); err != nil { t.Fatal(err) } @@ -27,7 +26,5 @@ store-address="server1" t.Fatalf("unexpected store-database: %s", c.StoreDatabase) } else if time.Duration(c.StoreInterval) != 10*time.Minute { t.Fatalf("unexpected store-interval: %s", c.StoreInterval) - } else if c.StoreAddress != "server1" { - t.Fatalf("unexpected store-address: %s", c.StoreAddress) } } diff --git a/monitor/service.go b/monitor/service.go index aa3dd36ad9..fd38e4f7df 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -2,19 +2,19 @@ package monitor import ( "expvar" - "fmt" "log" - "net/http" - "net/url" "os" "sort" "strconv" "sync" "time" - "github.com/influxdb/influxdb/influxql" + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/meta" ) +const leaderWaitTimeout = 30 * time.Second + // Client is the interface modules must implement if they wish to register with monitor. type Client interface { Statistics() (map[string]interface{}, error) @@ -28,25 +28,32 @@ type Monitor struct { mu sync.Mutex registrations []*clientWithMeta - hostname string - clusterID uint64 - nodeID uint64 - storeEnabled bool storeDatabase string storeAddress string storeInterval time.Duration + MetaStore interface { + ClusterID() (uint64, error) + NodeID() uint64 + WaitForLeader(d time.Duration) error + CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) + } + + PointsWriter interface { + WritePoints(p *cluster.WritePointsRequest) error + } + Logger *log.Logger } // New returns a new instance of the monitor system. func New(c Config) *Monitor { return &Monitor{ + done: make(chan struct{}), registrations: make([]*clientWithMeta, 0), storeEnabled: c.StoreEnabled, storeDatabase: c.StoreDatabase, - storeAddress: c.StoreAddress, storeInterval: time.Duration(c.StoreInterval), Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags), } @@ -54,24 +61,14 @@ func New(c Config) *Monitor { // Open opens the monitoring system, using the given clusterID, node ID, and hostname // for identification purposem. -func (m *Monitor) Open(clusterID, nodeID uint64, hostname string) error { - m.Logger.Printf("starting monitor system for cluster %d, host %s", clusterID, hostname) - m.clusterID = clusterID - m.nodeID = nodeID - m.hostname = hostname +func (m *Monitor) Open() error { + m.Logger.Printf("Starting monitor system") // Self-register Go runtime statm. m.Register("runtime", nil, &goRuntime{}) // If enabled, record stats in a InfluxDB system. if m.storeEnabled { - m.Logger.Printf("storing in %s, database '%s', interval %s", - m.storeAddress, m.storeDatabase, m.storeInterval) - - m.Logger.Printf("ensuring database %s exists on %s", m.storeDatabase, m.storeAddress) - if err := ensureDatabaseExists(m.storeAddress, m.storeDatabase); err != nil { - return err - } // Start periodic writes to system. m.wg.Add(1) @@ -108,38 +105,8 @@ func (m *Monitor) Register(name string, tags map[string]string, client Client) e return nil } -// ExecuteStatement executes monitor-related query statements. -func (m *Monitor) ExecuteStatement(stmt influxql.Statement) *influxql.Result { - switch stmt := stmt.(type) { - case *influxql.ShowStatsStatement: - return m.executeShowStatistics(stmt) - default: - panic(fmt.Sprintf("unsupported statement type: %T", stmt)) - } -} - -// executeShowStatistics returns the statistics of the registered monitor client in -// the standard form expected by users of the InfluxDB system. -func (m *Monitor) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result { - stats, _ := m.statistics() - rows := make([]*influxql.Row, len(stats)) - - for n, stat := range stats { - row := &influxql.Row{Name: stat.Name, Tags: stat.Tags} - - values := make([]interface{}, 0, len(stat.Values)) - for _, k := range stat.valueNames() { - row.Columns = append(row.Columns, k) - values = append(values, stat.Values[k]) - } - row.Values = [][]interface{}{values} - rows[n] = row - } - return &influxql.Result{Series: rows} -} - // statistics returns the combined statistics for all registered clients. -func (m *Monitor) statistics() ([]*statistic, error) { +func (m *Monitor) Statistics() ([]*statistic, error) { m.mu.Lock() defer m.mu.Unlock() @@ -168,6 +135,20 @@ func (m *Monitor) storeStatistics() { //a.Tags["hostname"] = m.hostname defer m.wg.Done() + m.Logger.Printf("storing statistics in database '%s', interval %s", + m.storeDatabase, m.storeInterval) + + if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil { + m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error()) + return + } + + if _, err := m.MetaStore.CreateDatabaseIfNotExists(m.storeDatabase); err != nil { + m.Logger.Printf("failed to create database '%s', terminating storage: %s", + m.storeDatabase, err.Error()) + return + } + tick := time.NewTicker(m.storeInterval) defer tick.Stop() for { @@ -175,7 +156,7 @@ func (m *Monitor) storeStatistics() { case <-tick.C: // Write stats here. case <-m.done: - m.Logger.Printf("terminating storage of statistics to %s", m.storeAddress) + m.Logger.Printf("terminating storage of statistics") return } @@ -271,17 +252,3 @@ func (m MonitorClient) Statistics() (map[string]interface{}, error) { func (m MonitorClient) Diagnostics() (map[string]interface{}, error) { return nil, nil } - -func ensureDatabaseExists(host, database string) error { - values := url.Values{} - values.Set("q", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)) - resp, err := http.Get(host + "/query?" + values.Encode()) - if err != nil { - return fmt.Errorf("failed to create monitoring database on %s: %s", host, err.Error()) - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("failed to create monitoring database on %s, received code: %d", - host, resp.StatusCode) - } - return nil -} diff --git a/monitor/service_test.go b/monitor/service_test.go index c4b0a292e2..3f22ff5576 100644 --- a/monitor/service_test.go +++ b/monitor/service_test.go @@ -3,13 +3,16 @@ package monitor import ( "strings" "testing" + "time" "github.com/influxdb/influxdb/influxql" + "github.com/influxdb/influxdb/meta" ) // Test that a registered stats client results in the correct SHOW STATS output. func Test_RegisterStats(t *testing.T) { monitor := openMonitor(t) + executor := &StatementExecutor{Monitor: monitor} client := mockStatsClient{ StatisticsFn: func() (map[string]interface{}, error) { @@ -24,7 +27,7 @@ func Test_RegisterStats(t *testing.T) { if err := monitor.Register("foo", nil, client); err != nil { t.Fatalf("failed to register client: %s", err.Error()) } - json := executeShowStatsJSON(t, monitor) + json := executeShowStatsJSON(t, executor) if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) { t.Fatalf("SHOW STATS response incorrect, got: %s\n", json) } @@ -33,7 +36,7 @@ func Test_RegisterStats(t *testing.T) { if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil { t.Fatalf("failed to register client: %s", err.Error()) } - json = executeShowStatsJSON(t, monitor) + json = executeShowStatsJSON(t, executor) if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) { t.Fatalf("SHOW STATS response incorrect, got: %s\n", json) } @@ -51,16 +54,26 @@ func (m mockStatsClient) Diagnostics() (map[string]interface{}, error) { return nil, nil } +type mockMetastore struct{} + +func (m *mockMetastore) ClusterID() (uint64, error) { return 1, nil } +func (m *mockMetastore) NodeID() uint64 { return 2 } +func (m *mockMetastore) WaitForLeader(d time.Duration) error { return nil } +func (m *mockMetastore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) { + return nil, nil +} + func openMonitor(t *testing.T) *Monitor { monitor := New(NewConfig()) - err := monitor.Open(1, 2, "serverA") + monitor.MetaStore = &mockMetastore{} + err := monitor.Open() if err != nil { t.Fatalf("failed to open monitor: %s", err.Error()) } return monitor } -func executeShowStatsJSON(t *testing.T, s *Monitor) string { +func executeShowStatsJSON(t *testing.T, s *StatementExecutor) string { r := s.ExecuteStatement(&influxql.ShowStatsStatement{}) b, err := r.MarshalJSON() if err != nil { diff --git a/monitor/statement_executor.go b/monitor/statement_executor.go new file mode 100644 index 0000000000..3aadf82834 --- /dev/null +++ b/monitor/statement_executor.go @@ -0,0 +1,48 @@ +package monitor + +import ( + "fmt" + + "github.com/influxdb/influxdb/influxql" +) + +// StatementExecutor translates InfluxQL queries to Monitor methods. +type StatementExecutor struct { + Monitor interface { + Statistics() ([]*statistic, error) + } +} + +// ExecuteStatement executes monitor-related query statements. +func (s *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.Result { + switch stmt := stmt.(type) { + case *influxql.ShowStatsStatement: + return s.executeShowStatistics() + case *influxql.ShowDiagnosticsStatement: + return s.executeShowDiagnostics() + default: + panic(fmt.Sprintf("unsupported statement type: %T", stmt)) + } +} + +func (s *StatementExecutor) executeShowStatistics() *influxql.Result { + stats, _ := s.Monitor.Statistics() + rows := make([]*influxql.Row, len(stats)) + + for n, stat := range stats { + row := &influxql.Row{Name: stat.Name, Tags: stat.Tags} + + values := make([]interface{}, 0, len(stat.Values)) + for _, k := range stat.valueNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Values[k]) + } + row.Values = [][]interface{}{values} + rows[n] = row + } + return &influxql.Result{Series: rows} +} + +func (s *StatementExecutor) executeShowDiagnostics() *influxql.Result { + return nil +}