diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go index dad9ae024a..6e63cf8a61 100644 --- a/src/cluster/cluster_configuration.go +++ b/src/cluster/cluster_configuration.go @@ -231,6 +231,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) { server.connection = self.connectionCreator(server.ProtobufConnectionString) server.Connect() server.SetWriteBuffer(NewWriteBuffer(server, self.wal, server.Id, self.config.PerServerWriteBufferSize)) + server.StartHeartbeat() } else if !self.addedLocalServer { log.Info("Added the local server") self.LocalServerId = server.Id diff --git a/src/cluster/cluster_server.go b/src/cluster/cluster_server.go index 73076c6cdc..6d21c8cb52 100644 --- a/src/cluster/cluster_server.go +++ b/src/cluster/cluster_server.go @@ -24,6 +24,7 @@ type ClusterServer struct { backoff time.Duration isUp bool writeBuffer *WriteBuffer + heartbeatStarted bool } type ServerConnection interface { @@ -49,13 +50,22 @@ func NewClusterServer(raftName, raftConnectionString, protobufConnectionString s connection: connection, heartbeatInterval: heartbeatInterval, backoff: DEFAULT_BACKOFF, + heartbeatStarted: false, } - go s.heartbeat() - return s } +func (self *ClusterServer) StartHeartbeat() { + if self.heartbeatStarted { + return + } + + self.heartbeatStarted = true + self.isUp = true + go self.heartbeat() +} + func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer) { self.writeBuffer = writeBuffer } @@ -97,20 +107,25 @@ func (self *ClusterServer) IsUp() bool { var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT func (self *ClusterServer) heartbeat() { + defer func() { + self.heartbeatStarted = false + }() + responseChan := make(chan *protocol.Response) heartbeatRequest := &protocol.Request{ Type: &HEARTBEAT_TYPE, Database: protocol.String(""), } for { + heartbeatRequest.Id = nil err := self.MakeRequest(heartbeatRequest, responseChan) if err != nil { - self.handleHeartbeatError() + self.handleHeartbeatError(err) continue } - - if err := self.getHeartbeatResponse(responseChan); err != nil { - self.handleHeartbeatError() + err = self.getHeartbeatResponse(responseChan) + if err != nil { + self.handleHeartbeatError(err) continue } @@ -133,7 +148,7 @@ func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Re return nil } -func (self *ClusterServer) handleHeartbeatError() { +func (self *ClusterServer) handleHeartbeatError(err error) { self.isUp = false self.backoff *= 2 if self.backoff > MAX_BACKOFF { diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 78cfd23973..12b478de44 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -1,7 +1,6 @@ package coordinator import ( - "bytes" "cluster" log "code.google.com/p/log4go" "common" @@ -123,12 +122,16 @@ func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *proto return self.WriteResponse(conn, response) } - buff := bytes.NewBuffer(make([]byte, len(data)+8)) - err = binary.Write(buff, binary.LittleEndian, uint32(len(data))) + err = binary.Write(conn, binary.LittleEndian, uint32(len(data))) if err != nil { log.Error("error writing response length: %s", err) return err } - _, err = conn.Write(append(buff.Bytes(), data...)) - return err + + _, err = conn.Write(data) + if err != nil { + log.Error("error writing response: %s", err) + return err + } + return nil }