From e11cbb5629fc056a1b780da4553fc636932b3538 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 20 Feb 2015 15:11:51 -0800 Subject: [PATCH 1/2] Add simple batching integration test Uses new code to quickly generate batches for test. --- cmd/influxd/server_integration_test.go | 44 ++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index f5ae9a6e88..465b117612 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "net/http" "net/url" "os" @@ -40,6 +41,35 @@ type node struct { // cluster represents a multi-node cluster. type cluster []node +// createBatch returns a JSON string, representing the request body for a batch write. The timestamp +// simply increases and the value is a random integer. +func createBatch(nPoints int, database, retention, measurement string, tags map[string]string) string { + type Point struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + Values map[string]int `json:"values"` + } + type PointBatch struct { + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` + } + + rand.Seed(time.Now().UTC().UnixNano()) + points := make([]Point, 0) + for i := 0; i < nPoints; i++ { + values := map[string]int{"value": rand.Int()} + point := Point{Name: measurement, Tags: tags, Timestamp: time.Now().UTC().UnixNano(), Precision: "n", Values: values} + points = append(points, point) + } + batch := PointBatch{Database: database, RetentionPolicy: retention, Points: points} + + buf, _ := json.Marshal(batch) + return string(buf) +} + // createCombinedNodeCluster creates a cluster of nServers nodes, each of which // runs as both a Broker and Data node. If any part cluster creation fails, // the testing is marked as failed. @@ -410,3 +440,17 @@ func Test_Server5NodeIntegration(t *testing.T) { simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults) } + +func Test_ServerSingleLargeBatchIntegration(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8390 + testName := "single node large batch" + nodes := createCombinedNodeCluster(t, "single node large batch", nNodes, basePort) + + createDatabase(t, testName, nodes, "foo") + createRetentionPolicy(t, testName, nodes, "foo", "bar") + write(t, testName, nodes, createBatch(2, "foo", "bar", "cpu", map[string]string{"host": "server01"})) +} From e9acab4139bc2af632e4e99f5042d39311d7d608 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 21 Feb 2015 11:54:05 -0800 Subject: [PATCH 2/2] Add single-node batch test, for a large batch --- cmd/influxd/server_integration_test.go | 60 +++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 465b117612..16b1743d67 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -23,6 +23,12 @@ import ( main "github.com/influxdb/influxdb/cmd/influxd" ) +const ( + // Use a prime batch size, so that internal batching code, which most likely + // uses nice round batches, has to deal with leftover. + batchSize = 4217 +) + // urlFor returns a URL with the path and query params correctly appended and set. func urlFor(u *url.URL, path string, params url.Values) *url.URL { u.Path = path @@ -247,7 +253,6 @@ func write(t *testing.T, testname string, nodes cluster, data string) { u := urlFor(serverURL, "write", url.Values{}) buf := []byte(data) - t.Logf("Writing raw data: %s", string(buf)) resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf)) if err != nil { t.Fatalf("Couldn't write data: %s", err) @@ -305,6 +310,56 @@ func simpleQuery(t *testing.T, testname string, nodes cluster, query string, exp } } +// simpleCountQuery executes the given query against all nodes in the cluster, and verify the +// the count for the given field is as expected. +func simpleCountQuery(t *testing.T, testname string, nodes cluster, query, field string, expected int64) { + var results client.Results + + // Query the data exists + for _, n := range nodes { + t.Logf("Test name %s: query data on node %s", testname, n.url) + u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}}) + resp, err := http.Get(u.String()) + if err != nil { + t.Fatalf("Couldn't query databases: %s", err) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Couldn't read body of response: %s", err) + } + t.Logf("resp.Body: %s\n", string(body)) + + dec := json.NewDecoder(bytes.NewReader(body)) + dec.UseNumber() + err = dec.Decode(&results) + if err != nil { + t.Fatalf("Couldn't decode results: %v", err) + } + + if results.Error() != nil { + t.Logf("results.Error(): %q", results.Error().Error()) + } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) + } + + j, ok := results.Results[0].Rows[0].Values[0][1].(json.Number) + if !ok { + t.Fatalf("count is not a JSON number") + } + count, err := j.Int64() + if err != nil { + t.Fatalf("failed to convert count to int64") + } + if count != expected { + t.Fatalf("count value is wrong, expected %d, go %d", expected, count) + } + } +} + func Test_ServerSingleIntegration(t *testing.T) { nNodes := 1 basePort := 8090 @@ -452,5 +507,6 @@ func Test_ServerSingleLargeBatchIntegration(t *testing.T) { createDatabase(t, testName, nodes, "foo") createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, createBatch(2, "foo", "bar", "cpu", map[string]string{"host": "server01"})) + write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"})) + simpleCountQuery(t, "single node large batch", nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize) }