2013-07-09 02:55:00 +00:00
|
|
|
package raft
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"net/http"
|
|
|
|
"sync"
|
|
|
|
"testing"
|
2013-07-09 03:00:14 +00:00
|
|
|
"time"
|
2013-07-09 02:55:00 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Ensure that we can start several servers and have them communicate.
|
|
|
|
func TestHTTPTransporter(t *testing.T) {
|
|
|
|
transporter := NewHTTPTransporter("/raft")
|
|
|
|
transporter.DisableKeepAlives = true
|
|
|
|
|
|
|
|
servers := []*Server{}
|
|
|
|
f0 := func(server *Server, httpServer *http.Server) {
|
|
|
|
// Stop the leader and wait for an election.
|
|
|
|
server.Stop()
|
|
|
|
time.Sleep(testElectionTimeout * 2)
|
|
|
|
|
|
|
|
if servers[1].State() != Leader && servers[2].State() != Leader {
|
|
|
|
t.Fatal("Expected re-election:", servers[1].State(), servers[2].State())
|
|
|
|
}
|
2013-07-25 22:40:20 +00:00
|
|
|
server.Start()
|
2013-07-09 02:55:00 +00:00
|
|
|
}
|
|
|
|
f1 := func(server *Server, httpServer *http.Server) {
|
|
|
|
}
|
|
|
|
f2 := func(server *Server, httpServer *http.Server) {
|
|
|
|
}
|
|
|
|
runTestHttpServers(t, &servers, transporter, f0, f1, f2)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Starts multiple independent Raft servers wrapped with HTTP servers.
|
|
|
|
func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
httpServers := []*http.Server{}
|
|
|
|
listeners := []net.Listener{}
|
|
|
|
for i, _ := range callbacks {
|
|
|
|
wg.Add(1)
|
2013-07-09 03:00:14 +00:00
|
|
|
port := 9000 + i
|
2013-07-09 02:55:00 +00:00
|
|
|
|
|
|
|
// Create raft server.
|
|
|
|
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
|
|
|
|
server.SetHeartbeatTimeout(testHeartbeatTimeout)
|
|
|
|
server.SetElectionTimeout(testElectionTimeout)
|
2013-07-25 22:40:20 +00:00
|
|
|
server.Start()
|
|
|
|
|
2013-07-09 02:55:00 +00:00
|
|
|
defer server.Stop()
|
|
|
|
*servers = append(*servers, server)
|
|
|
|
|
|
|
|
// Create listener for HTTP server and start it.
|
|
|
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
defer listener.Close()
|
|
|
|
listeners = append(listeners, listener)
|
|
|
|
|
|
|
|
// Create wrapping HTTP server.
|
2013-07-09 03:00:14 +00:00
|
|
|
mux := http.NewServeMux()
|
2013-07-09 02:55:00 +00:00
|
|
|
transporter.Install(server, mux)
|
2013-07-09 03:00:14 +00:00
|
|
|
httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
|
2013-07-09 02:55:00 +00:00
|
|
|
httpServers = append(httpServers, httpServer)
|
|
|
|
go func() { httpServer.Serve(listener) }()
|
|
|
|
}
|
2013-07-09 03:00:14 +00:00
|
|
|
|
2013-07-09 02:55:00 +00:00
|
|
|
// Setup configuration.
|
|
|
|
for _, server := range *servers {
|
2013-07-26 19:13:52 +00:00
|
|
|
if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
|
2013-07-10 23:07:14 +00:00
|
|
|
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
|
2013-07-09 02:55:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for configuration to propagate.
|
|
|
|
time.Sleep(testHeartbeatTimeout * 2)
|
|
|
|
|
2013-08-02 00:58:03 +00:00
|
|
|
c := make(chan bool)
|
|
|
|
start := time.Now()
|
|
|
|
|
|
|
|
for i := 0; i < 1000; i++ {
|
|
|
|
go send(c, (*servers)[0])
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := 0; i < 1000; i++ {
|
|
|
|
<-c
|
|
|
|
}
|
|
|
|
end := time.Now()
|
|
|
|
fmt.Println(end.Sub(start), "commands ", 1000*20)
|
|
|
|
|
|
|
|
// Wait for configuration to propagate.
|
|
|
|
time.Sleep(testHeartbeatTimeout * 2)
|
|
|
|
|
2013-07-09 02:55:00 +00:00
|
|
|
// Execute all the callbacks at the same time.
|
|
|
|
for _i, _f := range callbacks {
|
|
|
|
i, f := _i, _f
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
f((*servers)[i], httpServers[i])
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait until everything is done.
|
|
|
|
wg.Wait()
|
|
|
|
}
|
2013-08-02 00:58:03 +00:00
|
|
|
|
|
|
|
func send(c chan bool, s *Server) {
|
|
|
|
for i := 0; i < 20; i++ {
|
|
|
|
s.Do(&NOPCommand{})
|
|
|
|
}
|
|
|
|
c <- true
|
|
|
|
}
|