Return number of nodes that responded OK
parent
50b08ff73f
commit
e9ebf654e9
|
@ -194,7 +194,7 @@ func write(t *testing.T, node *TestNode, data string) {
|
|||
|
||||
// query executes the given query against all nodes in the cluster, and verifies no errors occured, and
|
||||
// ensures the returned data is as expected
|
||||
func query(t *testing.T, nodes Cluster, urlDb, query, expected, expectPattern string) (string, bool) {
|
||||
func query(t *testing.T, nodes Cluster, urlDb, query, expected, expectPattern string) (string, bool, int) {
|
||||
v := url.Values{"q": []string{query}}
|
||||
if urlDb != "" {
|
||||
v.Set("db", urlDb)
|
||||
|
@ -202,7 +202,7 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected, expectPattern st
|
|||
|
||||
var actual string
|
||||
// Query the data exists
|
||||
for _, n := range nodes {
|
||||
for i, n := range nodes {
|
||||
u := urlFor(n.url, "query", v)
|
||||
resp, err := http.Get(u.String())
|
||||
if err != nil {
|
||||
|
@ -217,21 +217,21 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected, expectPattern st
|
|||
actual = string(body)
|
||||
|
||||
if expected != "" && expected != actual {
|
||||
return actual, false
|
||||
return actual, false, i
|
||||
} else if expectPattern != "" {
|
||||
re := regexp.MustCompile(expectPattern)
|
||||
if !re.MatchString(actual) {
|
||||
return actual, false
|
||||
return actual, false, i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return actual, true
|
||||
return actual, true, len(nodes)
|
||||
}
|
||||
|
||||
// queryAndWait executes the given query against all nodes in the cluster, and verifies no errors occured, and
|
||||
// ensures the returned data is as expected until the timeout occurs
|
||||
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected, expectPattern string, timeout time.Duration) (string, bool) {
|
||||
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected, expectPattern string, timeout time.Duration) (string, bool, int) {
|
||||
v := url.Values{"q": []string{q}}
|
||||
if urlDb != "" {
|
||||
v.Set("db", urlDb)
|
||||
|
@ -256,14 +256,18 @@ func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected, expectPattern
|
|||
defer timer.Stop()
|
||||
}
|
||||
|
||||
nQueriedOK := 0
|
||||
for {
|
||||
got, ok := query(t, nodes, urlDb, q, expected, expectPattern)
|
||||
got, ok, n := query(t, nodes, urlDb, q, expected, expectPattern)
|
||||
if n > nQueriedOK {
|
||||
nQueriedOK = n
|
||||
}
|
||||
if ok {
|
||||
return got, ok
|
||||
return got, ok, nQueriedOK
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
return got, false
|
||||
return got, false, nQueriedOK
|
||||
case <-time.After(sleep):
|
||||
}
|
||||
}
|
||||
|
@ -309,9 +313,9 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
|
|||
}
|
||||
|
||||
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1)
|
||||
got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
|
||||
got, ok, nOK := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
|
||||
if !ok {
|
||||
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s", testName, expected, got)
|
||||
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s\n%d nodes responded correctly", testName, expected, got, nOK)
|
||||
}
|
||||
|
||||
// Create expected JSON string dynamically.
|
||||
|
@ -320,9 +324,9 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
|
|||
expectedValues = append(expectedValues, fmt.Sprintf(`["%s",%d]`, time.Unix(int64(i), int64(0)).UTC().Format(time.RFC3339), i))
|
||||
}
|
||||
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[%s]}]}]}`, strings.Join(expectedValues, ","))
|
||||
got, ok = query(t, nodes, database, `SELECT value FROM cpu`, expected, "")
|
||||
got, ok, nOK = query(t, nodes, database, `SELECT value FROM cpu`, expected, "")
|
||||
if !ok {
|
||||
t.Errorf("test %s failed, SELECT query returned unexpected data\nexp: %s\ngot: %s", testName, expected, got)
|
||||
t.Errorf("test %s failed, SELECT query returned unexpected data\nexp: %s\ngot: %s\n%d nodes responded correctly", testName, expected, got, nOK)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -350,9 +354,9 @@ func runTests_Errors(t *testing.T, nodes Cluster) {
|
|||
}
|
||||
|
||||
if tt.query != "" {
|
||||
got, ok := query(t, nodes, "", tt.query, tt.expected, "")
|
||||
got, ok, nOK := query(t, nodes, "", tt.query, tt.expected, "")
|
||||
if !ok {
|
||||
t.Errorf("Test '%s' failed, expected: %s, got: %s", tt.name, tt.expected, got)
|
||||
t.Errorf("Test '%s' failed, expected: %s, got: %s, %d nodes responded correctly", tt.name, tt.expected, got, nOK)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1363,12 +1367,12 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
|
|||
urlDb = tt.queryDb
|
||||
}
|
||||
qry := rewriteDbRp(tt.query, database, retention)
|
||||
got, ok := queryAndWait(t, qNodes, rewriteDbRp(urlDb, database, retention), qry, rewriteDbRp(tt.expected, database, retention), rewriteDbRp(tt.expectPattern, database, retention), 10*time.Second)
|
||||
got, ok, nOK := queryAndWait(t, qNodes, rewriteDbRp(urlDb, database, retention), qry, rewriteDbRp(tt.expected, database, retention), rewriteDbRp(tt.expectPattern, database, retention), 10*time.Second)
|
||||
if !ok {
|
||||
if tt.expected != "" {
|
||||
t.Errorf("Test #%d: \"%s:%s\" failed\n query: %s\n exp: %s\n got: %s\n", i, testName, name, qry, rewriteDbRp(tt.expected, database, retention), got)
|
||||
t.Errorf("Test #%d: \"%s:%s\" failed\n query: %s\n exp: %s\n got: %s\n%d nodes responded correctly", i, testName, name, qry, rewriteDbRp(tt.expected, database, retention), got, nOK)
|
||||
} else {
|
||||
t.Errorf("Test #%d: \"%s:%s\" failed\n query: %s\n exp: %s\n got: %s\n", i, testName, name, qry, rewriteDbRp(tt.expectPattern, database, retention), got)
|
||||
t.Errorf("Test #%d: \"%s:%s\" failed\n query: %s\n exp: %s\n got: %s\n%d nodes responded correctly", i, testName, name, qry, rewriteDbRp(tt.expectPattern, database, retention), got, nOK)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1640,7 +1644,7 @@ func Test_ServerSingleGraphiteIntegration_Default(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1698,7 +1702,7 @@ func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1755,7 +1759,7 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",0]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1796,14 +1800,14 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
|
|||
|
||||
// Need to wait for the database to be created
|
||||
expected := `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["graphite"]]}]}]}`
|
||||
got, ok := queryAndWait(t, nodes, "graphite", `show databases`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "graphite", `show databases`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
||||
// Need to wait for the database to get a default retention policy
|
||||
expected = `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`
|
||||
got, ok = queryAndWait(t, nodes, "graphite", `show retention policies graphite`, expected, "", 2*time.Second)
|
||||
got, ok, _ = queryAndWait(t, nodes, "graphite", `show retention policies graphite`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1821,7 +1825,7 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
|
|||
|
||||
// Wait for data to show up
|
||||
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
got, ok = queryAndWait(t, nodes, "graphite", `select * from "graphite"."default".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ = queryAndWait(t, nodes, "graphite", `select * from "graphite"."default".cpu`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1875,7 +1879,7 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1934,7 +1938,7 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",20]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
@ -1991,7 +1995,7 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
|
|||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))
|
||||
|
||||
// query and wait for results
|
||||
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", 2*time.Second)
|
||||
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", 2*time.Second)
|
||||
if !ok {
|
||||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue