Break out write and query

pull/1547/head
Philip O'Toole 2015-02-09 19:38:06 -08:00
parent 221954fbdc
commit 8bbdfd237a
1 changed files with 63 additions and 31 deletions

View File

@ -202,18 +202,13 @@ func createRetentionPolicy(t *testing.T, testName string, nodes cluster, databas
}
}
// simpleWriteAndQuery creates a simple database, retention policy, and replicates
// the data across all nodes. It then ensures a series of writes and queries are OK.
func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) {
now := time.Now().UTC()
// writes writes the provided data to the cluster. It verfies that a 200 OK is returned by the server.
func write(t *testing.T, testname string, nodes cluster, data string) {
t.Logf("Test %s: writing data", testname)
serverURL := nodes[0].url
var results client.Results
// Write Data
t.Log("Write data")
u := urlFor(serverURL, "write", url.Values{})
buf := []byte(fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano()))
buf := []byte(data)
t.Logf("Writing raw data: %s", string(buf))
resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf))
if err != nil {
@ -226,11 +221,18 @@ func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) {
// Need some time for server to get consensus and write data
// TODO corylanou query the status endpoint for the server and wait for the index to update to know the write was applied
time.Sleep(time.Duration(len(nodes)) * time.Second)
}
// simpleQuery creates a simple database, retention policy, and replicates
// the data across all nodes. It then ensures a series of writes and queries are OK.
func simpleQuery(t *testing.T, testname string, nodes cluster, expected client.Results) {
serverURL := nodes[0].url
var results client.Results
// Query the data exists
t.Log("Query data")
u = urlFor(serverURL, "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}})
resp, err = http.Get(u.String())
u := urlFor(serverURL, "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}})
resp, err := http.Get(u.String())
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
}
@ -257,6 +259,23 @@ func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) {
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
if !reflect.DeepEqual(results, expected) {
t.Logf("Expected: %#v\n", expected)
t.Logf("Actual: %#v\n", results)
t.Fatalf("query databases failed. Unexpected results.")
}
}
func Test_ServerSingleIntegration(t *testing.T) {
nNodes := 1
basePort := 8090
testName := "single node"
now := time.Now().UTC()
nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort)
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano()))
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
@ -270,24 +289,7 @@ func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) {
},
}
if !reflect.DeepEqual(results, expectedResults) {
t.Logf("Expected:\n")
t.Logf("%#v\n", expectedResults)
t.Logf("Actual:\n")
t.Logf("%#v\n", results)
t.Fatalf("query databases failed. Unexpected results.")
}
}
func Test_ServerSingleIntegration(t *testing.T) {
nNodes := 1
basePort := 8090
testName := "single node"
nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort)
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
simpleWriteAndQuery(t, testName, nodes)
simpleQuery(t, testName, nodes, expectedResults)
}
func Test_Server3NodeIntegration(t *testing.T) {
@ -297,11 +299,26 @@ func Test_Server3NodeIntegration(t *testing.T) {
nNodes := 3
basePort := 8190
testName := "3 node"
now := time.Now().UTC()
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
simpleWriteAndQuery(t, testName, nodes)
write(t, testName, nodes, fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano()))
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
[]interface{}{now.Format(time.RFC3339Nano), json.Number("100")},
},
}}},
},
}
simpleQuery(t, testName, nodes, expectedResults)
}
func Test_Server5NodeIntegration(t *testing.T) {
@ -311,11 +328,26 @@ func Test_Server5NodeIntegration(t *testing.T) {
nNodes := 5
basePort := 8290
testName := "5 node"
now := time.Now().UTC()
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
simpleWriteAndQuery(t, testName, nodes)
write(t, testName, nodes, fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano()))
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
[]interface{}{now.Format(time.RFC3339Nano), json.Number("100")},
},
}}},
},
}
simpleQuery(t, testName, nodes, expectedResults)
}
func urlFor(u *url.URL, path string, params url.Values) *url.URL {