Saner names for diags and stats, and use tags

pull/2081/head
Philip O'Toole 2015-03-26 01:07:48 -07:00 committed by Philip O'Toole
parent 70836bc87d
commit 502ee0764f
2 changed files with 46 additions and 37 deletions

View File

@ -25,10 +25,11 @@ func NewGoDiagnostics() *GoDiagnostics {
} }
// AsRow returns the GoDiagnostic object as an InfluxQL row. // AsRow returns the GoDiagnostic object as an InfluxQL row.
func (g *GoDiagnostics) AsRow() *influxql.Row { func (g *GoDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row {
return &influxql.Row{ return &influxql.Row{
Name: "go", Name: measurement,
Columns: []string{"time", "goMaxProcs", "numGoRoutine", "version"}, Columns: []string{"time", "goMaxProcs", "numGoRoutine", "version"},
Tags: tags,
Values: [][]interface{}{[]interface{}{time.Now().UTC(), Values: [][]interface{}{[]interface{}{time.Now().UTC(),
g.GoMaxProcs, g.NumGoroutine, g.Version}}, g.GoMaxProcs, g.NumGoroutine, g.Version}},
} }
@ -60,10 +61,11 @@ func NewSystemDiagnostics() *SystemDiagnostics {
} }
// AsRow returns the GoDiagnostic object as an InfluxQL row. // AsRow returns the GoDiagnostic object as an InfluxQL row.
func (s *SystemDiagnostics) AsRow() *influxql.Row { func (s *SystemDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row {
return &influxql.Row{ return &influxql.Row{
Name: "system", Name: measurement,
Columns: []string{"time", "hostname", "pid", "os", "arch", "numCPU"}, Columns: []string{"time", "hostname", "pid", "os", "arch", "numCPU"},
Tags: tags,
Values: [][]interface{}{[]interface{}{time.Now().UTC(), Values: [][]interface{}{[]interface{}{time.Now().UTC(),
s.Hostname, s.PID, s.OS, s.Arch, s.NumCPU}}, s.Hostname, s.PID, s.OS, s.Arch, s.NumCPU}},
} }
@ -111,11 +113,12 @@ func NewMemoryDiagnostics() *MemoryDiagnostics {
} }
// AsRow returns the MemoryDiagnostics object as an InfluxQL row. // AsRow returns the MemoryDiagnostics object as an InfluxQL row.
func (m *MemoryDiagnostics) AsRow() *influxql.Row { func (m *MemoryDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row {
return &influxql.Row{ return &influxql.Row{
Name: "memory", Name: measurement,
Columns: []string{"time", "alloc", "totalAlloc", "sys", "lookups", "mallocs", "frees", "heapAlloc", Columns: []string{"time", "alloc", "totalAlloc", "sys", "lookups", "mallocs", "frees", "heapAlloc",
"heapSys", "heapIdle", "heapInUse", "heapReleased", "heapObjects", "pauseTotalNs", "numGG"}, "heapSys", "heapIdle", "heapInUse", "heapReleased", "heapObjects", "pauseTotalNs", "numGG"},
Tags: tags,
Values: [][]interface{}{[]interface{}{time.Now().UTC(), Values: [][]interface{}{[]interface{}{time.Now().UTC(),
m.Alloc, m.TotalAlloc, m.Sys, m.Lookups, m.Mallocs, m.Frees, m.HeapAlloc, m.Alloc, m.TotalAlloc, m.Sys, m.Lookups, m.Mallocs, m.Frees, m.HeapAlloc,
m.HeapSys, m.HeapIdle, m.HeapInUse, m.HeapReleased, m.HeapObjects, m.PauseTotalNs, m.NumGC}}, m.HeapSys, m.HeapIdle, m.HeapInUse, m.HeapReleased, m.HeapObjects, m.PauseTotalNs, m.NumGC}},
@ -129,10 +132,11 @@ type BuildDiagnostics struct {
} }
// AsRow returns the BuildDiagnostics object as an InfluxQL row. // AsRow returns the BuildDiagnostics object as an InfluxQL row.
func (b *BuildDiagnostics) AsRow() *influxql.Row { func (b *BuildDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row {
return &influxql.Row{ return &influxql.Row{
Name: "build", Name: measurement,
Columns: []string{"time", "version", "commitHash"}, Columns: []string{"time", "version", "commitHash"},
Tags: tags,
Values: [][]interface{}{[]interface{}{time.Now().UTC(), Values: [][]interface{}{[]interface{}{time.Now().UTC(),
b.Version, b.CommitHash}}, b.Version, b.CommitHash}},
} }

View File

@ -339,23 +339,26 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
return fmt.Errorf("statistics check interval must be non-zero") return fmt.Errorf("statistics check interval must be non-zero")
} }
// Function for local use turns stats into a point. // Function for local use turns stats into a slice of points
pointFromStats := func(st *Stats, tags map[string]string) Point { pointsFromStats := func(st *Stats, tags map[string]string) []Point {
point := Point{
Timestamp: time.Now(),
Name: "stat_" + st.Name(),
Tags: make(map[string]string),
Fields: make(map[string]interface{}),
}
// Specifically create a new map.
for k, v := range tags {
point.Tags[k] = v
}
points := make([]Point, 0)
now := time.Now()
st.Walk(func(k string, v int64) { st.Walk(func(k string, v int64) {
point.Fields[k] = int(v) point := Point{
Timestamp: now,
Name: st.name + "_" + k,
Tags: make(map[string]string),
Fields: map[string]interface{}{"value": int(v)},
}
// Specifically create a new map.
for k, v := range tags {
point.Tags[k] = v
}
points = append(points, point)
}) })
return point
return points
} }
go func() { go func() {
@ -364,25 +367,21 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D
<-tick.C <-tick.C
// Create the batch and tags // Create the batch and tags
batch := make([]Point, 0)
tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)} tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)}
if h, err := os.Hostname(); err == nil { if h, err := os.Hostname(); err == nil {
tags["host"] = h tags["host"] = h
} }
batch := pointsFromStats(s.stats, tags)
// Server stats.
batch = append(batch, pointFromStats(s.stats, tags))
// Shard-level stats. // Shard-level stats.
tags["shardID"] = strconv.FormatUint(s.id, 10)
for _, sh := range s.shards { for _, sh := range s.shards {
point := pointFromStats(sh.stats, tags) batch = append(batch, pointsFromStats(sh.stats, tags)...)
point.Tags["shardID"] = strconv.FormatUint(s.id, 10)
batch = append(batch, point)
} }
// Server diagnostics. // Server diagnostics.
for _, row := range s.DiagnosticsAsRows() { for _, row := range s.DiagnosticsAsRows() {
points, err := s.convertRowToPoints("diag_"+row.Name, row) points, err := s.convertRowToPoints(row.Name, row)
if err != nil { if err != nil {
s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error()) s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error())
continue continue
@ -3115,20 +3114,25 @@ func (s *Server) DiagnosticsAsRows() []*influxql.Row {
md := NewMemoryDiagnostics() md := NewMemoryDiagnostics()
bd := BuildDiagnostics{Version: s.Version, CommitHash: s.CommitHash} bd := BuildDiagnostics{Version: s.Version, CommitHash: s.CommitHash}
// Common tagset.
tags := map[string]string{"serverID": strconv.FormatUint(s.id, 10)}
// Server row. // Server row.
serverRow := &influxql.Row{ serverRow := &influxql.Row{
Name: "server", Name: "server_diag",
Columns: []string{"time", "startTime", "uptime", "id", Columns: []string{"time", "startTime", "uptime", "id",
"path", "authEnabled", "index", "retentionAutoCreate", "numShards", "cqLastRun"}, "path", "authEnabled", "index", "retentionAutoCreate", "numShards", "cqLastRun"},
Tags: tags,
Values: [][]interface{}{[]interface{}{now, startTime.String(), time.Since(startTime).String(), strconv.FormatUint(s.id, 10), Values: [][]interface{}{[]interface{}{now, startTime.String(), time.Since(startTime).String(), strconv.FormatUint(s.id, 10),
s.path, s.authenticationEnabled, int(s.index), s.RetentionAutoCreate, len(s.shards), s.lastContinuousQueryRun.String()}}, s.path, s.authenticationEnabled, int(s.index), s.RetentionAutoCreate, len(s.shards), s.lastContinuousQueryRun.String()}},
} }
// Shard groups. // Shard groups.
shardGroupsRow := &influxql.Row{Columns: []string{}} shardGroupsRow := &influxql.Row{Columns: []string{}}
shardGroupsRow.Name = "shardGroups" shardGroupsRow.Name = "shardGroups_diag"
shardGroupsRow.Columns = append(shardGroupsRow.Columns, "time", "database", "retentionPolicy", "id", shardGroupsRow.Columns = append(shardGroupsRow.Columns, "time", "database", "retentionPolicy", "id",
"startTime", "endTime", "duration", "numShards") "startTime", "endTime", "duration", "numShards")
shardGroupsRow.Tags = tags
// Check all shard groups. // Check all shard groups.
for _, db := range s.databases { for _, db := range s.databases {
for _, rp := range db.policies { for _, rp := range db.policies {
@ -3141,8 +3145,9 @@ func (s *Server) DiagnosticsAsRows() []*influxql.Row {
// Shards // Shards
shardsRow := &influxql.Row{Columns: []string{}} shardsRow := &influxql.Row{Columns: []string{}}
shardsRow.Name = "shards" shardsRow.Name = "shards_diag"
shardsRow.Columns = append(shardsRow.Columns, "time", "id", "dataNodes", "index", "path") shardsRow.Columns = append(shardsRow.Columns, "time", "id", "dataNodes", "index", "path")
shardsRow.Tags = tags
for _, sh := range s.shards { for _, sh := range s.shards {
var nodes []string var nodes []string
for _, n := range sh.DataNodeIDs { for _, n := range sh.DataNodeIDs {
@ -3153,10 +3158,10 @@ func (s *Server) DiagnosticsAsRows() []*influxql.Row {
} }
return []*influxql.Row{ return []*influxql.Row{
gd.AsRow(), gd.AsRow("server_go", tags),
sd.AsRow(), sd.AsRow("server_system", tags),
md.AsRow(), md.AsRow("server_memory", tags),
bd.AsRow(), bd.AsRow("server_build", tags),
serverRow, serverRow,
shardGroupsRow, shardGroupsRow,
shardsRow, shardsRow,