Refactor tests to use constants. Refactor new leader election test to use server setup method
parent
bdc1976c2a
commit
2505efbf31
|
@ -22,6 +22,12 @@ var _ = Suite(&CoordinatorSuite{})
|
|||
var nextPortNum int
|
||||
var nextDirNum int
|
||||
|
||||
const (
|
||||
MAX_RING_LOCATIONS = 10
|
||||
SERVER_STARTUP_TIME = time.Millisecond * 300
|
||||
REPLICATION_LAG = time.Millisecond * 200
|
||||
)
|
||||
|
||||
func nextPort() int {
|
||||
nextPortNum += 1
|
||||
// this is a hack for OSX boxes running spotify. It binds to 127.0.0.1:8099. net.Listen doesn't return an
|
||||
|
@ -38,6 +44,7 @@ func nextDir() string {
|
|||
}
|
||||
|
||||
func startAndVerifyCluster(count int, c *C) []*RaftServer {
|
||||
nextPortNum = 0
|
||||
servers := make([]*RaftServer, count, count)
|
||||
errs := make([]error, count, count)
|
||||
for i := 0; i < count; i++ {
|
||||
|
@ -58,7 +65,7 @@ func startAndVerifyCluster(count int, c *C) []*RaftServer {
|
|||
}
|
||||
errs[i] = err
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
for _, err := range errs {
|
||||
c.Assert(err, Equals, nil)
|
||||
}
|
||||
|
@ -74,9 +81,12 @@ func clean(servers []*RaftServer) {
|
|||
}
|
||||
|
||||
func newConfigAndServer(path string, port int) (*ClusterConfiguration, *RaftServer) {
|
||||
fullPath := "/tmp/chronos_coordinator_test/" + path
|
||||
fullPath := path
|
||||
if !strings.HasPrefix(fullPath, "/tmp") {
|
||||
fullPath = "/tmp/chronos_coordinator_test/" + path
|
||||
}
|
||||
os.MkdirAll(fullPath, 0744)
|
||||
config := NewClusterConfiguration(10)
|
||||
config := NewClusterConfiguration(MAX_RING_LOCATIONS)
|
||||
server := NewRaftServer(fullPath, "localhost", port, config)
|
||||
return config, server
|
||||
}
|
||||
|
@ -106,7 +116,7 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorWithNoSeed(c *C) {
|
|||
go func() {
|
||||
err = server.ListenAndServe([]string{}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
}
|
||||
|
||||
|
@ -120,7 +130,7 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorWithSeedThatIsNotRunning(c
|
|||
go func() {
|
||||
err = server.ListenAndServe([]string{"localhost:8079"}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
server.Close()
|
||||
}
|
||||
|
@ -135,14 +145,14 @@ func (self *CoordinatorSuite) TestCanRecoverCoordinatorWithData(c *C) {
|
|||
go func() {
|
||||
err = server.ListenAndServe([]string{}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
server.AddReadApiKey("db", "key1")
|
||||
|
||||
assertConfigContains(port, "key1", true, c)
|
||||
|
||||
server.Close()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
|
||||
_, server = newConfigAndServer(logDir, port)
|
||||
defer server.Close()
|
||||
|
@ -150,7 +160,7 @@ func (self *CoordinatorSuite) TestCanRecoverCoordinatorWithData(c *C) {
|
|||
go func() {
|
||||
err = server.ListenAndServe([]string{}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
assertConfigContains(port, "key1", true, c)
|
||||
}
|
||||
|
@ -170,7 +180,7 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
|
|||
go func() {
|
||||
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
_, server2 := newConfigAndServer(logDir2, port2)
|
||||
defer server2.Close()
|
||||
defer clearPath(logDir2)
|
||||
|
@ -180,12 +190,12 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
|
|||
err2 = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1)}, true)
|
||||
}()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err2, Equals, nil)
|
||||
c.Assert(err, Equals, nil)
|
||||
|
||||
server.AddReadApiKey("db", "key1")
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
}
|
||||
|
@ -214,7 +224,7 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
|
|||
go func() {
|
||||
err2 = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1)}, true)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(err2, Equals, nil)
|
||||
err = server2.AddReadApiKey("db", "key1")
|
||||
|
@ -223,7 +233,7 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
|
|||
c.Assert(err, Equals, nil)
|
||||
err = server2.AddServerToLocation("somehost", int64(-1))
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
assertConfigContains(port1, "key2", true, c)
|
||||
|
@ -235,7 +245,7 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
|
|||
c.Assert(err, Equals, nil)
|
||||
err = server2.RemoveServerFromLocation("somehost", int64(-1))
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(port2, "key2", false, c)
|
||||
assertConfigContains(port1, "somehost", false, c)
|
||||
}
|
||||
|
@ -255,7 +265,7 @@ func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
|
|||
err = server.ListenAndServe([]string{}, false)
|
||||
}()
|
||||
defer server.Close()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
server.AddReadApiKey("db", "key1")
|
||||
|
||||
|
@ -266,91 +276,57 @@ func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
|
|||
err = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port)}, true)
|
||||
}()
|
||||
defer server2.Close()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
}
|
||||
|
||||
func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
|
||||
defer http.DefaultTransport.(*http.Transport).CloseIdleConnections()
|
||||
logDir1 := nextDir()
|
||||
port1 := nextPort()
|
||||
logDir2 := nextDir()
|
||||
port2 := nextPort()
|
||||
logDir3 := nextDir()
|
||||
port3 := nextPort()
|
||||
defer clearPath(logDir1)
|
||||
defer clearPath(logDir2)
|
||||
defer clearPath(logDir3)
|
||||
|
||||
_, server := newConfigAndServer(logDir1, port1)
|
||||
|
||||
var err error
|
||||
go func() {
|
||||
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, false)
|
||||
}()
|
||||
|
||||
_, server2 := newConfigAndServer(logDir2, port2)
|
||||
defer server2.Close()
|
||||
|
||||
var err2 error
|
||||
go func() {
|
||||
err2 = server2.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1), fmt.Sprintf("localhost:%d", port3)}, true)
|
||||
}()
|
||||
|
||||
_, server3 := newConfigAndServer(logDir3, port3)
|
||||
defer server3.Close()
|
||||
|
||||
var err3 error
|
||||
go func() {
|
||||
err3 = server3.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port1), fmt.Sprintf("localhost:%d", port2)}, true)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
servers := startAndVerifyCluster(3, c)
|
||||
|
||||
err := servers[0].AddReadApiKey("db", "key1")
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(err2, Equals, nil)
|
||||
c.Assert(err3, Equals, nil)
|
||||
err = server.AddReadApiKey("db", "key1")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
assertConfigContains(port3, "key1", true, c)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(servers[0].port, "key1", true, c)
|
||||
assertConfigContains(servers[1].port, "key1", true, c)
|
||||
assertConfigContains(servers[2].port, "key1", true, c)
|
||||
|
||||
leader, _ := server2.leaderConnectString()
|
||||
c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1))
|
||||
server.Close()
|
||||
leader, _ := servers[1].leaderConnectString()
|
||||
c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", servers[0].port))
|
||||
|
||||
// kill the leader
|
||||
servers[0].Close()
|
||||
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
|
||||
err = nil
|
||||
time.Sleep(time.Second)
|
||||
leader, _ = server2.leaderConnectString()
|
||||
c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", port1))
|
||||
err = server2.AddReadApiKey("db", "key2")
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
leader, _ = servers[1].leaderConnectString()
|
||||
c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", servers[0].port))
|
||||
err = servers[1].AddReadApiKey("db", "key2")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
assertConfigContains(port2, "key2", true, c)
|
||||
assertConfigContains(port3, "key2", true, c)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(servers[1].port, "key2", true, c)
|
||||
assertConfigContains(servers[2].port, "key2", true, c)
|
||||
|
||||
_, server = newConfigAndServer(logDir1, port1)
|
||||
_, server := newConfigAndServer(servers[0].path, servers[0].port)
|
||||
defer server.Close()
|
||||
err = nil
|
||||
go func() {
|
||||
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2), fmt.Sprintf("localhost:%d", port3)}, false)
|
||||
err = server.ListenAndServe([]string{fmt.Sprintf("localhost:%d", servers[1].port), fmt.Sprintf("localhost:%d", servers[2].port)}, false)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
c.Assert(err, Equals, nil)
|
||||
|
||||
c.Assert(server.clusterConfig.ReadApiKeys["dbkey1"], Equals, true)
|
||||
c.Assert(server.clusterConfig.ReadApiKeys["dbkey2"], Equals, true)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port1, "key2", true, c)
|
||||
assertConfigContains(server.port, "key1", true, c)
|
||||
assertConfigContains(server.port, "key2", true, c)
|
||||
|
||||
err = server.AddReadApiKey("blah", "sdf")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
assertConfigContains(port1, "sdf", true, c)
|
||||
assertConfigContains(port2, "sdf", true, c)
|
||||
assertConfigContains(port3, "sdf", true, c)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(servers[0].port, "sdf", true, c)
|
||||
assertConfigContains(servers[1].port, "sdf", true, c)
|
||||
assertConfigContains(servers[2].port, "sdf", true, c)
|
||||
}
|
||||
|
||||
func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) {
|
||||
|
@ -387,7 +363,7 @@ func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader
|
|||
go func() {
|
||||
err3 = server3.ListenAndServe([]string{fmt.Sprintf("localhost:%d", port2)}, true)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(err2, Equals, nil)
|
||||
|
@ -397,7 +373,7 @@ func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader
|
|||
|
||||
err = server.AddReadApiKey("db", "key1")
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
assertConfigContains(port1, "key1", true, c)
|
||||
assertConfigContains(port2, "key1", true, c)
|
||||
assertConfigContains(port3, "key1", true, c)
|
||||
|
@ -407,9 +383,8 @@ func (self *CoordinatorSuite) TestCanCreateDatabase(c *C) {
|
|||
servers := startAndVerifyCluster(3, c)
|
||||
defer clean(servers)
|
||||
id, _ := servers[0].GetNextDatabaseId()
|
||||
time.Sleep(time.Second)
|
||||
c.Assert(id, Equals, "1")
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
c.Assert(id, Equals, servers[1].clusterConfig.CurrentDatabaseId())
|
||||
c.Assert(id, Equals, servers[2].clusterConfig.CurrentDatabaseId())
|
||||
id2, _ := servers[1].GetNextDatabaseId()
|
||||
|
@ -418,7 +393,7 @@ func (self *CoordinatorSuite) TestCanCreateDatabase(c *C) {
|
|||
c.Assert(id2, Equals, "2")
|
||||
c.Assert(id3, Equals, "3")
|
||||
c.Assert(id4, Equals, "4")
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
time.Sleep(REPLICATION_LAG)
|
||||
c.Assert(id4, Equals, servers[1].clusterConfig.CurrentDatabaseId())
|
||||
c.Assert(id4, Equals, servers[2].clusterConfig.CurrentDatabaseId())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue