Merge pull request #1669 from influxdb/n_time_parsing

Add single-node large batch write test
pull/1659/head
Philip O'Toole 2015-02-21 12:01:37 -08:00
commit 3a999e95b0
1 changed files with 101 additions and 1 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"os"
@ -22,6 +23,12 @@ import (
main "github.com/influxdb/influxdb/cmd/influxd"
)
const (
// Use a prime batch size, so that internal batching code, which most likely
// uses nice round batches, has to deal with leftover.
batchSize = 4217
)
// urlFor returns a URL with the path and query params correctly appended and set.
func urlFor(u *url.URL, path string, params url.Values) *url.URL {
u.Path = path
@ -40,6 +47,35 @@ type node struct {
// cluster represents a multi-node cluster.
type cluster []node
// createBatch returns a JSON string, representing the request body for a batch write. The timestamp
// simply increases and the value is a random integer.
func createBatch(nPoints int, database, retention, measurement string, tags map[string]string) string {
type Point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Timestamp int64 `json:"timestamp"`
Precision string `json:"precision"`
Values map[string]int `json:"values"`
}
type PointBatch struct {
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Points []Point `json:"points"`
}
rand.Seed(time.Now().UTC().UnixNano())
points := make([]Point, 0)
for i := 0; i < nPoints; i++ {
values := map[string]int{"value": rand.Int()}
point := Point{Name: measurement, Tags: tags, Timestamp: time.Now().UTC().UnixNano(), Precision: "n", Values: values}
points = append(points, point)
}
batch := PointBatch{Database: database, RetentionPolicy: retention, Points: points}
buf, _ := json.Marshal(batch)
return string(buf)
}
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
// runs as both a Broker and Data node. If any part cluster creation fails,
// the testing is marked as failed.
@ -217,7 +253,6 @@ func write(t *testing.T, testname string, nodes cluster, data string) {
u := urlFor(serverURL, "write", url.Values{})
buf := []byte(data)
t.Logf("Writing raw data: %s", string(buf))
resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf))
if err != nil {
t.Fatalf("Couldn't write data: %s", err)
@ -275,6 +310,56 @@ func simpleQuery(t *testing.T, testname string, nodes cluster, query string, exp
}
}
// simpleCountQuery executes the given query against all nodes in the cluster, and verify the
// the count for the given field is as expected.
func simpleCountQuery(t *testing.T, testname string, nodes cluster, query, field string, expected int64) {
var results client.Results
// Query the data exists
for _, n := range nodes {
t.Logf("Test name %s: query data on node %s", testname, n.url)
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
resp, err := http.Get(u.String())
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Couldn't read body of response: %s", err)
}
t.Logf("resp.Body: %s\n", string(body))
dec := json.NewDecoder(bytes.NewReader(body))
dec.UseNumber()
err = dec.Decode(&results)
if err != nil {
t.Fatalf("Couldn't decode results: %v", err)
}
if results.Error() != nil {
t.Logf("results.Error(): %q", results.Error().Error())
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
j, ok := results.Results[0].Rows[0].Values[0][1].(json.Number)
if !ok {
t.Fatalf("count is not a JSON number")
}
count, err := j.Int64()
if err != nil {
t.Fatalf("failed to convert count to int64")
}
if count != expected {
t.Fatalf("count value is wrong, expected %d, go %d", expected, count)
}
}
}
func Test_ServerSingleIntegration(t *testing.T) {
nNodes := 1
basePort := 8090
@ -410,3 +495,18 @@ func Test_Server5NodeIntegration(t *testing.T) {
simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
}
func Test_ServerSingleLargeBatchIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8390
testName := "single node large batch"
nodes := createCombinedNodeCluster(t, "single node large batch", nNodes, basePort)
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
simpleCountQuery(t, "single node large batch", nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
}