From 567fd2bfef059cd09b81d2fd82b21f548d182dda Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 Feb 2015 17:55:07 -0800 Subject: [PATCH 1/4] Pass around slices of nodes as cluster --- cmd/influxd/server_single_integration_test.go | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/cmd/influxd/server_single_integration_test.go b/cmd/influxd/server_single_integration_test.go index a7f065c81a..309c72f43d 100644 --- a/cmd/influxd/server_single_integration_test.go +++ b/cmd/influxd/server_single_integration_test.go @@ -22,9 +22,11 @@ import ( main "github.com/influxdb/influxdb/cmd/influxd" ) +// node represents a node under test, which is both a broker and data node. type node struct { broker *messaging.Broker server *influxdb.Server + url *url.URL } // createCombinedNodeCluster creates a cluster of nServers nodes, each of which @@ -65,7 +67,11 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i if s == nil { t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort) } - nodes = append(nodes, node{broker: b, server: s}) + nodes = append(nodes, node{ + broker: b, + server: s, + url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)}, + }) // Create subsequent nodes, which join to first node. for i := 1; i < nNodes; i++ { @@ -82,15 +88,22 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i if s == nil { t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort) } - nodes = append(nodes, node{broker: b, server: s}) + + nodes = append(nodes, node{ + broker: b, + server: s, + url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)}, + }) } return nodes } // createDatabase creates a database, and verifies that the creation was successful. -func createDatabase(t *testing.T, testName string, serverURL *url.URL, database string) { +func createDatabase(t *testing.T, testName string, nodes []node, database string) { t.Logf("Test: %s: creating database %s", testName, database) + serverURL := nodes[0].url + u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE foo"}}) resp, err := http.Get(u.String()) if err != nil { @@ -153,9 +166,11 @@ func createDatabase(t *testing.T, testName string, serverURL *url.URL, database } // createRetentionPolicy creates a retetention policy and verifies that the creation was successful. -func createRetentionPolicy(t *testing.T, testName string, serverURL *url.URL, database, retention string, replicaN int) { +func createRetentionPolicy(t *testing.T, testName string, nodes []node, database, retention string, replicaN int) { t.Log("Creating retention policy") + serverURL := nodes[0].url replication := fmt.Sprintf("CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION %d DEFAULT", replicaN) + u := urlFor(serverURL, "query", url.Values{"q": []string{replication}}) resp, err := http.Get(u.String()) if err != nil { @@ -184,8 +199,9 @@ func createRetentionPolicy(t *testing.T, testName string, serverURL *url.URL, da // simpleWriteAndQuery creates a simple database, retention policy, and replicates // the data across all nodes. It then ensures a series of writes and queries are OK. -func simpleWriteAndQuery(t *testing.T, testname string, serverURL *url.URL, nNodes int) { +func simpleWriteAndQuery(t *testing.T, testname string, nodes []node, nNodes int) { now := time.Now().UTC() + serverURL := nodes[0].url var results client.Results // Write Data @@ -262,48 +278,33 @@ func Test_ServerSingleIntegration(t *testing.T) { nNodes := 1 basePort := 8090 testName := "single node" - createCombinedNodeCluster(t, "single node", nNodes, basePort) + nodes := createCombinedNodeCluster(t, "single node", nNodes, basePort) - serverURL := &url.URL{ - Scheme: "http", - Host: "localhost:" + strconv.Itoa(basePort), - } - - createDatabase(t, testName, serverURL, "foo") - createRetentionPolicy(t, testName, serverURL, "foo", "bar", nNodes) - simpleWriteAndQuery(t, testName, serverURL, nNodes) + createDatabase(t, testName, nodes, "foo") + createRetentionPolicy(t, testName, nodes, "foo", "bar", nNodes) + simpleWriteAndQuery(t, testName, nodes, nNodes) } func Test_Server3NodeIntegration(t *testing.T) { nNodes := 3 basePort := 8190 testName := "3 node" - createCombinedNodeCluster(t, testName, nNodes, basePort) + nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) - serverURL := &url.URL{ - Scheme: "http", - Host: "localhost:" + strconv.Itoa(basePort), - } - - createDatabase(t, testName, serverURL, "foo") - createRetentionPolicy(t, testName, serverURL, "foo", "bar", nNodes) - simpleWriteAndQuery(t, testName, serverURL, nNodes) + createDatabase(t, testName, nodes, "foo") + createRetentionPolicy(t, testName, nodes, "foo", "bar", nNodes) + simpleWriteAndQuery(t, testName, nodes, nNodes) } func Test_Server5NodeIntegration(t *testing.T) { nNodes := 5 basePort := 8290 testName := "5 node" - createCombinedNodeCluster(t, testName, nNodes, basePort) + nodes := createCombinedNodeCluster(t, testName, nNodes, basePort) - serverURL := &url.URL{ - Scheme: "http", - Host: "localhost:" + strconv.Itoa(basePort), - } - - createDatabase(t, testName, serverURL, "foo") - createRetentionPolicy(t, testName, serverURL, "foo", "bar", nNodes) - simpleWriteAndQuery(t, testName, serverURL, nNodes) + createDatabase(t, testName, nodes, "foo") + createRetentionPolicy(t, testName, nodes, "foo", "bar", nNodes) + simpleWriteAndQuery(t, testName, nodes, nNodes) } func urlFor(u *url.URL, path string, params url.Values) *url.URL { From e3110cee4e10460e53dbb22c5445e1249b5c9226 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 Feb 2015 17:59:19 -0800 Subject: [PATCH 2/4] Add explicit leader flag --- cmd/influxd/server_single_integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/influxd/server_single_integration_test.go b/cmd/influxd/server_single_integration_test.go index 309c72f43d..db12211440 100644 --- a/cmd/influxd/server_single_integration_test.go +++ b/cmd/influxd/server_single_integration_test.go @@ -27,6 +27,7 @@ type node struct { broker *messaging.Broker server *influxdb.Server url *url.URL + leader bool } // createCombinedNodeCluster creates a cluster of nServers nodes, each of which @@ -71,6 +72,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i broker: b, server: s, url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)}, + leader: true, }) // Create subsequent nodes, which join to first node. From a1953f225e5251e9ab0a9795a8691ab78870a164 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 Feb 2015 18:00:58 -0800 Subject: [PATCH 3/4] Use new type for []nodes --- cmd/influxd/server_single_integration_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/influxd/server_single_integration_test.go b/cmd/influxd/server_single_integration_test.go index db12211440..1aa577b832 100644 --- a/cmd/influxd/server_single_integration_test.go +++ b/cmd/influxd/server_single_integration_test.go @@ -30,12 +30,15 @@ type node struct { leader bool } +// cluster represents a multi-node cluster. +type cluster []node + // createCombinedNodeCluster creates a cluster of nServers nodes, each of which // runs as both a Broker and Data node. If any part cluster creation fails, // the testing is marked as failed. // // This function returns a slice of nodes, the first of which will be the leader. -func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) []node { +func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort int) cluster { t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName) if nNodes < 1 { t.Fatalf("Test %s: asked to create nonsense cluster", testName) @@ -102,7 +105,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i } // createDatabase creates a database, and verifies that the creation was successful. -func createDatabase(t *testing.T, testName string, nodes []node, database string) { +func createDatabase(t *testing.T, testName string, nodes cluster, database string) { t.Logf("Test: %s: creating database %s", testName, database) serverURL := nodes[0].url @@ -168,7 +171,7 @@ func createDatabase(t *testing.T, testName string, nodes []node, database string } // createRetentionPolicy creates a retetention policy and verifies that the creation was successful. -func createRetentionPolicy(t *testing.T, testName string, nodes []node, database, retention string, replicaN int) { +func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string, replicaN int) { t.Log("Creating retention policy") serverURL := nodes[0].url replication := fmt.Sprintf("CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION %d DEFAULT", replicaN) @@ -201,7 +204,7 @@ func createRetentionPolicy(t *testing.T, testName string, nodes []node, database // simpleWriteAndQuery creates a simple database, retention policy, and replicates // the data across all nodes. It then ensures a series of writes and queries are OK. -func simpleWriteAndQuery(t *testing.T, testname string, nodes []node, nNodes int) { +func simpleWriteAndQuery(t *testing.T, testname string, nodes cluster, nNodes int) { now := time.Now().UTC() serverURL := nodes[0].url var results client.Results From cb3a222cbd950bf9f696194cf802d4565b10713e Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 Feb 2015 18:01:25 -0800 Subject: [PATCH 4/4] Rename integration test file It now performs multi-node cluster testing. --- ...rver_single_integration_test.go => server_integration_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cmd/influxd/{server_single_integration_test.go => server_integration_test.go} (100%) diff --git a/cmd/influxd/server_single_integration_test.go b/cmd/influxd/server_integration_test.go similarity index 100% rename from cmd/influxd/server_single_integration_test.go rename to cmd/influxd/server_integration_test.go