Rename MonitorService to just Monitor
monitor is not a service, it has more in common with meta, since it provides functionality to the query layer. This names makes this clearer.pull/3959/head
parent
dedabea5e0
commit
4e2ee1ea70
|
@ -58,7 +58,7 @@ type Server struct {
|
|||
ClusterService *cluster.Service
|
||||
SnapshotterService *snapshotter.Service
|
||||
|
||||
MonitorService *monitor.Service
|
||||
Monitor *monitor.Monitor
|
||||
|
||||
// Server reporting
|
||||
reportingDisabled bool
|
||||
|
@ -93,8 +93,8 @@ func NewServer(c *Config, version string) (*Server, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.MonitorService = monitor.NewService(c.Monitor)
|
||||
if err := s.MonitorService.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil {
|
||||
s.Monitor = monitor.New(c.Monitor)
|
||||
if err := s.Monitor.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -113,7 +113,7 @@ func NewServer(c *Config, version string) (*Server, error) {
|
|||
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
|
||||
s.QueryExecutor.MetaStore = s.MetaStore
|
||||
s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore}
|
||||
s.QueryExecutor.MonitorStatementExecutor = s.MonitorService
|
||||
s.QueryExecutor.MonitorStatementExecutor = s.Monitor
|
||||
s.QueryExecutor.ShardMapper = s.ShardMapper
|
||||
|
||||
// Set the shard writer
|
||||
|
@ -244,7 +244,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error {
|
|||
|
||||
srv.PointsWriter = s.PointsWriter
|
||||
srv.MetaStore = s.MetaStore
|
||||
srv.MonitorService = s.MonitorService
|
||||
srv.Monitor = s.Monitor
|
||||
s.Services = append(s.Services, srv)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,8 +21,8 @@ type Client interface {
|
|||
Diagnostics() (map[string]interface{}, error)
|
||||
}
|
||||
|
||||
// Service represents an instance of the monitor service.
|
||||
type Service struct {
|
||||
// Monitor represents an instance of the monitor system.
|
||||
type Monitor struct {
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
mu sync.Mutex
|
||||
|
@ -40,9 +40,9 @@ type Service struct {
|
|||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// NewService returns a new instance of the monitor service.
|
||||
func NewService(c Config) *Service {
|
||||
return &Service{
|
||||
// New returns a new instance of the monitor system.
|
||||
func New(c Config) *Monitor {
|
||||
return &Monitor{
|
||||
registrations: make([]*clientWithMeta, 0),
|
||||
storeEnabled: c.StoreEnabled,
|
||||
storeDatabase: c.StoreDatabase,
|
||||
|
@ -52,67 +52,67 @@ func NewService(c Config) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
// Open opens the monitoring service, using the given clusterID, node ID, and hostname
|
||||
// for identification purposes.
|
||||
func (s *Service) Open(clusterID, nodeID uint64, hostname string) error {
|
||||
s.Logger.Printf("starting monitor service for cluster %d, host %s", clusterID, hostname)
|
||||
s.clusterID = clusterID
|
||||
s.nodeID = nodeID
|
||||
s.hostname = hostname
|
||||
// Open opens the monitoring system, using the given clusterID, node ID, and hostname
|
||||
// for identification purposem.
|
||||
func (m *Monitor) Open(clusterID, nodeID uint64, hostname string) error {
|
||||
m.Logger.Printf("starting monitor system for cluster %d, host %s", clusterID, hostname)
|
||||
m.clusterID = clusterID
|
||||
m.nodeID = nodeID
|
||||
m.hostname = hostname
|
||||
|
||||
// Self-register Go runtime stats.
|
||||
s.Register("runtime", nil, &goRuntime{})
|
||||
// Self-register Go runtime statm.
|
||||
m.Register("runtime", nil, &goRuntime{})
|
||||
|
||||
// If enabled, record stats in a InfluxDB system.
|
||||
if s.storeEnabled {
|
||||
s.Logger.Printf("storing in %s, database '%s', interval %s",
|
||||
s.storeAddress, s.storeDatabase, s.storeInterval)
|
||||
if m.storeEnabled {
|
||||
m.Logger.Printf("storing in %s, database '%s', interval %s",
|
||||
m.storeAddress, m.storeDatabase, m.storeInterval)
|
||||
|
||||
s.Logger.Printf("ensuring database %s exists on %s", s.storeDatabase, s.storeAddress)
|
||||
if err := ensureDatabaseExists(s.storeAddress, s.storeDatabase); err != nil {
|
||||
m.Logger.Printf("ensuring database %s exists on %s", m.storeDatabase, m.storeAddress)
|
||||
if err := ensureDatabaseExists(m.storeAddress, m.storeDatabase); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start periodic writes to system.
|
||||
s.wg.Add(1)
|
||||
go s.storeStatistics()
|
||||
m.wg.Add(1)
|
||||
go m.storeStatistics()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the monitor service.
|
||||
func (s *Service) Close() {
|
||||
s.Logger.Println("shutting down monitor service")
|
||||
close(s.done)
|
||||
s.wg.Wait()
|
||||
s.done = nil
|
||||
// Close closes the monitor system.
|
||||
func (m *Monitor) Close() {
|
||||
m.Logger.Println("shutting down monitor system")
|
||||
close(m.done)
|
||||
m.wg.Wait()
|
||||
m.done = nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
func (m *Monitor) SetLogger(l *log.Logger) {
|
||||
m.Logger = l
|
||||
}
|
||||
|
||||
// Register registers a client with the given name and tags.
|
||||
func (s *Service) Register(name string, tags map[string]string, client Client) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
func (m *Monitor) Register(name string, tags map[string]string, client Client) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
c := &clientWithMeta{
|
||||
Client: client,
|
||||
name: name,
|
||||
tags: tags,
|
||||
}
|
||||
s.registrations = append(s.registrations, c)
|
||||
s.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags)
|
||||
m.registrations = append(m.registrations, c)
|
||||
m.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecuteStatement executes monitor-related query statements.
|
||||
func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
|
||||
func (m *Monitor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
|
||||
switch stmt := stmt.(type) {
|
||||
case *influxql.ShowStatsStatement:
|
||||
return s.executeShowStatistics(stmt)
|
||||
return m.executeShowStatistics(stmt)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
|
||||
}
|
||||
|
@ -120,8 +120,8 @@ func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
|
|||
|
||||
// executeShowStatistics returns the statistics of the registered monitor client in
|
||||
// the standard form expected by users of the InfluxDB system.
|
||||
func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result {
|
||||
stats, _ := s.statistics()
|
||||
func (m *Monitor) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result {
|
||||
stats, _ := m.statistics()
|
||||
rows := make([]*influxql.Row, len(stats))
|
||||
|
||||
for n, stat := range stats {
|
||||
|
@ -139,12 +139,12 @@ func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxq
|
|||
}
|
||||
|
||||
// statistics returns the combined statistics for all registered clients.
|
||||
func (s *Service) statistics() ([]*statistic, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
func (m *Monitor) statistics() ([]*statistic, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
statistics := make([]*statistic, 0, len(s.registrations))
|
||||
for _, r := range s.registrations {
|
||||
statistics := make([]*statistic, 0, len(m.registrations))
|
||||
for _, r := range m.registrations {
|
||||
stats, err := r.Client.Statistics()
|
||||
if err != nil {
|
||||
continue
|
||||
|
@ -161,21 +161,21 @@ func (s *Service) statistics() ([]*statistic, error) {
|
|||
}
|
||||
|
||||
// storeStatistics writes the statistics to an InfluxDB system.
|
||||
func (s *Service) storeStatistics() {
|
||||
func (m *Monitor) storeStatistics() {
|
||||
// XXX add tags such as local hostname and cluster ID
|
||||
//a.Tags["clusterID"] = strconv.FormatUint(s.clusterID, 10)
|
||||
//a.Tags["nodeID"] = strconv.FormatUint(s.nodeID, 10)
|
||||
//a.Tags["hostname"] = s.hostname
|
||||
defer s.wg.Done()
|
||||
//a.Tags["clusterID"] = strconv.FormatUint(m.clusterID, 10)
|
||||
//a.Tags["nodeID"] = strconv.FormatUint(m.nodeID, 10)
|
||||
//a.Tags["hostname"] = m.hostname
|
||||
defer m.wg.Done()
|
||||
|
||||
tick := time.NewTicker(s.storeInterval)
|
||||
tick := time.NewTicker(m.storeInterval)
|
||||
defer tick.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
// Write stats here.
|
||||
case <-s.done:
|
||||
s.Logger.Printf("terminating storage of statistics to %s", s.storeAddress)
|
||||
case <-m.done:
|
||||
m.Logger.Printf("terminating storage of statistics to %s", m.storeAddress)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,7 @@ func (s *statistic) valueNames() []string {
|
|||
return a
|
||||
}
|
||||
|
||||
// clientWithMeta wraps a registered client with its associated name and tags.
|
||||
// clientWithMeta wraps a registered client with its associated name and tagm.
|
||||
type clientWithMeta struct {
|
||||
Client
|
||||
name string
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
)
|
||||
|
||||
// Test that a registered stats client results in the correct SHOW STATS output.
|
||||
func Test_ServiceRegisterStats(t *testing.T) {
|
||||
service := openService(t)
|
||||
func Test_RegisterStats(t *testing.T) {
|
||||
monitor := openMonitor(t)
|
||||
|
||||
client := mockStatsClient{
|
||||
StatisticsFn: func() (map[string]interface{}, error) {
|
||||
|
@ -21,19 +21,19 @@ func Test_ServiceRegisterStats(t *testing.T) {
|
|||
}
|
||||
|
||||
// Register a client without tags.
|
||||
if err := service.Register("foo", nil, client); err != nil {
|
||||
if err := monitor.Register("foo", nil, client); err != nil {
|
||||
t.Fatalf("failed to register client: %s", err.Error())
|
||||
}
|
||||
json := executeShowStatsJSON(t, service)
|
||||
json := executeShowStatsJSON(t, monitor)
|
||||
if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
|
||||
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
|
||||
}
|
||||
|
||||
// Register a client with tags.
|
||||
if err := service.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil {
|
||||
if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil {
|
||||
t.Fatalf("failed to register client: %s", err.Error())
|
||||
}
|
||||
json = executeShowStatsJSON(t, service)
|
||||
json = executeShowStatsJSON(t, monitor)
|
||||
if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
|
||||
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
|
||||
}
|
||||
|
@ -51,16 +51,16 @@ func (m mockStatsClient) Diagnostics() (map[string]interface{}, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func openService(t *testing.T) *Service {
|
||||
service := NewService(NewConfig())
|
||||
err := service.Open(1, 2, "serverA")
|
||||
func openMonitor(t *testing.T) *Monitor {
|
||||
monitor := New(NewConfig())
|
||||
err := monitor.Open(1, 2, "serverA")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open service: %s", err.Error())
|
||||
t.Fatalf("failed to open monitor: %s", err.Error())
|
||||
}
|
||||
return service
|
||||
return monitor
|
||||
}
|
||||
|
||||
func executeShowStatsJSON(t *testing.T, s *Service) string {
|
||||
func executeShowStatsJSON(t *testing.T, s *Monitor) string {
|
||||
r := s.ExecuteStatement(&influxql.ShowStatsStatement{})
|
||||
b, err := r.MarshalJSON()
|
||||
if err != nil {
|
||||
|
|
|
@ -68,7 +68,7 @@ type Service struct {
|
|||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
||||
MonitorService interface {
|
||||
Monitor interface {
|
||||
Register(name string, tags map[string]string, client monitor.Client) error
|
||||
}
|
||||
PointsWriter interface {
|
||||
|
@ -120,16 +120,16 @@ func (s *Service) Open() error {
|
|||
|
||||
// One Graphite service hooks up monitoring for all Graphite functionality.
|
||||
monitorOnce.Do(func() {
|
||||
if s.MonitorService == nil {
|
||||
if s.Monitor == nil {
|
||||
s.logger.Println("no monitor service available, no monitoring will be performed")
|
||||
return
|
||||
}
|
||||
|
||||
t := monitor.NewMonitorClient(statMapTCP)
|
||||
s.MonitorService.Register("graphite", map[string]string{"proto": "tcp"}, t)
|
||||
s.Monitor.Register("graphite", map[string]string{"proto": "tcp"}, t)
|
||||
|
||||
u := monitor.NewMonitorClient(statMapUDP)
|
||||
s.MonitorService.Register("graphite", map[string]string{"proto": "udp"}, u)
|
||||
s.Monitor.Register("graphite", map[string]string{"proto": "udp"}, u)
|
||||
})
|
||||
|
||||
if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
|
||||
|
|
Loading…
Reference in New Issue