Merge pull request #3963 from influxdb/refactored_monitor

Refactor monitor system
pull/3975/head
Philip O'Toole 2015-09-02 18:21:27 -07:00
commit 59c062b2fa
8 changed files with 117 additions and 94 deletions

View File

@ -85,17 +85,9 @@ func NewServer(c *Config, version string) (*Server, error) {
MetaStore: meta.NewStore(c.Meta), MetaStore: meta.NewStore(c.Meta),
TSDBStore: tsdbStore, TSDBStore: tsdbStore,
reportingDisabled: c.ReportingDisabled, Monitor: monitor.New(c.Monitor),
}
// Start the monitor service. reportingDisabled: c.ReportingDisabled,
clusterID, err := s.MetaStore.ClusterID()
if err != nil {
return nil, err
}
s.Monitor = monitor.New(c.Monitor)
if err := s.Monitor.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil {
return nil, err
} }
// Copy TSDB configuration. // Copy TSDB configuration.
@ -113,7 +105,7 @@ func NewServer(c *Config, version string) (*Server, error) {
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore) s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
s.QueryExecutor.MetaStore = s.MetaStore s.QueryExecutor.MetaStore = s.MetaStore
s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore} s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore}
s.QueryExecutor.MonitorStatementExecutor = s.Monitor s.QueryExecutor.MonitorStatementExecutor = &monitor.StatementExecutor{Monitor: s.Monitor}
s.QueryExecutor.ShardMapper = s.ShardMapper s.QueryExecutor.ShardMapper = s.ShardMapper
// Set the shard writer // Set the shard writer
@ -131,6 +123,13 @@ func NewServer(c *Config, version string) (*Server, error) {
s.PointsWriter.ShardWriter = s.ShardWriter s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff s.PointsWriter.HintedHandoff = s.HintedHandoff
// Initialize the monitor
s.Monitor.MetaStore = s.MetaStore
s.Monitor.PointsWriter = s.PointsWriter
if err := s.Monitor.Open(); err != nil {
return nil, err
}
// Append services. // Append services.
s.appendClusterService(c.Cluster) s.appendClusterService(c.Cluster)
s.appendPrecreatorService(c.Precreator) s.appendPrecreatorService(c.Precreator)
@ -383,6 +382,9 @@ func (s *Server) Close() error {
if s.HintedHandoff != nil { if s.HintedHandoff != nil {
s.HintedHandoff.Close() s.HintedHandoff.Close()
} }
if s.Monitor != nil {
s.Monitor.Close()
}
for _, service := range s.Services { for _, service := range s.Services {
service.Close() service.Close()
} }

View File

@ -164,6 +164,8 @@ func NewConfig() *run.Config {
c.HTTPD.BindAddress = "127.0.0.1:0" c.HTTPD.BindAddress = "127.0.0.1:0"
c.HTTPD.LogEnabled = testing.Verbose() c.HTTPD.LogEnabled = testing.Verbose()
c.Monitor.StoreEnabled = false
return c return c
} }

View File

@ -89,14 +89,13 @@ reporting-disabled = false
check-interval = "10m" check-interval = "10m"
### ###
### Controls the system self-monitoring, statistics, diagnostics, and expvar data. ### Controls the system self-monitoring, statistics and diagnostics.
### ###
[monitor] [monitor]
store-enabled = false # Whether to record statistics in an InfluxDB system store-enabled = true # Whether to record statistics internally.
store-database = "_internal" # The destination database for recorded statistics store-database = "_internal" # The destination database for recorded statistics
store-interval = "1m" # The interval at which to record statistics store-interval = "1m" # The interval at which to record statistics
store-address = "http://127.0.0.1:8086" # The protocol and host for the recorded data
### ###
### [admin] ### [admin]

View File

@ -16,9 +16,6 @@ const (
// DefaultStoreInterval is the period between storing gathered information. // DefaultStoreInterval is the period between storing gathered information.
DefaultStoreInterval = time.Minute DefaultStoreInterval = time.Minute
// DefaultStoreAddress is the destination system for gathered information.
DefaultStoreAddress = "127.0.0.1:8086"
) )
// Config represents the configuration for the monitor service. // Config represents the configuration for the monitor service.
@ -26,15 +23,13 @@ type Config struct {
StoreEnabled bool `toml:"store-enabled"` StoreEnabled bool `toml:"store-enabled"`
StoreDatabase string `toml:"store-database"` StoreDatabase string `toml:"store-database"`
StoreInterval toml.Duration `toml:"store-interval"` StoreInterval toml.Duration `toml:"store-interval"`
StoreAddress string `toml:"store-address"`
} }
// NewConfig returns an instance of Config with defaults. // NewConfig returns an instance of Config with defaults.
func NewConfig() Config { func NewConfig() Config {
return Config{ return Config{
StoreEnabled: false, StoreEnabled: true,
StoreDatabase: DefaultStoreDatabase, StoreDatabase: DefaultStoreDatabase,
StoreInterval: toml.Duration(DefaultStoreInterval), StoreInterval: toml.Duration(DefaultStoreInterval),
StoreAddress: DefaultStoreAddress,
} }
} }

View File

@ -15,7 +15,6 @@ func TestConfig_Parse(t *testing.T) {
store-enabled=true store-enabled=true
store-database="the_db" store-database="the_db"
store-interval="10m" store-interval="10m"
store-address="server1"
`, &c); err != nil { `, &c); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -27,7 +26,5 @@ store-address="server1"
t.Fatalf("unexpected store-database: %s", c.StoreDatabase) t.Fatalf("unexpected store-database: %s", c.StoreDatabase)
} else if time.Duration(c.StoreInterval) != 10*time.Minute { } else if time.Duration(c.StoreInterval) != 10*time.Minute {
t.Fatalf("unexpected store-interval: %s", c.StoreInterval) t.Fatalf("unexpected store-interval: %s", c.StoreInterval)
} else if c.StoreAddress != "server1" {
t.Fatalf("unexpected store-address: %s", c.StoreAddress)
} }
} }

View File

@ -2,19 +2,19 @@ package monitor
import ( import (
"expvar" "expvar"
"fmt"
"log" "log"
"net/http"
"net/url"
"os" "os"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
"time" "time"
"github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
) )
const leaderWaitTimeout = 30 * time.Second
// Client is the interface modules must implement if they wish to register with monitor. // Client is the interface modules must implement if they wish to register with monitor.
type Client interface { type Client interface {
Statistics() (map[string]interface{}, error) Statistics() (map[string]interface{}, error)
@ -28,25 +28,32 @@ type Monitor struct {
mu sync.Mutex mu sync.Mutex
registrations []*clientWithMeta registrations []*clientWithMeta
hostname string
clusterID uint64
nodeID uint64
storeEnabled bool storeEnabled bool
storeDatabase string storeDatabase string
storeAddress string storeAddress string
storeInterval time.Duration storeInterval time.Duration
MetaStore interface {
ClusterID() (uint64, error)
NodeID() uint64
WaitForLeader(d time.Duration) error
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
}
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
Logger *log.Logger Logger *log.Logger
} }
// New returns a new instance of the monitor system. // New returns a new instance of the monitor system.
func New(c Config) *Monitor { func New(c Config) *Monitor {
return &Monitor{ return &Monitor{
done: make(chan struct{}),
registrations: make([]*clientWithMeta, 0), registrations: make([]*clientWithMeta, 0),
storeEnabled: c.StoreEnabled, storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase, storeDatabase: c.StoreDatabase,
storeAddress: c.StoreAddress,
storeInterval: time.Duration(c.StoreInterval), storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags), Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
} }
@ -54,24 +61,14 @@ func New(c Config) *Monitor {
// Open opens the monitoring system, using the given clusterID, node ID, and hostname // Open opens the monitoring system, using the given clusterID, node ID, and hostname
// for identification purposem. // for identification purposem.
func (m *Monitor) Open(clusterID, nodeID uint64, hostname string) error { func (m *Monitor) Open() error {
m.Logger.Printf("starting monitor system for cluster %d, host %s", clusterID, hostname) m.Logger.Printf("Starting monitor system")
m.clusterID = clusterID
m.nodeID = nodeID
m.hostname = hostname
// Self-register Go runtime statm. // Self-register Go runtime statm.
m.Register("runtime", nil, &goRuntime{}) m.Register("runtime", nil, &goRuntime{})
// If enabled, record stats in a InfluxDB system. // If enabled, record stats in a InfluxDB system.
if m.storeEnabled { if m.storeEnabled {
m.Logger.Printf("storing in %s, database '%s', interval %s",
m.storeAddress, m.storeDatabase, m.storeInterval)
m.Logger.Printf("ensuring database %s exists on %s", m.storeDatabase, m.storeAddress)
if err := ensureDatabaseExists(m.storeAddress, m.storeDatabase); err != nil {
return err
}
// Start periodic writes to system. // Start periodic writes to system.
m.wg.Add(1) m.wg.Add(1)
@ -108,38 +105,8 @@ func (m *Monitor) Register(name string, tags map[string]string, client Client) e
return nil return nil
} }
// ExecuteStatement executes monitor-related query statements.
func (m *Monitor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
switch stmt := stmt.(type) {
case *influxql.ShowStatsStatement:
return m.executeShowStatistics(stmt)
default:
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
}
}
// executeShowStatistics returns the statistics of the registered monitor client in
// the standard form expected by users of the InfluxDB system.
func (m *Monitor) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result {
stats, _ := m.statistics()
rows := make([]*influxql.Row, len(stats))
for n, stat := range stats {
row := &influxql.Row{Name: stat.Name, Tags: stat.Tags}
values := make([]interface{}, 0, len(stat.Values))
for _, k := range stat.valueNames() {
row.Columns = append(row.Columns, k)
values = append(values, stat.Values[k])
}
row.Values = [][]interface{}{values}
rows[n] = row
}
return &influxql.Result{Series: rows}
}
// statistics returns the combined statistics for all registered clients. // statistics returns the combined statistics for all registered clients.
func (m *Monitor) statistics() ([]*statistic, error) { func (m *Monitor) Statistics() ([]*statistic, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -168,6 +135,20 @@ func (m *Monitor) storeStatistics() {
//a.Tags["hostname"] = m.hostname //a.Tags["hostname"] = m.hostname
defer m.wg.Done() defer m.wg.Done()
m.Logger.Printf("storing statistics in database '%s', interval %s",
m.storeDatabase, m.storeInterval)
if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error())
return
}
if _, err := m.MetaStore.CreateDatabaseIfNotExists(m.storeDatabase); err != nil {
m.Logger.Printf("failed to create database '%s', terminating storage: %s",
m.storeDatabase, err.Error())
return
}
tick := time.NewTicker(m.storeInterval) tick := time.NewTicker(m.storeInterval)
defer tick.Stop() defer tick.Stop()
for { for {
@ -175,7 +156,7 @@ func (m *Monitor) storeStatistics() {
case <-tick.C: case <-tick.C:
// Write stats here. // Write stats here.
case <-m.done: case <-m.done:
m.Logger.Printf("terminating storage of statistics to %s", m.storeAddress) m.Logger.Printf("terminating storage of statistics")
return return
} }
@ -271,17 +252,3 @@ func (m MonitorClient) Statistics() (map[string]interface{}, error) {
func (m MonitorClient) Diagnostics() (map[string]interface{}, error) { func (m MonitorClient) Diagnostics() (map[string]interface{}, error) {
return nil, nil return nil, nil
} }
func ensureDatabaseExists(host, database string) error {
values := url.Values{}
values.Set("q", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database))
resp, err := http.Get(host + "/query?" + values.Encode())
if err != nil {
return fmt.Errorf("failed to create monitoring database on %s: %s", host, err.Error())
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to create monitoring database on %s, received code: %d",
host, resp.StatusCode)
}
return nil
}

View File

@ -3,13 +3,16 @@ package monitor
import ( import (
"strings" "strings"
"testing" "testing"
"time"
"github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
) )
// Test that a registered stats client results in the correct SHOW STATS output. // Test that a registered stats client results in the correct SHOW STATS output.
func Test_RegisterStats(t *testing.T) { func Test_RegisterStats(t *testing.T) {
monitor := openMonitor(t) monitor := openMonitor(t)
executor := &StatementExecutor{Monitor: monitor}
client := mockStatsClient{ client := mockStatsClient{
StatisticsFn: func() (map[string]interface{}, error) { StatisticsFn: func() (map[string]interface{}, error) {
@ -24,7 +27,7 @@ func Test_RegisterStats(t *testing.T) {
if err := monitor.Register("foo", nil, client); err != nil { if err := monitor.Register("foo", nil, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error()) t.Fatalf("failed to register client: %s", err.Error())
} }
json := executeShowStatsJSON(t, monitor) json := executeShowStatsJSON(t, executor)
if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) { if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json) t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
} }
@ -33,7 +36,7 @@ func Test_RegisterStats(t *testing.T) {
if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil { if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error()) t.Fatalf("failed to register client: %s", err.Error())
} }
json = executeShowStatsJSON(t, monitor) json = executeShowStatsJSON(t, executor)
if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) { if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json) t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
} }
@ -51,16 +54,26 @@ func (m mockStatsClient) Diagnostics() (map[string]interface{}, error) {
return nil, nil return nil, nil
} }
type mockMetastore struct{}
func (m *mockMetastore) ClusterID() (uint64, error) { return 1, nil }
func (m *mockMetastore) NodeID() uint64 { return 2 }
func (m *mockMetastore) WaitForLeader(d time.Duration) error { return nil }
func (m *mockMetastore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
return nil, nil
}
func openMonitor(t *testing.T) *Monitor { func openMonitor(t *testing.T) *Monitor {
monitor := New(NewConfig()) monitor := New(NewConfig())
err := monitor.Open(1, 2, "serverA") monitor.MetaStore = &mockMetastore{}
err := monitor.Open()
if err != nil { if err != nil {
t.Fatalf("failed to open monitor: %s", err.Error()) t.Fatalf("failed to open monitor: %s", err.Error())
} }
return monitor return monitor
} }
func executeShowStatsJSON(t *testing.T, s *Monitor) string { func executeShowStatsJSON(t *testing.T, s *StatementExecutor) string {
r := s.ExecuteStatement(&influxql.ShowStatsStatement{}) r := s.ExecuteStatement(&influxql.ShowStatsStatement{})
b, err := r.MarshalJSON() b, err := r.MarshalJSON()
if err != nil { if err != nil {

View File

@ -0,0 +1,48 @@
package monitor
import (
"fmt"
"github.com/influxdb/influxdb/influxql"
)
// StatementExecutor translates InfluxQL queries to Monitor methods.
type StatementExecutor struct {
Monitor interface {
Statistics() ([]*statistic, error)
}
}
// ExecuteStatement executes monitor-related query statements.
func (s *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
switch stmt := stmt.(type) {
case *influxql.ShowStatsStatement:
return s.executeShowStatistics()
case *influxql.ShowDiagnosticsStatement:
return s.executeShowDiagnostics()
default:
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
}
}
func (s *StatementExecutor) executeShowStatistics() *influxql.Result {
stats, _ := s.Monitor.Statistics()
rows := make([]*influxql.Row, len(stats))
for n, stat := range stats {
row := &influxql.Row{Name: stat.Name, Tags: stat.Tags}
values := make([]interface{}, 0, len(stat.Values))
for _, k := range stat.valueNames() {
row.Columns = append(row.Columns, k)
values = append(values, stat.Values[k])
}
row.Values = [][]interface{}{values}
rows[n] = row
}
return &influxql.Result{Series: rows}
}
func (s *StatementExecutor) executeShowDiagnostics() *influxql.Result {
return nil
}