Fix #535. WAL replay hangs if the remote server crashed before sending response
parent
a1702044f7
commit
f202547703
|
@ -34,6 +34,7 @@ type ClusterServer struct {
|
|||
type ServerConnection interface {
|
||||
Connect()
|
||||
Close()
|
||||
ClearRequests()
|
||||
MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error
|
||||
}
|
||||
|
||||
|
@ -94,12 +95,12 @@ func (self *ClusterServer) Connect() {
|
|||
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) {
|
||||
err := self.connection.MakeRequest(request, responseStream)
|
||||
if err != nil {
|
||||
self.isUp = false
|
||||
message := err.Error()
|
||||
select {
|
||||
case responseStream <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}:
|
||||
default:
|
||||
}
|
||||
self.markServerAsDown()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -178,11 +179,16 @@ func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Re
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *ClusterServer) markServerAsDown() {
|
||||
self.isUp = false
|
||||
self.connection.ClearRequests()
|
||||
}
|
||||
|
||||
func (self *ClusterServer) handleHeartbeatError(err error) {
|
||||
if self.isUp {
|
||||
log.Warn("Server marked as down. Hearbeat error for server: %d - %s: %s", self.Id, self.ProtobufConnectionString, err)
|
||||
}
|
||||
self.isUp = false
|
||||
self.markServerAsDown()
|
||||
self.Backoff *= 2
|
||||
if self.Backoff > self.MaxBackoff {
|
||||
self.Backoff = self.MaxBackoff
|
||||
|
|
|
@ -73,6 +73,7 @@ func (self *ProtobufClient) Close() {
|
|||
self.stopped = true
|
||||
self.conn = nil
|
||||
}
|
||||
self.ClearRequests()
|
||||
}
|
||||
|
||||
func (self *ProtobufClient) getConnection() net.Conn {
|
||||
|
@ -81,6 +82,22 @@ func (self *ProtobufClient) getConnection() net.Conn {
|
|||
return self.conn
|
||||
}
|
||||
|
||||
func (self *ProtobufClient) ClearRequests() {
|
||||
self.requestBufferLock.Lock()
|
||||
defer self.requestBufferLock.Unlock()
|
||||
|
||||
message := "clearing all requests"
|
||||
for _, req := range self.requestBuffer {
|
||||
select {
|
||||
case req.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}:
|
||||
default:
|
||||
log.Debug("Cannot send response on channel")
|
||||
}
|
||||
}
|
||||
|
||||
self.requestBuffer = map[uint32]*runningRequest{}
|
||||
}
|
||||
|
||||
// Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server
|
||||
// with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means
|
||||
// that an attempt to make a request to a downed server will take 300ms to time out.
|
||||
|
|
|
@ -31,15 +31,7 @@ func NewProtobufRequestHandler(coordinator Coordinator, clusterConfig *cluster.C
|
|||
|
||||
func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
|
||||
if *request.Type == protocol.Request_WRITE {
|
||||
shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
|
||||
log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard)
|
||||
err := shard.WriteLocalOnly(request)
|
||||
if err != nil {
|
||||
log.Error("ProtobufRequestHandler: error writing local shard: ", err)
|
||||
return err
|
||||
}
|
||||
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
|
||||
return self.WriteResponse(conn, response)
|
||||
go self.handleWrites(request, conn)
|
||||
} else if *request.Type == protocol.Request_DROP_DATABASE {
|
||||
go self.handleDropDatabase(request, conn)
|
||||
return nil
|
||||
|
@ -55,6 +47,21 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *ProtobufRequestHandler) handleWrites(request *protocol.Request, conn net.Conn) {
|
||||
shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
|
||||
log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard)
|
||||
err := shard.WriteLocalOnly(request)
|
||||
var errorMsg *string
|
||||
if err != nil {
|
||||
log.Error("ProtobufRequestHandler: error writing local shard: %s", err)
|
||||
errorMsg = protocol.String(err.Error())
|
||||
}
|
||||
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk, ErrorMessage: errorMsg}
|
||||
if err := self.WriteResponse(conn, response); err != nil {
|
||||
log.Error("ProtobufRequestHandler: error writing local shard: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) {
|
||||
// the query should always parse correctly since it was parsed at the originating server.
|
||||
queries, err := parser.ParseQuery(*request.Query)
|
||||
|
|
|
@ -111,6 +111,7 @@ func (self *Server) ListenAndServe() error {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
log.Info("Connection string changed successfully")
|
||||
}
|
||||
|
||||
go self.ProtobufServer.ListenAndServe()
|
||||
|
|
Loading…
Reference in New Issue