Add tests for killing leader in the cluster and adding a new server in the cluster
parent
db4d8a2a00
commit
5679b338eb
|
@ -35,7 +35,7 @@ func nextDir() string {
|
|||
func newConfigAndServer(path string, port int) (*ClusterConfiguration, *RaftServer) {
|
||||
fullPath := "/tmp/chronos_coordinator_test/" + path
|
||||
os.MkdirAll(fullPath, 0744)
|
||||
config := NewClusterConfiguration(2)
|
||||
config := NewClusterConfiguration(10)
|
||||
server := NewRaftServer(fullPath, "localhost", port, config)
|
||||
return config, server
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ func assertConfigContains(port int, contains string, isContained bool, c *C) {
|
|||
host := fmt.Sprintf("localhost:%d", port)
|
||||
resp, err1 := http.Get("http://" + host + "/cluster_config")
|
||||
c.Assert(err1, Equals, nil)
|
||||
defer resp.Body.Close()
|
||||
body, err2 := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err2, Equals, nil)
|
||||
c.Assert(strings.Contains(string(body), contains), Equals, isContained)
|
||||
|
@ -192,3 +193,114 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
|
|||
assertConfigContains(port2, "key2", false, c)
|
||||
assertConfigContains(port1, "somehost", false, c)
|
||||
}
|
||||
|
||||
func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
|
||||
logDir := nextDir()
|
||||
port := nextPort()
|
||||
logDir2 := nextDir()
|
||||
port2 := nextPort()
|
||||
defer clearPath(logDir)
|
||||
defer clearPath(logDir2)
|
||||
|
||||
_, server := newConfigAndServer(logDir, port)
|
||||
var err error
|
||||
go func() {
|
||||
err = server.ListenAndServe([]string{}, false)
|
||||
}()
|
||||
defer server.Close()
|
||||
time.Sleep(time.Second)
|
||||
c.Assert(err, Equals, nil)
|
||||
server.AddReadApiKey("db", "key1")
|
||||
|
||||
assertConfigContains(port, "key1", true, c)
|
||||
|
||||
_, server2 := newConfigAndServer(logDir2, port2)
|
||||
go func() {
|
||||
err = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port)}, true)
|
||||
}()
|
||||
defer server2.Close()
|
||||
time.Sleep(time.Second)
|
||||
c.Assert(err, Equals, nil)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
}
|
||||
|
||||
func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
|
||||
logDir1 := nextDir()
|
||||
port1 := nextPort()
|
||||
logDir2 := nextDir()
|
||||
port2 := nextPort()
|
||||
logDir3 := nextDir()
|
||||
port3 := nextPort()
|
||||
defer clearPath(logDir1)
|
||||
defer clearPath(logDir2)
|
||||
defer clearPath(logDir3)
|
||||
|
||||
_, server := newConfigAndServer(logDir1, port1)
|
||||
|
||||
var err error
|
||||
go func() {
|
||||
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, false)
|
||||
}()
|
||||
|
||||
_, server2 := newConfigAndServer(logDir2, port2)
|
||||
defer server2.Close()
|
||||
|
||||
var err2 error
|
||||
go func() {
|
||||
err2 = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1), fmt.Sprintf("localhost:%d", port3)}, true)
|
||||
}()
|
||||
|
||||
_, server3 := newConfigAndServer(logDir3, port3)
|
||||
defer server3.Close()
|
||||
|
||||
var err3 error
|
||||
go func() {
|
||||
err3 = server3.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1), fmt.Sprintf("localhost:%d", port2)}, true)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(err2, Equals, nil)
|
||||
c.Assert(err3, Equals, nil)
|
||||
err = server.AddReadApiKey("db", "key1")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
assertConfigContains(port3, "key1", true, c)
|
||||
|
||||
leader, _ := server2.leaderConnectString()
|
||||
c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1))
|
||||
server.Close()
|
||||
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
|
||||
err = nil
|
||||
time.Sleep(time.Second)
|
||||
leader, _ = server2.leaderConnectString()
|
||||
c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", port1))
|
||||
err = server2.AddReadApiKey("db", "key2")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
assertConfigContains(port2, "key2", true, c)
|
||||
assertConfigContains(port3, "key2", true, c)
|
||||
|
||||
_, server = newConfigAndServer(logDir1, port1)
|
||||
defer server.Close()
|
||||
err = nil
|
||||
go func() {
|
||||
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2), fmt.Sprintf("localhost:%d", port3)}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
c.Assert(err, Equals, nil)
|
||||
|
||||
c.Assert(server.clusterConfig.ReadApiKeys["dbkey1"], Equals, true)
|
||||
c.Assert(server.clusterConfig.ReadApiKeys["dbkey2"], Equals, true)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port1, "key2", true, c)
|
||||
|
||||
err = server.AddReadApiKey("blah", "sdf")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
assertConfigContains(port1, "sdf", true, c)
|
||||
assertConfigContains(port2, "sdf", true, c)
|
||||
assertConfigContains(port3, "sdf", true, c)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue