Removed retry logic and extra cruft from protobuf client for something simpler.

Retries are handled elsewhere either by the write buffer or the client making the request. Connection status and other weird reconnect things where simplified to use a single lock.
pull/269/head
Paul Dix 2014-02-25 07:36:40 -05:00
parent 679403b464
commit 417582d70b
1 changed files with 28 additions and 43 deletions

View File

@ -19,7 +19,6 @@ type ProtobufClient struct {
hostAndPort string hostAndPort string
requestBufferLock sync.RWMutex requestBufferLock sync.RWMutex
requestBuffer map[uint32]*runningRequest requestBuffer map[uint32]*runningRequest
connectionStatus uint32
reconnectWait sync.WaitGroup reconnectWait sync.WaitGroup
connectCalled bool connectCalled bool
lastRequestId uint32 lastRequestId uint32
@ -34,22 +33,22 @@ type runningRequest struct {
const ( const (
REQUEST_RETRY_ATTEMPTS = 2 REQUEST_RETRY_ATTEMPTS = 2
MAX_RESPONSE_SIZE = MAX_REQUEST_SIZE MAX_RESPONSE_SIZE = MAX_REQUEST_SIZE
IS_RECONNECTING = uint32(1)
IS_CONNECTED = uint32(0)
MAX_REQUEST_TIME = time.Second * 1200 MAX_REQUEST_TIME = time.Second * 1200
RECONNECT_RETRY_WAIT = time.Millisecond * 100 RECONNECT_RETRY_WAIT = time.Millisecond * 100
) )
func NewProtobufClient(hostAndPort string, writeTimeout time.Duration) *ProtobufClient { func NewProtobufClient(hostAndPort string, writeTimeout time.Duration) *ProtobufClient {
fmt.Println("NewProtobufClient")
return &ProtobufClient{ return &ProtobufClient{
hostAndPort: hostAndPort, hostAndPort: hostAndPort,
requestBuffer: make(map[uint32]*runningRequest), requestBuffer: make(map[uint32]*runningRequest),
connectionStatus: IS_CONNECTED, writeTimeout: writeTimeout,
writeTimeout: writeTimeout,
} }
} }
func (self *ProtobufClient) Connect() { func (self *ProtobufClient) Connect() {
self.connLock.Lock()
defer self.connLock.Unlock()
if self.connectCalled { if self.connectCalled {
return return
} }
@ -90,8 +89,9 @@ func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStrea
// this should actually never happen. The sweeper should clear out dead requests // this should actually never happen. The sweeper should clear out dead requests
// before the uint32 ids roll over. // before the uint32 ids roll over.
if oldReq, alreadyHasRequestById := self.requestBuffer[*request.Id]; alreadyHasRequestById { if oldReq, alreadyHasRequestById := self.requestBuffer[*request.Id]; alreadyHasRequestById {
log.Error("already has a request with this id, must have timed out") message := "already has a request with this id, must have timed out"
close(oldReq.responseChan) log.Error(message)
oldReq.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}
} }
self.requestBuffer[*request.Id] = &runningRequest{time.Now(), responseStream} self.requestBuffer[*request.Id] = &runningRequest{time.Now(), responseStream}
self.requestBufferLock.Unlock() self.requestBufferLock.Unlock()
@ -102,35 +102,27 @@ func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStrea
return err return err
} }
// retry sending this at least a few times conn := self.getConnection()
for attempts := 0; attempts <= REQUEST_RETRY_ATTEMPTS; attempts++ { if conn == nil {
conn := self.getConnection() conn = self.reconnect()
if conn == nil { if conn == nil {
self.reconnect() return fmt.Errorf("Failed to connect to server %s", self.hostAndPort)
continue
} }
}
conn.SetWriteDeadline(time.Now().Add(self.writeTimeout)) conn.SetWriteDeadline(time.Now().Add(self.writeTimeout))
buff := bytes.NewBuffer(make([]byte, 0, len(data)+8)) buff := bytes.NewBuffer(make([]byte, 0, len(data)+8))
binary.Write(buff, binary.LittleEndian, uint32(len(data))) binary.Write(buff, binary.LittleEndian, uint32(len(data)))
_, err = conn.Write(append(buff.Bytes(), data...)) _, err = conn.Write(append(buff.Bytes(), data...))
if err == nil { if err == nil {
return nil return nil
}
log.Error("ProtobufClient: error making request: %s", err)
// TODO: do something smarter here based on whatever the error is.
// failed to make the request, reconnect and try again.
self.reconnect()
} }
// if we got here it errored out, clear out the request // if we got here it errored out, clear out the request
self.requestBufferLock.Lock() self.requestBufferLock.Lock()
delete(self.requestBuffer, *request.Id) delete(self.requestBuffer, *request.Id)
self.requestBufferLock.Unlock() self.requestBufferLock.Unlock()
if err == nil {
err = fmt.Errorf("Failed to connect after %d tries.", REQUEST_RETRY_ATTEMPTS)
}
return err return err
} }
@ -179,28 +171,21 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) {
} }
} }
func (self *ProtobufClient) reconnect() { func (self *ProtobufClient) reconnect() net.Conn {
swapped := atomic.CompareAndSwapUint32(&self.connectionStatus, IS_CONNECTED, IS_RECONNECTING) self.connLock.Lock()
defer self.connLock.Unlock()
// if it's not swapped, some other goroutine is already handling the reconect. Wait for it if self.conn != nil {
if !swapped { self.conn.Close()
return
} }
defer func() {
self.connectionStatus = IS_CONNECTED
}()
self.Close()
conn, err := net.DialTimeout("tcp", self.hostAndPort, self.writeTimeout) conn, err := net.DialTimeout("tcp", self.hostAndPort, self.writeTimeout)
if err == nil { if err == nil {
self.connLock.Lock()
defer self.connLock.Unlock()
self.conn = conn self.conn = conn
log.Info("connected to %s", self.hostAndPort) log.Info("connected to %s", self.hostAndPort)
} else { return self.conn
log.Error("failed to connect to %s", self.hostAndPort)
} }
return log.Error("failed to connect to %s", self.hostAndPort)
return nil
} }
func (self *ProtobufClient) peridicallySweepTimedOutRequests() { func (self *ProtobufClient) peridicallySweepTimedOutRequests() {