diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 782fd6abcb..25f9e9a5b5 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -58,7 +58,7 @@ type Server struct { ClusterService *cluster.Service SnapshotterService *snapshotter.Service - MonitorService *monitor.Service + Monitor *monitor.Monitor // Server reporting reportingDisabled bool @@ -93,8 +93,8 @@ func NewServer(c *Config, version string) (*Server, error) { if err != nil { return nil, err } - s.MonitorService = monitor.NewService(c.Monitor) - if err := s.MonitorService.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil { + s.Monitor = monitor.New(c.Monitor) + if err := s.Monitor.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil { return nil, err } @@ -113,7 +113,7 @@ func NewServer(c *Config, version string) (*Server, error) { s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore) s.QueryExecutor.MetaStore = s.MetaStore s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore} - s.QueryExecutor.MonitorStatementExecutor = s.MonitorService + s.QueryExecutor.MonitorStatementExecutor = s.Monitor s.QueryExecutor.ShardMapper = s.ShardMapper // Set the shard writer @@ -244,7 +244,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error { srv.PointsWriter = s.PointsWriter srv.MetaStore = s.MetaStore - srv.MonitorService = s.MonitorService + srv.Monitor = s.Monitor s.Services = append(s.Services, srv) return nil } diff --git a/monitor/service.go b/monitor/service.go index 27961fc1a9..aa3dd36ad9 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -21,8 +21,8 @@ type Client interface { Diagnostics() (map[string]interface{}, error) } -// Service represents an instance of the monitor service. -type Service struct { +// Monitor represents an instance of the monitor system. +type Monitor struct { wg sync.WaitGroup done chan struct{} mu sync.Mutex @@ -40,9 +40,9 @@ type Service struct { Logger *log.Logger } -// NewService returns a new instance of the monitor service. -func NewService(c Config) *Service { - return &Service{ +// New returns a new instance of the monitor system. +func New(c Config) *Monitor { + return &Monitor{ registrations: make([]*clientWithMeta, 0), storeEnabled: c.StoreEnabled, storeDatabase: c.StoreDatabase, @@ -52,67 +52,67 @@ func NewService(c Config) *Service { } } -// Open opens the monitoring service, using the given clusterID, node ID, and hostname -// for identification purposes. -func (s *Service) Open(clusterID, nodeID uint64, hostname string) error { - s.Logger.Printf("starting monitor service for cluster %d, host %s", clusterID, hostname) - s.clusterID = clusterID - s.nodeID = nodeID - s.hostname = hostname +// Open opens the monitoring system, using the given clusterID, node ID, and hostname +// for identification purposem. +func (m *Monitor) Open(clusterID, nodeID uint64, hostname string) error { + m.Logger.Printf("starting monitor system for cluster %d, host %s", clusterID, hostname) + m.clusterID = clusterID + m.nodeID = nodeID + m.hostname = hostname - // Self-register Go runtime stats. - s.Register("runtime", nil, &goRuntime{}) + // Self-register Go runtime statm. + m.Register("runtime", nil, &goRuntime{}) // If enabled, record stats in a InfluxDB system. - if s.storeEnabled { - s.Logger.Printf("storing in %s, database '%s', interval %s", - s.storeAddress, s.storeDatabase, s.storeInterval) + if m.storeEnabled { + m.Logger.Printf("storing in %s, database '%s', interval %s", + m.storeAddress, m.storeDatabase, m.storeInterval) - s.Logger.Printf("ensuring database %s exists on %s", s.storeDatabase, s.storeAddress) - if err := ensureDatabaseExists(s.storeAddress, s.storeDatabase); err != nil { + m.Logger.Printf("ensuring database %s exists on %s", m.storeDatabase, m.storeAddress) + if err := ensureDatabaseExists(m.storeAddress, m.storeDatabase); err != nil { return err } // Start periodic writes to system. - s.wg.Add(1) - go s.storeStatistics() + m.wg.Add(1) + go m.storeStatistics() } return nil } -// Close closes the monitor service. -func (s *Service) Close() { - s.Logger.Println("shutting down monitor service") - close(s.done) - s.wg.Wait() - s.done = nil +// Close closes the monitor system. +func (m *Monitor) Close() { + m.Logger.Println("shutting down monitor system") + close(m.done) + m.wg.Wait() + m.done = nil } // SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +func (m *Monitor) SetLogger(l *log.Logger) { + m.Logger = l } // Register registers a client with the given name and tags. -func (s *Service) Register(name string, tags map[string]string, client Client) error { - s.mu.Lock() - defer s.mu.Unlock() +func (m *Monitor) Register(name string, tags map[string]string, client Client) error { + m.mu.Lock() + defer m.mu.Unlock() c := &clientWithMeta{ Client: client, name: name, tags: tags, } - s.registrations = append(s.registrations, c) - s.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags) + m.registrations = append(m.registrations, c) + m.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags) return nil } // ExecuteStatement executes monitor-related query statements. -func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result { +func (m *Monitor) ExecuteStatement(stmt influxql.Statement) *influxql.Result { switch stmt := stmt.(type) { case *influxql.ShowStatsStatement: - return s.executeShowStatistics(stmt) + return m.executeShowStatistics(stmt) default: panic(fmt.Sprintf("unsupported statement type: %T", stmt)) } @@ -120,8 +120,8 @@ func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result { // executeShowStatistics returns the statistics of the registered monitor client in // the standard form expected by users of the InfluxDB system. -func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result { - stats, _ := s.statistics() +func (m *Monitor) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result { + stats, _ := m.statistics() rows := make([]*influxql.Row, len(stats)) for n, stat := range stats { @@ -139,12 +139,12 @@ func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxq } // statistics returns the combined statistics for all registered clients. -func (s *Service) statistics() ([]*statistic, error) { - s.mu.Lock() - defer s.mu.Unlock() +func (m *Monitor) statistics() ([]*statistic, error) { + m.mu.Lock() + defer m.mu.Unlock() - statistics := make([]*statistic, 0, len(s.registrations)) - for _, r := range s.registrations { + statistics := make([]*statistic, 0, len(m.registrations)) + for _, r := range m.registrations { stats, err := r.Client.Statistics() if err != nil { continue @@ -161,21 +161,21 @@ func (s *Service) statistics() ([]*statistic, error) { } // storeStatistics writes the statistics to an InfluxDB system. -func (s *Service) storeStatistics() { +func (m *Monitor) storeStatistics() { // XXX add tags such as local hostname and cluster ID - //a.Tags["clusterID"] = strconv.FormatUint(s.clusterID, 10) - //a.Tags["nodeID"] = strconv.FormatUint(s.nodeID, 10) - //a.Tags["hostname"] = s.hostname - defer s.wg.Done() + //a.Tags["clusterID"] = strconv.FormatUint(m.clusterID, 10) + //a.Tags["nodeID"] = strconv.FormatUint(m.nodeID, 10) + //a.Tags["hostname"] = m.hostname + defer m.wg.Done() - tick := time.NewTicker(s.storeInterval) + tick := time.NewTicker(m.storeInterval) defer tick.Stop() for { select { case <-tick.C: // Write stats here. - case <-s.done: - s.Logger.Printf("terminating storage of statistics to %s", s.storeAddress) + case <-m.done: + m.Logger.Printf("terminating storage of statistics to %s", m.storeAddress) return } @@ -223,7 +223,7 @@ func (s *statistic) valueNames() []string { return a } -// clientWithMeta wraps a registered client with its associated name and tags. +// clientWithMeta wraps a registered client with its associated name and tagm. type clientWithMeta struct { Client name string diff --git a/monitor/service_test.go b/monitor/service_test.go index cff05ec8e2..c4b0a292e2 100644 --- a/monitor/service_test.go +++ b/monitor/service_test.go @@ -8,8 +8,8 @@ import ( ) // Test that a registered stats client results in the correct SHOW STATS output. -func Test_ServiceRegisterStats(t *testing.T) { - service := openService(t) +func Test_RegisterStats(t *testing.T) { + monitor := openMonitor(t) client := mockStatsClient{ StatisticsFn: func() (map[string]interface{}, error) { @@ -21,19 +21,19 @@ func Test_ServiceRegisterStats(t *testing.T) { } // Register a client without tags. - if err := service.Register("foo", nil, client); err != nil { + if err := monitor.Register("foo", nil, client); err != nil { t.Fatalf("failed to register client: %s", err.Error()) } - json := executeShowStatsJSON(t, service) + json := executeShowStatsJSON(t, monitor) if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) { t.Fatalf("SHOW STATS response incorrect, got: %s\n", json) } // Register a client with tags. - if err := service.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil { + if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil { t.Fatalf("failed to register client: %s", err.Error()) } - json = executeShowStatsJSON(t, service) + json = executeShowStatsJSON(t, monitor) if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) { t.Fatalf("SHOW STATS response incorrect, got: %s\n", json) } @@ -51,16 +51,16 @@ func (m mockStatsClient) Diagnostics() (map[string]interface{}, error) { return nil, nil } -func openService(t *testing.T) *Service { - service := NewService(NewConfig()) - err := service.Open(1, 2, "serverA") +func openMonitor(t *testing.T) *Monitor { + monitor := New(NewConfig()) + err := monitor.Open(1, 2, "serverA") if err != nil { - t.Fatalf("failed to open service: %s", err.Error()) + t.Fatalf("failed to open monitor: %s", err.Error()) } - return service + return monitor } -func executeShowStatsJSON(t *testing.T, s *Service) string { +func executeShowStatsJSON(t *testing.T, s *Monitor) string { r := s.ExecuteStatement(&influxql.ShowStatsStatement{}) b, err := r.MarshalJSON() if err != nil { diff --git a/services/graphite/service.go b/services/graphite/service.go index 2cf6739998..9595eb6155 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -68,7 +68,7 @@ type Service struct { wg sync.WaitGroup done chan struct{} - MonitorService interface { + Monitor interface { Register(name string, tags map[string]string, client monitor.Client) error } PointsWriter interface { @@ -120,16 +120,16 @@ func (s *Service) Open() error { // One Graphite service hooks up monitoring for all Graphite functionality. monitorOnce.Do(func() { - if s.MonitorService == nil { + if s.Monitor == nil { s.logger.Println("no monitor service available, no monitoring will be performed") return } t := monitor.NewMonitorClient(statMapTCP) - s.MonitorService.Register("graphite", map[string]string{"proto": "tcp"}, t) + s.Monitor.Register("graphite", map[string]string{"proto": "tcp"}, t) u := monitor.NewMonitorClient(statMapUDP) - s.MonitorService.Register("graphite", map[string]string{"proto": "udp"}, u) + s.Monitor.Register("graphite", map[string]string{"proto": "udp"}, u) }) if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {