Replace kimor79/gollectd with collectd.org

At this point this is a like-for-like swap.

collectd.org is more active and supports more features.
pull/7435/head
Marc 2016-10-08 18:35:45 +08:00
parent d52eb01c17
commit 0260dfb575
3 changed files with 49 additions and 48 deletions

1
Godeps
View File

@ -10,7 +10,6 @@ github.com/gogo/protobuf a9cd0c35b97daf74d0ebf3514c5254814b2703b4
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jwilder/encoding 4dada27c33277820fe35c7ee71ed34fbc9477d00
github.com/kimor79/gollectd b5dddb1667dcc1e6355b9305e2c1608a2db6983c
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073
github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6

View File

@ -12,7 +12,6 @@
- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE)
- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt)
- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
- github.com/kimor79/gollectd [BSD LICENSE](https://github.com/kimor79/gollectd/blob/master/LICENSE)
- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)

View File

@ -8,14 +8,16 @@ import (
"net"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"collectd.org/api"
"collectd.org/network"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/kimor79/gollectd"
)
// statistics gathered by the collectd service.
@ -40,6 +42,16 @@ type metaClient interface {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
}
// TypesDBFile reads a collectd types database from the file at path.
// It returns the parsed *api.TypesDB, or a non-nil error if the file
// cannot be opened or parsed. The file handle is always closed before
// returning.
func TypesDBFile(path string) (typesdb *api.TypesDB, err error) {
	reader, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	// The original implementation leaked this handle on success;
	// close it once parsing finishes, regardless of outcome.
	defer reader.Close()
	return api.NewTypesDB(reader)
}
// Service represents a UDP server which receives metrics in collectd's binary
// protocol and stores them in InfluxDB.
type Service struct {
@ -51,7 +63,7 @@ type Service struct {
wg sync.WaitGroup
conn *net.UDPConn
batcher *tsdb.PointBatcher
typesdb gollectd.Types
popts network.ParseOpts
addr net.Addr
mu sync.RWMutex
@ -97,12 +109,12 @@ func (s *Service) Open() error {
return fmt.Errorf("PointsWriter is nil")
}
if s.typesdb == nil {
if s.popts.TypesDB == nil {
// Open collectd types.
if stat, err := os.Stat(s.Config.TypesDB); err != nil {
return fmt.Errorf("Stat(): %s", err)
} else if stat.IsDir() {
alltypesdb := make(gollectd.Types)
alltypesdb := new(api.TypesDB)
var readdir func(path string)
readdir = func(path string) {
files, err := ioutil.ReadDir(path)
@ -119,31 +131,24 @@ func (s *Service) Open() error {
}
s.Logger.Printf("Loading %s\n", fullpath)
types, err := gollectd.TypesDBFile(fullpath)
types, err := TypesDBFile(fullpath)
if err != nil {
s.Logger.Printf("Unable to parse collectd types file: %s\n", f.Name())
continue
}
for k, t := range types {
a, ok := alltypesdb[k]
if ok {
alltypesdb[k] = t
} else {
alltypesdb[k] = append(a, t...)
}
}
alltypesdb.Merge(types)
}
}
readdir(s.Config.TypesDB)
s.typesdb = alltypesdb
s.popts.TypesDB = alltypesdb
} else {
s.Logger.Printf("Loading %s\n", s.Config.TypesDB)
typesdb, err := gollectd.TypesDBFile(s.Config.TypesDB)
types, err := TypesDBFile(s.Config.TypesDB)
if err != nil {
return fmt.Errorf("Open(): %s", err)
}
s.typesdb = typesdb
s.popts.TypesDB = types
}
}
// Resolve our address.
@ -278,7 +283,8 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
// SetTypes sets the collectd types database from its textual representation.
func (s *Service) SetTypes(types string) (err error) {
s.typesdb, err = gollectd.TypesDB([]byte(types))
reader := strings.NewReader(types)
s.popts.TypesDB, err = api.NewTypesDB(reader)
return
}
@ -321,14 +327,14 @@ func (s *Service) serve() {
}
func (s *Service) handleMessage(buffer []byte) {
packets, err := gollectd.Packets(buffer, s.typesdb)
valueLists, err := network.Parse(buffer, s.popts)
if err != nil {
atomic.AddInt64(&s.stats.PointsParseFail, 1)
s.Logger.Printf("Collectd parse error: %s", err)
return
}
for _, packet := range *packets {
points := s.UnmarshalCollectd(&packet)
for _, valueList := range valueLists {
points := s.UnmarshalValueList(&valueList)
for _, p := range points {
s.batcher.In() <- p
}
@ -359,41 +365,38 @@ func (s *Service) writePoints() {
}
}
// Unmarshal translates a collectd packet into InfluxDB data points.
func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
// Prefer high resolution timestamp.
var timestamp time.Time
if packet.TimeHR > 0 {
// TimeHR is "near" nanosecond measurement, but not exactly nanosecond time
// Since we store time in microseconds, we round here (mostly so tests will work easier)
sec := packet.TimeHR >> 30
// Shifting, masking, and dividing by 1 billion to get nanoseconds.
nsec := ((packet.TimeHR & 0x3FFFFFFF) << 30) / 1000 / 1000 / 1000
timestamp = time.Unix(int64(sec), int64(nsec)).UTC().Round(time.Microsecond)
} else {
// If we don't have high resolution time, fall back to basic unix time
timestamp = time.Unix(int64(packet.Time), 0).UTC()
}
// UnmarshalValueList translates a collectd ValueList into InfluxDB data points.
func (s *Service) UnmarshalValueList(vl *api.ValueList) []models.Point {
timestamp := vl.Time.UTC()
var points []models.Point
for i := range packet.Values {
name := fmt.Sprintf("%s_%s", packet.Plugin, packet.Values[i].Name)
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})
fields["value"] = packet.Values[i].Value
// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}
if packet.Hostname != "" {
tags["host"] = packet.Hostname
if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if packet.PluginInstance != "" {
tags["instance"] = packet.PluginInstance
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if packet.Type != "" {
tags["type"] = packet.Type
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if packet.TypeInstance != "" {
tags["type_instance"] = packet.TypeInstance
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}
// Drop invalid points