diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index c3496db40f..26d1970217 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1346,12 +1346,64 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) { } } -func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) { +func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) { if testing.Short() { t.Skip() } nNodes := 1 basePort := 8490 + testName := "graphite integration fractional time" + dir := tempfile() + now := time.Now().UTC().Round(time.Second).Add(500 * time.Millisecond) + c, _ := main.NewConfig() + g := main.Graphite{ + Enabled: true, + Database: "graphite", + Protocol: "TCP", + Port: 2103, + } + c.Graphites = append(c.Graphites, g) + + t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress)) + nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) + + createDatabase(t, testName, nodes, "graphite") + createRetentionPolicy(t, testName, nodes, "graphite", "raw") + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) + if err != nil { + t.Fatal(err) + return + } + + t.Log("Writing data") + data := []byte(`cpu 23.456 `) + data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...) + data = append(data, []byte(".5")...) + data = append(data, '\n') + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + return + } + + expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano)) + + // query and wait for results + got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } +} + +func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8590 testName := "graphite integration" dir := tempfile() now := time.Now().UTC().Round(time.Second) @@ -1360,7 +1412,7 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) { Enabled: true, Database: "graphite", Protocol: "TCP", - Port: 2103, + Port: 2203, } c.Graphites = append(c.Graphites, g) @@ -1402,14 +1454,14 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) { t.Skip() } nNodes := 1 - basePort := 8590 + basePort := 8690 testName := "graphite integration" dir := tempfile() now := time.Now().UTC().Round(time.Second) c, _ := main.NewConfig() g := main.Graphite{ Enabled: true, - Port: 2203, + Port: 2303, Protocol: "TCP", } c.Graphites = append(c.Graphites, g) diff --git a/graphite/graphite.go b/graphite/graphite.go index 8ea7f209a4..659af129ef 100644 --- a/graphite/graphite.go +++ b/graphite/graphite.go @@ -87,12 +87,21 @@ func (p *Parser) Parse(line string) (influxdb.Point, error) { fieldValues[name] = v // Parse timestamp. - unixTime, err := strconv.ParseInt(fields[2], 10, 64) + //unixTime, err := strconv.ParseInt(fields[2], 10, 64) + unixTime, err := strconv.ParseFloat(fields[2], 64) if err != nil { return influxdb.Point{}, err } - timestamp := time.Unix(unixTime, 0) + var timestamp time.Time + // Check if we have fractional seconds + if float64(int64(unixTime)) != unixTime { + nanoseconds := int64((unixTime - float64(int64(unixTime))) * float64(time.Second)) + seconds := int64(unixTime) + timestamp = time.Unix(seconds, nanoseconds) + } else { + timestamp = time.Unix(int64(unixTime), 0) + } point := influxdb.Point{ Name: name, diff --git a/graphite/graphite_test.go b/graphite/graphite_test.go index bd5c9d732e..f8eaaa75ab 100644 --- a/graphite/graphite_test.go +++ b/graphite/graphite_test.go @@ -184,7 +184,7 @@ func Test_DecodeMetric(t *testing.T) { { test: "should fail parsing invalid time", line: `cpu 50.554 14199724z57825`, - err: `strconv.ParseInt: parsing "14199724z57825": invalid syntax`, + err: `strconv.ParseFloat: parsing "14199724z57825": invalid syntax`, }, }