diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index cc6e92a73f..3c5d01f389 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -192,6 +192,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B config.Statistics.Database, err.Error()) } s.StartSelfMonitoring(database, policy, time.Duration(interval)) + log.Printf("started self-monitoring at interval of %s", interval) } } diff --git a/server.go b/server.go index 8912d39b01..4e16364333 100644 --- a/server.go +++ b/server.go @@ -72,7 +72,7 @@ type Server struct { shards map[uint64]*Shard // shards by shard id - metrics *Stats + stats *Stats Logger *log.Logger WriteTrace bool // Detailed logging of write path @@ -104,9 +104,9 @@ func NewServer() *Server { databases: make(map[string]*database), users: make(map[string]*User), - shards: make(map[uint64]*Shard), - metrics: NewStats("server"), - Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), + shards: make(map[uint64]*Shard), + stats: NewStats("server"), + Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), } // Server will always return with authentication enabled. // This ensures that disabling authentication must be an explicit decision. @@ -329,15 +329,16 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D } // Grab the initial stats. - prev := s.metrics.Snapshot() + prev := s.stats.Snapshot() for { time.Sleep(interval) // Grab the current stats and diff them. - stats := s.metrics.Snapshot() + stats := s.stats.Snapshot() diff := stats.Diff(prev) + // Create the data point and write it. point := Point{ Name: diff.Name(), Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, @@ -346,8 +347,6 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D diff.Walk(func(k string, v int64) { point.Fields[k] = v }) - - // XXX write diff to itself. Tag with server hostname and ID. s.WriteSeries(database, retention, []Point{point}) // Save stats for the next loop. @@ -492,7 +491,7 @@ func (s *Server) Client() MessagingClient { // This function waits until the message has been processed by the server. // Returns the broker log index of the message or an error. func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) { - s.metrics.Inc("broadcastMessageTx") + s.stats.Inc("broadcastMessageTx") // Encode the command. data, err := json.Marshal(c) @@ -979,7 +978,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err nodeIndex++ } } - s.metrics.Add("shardsCreated", int64(len(g.Shards))) + s.stats.Add("shardsCreated", int64(len(g.Shards))) // Retention policy has a new shard group, so update the policy. rp.shardGroups = append(rp.shardGroups, g) @@ -1060,7 +1059,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) { // Remove from metastore. rp.removeShardGroupByID(c.ID) err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { - s.metrics.Add("shardsDeleted", int64(len(g.Shards))) + s.stats.Add("shardsDeleted", int64(len(g.Shards))) return tx.saveDatabase(db) }) return @@ -1547,11 +1546,11 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) { - s.metrics.Inc("batchWriteRx") - s.metrics.Add("pointWriteRx", int64(len(points))) + s.stats.Inc("batchWriteRx") + s.stats.Add("pointWriteRx", int64(len(points))) defer func() { if err != nil { - s.metrics.Inc("batchWriteRxError") + s.stats.Inc("batchWriteRxError") } }() @@ -1671,7 +1670,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return maxIndex, err } - s.metrics.Inc("writeSeriesMessageTx") + s.stats.Inc("writeSeriesMessageTx") if index > maxIndex { maxIndex = index } @@ -2492,8 +2491,8 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result { row := &influxql.Row{Columns: []string{}} - row.Name = s.metrics.Name() - s.metrics.Walk(func(k string, v int64) { + row.Name = s.stats.Name() + s.stats.Walk(func(k string, v int64) { row.Columns = append(row.Columns, k) row.Values = append(row.Values, []interface{}{v}) }) @@ -2875,7 +2874,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // All messages must be processed under lock. func() { - s.metrics.Inc("broadcastMessageRx") + s.stats.Inc("broadcastMessageRx") s.mu.Lock() defer s.mu.Unlock() @@ -3339,7 +3338,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool { // runContinuousQuery will execute a continuous query // TODO: make this fan out to the cluster instead of running all the queries on this single data node func (s *Server) runContinuousQuery(cq *ContinuousQuery) { - s.metrics.Inc("continuousQueryExecuted") + s.stats.Inc("continuousQueryExecuted") cq.mu.Lock() defer cq.mu.Unlock()