diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 27a833f901..81c80e5c34 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -2,23 +2,18 @@ package main_test import ( "bytes" - "encoding/json" "fmt" "io/ioutil" - "math/rand" "net/http" "net/url" "os" "path/filepath" - "reflect" "strconv" - "sync" + "strings" "testing" "time" "github.com/influxdb/influxdb" - "github.com/influxdb/influxdb/client" - "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/messaging" main "github.com/influxdb/influxdb/cmd/influxd" @@ -30,6 +25,15 @@ const ( batchSize = 4217 ) +// tempfile returns a temporary path. +func tempfile() string { + f, _ := ioutil.TempFile("", "influxdb-") + path := f.Name() + f.Close() + os.Remove(path) + return path +} + // urlFor returns a URL with the path and query params correctly appended and set. func urlFor(u *url.URL, path string, params url.Values) *url.URL { v, _ := url.Parse(u.String()) @@ -38,60 +42,36 @@ func urlFor(u *url.URL, path string, params url.Values) *url.URL { return v } -// node represents a node under test, which is both a broker and data node. -type node struct { +// rewriteDbRp returns a copy of old with occurrences of %DB% with the given database, +// and occurences of %RP with the given retention +func rewriteDbRp(old, database, retention string) string { + return strings.Replace(strings.Replace(old, "%DB%", database, -1), "%RP%", retention, -1) +} + +// Node represents a node under test, which is both a broker and data node. +type Node struct { broker *messaging.Broker server *influxdb.Server url *url.URL leader bool } -// cluster represents a multi-node cluster. -type cluster []*node - -// createBatch returns a JSON string, representing the request body for a batch write. The timestamp -// simply increases and the value is a random integer. -func createBatch(nPoints int, database, retention, measurement string, tags map[string]string) string { - type Point struct { - Name string `json:"name"` - Tags map[string]string `json:"tags"` - Timestamp int64 `json:"timestamp"` - Precision string `json:"precision"` - Fields map[string]int `json:"fields"` - } - type PointBatch struct { - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Points []Point `json:"points"` - } - - rand.Seed(time.Now().UTC().UnixNano()) - points := make([]Point, 0) - for i := 0; i < nPoints; i++ { - fields := map[string]int{"value": rand.Int()} - point := Point{Name: measurement, Tags: tags, Timestamp: time.Now().UTC().UnixNano(), Precision: "n", Fields: fields} - points = append(points, point) - } - batch := PointBatch{Database: database, RetentionPolicy: retention, Points: points} - - buf, _ := json.Marshal(batch) - return string(buf) -} +// Cluster represents a multi-node cluster. +type Cluster []*Node // createCombinedNodeCluster creates a cluster of nServers nodes, each of which // runs as both a Broker and Data node. If any part cluster creation fails, // the testing is marked as failed. // // This function returns a slice of nodes, the first of which will be the leader. -func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster { +func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int) Cluster { t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName) if nNodes < 1 { t.Fatalf("Test %s: asked to create nonsense cluster", testName) } - nodes := make([]*node, 0) + nodes := make([]*Node, 0) - tmpDir := os.TempDir() tmpBrokerDir := filepath.Join(tmpDir, "broker-integration-test") tmpDataDir := filepath.Join(tmpDir, "data-integration-test") t.Logf("Test %s: using tmp directory %q for brokers\n", testName, tmpBrokerDir) @@ -118,7 +98,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i if s == nil { t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort) } - nodes = append(nodes, &node{ + nodes = append(nodes, &Node{ broker: b, server: s, url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)}, @@ -141,7 +121,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort) } - nodes = append(nodes, &node{ + nodes = append(nodes, &Node{ broker: b, server: s, url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)}, @@ -152,456 +132,177 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i } // createDatabase creates a database, and verifies that the creation was successful. -func createDatabase(t *testing.T, testName string, nodes cluster, database string) { +func createDatabase(t *testing.T, testName string, nodes Cluster, database string) { t.Logf("Test: %s: creating database %s", testName, database) - serverURL := nodes[0].url - - u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE " + database}}) - resp, err := http.Get(u.String()) - if err != nil { - t.Fatalf("Couldn't create database: %s", err) - } - defer resp.Body.Close() - - var results client.Results - err = json.NewDecoder(resp.Body).Decode(&results) - if err != nil { - t.Fatalf("Couldn't decode results: %v", err) - } - - if results.Error() != nil { - t.Logf("results.Error(): %q", results.Error().Error()) - } - - if resp.StatusCode != http.StatusOK { - t.Fatalf("Create database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) - } - - if len(results.Results) != 1 { - t.Fatalf("Create database failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results)) - } - - // Query the database exists - u = urlFor(serverURL, "query", url.Values{"q": []string{"SHOW DATABASES"}}) - resp, err = http.Get(u.String()) - if err != nil { - t.Fatalf("Couldn't query databases: %s", err) - } - defer resp.Body.Close() - - err = json.NewDecoder(resp.Body).Decode(&results) - if err != nil { - t.Fatalf("Couldn't decode results: %v", err) - } - - if results.Error() != nil { - t.Logf("results.Error(): %q", results.Error().Error()) - } - - if resp.StatusCode != http.StatusOK { - t.Fatalf("show databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) - } - - expectedResults := client.Results{ - Results: []client.Result{ - {Series: []influxql.Row{ - { - Columns: []string{"name"}, - Values: [][]interface{}{{database}}, - }, - }}, - }, - } - if !reflect.DeepEqual(results, expectedResults) { - t.Fatalf("show databases failed. Unexpected results. expected: %+v, actual %+v", expectedResults, results) - } + query(t, nodes[:1], "CREATE DATABASE "+database, `{"results":[{}]}`) } // createRetentionPolicy creates a retetention policy and verifies that the creation was successful. -func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string) { - t.Log("Creating retention policy") - serverURL := nodes[0].url - replication := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes)) +// Replication factor is set to equal the number nodes in the cluster. +func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string) { + t.Logf("Creating retention policy %s for database %s", retention, database) + command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes)) + query(t, nodes[:1], command, `{"results":[{}]}`) +} - u := urlFor(serverURL, "query", url.Values{"q": []string{replication}}) - resp, err := http.Get(u.String()) - if err != nil { - t.Fatalf("Couldn't create retention policy: %s", err) - } - defer resp.Body.Close() - - var results client.Results - err = json.NewDecoder(resp.Body).Decode(&results) - if err != nil { - t.Fatalf("Couldn't decode results: %v", err) - } - - if results.Error() != nil { - t.Logf("results.Error(): %q", results.Error().Error()) - } - - if resp.StatusCode != http.StatusOK { - t.Fatalf("Create retention policy failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) - } - - if len(results.Results) != 1 { - t.Fatalf("Create retention policy failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results)) - } +// deleteDatabase delete a database, and verifies that the deletion was successful. +func deleteDatabase(t *testing.T, testName string, nodes Cluster, database string) { + t.Logf("Test: %s: deleting database %s", testName, database) + query(t, nodes[:1], "DROP DATABASE "+database, `{"results":[{}]}`) } // writes writes the provided data to the cluster. It verfies that a 200 OK is returned by the server. -func write(t *testing.T, testName string, nodes cluster, data string) { - t.Logf("Test %s: writing data", testName) - serverURL := nodes[0].url - u := urlFor(serverURL, "write", url.Values{}) +func write(t *testing.T, node *Node, data string) { + u := urlFor(node.url, "write", url.Values{}) - buf := []byte(data) - resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf)) + resp, err := http.Post(u.String(), "application/json", bytes.NewReader([]byte(data))) if err != nil { t.Fatalf("Couldn't write data: %s", err) } if resp.StatusCode != http.StatusOK { - t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) + body, _ := ioutil.ReadAll(resp.Body) + t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body)) } - index, err := strconv.ParseInt(resp.Header.Get("X-InfluxDB-Index"), 10, 64) - if err != nil { - t.Fatalf("Couldn't get index. header: %s, err: %s.", resp.Header.Get("X-InfluxDB-Index"), err) - } - wait(t, testName, nodes, index) - t.Log("Finished writing and waiting") + // Until races are solved. + time.Sleep(3 * time.Second) } -// simpleQuery executes the given query against all nodes in the cluster, and verify the -// returned results are as expected. -func simpleQuery(t *testing.T, testName string, nodes cluster, query string, expected client.Results) { - var results client.Results - +// query executes the given query against all nodes in the cluster, and verifies no errors occured, and +// ensures the returned data is as expected +func query(t *testing.T, nodes Cluster, query, expected string) (string, bool) { // Query the data exists for _, n := range nodes { - t.Logf("Test name %s: query data on node %s", testName, n.url) u := urlFor(n.url, "query", url.Values{"q": []string{query}}) resp, err := http.Get(u.String()) if err != nil { - t.Fatalf("Couldn't query databases: %s", err) + t.Fatalf("Failed to execute query '%s': %s", query, err.Error()) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - t.Fatalf("Couldn't read body of response: %s", err) - } - t.Logf("resp.Body: %s\n", string(body)) - - dec := json.NewDecoder(bytes.NewReader(body)) - dec.UseNumber() - err = dec.Decode(&results) - if err != nil { - t.Fatalf("Couldn't decode results: %v", err) + t.Fatalf("Couldn't read body of response: %s", err.Error()) } - if results.Error() != nil { - t.Logf("results.Error(): %q", results.Error().Error()) - } - - if resp.StatusCode != http.StatusOK { - t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) - } - - if !reflect.DeepEqual(results, expected) { - t.Logf("Expected: %#v\n", expected) - t.Logf("Actual: %#v\n", results) - t.Fatalf("query databases failed. Unexpected results.") + if expected != string(body) { + return string(body), false } } + + return "", true } -func wait(t *testing.T, testName string, nodes cluster, index int64) { - // Wait for the index to sync up - var wg sync.WaitGroup - wg.Add(len(nodes)) - for _, n := range nodes { - go func(t *testing.T, testName string, nodes cluster, u *url.URL, index int64) { - u = urlFor(u, fmt.Sprintf("wait/%d", index), nil) - t.Logf("Test name %s: wait on node %s for index %d", testName, u, index) - resp, err := http.Get(u.String()) - if err != nil { - t.Fatalf("Couldn't wait: %s", err) - } +// runTests_Errors tests some basic error cases. +func runTests_Errors(t *testing.T, nodes Cluster) { + t.Logf("Running tests against %d-node cluster", len(nodes)) - if resp.StatusCode != http.StatusOK { - t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Fatalf("Couldn't read body of response: %s", err) - } - t.Logf("resp.Body: %s\n", string(body)) - - i, _ := strconv.Atoi(string(body)) - if i == 0 { - t.Fatalf("Unexpected body: %s", string(body)) - } - - wg.Done() - - }(t, testName, nodes, n.url, index) - } - wg.Wait() -} - -// simpleCountQuery executes the given query against all nodes in the cluster, and verify the -// the count for the given field is as expected. -func simpleCountQuery(t *testing.T, testName string, nodes cluster, query, field string, expected int64) { - var results client.Results - - // Query the data exists - for _, n := range nodes { - t.Logf("Test name %s: query data on node %s", testName, n.url) - u := urlFor(n.url, "query", url.Values{"q": []string{query}}) - resp, err := http.Get(u.String()) - if err != nil { - t.Fatalf("Couldn't query databases: %s", err) - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Fatalf("Couldn't read body of response: %s", err) - } - t.Logf("resp.Body: %s\n", string(body)) - - dec := json.NewDecoder(bytes.NewReader(body)) - dec.UseNumber() - err = dec.Decode(&results) - if err != nil { - t.Fatalf("Couldn't decode results: %v", err) - } - - if results.Error() != nil { - t.Logf("results.Error(): %q", results.Error().Error()) - } - - if resp.StatusCode != http.StatusOK { - t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) - } - - if len(results.Results) != 1 || len(results.Results[0].Series) != 1 { - t.Fatal("results object returned has insufficient entries") - } - j, ok := results.Results[0].Series[0].Values[0][1].(json.Number) - if !ok { - t.Fatalf("count is not a JSON number") - } - count, err := j.Int64() - if err != nil { - t.Fatalf("failed to convert count to int64") - } - if count != expected { - t.Fatalf("count value is wrong, expected %d, go %d", expected, count) - } - } -} - -func Test_ServerSingleIntegration(t *testing.T) { - nNodes := 1 - basePort := 8090 - testName := "single node" - now := time.Now().UTC() - nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort) - - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, fmt.Sprintf(` -{ - "database": "foo", - "retentionPolicy": "bar", - "points": - [{ - "name": "cpu", - "tags": { - "host": "server01" - }, - "timestamp": %d, - "precision": "n", - "fields":{ - "value": 100 - } - }] -} -`, now.UnixNano())) - expectedResults := client.Results{ - Results: []client.Result{ - {Series: []influxql.Row{ - { - Name: "cpu", - Columns: []string{"time", "value"}, - Values: [][]interface{}{ - {now.Format(time.RFC3339Nano), json.Number("100")}, - }, - }}}, - }, - } - simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults) -} - -func Test_Server3NodeIntegration(t *testing.T) { - if testing.Short() { - t.Skip() - } - nNodes := 3 - basePort := 8190 - testName := "3 node" - now := time.Now().UTC() - nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) - - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, fmt.Sprintf(` -{ - "database": "foo", - "retentionPolicy": "bar", - "points": - [{ - "name": "cpu", - "tags": { - "host": "server01" - }, - "timestamp": %d, - "precision": "n", - "fields":{ - "value": 100 - } - }] -} -`, now.UnixNano())) - expectedResults := client.Results{ - Results: []client.Result{ - {Series: []influxql.Row{ - { - Name: "cpu", - Columns: []string{"time", "value"}, - Values: [][]interface{}{ - {now.Format(time.RFC3339Nano), json.Number("100")}, - }, - }}}, + tests := []struct { + name string + write string // If equal to the empty string, no data is written. + query string // If equal to the blank string, no query is executed. + expected string // If 'query' is equal to the blank string, this is ignored. + }{ + { + name: "simple SELECT from non-existent database", + write: "", + query: `SELECT * FROM "qux"."bar".cpu`, + expected: `{"results":[{"error":"database not found: qux"}]}`, }, } - simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults) -} + for _, tt := range tests { + if tt.write != "" { + write(t, nodes[0], tt.write) + } -func Test_Server5NodeIntegration(t *testing.T) { - t.Skip() - if testing.Short() { - t.Skip() + if tt.query != "" { + got, ok := query(t, nodes, tt.query, tt.expected) + if !ok { + t.Errorf("Test '%s' failed, expected: %s, got: %s", tt.name, tt.expected, got) + } + } } - nNodes := 5 - basePort := 8290 - testName := "5 node" - now := time.Now().UTC() - nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) - - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, fmt.Sprintf(` -{ - "database": "foo", - "retentionPolicy": "bar", - "points": - [{ - "name": "cpu", - "tags": { - "host": "server01" - }, - "timestamp": %d, - "precision": "n", - "fields":{ - "value": 100 - } - }] } -`, now.UnixNano())) - expectedResults := client.Results{ - Results: []client.Result{ - {Series: []influxql.Row{ - { - Name: "cpu", - Columns: []string{"time", "value"}, - Values: [][]interface{}{ - {now.Format(time.RFC3339Nano), json.Number("100")}, - }, - }}}, +// runTests tests write and query of data. +func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) { + t.Logf("Running tests against %d-node cluster", len(nodes)) + + // Start by ensuring database and retention policy exist. + createDatabase(t, testName, nodes, database) + createRetentionPolicy(t, testName, nodes, database, retention) + + // The tests. Within these tests %DB% and %RP% will be replaced with the database and retention passed into + // this function. + tests := []struct { + reset bool // Delete and recreate the database. + name string // Test name, for easy-to-read test log output. + write string // If equal to the empty string, no data is written. + query string // If equal to the blank string, no query is executed. + expected string // If 'query' is equal to the blank string, this is ignored. + }{ + { + reset: true, + name: "single point with timestamp", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "2015-02-28T01:03:36.703820946Z", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."myrp".cpu`, + expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`, + }, + { + name: "single point, select with now()", + query: `SELECT * FROM "%DB%"."%RP%".cpu WHERE time < now()`, + expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`, }, } - simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults) + for _, tt := range tests { + if tt.reset { + t.Logf(`reseting for test "%s"`, tt.name) + deleteDatabase(t, testName, nodes, database) + createDatabase(t, testName, nodes, database) + createRetentionPolicy(t, testName, nodes, database, retention) + } + + if tt.write != "" { + write(t, nodes[0], rewriteDbRp(tt.write, database, retention)) + } + + if tt.query != "" { + got, ok := query(t, nodes, rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention)) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, tt.name, rewriteDbRp(tt.expected, database, retention), got) + } + } + } } -func Test_ServerSingleLargeBatchIntegration(t *testing.T) { +func TestSingleServer(t *testing.T) { + testName := "single server integration" if testing.Short() { - t.Skip() + t.Skip(fmt.Sprintf("skipping '%s'", testName)) } - nNodes := 1 - basePort := 8390 - testName := "single node large batch" - nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) + dir := tempfile() + defer func() { + os.RemoveAll(dir) + }() - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"})) - simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize) + nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090) + + runTestsData(t, testName, nodes, "mydb", "myrp") } -func Test_Server3NodeLargeBatchIntegration(t *testing.T) { +func Test3NodeServer(t *testing.T) { + testName := "3-node server integration" if testing.Short() { - t.Skip() + t.Skip(fmt.Sprintf("skipping '%s'", testName)) } - nNodes := 3 - basePort := 8490 - testName := "3 node large batch" - nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) + dir := tempfile() + defer func() { + os.RemoveAll(dir) + }() - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"})) - simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize) -} - -func Test_Server5NodeLargeBatchIntegration(t *testing.T) { - t.Skip() - if testing.Short() { - t.Skip() - } - nNodes := 5 - basePort := 8590 - testName := "5 node large batch" - nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) - - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"})) - simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize) -} - -func Test_ServerMultiLargeBatchIntegration(t *testing.T) { - t.Skip() - if testing.Short() { - t.Skip() - } - nNodes := 1 - nBatches := 5 - basePort := 8690 - testName := "single node multi batch" - nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) - - createDatabase(t, testName, nodes, "foo") - createRetentionPolicy(t, testName, nodes, "foo", "bar") - for i := 0; i < nBatches; i++ { - write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"})) - } - simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize*int64(nBatches)) + nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190) + + runTestsData(t, testName, nodes, "mydb", "myrp") }