Rename "metrics" to "stats"
parent ea7b7be534
commit ae3b3d5252
```diff
@@ -192,6 +192,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
 			config.Statistics.Database, err.Error())
 		}
 		s.StartSelfMonitoring(database, policy, time.Duration(interval))
+		log.Printf("started self-monitoring at interval of %s", interval)
 	}
 }
 
```
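The hunk above only shows the tail of the wiring, but `config.Statistics.Database` and the `time.Duration(interval)` cast suggest a small statistics section in the config. A minimal sketch of what that section plausibly looks like — every field except `Database` is an assumed name, not something this diff confirms:

```go
package main

import (
	"fmt"
	"time"
)

// StatisticsConfig is a hypothetical reconstruction of the config section
// consumed above. Only Database is visible in the hunk; Enabled,
// RetentionPolicy, and WriteInterval are assumed names.
type StatisticsConfig struct {
	Enabled         bool   // gate for self-monitoring
	Database        string // database the stats points are written to
	RetentionPolicy string // retention policy those points fall under
	WriteInterval   int64  // raw interval, cast with time.Duration as in the hunk
}

type Config struct {
	Statistics StatisticsConfig
}

func main() {
	c := Config{Statistics: StatisticsConfig{
		Enabled:       true,
		Database:      "_internal", // invented example value
		WriteInterval: int64(10 * time.Second),
	}}
	// The same conversion the call site performs.
	interval := time.Duration(c.Statistics.WriteInterval)
	fmt.Println(interval) // 10s
}
```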
server.go — 37 changed lines
```diff
@@ -72,7 +72,7 @@ type Server struct {
 
 	shards map[uint64]*Shard // shards by shard id
 
-	metrics    *Stats
+	stats      *Stats
 	Logger     *log.Logger
 	WriteTrace bool // Detailed logging of write path
 
```
```diff
@@ -104,9 +104,9 @@ func NewServer() *Server {
 		databases: make(map[string]*database),
 		users:     make(map[string]*User),
 
-		shards:  make(map[uint64]*Shard),
-		metrics: NewStats("server"),
-		Logger:  log.New(os.Stderr, "[server] ", log.LstdFlags),
+		shards: make(map[uint64]*Shard),
+		stats:  NewStats("server"),
+		Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
 	}
 	// Server will always return with authentication enabled.
 	// This ensures that disabling authentication must be an explicit decision.
```
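The `shards` and `Logger` lines in this hunk change only in whitespace: renaming the `metrics:` key to `stats:` shrinks the longest key in the composite literal, so gofmt re-aligns the whole block. That is why a pure rename still touches three lines here.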
```diff
@@ -329,15 +329,16 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
 	}
 
 	// Grab the initial stats.
-	prev := s.metrics.Snapshot()
+	prev := s.stats.Snapshot()
 
 	for {
 		time.Sleep(interval)
 
 		// Grab the current stats and diff them.
-		stats := s.metrics.Snapshot()
+		stats := s.stats.Snapshot()
 		diff := stats.Diff(prev)
 
 		// Create the data point and write it.
 		point := Point{
 			Name: diff.Name(),
+			Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)},
```
```diff
@@ -346,8 +347,6 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
 		diff.Walk(func(k string, v int64) {
 			point.Fields[k] = v
 		})
 
-		// XXX write diff to itself. Tag with server hostname and ID.
 		s.WriteSeries(database, retention, []Point{point})
-
 		// Save stats for the next loop.
```
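A consequence of the snapshot-then-diff loop worth spelling out: the points written to the stats database carry per-interval deltas, not lifetime counter totals, and `prev` is presumably advanced by the elided line after the `// Save stats for the next loop.` comment. A toy illustration with invented numbers (the real `Snapshot`/`Diff` live in the `Stats` type, which this diff never shows):

```go
package main

import "fmt"

func main() {
	// Counters only ever grow...
	prev := map[string]int64{"batchWriteRx": 10}
	cur := map[string]int64{"batchWriteRx": 25}

	// ...but diffing successive snapshots yields the delta for the interval,
	// which is what each self-monitoring point records.
	diff := make(map[string]int64)
	for k, v := range cur {
		diff[k] = v - prev[k]
	}
	fmt.Println(diff["batchWriteRx"]) // 15
}
```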
```diff
@@ -492,7 +491,7 @@ func (s *Server) Client() MessagingClient {
 // This function waits until the message has been processed by the server.
 // Returns the broker log index of the message or an error.
 func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) {
-	s.metrics.Inc("broadcastMessageTx")
+	s.stats.Inc("broadcastMessageTx")
 
 	// Encode the command.
 	data, err := json.Marshal(c)
```
```diff
@@ -979,7 +978,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
 			nodeIndex++
 		}
 	}
-	s.metrics.Add("shardsCreated", int64(len(g.Shards)))
+	s.stats.Add("shardsCreated", int64(len(g.Shards)))
 
 	// Retention policy has a new shard group, so update the policy.
 	rp.shardGroups = append(rp.shardGroups, g)
```
```diff
@@ -1060,7 +1059,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
 	// Remove from metastore.
 	rp.removeShardGroupByID(c.ID)
 	err = s.meta.mustUpdate(m.Index, func(tx *metatx) error {
-		s.metrics.Add("shardsDeleted", int64(len(g.Shards)))
+		s.stats.Add("shardsDeleted", int64(len(g.Shards)))
 		return tx.saveDatabase(db)
 	})
 	return
```
```diff
@@ -1547,11 +1546,11 @@ type Point struct {
 // WriteSeries writes series data to the database.
 // Returns the messaging index the data was written to.
 func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) {
-	s.metrics.Inc("batchWriteRx")
-	s.metrics.Add("pointWriteRx", int64(len(points)))
+	s.stats.Inc("batchWriteRx")
+	s.stats.Add("pointWriteRx", int64(len(points)))
 	defer func() {
 		if err != nil {
-			s.metrics.Inc("batchWriteRxError")
+			s.stats.Inc("batchWriteRxError")
 		}
 	}()
 
```
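The `WriteSeries` hunk shows a tidy counting idiom: because `err` is a named return, the deferred closure sees the function's final outcome and bumps the error counter exactly once, regardless of which return path fired. The same pattern in isolation (counter storage simplified to a plain map for the sketch):

```go
package main

import (
	"errors"
	"fmt"
)

var counters = make(map[string]int64)

func inc(key string) { counters[key]++ }

// write mimics WriteSeries's counting: increment on entry, and count an
// error on the way out only if the named return ended up non-nil.
func write(points int) (err error) {
	inc("batchWriteRx")
	defer func() {
		if err != nil {
			inc("batchWriteRxError")
		}
	}()
	if points == 0 {
		return errors.New("no points given")
	}
	return nil
}

func main() {
	write(3)
	write(0)
	fmt.Println(counters) // map[batchWriteRx:2 batchWriteRxError:1]
}
```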
```diff
@@ -1671,7 +1670,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 	if err != nil {
 		return maxIndex, err
 	}
-	s.metrics.Inc("writeSeriesMessageTx")
+	s.stats.Inc("writeSeriesMessageTx")
 	if index > maxIndex {
 		maxIndex = index
 	}
```
```diff
@@ -2492,8 +2491,8 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin
 
 func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result {
 	row := &influxql.Row{Columns: []string{}}
-	row.Name = s.metrics.Name()
-	s.metrics.Walk(func(k string, v int64) {
+	row.Name = s.stats.Name()
+	s.stats.Walk(func(k string, v int64) {
 		row.Columns = append(row.Columns, k)
 		row.Values = append(row.Values, []interface{}{v})
 	})
```
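Note the shape this gives `SHOW STATS`: a single result row named after the stats set (`server` here, from `NewStats("server")`), where each counter contributes one column name to `row.Columns` and one single-element value slice to `row.Values`.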
```diff
@@ -2875,7 +2874,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) {
 
 		// All messages must be processed under lock.
 		func() {
-			s.metrics.Inc("broadcastMessageRx")
+			s.stats.Inc("broadcastMessageRx")
 			s.mu.Lock()
 			defer s.mu.Unlock()
 
```
```diff
@@ -3339,7 +3338,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool {
 // runContinuousQuery will execute a continuous query
 // TODO: make this fan out to the cluster instead of running all the queries on this single data node
 func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
-	s.stats.Inc("continuousQueryExecuted")
+	s.stats.Inc("continuousQueryExecuted")
 	cq.mu.Lock()
 	defer cq.mu.Unlock()
 
```
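The `stats.go` file itself is not in this diff, but the call sites above pin down the surface area: `NewStats`, `Name`, `Inc`, `Add`, `Snapshot`, `Diff`, and `Walk`, over named int64 counters. A minimal sketch consistent with those call sites — the locking, iteration order, and field names are guesses, not the actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// Stats is a named set of int64 counters, safe for concurrent use.
type Stats struct {
	mu   sync.RWMutex
	name string
	m    map[string]int64
}

// NewStats returns an empty, named counter set.
func NewStats(name string) *Stats {
	return &Stats{name: name, m: make(map[string]int64)}
}

// Name returns the name the set was created with.
func (s *Stats) Name() string { return s.name }

// Inc bumps the counter for key by one.
func (s *Stats) Inc(key string) { s.Add(key, 1) }

// Add adds delta to the counter for key.
func (s *Stats) Add(key string, delta int64) {
	s.mu.Lock()
	s.m[key] += delta
	s.mu.Unlock()
}

// Walk calls fn for every counter. Map iteration order is unspecified here;
// the real implementation may sort keys.
func (s *Stats) Walk(fn func(key string, value int64)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for k, v := range s.m {
		fn(k, v)
	}
}

// Snapshot returns a point-in-time copy of the counters.
func (s *Stats) Snapshot() *Stats {
	other := NewStats(s.name)
	s.Walk(func(k string, v int64) { other.m[k] = v })
	return other
}

// Diff returns s minus other, key by key, so successive snapshots
// produce the per-interval deltas written by StartSelfMonitoring.
func (s *Stats) Diff(other *Stats) *Stats {
	d := NewStats(s.name)
	s.Walk(func(k string, v int64) { d.m[k] = v - other.m[k] })
	return d
}

func main() {
	stats := NewStats("server")
	stats.Inc("batchWriteRx")
	stats.Add("pointWriteRx", 3)

	prev := stats.Snapshot()
	stats.Add("pointWriteRx", 5)

	diff := stats.Snapshot().Diff(prev)
	diff.Walk(func(k string, v int64) { fmt.Printf("%s=%d\n", k, v) })
	// Prints (in some order): batchWriteRx=0, pointWriteRx=5
}
```

Keys absent from the earlier snapshot diff against zero, which is the behavior the self-monitoring loop needs on its first pass.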