fix few bugs with the heartbeat logic
parent
fe12f175f2
commit
2ebbb69f06
|
@ -231,6 +231,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
|
|||
server.connection = self.connectionCreator(server.ProtobufConnectionString)
|
||||
server.Connect()
|
||||
server.SetWriteBuffer(NewWriteBuffer(server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
|
||||
server.StartHeartbeat()
|
||||
} else if !self.addedLocalServer {
|
||||
log.Info("Added the local server")
|
||||
self.LocalServerId = server.Id
|
||||
|
|
|
@ -24,6 +24,7 @@ type ClusterServer struct {
|
|||
backoff time.Duration
|
||||
isUp bool
|
||||
writeBuffer *WriteBuffer
|
||||
heartbeatStarted bool
|
||||
}
|
||||
|
||||
type ServerConnection interface {
|
||||
|
@ -49,13 +50,22 @@ func NewClusterServer(raftName, raftConnectionString, protobufConnectionString s
|
|||
connection: connection,
|
||||
heartbeatInterval: heartbeatInterval,
|
||||
backoff: DEFAULT_BACKOFF,
|
||||
heartbeatStarted: false,
|
||||
}
|
||||
|
||||
go s.heartbeat()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (self *ClusterServer) StartHeartbeat() {
|
||||
if self.heartbeatStarted {
|
||||
return
|
||||
}
|
||||
|
||||
self.heartbeatStarted = true
|
||||
self.isUp = true
|
||||
go self.heartbeat()
|
||||
}
|
||||
|
||||
func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer) {
|
||||
self.writeBuffer = writeBuffer
|
||||
}
|
||||
|
@ -97,20 +107,25 @@ func (self *ClusterServer) IsUp() bool {
|
|||
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT
|
||||
|
||||
func (self *ClusterServer) heartbeat() {
|
||||
defer func() {
|
||||
self.heartbeatStarted = false
|
||||
}()
|
||||
|
||||
responseChan := make(chan *protocol.Response)
|
||||
heartbeatRequest := &protocol.Request{
|
||||
Type: &HEARTBEAT_TYPE,
|
||||
Database: protocol.String(""),
|
||||
}
|
||||
for {
|
||||
heartbeatRequest.Id = nil
|
||||
err := self.MakeRequest(heartbeatRequest, responseChan)
|
||||
if err != nil {
|
||||
self.handleHeartbeatError()
|
||||
self.handleHeartbeatError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := self.getHeartbeatResponse(responseChan); err != nil {
|
||||
self.handleHeartbeatError()
|
||||
err = self.getHeartbeatResponse(responseChan)
|
||||
if err != nil {
|
||||
self.handleHeartbeatError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -133,7 +148,7 @@ func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Re
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *ClusterServer) handleHeartbeatError() {
|
||||
func (self *ClusterServer) handleHeartbeatError(err error) {
|
||||
self.isUp = false
|
||||
self.backoff *= 2
|
||||
if self.backoff > MAX_BACKOFF {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
"common"
|
||||
|
@ -123,12 +122,16 @@ func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *proto
|
|||
return self.WriteResponse(conn, response)
|
||||
}
|
||||
|
||||
buff := bytes.NewBuffer(make([]byte, len(data)+8))
|
||||
err = binary.Write(buff, binary.LittleEndian, uint32(len(data)))
|
||||
err = binary.Write(conn, binary.LittleEndian, uint32(len(data)))
|
||||
if err != nil {
|
||||
log.Error("error writing response length: %s", err)
|
||||
return err
|
||||
}
|
||||
_, err = conn.Write(append(buff.Bytes(), data...))
|
||||
return err
|
||||
|
||||
_, err = conn.Write(data)
|
||||
if err != nil {
|
||||
log.Error("error writing response: %s", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue