Make Monitor safer; don't set clusterID

pull/6551/head
Edd Robinson 2016-05-03 21:33:35 +01:00
parent f2bb9db1c5
commit b2d5616662
1 changed files with 32 additions and 16 deletions

View File

@ -2,7 +2,6 @@ package monitor // import "github.com/influxdata/influxdb/monitor"
import ( import (
"expvar" "expvar"
"fmt"
"io" "io"
"log" "log"
"os" "os"
@ -34,19 +33,21 @@ type Monitor struct {
Branch string Branch string
BuildTime string BuildTime string
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{}
mu sync.Mutex
mu sync.Mutex
diagRegistrations map[string]diagnostics.Client diagRegistrations map[string]diagnostics.Client
clusterID string
nodeAddr string
done chan struct{}
storeCreated bool
storeEnabled bool
storeAddress string
storeCreated bool
storeEnabled bool
storeDatabase string storeDatabase string
storeRetentionPolicy string storeRetentionPolicy string
storeRetentionDuration time.Duration storeRetentionDuration time.Duration
storeReplicationFactor int storeReplicationFactor int
storeAddress string
storeInterval time.Duration storeInterval time.Duration
MetaClient interface { MetaClient interface {
@ -57,8 +58,6 @@ type Monitor struct {
DropRetentionPolicy(database, name string) error DropRetentionPolicy(database, name string) error
} }
NodeID uint64
// Writer for pushing stats back into the database. // Writer for pushing stats back into the database.
// This causes a circular dependency if it depends on cluster directly so it // This causes a circular dependency if it depends on cluster directly so it
// is wrapped in a simpler interface. // is wrapped in a simpler interface.
@ -86,7 +85,6 @@ func New(c Config) *Monitor {
// for identification purpose. // for identification purpose.
func (m *Monitor) Open() error { func (m *Monitor) Open() error {
m.Logger.Printf("Starting monitor system") m.Logger.Printf("Starting monitor system")
m.done = make(chan struct{})
// Self-register various stats and diagnostics. // Self-register various stats and diagnostics.
m.RegisterDiagnosticsClient("build", &build{ m.RegisterDiagnosticsClient("build", &build{
@ -99,6 +97,10 @@ func (m *Monitor) Open() error {
m.RegisterDiagnosticsClient("network", &network{}) m.RegisterDiagnosticsClient("network", &network{})
m.RegisterDiagnosticsClient("system", &system{}) m.RegisterDiagnosticsClient("system", &system{})
m.mu.Lock()
m.done = make(chan struct{})
m.mu.Unlock()
// If enabled, record stats in a InfluxDB system. // If enabled, record stats in a InfluxDB system.
if m.storeEnabled { if m.storeEnabled {
// Start periodic writes to system. // Start periodic writes to system.
@ -112,10 +114,16 @@ func (m *Monitor) Open() error {
// Close closes the monitor system. // Close closes the monitor system.
func (m *Monitor) Close() { func (m *Monitor) Close() {
m.Logger.Println("shutting down monitor system") m.Logger.Println("shutting down monitor system")
m.mu.Lock()
close(m.done) close(m.done)
m.mu.Unlock()
m.wg.Wait() m.wg.Wait()
m.mu.Lock()
m.done = nil m.done = nil
m.mu.Unlock()
m.DeregisterDiagnosticsClient("build") m.DeregisterDiagnosticsClient("build")
m.DeregisterDiagnosticsClient("runtime") m.DeregisterDiagnosticsClient("runtime")
m.DeregisterDiagnosticsClient("network") m.DeregisterDiagnosticsClient("network")
@ -316,20 +324,28 @@ func (m *Monitor) storeStatistics() {
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s", m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval) m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)
// Get cluster-level metadata. Nothing different is going to happen if errors occur.
clusterID := m.MetaClient.ClusterID()
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
clusterTags := map[string]string{ clusterTags := map[string]string{
"clusterID": fmt.Sprintf("%d", clusterID), "hostname": hostname,
"nodeID": fmt.Sprintf("%d", m.NodeID), }
"hostname": hostname,
m.mu.Lock()
if m.clusterID != "" {
clusterTags["clusterID"] = m.clusterID
}
if m.nodeAddr != "" {
clusterTags["nodeAddr"] = m.nodeAddr
} }
tick := time.NewTicker(m.storeInterval) tick := time.NewTicker(m.storeInterval)
m.mu.Unlock()
defer tick.Stop() defer tick.Stop()
for { for {
select { select {
case <-tick.C: case <-tick.C:
m.mu.Lock()
m.createInternalStorage() m.createInternalStorage()
stats, err := m.Statistics(clusterTags) stats, err := m.Statistics(clusterTags)
@ -351,11 +367,11 @@ func (m *Monitor) storeStatistics() {
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil { if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil {
m.Logger.Printf("failed to store statistics: %s", err) m.Logger.Printf("failed to store statistics: %s", err)
} }
m.mu.Unlock()
case <-m.done: case <-m.done:
m.Logger.Printf("terminating storage of statistics") m.Logger.Printf("terminating storage of statistics")
return return
} }
} }
} }