query healthy servers only

pull/269/head
John Shahid 2014-02-20 16:31:59 -05:00
parent 29f21a10b3
commit 606fe1966a
4 changed files with 110 additions and 23 deletions

View File

@ -2,8 +2,15 @@ package cluster
import (
log "code.google.com/p/log4go"
"fmt"
"net"
"protocol"
"time"
)
const (
DEFAULT_BACKOFF = time.Second
MAX_BACKOFF = 10 * time.Second
)
type ClusterServer struct {
@ -13,6 +20,9 @@ type ClusterServer struct {
RaftConnectionString string
ProtobufConnectionString string
connection ServerConnection
heartbeatInterval time.Duration
backoff time.Duration
isUp bool
}
type ServerConnection interface {
@ -30,13 +40,90 @@ const (
Potential
)
func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection) *ClusterServer {
return &ClusterServer{
func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, heartbeatInterval time.Duration) *ClusterServer {
s := &ClusterServer{
RaftName: raftName,
RaftConnectionString: raftConnectionString,
ProtobufConnectionString: protobufConnectionString,
connection: connection,
heartbeatInterval: heartbeatInterval,
backoff: DEFAULT_BACKOFF,
}
go s.heartbeat()
return s
}
func (self *ClusterServer) GetId() uint32 {
return self.Id
}
func (self *ClusterServer) Connect() {
if !shouldConnect(self.ProtobufConnectionString) {
return
}
log.Info("ClusterServer: %d connecting to: %s", self.Id, self.ProtobufConnectionString)
self.connection.Connect()
}
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error {
return self.connection.MakeRequest(request, responseStream)
}
func (self *ClusterServer) IsUp() bool {
return self.isUp
}
// private methods
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT
func (self *ClusterServer) heartbeat() {
responseChan := make(chan *protocol.Response)
heartbeatRequest := &protocol.Request{
Type: &HEARTBEAT_TYPE,
Database: protocol.String(""),
}
for {
err := self.MakeRequest(heartbeatRequest, responseChan)
if err != nil {
self.handleHeartbeatError()
continue
}
if err := self.getHeartbeatResponse(responseChan); err != nil {
self.handleHeartbeatError()
continue
}
// otherwise, reset the backoff and mark the server as up
self.isUp = true
self.backoff = DEFAULT_BACKOFF
<-time.After(self.heartbeatInterval)
}
}
func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Response) error {
response := <-responseChan
if response.ErrorMessage != nil {
return fmt.Errorf("Server returned error to heartbeat: %s", *response.ErrorMessage)
}
if *response.Type != protocol.Response_HEARTBEAT {
return fmt.Errorf("Server returned a non heartbeat response")
}
return nil
}
func (self *ClusterServer) handleHeartbeatError() {
self.isUp = false
self.backoff *= 2
if self.backoff > MAX_BACKOFF {
self.backoff = MAX_BACKOFF
}
<-time.After(self.backoff)
}
// in the coordinator test we don't want to create protobuf servers,
@ -56,20 +143,3 @@ func shouldConnect(addr string) bool {
}
return true
}
func (self *ClusterServer) GetId() uint32 {
return self.Id
}
func (self *ClusterServer) Connect() {
if !shouldConnect(self.ProtobufConnectionString) {
return
}
log.Info("ClusterServer: %d connecting to: %s", self.Id, self.ProtobufConnectionString)
self.connection.Connect()
}
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error {
return self.connection.MakeRequest(request, responseStream)
}

View File

@ -223,8 +223,15 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco
return err
}
randServerIndex := int(time.Now().UnixNano() % int64(len(self.clusterServers)))
server := self.clusterServers[randServerIndex]
healthyServers := make([]*ClusterServer, 0, len(self.clusterServers))
for _, s := range self.clusterServers {
if !s.IsUp() {
continue
}
healthyServers = append(healthyServers, s)
}
randServerIndex := int(time.Now().UnixNano() % int64(len(healthyServers)))
server := healthyServers[randServerIndex]
request := self.createRequest(querySpec)
return server.MakeRequest(request, response)

View File

@ -349,7 +349,11 @@ func (s *RaftServer) startRaft() error {
}
protobufConnectString := s.config.ProtobufConnectionString()
clusterServer := cluster.NewClusterServer(name, connectionString, protobufConnectString, NewProtobufClient(protobufConnectString))
clusterServer := cluster.NewClusterServer(name,
connectionString,
protobufConnectString,
NewProtobufClient(protobufConnectString),
s.config.ProtobufHeartbeatInterval.Duration)
command := NewAddPotentialServerCommand(clusterServer)
_, err = s.doOrProxyCommand(command, "add_server")
if err != nil {
@ -587,7 +591,11 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
if server == nil {
log.Info("Adding new server to the cluster config %s", command.Name)
client := NewProtobufClient(command.ProtobufConnectionString)
clusterServer := cluster.NewClusterServer(command.Name, command.ConnectionString, command.ProtobufConnectionString, client)
clusterServer := cluster.NewClusterServer(command.Name,
command.ConnectionString,
command.ProtobufConnectionString,
client,
s.config.ProtobufHeartbeatInterval.Duration)
addServer := NewAddPotentialServerCommand(clusterServer)
if _, err := s.raftServer.Do(addServer); err != nil {
log.Error("Error joining raft server: ", err, command)

View File

@ -32,6 +32,7 @@ message Request {
DROP_DATABASE = 3;
REPLICATION_REPLAY = 6;
SEQUENCE_NUMBER = 8;
HEARTBEAT = 7;
}
optional uint32 id = 1;
required Type type = 2;
@ -75,6 +76,7 @@ message Response {
SEQUENCE_NUMBER = 7;
// Access denied also serves as an end of stream response
ACCESS_DENIED = 8;
HEARTBEAT = 9;
}
enum ErrorCode {
REQUEST_TOO_LARGE = 1;