Merge pull request #1545 from influxdb/more_integration_testing
Further integration testingpull/1550/head
commit
4bd2f8ef65
|
@ -22,17 +22,23 @@ import (
|
|||
main "github.com/influxdb/influxdb/cmd/influxd"
|
||||
)
|
||||
|
||||
// node represents a node under test, which is both a broker and data node.
|
||||
type node struct {
|
||||
broker *messaging.Broker
|
||||
server *influxdb.Server
|
||||
url *url.URL
|
||||
leader bool
|
||||
}
|
||||
|
||||
// cluster represents a multi-node cluster.
|
||||
type cluster []node
|
||||
|
||||
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
|
||||
// runs as both a Broker and Data node. If any part cluster creation fails,
|
||||
// the testing is marked as failed.
|
||||
//
|
||||
// This function returns a slice of nodes, the first of which will be the leader.
|
||||
func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) []node {
|
||||
func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster {
|
||||
t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName)
|
||||
if nNodes < 1 {
|
||||
t.Fatalf("Test %s: asked to create nonsense cluster", testName)
|
||||
|
@ -65,7 +71,12 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i
|
|||
if s == nil {
|
||||
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort)
|
||||
}
|
||||
nodes = append(nodes, node{broker: b, server: s})
|
||||
nodes = append(nodes, node{
|
||||
broker: b,
|
||||
server: s,
|
||||
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
|
||||
leader: true,
|
||||
})
|
||||
|
||||
// Create subsequent nodes, which join to first node.
|
||||
for i := 1; i < nNodes; i++ {
|
||||
|
@ -82,15 +93,22 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i
|
|||
if s == nil {
|
||||
t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort)
|
||||
}
|
||||
nodes = append(nodes, node{broker: b, server: s})
|
||||
|
||||
nodes = append(nodes, node{
|
||||
broker: b,
|
||||
server: s,
|
||||
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
|
||||
})
|
||||
}
|
||||
|
||||
return nodes
|
||||
}
|
||||
|
||||
// createDatabase creates a database, and verifies that the creation was successful.
|
||||
func createDatabase(t *testing.T, testName string, serverURL *url.URL, database string) {
|
||||
func createDatabase(t *testing.T, testName string, nodes cluster, database string) {
|
||||
t.Logf("Test: %s: creating database %s", testName, database)
|
||||
serverURL := nodes[0].url
|
||||
|
||||
u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
|
||||
resp, err := http.Get(u.String())
|
||||
if err != nil {
|
||||
|
@ -153,9 +171,11 @@ func createDatabase(t *testing.T, testName string, serverURL *url.URL, database
|
|||
}
|
||||
|
||||
// createRetentionPolicy creates a retetention policy and verifies that the creation was successful.
|
||||
func createRetentionPolicy(t *testing.T, testName string, serverURL *url.URL, database, retention string, replicaN int) {
|
||||
func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string, replicaN int) {
|
||||
t.Log("Creating retention policy")
|
||||
serverURL := nodes[0].url
|
||||
replication := fmt.Sprintf("CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION %d DEFAULT", replicaN)
|
||||
|
||||
u := urlFor(serverURL, "query", url.Values{"q": []string{replication}})
|
||||
resp, err := http.Get(u.String())
|
||||
if err != nil {
|
||||
|
@ -184,8 +204,9 @@ func createRetentionPolicy(t *testing.T, testName string, serverURL *url.URL, da
|
|||
|
||||
// simpleWriteAndQuery creates a simple database, retention policy, and replicates
|
||||
// the data across all nodes. It then ensures a series of writes and queries are OK.
|
||||
func simpleWriteAndQuery(t *testing.T, testname string, serverURL *url.URL, nNodes int) {
|
||||
func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster, nNodes int) {
|
||||
now := time.Now().UTC()
|
||||
serverURL := nodes[0].url
|
||||
var results client.Results
|
||||
|
||||
// Write Data
|
||||
|
@ -262,48 +283,33 @@ func Test_ServerSingleIntegration(t *testing.T) {
|
|||
nNodes := 1
|
||||
basePort := 8090
|
||||
testName := "single node"
|
||||
createCombinedNodeCluster(t, "single node", nNodes, basePort)
|
||||
nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort)
|
||||
|
||||
serverURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:" + strconv.Itoa(basePort),
|
||||
}
|
||||
|
||||
createDatabase(t, testName, serverURL, "foo")
|
||||
createRetentionPolicy(t, testName, serverURL, "foo", "bar", nNodes)
|
||||
simpleWriteAndQuery(t, testName, serverURL, nNodes)
|
||||
createDatabase(t, testName, nodes, "foo")
|
||||
createRetentionPolicy(t, testName, nodes, "foo", "bar", nNodes)
|
||||
simpleWriteAndQuery(t, testName, nodes, nNodes)
|
||||
}
|
||||
|
||||
func Test_Server3NodeIntegration(t *testing.T) {
|
||||
nNodes := 3
|
||||
basePort := 8190
|
||||
testName := "3 node"
|
||||
createCombinedNodeCluster(t, testName, nNodes, basePort)
|
||||
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
|
||||
|
||||
serverURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:" + strconv.Itoa(basePort),
|
||||
}
|
||||
|
||||
createDatabase(t, testName, serverURL, "foo")
|
||||
createRetentionPolicy(t, testName, serverURL, "foo", "bar", nNodes)
|
||||
simpleWriteAndQuery(t, testName, serverURL, nNodes)
|
||||
createDatabase(t, testName, nodes, "foo")
|
||||
createRetentionPolicy(t, testName, nodes, "foo", "bar", nNodes)
|
||||
simpleWriteAndQuery(t, testName, nodes, nNodes)
|
||||
}
|
||||
|
||||
func Test_Server5NodeIntegration(t *testing.T) {
|
||||
nNodes := 5
|
||||
basePort := 8290
|
||||
testName := "5 node"
|
||||
createCombinedNodeCluster(t, testName, nNodes, basePort)
|
||||
nodes := createCombinedNodeCluster(t, testName, nNodes, basePort)
|
||||
|
||||
serverURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:" + strconv.Itoa(basePort),
|
||||
}
|
||||
|
||||
createDatabase(t, testName, serverURL, "foo")
|
||||
createRetentionPolicy(t, testName, serverURL, "foo", "bar", nNodes)
|
||||
simpleWriteAndQuery(t, testName, serverURL, nNodes)
|
||||
createDatabase(t, testName, nodes, "foo")
|
||||
createRetentionPolicy(t, testName, nodes, "foo", "bar", nNodes)
|
||||
simpleWriteAndQuery(t, testName, nodes, nNodes)
|
||||
}
|
||||
|
||||
func urlFor(u *url.URL, path string, params url.Values) *url.URL {
|
Loading…
Reference in New Issue