fix #2599: add epoch URL param for timestamp fmt
parent
308f127289
commit
31c597a401
|
@ -175,7 +175,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes int
|
|||
// createDatabase creates a database, and verifies that the creation was successful.
|
||||
func createDatabase(t *testing.T, testName string, nodes Cluster, database string) {
|
||||
t.Logf("Test: %s: creating database %s", testName, database)
|
||||
query(t, nodes[:1], "", "CREATE DATABASE "+database, `{"results":[{}]}`, "")
|
||||
query(t, nodes[:1], "CREATE DATABASE "+database, `{"results":[{}]}`, "", url.Values{})
|
||||
}
|
||||
|
||||
// createRetentionPolicy creates a retetention policy and verifies that the creation was successful.
|
||||
|
@ -183,20 +183,20 @@ func createDatabase(t *testing.T, testName string, nodes Cluster, database strin
|
|||
func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
|
||||
t.Logf("Creating retention policy %s for database %s", retention, database)
|
||||
command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, replicationFactor)
|
||||
query(t, nodes[:1], "", command, `{"results":[{}]}`, "")
|
||||
query(t, nodes[:1], command, `{"results":[{}]}`, "", url.Values{})
|
||||
}
|
||||
|
||||
// deleteDatabase delete a database, and verifies that the deletion was successful.
|
||||
func deleteDatabase(t *testing.T, testName string, nodes Cluster, database string) {
|
||||
t.Logf("Test: %s: deleting database %s", testName, database)
|
||||
query(t, nodes[:1], "", "DROP DATABASE "+database, `{"results":[{}]}`, "")
|
||||
query(t, nodes[:1], "DROP DATABASE "+database, `{"results":[{}]}`, "", url.Values{})
|
||||
}
|
||||
|
||||
// dumpClusterDiags dumps the diagnostics of each node.
|
||||
func dumpClusterDiags(t *testing.T, testName string, nodes Cluster) {
|
||||
t.Logf("Test: %s: dumping node diagnostics", testName)
|
||||
for _, n := range nodes {
|
||||
r, _, _ := query(t, Cluster{n}, "", "SHOW DIAGNOSTICS", "", "")
|
||||
r, _, _ := query(t, Cluster{n}, "SHOW DIAGNOSTICS", "", "", url.Values{})
|
||||
t.Log(r)
|
||||
}
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ func dumpClusterDiags(t *testing.T, testName string, nodes Cluster) {
|
|||
func dumpClusterStats(t *testing.T, testName string, nodes Cluster) {
|
||||
t.Logf("Test: %s: dumping node stats", testName)
|
||||
for _, n := range nodes {
|
||||
r, _, _ := query(t, Cluster{n}, "", "SHOW STATS", "", "")
|
||||
r, _, _ := query(t, Cluster{n}, "SHOW STATS", "", "", url.Values{})
|
||||
t.Log(r)
|
||||
}
|
||||
}
|
||||
|
@ -231,16 +231,13 @@ func write(t *testing.T, node *TestNode, data string) {
|
|||
|
||||
// query executes the given query against all nodes in the cluster, and verifies no errors occured, and
|
||||
// ensures the returned data is as expected
|
||||
func query(t *testing.T, nodes Cluster, urlDb, query, expected, expectPattern string) (string, bool, int) {
|
||||
v := url.Values{"q": []string{query}}
|
||||
if urlDb != "" {
|
||||
v.Set("db", urlDb)
|
||||
}
|
||||
func query(t *testing.T, nodes Cluster, query, expected, expectPattern string, urlVals url.Values) (string, bool, int) {
|
||||
urlVals.Set("q", query)
|
||||
|
||||
var actual string
|
||||
// Query the data exists
|
||||
for i, n := range nodes {
|
||||
u := urlFor(n.url, "query", v)
|
||||
u := urlFor(n.url, "query", urlVals)
|
||||
resp, err := http.Get(u.String())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to execute query '%s': %s", query, err.Error())
|
||||
|
@ -268,12 +265,7 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected, expectPattern st
|
|||
|
||||
// queryAndWait executes the given query against all nodes in the cluster, and verifies no errors occured, and
|
||||
// ensures the returned data is as expected until the timeout occurs
|
||||
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected, expectPattern string, timeout time.Duration) (string, bool, int) {
|
||||
v := url.Values{"q": []string{q}}
|
||||
if urlDb != "" {
|
||||
v.Set("db", urlDb)
|
||||
}
|
||||
|
||||
func queryAndWait(t *testing.T, nodes Cluster, q, expected, expectPattern string, timeout time.Duration, urlVals url.Values) (string, bool, int) {
|
||||
sleep := 100 * time.Millisecond
|
||||
// Check to see if they set the env for duration sleep
|
||||
if sleepRaw := os.Getenv("TEST_SLEEP"); sleepRaw != "" {
|
||||
|
@ -295,7 +287,7 @@ func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected, expectPattern
|
|||
|
||||
nQueriedOK := 0
|
||||
for {
|
||||
got, ok, n := query(t, nodes, urlDb, q, expected, expectPattern)
|
||||
got, ok, n := query(t, nodes, q, expected, expectPattern, urlVals)
|
||||
if n > nQueriedOK {
|
||||
nQueriedOK = n
|
||||
}
|
||||
|
@ -351,7 +343,8 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
|
|||
}
|
||||
|
||||
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1)
|
||||
got, ok, nOK := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
|
||||
v := url.Values{"db": []string{database}}
|
||||
got, ok, nOK := queryAndWait(t, nodes, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second, v)
|
||||
if !ok {
|
||||
t.Fatalf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\ngot: %s\n%d nodes responded correctly", testName, expected, got, nOK)
|
||||
dumpClusterDiags(t, testName, nodes)
|
||||
|
@ -364,7 +357,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
|
|||
expectedValues = append(expectedValues, fmt.Sprintf(`["%s",%d]`, time.Unix(int64(i), int64(0)).UTC().Format(time.RFC3339), i))
|
||||
}
|
||||
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[%s]}]}]}`, strings.Join(expectedValues, ","))
|
||||
got, ok, nOK = query(t, nodes, database, `SELECT value FROM cpu`, expected, "")
|
||||
got, ok, nOK = query(t, nodes, `SELECT value FROM cpu`, expected, "", v)
|
||||
if !ok {
|
||||
t.Fatalf("test %s failed, SELECT query returned unexpected data\nexp: %s\ngot: %s\n%d nodes responded correctly", testName, expected, got, nOK)
|
||||
dumpClusterDiags(t, testName, nodes)
|
||||
|
@ -396,7 +389,7 @@ func runTests_Errors(t *testing.T, nodes Cluster) {
|
|||
}
|
||||
|
||||
if tt.query != "" {
|
||||
got, ok, nOK := query(t, nodes, "", tt.query, tt.expected, "")
|
||||
got, ok, nOK := query(t, nodes, tt.query, tt.expected, "", url.Values{})
|
||||
if !ok {
|
||||
t.Fatalf("Test '%s' failed, expected: %s, got: %s, %d nodes responded correctly", tt.name, tt.expected, got, nOK)
|
||||
}
|
||||
|
@ -419,16 +412,17 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
// The tests. Within these tests %DB% and %RP% will be replaced with the database and retention passed into
|
||||
// this function.
|
||||
tests := []struct {
|
||||
skip bool // Skip the test.
|
||||
reset bool // Delete and recreate the database.
|
||||
name string // Test name, for easy-to-read test log output.
|
||||
write string // If equal to the empty string, no data is written.
|
||||
writeFn writeFn // If non-nil, called after 'write' data (if any) is written.
|
||||
query string // If equal to the blank string, no query is executed.
|
||||
queryDb string // If set, is used as the "db" query param.
|
||||
queryOne bool // If set, only 1 node is queried.
|
||||
expected string // If 'query' is equal to the blank string, this is ignored.
|
||||
expectPattern string // Regexp alternative to expected field. (ignored if expected is set)
|
||||
skip bool // Skip the test.
|
||||
reset bool // Delete and recreate the database.
|
||||
name string // Test name, for easy-to-read test log output.
|
||||
write string // If equal to the empty string, no data is written.
|
||||
writeFn writeFn // If non-nil, called after 'write' data (if any) is written.
|
||||
query string // If equal to the blank string, no query is executed.
|
||||
queryDb string // If set, is used as the "db" query param.
|
||||
queryOne bool // If set, only 1 node is queried.
|
||||
expected string // If 'query' is equal to the blank string, this is ignored.
|
||||
expectPattern string // Regexp alternative to expected field. (ignored if expected is set)
|
||||
urlValues url.Values // Extra params to be passed on the URL
|
||||
}{
|
||||
// Data read and write tests
|
||||
{
|
||||
|
@ -460,6 +454,30 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
query: `SELECT * FROM "%DB%"."%RP%".cpu WHERE time < NOW()`,
|
||||
expected: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`,
|
||||
},
|
||||
{
|
||||
name: "single point with timestamp in nanosecond epoch",
|
||||
query: `SELECT * FROM "%DB%"."%RP%".cpu`,
|
||||
expected: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[1425085416703820946,100]]}]}]}`,
|
||||
urlValues: url.Values{"epoch": []string{"ns"}},
|
||||
},
|
||||
{
|
||||
name: "single point with timestamp in microsecond epoch",
|
||||
query: `SELECT * FROM "%DB%"."%RP%".cpu`,
|
||||
expected: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[1425085416703820,100]]}]}]}`,
|
||||
urlValues: url.Values{"epoch": []string{"us"}},
|
||||
},
|
||||
{
|
||||
name: "single point with timestamp in millisecond epoch",
|
||||
query: `SELECT * FROM "%DB%"."%RP%".cpu`,
|
||||
expected: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[1425085416703,100]]}]}]}`,
|
||||
urlValues: url.Values{"epoch": []string{"ms"}},
|
||||
},
|
||||
{
|
||||
name: "single point with timestamp in second epoch",
|
||||
query: `SELECT * FROM "%DB%"."%RP%".cpu`,
|
||||
expected: `{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[[1425085416,100]]}]}]}`,
|
||||
urlValues: url.Values{"epoch": []string{"s"}},
|
||||
},
|
||||
|
||||
// Selecting tags
|
||||
{
|
||||
|
@ -1101,13 +1119,15 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
},
|
||||
{
|
||||
name: "select where boolean field value =",
|
||||
write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "bool", "time": "2009-11-10T23:00:02Z", "fields": {"value": true}},
|
||||
{"name": "bool", "time": "2009-11-10T23:01:02Z", "fields": {"value": false}}]}`,
|
||||
write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"measurement": "bool", "time": "2009-11-10T23:00:02Z", "fields": {"value": true}},
|
||||
{"measurement": "bool", "time": "2009-11-10T23:01:02Z", "fields": {"value": false}}]}`,
|
||||
queryDb: "%DB%",
|
||||
query: `select value from "%DB%"."%RP%".bool where value = true`,
|
||||
expected: `{"results":[{"series":[{"name":"bool","columns":["time","value"],"values":[["2009-11-10T23:00:02Z",true]]}]}]}`,
|
||||
},
|
||||
{
|
||||
name: "select where boolean field value !=",
|
||||
queryDb: "%DB%",
|
||||
query: `select value from "%DB%"."%RP%".bool where value != true`,
|
||||
expected: `{"results":[{"series":[{"name":"bool","columns":["time","value"],"values":[["2009-11-10T23:01:02Z",false]]}]}]}`,
|
||||
},
|
||||
|
@ -1240,7 +1260,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
name: "ensure we can query for memory with both tags",
|
||||
query: `SELECT * FROM memory where region='uswest' and host='serverB'`,
|
||||
queryDb: "%DB%",
|
||||
expected: `{"results":[{"series":[{"name":"memory","columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
expected: `{"results":[{"series":[{"name":"memory","tags":{"host":"serverB","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
},
|
||||
{
|
||||
query: `DROP MEASUREMENT cpu`,
|
||||
|
@ -1266,17 +1286,17 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
{
|
||||
query: `SELECT * FROM memory where host='serverB'`,
|
||||
queryDb: "%DB%",
|
||||
expected: `{"results":[{"series":[{"name":"memory","columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
expected: `{"results":[{"series":[{"name":"memory","tags":{"host":"serverB","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
},
|
||||
{
|
||||
query: `SELECT * FROM memory where region='uswest'`,
|
||||
queryDb: "%DB%",
|
||||
expected: `{"results":[{"series":[{"name":"memory","columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
expected: `{"results":[{"series":[{"name":"memory","tags":{"host":"serverB","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
},
|
||||
{
|
||||
query: `SELECT * FROM memory where region='uswest' and host='serverB'`,
|
||||
queryDb: "%DB%",
|
||||
expected: `{"results":[{"series":[{"name":"memory","columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
expected: `{"results":[{"series":[{"name":"memory","tags":{"host":"serverB","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}]}`,
|
||||
},
|
||||
|
||||
// Drop non-existant measurement tests
|
||||
|
@ -1696,7 +1716,15 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
urlDb = tt.queryDb
|
||||
}
|
||||
qry := rewriteDbRp(tt.query, database, retention)
|
||||
got, ok, nOK := queryAndWait(t, qNodes, rewriteDbRp(urlDb, database, retention), qry, rewriteDbRp(tt.expected, database, retention), rewriteDbRp(tt.expectPattern, database, retention), 30*time.Second)
|
||||
|
||||
v := tt.urlValues
|
||||
if v == nil {
|
||||
v = url.Values{"db": []string{rewriteDbRp(urlDb, database, retention)}}
|
||||
} else {
|
||||
v.Set("db", rewriteDbRp(urlDb, database, retention))
|
||||
}
|
||||
|
||||
got, ok, nOK := queryAndWait(t, qNodes, qry, rewriteDbRp(tt.expected, database, retention), rewriteDbRp(tt.expectPattern, database, retention), 30*time.Second, v)
|
||||
if !ok {
|
||||
if tt.expected != "" {
|
||||
t.Fatalf("Test #%d: \"%s:%s\" failed\n query: %s\n exp: %s\n got: %s\n%d nodes responded correctly", i, testName, name, qry, rewriteDbRp(tt.expected, database, retention), got, nOK)
|
||||
|
@ -1736,7 +1764,7 @@ func TestServerDiags(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSingleServer(t *testing.T) {
|
||||
t.Skip()
|
||||
//t.Skip()
|
||||
t.Parallel()
|
||||
testName := "single server integration"
|
||||
if testing.Short() {
|
||||
|
@ -1997,7 +2025,8 @@ func Test_ServerSingleGraphiteIntegration_Default(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", graphiteTestTimeout)
|
||||
v := url.Values{"db": []string{"graphite"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "graphite"."raw".cpu`, expected, "", graphiteTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2055,7 +2084,8 @@ func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", graphiteTestTimeout)
|
||||
v := url.Values{"db": []string{"graphite"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "graphite"."raw".cpu`, expected, "", graphiteTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2112,7 +2142,8 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",0]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", graphiteTestTimeout)
|
||||
v := url.Values{"db": []string{"graphite"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "graphite"."raw".cpu`, expected, "", graphiteTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2154,14 +2185,15 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
|
|||
|
||||
// Need to wait for the database to be created
|
||||
expected := `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["graphite"]]}]}]}`
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `show databases`, expected, "", 2*time.Second)
|
||||
v := url.Values{"db": []string{"graphite"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `show databases`, expected, "", 2*time.Second, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
||||
// Need to wait for the database to get a default retention policy
|
||||
expected = `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`
|
||||
got, ok, _ = queryAndWait(t, nodes, "graphite", `show retention policies graphite`, expected, "", graphiteTestTimeout)
|
||||
got, ok, _ = queryAndWait(t, nodes, `show retention policies graphite`, expected, "", graphiteTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2179,7 +2211,7 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
|
|||
|
||||
// Wait for data to show up
|
||||
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
got, ok, _ = queryAndWait(t, nodes, "graphite", `select * from "graphite"."default".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ = queryAndWait(t, nodes, `select * from "graphite"."default".cpu`, expected, "", 2*time.Second, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2233,7 +2265,8 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout)
|
||||
v := url.Values{"db": []string{"opentsdb"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2292,7 +2325,8 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"tag1":"val3","tag2":"val4"},"columns":["time","value"],"values":[["%s",20]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, "", openTSDBTestTimeout)
|
||||
v := url.Values{"db": []string{"opentsdb"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, "", openTSDBTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2349,7 +2383,8 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"tag1":"val1","tag2":"val2"},"columns":["time","value"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout)
|
||||
v := url.Values{"db": []string{"opentsdb"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2383,9 +2418,9 @@ func Test_ServerOpenTSDBIntegrationHTTPSingle(t *testing.T) {
|
|||
ts := fmt.Sprintf("%d", now.Unix())
|
||||
data := bytes.NewBufferString(`{"metric":"cpu","timestamp":` + ts + `,"value":10,"tags":{"tag1":"val1","tag2":"val2"}}`)
|
||||
host := nodes[0].node.OpenTSDBServer.Addr().String()
|
||||
url := "http://" + host + "/api/put"
|
||||
u := "http://" + host + "/api/put"
|
||||
|
||||
resp, err := http.Post(url, "text/json", data)
|
||||
resp, err := http.Post(u, "text/json", data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
|
@ -2395,7 +2430,8 @@ func Test_ServerOpenTSDBIntegrationHTTPSingle(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"tag1":"val1","tag2":"val2"},"columns":["time","value"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout)
|
||||
v := url.Values{"db": []string{"opentsdb"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -2429,9 +2465,9 @@ func Test_ServerOpenTSDBIntegrationHTTPMulti(t *testing.T) {
|
|||
ts := fmt.Sprintf("%d", now.Unix())
|
||||
data := bytes.NewBufferString(`[{"metric":"cpu","timestamp":` + ts + `,"value":10,"tags":{"tag1":"val1","tag2":"val2"}},{"metric":"cpu","timestamp":` + ts + `,"value":20,"tags":{"tag1":"val3","tag2":"val4"}}]`)
|
||||
host := nodes[0].node.OpenTSDBServer.Addr().String()
|
||||
url := "http://" + host + "/api/put"
|
||||
u := "http://" + host + "/api/put"
|
||||
|
||||
resp, err := http.Post(url, "text/json", data)
|
||||
resp, err := http.Post(u, "text/json", data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
|
@ -2443,7 +2479,8 @@ func Test_ServerOpenTSDBIntegrationHTTPMulti(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"tag1":"val1","tag2":"val2"},"columns":["time","value"],"values":[["%s",10]]},{"name":"cpu","tags":{"tag1":"val3","tag2":"val4"},"columns":["time","value"],"values":[["%s",20]]}]}]}`, expts, expts)
|
||||
|
||||
// query and wait for results
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout)
|
||||
v := url.Values{"db": []string{"opentsdb"}}
|
||||
got, ok, _ := queryAndWait(t, nodes, `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout, v)
|
||||
if !ok {
|
||||
t.Fatalf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
|
|
@ -201,6 +201,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
|
|||
return
|
||||
}
|
||||
|
||||
epoch := strings.TrimSpace(q.Get("epoch"))
|
||||
|
||||
p := influxql.NewParser(strings.NewReader(qp))
|
||||
db := q.Get("db")
|
||||
|
||||
|
@ -260,6 +262,11 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
|
|||
continue
|
||||
}
|
||||
|
||||
// if requested, convert result timestamps to epoch
|
||||
if epoch != "" {
|
||||
convertToEpoch(r, epoch)
|
||||
}
|
||||
|
||||
// if chunked we write out this result and flush
|
||||
if chunked {
|
||||
res.Results = []*influxdb.Result{r}
|
||||
|
@ -288,6 +295,28 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
|
|||
}
|
||||
}
|
||||
|
||||
// convertToEpoch converts result timestamps from time.Time to the specified epoch.
|
||||
func convertToEpoch(r *influxdb.Result, epoch string) {
|
||||
divisor := int64(1)
|
||||
|
||||
switch epoch {
|
||||
case "us":
|
||||
divisor = 1000
|
||||
case "ms":
|
||||
divisor = 1000000
|
||||
case "s":
|
||||
divisor = 1000000000
|
||||
}
|
||||
|
||||
for _, s := range r.Series {
|
||||
for _, v := range s.Values {
|
||||
if ts, ok := v[0].(time.Time); ok {
|
||||
v[0] = ts.UnixNano() / divisor
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// marshalPretty will marshal the interface to json either pretty printed or not
|
||||
func marshalPretty(r interface{}, pretty bool) []byte {
|
||||
var b []byte
|
||||
|
|
|
@ -939,6 +939,26 @@ type Row struct {
|
|||
Err error `json:"err,omitempty"`
|
||||
}
|
||||
|
||||
// type RowMarshalerJSON struct {
|
||||
// epoch string
|
||||
// }
|
||||
|
||||
// func (r *RowMarshalerJSON) MarshalJSON(v interface{}) ([]byte, error) {
|
||||
// row := []struct{
|
||||
// Name string `json:"name,omitempty"`
|
||||
// Tags map[string]string `json:"tags,omitempty"`
|
||||
// Columns []string `json:"columns"`
|
||||
// Values [][]interface{} `json:"values,omitempty"`
|
||||
// Err error `json:"err,omitempty"`
|
||||
// }{
|
||||
// Name: r.Name,
|
||||
// Tags: = r.Tags,
|
||||
// Columns: r.Columns,
|
||||
// Err: r.Err,
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// tagsHash returns a hash of tag key/value pairs.
|
||||
func (r *Row) tagsHash() uint64 {
|
||||
h := fnv.New64a()
|
||||
|
|
Loading…
Reference in New Issue