2015-02-02 21:01:35 +00:00
|
|
|
package main_test
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io/ioutil"
|
2015-02-20 23:11:51 +00:00
|
|
|
"math/rand"
|
2015-02-02 21:01:35 +00:00
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
|
|
|
"reflect"
|
2015-02-09 23:16:25 +00:00
|
|
|
"strconv"
|
2015-02-23 18:01:21 +00:00
|
|
|
"sync"
|
2015-02-02 21:01:35 +00:00
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
2015-02-10 00:35:28 +00:00
|
|
|
"github.com/influxdb/influxdb"
|
2015-02-03 00:20:34 +00:00
|
|
|
"github.com/influxdb/influxdb/client"
|
2015-02-02 21:01:35 +00:00
|
|
|
"github.com/influxdb/influxdb/influxql"
|
2015-02-10 00:35:28 +00:00
|
|
|
"github.com/influxdb/influxdb/messaging"
|
2015-02-02 21:01:35 +00:00
|
|
|
|
|
|
|
main "github.com/influxdb/influxdb/cmd/influxd"
|
|
|
|
)
|
|
|
|
|
2015-02-21 19:54:05 +00:00
|
|
|
const (
	// batchSize is the number of points sent per batch in the large-batch tests.
	// Use a prime batch size, so that internal batching code, which most likely
	// uses nice round batches, has to deal with leftover.
	batchSize = 4217
)
|
|
|
|
|
2015-02-10 03:58:17 +00:00
|
|
|
// urlFor returns a URL with the path and query params correctly appended and set.
|
|
|
|
func urlFor(u *url.URL, path string, params url.Values) *url.URL {
|
2015-02-26 00:01:06 +00:00
|
|
|
v, _ := url.Parse(u.String())
|
|
|
|
v.Path = path
|
|
|
|
v.RawQuery = params.Encode()
|
|
|
|
return v
|
2015-02-10 03:58:17 +00:00
|
|
|
}
|
|
|
|
|
2015-02-10 01:55:07 +00:00
|
|
|
// node represents a node under test, which is both a broker and data node.
type node struct {
	broker *messaging.Broker // messaging broker component of the node
	server *influxdb.Server  // data-node component of the node
	url    *url.URL          // base HTTP URL the node listens on
	leader bool              // set only on the first node created in a cluster
}
|
|
|
|
|
2015-02-10 02:00:58 +00:00
|
|
|
// cluster represents a multi-node cluster. By construction (see
// createCombinedNodeCluster) the first element is the leader.
type cluster []*node
|
2015-02-10 02:00:58 +00:00
|
|
|
|
2015-02-20 23:11:51 +00:00
|
|
|
// createBatch returns a JSON string, representing the request body for a batch write. The timestamp
|
|
|
|
// simply increases and the value is a random integer.
|
|
|
|
func createBatch(nPoints int, database, retention, measurement string, tags map[string]string) string {
|
|
|
|
type Point struct {
|
|
|
|
Name string `json:"name"`
|
|
|
|
Tags map[string]string `json:"tags"`
|
|
|
|
Timestamp int64 `json:"timestamp"`
|
|
|
|
Precision string `json:"precision"`
|
2015-02-23 22:37:10 +00:00
|
|
|
Fields map[string]int `json:"fields"`
|
2015-02-20 23:11:51 +00:00
|
|
|
}
|
|
|
|
type PointBatch struct {
|
|
|
|
Database string `json:"database"`
|
|
|
|
RetentionPolicy string `json:"retentionPolicy"`
|
|
|
|
Points []Point `json:"points"`
|
|
|
|
}
|
|
|
|
|
|
|
|
rand.Seed(time.Now().UTC().UnixNano())
|
|
|
|
points := make([]Point, 0)
|
|
|
|
for i := 0; i < nPoints; i++ {
|
2015-02-23 22:37:10 +00:00
|
|
|
fields := map[string]int{"value": rand.Int()}
|
|
|
|
point := Point{Name: measurement, Tags: tags, Timestamp: time.Now().UTC().UnixNano(), Precision: "n", Fields: fields}
|
2015-02-20 23:11:51 +00:00
|
|
|
points = append(points, point)
|
|
|
|
}
|
|
|
|
batch := PointBatch{Database: database, RetentionPolicy: retention, Points: points}
|
|
|
|
|
|
|
|
buf, _ := json.Marshal(batch)
|
|
|
|
return string(buf)
|
|
|
|
}
|
|
|
|
|
2015-02-09 23:16:25 +00:00
|
|
|
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
|
|
|
|
// runs as both a Broker and Data node. If any part cluster creation fails,
|
|
|
|
// the testing is marked as failed.
|
2015-02-10 00:35:28 +00:00
|
|
|
//
|
|
|
|
// This function returns a slice of nodes, the first of which will be the leader.
|
2015-02-10 02:00:58 +00:00
|
|
|
func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
|
2015-02-09 23:46:55 +00:00
|
|
|
t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName)
|
|
|
|
if nNodes < 1 {
|
2015-02-09 23:51:10 +00:00
|
|
|
t.Fatalf("Test %s: asked to create nonsense cluster", testName)
|
2015-02-09 23:16:25 +00:00
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-25 23:57:58 +00:00
|
|
|
nodes := make([]*node, 0)
|
2015-02-10 00:35:28 +00:00
|
|
|
|
2015-02-02 21:01:35 +00:00
|
|
|
tmpDir := os.TempDir()
|
2015-02-09 23:16:25 +00:00
|
|
|
tmpBrokerDir := filepath.Join(tmpDir, "broker-integration-test")
|
|
|
|
tmpDataDir := filepath.Join(tmpDir, "data-integration-test")
|
2015-02-09 23:27:59 +00:00
|
|
|
t.Logf("Test %s: using tmp directory %q for brokers\n", testName, tmpBrokerDir)
|
|
|
|
t.Logf("Test %s: using tmp directory %q for data nodes\n", testName, tmpDataDir)
|
2015-02-09 23:16:25 +00:00
|
|
|
// Sometimes if a test fails, it's because of a log.Fatal() in the program.
|
2015-02-05 21:06:18 +00:00
|
|
|
// This prevents the defer from cleaning up directories.
|
|
|
|
// To be safe, nuke them always before starting
|
|
|
|
_ = os.RemoveAll(tmpBrokerDir)
|
|
|
|
_ = os.RemoveAll(tmpDataDir)
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-09 23:16:25 +00:00
|
|
|
// Create the first node, special case.
|
2015-02-02 21:01:35 +00:00
|
|
|
c := main.NewConfig()
|
2015-02-09 23:16:25 +00:00
|
|
|
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort))
|
|
|
|
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort))
|
|
|
|
c.Broker.Port = basePort
|
|
|
|
c.Data.Port = basePort
|
2015-02-12 00:33:21 +00:00
|
|
|
c.Admin.Enabled = false
|
2015-02-12 19:23:10 +00:00
|
|
|
c.ReportingDisabled = true
|
2015-02-09 23:40:21 +00:00
|
|
|
|
2015-02-10 00:35:28 +00:00
|
|
|
b, s := main.Run(c, "", "x.x", os.Stderr)
|
|
|
|
if b == nil {
|
|
|
|
t.Fatalf("Test %s: failed to create broker on port %d", testName, basePort)
|
|
|
|
}
|
2015-02-09 23:16:25 +00:00
|
|
|
if s == nil {
|
2015-02-10 00:35:28 +00:00
|
|
|
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort)
|
2015-02-09 23:40:21 +00:00
|
|
|
}
|
2015-02-25 23:57:58 +00:00
|
|
|
nodes = append(nodes, &node{
|
2015-02-10 01:55:07 +00:00
|
|
|
broker: b,
|
|
|
|
server: s,
|
|
|
|
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
|
2015-02-10 01:59:19 +00:00
|
|
|
leader: true,
|
2015-02-10 01:55:07 +00:00
|
|
|
})
|
2015-02-09 23:40:21 +00:00
|
|
|
|
|
|
|
// Create subsequent nodes, which join to first node.
|
2015-02-09 23:46:55 +00:00
|
|
|
for i := 1; i < nNodes; i++ {
|
2015-02-09 23:40:21 +00:00
|
|
|
nextPort := basePort + i
|
|
|
|
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(nextPort))
|
|
|
|
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(nextPort))
|
|
|
|
c.Broker.Port = nextPort
|
|
|
|
c.Data.Port = nextPort
|
|
|
|
|
2015-02-10 00:35:28 +00:00
|
|
|
b, s := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x", os.Stderr)
|
|
|
|
if b == nil {
|
|
|
|
t.Fatalf("Test %s: failed to create following broker on port %d", testName, basePort)
|
|
|
|
}
|
2015-02-09 23:40:21 +00:00
|
|
|
if s == nil {
|
2015-02-10 00:35:28 +00:00
|
|
|
t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort)
|
2015-02-09 23:40:21 +00:00
|
|
|
}
|
2015-02-10 01:55:07 +00:00
|
|
|
|
2015-02-25 23:57:58 +00:00
|
|
|
nodes = append(nodes, &node{
|
2015-02-10 01:55:07 +00:00
|
|
|
broker: b,
|
|
|
|
server: s,
|
|
|
|
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
|
|
|
|
})
|
2015-02-09 23:16:25 +00:00
|
|
|
}
|
2015-02-10 00:35:28 +00:00
|
|
|
|
|
|
|
return nodes
|
2015-02-09 23:16:25 +00:00
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 01:37:17 +00:00
|
|
|
// createDatabase creates a database, and verifies that the creation was successful.
|
2015-02-10 02:00:58 +00:00
|
|
|
func createDatabase(t *testing.T, testName string, nodes cluster, database string) {
|
2015-02-10 01:37:17 +00:00
|
|
|
t.Logf("Test: %s: creating database %s", testName, database)
|
2015-02-10 01:55:07 +00:00
|
|
|
serverURL := nodes[0].url
|
|
|
|
|
2015-02-09 23:16:25 +00:00
|
|
|
u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
|
2015-02-09 22:29:42 +00:00
|
|
|
resp, err := http.Get(u.String())
|
2015-02-02 21:01:35 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't create database: %s", err)
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
2015-02-03 00:20:34 +00:00
|
|
|
var results client.Results
|
2015-02-02 21:01:35 +00:00
|
|
|
err = json.NewDecoder(resp.Body).Decode(&results)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't decode results: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if results.Error() != nil {
|
|
|
|
t.Logf("results.Error(): %q", results.Error().Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("Create database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(results.Results) != 1 {
|
|
|
|
t.Fatalf("Create database failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Query the database exists
|
2015-02-09 23:16:25 +00:00
|
|
|
u = urlFor(serverURL, "query", url.Values{"q": []string{"SHOW DATABASES"}})
|
2015-02-09 22:29:42 +00:00
|
|
|
resp, err = http.Get(u.String())
|
2015-02-02 21:01:35 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't query databases: %s", err)
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&results)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't decode results: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if results.Error() != nil {
|
|
|
|
t.Logf("results.Error(): %q", results.Error().Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("show databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
2015-02-03 00:20:34 +00:00
|
|
|
expectedResults := client.Results{
|
|
|
|
Results: []client.Result{
|
2015-02-23 05:21:49 +00:00
|
|
|
{Series: []influxql.Row{
|
2015-02-13 18:25:48 +00:00
|
|
|
{
|
2015-02-03 00:20:34 +00:00
|
|
|
Columns: []string{"name"},
|
|
|
|
Values: [][]interface{}{{"foo"}},
|
|
|
|
},
|
|
|
|
}},
|
|
|
|
},
|
2015-02-02 21:01:35 +00:00
|
|
|
}
|
2015-02-03 00:20:34 +00:00
|
|
|
if !reflect.DeepEqual(results, expectedResults) {
|
|
|
|
t.Fatalf("show databases failed. Unexpected results. expected: %+v, actual %+v", expectedResults, results)
|
2015-02-02 21:01:35 +00:00
|
|
|
}
|
2015-02-10 01:37:17 +00:00
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 01:37:17 +00:00
|
|
|
// createRetentionPolicy creates a retetention policy and verifies that the creation was successful.
|
2015-02-10 02:13:40 +00:00
|
|
|
func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string) {
|
2015-02-02 21:01:35 +00:00
|
|
|
t.Log("Creating retention policy")
|
2015-02-10 01:55:07 +00:00
|
|
|
serverURL := nodes[0].url
|
2015-02-10 02:13:40 +00:00
|
|
|
replication := fmt.Sprintf("CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION %d DEFAULT", len(nodes))
|
2015-02-10 01:55:07 +00:00
|
|
|
|
2015-02-10 01:37:17 +00:00
|
|
|
u := urlFor(serverURL, "query", url.Values{"q": []string{replication}})
|
|
|
|
resp, err := http.Get(u.String())
|
2015-02-02 21:01:35 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't create retention policy: %s", err)
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
2015-02-10 01:37:17 +00:00
|
|
|
var results client.Results
|
2015-02-02 21:01:35 +00:00
|
|
|
err = json.NewDecoder(resp.Body).Decode(&results)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't decode results: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if results.Error() != nil {
|
|
|
|
t.Logf("results.Error(): %q", results.Error().Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("Create retention policy failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(results.Results) != 1 {
|
|
|
|
t.Fatalf("Create retention policy failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
|
|
|
|
}
|
2015-02-10 01:37:17 +00:00
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:38:06 +00:00
|
|
|
// writes writes the provided data to the cluster. It verfies that a 200 OK is returned by the server.
|
2015-02-23 18:01:21 +00:00
|
|
|
func write(t *testing.T, testName string, nodes cluster, data string) {
|
|
|
|
t.Logf("Test %s: writing data", testName)
|
2015-02-10 01:55:07 +00:00
|
|
|
serverURL := nodes[0].url
|
2015-02-10 01:37:17 +00:00
|
|
|
u := urlFor(serverURL, "write", url.Values{})
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:38:06 +00:00
|
|
|
buf := []byte(data)
|
2015-02-10 01:37:17 +00:00
|
|
|
resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf))
|
2015-02-02 21:01:35 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't write data: %s", err)
|
|
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
2015-02-23 18:01:21 +00:00
|
|
|
index, err := strconv.ParseInt(resp.Header.Get("X-InfluxDB-Index"), 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't get index. header: %s, err: %s.", resp.Header.Get("X-InfluxDB-Index"), err)
|
|
|
|
}
|
|
|
|
wait(t, testName, nodes, index)
|
|
|
|
t.Log("Finished writing and waiting")
|
2015-02-10 03:38:06 +00:00
|
|
|
}
|
|
|
|
|
2015-02-10 03:56:54 +00:00
|
|
|
// simpleQuery executes the given query against all nodes in the cluster, and verify the
|
|
|
|
// returned results are as expected.
|
2015-02-23 18:01:21 +00:00
|
|
|
func simpleQuery(t *testing.T, testName string, nodes cluster, query string, expected client.Results) {
|
2015-02-10 03:38:06 +00:00
|
|
|
var results client.Results
|
2015-02-02 21:01:35 +00:00
|
|
|
|
|
|
|
// Query the data exists
|
2015-02-10 03:56:54 +00:00
|
|
|
for _, n := range nodes {
|
2015-02-23 18:01:21 +00:00
|
|
|
t.Logf("Test name %s: query data on node %s", testName, n.url)
|
2015-02-10 03:56:54 +00:00
|
|
|
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
|
|
|
|
resp, err := http.Get(u.String())
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't query databases: %s", err)
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:56:54 +00:00
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't read body of response: %s", err)
|
|
|
|
}
|
|
|
|
t.Logf("resp.Body: %s\n", string(body))
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:56:54 +00:00
|
|
|
dec := json.NewDecoder(bytes.NewReader(body))
|
|
|
|
dec.UseNumber()
|
|
|
|
err = dec.Decode(&results)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't decode results: %v", err)
|
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:56:54 +00:00
|
|
|
if results.Error() != nil {
|
|
|
|
t.Logf("results.Error(): %q", results.Error().Error())
|
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:56:54 +00:00
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
2015-02-02 21:01:35 +00:00
|
|
|
|
2015-02-10 03:56:54 +00:00
|
|
|
if !reflect.DeepEqual(results, expected) {
|
|
|
|
t.Logf("Expected: %#v\n", expected)
|
|
|
|
t.Logf("Actual: %#v\n", results)
|
|
|
|
t.Fatalf("query databases failed. Unexpected results.")
|
|
|
|
}
|
2015-02-10 03:38:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-23 18:01:21 +00:00
|
|
|
func wait(t *testing.T, testName string, nodes cluster, index int64) {
|
|
|
|
// Wait for the index to sync up
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(len(nodes))
|
|
|
|
for _, n := range nodes {
|
|
|
|
go func(t *testing.T, testName string, nodes cluster, u *url.URL, index int64) {
|
|
|
|
u = urlFor(u, fmt.Sprintf("wait/%d", index), nil)
|
|
|
|
t.Logf("Test name %s: wait on node %s for index %d", testName, u, index)
|
|
|
|
resp, err := http.Get(u.String())
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't wait: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't read body of response: %s", err)
|
|
|
|
}
|
|
|
|
t.Logf("resp.Body: %s\n", string(body))
|
|
|
|
|
|
|
|
i, _ := strconv.Atoi(string(body))
|
|
|
|
if i == 0 {
|
|
|
|
t.Fatalf("Unexpected body: %s", string(body))
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Done()
|
|
|
|
|
|
|
|
}(t, testName, nodes, n.url, index)
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
}
|
|
|
|
|
2015-02-21 19:54:05 +00:00
|
|
|
// simpleCountQuery executes the given query against all nodes in the cluster, and verify the
|
|
|
|
// the count for the given field is as expected.
|
2015-02-23 18:01:21 +00:00
|
|
|
func simpleCountQuery(t *testing.T, testName string, nodes cluster, query, field string, expected int64) {
|
2015-02-21 19:54:05 +00:00
|
|
|
var results client.Results
|
|
|
|
|
|
|
|
// Query the data exists
|
|
|
|
for _, n := range nodes {
|
2015-02-23 18:01:21 +00:00
|
|
|
t.Logf("Test name %s: query data on node %s", testName, n.url)
|
2015-02-21 19:54:05 +00:00
|
|
|
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
|
|
|
|
resp, err := http.Get(u.String())
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't query databases: %s", err)
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't read body of response: %s", err)
|
|
|
|
}
|
|
|
|
t.Logf("resp.Body: %s\n", string(body))
|
|
|
|
|
|
|
|
dec := json.NewDecoder(bytes.NewReader(body))
|
|
|
|
dec.UseNumber()
|
|
|
|
err = dec.Decode(&results)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Couldn't decode results: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if results.Error() != nil {
|
|
|
|
t.Logf("results.Error(): %q", results.Error().Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
|
|
|
|
}
|
|
|
|
|
2015-02-23 05:21:49 +00:00
|
|
|
if len(results.Results) != 1 || len(results.Results[0].Series) != 1 {
|
2015-02-21 20:07:19 +00:00
|
|
|
t.Fatal("results object returned has insufficient entries")
|
|
|
|
}
|
2015-02-23 05:21:49 +00:00
|
|
|
j, ok := results.Results[0].Series[0].Values[0][1].(json.Number)
|
2015-02-21 19:54:05 +00:00
|
|
|
if !ok {
|
|
|
|
t.Fatalf("count is not a JSON number")
|
|
|
|
}
|
|
|
|
count, err := j.Int64()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("failed to convert count to int64")
|
|
|
|
}
|
|
|
|
if count != expected {
|
|
|
|
t.Fatalf("count value is wrong, expected %d, go %d", expected, count)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-10 03:38:06 +00:00
|
|
|
func Test_ServerSingleIntegration(t *testing.T) {
|
|
|
|
nNodes := 1
|
|
|
|
basePort := 8090
|
|
|
|
testName := "single node"
|
|
|
|
now := time.Now().UTC()
|
|
|
|
nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort)
|
|
|
|
|
|
|
|
createDatabase(t, testName, nodes, "foo")
|
|
|
|
createRetentionPolicy(t, testName, nodes, "foo", "bar")
|
2015-02-10 03:42:56 +00:00
|
|
|
write(t, testName, nodes, fmt.Sprintf(`
|
|
|
|
{
|
2015-02-20 23:14:20 +00:00
|
|
|
"database": "foo",
|
|
|
|
"retentionPolicy": "bar",
|
2015-02-10 03:42:56 +00:00
|
|
|
"points":
|
|
|
|
[{
|
2015-02-20 23:14:20 +00:00
|
|
|
"name": "cpu",
|
2015-02-10 03:42:56 +00:00
|
|
|
"tags": {
|
|
|
|
"host": "server01"
|
|
|
|
},
|
|
|
|
"timestamp": %d,
|
2015-02-20 23:14:20 +00:00
|
|
|
"precision": "n",
|
2015-02-23 22:37:10 +00:00
|
|
|
"fields":{
|
2015-02-10 03:42:56 +00:00
|
|
|
"value": 100
|
|
|
|
}
|
|
|
|
}]
|
|
|
|
}
|
|
|
|
`, now.UnixNano()))
|
2015-02-10 01:37:17 +00:00
|
|
|
expectedResults := client.Results{
|
2015-02-03 00:20:34 +00:00
|
|
|
Results: []client.Result{
|
2015-02-23 05:21:49 +00:00
|
|
|
{Series: []influxql.Row{
|
2015-02-03 00:20:34 +00:00
|
|
|
{
|
|
|
|
Name: "cpu",
|
|
|
|
Columns: []string{"time", "value"},
|
|
|
|
Values: [][]interface{}{
|
2015-02-13 18:25:48 +00:00
|
|
|
{now.Format(time.RFC3339Nano), json.Number("100")},
|
2015-02-03 00:20:34 +00:00
|
|
|
},
|
|
|
|
}}},
|
2015-02-02 21:01:35 +00:00
|
|
|
},
|
|
|
|
}
|
2015-02-10 03:56:54 +00:00
|
|
|
simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
|
2015-02-09 23:27:59 +00:00
|
|
|
}
|
|
|
|
|
2015-02-09 23:40:21 +00:00
|
|
|
// Test_Server3NodeIntegration writes a single point to a 3-node cluster and
// verifies it can be queried back from the leader. Skipped under -short.
func Test_Server3NodeIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip()
	}
	nNodes := 3
	// Each test uses a distinct base port range so clusters don't collide.
	basePort := 8190
	testName := "3 node"
	now := time.Now().UTC()
	nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)

	createDatabase(t, testName, nodes, "foo")
	createRetentionPolicy(t, testName, nodes, "foo", "bar")
	write(t, testName, nodes, fmt.Sprintf(`
{
	"database": "foo",
	"retentionPolicy": "bar",
	"points":
	[{
		"name": "cpu",
		"tags": {
			"host": "server01"
		},
		"timestamp": %d,
		"precision": "n",
		"fields":{
			"value": 100
		}
	}]
}
`, now.UnixNano()))
	expectedResults := client.Results{
		Results: []client.Result{
			{Series: []influxql.Row{
				{
					Name:    "cpu",
					Columns: []string{"time", "value"},
					Values: [][]interface{}{
						{now.Format(time.RFC3339Nano), json.Number("100")},
					},
				}}},
		},
	}

	// Only the leader (nodes[0]) is queried here, despite the 3-node cluster.
	simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
}
|
|
|
|
|
|
|
|
// Test_Server5NodeIntegration writes a single point to a 5-node cluster and
// verifies it can be queried back from the leader.
//
// NOTE(review): currently disabled via the unconditional t.Skip() below; the
// testing.Short() check and everything after it is dead code until the skip
// is removed.
func Test_Server5NodeIntegration(t *testing.T) {
	t.Skip()
	if testing.Short() {
		t.Skip()
	}
	nNodes := 5
	basePort := 8290
	testName := "5 node"
	now := time.Now().UTC()
	nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)

	createDatabase(t, testName, nodes, "foo")
	createRetentionPolicy(t, testName, nodes, "foo", "bar")
	write(t, testName, nodes, fmt.Sprintf(`
{
	"database": "foo",
	"retentionPolicy": "bar",
	"points":
	[{
		"name": "cpu",
		"tags": {
			"host": "server01"
		},
		"timestamp": %d,
		"precision": "n",
		"fields":{
			"value": 100
		}
	}]
}
`, now.UnixNano()))

	expectedResults := client.Results{
		Results: []client.Result{
			{Series: []influxql.Row{
				{
					Name:    "cpu",
					Columns: []string{"time", "value"},
					Values: [][]interface{}{
						{now.Format(time.RFC3339Nano), json.Number("100")},
					},
				}}},
		},
	}

	// Only the leader (nodes[0]) is queried here, despite the 5-node cluster.
	simpleQuery(t, testName, nodes[:1], `select value from "foo"."bar".cpu`, expectedResults)
}
|
2015-02-20 23:11:51 +00:00
|
|
|
|
|
|
|
// Test_ServerSingleLargeBatchIntegration writes one large (prime-sized) batch
// to a one-node cluster and verifies the total point count. Skipped under
// -short.
func Test_ServerSingleLargeBatchIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip()
	}
	nNodes := 1
	basePort := 8390
	testName := "single node large batch"
	nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)

	createDatabase(t, testName, nodes, "foo")
	createRetentionPolicy(t, testName, nodes, "foo", "bar")
	write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
	simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
}
|
2015-02-21 20:07:19 +00:00
|
|
|
|
|
|
|
func Test_Server3NodeLargeBatchIntegration(t *testing.T) {
|
|
|
|
if testing.Short() {
|
|
|
|
t.Skip()
|
|
|
|
}
|
|
|
|
nNodes := 3
|
|
|
|
basePort := 8490
|
|
|
|
testName := "3 node large batch"
|
|
|
|
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
|
|
|
|
|
|
|
|
createDatabase(t, testName, nodes, "foo")
|
|
|
|
createRetentionPolicy(t, testName, nodes, "foo", "bar")
|
|
|
|
write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
|
|
|
|
simpleCountQuery(t, "single node large batch", nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
|
|
|
|
}
|
|
|
|
|
|
|
|
func Test_Server5NodeLargeBatchIntegration(t *testing.T) {
|
2015-02-25 03:23:44 +00:00
|
|
|
t.Skip()
|
2015-02-21 20:07:19 +00:00
|
|
|
if testing.Short() {
|
|
|
|
t.Skip()
|
|
|
|
}
|
|
|
|
nNodes := 5
|
|
|
|
basePort := 8590
|
|
|
|
testName := "5 node large batch"
|
|
|
|
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
|
|
|
|
|
|
|
|
createDatabase(t, testName, nodes, "foo")
|
|
|
|
createRetentionPolicy(t, testName, nodes, "foo", "bar")
|
|
|
|
write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
|
|
|
|
simpleCountQuery(t, "single node large batch", nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
|
|
|
|
}
|
2015-02-21 20:21:34 +00:00
|
|
|
|
|
|
|
// Test_ServerMultiLargeBatchIntegration writes several large (prime-sized)
// batches to a one-node cluster and verifies the cumulative point count.
//
// NOTE(review): currently disabled via the unconditional t.Skip() below; the
// testing.Short() check and everything after it is dead code until the skip
// is removed.
func Test_ServerMultiLargeBatchIntegration(t *testing.T) {
	t.Skip()
	if testing.Short() {
		t.Skip()
	}
	nNodes := 1
	nBatches := 5
	basePort := 8690
	testName := "single node multi batch"
	nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)

	createDatabase(t, testName, nodes, "foo")
	createRetentionPolicy(t, testName, nodes, "foo", "bar")
	for i := 0; i < nBatches; i++ {
		write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
	}
	// Expect every batch's points to be counted: batchSize * nBatches.
	simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize*int64(nBatches))
}
|