Merge pull request #1693 from influxdb/speed-up-integration-testing

Make integration tests faster
pull/1694/head
Cory LaNou 2015-02-23 13:07:57 -07:00
commit 1a7f0681b0
2 changed files with 57 additions and 15 deletions

@ -12,6 +12,7 @@ import (
"path/filepath"
"reflect"
"strconv"
"sync"
"testing"
"time"
@ -247,8 +248,8 @@ func createRetentionPolicy(t *testing.T, testName string, nodes cluster, databas
}
// write writes the provided data to the cluster. It verifies that a 200 OK is returned by the server.
func write(t *testing.T, testname string, nodes cluster, data string) {
t.Logf("Test %s: writing data", testname)
func write(t *testing.T, testName string, nodes cluster, data string) {
t.Logf("Test %s: writing data", testName)
serverURL := nodes[0].url
u := urlFor(serverURL, "write", url.Values{})
@ -261,19 +262,22 @@ func write(t *testing.T, testname string, nodes cluster, data string) {
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
// Need some time for server to get consensus and write data
// TODO corylanou query the status endpoint for the server and wait for the index to update to know the write was applied
time.Sleep(time.Duration(time.Second))
index, err := strconv.ParseInt(resp.Header.Get("X-InfluxDB-Index"), 10, 64)
if err != nil {
t.Fatalf("Couldn't get index. header: %s, err: %s.", resp.Header.Get("X-InfluxDB-Index"), err)
}
wait(t, testName, nodes, index)
t.Log("Finished writing and waiting")
}
// simpleQuery executes the given query against all nodes in the cluster, and verifies that the
// returned results are as expected.
func simpleQuery(t *testing.T, testname string, nodes cluster, query string, expected client.Results) {
func simpleQuery(t *testing.T, testName string, nodes cluster, query string, expected client.Results) {
var results client.Results
// Query to verify the data exists
for _, n := range nodes {
t.Logf("Test name %s: query data on node %s", testname, n.url)
t.Logf("Test name %s: query data on node %s", testName, n.url)
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
resp, err := http.Get(u.String())
if err != nil {
@ -310,14 +314,50 @@ func simpleQuery(t *testing.T, testname string, nodes cluster, query string, exp
}
}
func wait(t *testing.T, testName string, nodes cluster, index int64) {
// Wait for the index to sync up
var wg sync.WaitGroup
wg.Add(len(nodes))
for _, n := range nodes {
go func(t *testing.T, testName string, u *url.URL, index int64) {
// Signal Done on every exit path so a failed node can't hang wg.Wait().
defer wg.Done()
u = urlFor(u, fmt.Sprintf("wait/%d", index), nil)
t.Logf("Test name %s: wait on node %s for index %d", testName, u, index)
resp, err := http.Get(u.String())
if err != nil {
// t.Fatalf must only be called from the test goroutine, so report with Errorf and return.
t.Errorf("Couldn't wait: %s", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("Wait failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Couldn't read body of response: %s", err)
return
}
t.Logf("resp.Body: %s\n", string(body))
if i, err := strconv.Atoi(string(body)); err != nil || i == 0 {
t.Errorf("Unexpected body: %s", string(body))
}
}(t, testName, n.url, index)
}
wg.Wait()
}
// simpleCountQuery executes the given query against all nodes in the cluster, and verifies that
// the count for the given field is as expected.
func simpleCountQuery(t *testing.T, testname string, nodes cluster, query, field string, expected int64) {
func simpleCountQuery(t *testing.T, testName string, nodes cluster, query, field string, expected int64) {
var results client.Results
// Query to verify the data exists
for _, n := range nodes {
t.Logf("Test name %s: query data on node %s", testname, n.url)
t.Logf("Test name %s: query data on node %s", testName, n.url)
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
resp, err := http.Get(u.String())
if err != nil {
@ -402,7 +442,6 @@ func Test_ServerSingleIntegration(t *testing.T) {
}}},
},
}
simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
}
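
Taken together, the test-side changes replace the fixed one-second sleep with an explicit handshake: write() parses the commit index out of the X-InfluxDB-Index response header, and wait() then polls every node until that index has been applied. Below is a minimal client-side sketch of the first half of that handshake, assuming a single node at http://localhost:8086 and a JSON content type; writeAndGetIndex is an illustrative helper name, not code from this change.

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "strings"
)

// writeAndGetIndex posts a batch of points and returns the commit index that
// the server reports in the X-InfluxDB-Index response header.
func writeAndGetIndex(serverURL string, payload io.Reader) (int64, error) {
    resp, err := http.Post(serverURL+"/write", "application/json", payload)
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return 0, fmt.Errorf("unexpected status: %d", resp.StatusCode)
    }
    // The header is set by serveWrite in this change; handing the parsed index
    // to a wait step is what lets the test stop sleeping for a fixed second.
    return strconv.ParseInt(resp.Header.Get("X-InfluxDB-Index"), 10, 64)
}

func main() {
    // The real test posts a JSON batch of points here; the payload is omitted
    // from this sketch because only the header handling is being illustrated.
    index, err := writeAndGetIndex("http://localhost:8086", strings.NewReader("{}"))
    if err != nil {
        fmt.Println("write failed:", err)
        return
    }
    fmt.Println("write applied at index", index)
}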

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"math"
"net/http"
"net/url"
"os"
@ -212,9 +213,11 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ
return
}
if _, err := h.server.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil {
if index, err := h.server.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil {
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
} else {
w.Header().Add("X-InfluxDB-Index", fmt.Sprintf("%d", index))
}
}
@ -270,7 +273,7 @@ func (h *Handler) serveIndex(w http.ResponseWriter, r *http.Request) {
// Takes optional parameters:
// index - If specified, will poll for index before returning
// timeout (optional) - time in milliseconds to wait until index is met before erring out
// default timeout if not specified is 100 milliseconds
// default timeout, if not specified, is effectively unbounded (max int64)
func (h *Handler) serveWait(w http.ResponseWriter, r *http.Request) {
index, _ := strconv.ParseUint(r.URL.Query().Get(":index"), 10, 64)
timeout, _ := strconv.Atoi(r.URL.Query().Get("timeout"))
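
A caller combines the two parameters by putting the index in the path and the timeout, in milliseconds, in the query string. The sketch below is illustrative rather than taken from the change; the host, port, and values are assumptions, while the route shape and the timeout parameter name match the handler above and the integration test earlier in the diff.

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

func main() {
    // Ask the node to block until it has applied index 42, giving up after 500 ms.
    resp, err := http.Get("http://localhost:8086/wait/42?timeout=500")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Fatal(err)
    }
    // The integration test treats a 200 with a non-zero index in the body as
    // "this node has caught up"; a zero body means the wait never completed.
    fmt.Printf("status=%d body=%s\n", resp.StatusCode, body)
}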
@ -282,7 +285,7 @@ func (h *Handler) serveWait(w http.ResponseWriter, r *http.Request) {
var d time.Duration
if timeout == 0 {
d = 100 * time.Millisecond
d = math.MaxInt64
} else {
d = time.Duration(timeout) * time.Millisecond
}
@ -312,7 +315,7 @@ func (h *Handler) pollForIndex(index uint64, timeout time.Duration) error {
select {
case <-done:
return nil
case <-time.Tick(timeout):
case <-time.After(timeout):
return fmt.Errorf("timed out")
}
}
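
The time.Tick-to-time.After switch is not cosmetic: time.Tick allocates a Ticker that can never be stopped, so calling it for a one-shot timeout leaks a ticker each time, whereas time.After delivers a single value and needs no explicit cleanup. The same pattern in isolation, as a self-contained sketch (the helper name and durations are made up):

package main

import (
    "errors"
    "fmt"
    "time"
)

// awaitOrTimeout returns nil once done is closed, or an error after the
// timeout elapses. time.After is used rather than time.Tick because only a
// single firing is needed, and a ticker returned by time.Tick can never be
// stopped, so it would leak.
func awaitOrTimeout(done <-chan struct{}, timeout time.Duration) error {
    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return errors.New("timed out")
    }
}

func main() {
    done := make(chan struct{})
    go func() {
        time.Sleep(50 * time.Millisecond) // simulated work, e.g. an index catching up
        close(done)
    }()
    fmt.Println(awaitOrTimeout(done, time.Second)) // prints <nil>
}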
@ -532,7 +535,7 @@ func gzipFilter(inner http.Handler) http.Handler {
// and adds the X-InfluxDB-Version header to outgoing responses.
func versionHeader(inner http.Handler, version string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("X-Influxdb-Version", version)
w.Header().Add("X-InfluxDB-Version", version)
inner.ServeHTTP(w, r)
})
}