diff --git a/monitor/config.go b/monitor/config.go new file mode 100644 index 0000000000..324a62dea5 --- /dev/null +++ b/monitor/config.go @@ -0,0 +1,41 @@ +package monitor + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultStoreEnabled is whether the system writes gathered information in + // an InfluxDB system for historical analysis. + DefaultStoreEnabled = true + + // DefaultStoreDatabase is the name of the database where gathered information is written + DefaultStoreDatabase = "_internal" + + // DefaultStoreInterval is the period between storing gathered information. + DefaultStoreInterval = time.Minute + + // DefaultStoreAddress is the destination system for gathered information. + DefaultStoreAddress = "127.0.0.1:8086" +) + +// Config represents the configuration for the monitor service. +type Config struct { + StoreEnabled bool `toml:"store-enabled"` + StoreDatabase string `toml:"store-database"` + StoreInterval toml.Duration `toml:"store-interval"` + StoreAddress string `toml:"store-address"` + ExpvarAddress string `toml:"expvar-address"` +} + +// NewConfig returns an instance of Config with defaults. +func NewConfig() Config { + return Config{ + StoreEnabled: false, + StoreDatabase: DefaultStoreDatabase, + StoreInterval: toml.Duration(DefaultStoreInterval), + StoreAddress: DefaultStoreAddress, + } +} diff --git a/monitor/config_test.go b/monitor/config_test.go new file mode 100644 index 0000000000..d3ec3fda14 --- /dev/null +++ b/monitor/config_test.go @@ -0,0 +1,36 @@ +package monitor_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/monitor" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c monitor.Config + if _, err := toml.Decode(` +store-enabled=true +store-database="the_db" +store-interval="10m" +store-address="server1" +expvar-address="127.0.0.1:9950" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if !c.StoreEnabled { + t.Fatalf("unexpected store-enabled: %s", c.StoreEnabled) + } else if c.StoreDatabase != "the_db" { + t.Fatalf("unexpected store-database: %s", c.StoreDatabase) + } else if time.Duration(c.StoreInterval) != 10*time.Minute { + t.Fatalf("unexpected store-interval: %s", c.StoreInterval) + } else if c.StoreAddress != "server1" { + t.Fatalf("unexpected store-address: %s", c.StoreAddress) + } else if c.ExpvarAddress != "127.0.0.1:9950" { + t.Fatalf("unexpected expvar-address: %s", c.ExpvarAddress) + } +} diff --git a/monitor/go_runtime.go b/monitor/go_runtime.go new file mode 100644 index 0000000000..ce0ef8bc29 --- /dev/null +++ b/monitor/go_runtime.go @@ -0,0 +1,37 @@ +package monitor + +import ( + "runtime" +) + +// goRuntime captures Go runtime statistics and implements the monitor client interface +type goRuntime struct{} + +// Statistics returns the statistics for the goRuntime type +func (g *goRuntime) Statistics() (map[string]interface{}, error) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + return map[string]interface{}{ + "Alloc": int64(m.Alloc), + "TotalAlloc": int64(m.TotalAlloc), + "Sys": int64(m.Sys), + "Lookups": int64(m.Lookups), + "Mallocs": int64(m.Mallocs), + "Frees": int64(m.Frees), + "HeapAlloc": int64(m.HeapAlloc), + "HeapSys": int64(m.HeapSys), + "HeapIdle": int64(m.HeapIdle), + "HeapInUse": int64(m.HeapInuse), + "HeapReleased": int64(m.HeapReleased), + "HeapObjects": int64(m.HeapObjects), + "PauseTotalNs": int64(m.PauseTotalNs), + "NumGC": int64(m.NumGC), + "NumGoroutine": int64(runtime.NumGoroutine()), + }, nil +} + +// Diagnostics returns the statistics for the goRuntime type +func (g *goRuntime) Diagnostics() (map[string]interface{}, error) { + return nil, nil +} diff --git a/monitor/service.go b/monitor/service.go new file mode 100644 index 0000000000..2c78e4275f --- /dev/null +++ b/monitor/service.go @@ -0,0 +1,305 @@ +package monitor + +import ( + "expvar" + "fmt" + "log" + "net" + "net/http" + "net/url" + "os" + "sort" + "strconv" + "sync" + "time" + + "github.com/influxdb/influxdb/influxql" +) + +// Client is the interface modules must implement if they wish to register with monitor. +type Client interface { + Statistics() (map[string]interface{}, error) + Diagnostics() (map[string]interface{}, error) +} + +// Service represents an instance of the monitor service. +type Service struct { + wg sync.WaitGroup + done chan struct{} + mu sync.Mutex + registrations []*clientWithMeta + + hostname string + clusterID uint64 + nodeID uint64 + + storeEnabled bool + storeDatabase string + storeAddress string + storeInterval time.Duration + + expvarAddress string + + Logger *log.Logger +} + +// NewService returns a new instance of the monitor service. +func NewService(c Config) *Service { + return &Service{ + registrations: make([]*clientWithMeta, 0), + storeEnabled: c.StoreEnabled, + storeDatabase: c.StoreDatabase, + storeAddress: c.StoreAddress, + storeInterval: time.Duration(c.StoreInterval), + expvarAddress: c.ExpvarAddress, + Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags), + } +} + +// Open opens the monitoring service, using the given clusterID, node ID, and hostname +// for identification purposes. +func (s *Service) Open(clusterID, nodeID uint64, hostname string) error { + s.Logger.Printf("starting monitor service for cluster %d, host %s", clusterID, hostname) + s.clusterID = clusterID + s.nodeID = nodeID + s.hostname = hostname + + // Self-register Go runtime stats. + s.Register("runtime", nil, &goRuntime{}) + + // If enabled, record stats in a InfluxDB system. + if s.storeEnabled { + s.Logger.Printf("storing in %s, database '%s', interval %s", + s.storeAddress, s.storeDatabase, s.storeInterval) + + s.Logger.Printf("ensuring database %s exists on %s", s.storeDatabase, s.storeAddress) + if err := ensureDatabaseExists(s.storeAddress, s.storeDatabase); err != nil { + return err + } + + // Start periodic writes to system. + s.wg.Add(1) + go s.storeStatistics() + } + + // If enabled, expose all expvar data over HTTP. + if s.expvarAddress != "" { + listener, err := net.Listen("tcp", s.expvarAddress) + if err != nil { + return err + } + + go func() { + http.Serve(listener, nil) + }() + s.Logger.Printf("expvar information available on %s", s.expvarAddress) + } + return nil +} + +// Close closes the monitor service. +func (s *Service) Close() { + s.Logger.Println("shutting down monitor service") + close(s.done) + s.wg.Wait() + s.done = nil +} + +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { + s.Logger = l +} + +// Register registers a client with the given name and tags. +func (s *Service) Register(name string, tags map[string]string, client Client) error { + s.mu.Lock() + defer s.mu.Unlock() + c := &clientWithMeta{ + Client: client, + name: name, + tags: tags, + } + s.registrations = append(s.registrations, c) + s.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags) + return nil +} + +// ExecuteStatement executes monitor-related query statements. +func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result { + switch stmt := stmt.(type) { + case *influxql.ShowStatsStatement: + return s.executeShowStatistics(stmt) + default: + panic(fmt.Sprintf("unsupported statement type: %T", stmt)) + } +} + +// executeShowStatistics returns the statistics of the registered monitor client in +// the standard form expected by users of the InfluxDB system. +func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result { + stats, _ := s.statistics() + rows := make([]*influxql.Row, len(stats)) + + for n, stat := range stats { + row := &influxql.Row{} + values := make([]interface{}, 0, len(stat.Tags)+len(stat.Values)) + + row.Columns = append(row.Columns, "name") + values = append(values, stat.Name) + + for _, k := range stat.tagNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Tags[k]) + } + for _, k := range stat.valueNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Values[k]) + } + row.Values = [][]interface{}{values} + rows[n] = row + } + return &influxql.Result{Series: rows} +} + +// statistics returns the combined statistics for all registered clients. +func (s *Service) statistics() ([]*statistic, error) { + s.mu.Lock() + defer s.mu.Unlock() + + statistics := make([]*statistic, 0, len(s.registrations)) + for _, r := range s.registrations { + stats, err := r.Client.Statistics() + if err != nil { + continue + } + statistics[i] = newStatistic(r.name, r.tags, stats) + } + return statistics, nil +} + +// storeStatistics writes the statistics to an InfluxDB system. +func (s *Service) storeStatistics() { + // XXX add tags such as local hostname and cluster ID + //a.Tags["clusterID"] = strconv.FormatUint(s.clusterID, 10) + //a.Tags["nodeID"] = strconv.FormatUint(s.nodeID, 10) + //a.Tags["hostname"] = s.hostname + defer s.wg.Done() + + tick := time.NewTicker(s.storeInterval) + defer tick.Stop() + for { + select { + case <-tick.C: + // Write stats here. + case <-s.done: + s.Logger.Printf("terminating storage of statistics to %s", s.storeAddress) + return + } + + } + return +} + +// statistic represents the information returned by a single monitor client. +type statistic struct { + Name string + Tags map[string]string + Values map[string]interface{} +} + +// newStatistic returns a new statistic object. It ensures that tags are always non-nil. +func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic { + a := tags + if a == nil { + a = make(map[string]string) + } + + return &statistic{ + Name: name, + Tags: a, + Values: values, + } +} + +// tagNames returns a sorted list of the tag names, if any. +func (s *statistic) tagNames() []string { + a := make([]string, 0, len(s.Tags)) + for k, _ := range s.Tags { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// valueNames returns a sorted list of the value names, if any. +func (s *statistic) valueNames() []string { + a := make([]string, 0, len(s.Values)) + for k, _ := range s.Values { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// clientWithMeta wraps a registered client with its associated name and tags. +type clientWithMeta struct { + Client + name string + tags map[string]string +} + +// MonitorClient wraps a *expvar.Map so that it implements the Client interface. It is for +// use by external packages that just record stats in an expvar.Map type. +type MonitorClient struct { + ep *expvar.Map +} + +// NewMonitorClient returns a new MonitorClient using the given expvar.Map. +func NewMonitorClient(ep *expvar.Map) *MonitorClient { + return &MonitorClient{ep: ep} +} + +// Statistics implements the Client interface for a MonitorClient. +func (m MonitorClient) Statistics() (map[string]interface{}, error) { + values := make(map[string]interface{}) + m.ep.Do(func(kv expvar.KeyValue) { + var f interface{} + var err error + switch v := kv.Value.(type) { + case *expvar.Float: + f, err = strconv.ParseFloat(v.String(), 64) + if err != nil { + return + } + case *expvar.Int: + f, err = strconv.ParseUint(v.String(), 10, 64) + if err != nil { + return + } + default: + return + } + values[kv.Key] = f + }) + + return values, nil +} + +// Diagnostics implements the Client interface for a MonitorClient. +func (m MonitorClient) Diagnostics() (map[string]interface{}, error) { + return nil, nil +} + +func ensureDatabaseExists(host, database string) error { + values := url.Values{} + values.Set("q", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)) + resp, err := http.Get(host + "/query?" + values.Encode()) + if err != nil { + return fmt.Errorf("failed to create monitoring database on %s: %s", host, err.Error()) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to create monitoring database on %s, received code: %d", + host, resp.StatusCode) + } + return nil +}