Add initial version of table-driven int. tests

pull/1788/head
Philip O'Toole 2015-02-27 18:05:30 -08:00
parent 4c5beb96ed
commit 1ba6d8a4f9
1 changed file with 143 additions and 442 deletions


@@ -2,23 +2,18 @@ package main_test
 import (
     "bytes"
-    "encoding/json"
     "fmt"
     "io/ioutil"
-    "math/rand"
     "net/http"
     "net/url"
     "os"
     "path/filepath"
-    "reflect"
     "strconv"
-    "sync"
+    "strings"
     "testing"
     "time"

     "github.com/influxdb/influxdb"
-    "github.com/influxdb/influxdb/client"
-    "github.com/influxdb/influxdb/influxql"
     "github.com/influxdb/influxdb/messaging"

     main "github.com/influxdb/influxdb/cmd/influxd"
@@ -30,6 +25,15 @@ const (
     batchSize = 4217
 )

+// tempfile returns a temporary path.
+func tempfile() string {
+    f, _ := ioutil.TempFile("", "influxdb-")
+    path := f.Name()
+    f.Close()
+    os.Remove(path)
+    return path
+}
+
 // urlFor returns a URL with the path and query params correctly appended and set.
 func urlFor(u *url.URL, path string, params url.Values) *url.URL {
     v, _ := url.Parse(u.String())
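Note: tempfile reserves a unique name under the OS temp directory and then deletes the file, so the caller gets a path it can reuse, for example as a per-test scratch directory. A minimal, self-contained sketch of the same pattern as the new tests below use it (the tempPath name and the os.Mkdir call here are illustrative, not part of this change):

package main

import (
    "fmt"
    "io/ioutil"
    "os"
)

// tempPath mirrors the tempfile helper above: reserve a unique name, then
// remove the file so the path is free to be reused as a directory.
func tempPath() string {
    f, _ := ioutil.TempFile("", "influxdb-")
    path := f.Name()
    f.Close()
    os.Remove(path)
    return path
}

func main() {
    dir := tempPath()
    defer os.RemoveAll(dir) // the new tests defer the same cleanup

    if err := os.Mkdir(dir, 0755); err != nil {
        fmt.Println("mkdir failed:", err)
        return
    }
    fmt.Println("scratch directory:", dir)
}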
@@ -38,60 +42,36 @@ func urlFor(u *url.URL, path string, params url.Values) *url.URL {
     return v
 }

-// node represents a node under test, which is both a broker and data node.
-type node struct {
+// rewriteDbRp returns a copy of old with occurrences of %DB% replaced by the given database,
+// and occurrences of %RP% replaced by the given retention policy.
+func rewriteDbRp(old, database, retention string) string {
+    return strings.Replace(strings.Replace(old, "%DB%", database, -1), "%RP%", retention, -1)
+}
+
+// Node represents a node under test, which is both a broker and data node.
+type Node struct {
     broker *messaging.Broker
     server *influxdb.Server
     url    *url.URL
     leader bool
 }

-// cluster represents a multi-node cluster.
-type cluster []*node
+// Cluster represents a multi-node cluster.
+type Cluster []*Node

-// createBatch returns a JSON string, representing the request body for a batch write. The timestamp
-// simply increases and the value is a random integer.
-func createBatch(nPoints int, database, retention, measurement string, tags map[string]string) string {
-    type Point struct {
-        Name      string            `json:"name"`
-        Tags      map[string]string `json:"tags"`
-        Timestamp int64             `json:"timestamp"`
-        Precision string            `json:"precision"`
-        Fields    map[string]int    `json:"fields"`
-    }
-    type PointBatch struct {
-        Database        string  `json:"database"`
-        RetentionPolicy string  `json:"retentionPolicy"`
-        Points          []Point `json:"points"`
-    }
-
-    rand.Seed(time.Now().UTC().UnixNano())
-    points := make([]Point, 0)
-    for i := 0; i < nPoints; i++ {
-        fields := map[string]int{"value": rand.Int()}
-        point := Point{Name: measurement, Tags: tags, Timestamp: time.Now().UTC().UnixNano(), Precision: "n", Fields: fields}
-        points = append(points, point)
-    }
-    batch := PointBatch{Database: database, RetentionPolicy: retention, Points: points}
-    buf, _ := json.Marshal(batch)
-    return string(buf)
-}
-
 // createCombinedNodeCluster creates a cluster of nServers nodes, each of which
 // runs as both a Broker and Data node. If any part of cluster creation fails,
 // the testing is marked as failed.
 //
 // This function returns a slice of nodes, the first of which will be the leader.
-func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
+func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int) Cluster {
     t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName)
     if nNodes < 1 {
         t.Fatalf("Test %s: asked to create nonsense cluster", testName)
     }

-    nodes := make([]*node, 0)
+    nodes := make([]*Node, 0)

-    tmpDir := os.TempDir()
     tmpBrokerDir := filepath.Join(tmpDir, "broker-integration-test")
     tmpDataDir := filepath.Join(tmpDir, "data-integration-test")
     t.Logf("Test %s: using tmp directory %q for brokers\n", testName, tmpBrokerDir)
@@ -118,7 +98,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
     if s == nil {
         t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort)
     }
-    nodes = append(nodes, &node{
+    nodes = append(nodes, &Node{
         broker: b,
         server: s,
         url:    &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
@@ -141,7 +121,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
         t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort)
     }

-    nodes = append(nodes, &node{
+    nodes = append(nodes, &Node{
         broker: b,
         server: s,
         url:    &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
@@ -152,456 +132,177 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
 }

 // createDatabase creates a database, and verifies that the creation was successful.
-func createDatabase(t *testing.T, testName string, nodes cluster, database string) {
-    t.Logf("Test: %s: creating database %s", testName, database)
-    serverURL := nodes[0].url
-    u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE " + database}})
-
-    resp, err := http.Get(u.String())
-    if err != nil {
-        t.Fatalf("Couldn't create database: %s", err)
-    }
-    defer resp.Body.Close()
-
-    var results client.Results
-    err = json.NewDecoder(resp.Body).Decode(&results)
-    if err != nil {
-        t.Fatalf("Couldn't decode results: %v", err)
-    }
-
-    if results.Error() != nil {
-        t.Logf("results.Error(): %q", results.Error().Error())
-    }
-
-    if resp.StatusCode != http.StatusOK {
-        t.Fatalf("Create database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-    }
-
-    if len(results.Results) != 1 {
-        t.Fatalf("Create database failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
-    }
-
-    // Query the database exists
-    u = urlFor(serverURL, "query", url.Values{"q": []string{"SHOW DATABASES"}})
-    resp, err = http.Get(u.String())
-    if err != nil {
-        t.Fatalf("Couldn't query databases: %s", err)
-    }
-    defer resp.Body.Close()
-
-    err = json.NewDecoder(resp.Body).Decode(&results)
-    if err != nil {
-        t.Fatalf("Couldn't decode results: %v", err)
-    }
-
-    if results.Error() != nil {
-        t.Logf("results.Error(): %q", results.Error().Error())
-    }
-
-    if resp.StatusCode != http.StatusOK {
-        t.Fatalf("show databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-    }
-
-    expectedResults := client.Results{
-        Results: []client.Result{
-            {Series: []influxql.Row{
-                {
-                    Columns: []string{"name"},
-                    Values:  [][]interface{}{{database}},
-                },
-            }},
-        },
-    }
-
-    if !reflect.DeepEqual(results, expectedResults) {
-        t.Fatalf("show databases failed. Unexpected results. expected: %+v, actual %+v", expectedResults, results)
-    }
-}
+func createDatabase(t *testing.T, testName string, nodes Cluster, database string) {
+    t.Logf("Test: %s: creating database %s", testName, database)
+    query(t, nodes[:1], "CREATE DATABASE "+database, `{"results":[{}]}`)
+}
 // createRetentionPolicy creates a retention policy and verifies that the creation was successful.
-func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string) {
-    t.Log("Creating retention policy")
-    serverURL := nodes[0].url
-    replication := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes))
-
-    u := urlFor(serverURL, "query", url.Values{"q": []string{replication}})
-    resp, err := http.Get(u.String())
-    if err != nil {
-        t.Fatalf("Couldn't create retention policy: %s", err)
-    }
-    defer resp.Body.Close()
-
-    var results client.Results
-    err = json.NewDecoder(resp.Body).Decode(&results)
-    if err != nil {
-        t.Fatalf("Couldn't decode results: %v", err)
-    }
-
-    if results.Error() != nil {
-        t.Logf("results.Error(): %q", results.Error().Error())
-    }
-
-    if resp.StatusCode != http.StatusOK {
-        t.Fatalf("Create retention policy failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-    }
-
-    if len(results.Results) != 1 {
-        t.Fatalf("Create retention policy failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
-    }
-}
+// Replication factor is set to equal the number of nodes in the cluster.
+func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string) {
+    t.Logf("Creating retention policy %s for database %s", retention, database)
+    command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes))
+    query(t, nodes[:1], command, `{"results":[{}]}`)
+}
+
+// deleteDatabase deletes a database, and verifies that the deletion was successful.
+func deleteDatabase(t *testing.T, testName string, nodes Cluster, database string) {
+    t.Logf("Test: %s: deleting database %s", testName, database)
+    query(t, nodes[:1], "DROP DATABASE "+database, `{"results":[{}]}`)
+}
 // write writes the provided data to the cluster. It verifies that a 200 OK is returned by the server.
-func write(t *testing.T, testName string, nodes cluster, data string) {
-    t.Logf("Test %s: writing data", testName)
-    serverURL := nodes[0].url
-    u := urlFor(serverURL, "write", url.Values{})
-
-    buf := []byte(data)
-    resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf))
-    if err != nil {
-        t.Fatalf("Couldn't write data: %s", err)
-    }
-    if resp.StatusCode != http.StatusOK {
-        t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-    }
-
-    index, err := strconv.ParseInt(resp.Header.Get("X-InfluxDB-Index"), 10, 64)
-    if err != nil {
-        t.Fatalf("Couldn't get index. header: %s, err: %s.", resp.Header.Get("X-InfluxDB-Index"), err)
-    }
-    wait(t, testName, nodes, index)
-    t.Log("Finished writing and waiting")
-}
+func write(t *testing.T, node *Node, data string) {
+    u := urlFor(node.url, "write", url.Values{})
+
+    resp, err := http.Post(u.String(), "application/json", bytes.NewReader([]byte(data)))
+    if err != nil {
+        t.Fatalf("Couldn't write data: %s", err)
+    }
+    if resp.StatusCode != http.StatusOK {
+        body, _ := ioutil.ReadAll(resp.Body)
+        t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body))
+    }
+
+    // Until races are solved.
+    time.Sleep(3 * time.Second)
+}
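For reference, the point batch that the new table-driven cases POST to /write has the shape shown below. The snippet just pretty-prints an equivalent structure with encoding/json for illustration; the tests themselves keep the body as a raw string with %DB%/%RP% placeholders.

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Field names mirror the write bodies used in the test cases in this diff.
    batch := map[string]interface{}{
        "database":        "mydb",
        "retentionPolicy": "myrp",
        "points": []map[string]interface{}{
            {
                "name":      "cpu",
                "timestamp": "2015-02-28T01:03:36.703820946Z",
                "tags":      map[string]string{"host": "server01"},
                "fields":    map[string]int{"value": 100},
            },
        },
    }
    buf, _ := json.MarshalIndent(batch, "", "  ")
    fmt.Println(string(buf))
}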
-// simpleQuery executes the given query against all nodes in the cluster, and verify the
-// returned results are as expected.
-func simpleQuery(t *testing.T, testName string, nodes cluster, query string, expected client.Results) {
-    var results client.Results
-
-    // Query the data exists
-    for _, n := range nodes {
-        t.Logf("Test name %s: query data on node %s", testName, n.url)
-        u := urlFor(n.url, "query", url.Values{"q": []string{query}})
-        resp, err := http.Get(u.String())
-        if err != nil {
-            t.Fatalf("Couldn't query databases: %s", err)
-        }
-        defer resp.Body.Close()
-
-        body, err := ioutil.ReadAll(resp.Body)
-        if err != nil {
-            t.Fatalf("Couldn't read body of response: %s", err)
-        }
-        t.Logf("resp.Body: %s\n", string(body))
-
-        dec := json.NewDecoder(bytes.NewReader(body))
-        dec.UseNumber()
-        err = dec.Decode(&results)
-        if err != nil {
-            t.Fatalf("Couldn't decode results: %v", err)
-        }
-
-        if results.Error() != nil {
-            t.Logf("results.Error(): %q", results.Error().Error())
-        }
-
-        if resp.StatusCode != http.StatusOK {
-            t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-        }
-
-        if !reflect.DeepEqual(results, expected) {
-            t.Logf("Expected: %#v\n", expected)
-            t.Logf("Actual: %#v\n", results)
-            t.Fatalf("query databases failed. Unexpected results.")
-        }
-    }
-}
+// query executes the given query against all nodes in the cluster, and verifies no errors occurred, and
+// ensures the returned data is as expected.
+func query(t *testing.T, nodes Cluster, query, expected string) (string, bool) {
+    // Query the data exists
+    for _, n := range nodes {
+        u := urlFor(n.url, "query", url.Values{"q": []string{query}})
+        resp, err := http.Get(u.String())
+        if err != nil {
+            t.Fatalf("Failed to execute query '%s': %s", query, err.Error())
+        }
+        defer resp.Body.Close()
+        body, err := ioutil.ReadAll(resp.Body)
+        if err != nil {
+            t.Fatalf("Couldn't read body of response: %s", err.Error())
+        }
+        if expected != string(body) {
+            return string(body), false
+        }
+    }
+
+    return "", true
+}

-func wait(t *testing.T, testName string, nodes cluster, index int64) {
-    // Wait for the index to sync up
-    var wg sync.WaitGroup
-    wg.Add(len(nodes))
-    for _, n := range nodes {
-        go func(t *testing.T, testName string, nodes cluster, u *url.URL, index int64) {
-            u = urlFor(u, fmt.Sprintf("wait/%d", index), nil)
-            t.Logf("Test name %s: wait on node %s for index %d", testName, u, index)
-            resp, err := http.Get(u.String())
-            if err != nil {
-                t.Fatalf("Couldn't wait: %s", err)
-            }
-            if resp.StatusCode != http.StatusOK {
-                t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-            }
-            defer resp.Body.Close()
-            body, err := ioutil.ReadAll(resp.Body)
-            if err != nil {
-                t.Fatalf("Couldn't read body of response: %s", err)
-            }
-            t.Logf("resp.Body: %s\n", string(body))
-            i, _ := strconv.Atoi(string(body))
-            if i == 0 {
-                t.Fatalf("Unexpected body: %s", string(body))
-            }
-            wg.Done()
-        }(t, testName, nodes, n.url, index)
-    }
-    wg.Wait()
-}
-
-// simpleCountQuery executes the given query against all nodes in the cluster, and verify the
-// the count for the given field is as expected.
-func simpleCountQuery(t *testing.T, testName string, nodes cluster, query, field string, expected int64) {
-    var results client.Results
-
-    // Query the data exists
-    for _, n := range nodes {
-        t.Logf("Test name %s: query data on node %s", testName, n.url)
-        u := urlFor(n.url, "query", url.Values{"q": []string{query}})
-        resp, err := http.Get(u.String())
-        if err != nil {
-            t.Fatalf("Couldn't query databases: %s", err)
-        }
-        defer resp.Body.Close()
-
-        body, err := ioutil.ReadAll(resp.Body)
-        if err != nil {
-            t.Fatalf("Couldn't read body of response: %s", err)
-        }
-        t.Logf("resp.Body: %s\n", string(body))
-
-        dec := json.NewDecoder(bytes.NewReader(body))
-        dec.UseNumber()
-        err = dec.Decode(&results)
-        if err != nil {
-            t.Fatalf("Couldn't decode results: %v", err)
-        }
-
-        if results.Error() != nil {
-            t.Logf("results.Error(): %q", results.Error().Error())
-        }
-
-        if resp.StatusCode != http.StatusOK {
-            t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
-        }
-
-        if len(results.Results) != 1 || len(results.Results[0].Series) != 1 {
-            t.Fatal("results object returned has insufficient entries")
-        }
-        j, ok := results.Results[0].Series[0].Values[0][1].(json.Number)
-        if !ok {
-            t.Fatalf("count is not a JSON number")
-        }
-        count, err := j.Int64()
-        if err != nil {
-            t.Fatalf("failed to convert count to int64")
-        }
-        if count != expected {
-            t.Fatalf("count value is wrong, expected %d, go %d", expected, count)
-        }
-    }
-}
+// runTests_Errors tests some basic error cases.
+func runTests_Errors(t *testing.T, nodes Cluster) {
+    t.Logf("Running tests against %d-node cluster", len(nodes))
+
+    tests := []struct {
+        name     string
+        write    string // If equal to the empty string, no data is written.
+        query    string // If equal to the blank string, no query is executed.
+        expected string // If 'query' is equal to the blank string, this is ignored.
+    }{
+        {
+            name:     "simple SELECT from non-existent database",
+            write:    "",
+            query:    `SELECT * FROM "qux"."bar".cpu`,
+            expected: `{"results":[{"error":"database not found: qux"}]}`,
+        },
+    }
+
+    for _, tt := range tests {
+        if tt.write != "" {
+            write(t, nodes[0], tt.write)
+        }
+
+        if tt.query != "" {
+            got, ok := query(t, nodes, tt.query, tt.expected)
+            if !ok {
+                t.Errorf("Test '%s' failed, expected: %s, got: %s", tt.name, tt.expected, got)
+            }
+        }
+    }
+}
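The new query helper compares the raw response body against the expected JSON string byte-for-byte instead of decoding into client.Results. A self-contained sketch of that comparison against a stand-in HTTP server (httptest is used here purely for illustration; the real tests query a running influxd node):

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "net/http/httptest"
)

func main() {
    // Stand-in for a data node's /query endpoint.
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, `{"results":[{}]}`)
    }))
    defer srv.Close()

    resp, err := http.Get(srv.URL + "/query?q=CREATE%20DATABASE%20mydb")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)

    // Like query(), compare the raw body to the expected JSON string verbatim.
    expected := `{"results":[{}]}`
    fmt.Println("match:", expected == string(body))
}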
-func Test_ServerSingleIntegration(t *testing.T) {
-    nNodes := 1
-    basePort := 8090
-    testName := "single node"
-    now := time.Now().UTC()
-    nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    write(t, testName, nodes, fmt.Sprintf(`
-{
-    "database": "foo",
-    "retentionPolicy": "bar",
-    "points":
-    [{
-        "name": "cpu",
-        "tags": {
-            "host": "server01"
-        },
-        "timestamp": %d,
-        "precision": "n",
-        "fields":{
-            "value": 100
-        }
-    }]
-}
-`, now.UnixNano()))
-
-    expectedResults := client.Results{
-        Results: []client.Result{
-            {Series: []influxql.Row{
-                {
-                    Name:    "cpu",
-                    Columns: []string{"time", "value"},
-                    Values: [][]interface{}{
-                        {now.Format(time.RFC3339Nano), json.Number("100")},
-                    },
-                }}},
-        },
-    }
-
-    simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
-}
-
-func Test_Server3NodeIntegration(t *testing.T) {
-    if testing.Short() {
-        t.Skip()
-    }
-    nNodes := 3
-    basePort := 8190
-    testName := "3 node"
-    now := time.Now().UTC()
-    nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    write(t, testName, nodes, fmt.Sprintf(`
-{
-    "database": "foo",
-    "retentionPolicy": "bar",
-    "points":
-    [{
-        "name": "cpu",
-        "tags": {
-            "host": "server01"
-        },
-        "timestamp": %d,
-        "precision": "n",
-        "fields":{
-            "value": 100
-        }
-    }]
-}
-`, now.UnixNano()))
-
-    expectedResults := client.Results{
-        Results: []client.Result{
-            {Series: []influxql.Row{
-                {
-                    Name:    "cpu",
-                    Columns: []string{"time", "value"},
-                    Values: [][]interface{}{
-                        {now.Format(time.RFC3339Nano), json.Number("100")},
-                    },
-                }}},
-        },
-    }
-
-    simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
-}
-
-func Test_Server5NodeIntegration(t *testing.T) {
-    t.Skip()
-    if testing.Short() {
-        t.Skip()
-    }
-    nNodes := 5
-    basePort := 8290
-    testName := "5 node"
-    now := time.Now().UTC()
-    nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    write(t, testName, nodes, fmt.Sprintf(`
-{
-    "database": "foo",
-    "retentionPolicy": "bar",
-    "points":
-    [{
-        "name": "cpu",
-        "tags": {
-            "host": "server01"
-        },
-        "timestamp": %d,
-        "precision": "n",
-        "fields":{
-            "value": 100
-        }
-    }]
-}
-`, now.UnixNano()))
-
-    expectedResults := client.Results{
-        Results: []client.Result{
-            {Series: []influxql.Row{
-                {
-                    Name:    "cpu",
-                    Columns: []string{"time", "value"},
-                    Values: [][]interface{}{
-                        {now.Format(time.RFC3339Nano), json.Number("100")},
-                    },
-                }}},
-        },
-    }
-
-    simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
-}
-
-func Test_ServerSingleLargeBatchIntegration(t *testing.T) {
-    if testing.Short() {
-        t.Skip()
-    }
-    nNodes := 1
-    basePort := 8390
-    testName := "single node large batch"
-    nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
-    simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
-}
-
-func Test_Server3NodeLargeBatchIntegration(t *testing.T) {
-    if testing.Short() {
-        t.Skip()
-    }
-    nNodes := 3
-    basePort := 8490
-    testName := "3 node large batch"
-    nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
-    simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
-}
-
-func Test_Server5NodeLargeBatchIntegration(t *testing.T) {
-    t.Skip()
-    if testing.Short() {
-        t.Skip()
-    }
-    nNodes := 5
-    basePort := 8590
-    testName := "5 node large batch"
-    nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
-    simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
-}
-
-func Test_ServerMultiLargeBatchIntegration(t *testing.T) {
-    t.Skip()
-    if testing.Short() {
-        t.Skip()
-    }
-    nNodes := 1
-    nBatches := 5
-    basePort := 8690
-    testName := "single node multi batch"
-    nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
-
-    createDatabase(t, testName, nodes, "foo")
-    createRetentionPolicy(t, testName, nodes, "foo", "bar")
-    for i := 0; i < nBatches; i++ {
-        write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
-    }
-    simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize*int64(nBatches))
-}
+// runTestsData tests write and query of data.
+func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) {
+    t.Logf("Running tests against %d-node cluster", len(nodes))
+
+    // Start by ensuring database and retention policy exist.
+    createDatabase(t, testName, nodes, database)
+    createRetentionPolicy(t, testName, nodes, database, retention)
+
+    // The tests. Within these tests %DB% and %RP% will be replaced with the database and retention passed into
+    // this function.
+    tests := []struct {
+        reset    bool   // Delete and recreate the database.
+        name     string // Test name, for easy-to-read test log output.
+        write    string // If equal to the empty string, no data is written.
+        query    string // If equal to the blank string, no query is executed.
+        expected string // If 'query' is equal to the blank string, this is ignored.
+    }{
+        {
+            reset:    true,
+            name:     "single point with timestamp",
+            write:    `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "2015-02-28T01:03:36.703820946Z", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`,
+            query:    `SELECT * FROM "%DB%"."myrp".cpu`,
+            expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`,
+        },
+        {
+            name:     "single point, select with now()",
+            query:    `SELECT * FROM "%DB%"."%RP%".cpu WHERE time < now()`,
+            expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`,
+        },
+    }
+
+    for _, tt := range tests {
+        if tt.reset {
+            t.Logf(`resetting for test "%s"`, tt.name)
+            deleteDatabase(t, testName, nodes, database)
+            createDatabase(t, testName, nodes, database)
+            createRetentionPolicy(t, testName, nodes, database, retention)
+        }
+
+        if tt.write != "" {
+            write(t, nodes[0], rewriteDbRp(tt.write, database, retention))
+        }
+
+        if tt.query != "" {
+            got, ok := query(t, nodes, rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention))
+            if !ok {
+                t.Errorf(`Test "%s" failed, expected: %s, got: %s`, tt.name, rewriteDbRp(tt.expected, database, retention), got)
+            }
+        }
+    }
+}
+
+func TestSingleServer(t *testing.T) {
+    testName := "single server integration"
+    if testing.Short() {
+        t.Skip(fmt.Sprintf("skipping '%s'", testName))
+    }
+    dir := tempfile()
+    defer func() {
+        os.RemoveAll(dir)
+    }()
+
+    nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090)
+
+    runTestsData(t, testName, nodes, "mydb", "myrp")
+}
+
+func Test3NodeServer(t *testing.T) {
+    testName := "3-node server integration"
+    if testing.Short() {
+        t.Skip(fmt.Sprintf("skipping '%s'", testName))
+    }
+    dir := tempfile()
+    defer func() {
+        os.RemoveAll(dir)
+    }()
+
+    nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190)
+
+    runTestsData(t, testName, nodes, "mydb", "myrp")
+}
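Extending the suite means appending an entry to the tests slice in runTestsData. The standalone sketch below shows the shape of such an entry and the placeholder substitution step; the case itself and the sub helper are hypothetical and only for illustration (rewriteDbRp performs the equivalent substitution in the real tests).

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Same field layout as the `tests` slice in runTestsData above.
    tests := []struct {
        reset    bool
        name     string
        write    string
        query    string
        expected string
    }{
        {
            reset:    true,
            name:     "hypothetical new case: select by tag",
            write:    `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`,
            query:    `SELECT * FROM "%DB%"."%RP%".cpu WHERE host = 'server01'`,
            expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["...",100]]}]}]}`,
        },
    }

    sub := strings.NewReplacer("%DB%", "mydb", "%RP%", "myrp")
    for _, tt := range tests {
        fmt.Println(tt.name)
        fmt.Println("  write:   ", sub.Replace(tt.write))
        fmt.Println("  query:   ", sub.Replace(tt.query))
        fmt.Println("  expected:", sub.Replace(tt.expected))
    }
}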