Access expvar directly from monitor

expvar map is already global so access it directly. This simplifies the
code and makes it much eaisier to use from other modules.
pull/3987/head
Philip O'Toole 2015-09-03 22:12:33 -07:00
parent 02e2ed8443
commit 89bc392ec4
4 changed files with 121 additions and 173 deletions

45
influxvar.go Normal file
View File

@ -0,0 +1,45 @@
package influxdb
import (
"expvar"
"sync"
)
var expvarMu sync.Mutex
// NewStatistics returns an expvar-based map with the given key. Within that map
// is another map. Within there "name" is the Measurement name, "tags" are the tags,
// and values are placed at the key "values.
func NewStatistics(key, name string, tags map[string]string) *expvar.Map {
expvarMu.Lock()
defer expvarMu.Unlock()
// Add expvar for this service.
var v expvar.Var
if v = expvar.Get(key); v == nil {
v = expvar.NewMap(key)
}
m := v.(*expvar.Map)
// Set the name
nameVar := &expvar.String{}
nameVar.Set(name)
m.Set("name", nameVar)
// Set the tags
tagsVar := &expvar.Map{}
tagsVar.Init()
for k, v := range tags {
value := &expvar.String{}
value.Set(v)
tagsVar.Set(k, value)
}
m.Set("tags", tagsVar)
// Create and set the values entry used for actual stats.
statMap := &expvar.Map{}
statMap.Init()
m.Set("values", statMap)
return statMap
}

View File

@ -16,13 +16,6 @@ import (
const leaderWaitTimeout = 30 * time.Second
// StatsClient is the interface modules must implement if they wish to register with monitor.
type StatsClient interface {
// Statistics returns a map of keys to values. Each Value must be either int64 or float64.
// Statistical information is written to an InfluxDB system if enabled.
Statistics() (map[string]interface{}, error)
}
// DiagsClient is the interface modules implement if they register diags with monitor.
type DiagsClient interface {
Diagnostics() (*Diagnostic, error)
@ -67,7 +60,6 @@ type Monitor struct {
done chan struct{}
mu sync.Mutex
statRegistrations []*clientWithMeta
diagRegistrations map[string]DiagsClient
storeEnabled bool
@ -93,7 +85,6 @@ type Monitor struct {
func New(c Config) *Monitor {
return &Monitor{
done: make(chan struct{}),
statRegistrations: make([]*clientWithMeta, 0),
diagRegistrations: make(map[string]DiagsClient),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
@ -108,9 +99,7 @@ func (m *Monitor) Open() error {
m.Logger.Printf("Starting monitor system")
// Self-register various stats and diagnostics.
gr := &goRuntime{}
m.RegisterStatsClient("runtime", nil, gr)
m.RegisterDiagnosticsClient("runtime", gr)
m.RegisterDiagnosticsClient("runtime", &goRuntime{})
m.RegisterDiagnosticsClient("network", &network{})
m.RegisterDiagnosticsClient("system", &system{})
@ -138,44 +127,6 @@ func (m *Monitor) SetLogger(l *log.Logger) {
m.Logger = l
}
// Register registers a client with the given name and tags.
func (m *Monitor) RegisterStatsClient(name string, tags map[string]string, client StatsClient) error {
m.mu.Lock()
defer m.mu.Unlock()
a := tags
if a == nil {
a = make(map[string]string)
}
// Get cluster-level metadata to supplement stats.
var clusterID string
var hostname string
var err error
if cID, err := m.MetaStore.ClusterID(); err != nil {
m.Logger.Printf("failed to determine cluster ID: %s", err)
} else {
clusterID = strconv.FormatUint(cID, 10)
}
nodeID := strconv.FormatUint(m.MetaStore.NodeID(), 10)
if hostname, err = os.Hostname(); err != nil {
m.Logger.Printf("failed to determine hostname: %s", err)
}
a["clusterID"] = clusterID
a["nodeID"] = nodeID
a["hostname"] = hostname
c := &clientWithMeta{
StatsClient: client,
name: name,
tags: a,
}
m.statRegistrations = append(m.statRegistrations, c)
m.Logger.Printf(`'%s:%v' registered for statistics monitoring`, name, tags)
return nil
}
// RegisterDiagnosticsClient registers a diagnostics client with the given name and tags.
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) error {
m.mu.Lock()
@ -185,25 +136,75 @@ func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) err
return nil
}
// statistics returns the combined statistics for all registered clients.
// statistics returns the combined statistics for all expvar data.
func (m *Monitor) Statistics() ([]*statistic, error) {
m.mu.Lock()
defer m.mu.Unlock()
statistics := make([]*statistic, 0)
statistics := make([]*statistic, 0, len(m.statRegistrations))
for _, r := range m.statRegistrations {
stats, err := r.StatsClient.Statistics()
if err != nil {
continue
expvar.Do(func(kv expvar.KeyValue) {
// Skip built-in expvar stats.
if kv.Key == "memstats" || kv.Key == "cmdline" {
return
}
statistic := &statistic{
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
// Every other top-level expvar value is a map.
m := kv.Value.(*expvar.Map)
m.Do(func(subKV expvar.KeyValue) {
switch subKV.Key {
case "name":
// straight to string name.
u, err := strconv.Unquote(subKV.Value.String())
if err != nil {
return
}
statistic.Name = u
case "tags":
// string-string tags map.
n := subKV.Value.(*expvar.Map)
n.Do(func(t expvar.KeyValue) {
u, err := strconv.Unquote(t.Value.String())
if err != nil {
return
}
statistic.Tags[t.Key] = u
})
case "values":
// string-interface map.
n := subKV.Value.(*expvar.Map)
n.Do(func(kv expvar.KeyValue) {
var f interface{}
var err error
switch v := kv.Value.(type) {
case *expvar.Float:
f, err = strconv.ParseFloat(v.String(), 64)
if err != nil {
return
}
case *expvar.Int:
f, err = strconv.ParseUint(v.String(), 10, 64)
if err != nil {
return
}
default:
return
}
statistic.Values[kv.Key] = f
})
}
})
// If a registered client has no field data, don't include it in the results
if len(stats) == 0 {
continue
if len(statistic.Values) == 0 {
return
}
statistics = append(statistics, statistic)
})
statistics = append(statistics, newStatistic(r.name, r.tags, stats))
}
return statistics, nil
}
@ -288,16 +289,6 @@ func newStatistic(name string, tags map[string]string, values map[string]interfa
}
}
// tagNames returns a sorted list of the tag names, if any.
func (s *statistic) tagNames() []string {
a := make([]string, 0, len(s.Tags))
for k, _ := range s.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// valueNames returns a sorted list of the value names, if any.
func (s *statistic) valueNames() []string {
a := make([]string, 0, len(s.Values))
@ -308,50 +299,6 @@ func (s *statistic) valueNames() []string {
return a
}
// clientWithMeta wraps a registered client with its associated name and tagm.
type clientWithMeta struct {
StatsClient
name string
tags map[string]string
}
// StatsMonitorClient wraps a *expvar.Map so that it implements the StatsClient interface.
// It is for use by external packages that just record stats in an expvar.Map type.
type StatsMonitorClient struct {
ep *expvar.Map
}
// NewStatsMonitorClient returns a new StatsMonitorClient using the given expvar.Map.
func NewStatsMonitorClient(ep *expvar.Map) *StatsMonitorClient {
return &StatsMonitorClient{ep: ep}
}
// Statistics implements the Client interface for a StatsMonitorClient.
func (m StatsMonitorClient) Statistics() (map[string]interface{}, error) {
values := make(map[string]interface{})
m.ep.Do(func(kv expvar.KeyValue) {
var f interface{}
var err error
switch v := kv.Value.(type) {
case *expvar.Float:
f, err = strconv.ParseFloat(v.String(), 64)
if err != nil {
return
}
case *expvar.Int:
f, err = strconv.ParseUint(v.String(), 10, 64)
if err != nil {
return
}
default:
return
}
values[kv.Key] = f
})
return values, nil
}
// DiagnosticFromMap returns a Diagnostic from a map.
func DiagnosticFromMap(m map[string]interface{}) *Diagnostic {
// Display columns in deterministic order.

View File

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
)
@ -14,28 +15,19 @@ func Test_RegisterStats(t *testing.T) {
monitor := openMonitor(t)
executor := &StatementExecutor{Monitor: monitor}
client := mockStatsClient{
StatisticsFn: func() (map[string]interface{}, error) {
return map[string]interface{}{
"bar": 1,
"qux": 2.4,
}, nil
},
}
// Register a client without tags.
if err := monitor.RegisterStatsClient("foo", nil, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error())
}
// Register stats without tags.
statMap := influxdb.NewStatistics("foo", "foo", nil)
statMap.Add("bar", 1)
statMap.AddFloat("qux", 2.4)
json := executeShowStatsJSON(t, executor)
if !strings.Contains(json, `"columns":["bar","qux"],"values":[[1,2.4]]`) || !strings.Contains(json, `"name":"foo"`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
}
// Register a client with tags.
if err := monitor.RegisterStatsClient("baz", map[string]string{"proto": "tcp"}, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error())
}
statMap = influxdb.NewStatistics("bar", "baz", map[string]string{"proto": "tcp"})
statMap.Add("bar", 1)
statMap.AddFloat("qux", 2.4)
json = executeShowStatsJSON(t, executor)
if !strings.Contains(json, `"columns":["bar","qux"],"values":[[1,2.4]]`) ||
!strings.Contains(json, `"name":"baz"`) ||
@ -45,18 +37,6 @@ func Test_RegisterStats(t *testing.T) {
}
}
type mockStatsClient struct {
StatisticsFn func() (map[string]interface{}, error)
}
func (m mockStatsClient) Statistics() (map[string]interface{}, error) {
return m.StatisticsFn()
}
func (m mockStatsClient) Diagnostics() (map[string]interface{}, error) {
return nil, nil
}
type mockMetastore struct{}
func (m *mockMetastore) ClusterID() (uint64, error) { return 1, nil }

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/monitor"
@ -102,7 +103,6 @@ type Service struct {
done chan struct{}
Monitor interface {
RegisterStatsClient(name string, tags map[string]string, client monitor.StatsClient) error
RegisterDiagnosticsClient(name string, client monitor.DiagsClient) error
}
PointsWriter interface {
@ -154,9 +154,11 @@ func (s *Service) Open() error {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
s.setExpvar()
key := strings.Join([]string{"graphite", s.protocol, s.bindAddress}, ":")
tags := map[string]string{"proto": s.protocol, "bind": s.bindAddress}
s.statMap = influxdb.NewStatistics(key, "graphite", tags)
// // One Graphite service hooks up diagnostics for all Graphite functionality.
// One Graphite service hooks up diagnostics for all Graphite functionality.
monitorOnce.Do(func() {
if s.Monitor == nil {
s.logger.Println("no monitor service available, no monitoring will be performed")
@ -194,13 +196,6 @@ func (s *Service) Open() error {
return err
}
// Register stats for this service, now that it has started successfully.
if s.Monitor != nil {
t := monitor.NewStatsMonitorClient(s.statMap)
s.Monitor.RegisterStatsClient("graphite",
map[string]string{"proto": s.protocol, "bind": s.bindAddress}, t)
}
s.logger.Printf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String())
return nil
}
@ -369,22 +364,3 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
}
}
}
// setExpvar configures the expvar based collection for this service. It must be done within a
// lock so previous registrations for this key can be checked. Re-registering a key will result
// in a panic.
func (s *Service) setExpvar() {
expvarMu.Lock()
defer expvarMu.Unlock()
key := strings.Join([]string{"graphite", s.protocol, s.bindAddress}, ":")
// Add expvar for this service.
var m expvar.Var
if m = expvar.Get(key); m == nil {
m = expvar.NewMap(key)
}
s.statMap = m.(*expvar.Map)
}
var expvarMu sync.Mutex