Add test of Graphite TCP input
parent
21aa28f901
commit
cd038ccddb
|
@ -1,11 +1,20 @@
|
||||||
package graphite_test
|
package graphite_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
"github.com/influxdb/influxdb/cluster"
|
||||||
"github.com/influxdb/influxdb/services/graphite"
|
"github.com/influxdb/influxdb/services/graphite"
|
||||||
|
"github.com/influxdb/influxdb/toml"
|
||||||
|
"github.com/influxdb/influxdb/tsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_DecodeNameAndTags(t *testing.T) {
|
func Test_DecodeNameAndTags(t *testing.T) {
|
||||||
|
@ -221,6 +230,83 @@ func Test_DecodeMetric(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_ServerGraphiteTCP(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UTC().Round(time.Second)
|
||||||
|
|
||||||
|
config := graphite.NewConfig()
|
||||||
|
config.Database = "graphitedb"
|
||||||
|
config.BatchSize = 0 // No batching.
|
||||||
|
config.BatchTimeout = toml.Duration(time.Second)
|
||||||
|
config.BindAddress = ":0"
|
||||||
|
|
||||||
|
service := graphite.NewService(config)
|
||||||
|
if service == nil {
|
||||||
|
t.Fatal("failed to create Graphite service")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow test to wait until points are written.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
pointsWriter := PointsWriter{
|
||||||
|
WritePointsFn: func(req *cluster.WritePointsRequest) error {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
if req.Database != "graphitedb" {
|
||||||
|
t.Fatalf("unexpected database: %s", req.Database)
|
||||||
|
} else if req.RetentionPolicy != "" {
|
||||||
|
t.Fatalf("unexpected retention policy: %s", req.RetentionPolicy)
|
||||||
|
} else if !reflect.DeepEqual(req.Points, []tsdb.Point{
|
||||||
|
tsdb.NewPoint(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{"value": 23.456},
|
||||||
|
time.Unix(now.Unix(), 0),
|
||||||
|
),
|
||||||
|
}) {
|
||||||
|
spew.Dump(req.Points)
|
||||||
|
t.Fatalf("unexpected points: %#v", req.Points)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
service.PointsWriter = &pointsWriter
|
||||||
|
|
||||||
|
if err := service.Open(); err != nil {
|
||||||
|
t.Fatalf("failed to open Graphite service: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to the graphite endpoint we just spun up
|
||||||
|
conn, err := net.Dial("tcp", strings.TrimPrefix(service.Host(), "[::]"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
data := []byte(`cpu 23.456 `)
|
||||||
|
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
|
||||||
|
data = append(data, '\n')
|
||||||
|
_, err = conn.Write(data)
|
||||||
|
conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PointsWriter represents a mock impl of PointsWriter.
|
||||||
|
type PointsWriter struct {
|
||||||
|
WritePointsFn func(*cluster.WritePointsRequest) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
|
||||||
|
return w.WritePointsFn(p)
|
||||||
|
}
|
||||||
|
|
||||||
// Test Helpers
|
// Test Helpers
|
||||||
func errstr(err error) string {
|
func errstr(err error) string {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue