fix #2814: hook collectd service back up

pull/2837/head
David Norton 2015-06-08 22:44:42 -04:00
parent 0bc20ec622
commit d5f52333a1
7 changed files with 783 additions and 586 deletions


@@ -139,6 +139,7 @@ func (s *Server) appendCollectdService(c collectd.Config) {
return
}
srv := collectd.NewService(c)
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}
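This hunk is the actual fix: the service built from the collectd config now receives the server's points writer before being appended. For context, a minimal sketch of how appended services are subsequently started (the startup loop is outside this diff, so its exact shape here is an assumption):

// Hypothetical sketch of the startup path that opens each appended service.
for _, service := range s.Services {
	if err := service.Open(); err != nil {
		return fmt.Errorf("open service: %s", err)
	}
}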


@@ -1,215 +0,0 @@
package collectd
import (
"errors"
"fmt"
"log"
"net"
"sync"
"time"
"github.com/influxdb/influxdb/tsdb"
"github.com/kimor79/gollectd"
)
// DefaultPort for collectd is 25826
const DefaultPort = 25826
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
WriteSeries(database, retentionPolicy string, points []tsdb.Point) (uint64, error)
}
// Server represents a UDP server which receives metrics in collectd's binary
// protocol and stores them in InfluxDB.
type Server struct {
wg sync.WaitGroup
done chan struct{}
conn *net.UDPConn
writer SeriesWriter
Database string
typesdb gollectd.Types
typesdbpath string
BatchSize int
BatchTimeout time.Duration
batcher *tsdb.PointBatcher
}
// NewServer constructs a new Server.
func NewServer(w SeriesWriter, typesDBPath string) *Server {
s := Server{
done: make(chan struct{}),
writer: w,
typesdbpath: typesDBPath,
typesdb: make(gollectd.Types),
}
return &s
}
// ListenAndServe starts receiving collectd metrics via UDP and writes the
// received data points to the server's SeriesWriter. The serving goroutine
// is stopped only when s.Close() is called; ListenAndServe itself returns
// immediately.
func ListenAndServe(s *Server, iface string) error {
if iface == "" { // Make sure we have an address
return errors.New("bind address required")
} else if s.Database == "" { // Make sure they have a database
return errors.New("database was not specified in config")
}
addr, err := net.ResolveUDPAddr("udp", iface)
if err != nil {
return fmt.Errorf("unable to resolve UDP address: %v", err)
}
s.typesdb, err = gollectd.TypesDBFile(s.typesdbpath)
if err != nil {
return fmt.Errorf("unable to parse typesDBFile: %v", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("unable to listen on UDP: %v", err)
}
s.conn = conn
s.batcher = tsdb.NewPointBatcher(s.BatchSize, s.BatchTimeout)
s.batcher.Start()
s.wg.Add(2)
go s.serve()
go s.writePoints()
return nil
}
func (s *Server) serve() {
defer s.wg.Done()
// From https://collectd.org/wiki/index.php/Binary_protocol
// 1024 bytes (payload only, not including UDP / IP headers)
// In versions 4.0 through 4.7, the receive buffer has a fixed size
// of 1024 bytes. When longer packets are received, the trailing data
// is simply ignored. Since version 4.8, the buffer size can be
// configured. Version 5.0 will increase the default buffer size to
// 1452 bytes (the maximum payload size when using UDP/IPv6 over
// Ethernet).
buffer := make([]byte, 1452)
for {
select {
case <-s.done:
// We closed the connection, time to go.
return
default:
// Keep processing.
}
n, _, err := s.conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("Collectd ReadFromUDP error: %s", err)
continue
}
if n > 0 {
s.handleMessage(buffer[:n])
}
}
}
func (s *Server) handleMessage(buffer []byte) {
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
log.Printf("Collectd parse error: %s", err)
return
}
for _, packet := range *packets {
points := Unmarshal(&packet)
for _, p := range points {
s.batcher.In() <- p
}
}
}
func (s *Server) writePoints() {
defer s.wg.Done()
for {
select {
case <-s.done:
return
case batch := <-s.batcher.Out():
_, err := s.writer.WriteSeries(s.Database, "", batch)
if err != nil {
log.Printf("Collectd cannot write data: %s", err)
continue
}
}
}
}
// Close shuts down the server's listeners.
func (s *Server) Close() error {
if s.conn == nil {
return errors.New("server already closed")
}
// Close the connection, and wait for the goroutine to exit.
s.conn.Close()
s.batcher.Flush()
close(s.done)
s.wg.Wait()
// Release all remaining resources.
s.done = nil
s.conn = nil
log.Println("collectd UDP closed")
return nil
}
// Unmarshal translates a collectd packet into InfluxDB data points.
func Unmarshal(packet *gollectd.Packet) []tsdb.Point {
// Prefer high resolution timestamp.
var timestamp time.Time
if packet.TimeHR > 0 {
// TimeHR is a "near" nanosecond measurement, but not exactly nanosecond time.
// Since we store time in microseconds, we round here (mostly so tests are easier).
sec := packet.TimeHR >> 30
// Shifting, masking, and dividing by 1 billion to get nanoseconds.
nsec := ((packet.TimeHR & 0x3FFFFFFF) << 30) / 1000 / 1000 / 1000
timestamp = time.Unix(int64(sec), int64(nsec)).UTC().Round(time.Microsecond)
} else {
// If we don't have high resolution time, fall back to basic unix time
timestamp = time.Unix(int64(packet.Time), 0).UTC()
}
var points []tsdb.Point
for i := range packet.Values {
name := fmt.Sprintf("%s_%s", packet.Plugin, packet.Values[i].Name)
tags := make(map[string]string)
fields := make(map[string]interface{})
fields["value"] = packet.Values[i].Value
if packet.Hostname != "" {
tags["host"] = packet.Hostname
}
if packet.PluginInstance != "" {
tags["instance"] = packet.PluginInstance
}
if packet.Type != "" {
tags["type"] = packet.Type
}
if packet.TypeInstance != "" {
tags["type_instance"] = packet.TypeInstance
}
p := tsdb.NewPoint(name, tags, fields, timestamp)
points = append(points, p)
}
return points
}


@@ -1,356 +0,0 @@
package collectd_test
import (
"encoding/hex"
"fmt"
"net"
"testing"
"time"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/tsdb"
"github.com/kimor79/gollectd"
)
type testServer string
type serverResponses []serverResponse
type serverResponse struct {
database string
retentionPolicy string
points []tsdb.Point
}
var responses = make(chan *serverResponse, 1024)
func (testServer) WriteSeries(database, retentionPolicy string, points []tsdb.Point) (uint64, error) {
responses <- &serverResponse{
database: database,
retentionPolicy: retentionPolicy,
points: points,
}
return 0, nil
}
func (testServer) ResponseN(n int) ([]*serverResponse, error) {
var a []*serverResponse
for {
select {
case r := <-responses:
a = append(a, r)
if len(a) == n {
return a, nil
}
case <-time.After(time.Second):
return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a))
}
}
}
func TestServer_ListenAndServe_ErrBindAddressRequired(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "foo")
)
e := collectd.ListenAndServe(s, "")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrDatabaseNotSpecified(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "foo")
)
e := collectd.ListenAndServe(s, "127.0.0.1:25826")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrCouldNotParseTypesDBFile(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "foo")
)
s.Database = "foo"
e := collectd.ListenAndServe(s, "127.0.0.1:25829")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_Success(t *testing.T) {
var (
ts testServer
// On a Mac this file is typically at "/usr/local/Cellar/collectd/5.4.1/share/collectd/types.db".
s = collectd.NewServer(ts, "./collectd_test.conf")
)
s.Database = "counter"
e := collectd.ListenAndServe(s, "127.0.0.1:25830")
defer s.Close()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
}
func TestServer_Close_ErrServerClosed(t *testing.T) {
var (
ts testServer
// On a Mac this file is typically at "/usr/local/Cellar/collectd/5.4.1/share/collectd/types.db".
s = collectd.NewServer(ts, "./collectd_test.conf")
)
s.Database = "counter"
e := collectd.ListenAndServe(s, "127.0.0.1:25830")
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
s.Close()
e = s.Close()
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrResolveUDPAddr(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "./collectd_test.conf")
)
s.Database = "counter"
e := collectd.ListenAndServe(s, "foo")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrListenUDP(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "./collectd_test.conf")
)
// Open a UDP listener on the port first to force an error.
addr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:25826")
conn, _ := net.ListenUDP("udp", addr)
defer conn.Close()
s.Database = "counter"
e := collectd.ListenAndServe(s, "127.0.0.1:25826")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_Serve_Success(t *testing.T) {
// clear any previous responses
var (
ts testServer
// On a Mac this file is typically at "/usr/local/Cellar/collectd/5.4.1/share/collectd/types.db".
s = collectd.NewServer(ts, "./collectd_test.conf")
addr = "127.0.0.1:25830"
)
s.Database = "counter"
e := collectd.ListenAndServe(s, addr)
defer s.Close()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
conn, e := net.Dial("udp", addr)
defer conn.Close()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
buf, e := hex.DecodeString("0000000e6c6f63616c686f7374000008000c1512b2e40f5da16f0009000c00000002800000000002000e70726f636573736573000004000d70735f7374617465000005000c72756e6e696e67000006000f000101000000000000f03f0008000c1512b2e40f5db90f0005000d736c656570696e67000006000f0001010000000000c06f400008000c1512b2e40f5dc4a40005000c7a6f6d62696573000006000f00010100000000000000000008000c1512b2e40f5de10b0005000c73746f70706564000006000f00010100000000000000000008000c1512b2e40f5deac20005000b706167696e67000006000f00010100000000000000000008000c1512b2e40f5df59b0005000c626c6f636b6564000006000f00010100000000000000000008000c1512b2e40f7ee0610004000e666f726b5f726174650000050005000006000f000102000000000004572f0008000c1512b2e68e0635e6000200086370750000030006300000040008637075000005000975736572000006000f0001020000000000204f9c0008000c1512b2e68e0665d6000500096e696365000006000f000102000000000000caa30008000c1512b2e68e06789c0005000b73797374656d000006000f00010200000000000607050008000c1512b2e68e06818e0005000969646c65000006000f0001020000000003b090ae0008000c1512b2e68e068bcf0005000977616974000006000f000102000000000000f6810008000c1512b2e68e069c7d0005000e696e74657272757074000006000f000102000000000000001d0008000c1512b2e68e069fec0005000c736f6674697271000006000f0001020000000000000a2a0008000c1512b2e68e06a2b20005000a737465616c000006000f00010200000000000000000008000c1512b2e68e0708d60003000631000005000975736572000006000f00010200000000001d48c60008000c1512b2e68e070c16000500096e696365000006000f0001020000000000007fe60008000c1512b2e68e0710790005000b73797374656d000006000f00010200000000000667890008000c1512b2e68e0713bb0005000969646c65000006000f00010200000000025d0e470008000c1512b2e68e0717790005000977616974000006000f000102000000000002500e0008000c1512b2e68e071bc00005000e696e74657272757074000006000f00010200000000000000000008000c1512b2e68e071f800005000c736f6674697271000006000f00010200000000000006050008000c1512b2e68e07221e0005000a737465616c000006000f00010200000000000000000008000c1512b2e68e0726eb0003000632000005000975736572000006000f00010200000000001ff3e40008000c1512b2e68e0728cb000500096e696365000006000f000102000000000000ca210008000c1512b2e68e072ae70005000b73797374656d000006000f000102000000000006eabe0008000c1512b2e68e072f2f0005000977616974000006000f000102000000000000c1300008000c1512b2e68e072ccb0005000969646c65000006000f00010200000000025b5abb0008000c1512b2e68e07312c0005000e696e74657272757074000006000f00010200000000000000070008000c1512b2e68e0733520005000c736f6674697271000006000f00010200000000000007260008000c1512b2e68e0735b60005000a737465616c000006000f00010200000000000000000008000c1512b2e68e07828d0003000633000005000975736572000006000f000102000000000020f50a0008000c1512b2e68e0787ac000500096e696365000006000f0001020000000000008368")
if e != nil {
t.Fatalf("err from hex.DecodeString does not match. expected %v, got %v", nil, e)
}
_, e = conn.Write(buf)
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
if _, err := ts.ResponseN(33); err != nil {
t.Fatal(err)
}
}
func TestUnmarshal_Points(t *testing.T) {
/*
This is a sample of how collectd data can be represented in JSON:
[
{
"values": [197141504, 175136768],
"dstypes": ["counter", "counter"],
"dsnames": ["read", "write"],
"time": 1251533299,
"interval": 10,
"host": "leeloo.lan.home.verplant.org",
"plugin": "disk",
"plugin_instance": "sda",
"type": "disk_octets",
"type_instance": ""
},
]
*/
var tests = []struct {
name string
packet gollectd.Packet
points []tsdb.Point
}{
{
name: "single value",
packet: gollectd.Packet{
Plugin: "disk",
Values: []gollectd.Value{
{Name: "read", Value: 1},
},
},
points: []tsdb.Point{
tsdb.NewPoint("disk_read", nil, map[string]interface{}{"value": float64(1)}, time.Unix(0, 0)),
},
},
{
name: "multi value",
packet: gollectd.Packet{
Plugin: "disk",
Values: []gollectd.Value{
{Name: "read", Value: 1},
{Name: "write", Value: 5},
},
},
points: []tsdb.Point{
tsdb.NewPoint("disk_read", nil, map[string]interface{}{"value": float64(1)}, time.Unix(0, 0)),
tsdb.NewPoint("disk_write", nil, map[string]interface{}{"value": float64(5)}, time.Unix(0, 0)),
},
},
{
name: "tags",
packet: gollectd.Packet{
Plugin: "disk",
Hostname: "server01",
PluginInstance: "sdk",
Type: "disk_octets",
TypeInstance: "single",
Values: []gollectd.Value{
{Name: "read", Value: 1},
},
},
points: []tsdb.Point{
tsdb.NewPoint("disk_read",
map[string]string{"host": "server01", "instance": "sdk", "type": "disk_octets", "type_instance": "single"},
map[string]interface{}{"value": float64(1)},
time.Unix(0, 0)),
},
},
}
for _, test := range tests {
t.Logf("testing %q", test.name)
points := collectd.Unmarshal(&test.packet)
if len(points) != len(test.points) {
t.Errorf("points len mismatch. expected %d, got %d", len(test.points), len(points))
}
for i, m := range test.points {
// test name
name := fmt.Sprintf("%s_%s", test.packet.Plugin, test.packet.Values[i].Name)
if m.Name() != name {
t.Errorf("point name mismatch. expected %q, got %q", name, m.Name())
}
// test value
mv := m.Fields()["value"].(float64)
pv := test.packet.Values[i].Value
if mv != pv {
t.Errorf("point value mismatch. expected %v, got %v", pv, mv)
}
// test tags
if test.packet.Hostname != m.Tags()["host"] {
t.Errorf(`point tags["host"] mismatch. expected %q, got %q`, test.packet.Hostname, m.Tags()["host"])
}
if test.packet.PluginInstance != m.Tags()["instance"] {
t.Errorf(`point tags["instance"] mismatch. expected %q, got %q`, test.packet.PluginInstance, m.Tags()["instance"])
}
if test.packet.Type != m.Tags()["type"] {
t.Errorf(`point tags["type"] mismatch. expected %q, got %q`, test.packet.Type, m.Tags()["type"])
}
if test.packet.TypeInstance != m.Tags()["type_instance"] {
t.Errorf(`point tags["type_instance"] mismatch. expected %q, got %q`, test.packet.TypeInstance, m.Tags()["type_instance"])
}
}
}
}
func TestUnmarshal_Time(t *testing.T) {
// It's important to remember that collectd stores high-resolution time
// in units of 2^-30 seconds ("near" nanoseconds), so we have to take that
// into account when feeding time into the test.
// Since we only store microseconds, we round it off (mostly to make testing easier).
testTime := time.Now().UTC().Round(time.Microsecond)
var timeHR = func(tm time.Time) uint64 {
sec, nsec := tm.Unix(), tm.UnixNano()%1000000000
hr := (sec << 30) + (nsec * 1000000000 / 1073741824)
return uint64(hr)
}
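// Note (added): timeHR inverts the fraction math in Unmarshal (up to a few
// nanoseconds of integer truncation), so a timestamp rounded to microseconds
// survives the round trip intact.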
var tests = []struct {
name string
packet gollectd.Packet
points []tsdb.Point
}{
{
name: "Should parse timeHR properly",
packet: gollectd.Packet{
TimeHR: timeHR(testTime),
Values: []gollectd.Value{
{
Value: 1,
},
},
},
points: []tsdb.Point{
tsdb.NewPoint("", nil, nil, testTime),
},
},
{
name: "Should parse time properly",
packet: gollectd.Packet{
Time: uint64(testTime.Round(time.Second).Unix()),
Values: []gollectd.Value{
{
Value: 1,
},
},
},
points: []tsdb.Point{
tsdb.NewPoint("", nil, nil, testTime.Round(time.Second)),
},
},
}
for _, test := range tests {
t.Logf("testing %q", test.name)
points := collectd.Unmarshal(&test.packet)
if len(points) != len(test.points) {
t.Errorf("point len mismatch. expected %d, got %d", len(test.points), len(points))
}
for _, p := range points {
if test.packet.TimeHR > 0 {
if p.Time().Format(time.RFC3339Nano) != testTime.Format(time.RFC3339Nano) {
t.Errorf("time mis-match, got %v, expected %v", p.Time().Format(time.RFC3339Nano), testTime.Format(time.RFC3339Nano))
} else if p.Time().Format(time.RFC3339) != testTime.Format(time.RFC3339) {
t.Errorf("time mis-match, got %v, expected %v", p.Time().Format(time.RFC3339), testTime.Format(time.RFC3339))
}
}
}
}
}


@@ -1,12 +1,43 @@
package collectd
import (
"time"
)
const (
DefaultBindAddress = ":25826"
DefaultDatabase = "collectd"
DefaultRetentionPolicy = ""
DefaultBatchSize = 5000
DefaultBatchDuration = 10 * time.Second
DefaultTypesDB = "/usr/share/collectd/types.db"
)
// Config represents a configuration for the collectd service.
type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Database string `toml:"database"`
TypesDB string `toml:"typesdb"`
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
BatchSize int `toml:"batch-size"`
BatchDuration time.Duration `toml:"batch-timeout"`
TypesDB string `toml:"typesdb"`
}
// NewConfig returns a new instance of Config with defaults.
func NewConfig() Config {
return Config{}
return Config{
Enabled: false,
BindAddress: DefaultBindAddress,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
BatchSize: DefaultBatchSize,
BatchDuration: DefaultBatchDuration,
TypesDB: DefaultTypesDB,
}
}
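A brief usage sketch (hypothetical caller code, not part of this change) showing the intended pattern of starting from the defaults and overriding individual fields:

// Hypothetical caller: start from defaults, then override selected fields.
c := collectd.NewConfig()
c.Enabled = true
c.BatchSize = 1000
c.BatchDuration = 5 * time.Second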


@@ -1,11 +1,241 @@
package collectd
import "net"
import (
"fmt"
"log"
"net"
"os"
"sync"
"time"
type Service struct{}
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
"github.com/kimor79/gollectd"
)
func NewService(c Config) *Service { return &Service{} }
// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
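// Note (added): in production this interface is satisfied by the server's
// cluster-level points writer, wired up in the first hunk of this commit;
// the service tests below substitute a stub so batches can be inspected.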
func (s *Service) Open() error { return nil }
func (s *Service) Close() error { return nil }
func (s *Service) Addr() net.Addr { return nil }
// Service represents a UDP server which receives metrics in collectd's binary
// protocol and stores them in InfluxDB.
type Service struct {
Config *Config
PointsWriter pointsWriter
Logger *log.Logger
wg sync.WaitGroup
err chan error
stop chan struct{}
ln *net.UDPConn
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr
}
// NewService returns a new instance of the collectd service.
func NewService(c Config) *Service {
s := &Service{
Config: &c,
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
}
return s
}
// Open starts the service.
func (s *Service) Open() error {
assert(s.Config.BindAddress != "", "BindAddress is blank")
assert(s.Config.Database != "", "Database name is blank")
assert(s.PointsWriter != nil, "PointsWriter is nil")
if s.typesdb == nil {
// Open collectd types.
typesdb, err := gollectd.TypesDBFile(s.Config.TypesDB)
if err != nil {
return fmt.Errorf("Open(): %s", err)
}
s.typesdb = typesdb
}
// Resolve our address.
addr, err := net.ResolveUDPAddr("udp", s.Config.BindAddress)
if err != nil {
return fmt.Errorf("unable to resolve UDP address: %s", err)
}
s.addr = addr
// Start listening
ln, err := net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("unable to listen on UDP: %s", err)
}
s.ln = ln
// Start the points batcher.
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchDuration)
s.batcher.Start()
// Create channel and wait group for signalling goroutines to stop.
s.stop = make(chan struct{})
s.wg.Add(2)
// Start goroutines that process collectd packets.
go s.serve()
go s.writePoints()
s.Logger.Println("collectd UDP started")
return nil
}
// Close stops the service.
func (s *Service) Close() error {
// Close the connection, and wait for the goroutine to exit.
close(s.stop)
s.ln.Close()
s.batcher.Stop()
s.wg.Wait()
// Release all remaining resources.
s.stop = nil
s.ln = nil
s.batcher = nil
s.Logger.Println("collectd UDP closed")
return nil
}
// SetTypes sets collectd types db.
func (s *Service) SetTypes(types string) (err error) {
s.typesdb, err = gollectd.TypesDB([]byte(types))
return
}
// Err returns a channel for fatal errors that occur on goroutines.
func (s *Service) Err() chan error { return s.err }
// Addr returns the listener's address.
func (s *Service) Addr() net.Addr {
return s.ln.LocalAddr()
}
func (s *Service) serve() {
defer s.wg.Done()
// From https://collectd.org/wiki/index.php/Binary_protocol
// 1024 bytes (payload only, not including UDP / IP headers)
// In versions 4.0 through 4.7, the receive buffer has a fixed size
// of 1024 bytes. When longer packets are received, the trailing data
// is simply ignored. Since version 4.8, the buffer size can be
// configured. Version 5.0 will increase the default buffer size to
// 1452 bytes (the maximum payload size when using UDP/IPv6 over
// Ethernet).
buffer := make([]byte, 1452)
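// Note (added): ReadFromUDP copies at most len(buffer) bytes of a datagram,
// so payloads longer than 1452 bytes are silently truncated, mirroring the
// collectd limits quoted above.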
for {
select {
case <-s.stop:
// We closed the connection, time to go.
return
default:
// Keep processing.
}
n, _, err := s.ln.ReadFromUDP(buffer)
if err != nil {
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
continue
}
if n > 0 {
s.handleMessage(buffer[:n])
}
}
}
func (s *Service) handleMessage(buffer []byte) {
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
s.Logger.Printf("Collectd parse error: %s", err)
return
}
for _, packet := range *packets {
points := Unmarshal(&packet)
for _, p := range points {
s.batcher.In() <- p
}
}
}
func (s *Service) writePoints() {
defer s.wg.Done()
for {
select {
case <-s.stop:
return
case batch := <-s.batcher.Out():
req := &cluster.WritePointsRequest{
Database: s.Config.Database,
RetentionPolicy: s.Config.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelAny,
Points: batch,
}
if err := s.PointsWriter.WritePoints(req); err != nil {
s.Logger.Printf("failed to write batch: %s", err)
continue
}
}
}
}
// Unmarshal translates a collectd packet into InfluxDB data points.
func Unmarshal(packet *gollectd.Packet) []tsdb.Point {
// Prefer high resolution timestamp.
var timestamp time.Time
if packet.TimeHR > 0 {
// TimeHR is a "near" nanosecond measurement, but not exactly nanosecond time.
// Since we store time in microseconds, we round here (mostly so tests are easier).
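// TimeHR packs whole seconds into the upper 34 bits and a fractional part
// in units of 2^-30 s into the low 30 bits (TimeHR = seconds<<30 | fraction).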
sec := packet.TimeHR >> 30
// Shifting, masking, and dividing by 1 billion to get nanoseconds.
nsec := ((packet.TimeHR & 0x3FFFFFFF) << 30) / 1000 / 1000 / 1000
timestamp = time.Unix(int64(sec), int64(nsec)).UTC().Round(time.Microsecond)
} else {
// If we don't have high resolution time, fall back to basic unix time
timestamp = time.Unix(int64(packet.Time), 0).UTC()
}
var points []tsdb.Point
for i := range packet.Values {
name := fmt.Sprintf("%s_%s", packet.Plugin, packet.Values[i].Name)
tags := make(map[string]string)
fields := make(map[string]interface{})
fields["value"] = packet.Values[i].Value
if packet.Hostname != "" {
tags["host"] = packet.Hostname
}
if packet.PluginInstance != "" {
tags["instance"] = packet.PluginInstance
}
if packet.Type != "" {
tags["type"] = packet.Type
}
if packet.TypeInstance != "" {
tags["type_instance"] = packet.TypeInstance
}
p := tsdb.NewPoint(name, tags, fields, timestamp)
points = append(points, p)
}
return points
}
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}


@@ -0,0 +1,457 @@
package collectd
import (
"encoding/hex"
"errors"
"io/ioutil"
"log"
"net"
"testing"
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)
// Test that the collectd service correctly batches points by BatchSize.
func TestService_BatchSize(t *testing.T) {
t.Parallel()
// Raw data sent by collectd, captured using Wireshark.
testData, err := hex.DecodeString("000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c00000000000000050002000c656e74726f7079000004000c656e74726f7079000006000f0001010000000000007240000200086370750000030006310000040008637075000005000969646c65000006000f0001000000000000a674620005000977616974000006000f0001000000000000000000000200076466000003000500000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0cb6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010000000000000000fe0005000c736f6674697271000006000f000100000000000000000000020007646600000300050000040007646600000500096c6976650000060018000201010000000000000000000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000005f36000500096e696365000006000f0001000000000000000ad80002000e696e746572666163650000030005000004000e69665f6f6374657473000005000b64756d6d79300000060018000200000000000000000000000000000000041a000200076466000004000764660000050008746d70000006001800020101000000000000f240000000a0ea972742000200086370750000030006320000040008637075000005000b73797374656d000006000f00010000000000000045d30002000e696e746572666163650000030005000004000f69665f7061636b657473000005000b64756d6d79300000060018000200000000000000000000000000000000000f000200086370750000030006320000040008637075000005000969646c65000006000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c6f636b000006001800020101000000000000000000000000000054410002000e696e74657266616365000004000e69665f6572726f7273000005000b64756d6d793000000600180002000000000000000000000000000000000000000200086370750000030006320000040008637075000005000977616974000006000f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132")
check(err)
totalPoints := 26
// Batch sizes that divide evenly into totalPoints.
batchSizes := []int{1, 2, 13}
for _, batchSize := range batchSizes {
func() {
s := newTestService(batchSize, time.Second)
pointCh := make(chan tsdb.Point)
s.PointsWriter.WritePointsFn = func(req *cluster.WritePointsRequest) error {
if len(req.Points) != batchSize {
t.Errorf("\n\texp = %d\n\tgot = %d\n", batchSize, len(req.Points))
}
for _, p := range req.Points {
pointCh <- p
}
return nil
}
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer func() { t.Log("closing service"); s.Close() }()
// Get the address & port the service is listening on for collectd data.
addr := s.Addr()
conn, err := net.Dial("udp", addr.String())
if err != nil {
t.Fatal(err)
}
// Send the test data to the service.
if n, err := conn.Write(testData); err != nil {
t.Fatal(err)
} else if n != len(testData) {
t.Fatalf("only sent %d of %d bytes", n, len(testData))
}
points := []tsdb.Point{}
Loop:
for {
select {
case p := <-pointCh:
points = append(points, p)
if len(points) == totalPoints {
break Loop
}
case <-time.After(time.Second):
t.Logf("exp %d points, got %d", totalPoints, len(points))
t.Fatal("timed out waiting for points from collectd service")
}
}
if len(points) != totalPoints {
t.Fatalf("exp %d points, got %d", totalPoints, len(points))
}
for i, exp := range expPoints {
got := points[i].String()
if got != exp {
t.Fatalf("\n\texp = %s\n\tgot = %s\n", exp, got)
}
}
}()
}
}
// Test that the collectd service correctly batches points using BatchDuration.
func TestService_BatchDuration(t *testing.T) {
t.Parallel()
// Raw data sent by collectd, captured using Wireshark.
testData, err := hex.DecodeString("000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c00000000000000050002000c656e74726f7079000004000c656e74726f7079000006000f0001010000000000007240000200086370750000030006310000040008637075000005000969646c65000006000f0001000000000000a674620005000977616974000006000f0001000000000000000000000200076466000003000500000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0cb6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010000000000000000fe0005000c736f6674697271000006000f000100000000000000000000020007646600000300050000040007646600000500096c6976650000060018000201010000000000000000000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000005f36000500096e696365000006000f0001000000000000000ad80002000e696e746572666163650000030005000004000e69665f6f6374657473000005000b64756d6d79300000060018000200000000000000000000000000000000041a000200076466000004000764660000050008746d70000006001800020101000000000000f240000000a0ea972742000200086370750000030006320000040008637075000005000b73797374656d000006000f00010000000000000045d30002000e696e746572666163650000030005000004000f69665f7061636b657473000005000b64756d6d79300000060018000200000000000000000000000000000000000f000200086370750000030006320000040008637075000005000969646c65000006000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c6f636b000006001800020101000000000000000000000000000054410002000e696e74657266616365000004000e69665f6572726f7273000005000b64756d6d793000000600180002000000000000000000000000000000000000000200086370750000030006320000040008637075000005000977616974000006000f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132")
check(err)
totalPoints := len(expPoints)
s := newTestService(5000, 250*time.Millisecond)
pointCh := make(chan tsdb.Point, 1000)
s.PointsWriter.WritePointsFn = func(req *cluster.WritePointsRequest) error {
for _, p := range req.Points {
pointCh <- p
}
return nil
}
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer func() { t.Log("closing service"); s.Close() }()
// Get the address & port the service is listening on for collectd data.
addr := s.Addr()
conn, err := net.Dial("udp", addr.String())
if err != nil {
t.Fatal(err)
}
// Send the test data to the service.
if n, err := conn.Write(testData); err != nil {
t.Fatal(err)
} else if n != len(testData) {
t.Fatalf("only sent %d of %d bytes", n, len(testData))
}
points := []tsdb.Point{}
Loop:
for {
select {
case p := <-pointCh:
points = append(points, p)
if len(points) == totalPoints {
break Loop
}
case <-time.After(time.Second):
t.Logf("exp %d points, got %d", totalPoints, len(points))
t.Fatal("timed out waiting for points from collectd service")
}
}
if len(points) != totalPoints {
t.Fatalf("exp %d points, got %d", totalPoints, len(points))
}
for i, exp := range expPoints {
got := points[i].String()
if got != exp {
t.Fatalf("\n\texp = %s\n\tgot = %s\n", exp, got)
}
}
}
type testService struct {
*Service
PointsWriter testPointsWriter
}
func newTestService(batchSize int, batchDuration time.Duration) *testService {
s := &testService{
Service: NewService(Config{
BindAddress: "127.0.0.1:0",
Database: "collectd_test",
BatchSize: batchSize,
BatchDuration: batchDuration,
}),
}
s.Service.PointsWriter = &s.PointsWriter
// Set the collectd types using test string.
if err := s.SetTypes(typesDBText); err != nil {
panic(err)
}
if !testing.Verbose() {
s.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
}
return s
}
type testPointsWriter struct {
WritePointsFn func(*cluster.WritePointsRequest) error
}
func (w *testPointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
return w.WritePointsFn(p)
}
func wait(c chan struct{}, d time.Duration) (err error) {
select {
case <-c:
case <-time.After(d):
err = errors.New("timed out")
}
return
}
func waitInt(c chan int, d time.Duration) (i int, err error) {
select {
case i = <-c:
case <-time.After(d):
err = errors.New("timed out")
}
return
}
func check(err error) {
if err != nil {
panic(err)
}
}
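// Note (added): expPoints is expressed in InfluxDB line-protocol form:
// measurement,tag=value,... field=value unix-nanosecond-timestamp.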
var expPoints = []string{
"entropy_value,host=pf1-62-210-94-173,type=entropy value=288.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=1,type=cpu,type_instance=idle value=10908770.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=1,type=cpu,type_instance=wait value=0.0 1414080767000000000",
"df_used,host=pf1-62-210-94-173,type=df,type_instance=live-cow value=378576896.0 1414080767000000000",
"df_free,host=pf1-62-210-94-173,type=df,type_instance=live-cow value=50287988736.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=1,type=cpu,type_instance=interrupt value=254.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=1,type=cpu,type_instance=softirq value=0.0 1414080767000000000",
"df_used,host=pf1-62-210-94-173,type=df,type_instance=live value=0.0 1414080767000000000",
"df_free,host=pf1-62-210-94-173,type=df,type_instance=live value=50666565632.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=1,type=cpu,type_instance=steal value=0.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=2,type=cpu,type_instance=user value=24374.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=2,type=cpu,type_instance=nice value=2776.0 1414080767000000000",
"interface_rx,host=pf1-62-210-94-173,type=if_octets,type_instance=dummy0 value=0.0 1414080767000000000",
"interface_tx,host=pf1-62-210-94-173,type=if_octets,type_instance=dummy0 value=1050.0 1414080767000000000",
"df_used,host=pf1-62-210-94-173,type=df,type_instance=tmp value=73728.0 1414080767000000000",
"df_free,host=pf1-62-210-94-173,type=df,type_instance=tmp value=50666491904.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=2,type=cpu,type_instance=system value=17875.0 1414080767000000000",
"interface_rx,host=pf1-62-210-94-173,type=if_packets,type_instance=dummy0 value=0.0 1414080767000000000",
"interface_tx,host=pf1-62-210-94-173,type=if_packets,type_instance=dummy0 value=15.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=2,type=cpu,type_instance=idle value=10904704.0 1414080767000000000",
"df_used,host=pf1-62-210-94-173,type=df,type_instance=run-lock value=0.0 1414080767000000000",
"df_free,host=pf1-62-210-94-173,type=df,type_instance=run-lock value=5242880.0 1414080767000000000",
"interface_rx,host=pf1-62-210-94-173,type=if_errors,type_instance=dummy0 value=0.0 1414080767000000000",
"interface_tx,host=pf1-62-210-94-173,type=if_errors,type_instance=dummy0 value=0.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=2,type=cpu,type_instance=wait value=0.0 1414080767000000000",
"cpu_value,host=pf1-62-210-94-173,instance=2,type=cpu,type_instance=interrupt value=306.0 1414080767000000000",
}
// Taken from /usr/share/collectd/types.db on an Ubuntu system
var typesDBText = `
absolute value:ABSOLUTE:0:U
apache_bytes value:DERIVE:0:U
apache_connections value:GAUGE:0:65535
apache_idle_workers value:GAUGE:0:65535
apache_requests value:DERIVE:0:U
apache_scoreboard value:GAUGE:0:65535
ath_nodes value:GAUGE:0:65535
ath_stat value:DERIVE:0:U
backends value:GAUGE:0:65535
bitrate value:GAUGE:0:4294967295
bytes value:GAUGE:0:U
cache_eviction value:DERIVE:0:U
cache_operation value:DERIVE:0:U
cache_ratio value:GAUGE:0:100
cache_result value:DERIVE:0:U
cache_size value:GAUGE:0:4294967295
charge value:GAUGE:0:U
compression_ratio value:GAUGE:0:2
compression uncompressed:DERIVE:0:U, compressed:DERIVE:0:U
connections value:DERIVE:0:U
conntrack value:GAUGE:0:4294967295
contextswitch value:DERIVE:0:U
counter value:COUNTER:U:U
cpufreq value:GAUGE:0:U
cpu value:DERIVE:0:U
current_connections value:GAUGE:0:U
current_sessions value:GAUGE:0:U
current value:GAUGE:U:U
delay value:GAUGE:-1000000:1000000
derive value:DERIVE:0:U
df_complex value:GAUGE:0:U
df_inodes value:GAUGE:0:U
df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623
disk_latency read:GAUGE:0:U, write:GAUGE:0:U
disk_merged read:DERIVE:0:U, write:DERIVE:0:U
disk_octets read:DERIVE:0:U, write:DERIVE:0:U
disk_ops_complex value:DERIVE:0:U
disk_ops read:DERIVE:0:U, write:DERIVE:0:U
disk_time read:DERIVE:0:U, write:DERIVE:0:U
dns_answer value:DERIVE:0:U
dns_notify value:DERIVE:0:U
dns_octets queries:DERIVE:0:U, responses:DERIVE:0:U
dns_opcode value:DERIVE:0:U
dns_qtype_cached value:GAUGE:0:4294967295
dns_qtype value:DERIVE:0:U
dns_query value:DERIVE:0:U
dns_question value:DERIVE:0:U
dns_rcode value:DERIVE:0:U
dns_reject value:DERIVE:0:U
dns_request value:DERIVE:0:U
dns_resolver value:DERIVE:0:U
dns_response value:DERIVE:0:U
dns_transfer value:DERIVE:0:U
dns_update value:DERIVE:0:U
dns_zops value:DERIVE:0:U
duration seconds:GAUGE:0:U
email_check value:GAUGE:0:U
email_count value:GAUGE:0:U
email_size value:GAUGE:0:U
entropy value:GAUGE:0:4294967295
fanspeed value:GAUGE:0:U
file_size value:GAUGE:0:U
files value:GAUGE:0:U
fork_rate value:DERIVE:0:U
frequency_offset value:GAUGE:-1000000:1000000
frequency value:GAUGE:0:U
fscache_stat value:DERIVE:0:U
gauge value:GAUGE:U:U
hash_collisions value:DERIVE:0:U
http_request_methods value:DERIVE:0:U
http_requests value:DERIVE:0:U
http_response_codes value:DERIVE:0:U
humidity value:GAUGE:0:100
if_collisions value:DERIVE:0:U
if_dropped rx:DERIVE:0:U, tx:DERIVE:0:U
if_errors rx:DERIVE:0:U, tx:DERIVE:0:U
if_multicast value:DERIVE:0:U
if_octets rx:DERIVE:0:U, tx:DERIVE:0:U
if_packets rx:DERIVE:0:U, tx:DERIVE:0:U
if_rx_errors value:DERIVE:0:U
if_rx_octets value:DERIVE:0:U
if_tx_errors value:DERIVE:0:U
if_tx_octets value:DERIVE:0:U
invocations value:DERIVE:0:U
io_octets rx:DERIVE:0:U, tx:DERIVE:0:U
io_packets rx:DERIVE:0:U, tx:DERIVE:0:U
ipt_bytes value:DERIVE:0:U
ipt_packets value:DERIVE:0:U
irq value:DERIVE:0:U
latency value:GAUGE:0:U
links value:GAUGE:0:U
load shortterm:GAUGE:0:5000, midterm:GAUGE:0:5000, longterm:GAUGE:0:5000
md_disks value:GAUGE:0:U
memcached_command value:DERIVE:0:U
memcached_connections value:GAUGE:0:U
memcached_items value:GAUGE:0:U
memcached_octets rx:DERIVE:0:U, tx:DERIVE:0:U
memcached_ops value:DERIVE:0:U
memory value:GAUGE:0:281474976710656
multimeter value:GAUGE:U:U
mutex_operations value:DERIVE:0:U
mysql_commands value:DERIVE:0:U
mysql_handler value:DERIVE:0:U
mysql_locks value:DERIVE:0:U
mysql_log_position value:DERIVE:0:U
mysql_octets rx:DERIVE:0:U, tx:DERIVE:0:U
nfs_procedure value:DERIVE:0:U
nginx_connections value:GAUGE:0:U
nginx_requests value:DERIVE:0:U
node_octets rx:DERIVE:0:U, tx:DERIVE:0:U
node_rssi value:GAUGE:0:255
node_stat value:DERIVE:0:U
node_tx_rate value:GAUGE:0:127
objects value:GAUGE:0:U
operations value:DERIVE:0:U
percent value:GAUGE:0:100.1
percent_bytes value:GAUGE:0:100.1
percent_inodes value:GAUGE:0:100.1
pf_counters value:DERIVE:0:U
pf_limits value:DERIVE:0:U
pf_source value:DERIVE:0:U
pf_states value:GAUGE:0:U
pf_state value:DERIVE:0:U
pg_blks value:DERIVE:0:U
pg_db_size value:GAUGE:0:U
pg_n_tup_c value:DERIVE:0:U
pg_n_tup_g value:GAUGE:0:U
pg_numbackends value:GAUGE:0:U
pg_scan value:DERIVE:0:U
pg_xact value:DERIVE:0:U
ping_droprate value:GAUGE:0:100
ping_stddev value:GAUGE:0:65535
ping value:GAUGE:0:65535
players value:GAUGE:0:1000000
power value:GAUGE:0:U
protocol_counter value:DERIVE:0:U
ps_code value:GAUGE:0:9223372036854775807
ps_count processes:GAUGE:0:1000000, threads:GAUGE:0:1000000
ps_cputime user:DERIVE:0:U, syst:DERIVE:0:U
ps_data value:GAUGE:0:9223372036854775807
ps_disk_octets read:DERIVE:0:U, write:DERIVE:0:U
ps_disk_ops read:DERIVE:0:U, write:DERIVE:0:U
ps_pagefaults minflt:DERIVE:0:U, majflt:DERIVE:0:U
ps_rss value:GAUGE:0:9223372036854775807
ps_stacksize value:GAUGE:0:9223372036854775807
ps_state value:GAUGE:0:65535
ps_vm value:GAUGE:0:9223372036854775807
queue_length value:GAUGE:0:U
records value:GAUGE:0:U
requests value:GAUGE:0:U
response_time value:GAUGE:0:U
response_code value:GAUGE:0:U
route_etx value:GAUGE:0:U
route_metric value:GAUGE:0:U
routes value:GAUGE:0:U
serial_octets rx:DERIVE:0:U, tx:DERIVE:0:U
signal_noise value:GAUGE:U:0
signal_power value:GAUGE:U:0
signal_quality value:GAUGE:0:U
snr value:GAUGE:0:U
spam_check value:GAUGE:0:U
spam_score value:GAUGE:U:U
spl value:GAUGE:U:U
swap_io value:DERIVE:0:U
swap value:GAUGE:0:1099511627776
tcp_connections value:GAUGE:0:4294967295
temperature value:GAUGE:U:U
threads value:GAUGE:0:U
time_dispersion value:GAUGE:-1000000:1000000
timeleft value:GAUGE:0:U
time_offset value:GAUGE:-1000000:1000000
total_bytes value:DERIVE:0:U
total_connections value:DERIVE:0:U
total_objects value:DERIVE:0:U
total_operations value:DERIVE:0:U
total_requests value:DERIVE:0:U
total_sessions value:DERIVE:0:U
total_threads value:DERIVE:0:U
total_time_in_ms value:DERIVE:0:U
total_values value:DERIVE:0:U
uptime value:GAUGE:0:4294967295
users value:GAUGE:0:65535
vcl value:GAUGE:0:65535
vcpu value:GAUGE:0:U
virt_cpu_total value:DERIVE:0:U
virt_vcpu value:DERIVE:0:U
vmpage_action value:DERIVE:0:U
vmpage_faults minflt:DERIVE:0:U, majflt:DERIVE:0:U
vmpage_io in:DERIVE:0:U, out:DERIVE:0:U
vmpage_number value:GAUGE:0:4294967295
volatile_changes value:GAUGE:0:U
voltage_threshold value:GAUGE:U:U, threshold:GAUGE:U:U
voltage value:GAUGE:U:U
vs_memory value:GAUGE:0:9223372036854775807
vs_processes value:GAUGE:0:65535
vs_threads value:GAUGE:0:65535
#
# Legacy types
# (required for the v5 upgrade target)
#
arc_counts demand_data:COUNTER:0:U, demand_metadata:COUNTER:0:U, prefetch_data:COUNTER:0:U, prefetch_metadata:COUNTER:0:U
arc_l2_bytes read:COUNTER:0:U, write:COUNTER:0:U
arc_l2_size value:GAUGE:0:U
arc_ratio value:GAUGE:0:U
arc_size current:GAUGE:0:U, target:GAUGE:0:U, minlimit:GAUGE:0:U, maxlimit:GAUGE:0:U
mysql_qcache hits:COUNTER:0:U, inserts:COUNTER:0:U, not_cached:COUNTER:0:U, lowmem_prunes:COUNTER:0:U, queries_in_cache:GAUGE:0:U
mysql_threads running:GAUGE:0:U, connected:GAUGE:0:U, cached:GAUGE:0:U, created:COUNTER:0:U
`


@@ -1,6 +1,7 @@
package tsdb
import (
"sync"
"sync/atomic"
"time"
)
@@ -11,11 +12,15 @@ type PointBatcher struct {
size int
duration time.Duration
stop chan struct{}
in chan Point
out chan []Point
flush chan struct{}
stats PointBatcherStats
mu sync.RWMutex
wg *sync.WaitGroup
}
// NewPointBatcher returns a new PointBatcher.
@@ -23,8 +28,9 @@ func NewPointBatcher(sz int, d time.Duration) *PointBatcher {
return &PointBatcher{
size: sz,
duration: d,
in: make(chan Point),
out: make(chan []Point),
stop: make(chan struct{}),
in: make(chan Point, 1000000),
out: make(chan []Point, 1000000),
flush: make(chan struct{}),
}
}
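// Note (added): the large channel buffers (capacity 1,000,000) decouple
// producers such as the collectd UDP read loop from the batch consumer;
// sends only block once a stalled consumer allows the buffer to fill.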
@@ -40,6 +46,14 @@ type PointBatcherStats struct {
// Start starts the batching process. Points are accepted on the channel
// returned by In() and batches are emitted on the channel returned by Out().
func (b *PointBatcher) Start() {
b.mu.Lock()
defer b.mu.Unlock()
// Already running?
if b.wg != nil {
return
}
var timer *time.Timer
var batch []Point
var timerCh <-chan time.Time
@@ -50,9 +64,19 @@
batch = nil
}
b.wg = &sync.WaitGroup{}
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
case <-b.stop:
if len(batch) > 0 {
emit()
timerCh = nil
}
return
case p := <-b.in:
atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil {
@@ -82,21 +106,46 @@
}()
}
func (b *PointBatcher) Stop() {
b.mu.Lock()
defer b.mu.Unlock()
// If not running, nothing to stop.
if b.wg == nil {
return
}
close(b.stop)
b.wg.Wait()
}
// In returns the channel to which points should be written.
func (b *PointBatcher) In() chan<- Point { return b.in }
func (b *PointBatcher) In() chan<- Point {
b.mu.RLock()
defer b.mu.RUnlock()
return b.in
}
// Out returns the channel from which batches should be read.
func (b *PointBatcher) Out() <-chan []Point { return b.out }
func (b *PointBatcher) Out() <-chan []Point {
b.mu.RLock()
defer b.mu.RUnlock()
return b.out
}
// Flush instructs the batcher to emit any pending points in a batch, regardless of batch size.
// If there are no pending points, no batch is emitted.
func (b *PointBatcher) Flush() {
b.mu.Lock()
defer b.mu.Unlock()
b.flush <- struct{}{}
}
// Stats returns a PointBatcherStats object for the PointBatcher. While each
// statistic should be closely correlated with the others, that is not guaranteed.
func (b *PointBatcher) Stats() *PointBatcherStats {
b.mu.RLock()
defer b.mu.RUnlock()
stats := PointBatcherStats{}
stats.BatchTotal = atomic.LoadUint64(&b.stats.BatchTotal)
stats.PointTotal = atomic.LoadUint64(&b.stats.PointTotal)