Factor out cluster creation
parent
a35cb9703c
commit
3cbed350b3
|
@ -10,6 +10,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -19,59 +20,52 @@ import (
|
|||
main "github.com/influxdb/influxdb/cmd/influxd"
|
||||
)
|
||||
|
||||
func Test_ServerSingleIntegration(t *testing.T) {
|
||||
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
|
||||
// runs as both a Broker and Data node. If any part cluster creation fails,
|
||||
// the testing is marked as failed.
|
||||
func createCombinedNodeCluster(t *testing.T, testName string, nServers, basePort int) {
|
||||
t.Logf("Creating cluster of %d nodes for test %s", nServers, testName)
|
||||
|
||||
var (
|
||||
join = ""
|
||||
version = "x.x"
|
||||
)
|
||||
if nServers == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
tmpDir := os.TempDir()
|
||||
tmpBrokerDir := filepath.Join(tmpDir, "broker")
|
||||
tmpDataDir := filepath.Join(tmpDir, "data")
|
||||
t.Logf("Using tmp directory %q for broker\n", tmpBrokerDir)
|
||||
t.Logf("Using tmp directory %q for data\n", tmpDataDir)
|
||||
// Sometimes if this test fails, it's because of a log.Fatal() in the program.
|
||||
tmpBrokerDir := filepath.Join(tmpDir, "broker-integration-test")
|
||||
tmpDataDir := filepath.Join(tmpDir, "data-integration-test")
|
||||
t.Logf("Using tmp directory %q for brokers\n", tmpBrokerDir)
|
||||
t.Logf("Using tmp directory %q for data nodes\n", tmpDataDir)
|
||||
// Sometimes if a test fails, it's because of a log.Fatal() in the program.
|
||||
// This prevents the defer from cleaning up directories.
|
||||
// To be safe, nuke them always before starting
|
||||
_ = os.RemoveAll(tmpBrokerDir)
|
||||
_ = os.RemoveAll(tmpDataDir)
|
||||
|
||||
// Create the first node, special case.
|
||||
c := main.NewConfig()
|
||||
c.Broker.Dir = tmpBrokerDir
|
||||
c.Broker.Port = 8090
|
||||
c.Data.Dir = tmpDataDir
|
||||
c.Data.Port = 8090
|
||||
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort))
|
||||
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort))
|
||||
c.Broker.Port = basePort
|
||||
c.Data.Port = basePort
|
||||
s := main.Run(c, "", "x.x", os.Stderr)
|
||||
if s == nil {
|
||||
t.Fatalf("Failed to create node on port %d", basePort)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ServerSingleIntegration(t *testing.T) {
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
s := main.Run(c, join, version, os.Stderr)
|
||||
|
||||
defer func() {
|
||||
t.Log("Shutting down server and cleaning up tmp directories")
|
||||
if s != nil {
|
||||
s.Close()
|
||||
}
|
||||
|
||||
err := os.RemoveAll(tmpBrokerDir)
|
||||
if err != nil {
|
||||
t.Logf("Failed to clean up %q: %s\n", tmpBrokerDir, err)
|
||||
}
|
||||
err = os.RemoveAll(tmpDataDir)
|
||||
if err != nil {
|
||||
t.Logf("Failed to clean up %q: %s\n", tmpDataDir, err)
|
||||
}
|
||||
}()
|
||||
|
||||
if s == nil {
|
||||
t.Fatalf("Failed to open server")
|
||||
createCombinedNodeCluster(t, "single node", 1, 8090)
|
||||
serverURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:8090",
|
||||
}
|
||||
|
||||
// Create a database
|
||||
t.Log("Creating database")
|
||||
|
||||
u := urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
|
||||
|
||||
u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
|
||||
resp, err := http.Get(u.String())
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create database: %s", err)
|
||||
|
@ -97,8 +91,7 @@ func Test_ServerSingleIntegration(t *testing.T) {
|
|||
}
|
||||
|
||||
// Query the database exists
|
||||
u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"SHOW DATABASES"}})
|
||||
|
||||
u = urlFor(serverURL, "query", url.Values{"q": []string{"SHOW DATABASES"}})
|
||||
resp, err = http.Get(u.String())
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't query databases: %s", err)
|
||||
|
@ -134,9 +127,7 @@ func Test_ServerSingleIntegration(t *testing.T) {
|
|||
|
||||
// Create a retention policy
|
||||
t.Log("Creating retention policy")
|
||||
|
||||
u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 1 DEFAULT"}})
|
||||
|
||||
u = urlFor(serverURL, "query", url.Values{"q": []string{"CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 1 DEFAULT"}})
|
||||
resp, err = http.Get(u.String())
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create retention policy: %s", err)
|
||||
|
@ -164,12 +155,10 @@ func Test_ServerSingleIntegration(t *testing.T) {
|
|||
|
||||
// Write Data
|
||||
t.Log("Write data")
|
||||
|
||||
u = urlFor(c.BrokerURL(), "write", url.Values{})
|
||||
u = urlFor(serverURL, "write", url.Values{})
|
||||
|
||||
buf := []byte(fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano()))
|
||||
t.Logf("Writing raw data: %s", string(buf))
|
||||
|
||||
resp, err = http.Post(u.String(), "application/json", bytes.NewReader(buf))
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't write data: %s", err)
|
||||
|
@ -184,8 +173,7 @@ func Test_ServerSingleIntegration(t *testing.T) {
|
|||
|
||||
// Query the data exists
|
||||
t.Log("Query data")
|
||||
u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}})
|
||||
|
||||
u = urlFor(serverURL, "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}})
|
||||
resp, err = http.Get(u.String())
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't query databases: %s", err)
|
||||
|
|
Loading…
Reference in New Issue