diff --git a/Godeps b/Godeps index 8e86471682..36246ff162 100644 --- a/Godeps +++ b/Godeps @@ -10,7 +10,6 @@ github.com/gogo/protobuf a9cd0c35b97daf74d0ebf3514c5254814b2703b4 github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380 github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/jwilder/encoding 4dada27c33277820fe35c7ee71ed34fbc9477d00 -github.com/kimor79/gollectd b5dddb1667dcc1e6355b9305e2c1608a2db6983c github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073 github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6 diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index 14bbfa9535..f35eb5ba08 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -12,7 +12,6 @@ - github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE) - github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt) - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) -- github.com/kimor79/gollectd [BSD LICENSE](https://github.com/kimor79/gollectd/blob/master/LICENSE) - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) diff --git a/services/collectd/service.go b/services/collectd/service.go index aa11aae7d0..7f09d5adf8 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -8,14 +8,16 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" + "collectd.org/api" + "collectd.org/network" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" - "github.com/kimor79/gollectd" ) // statistics gathered by the collectd service. @@ -40,6 +42,16 @@ type metaClient interface { CreateDatabase(name string) (*meta.DatabaseInfo, error) } +// Reads a collectd types db from a file. +func TypesDBFile(path string) (typesdb *api.TypesDB, err error) { + var reader *os.File + reader, err = os.Open(path) + if err == nil { + typesdb, err = api.NewTypesDB(reader) + } + return +} + // Service represents a UDP server which receives metrics in collectd's binary // protocol and stores them in InfluxDB. type Service struct { @@ -51,7 +63,7 @@ type Service struct { wg sync.WaitGroup conn *net.UDPConn batcher *tsdb.PointBatcher - typesdb gollectd.Types + popts network.ParseOpts addr net.Addr mu sync.RWMutex @@ -97,12 +109,12 @@ func (s *Service) Open() error { return fmt.Errorf("PointsWriter is nil") } - if s.typesdb == nil { + if s.popts.TypesDB == nil { // Open collectd types. if stat, err := os.Stat(s.Config.TypesDB); err != nil { return fmt.Errorf("Stat(): %s", err) } else if stat.IsDir() { - alltypesdb := make(gollectd.Types) + alltypesdb := new(api.TypesDB) var readdir func(path string) readdir = func(path string) { files, err := ioutil.ReadDir(path) @@ -119,31 +131,24 @@ func (s *Service) Open() error { } s.Logger.Printf("Loading %s\n", fullpath) - types, err := gollectd.TypesDBFile(fullpath) + types, err := TypesDBFile(fullpath) if err != nil { s.Logger.Printf("Unable to parse collectd types file: %s\n", f.Name()) continue } - for k, t := range types { - a, ok := alltypesdb[k] - if ok { - alltypesdb[k] = t - } else { - alltypesdb[k] = append(a, t...) - } - } + alltypesdb.Merge(types) } } readdir(s.Config.TypesDB) - s.typesdb = alltypesdb + s.popts.TypesDB = alltypesdb } else { s.Logger.Printf("Loading %s\n", s.Config.TypesDB) - typesdb, err := gollectd.TypesDBFile(s.Config.TypesDB) + types, err := TypesDBFile(s.Config.TypesDB) if err != nil { return fmt.Errorf("Open(): %s", err) } - s.typesdb = typesdb + s.popts.TypesDB = types } } // Resolve our address. @@ -278,7 +283,8 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic { // SetTypes sets collectd types db. func (s *Service) SetTypes(types string) (err error) { - s.typesdb, err = gollectd.TypesDB([]byte(types)) + reader := strings.NewReader(types) + s.popts.TypesDB, err = api.NewTypesDB(reader) return } @@ -321,14 +327,14 @@ func (s *Service) serve() { } func (s *Service) handleMessage(buffer []byte) { - packets, err := gollectd.Packets(buffer, s.typesdb) + valueLists, err := network.Parse(buffer, s.popts) if err != nil { atomic.AddInt64(&s.stats.PointsParseFail, 1) s.Logger.Printf("Collectd parse error: %s", err) return } - for _, packet := range *packets { - points := s.UnmarshalCollectd(&packet) + for _, valueList := range valueLists { + points := s.UnmarshalValueList(&valueList) for _, p := range points { s.batcher.In() <- p } @@ -359,41 +365,38 @@ func (s *Service) writePoints() { } } -// Unmarshal translates a collectd packet into InfluxDB data points. -func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point { - // Prefer high resolution timestamp. - var timestamp time.Time - if packet.TimeHR > 0 { - // TimeHR is "near" nanosecond measurement, but not exactly nanasecond time - // Since we store time in microseconds, we round here (mostly so tests will work easier) - sec := packet.TimeHR >> 30 - // Shifting, masking, and dividing by 1 billion to get nanoseconds. - nsec := ((packet.TimeHR & 0x3FFFFFFF) << 30) / 1000 / 1000 / 1000 - timestamp = time.Unix(int64(sec), int64(nsec)).UTC().Round(time.Microsecond) - } else { - // If we don't have high resolution time, fall back to basic unix time - timestamp = time.Unix(int64(packet.Time), 0).UTC() - } +// Unmarshal translates a ValueList into InfluxDB data points. +func (s *Service) UnmarshalValueList(vl *api.ValueList) []models.Point { + timestamp := vl.Time.UTC() var points []models.Point - for i := range packet.Values { - name := fmt.Sprintf("%s_%s", packet.Plugin, packet.Values[i].Name) + for i := range vl.Values { + var name string + name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i)) tags := make(map[string]string) fields := make(map[string]interface{}) - fields["value"] = packet.Values[i].Value + // Convert interface back to actual type, then to float64 + switch value := vl.Values[i].(type) { + case api.Gauge: + fields["value"] = float64(value) + case api.Derive: + fields["value"] = float64(value) + case api.Counter: + fields["value"] = float64(value) + } - if packet.Hostname != "" { - tags["host"] = packet.Hostname + if vl.Identifier.Host != "" { + tags["host"] = vl.Identifier.Host } - if packet.PluginInstance != "" { - tags["instance"] = packet.PluginInstance + if vl.Identifier.PluginInstance != "" { + tags["instance"] = vl.Identifier.PluginInstance } - if packet.Type != "" { - tags["type"] = packet.Type + if vl.Identifier.Type != "" { + tags["type"] = vl.Identifier.Type } - if packet.TypeInstance != "" { - tags["type_instance"] = packet.TypeInstance + if vl.Identifier.TypeInstance != "" { + tags["type_instance"] = vl.Identifier.TypeInstance } // Drop invalid points