diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 5dcbc644c4..8725e8fd9e 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -202,18 +202,13 @@ func createRetentionPolicy(t *testing.T, testName string, nodes cluster, databas } } -// simpleWriteAndQuery creates a simple database, retention policy, and replicates -// the data across all nodes. It then ensures a series of writes and queries are OK. -func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) { - now := time.Now().UTC() +// writes writes the provided data to the cluster. It verfies that a 200 OK is returned by the server. +func write(t *testing.T, testname string, nodes cluster, data string) { + t.Logf("Test %s: writing data", testname) serverURL := nodes[0].url - var results client.Results - - // Write Data - t.Log("Write data") u := urlFor(serverURL, "write", url.Values{}) - buf := []byte(fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano())) + buf := []byte(data) t.Logf("Writing raw data: %s", string(buf)) resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf)) if err != nil { @@ -226,11 +221,18 @@ func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) { // Need some time for server to get consensus and write data // TODO corylanou query the status endpoint for the server and wait for the index to update to know the write was applied time.Sleep(time.Duration(len(nodes)) * time.Second) +} + +// simpleQuery creates a simple database, retention policy, and replicates +// the data across all nodes. It then ensures a series of writes and queries are OK. +func simpleQuery(t *testing.T, testname string, nodes cluster, expected client.Results) { + serverURL := nodes[0].url + var results client.Results // Query the data exists t.Log("Query data") - u = urlFor(serverURL, "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}}) - resp, err = http.Get(u.String()) + u := urlFor(serverURL, "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}}) + resp, err := http.Get(u.String()) if err != nil { t.Fatalf("Couldn't query databases: %s", err) } @@ -257,6 +259,23 @@ func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) { t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) } + if !reflect.DeepEqual(results, expected) { + t.Logf("Expected: %#v\n", expected) + t.Logf("Actual: %#v\n", results) + t.Fatalf("query databases failed. Unexpected results.") + } +} + +func Test_ServerSingleIntegration(t *testing.T) { + nNodes := 1 + basePort := 8090 + testName := "single node" + now := time.Now().UTC() + nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort) + + createDatabase(t, testName, nodes, "foo") + createRetentionPolicy(t, testName, nodes, "foo", "bar") + write(t, testName, nodes, fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano())) expectedResults := client.Results{ Results: []client.Result{ {Rows: []influxql.Row{ @@ -270,24 +289,7 @@ func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster) { }, } - if !reflect.DeepEqual(results, expectedResults) { - t.Logf("Expected:\n") - t.Logf("%#v\n", expectedResults) - t.Logf("Actual:\n") - t.Logf("%#v\n", results) - t.Fatalf("query databases failed. Unexpected results.") - } -} - -func Test_ServerSingleIntegration(t *testing.T) { - nNodes := 1 - basePort := 8090 - testName := "single node" - nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort) - - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - simpleWriteAndQuery(t, testName, nodes) + simpleQuery(t, testName, nodes, expectedResults) } func Test_Server3NodeIntegration(t *testing.T) { @@ -297,11 +299,26 @@ func Test_Server3NodeIntegration(t *testing.T) { nNodes := 3 basePort := 8190 testName := "3 node" + now := time.Now().UTC() nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) createDatabase(t, testName, nodes, "foo") createRetentionPolicy(t, testName, nodes, "foo", "bar") - simpleWriteAndQuery(t, testName, nodes) + write(t, testName, nodes, fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano())) + expectedResults := client.Results{ + Results: []client.Result{ + {Rows: []influxql.Row{ + { + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + []interface{}{now.Format(time.RFC3339Nano), json.Number("100")}, + }, + }}}, + }, + } + + simpleQuery(t, testName, nodes, expectedResults) } func Test_Server5NodeIntegration(t *testing.T) { @@ -311,11 +328,26 @@ func Test_Server5NodeIntegration(t *testing.T) { nNodes := 5 basePort := 8290 testName := "5 node" + now := time.Now().UTC() nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) createDatabase(t, testName, nodes, "foo") createRetentionPolicy(t, testName, nodes, "foo", "bar") - simpleWriteAndQuery(t, testName, nodes) + write(t, testName, nodes, fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano())) + expectedResults := client.Results{ + Results: []client.Result{ + {Rows: []influxql.Row{ + { + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + []interface{}{now.Format(time.RFC3339Nano), json.Number("100")}, + }, + }}}, + }, + } + + simpleQuery(t, testName, nodes, expectedResults) } func urlFor(u *url.URL, path string, params url.Values) *url.URL {