influxdb/cmd/influxd/server_integration_test.go

419 lines
11 KiB
Go
Raw Normal View History

2015-02-02 21:01:35 +00:00
package main_test
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
2015-02-09 23:16:25 +00:00
"strconv"
2015-02-02 21:01:35 +00:00
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
2015-02-02 21:01:35 +00:00
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
2015-02-02 21:01:35 +00:00
main "github.com/influxdb/influxdb/cmd/influxd"
)
2015-02-10 01:55:07 +00:00
// node represents a node under test, which is both a broker and data node.
type node struct {
broker *messaging.Broker
server *influxdb.Server
2015-02-10 01:55:07 +00:00
url *url.URL
2015-02-10 01:59:19 +00:00
leader bool
}
2015-02-10 02:00:58 +00:00
// cluster represents a multi-node cluster as an ordered slice of nodes.
// createCombinedNodeCluster places the leader at index 0, and the helpers in
// this file send their HTTP requests to that first node.
type cluster []node
2015-02-09 23:16:25 +00:00
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
// runs as both a Broker and Data node. If any part cluster creation fails,
// the testing is marked as failed.
//
// This function returns a slice of nodes, the first of which will be the leader.
2015-02-10 02:00:58 +00:00
func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
2015-02-09 23:46:55 +00:00
t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName)
if nNodes < 1 {
t.Fatalf("Test %s: asked to create nonsense cluster", testName)
2015-02-09 23:16:25 +00:00
}
2015-02-02 21:01:35 +00:00
nodes := make([]node, 0)
2015-02-02 21:01:35 +00:00
tmpDir := os.TempDir()
2015-02-09 23:16:25 +00:00
tmpBrokerDir := filepath.Join(tmpDir, "broker-integration-test")
tmpDataDir := filepath.Join(tmpDir, "data-integration-test")
2015-02-09 23:27:59 +00:00
t.Logf("Test %s: using tmp directory %q for brokers\n", testName, tmpBrokerDir)
t.Logf("Test %s: using tmp directory %q for data nodes\n", testName, tmpDataDir)
2015-02-09 23:16:25 +00:00
// Sometimes if a test fails, it's because of a log.Fatal() in the program.
// This prevents the defer from cleaning up directories.
// To be safe, nuke them always before starting
_ = os.RemoveAll(tmpBrokerDir)
_ = os.RemoveAll(tmpDataDir)
2015-02-02 21:01:35 +00:00
2015-02-09 23:16:25 +00:00
// Create the first node, special case.
2015-02-02 21:01:35 +00:00
c := main.NewConfig()
2015-02-09 23:16:25 +00:00
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort))
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort))
c.Broker.Port = basePort
c.Data.Port = basePort
2015-02-09 23:40:21 +00:00
b, s := main.Run(c, "", "x.x", os.Stderr)
if b == nil {
t.Fatalf("Test %s: failed to create broker on port %d", testName, basePort)
}
2015-02-09 23:16:25 +00:00
if s == nil {
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort)
2015-02-09 23:40:21 +00:00
}
2015-02-10 01:55:07 +00:00
nodes = append(nodes, node{
broker: b,
server: s,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
2015-02-10 01:59:19 +00:00
leader: true,
2015-02-10 01:55:07 +00:00
})
2015-02-09 23:40:21 +00:00
// Create subsequent nodes, which join to first node.
2015-02-09 23:46:55 +00:00
for i := 1; i < nNodes; i++ {
2015-02-09 23:40:21 +00:00
nextPort := basePort + i
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(nextPort))
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(nextPort))
c.Broker.Port = nextPort
c.Data.Port = nextPort
b, s := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x", os.Stderr)
if b == nil {
t.Fatalf("Test %s: failed to create following broker on port %d", testName, basePort)
}
2015-02-09 23:40:21 +00:00
if s == nil {
t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort)
2015-02-09 23:40:21 +00:00
}
2015-02-10 01:55:07 +00:00
nodes = append(nodes, node{
broker: b,
server: s,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
})
2015-02-09 23:16:25 +00:00
}
return nodes
2015-02-09 23:16:25 +00:00
}
2015-02-02 21:01:35 +00:00
// createDatabase creates a database, and verifies that the creation was successful.
2015-02-10 02:00:58 +00:00
func createDatabase(t *testing.T, testName string, nodes cluster, database string) {
t.Logf("Test: %s: creating database %s", testName, database)
2015-02-10 01:55:07 +00:00
serverURL := nodes[0].url
2015-02-09 23:16:25 +00:00
u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
2015-02-09 22:29:42 +00:00
resp, err := http.Get(u.String())
2015-02-02 21:01:35 +00:00
if err != nil {
t.Fatalf("Couldn't create database: %s", err)
}
defer resp.Body.Close()
var results client.Results
2015-02-02 21:01:35 +00:00
err = json.NewDecoder(resp.Body).Decode(&results)
if err != nil {
t.Fatalf("Couldn't decode results: %v", err)
}
if results.Error() != nil {
t.Logf("results.Error(): %q", results.Error().Error())
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("Create database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
if len(results.Results) != 1 {
t.Fatalf("Create database failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
}
// Query the database exists
2015-02-09 23:16:25 +00:00
u = urlFor(serverURL, "query", url.Values{"q": []string{"SHOW DATABASES"}})
2015-02-09 22:29:42 +00:00
resp, err = http.Get(u.String())
2015-02-02 21:01:35 +00:00
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
}
defer resp.Body.Close()
err = json.NewDecoder(resp.Body).Decode(&results)
if err != nil {
t.Fatalf("Couldn't decode results: %v", err)
}
if results.Error() != nil {
t.Logf("results.Error(): %q", results.Error().Error())
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("show databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
influxql.Row{
Columns: []string{"name"},
Values: [][]interface{}{{"foo"}},
},
}},
},
2015-02-02 21:01:35 +00:00
}
if !reflect.DeepEqual(results, expectedResults) {
t.Fatalf("show databases failed. Unexpected results. expected: %+v, actual %+v", expectedResults, results)
2015-02-02 21:01:35 +00:00
}
}
2015-02-02 21:01:35 +00:00
// createRetentionPolicy creates a retetention policy and verifies that the creation was successful.
2015-02-10 02:13:40 +00:00
func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string) {
2015-02-02 21:01:35 +00:00
t.Log("Creating retention policy")
2015-02-10 01:55:07 +00:00
serverURL := nodes[0].url
2015-02-10 02:13:40 +00:00
replication := fmt.Sprintf("CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION %d DEFAULT", len(nodes))
2015-02-10 01:55:07 +00:00
u := urlFor(serverURL, "query", url.Values{"q": []string{replication}})
resp, err := http.Get(u.String())
2015-02-02 21:01:35 +00:00
if err != nil {
t.Fatalf("Couldn't create retention policy: %s", err)
}
defer resp.Body.Close()
var results client.Results
2015-02-02 21:01:35 +00:00
err = json.NewDecoder(resp.Body).Decode(&results)
if err != nil {
t.Fatalf("Couldn't decode results: %v", err)
}
if results.Error() != nil {
t.Logf("results.Error(): %q", results.Error().Error())
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("Create retention policy failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
if len(results.Results) != 1 {
t.Fatalf("Create retention policy failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
}
}
2015-02-02 21:01:35 +00:00
2015-02-10 03:38:06 +00:00
// writes writes the provided data to the cluster. It verfies that a 200 OK is returned by the server.
func write(t *testing.T, testname string, nodes cluster, data string) {
t.Logf("Test %s: writing data", testname)
2015-02-10 01:55:07 +00:00
serverURL := nodes[0].url
u := urlFor(serverURL, "write", url.Values{})
2015-02-02 21:01:35 +00:00
2015-02-10 03:38:06 +00:00
buf := []byte(data)
2015-02-02 21:01:35 +00:00
t.Logf("Writing raw data: %s", string(buf))
resp, err := http.Post(u.String(), "application/json", bytes.NewReader(buf))
2015-02-02 21:01:35 +00:00
if err != nil {
t.Fatalf("Couldn't write data: %s", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
// Need some time for server to get consensus and write data
// TODO corylanou query the status endpoint for the server and wait for the index to update to know the write was applied
2015-02-10 02:13:40 +00:00
time.Sleep(time.Duration(len(nodes)) * time.Second)
2015-02-10 03:38:06 +00:00
}
// simpleQuery creates a simple database, retention policy, and replicates
// the data across all nodes. It then ensures a series of writes and queries are OK.
func simpleQuery(t *testing.T, testname string, nodes cluster, query string, expected client.Results) {
2015-02-10 03:38:06 +00:00
serverURL := nodes[0].url
var results client.Results
2015-02-02 21:01:35 +00:00
// Query the data exists
t.Log("Query data")
u := urlFor(serverURL, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
2015-02-10 03:38:06 +00:00
resp, err := http.Get(u.String())
2015-02-02 21:01:35 +00:00
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
2015-02-09 20:07:43 +00:00
t.Fatalf("Couldn't read body of response: %s", err)
2015-02-02 21:01:35 +00:00
}
t.Logf("resp.Body: %s\n", string(body))
dec := json.NewDecoder(bytes.NewReader(body))
dec.UseNumber()
err = dec.Decode(&results)
if err != nil {
t.Fatalf("Couldn't decode results: %v", err)
}
if results.Error() != nil {
t.Logf("results.Error(): %q", results.Error().Error())
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
2015-02-10 03:38:06 +00:00
if !reflect.DeepEqual(results, expected) {
t.Logf("Expected: %#v\n", expected)
t.Logf("Actual: %#v\n", results)
t.Fatalf("query databases failed. Unexpected results.")
}
}
func Test_ServerSingleIntegration(t *testing.T) {
nNodes := 1
basePort := 8090
testName := "single node"
now := time.Now().UTC()
nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort)
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, fmt.Sprintf(`
{
"database":
"foo",
"retentionPolicy":
"bar",
"points":
[{
"name":
"cpu",
"tags": {
"host": "server01"
},
"timestamp": %d,
"precision":"n",
"values":{
"value": 100
}
}]
}
`, now.UnixNano()))
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
2015-02-04 17:17:03 +00:00
[]interface{}{now.Format(time.RFC3339Nano), json.Number("100")},
},
}}},
2015-02-02 21:01:35 +00:00
},
}
simpleQuery(t, testName, nodes, `select value from "foo"."bar".cpu`, expectedResults)
2015-02-09 23:27:59 +00:00
}
2015-02-09 23:40:21 +00:00
func Test_Server3NodeIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
2015-02-09 23:46:55 +00:00
nNodes := 3
basePort := 8190
testName := "3 node"
2015-02-10 03:38:06 +00:00
now := time.Now().UTC()
2015-02-10 01:55:07 +00:00
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
2015-02-09 23:46:55 +00:00
2015-02-10 01:55:07 +00:00
createDatabase(t, testName, nodes, "foo")
2015-02-10 02:13:40 +00:00
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, fmt.Sprintf(`
{
"database":
"foo",
"retentionPolicy":
"bar",
"points":
[{
"name":
"cpu",
"tags": {
"host": "server01"
},
"timestamp": %d,
"precision":"n",
"values":{
"value": 100
}
}]
}
`, now.UnixNano()))
2015-02-10 03:38:06 +00:00
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
[]interface{}{now.Format(time.RFC3339Nano), json.Number("100")},
},
}}},
},
}
simpleQuery(t, testName, nodes, `select value from "foo"."bar".cpu`, expectedResults)
2015-02-09 23:46:55 +00:00
}
func Test_Server5NodeIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
2015-02-09 23:46:55 +00:00
nNodes := 5
basePort := 8290
testName := "5 node"
2015-02-10 03:38:06 +00:00
now := time.Now().UTC()
2015-02-10 01:55:07 +00:00
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
2015-02-10 01:55:07 +00:00
createDatabase(t, testName, nodes, "foo")
2015-02-10 02:13:40 +00:00
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, fmt.Sprintf(`
{
"database":
"foo",
"retentionPolicy":
"bar",
"points":
[{
"name":
"cpu",
"tags": {
"host": "server01"
},
"timestamp": %d,
"precision":"n",
"values":{
"value": 100
}
}]
}
`, now.UnixNano()))
2015-02-10 03:38:06 +00:00
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
[]interface{}{now.Format(time.RFC3339Nano), json.Number("100")},
},
}}},
},
}
simpleQuery(t, testName, nodes, `select value from "foo"."bar".cpu`, expectedResults)
2015-02-09 23:40:21 +00:00
}
2015-02-02 21:01:35 +00:00
// urlFor returns a URL that has the scheme and host of u, the given path, and
// params encoded as its query string.
//
// u itself is left untouched: the result is a modified copy. Callers pass a
// node's shared base URL here, so writing the path and query into it in place
// would make every previously returned URL change with the next call.
func urlFor(u *url.URL, path string, params url.Values) *url.URL {
	target := *u
	target.Path = path
	target.RawQuery = params.Encode()
	return &target
}