Merge pull request #4050 from influxdb/collectd_stats

Add collectd stats
pull/4011/head
Philip O'Toole 2015-09-08 19:17:22 -07:00
commit dbcf56abf6
2 changed files with 35 additions and 5 deletions

View File

@ -4,6 +4,7 @@
With this release InfluxDB is moving to Go 1.5.
### Features
- [#4050](https://github.com/influxdb/influxdb/pull/4050): Add stats to collectd
- [#3771](https://github.com/influxdb/influxdb/pull/3771): Close idle Graphite TCP connections
- [#3755](https://github.com/influxdb/influxdb/issues/3755): Add option to build script. Thanks @fg2it
- [#3863](https://github.com/influxdb/influxdb/pull/3863): Move to Go 1.5

View File

@ -1,13 +1,16 @@
package collectd
import (
"expvar"
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
@ -16,6 +19,17 @@ import (
const leaderWaitTimeout = 30 * time.Second
// statistics gathered by the collectd service.
const (
statPointsReceived = "points_rx"
statBytesReceived = "bytes_rx"
statPointsParseFail = "points_parse_fail"
statReadFail = "read_fail"
statBatchesTrasmitted = "batches_tx"
statPointsTransmitted = "points_tx"
statBatchesTransmitFail = "batches_tx_fail"
)
// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
@ -42,6 +56,9 @@ type Service struct {
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr
// expvar-based stats.
statMap *expvar.Map
}
// NewService returns a new instance of the collectd service.
@ -59,6 +76,12 @@ func NewService(c Config) *Service {
func (s *Service) Open() error {
s.Logger.Printf("Starting collectd service")
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"collectd", s.Config.BindAddress}, ":")
tags := map[string]string{"bind": s.Config.BindAddress}
s.statMap = influxdb.NewStatistics(key, "collectd", tags)
if s.Config.BindAddress == "" {
return fmt.Errorf("bind address is blank")
} else if s.Config.Database == "" {
@ -182,10 +205,12 @@ func (s *Service) serve() {
n, _, err := s.ln.ReadFromUDP(buffer)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
continue
}
if n > 0 {
s.statMap.Add(statBytesReceived, int64(n))
s.handleMessage(buffer[:n])
}
}
@ -194,6 +219,7 @@ func (s *Service) serve() {
func (s *Service) handleMessage(buffer []byte) {
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
s.statMap.Add(statPointsParseFail, 1)
s.Logger.Printf("Collectd parse error: %s", err)
return
}
@ -202,6 +228,7 @@ func (s *Service) handleMessage(buffer []byte) {
for _, p := range points {
s.batcher.In() <- p
}
s.statMap.Add(statPointsReceived, int64(len(points)))
}
}
@ -213,15 +240,17 @@ func (s *Service) writePoints() {
case <-s.stop:
return
case batch := <-s.batcher.Out():
req := &cluster.WritePointsRequest{
if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.Config.Database,
RetentionPolicy: s.Config.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelAny,
Points: batch,
}
if err := s.PointsWriter.WritePoints(req); err != nil {
s.Logger.Printf("failed to write batch: %s", err)
continue
}); err == nil {
s.statMap.Add(statBatchesTrasmitted, 1)
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.Config.Database, err)
s.statMap.Add(statBatchesTransmitFail, 1)
}
}
}