diff --git a/services/graphite/graphite_test.go b/services/graphite/graphite_test.go index c67195a4a6..8688ca11e3 100644 --- a/services/graphite/graphite_test.go +++ b/services/graphite/graphite_test.go @@ -1,11 +1,20 @@ package graphite_test import ( + "fmt" + "net" + "reflect" "strconv" + "strings" + "sync" "testing" "time" + "github.com/davecgh/go-spew/spew" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/services/graphite" + "github.com/influxdb/influxdb/toml" + "github.com/influxdb/influxdb/tsdb" ) func Test_DecodeNameAndTags(t *testing.T) { @@ -221,6 +230,83 @@ func Test_DecodeMetric(t *testing.T) { } } +func Test_ServerGraphiteTCP(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + + now := time.Now().UTC().Round(time.Second) + + config := graphite.NewConfig() + config.Database = "graphitedb" + config.BatchSize = 0 // No batching. + config.BatchTimeout = toml.Duration(time.Second) + config.BindAddress = ":0" + + service := graphite.NewService(config) + if service == nil { + t.Fatal("failed to create Graphite service") + } + + // Allow test to wait until points are written. + var wg sync.WaitGroup + wg.Add(1) + + pointsWriter := PointsWriter{ + WritePointsFn: func(req *cluster.WritePointsRequest) error { + defer wg.Done() + + if req.Database != "graphitedb" { + t.Fatalf("unexpected database: %s", req.Database) + } else if req.RetentionPolicy != "" { + t.Fatalf("unexpected retention policy: %s", req.RetentionPolicy) + } else if !reflect.DeepEqual(req.Points, []tsdb.Point{ + tsdb.NewPoint( + "cpu", + map[string]string{}, + map[string]interface{}{"value": 23.456}, + time.Unix(now.Unix(), 0), + ), + }) { + spew.Dump(req.Points) + t.Fatalf("unexpected points: %#v", req.Points) + } + return nil + }, + } + service.PointsWriter = &pointsWriter + + if err := service.Open(); err != nil { + t.Fatalf("failed to open Graphite service: %s", err.Error()) + } + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", strings.TrimPrefix(service.Host(), "[::]")) + if err != nil { + t.Fatal(err) + } + data := []byte(`cpu 23.456 `) + data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...) + data = append(data, '\n') + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + } + + wg.Wait() +} + +// PointsWriter represents a mock impl of PointsWriter. +type PointsWriter struct { + WritePointsFn func(*cluster.WritePointsRequest) error +} + +func (w *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error { + return w.WritePointsFn(p) +} + // Test Helpers func errstr(err error) string { if err != nil {