support fractional time on graphite endpoint

pull/2080/head
Cory LaNou 2015-03-26 09:42:40 -06:00
parent 524eb78a69
commit d653e41712
3 changed files with 68 additions and 7 deletions

View File

@ -1346,12 +1346,64 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) {
}
}
func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8490
testName := "graphite integration fractional time"
dir := tempfile()
now := time.Now().UTC().Round(time.Second).Add(500 * time.Millisecond)
c, _ := main.NewConfig()
g := main.Graphite{
Enabled: true,
Database: "graphite",
Protocol: "TCP",
Port: 2103,
}
c.Graphites = append(c.Graphites, g)
t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)
createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}
t.Log("Writing data")
data := []byte(`cpu 23.456 `)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(".5")...)
data = append(data, '\n')
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
// query and wait for results
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}
func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8590
testName := "graphite integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
@ -1360,7 +1412,7 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
Enabled: true,
Database: "graphite",
Protocol: "TCP",
Port: 2103,
Port: 2203,
}
c.Graphites = append(c.Graphites, g)
@ -1402,14 +1454,14 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
t.Skip()
}
nNodes := 1
basePort := 8590
basePort := 8690
testName := "graphite integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
g := main.Graphite{
Enabled: true,
Port: 2203,
Port: 2303,
Protocol: "TCP",
}
c.Graphites = append(c.Graphites, g)

View File

@ -87,12 +87,21 @@ func (p *Parser) Parse(line string) (influxdb.Point, error) {
fieldValues[name] = v
// Parse timestamp.
unixTime, err := strconv.ParseInt(fields[2], 10, 64)
//unixTime, err := strconv.ParseInt(fields[2], 10, 64)
unixTime, err := strconv.ParseFloat(fields[2], 64)
if err != nil {
return influxdb.Point{}, err
}
timestamp := time.Unix(unixTime, 0)
var timestamp time.Time
// Check if we have fractional seconds
if float64(int64(unixTime)) != unixTime {
nanoseconds := int64((unixTime - float64(int64(unixTime))) * float64(time.Second))
seconds := int64(unixTime)
timestamp = time.Unix(seconds, nanoseconds)
} else {
timestamp = time.Unix(int64(unixTime), 0)
}
point := influxdb.Point{
Name: name,

View File

@ -184,7 +184,7 @@ func Test_DecodeMetric(t *testing.T) {
{
test: "should fail parsing invalid time",
line: `cpu 50.554 14199724z57825`,
err: `strconv.ParseInt: parsing "14199724z57825": invalid syntax`,
err: `strconv.ParseFloat: parsing "14199724z57825": invalid syntax`,
},
}