2015-09-01 03:17:13 +00:00
|
|
|
package monitor
|
|
|
|
|
|
|
|
import (
|
|
|
|
"expvar"
|
2015-09-04 23:42:22 +00:00
|
|
|
"fmt"
|
2015-09-01 03:17:13 +00:00
|
|
|
"log"
|
|
|
|
"os"
|
2015-09-04 20:14:38 +00:00
|
|
|
"runtime"
|
2015-09-01 03:17:13 +00:00
|
|
|
"sort"
|
|
|
|
"strconv"
|
|
|
|
"sync"
|
|
|
|
"time"
|
2015-09-02 22:45:11 +00:00
|
|
|
|
|
|
|
"github.com/influxdb/influxdb/cluster"
|
|
|
|
"github.com/influxdb/influxdb/meta"
|
2015-09-03 02:38:31 +00:00
|
|
|
"github.com/influxdb/influxdb/tsdb"
|
2015-09-01 03:17:13 +00:00
|
|
|
)
|
|
|
|
|
2015-09-02 23:14:03 +00:00
|
|
|
// leaderWaitTimeout bounds how long storeStatistics waits for the meta store
// to report a cluster leader before it gives up on storing statistics.
const leaderWaitTimeout time.Duration = 30 * time.Second
|
|
|
|
|
2015-09-03 02:38:31 +00:00
|
|
|
// DiagsClient is the interface modules implement if they register diags with monitor.
|
|
|
|
type DiagsClient interface {
|
|
|
|
Diagnostics() (*Diagnostic, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// The DiagsClientFunc type is an adapter to allow the use of
|
|
|
|
// ordinary functions as Diagnostis clients.
|
|
|
|
type DiagsClientFunc func() (*Diagnostic, error)
|
|
|
|
|
|
|
|
// Diagnostics calls f().
|
|
|
|
func (f DiagsClientFunc) Diagnostics() (*Diagnostic, error) {
|
|
|
|
return f()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Diagnostic is a display-only table of diagnostic information; it is never
// written to an InfluxDB system. Columns holds the column names and Rows holds
// one slice of values per row, one value for each column. A table describing,
// say, connections could look like this:
//
//     source_ip    source_port    dest_ip      dest_port
//     182.1.0.2    2890           127.0.0.1    38901
//     174.33.1.2   2924           127.0.0.1    38902
type Diagnostic struct {
	Columns []string
	Rows    [][]interface{}
}

// NewDiagnostic returns a Diagnostic with the given column names and an
// empty, non-nil set of rows.
func NewDiagnostic(columns []string) *Diagnostic {
	d := new(Diagnostic)
	d.Columns = columns
	d.Rows = [][]interface{}{}
	return d
}

// AddRow appends r as the last row of the table. The row is stored as given;
// its length is not checked against the number of columns.
func (d *Diagnostic) AddRow(r []interface{}) {
	d.Rows = append(d.Rows, r)
}
|
|
|
|
|
2015-09-02 22:07:30 +00:00
|
|
|
// Monitor represents an instance of the monitor system.
|
|
|
|
type Monitor struct {
|
2015-09-03 02:38:31 +00:00
|
|
|
wg sync.WaitGroup
|
|
|
|
done chan struct{}
|
|
|
|
mu sync.Mutex
|
|
|
|
|
|
|
|
diagRegistrations map[string]DiagsClient
|
2015-09-01 03:17:13 +00:00
|
|
|
|
2015-09-05 05:12:58 +00:00
|
|
|
storeEnabled bool
|
|
|
|
storeDatabase string
|
|
|
|
storeRetentionPolicy string
|
|
|
|
storeRetentionDuration time.Duration
|
|
|
|
storeReplicationFactor int
|
|
|
|
storeAddress string
|
|
|
|
storeInterval time.Duration
|
2015-09-01 03:17:13 +00:00
|
|
|
|
2015-09-02 22:45:11 +00:00
|
|
|
MetaStore interface {
|
|
|
|
ClusterID() (uint64, error)
|
|
|
|
NodeID() uint64
|
2015-09-02 23:14:03 +00:00
|
|
|
WaitForLeader(d time.Duration) error
|
2015-09-02 22:45:11 +00:00
|
|
|
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
|
2015-09-05 05:12:58 +00:00
|
|
|
CreateRetentionPolicyIfNotExists(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
2015-09-02 22:45:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
PointsWriter interface {
|
|
|
|
WritePoints(p *cluster.WritePointsRequest) error
|
|
|
|
}
|
|
|
|
|
2015-09-01 03:17:13 +00:00
|
|
|
Logger *log.Logger
|
|
|
|
}
|
|
|
|
|
2015-09-02 22:07:30 +00:00
|
|
|
// New returns a new instance of the monitor system.
|
|
|
|
func New(c Config) *Monitor {
|
|
|
|
return &Monitor{
|
2015-09-05 05:12:58 +00:00
|
|
|
done: make(chan struct{}),
|
|
|
|
diagRegistrations: make(map[string]DiagsClient),
|
|
|
|
storeEnabled: c.StoreEnabled,
|
|
|
|
storeDatabase: c.StoreDatabase,
|
|
|
|
storeRetentionPolicy: c.StoreRetentionPolicy,
|
|
|
|
storeRetentionDuration: time.Duration(c.StoreRetentionDuration),
|
|
|
|
storeReplicationFactor: c.StoreReplicationFactor,
|
|
|
|
storeInterval: time.Duration(c.StoreInterval),
|
|
|
|
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
|
2015-09-01 03:17:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-09-02 22:07:30 +00:00
|
|
|
// Open opens the monitoring system, using the given clusterID, node ID, and hostname
|
|
|
|
// for identification purposem.
|
2015-09-02 22:45:11 +00:00
|
|
|
func (m *Monitor) Open() error {
|
|
|
|
m.Logger.Printf("Starting monitor system")
|
2015-09-01 03:17:13 +00:00
|
|
|
|
2015-09-03 02:38:31 +00:00
|
|
|
// Self-register various stats and diagnostics.
|
2015-09-04 05:12:33 +00:00
|
|
|
m.RegisterDiagnosticsClient("runtime", &goRuntime{})
|
2015-09-03 02:38:31 +00:00
|
|
|
m.RegisterDiagnosticsClient("network", &network{})
|
|
|
|
m.RegisterDiagnosticsClient("system", &system{})
|
2015-09-01 03:17:13 +00:00
|
|
|
|
|
|
|
// If enabled, record stats in a InfluxDB system.
|
2015-09-02 22:07:30 +00:00
|
|
|
if m.storeEnabled {
|
2015-09-01 03:17:13 +00:00
|
|
|
|
|
|
|
// Start periodic writes to system.
|
2015-09-02 22:07:30 +00:00
|
|
|
m.wg.Add(1)
|
|
|
|
go m.storeStatistics()
|
2015-09-01 03:17:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-09-02 22:07:30 +00:00
|
|
|
// Close closes the monitor system.
|
|
|
|
func (m *Monitor) Close() {
|
|
|
|
m.Logger.Println("shutting down monitor system")
|
|
|
|
close(m.done)
|
|
|
|
m.wg.Wait()
|
|
|
|
m.done = nil
|
2015-09-01 03:17:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// SetLogger sets the internal logger to the logger passed in.
|
2015-09-02 22:07:30 +00:00
|
|
|
func (m *Monitor) SetLogger(l *log.Logger) {
|
|
|
|
m.Logger = l
|
2015-09-01 03:17:13 +00:00
|
|
|
}
|
|
|
|
|
2015-09-03 02:38:31 +00:00
|
|
|
// RegisterDiagnosticsClient registers a diagnostics client with the given name and tags.
|
|
|
|
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) error {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
m.diagRegistrations[name] = client
|
|
|
|
m.Logger.Printf(`'%s' registered for diagnostics monitoring`, name)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-09-04 23:42:22 +00:00
|
|
|
// Statistics returns the combined statistics for all expvar data. The given
|
|
|
|
// tags are added to each of the returned statistics.
|
|
|
|
func (m *Monitor) Statistics(tags map[string]string) ([]*statistic, error) {
|
2015-09-04 05:12:33 +00:00
|
|
|
statistics := make([]*statistic, 0)
|
2015-09-01 03:17:13 +00:00
|
|
|
|
2015-09-04 05:12:33 +00:00
|
|
|
expvar.Do(func(kv expvar.KeyValue) {
|
|
|
|
// Skip built-in expvar stats.
|
|
|
|
if kv.Key == "memstats" || kv.Key == "cmdline" {
|
|
|
|
return
|
2015-09-01 03:17:13 +00:00
|
|
|
}
|
2015-09-01 03:50:44 +00:00
|
|
|
|
2015-09-04 05:12:33 +00:00
|
|
|
statistic := &statistic{
|
|
|
|
Tags: make(map[string]string),
|
|
|
|
Values: make(map[string]interface{}),
|
|
|
|
}
|
|
|
|
|
2015-09-04 23:42:22 +00:00
|
|
|
// Add any supplied tags.
|
|
|
|
for k, v := range tags {
|
|
|
|
statistic.Tags[k] = v
|
|
|
|
}
|
|
|
|
|
2015-09-04 05:12:33 +00:00
|
|
|
// Every other top-level expvar value is a map.
|
|
|
|
m := kv.Value.(*expvar.Map)
|
|
|
|
|
|
|
|
m.Do(func(subKV expvar.KeyValue) {
|
|
|
|
switch subKV.Key {
|
|
|
|
case "name":
|
|
|
|
// straight to string name.
|
|
|
|
u, err := strconv.Unquote(subKV.Value.String())
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
statistic.Name = u
|
|
|
|
case "tags":
|
|
|
|
// string-string tags map.
|
|
|
|
n := subKV.Value.(*expvar.Map)
|
|
|
|
n.Do(func(t expvar.KeyValue) {
|
|
|
|
u, err := strconv.Unquote(t.Value.String())
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
statistic.Tags[t.Key] = u
|
|
|
|
})
|
|
|
|
case "values":
|
|
|
|
// string-interface map.
|
|
|
|
n := subKV.Value.(*expvar.Map)
|
|
|
|
n.Do(func(kv expvar.KeyValue) {
|
|
|
|
var f interface{}
|
|
|
|
var err error
|
|
|
|
switch v := kv.Value.(type) {
|
|
|
|
case *expvar.Float:
|
|
|
|
f, err = strconv.ParseFloat(v.String(), 64)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
case *expvar.Int:
|
2015-09-04 21:41:13 +00:00
|
|
|
f, err = strconv.ParseInt(v.String(), 10, 64)
|
2015-09-04 05:12:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
statistic.Values[kv.Key] = f
|
|
|
|
})
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
2015-09-01 03:50:44 +00:00
|
|
|
// If a registered client has no field data, don't include it in the results
|
2015-09-04 05:12:33 +00:00
|
|
|
if len(statistic.Values) == 0 {
|
|
|
|
return
|
2015-09-01 03:50:44 +00:00
|
|
|
}
|
2015-09-04 20:14:38 +00:00
|
|
|
|
2015-09-04 05:12:33 +00:00
|
|
|
statistics = append(statistics, statistic)
|
|
|
|
})
|
2015-09-01 03:50:44 +00:00
|
|
|
|
2015-09-04 20:14:38 +00:00
|
|
|
// Add Go memstats.
|
|
|
|
statistic := &statistic{
|
|
|
|
Name: "runtime",
|
|
|
|
Tags: make(map[string]string),
|
|
|
|
Values: make(map[string]interface{}),
|
|
|
|
}
|
|
|
|
var rt runtime.MemStats
|
|
|
|
runtime.ReadMemStats(&rt)
|
|
|
|
statistic.Values = map[string]interface{}{
|
|
|
|
"Alloc": int64(rt.Alloc),
|
|
|
|
"TotalAlloc": int64(rt.TotalAlloc),
|
|
|
|
"Sys": int64(rt.Sys),
|
|
|
|
"Lookups": int64(rt.Lookups),
|
|
|
|
"Mallocs": int64(rt.Mallocs),
|
|
|
|
"Frees": int64(rt.Frees),
|
|
|
|
"HeapAlloc": int64(rt.HeapAlloc),
|
|
|
|
"HeapSys": int64(rt.HeapSys),
|
|
|
|
"HeapIdle": int64(rt.HeapIdle),
|
|
|
|
"HeapInUse": int64(rt.HeapInuse),
|
|
|
|
"HeapReleased": int64(rt.HeapReleased),
|
|
|
|
"HeapObjects": int64(rt.HeapObjects),
|
|
|
|
"PauseTotalNs": int64(rt.PauseTotalNs),
|
|
|
|
"NumGC": int64(rt.NumGC),
|
|
|
|
"NumGoroutine": int64(runtime.NumGoroutine()),
|
|
|
|
}
|
|
|
|
statistics = append(statistics, statistic)
|
|
|
|
|
2015-09-01 03:17:13 +00:00
|
|
|
return statistics, nil
|
|
|
|
}
|
|
|
|
|
2015-09-03 02:38:31 +00:00
|
|
|
func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
diags := make(map[string]*Diagnostic, len(m.diagRegistrations))
|
|
|
|
for k, v := range m.diagRegistrations {
|
|
|
|
d, err := v.Diagnostics()
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
diags[k] = d
|
|
|
|
}
|
|
|
|
return diags, nil
|
|
|
|
}
|
|
|
|
|
2015-09-01 03:17:13 +00:00
|
|
|
// storeStatistics writes the statistics to an InfluxDB system.
|
2015-09-02 22:07:30 +00:00
|
|
|
func (m *Monitor) storeStatistics() {
|
|
|
|
defer m.wg.Done()
|
2015-09-05 05:12:58 +00:00
|
|
|
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
|
|
|
|
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)
|
2015-09-02 23:14:03 +00:00
|
|
|
|
|
|
|
if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
|
|
|
|
m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-09-04 23:42:22 +00:00
|
|
|
// Get cluster-level metadata. Nothing different is going to happen if errors occur.
|
|
|
|
clusterID, _ := m.MetaStore.ClusterID()
|
|
|
|
nodeID := m.MetaStore.NodeID()
|
|
|
|
hostname, _ := os.Hostname()
|
|
|
|
clusterTags := map[string]string{
|
|
|
|
"clusterID": fmt.Sprintf("%d", clusterID),
|
|
|
|
"nodeID": fmt.Sprintf("%d", nodeID),
|
|
|
|
"hostname": hostname,
|
|
|
|
}
|
|
|
|
|
2015-09-02 23:14:03 +00:00
|
|
|
if _, err := m.MetaStore.CreateDatabaseIfNotExists(m.storeDatabase); err != nil {
|
|
|
|
m.Logger.Printf("failed to create database '%s', terminating storage: %s",
|
|
|
|
m.storeDatabase, err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-09-05 05:12:58 +00:00
|
|
|
rpi := meta.NewRetentionPolicyInfo(m.storeRetentionPolicy)
|
|
|
|
rpi.Duration = m.storeRetentionDuration
|
|
|
|
rpi.ReplicaN = m.storeReplicationFactor
|
|
|
|
|
|
|
|
if _, err := m.MetaStore.CreateRetentionPolicyIfNotExists(m.storeDatabase, rpi); err != nil {
|
|
|
|
m.Logger.Printf("failed to create retention policy '%s', terminating storage: %s",
|
|
|
|
m.storeRetentionPolicy, err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-09-02 22:07:30 +00:00
|
|
|
tick := time.NewTicker(m.storeInterval)
|
2015-09-01 03:17:13 +00:00
|
|
|
defer tick.Stop()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-tick.C:
|
2015-09-04 23:42:22 +00:00
|
|
|
stats, err := m.Statistics(clusterTags)
|
2015-09-03 02:38:31 +00:00
|
|
|
if err != nil {
|
|
|
|
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
points := make(tsdb.Points, 0, len(stats))
|
|
|
|
for _, s := range stats {
|
|
|
|
points = append(points, tsdb.NewPoint(s.Name, s.Tags, s.Values, time.Now()))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{
|
|
|
|
Database: m.storeDatabase,
|
2015-09-05 05:12:58 +00:00
|
|
|
RetentionPolicy: m.storeRetentionPolicy,
|
2015-09-03 02:38:31 +00:00
|
|
|
ConsistencyLevel: cluster.ConsistencyLevelOne,
|
|
|
|
Points: points,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
m.Logger.Printf("failed to store statistics: %s", err)
|
|
|
|
}
|
2015-09-02 22:07:30 +00:00
|
|
|
case <-m.done:
|
2015-09-02 22:55:59 +00:00
|
|
|
m.Logger.Printf("terminating storage of statistics")
|
2015-09-01 03:17:13 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// statistic represents the information returned by a single monitor client:
// a name, a set of string tags, and the named values themselves.
type statistic struct {
	Name   string
	Tags   map[string]string
	Values map[string]interface{}
}

// newStatistic returns a new statistic object holding the given name, tags
// and values. The maps are stored as given, not copied.
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic {
	return &statistic{
		Name:   name,
		Tags:   tags,
		Values: values,
	}
}

// valueNames returns a sorted list of the value names, if any. The result is
// empty but non-nil when the statistic has no values.
func (s *statistic) valueNames() []string {
	names := make([]string, 0, len(s.Values))
	for k := range s.Values {
		names = append(names, k)
	}
	sort.Strings(names)
	return names
}
|
|
|
|
|
2015-09-03 02:38:31 +00:00
|
|
|
// DiagnosticFromMap returns a Diagnostic from a map.
|
|
|
|
func DiagnosticFromMap(m map[string]interface{}) *Diagnostic {
|
|
|
|
// Display columns in deterministic order.
|
|
|
|
sortedKeys := make([]string, 0, len(m))
|
|
|
|
for k, _ := range m {
|
|
|
|
sortedKeys = append(sortedKeys, k)
|
|
|
|
}
|
|
|
|
sort.Strings(sortedKeys)
|
|
|
|
|
|
|
|
d := NewDiagnostic(sortedKeys)
|
|
|
|
row := make([]interface{}, len(sortedKeys))
|
|
|
|
for i, k := range sortedKeys {
|
|
|
|
row[i] = m[k]
|
|
|
|
}
|
|
|
|
d.AddRow(row)
|
|
|
|
|
|
|
|
return d
|
2015-09-01 03:17:13 +00:00
|
|
|
}
|