Add new monitor service

pull/3916/head
Philip O'Toole 2015-08-31 20:17:13 -07:00
parent 18d15c3f6a
commit 294b685e41
4 changed files with 419 additions and 0 deletions

41
monitor/config.go Normal file
View File

@ -0,0 +1,41 @@
package monitor
import (
"time"
"github.com/influxdb/influxdb/toml"
)
const (
// DefaultStoreEnabled is whether the system writes gathered information in
// an InfluxDB system for historical analysis.
DefaultStoreEnabled = true
// DefaultStoreDatabase is the name of the database where gathered information is written
DefaultStoreDatabase = "_internal"
// DefaultStoreInterval is the period between storing gathered information.
DefaultStoreInterval = time.Minute
// DefaultStoreAddress is the destination system for gathered information.
DefaultStoreAddress = "127.0.0.1:8086"
)
// Config represents the configuration for the monitor service.
type Config struct {
StoreEnabled bool `toml:"store-enabled"`
StoreDatabase string `toml:"store-database"`
StoreInterval toml.Duration `toml:"store-interval"`
StoreAddress string `toml:"store-address"`
ExpvarAddress string `toml:"expvar-address"`
}
// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
StoreEnabled: false,
StoreDatabase: DefaultStoreDatabase,
StoreInterval: toml.Duration(DefaultStoreInterval),
StoreAddress: DefaultStoreAddress,
}
}

36
monitor/config_test.go Normal file
View File

@ -0,0 +1,36 @@
package monitor_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/monitor"
)
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c monitor.Config
if _, err := toml.Decode(`
store-enabled=true
store-database="the_db"
store-interval="10m"
store-address="server1"
expvar-address="127.0.0.1:9950"
`, &c); err != nil {
t.Fatal(err)
}
// Validate configuration.
if !c.StoreEnabled {
t.Fatalf("unexpected store-enabled: %s", c.StoreEnabled)
} else if c.StoreDatabase != "the_db" {
t.Fatalf("unexpected store-database: %s", c.StoreDatabase)
} else if time.Duration(c.StoreInterval) != 10*time.Minute {
t.Fatalf("unexpected store-interval: %s", c.StoreInterval)
} else if c.StoreAddress != "server1" {
t.Fatalf("unexpected store-address: %s", c.StoreAddress)
} else if c.ExpvarAddress != "127.0.0.1:9950" {
t.Fatalf("unexpected expvar-address: %s", c.ExpvarAddress)
}
}

37
monitor/go_runtime.go Normal file
View File

@ -0,0 +1,37 @@
package monitor
import (
"runtime"
)
// goRuntime captures Go runtime statistics and implements the monitor client interface
type goRuntime struct{}
// Statistics returns the statistics for the goRuntime type
func (g *goRuntime) Statistics() (map[string]interface{}, error) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
return map[string]interface{}{
"Alloc": int64(m.Alloc),
"TotalAlloc": int64(m.TotalAlloc),
"Sys": int64(m.Sys),
"Lookups": int64(m.Lookups),
"Mallocs": int64(m.Mallocs),
"Frees": int64(m.Frees),
"HeapAlloc": int64(m.HeapAlloc),
"HeapSys": int64(m.HeapSys),
"HeapIdle": int64(m.HeapIdle),
"HeapInUse": int64(m.HeapInuse),
"HeapReleased": int64(m.HeapReleased),
"HeapObjects": int64(m.HeapObjects),
"PauseTotalNs": int64(m.PauseTotalNs),
"NumGC": int64(m.NumGC),
"NumGoroutine": int64(runtime.NumGoroutine()),
}, nil
}
// Diagnostics returns the statistics for the goRuntime type
func (g *goRuntime) Diagnostics() (map[string]interface{}, error) {
return nil, nil
}

305
monitor/service.go Normal file
View File

@ -0,0 +1,305 @@
package monitor
import (
"expvar"
"fmt"
"log"
"net"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"sync"
"time"
"github.com/influxdb/influxdb/influxql"
)
// Client is the interface modules must implement if they wish to register with monitor.
type Client interface {
Statistics() (map[string]interface{}, error)
Diagnostics() (map[string]interface{}, error)
}
// Service represents an instance of the monitor service.
type Service struct {
wg sync.WaitGroup
done chan struct{}
mu sync.Mutex
registrations []*clientWithMeta
hostname string
clusterID uint64
nodeID uint64
storeEnabled bool
storeDatabase string
storeAddress string
storeInterval time.Duration
expvarAddress string
Logger *log.Logger
}
// NewService returns a new instance of the monitor service.
func NewService(c Config) *Service {
return &Service{
registrations: make([]*clientWithMeta, 0),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeAddress: c.StoreAddress,
storeInterval: time.Duration(c.StoreInterval),
expvarAddress: c.ExpvarAddress,
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
}
}
// Open opens the monitoring service, using the given clusterID, node ID, and hostname
// for identification purposes.
func (s *Service) Open(clusterID, nodeID uint64, hostname string) error {
s.Logger.Printf("starting monitor service for cluster %d, host %s", clusterID, hostname)
s.clusterID = clusterID
s.nodeID = nodeID
s.hostname = hostname
// Self-register Go runtime stats.
s.Register("runtime", nil, &goRuntime{})
// If enabled, record stats in a InfluxDB system.
if s.storeEnabled {
s.Logger.Printf("storing in %s, database '%s', interval %s",
s.storeAddress, s.storeDatabase, s.storeInterval)
s.Logger.Printf("ensuring database %s exists on %s", s.storeDatabase, s.storeAddress)
if err := ensureDatabaseExists(s.storeAddress, s.storeDatabase); err != nil {
return err
}
// Start periodic writes to system.
s.wg.Add(1)
go s.storeStatistics()
}
// If enabled, expose all expvar data over HTTP.
if s.expvarAddress != "" {
listener, err := net.Listen("tcp", s.expvarAddress)
if err != nil {
return err
}
go func() {
http.Serve(listener, nil)
}()
s.Logger.Printf("expvar information available on %s", s.expvarAddress)
}
return nil
}
// Close closes the monitor service.
func (s *Service) Close() {
s.Logger.Println("shutting down monitor service")
close(s.done)
s.wg.Wait()
s.done = nil
}
// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}
// Register registers a client with the given name and tags.
func (s *Service) Register(name string, tags map[string]string, client Client) error {
s.mu.Lock()
defer s.mu.Unlock()
c := &clientWithMeta{
Client: client,
name: name,
tags: tags,
}
s.registrations = append(s.registrations, c)
s.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags)
return nil
}
// ExecuteStatement executes monitor-related query statements.
func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
switch stmt := stmt.(type) {
case *influxql.ShowStatsStatement:
return s.executeShowStatistics(stmt)
default:
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
}
}
// executeShowStatistics returns the statistics of the registered monitor client in
// the standard form expected by users of the InfluxDB system.
func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result {
stats, _ := s.statistics()
rows := make([]*influxql.Row, len(stats))
for n, stat := range stats {
row := &influxql.Row{}
values := make([]interface{}, 0, len(stat.Tags)+len(stat.Values))
row.Columns = append(row.Columns, "name")
values = append(values, stat.Name)
for _, k := range stat.tagNames() {
row.Columns = append(row.Columns, k)
values = append(values, stat.Tags[k])
}
for _, k := range stat.valueNames() {
row.Columns = append(row.Columns, k)
values = append(values, stat.Values[k])
}
row.Values = [][]interface{}{values}
rows[n] = row
}
return &influxql.Result{Series: rows}
}
// statistics returns the combined statistics for all registered clients.
func (s *Service) statistics() ([]*statistic, error) {
s.mu.Lock()
defer s.mu.Unlock()
statistics := make([]*statistic, 0, len(s.registrations))
for _, r := range s.registrations {
stats, err := r.Client.Statistics()
if err != nil {
continue
}
statistics[i] = newStatistic(r.name, r.tags, stats)
}
return statistics, nil
}
// storeStatistics writes the statistics to an InfluxDB system.
func (s *Service) storeStatistics() {
// XXX add tags such as local hostname and cluster ID
//a.Tags["clusterID"] = strconv.FormatUint(s.clusterID, 10)
//a.Tags["nodeID"] = strconv.FormatUint(s.nodeID, 10)
//a.Tags["hostname"] = s.hostname
defer s.wg.Done()
tick := time.NewTicker(s.storeInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
// Write stats here.
case <-s.done:
s.Logger.Printf("terminating storage of statistics to %s", s.storeAddress)
return
}
}
return
}
// statistic represents the information returned by a single monitor client.
type statistic struct {
Name string
Tags map[string]string
Values map[string]interface{}
}
// newStatistic returns a new statistic object. It ensures that tags are always non-nil.
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic {
a := tags
if a == nil {
a = make(map[string]string)
}
return &statistic{
Name: name,
Tags: a,
Values: values,
}
}
// tagNames returns a sorted list of the tag names, if any.
func (s *statistic) tagNames() []string {
a := make([]string, 0, len(s.Tags))
for k, _ := range s.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// valueNames returns a sorted list of the value names, if any.
func (s *statistic) valueNames() []string {
a := make([]string, 0, len(s.Values))
for k, _ := range s.Values {
a = append(a, k)
}
sort.Strings(a)
return a
}
// clientWithMeta wraps a registered client with its associated name and tags.
type clientWithMeta struct {
Client
name string
tags map[string]string
}
// MonitorClient wraps a *expvar.Map so that it implements the Client interface. It is for
// use by external packages that just record stats in an expvar.Map type.
type MonitorClient struct {
ep *expvar.Map
}
// NewMonitorClient returns a new MonitorClient using the given expvar.Map.
func NewMonitorClient(ep *expvar.Map) *MonitorClient {
return &MonitorClient{ep: ep}
}
// Statistics implements the Client interface for a MonitorClient.
func (m MonitorClient) Statistics() (map[string]interface{}, error) {
values := make(map[string]interface{})
m.ep.Do(func(kv expvar.KeyValue) {
var f interface{}
var err error
switch v := kv.Value.(type) {
case *expvar.Float:
f, err = strconv.ParseFloat(v.String(), 64)
if err != nil {
return
}
case *expvar.Int:
f, err = strconv.ParseUint(v.String(), 10, 64)
if err != nil {
return
}
default:
return
}
values[kv.Key] = f
})
return values, nil
}
// Diagnostics implements the Client interface for a MonitorClient.
func (m MonitorClient) Diagnostics() (map[string]interface{}, error) {
return nil, nil
}
func ensureDatabaseExists(host, database string) error {
values := url.Values{}
values.Set("q", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database))
resp, err := http.Get(host + "/query?" + values.Encode())
if err != nil {
return fmt.Errorf("failed to create monitoring database on %s: %s", host, err.Error())
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to create monitoring database on %s, received code: %d",
host, resp.StatusCode)
}
return nil
}