2013-12-03 22:19:42 +00:00
|
|
|
package coordinator
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
2014-02-23 22:52:35 +00:00
|
|
|
"fmt"
|
2013-12-03 22:19:42 +00:00
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
2014-04-02 21:49:34 +00:00
|
|
|
|
|
|
|
log "code.google.com/p/log4go"
|
2014-08-20 15:59:45 +00:00
|
|
|
"github.com/influxdb/influxdb/cluster"
|
2014-06-27 16:57:06 +00:00
|
|
|
"github.com/influxdb/influxdb/protocol"
|
2013-12-03 22:19:42 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type ProtobufClient struct {
|
2013-12-10 18:59:44 +00:00
|
|
|
connLock sync.Mutex
|
2013-12-03 22:19:42 +00:00
|
|
|
conn net.Conn
|
|
|
|
hostAndPort string
|
|
|
|
requestBufferLock sync.RWMutex
|
|
|
|
requestBuffer map[uint32]*runningRequest
|
|
|
|
reconnectWait sync.WaitGroup
|
2014-01-31 20:40:10 +00:00
|
|
|
connectCalled bool
|
2014-02-14 20:37:21 +00:00
|
|
|
lastRequestId uint32
|
2014-02-20 21:47:55 +00:00
|
|
|
writeTimeout time.Duration
|
2014-04-14 23:05:30 +00:00
|
|
|
attempts int
|
2014-04-29 19:22:56 +00:00
|
|
|
stopped bool
|
2014-06-09 21:12:23 +00:00
|
|
|
reconChan chan struct{}
|
|
|
|
reconGroup *sync.WaitGroup
|
|
|
|
once *sync.Once
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type runningRequest struct {
|
2014-08-20 15:59:45 +00:00
|
|
|
timeMade time.Time
|
|
|
|
r cluster.ResponseChannel
|
|
|
|
request *protocol.Request
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const (
|
2014-02-23 22:52:35 +00:00
|
|
|
REQUEST_RETRY_ATTEMPTS = 2
|
2014-01-07 20:43:58 +00:00
|
|
|
MAX_RESPONSE_SIZE = MAX_REQUEST_SIZE
|
2013-12-03 22:19:42 +00:00
|
|
|
MAX_REQUEST_TIME = time.Second * 1200
|
|
|
|
RECONNECT_RETRY_WAIT = time.Millisecond * 100
|
|
|
|
)
|
|
|
|
|
2014-02-20 21:47:55 +00:00
|
|
|
func NewProtobufClient(hostAndPort string, writeTimeout time.Duration) *ProtobufClient {
|
2014-02-25 22:45:33 +00:00
|
|
|
log.Debug("NewProtobufClient: ", hostAndPort)
|
2014-02-20 21:47:55 +00:00
|
|
|
return &ProtobufClient{
|
2014-02-25 12:36:40 +00:00
|
|
|
hostAndPort: hostAndPort,
|
|
|
|
requestBuffer: make(map[uint32]*runningRequest),
|
|
|
|
writeTimeout: writeTimeout,
|
2014-06-09 21:12:23 +00:00
|
|
|
reconChan: make(chan struct{}, 1),
|
|
|
|
reconGroup: new(sync.WaitGroup),
|
|
|
|
once: new(sync.Once),
|
2014-04-29 19:22:56 +00:00
|
|
|
stopped: false,
|
2014-02-20 21:47:55 +00:00
|
|
|
}
|
2014-01-31 20:40:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) Connect() {
|
2014-06-09 21:12:23 +00:00
|
|
|
self.once.Do(self.connect)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) connect() {
|
|
|
|
self.reconChan <- struct{}{}
|
2013-12-03 22:19:42 +00:00
|
|
|
go func() {
|
2014-01-31 20:40:10 +00:00
|
|
|
self.reconnect()
|
|
|
|
self.readResponses()
|
2013-12-03 22:19:42 +00:00
|
|
|
}()
|
2014-01-31 20:40:10 +00:00
|
|
|
go self.peridicallySweepTimedOutRequests()
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) Close() {
|
2013-12-10 18:59:44 +00:00
|
|
|
self.connLock.Lock()
|
|
|
|
defer self.connLock.Unlock()
|
2013-12-03 22:19:42 +00:00
|
|
|
if self.conn != nil {
|
|
|
|
self.conn.Close()
|
2014-04-29 19:22:56 +00:00
|
|
|
self.stopped = true
|
2013-12-03 22:19:42 +00:00
|
|
|
self.conn = nil
|
|
|
|
}
|
2014-05-12 21:49:51 +00:00
|
|
|
self.ClearRequests()
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
2013-12-10 18:59:44 +00:00
|
|
|
func (self *ProtobufClient) getConnection() net.Conn {
|
|
|
|
self.connLock.Lock()
|
|
|
|
defer self.connLock.Unlock()
|
|
|
|
return self.conn
|
|
|
|
}
|
|
|
|
|
2014-05-12 21:49:51 +00:00
|
|
|
func (self *ProtobufClient) ClearRequests() {
|
|
|
|
self.requestBufferLock.Lock()
|
|
|
|
defer self.requestBufferLock.Unlock()
|
|
|
|
|
|
|
|
for _, req := range self.requestBuffer {
|
2014-08-20 15:59:45 +00:00
|
|
|
self.cancelRequest(req.request)
|
2014-05-12 21:49:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
self.requestBuffer = map[uint32]*runningRequest{}
|
|
|
|
}
|
|
|
|
|
2014-08-20 15:59:45 +00:00
|
|
|
func (self *ProtobufClient) CancelRequest(request *protocol.Request) {
|
|
|
|
self.requestBufferLock.Lock()
|
|
|
|
defer self.requestBufferLock.Unlock()
|
|
|
|
self.cancelRequest(request)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) cancelRequest(request *protocol.Request) {
|
|
|
|
req, ok := self.requestBuffer[*request.Id]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
message := "cancelling request"
|
|
|
|
req.r.Yield(&protocol.Response{Type: &endStreamResponse, ErrorMessage: &message})
|
|
|
|
delete(self.requestBuffer, *request.Id)
|
|
|
|
}
|
|
|
|
|
2013-12-03 22:19:42 +00:00
|
|
|
// Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server
|
|
|
|
// with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means
|
|
|
|
// that an attempt to make a request to a downed server will take 300ms to time out.
|
2014-08-20 15:59:45 +00:00
|
|
|
func (self *ProtobufClient) MakeRequest(request *protocol.Request, r cluster.ResponseChannel) error {
|
2014-02-14 20:37:21 +00:00
|
|
|
if request.Id == nil {
|
|
|
|
id := atomic.AddUint32(&self.lastRequestId, uint32(1))
|
|
|
|
request.Id = &id
|
|
|
|
}
|
2014-08-20 15:59:45 +00:00
|
|
|
if r != nil {
|
2013-12-03 22:19:42 +00:00
|
|
|
self.requestBufferLock.Lock()
|
|
|
|
|
|
|
|
// this should actually never happen. The sweeper should clear out dead requests
|
|
|
|
// before the uint32 ids roll over.
|
|
|
|
if oldReq, alreadyHasRequestById := self.requestBuffer[*request.Id]; alreadyHasRequestById {
|
2014-02-25 12:36:40 +00:00
|
|
|
message := "already has a request with this id, must have timed out"
|
|
|
|
log.Error(message)
|
2014-08-20 15:59:45 +00:00
|
|
|
oldReq.r.Yield(&protocol.Response{Type: &endStreamResponse, ErrorMessage: &message})
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
2014-08-20 15:59:45 +00:00
|
|
|
self.requestBuffer[*request.Id] = &runningRequest{timeMade: time.Now(), r: r, request: request}
|
2013-12-03 22:19:42 +00:00
|
|
|
self.requestBufferLock.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
data, err := request.Encode()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-02-25 12:36:40 +00:00
|
|
|
conn := self.getConnection()
|
|
|
|
if conn == nil {
|
|
|
|
conn = self.reconnect()
|
2013-12-10 18:59:44 +00:00
|
|
|
if conn == nil {
|
2014-02-25 12:36:40 +00:00
|
|
|
return fmt.Errorf("Failed to connect to server %s", self.hostAndPort)
|
2013-12-10 18:59:44 +00:00
|
|
|
}
|
2014-02-25 12:36:40 +00:00
|
|
|
}
|
2013-12-10 18:59:44 +00:00
|
|
|
|
2014-02-25 21:12:57 +00:00
|
|
|
if self.writeTimeout > 0 {
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(self.writeTimeout))
|
|
|
|
}
|
2014-02-25 12:36:40 +00:00
|
|
|
buff := bytes.NewBuffer(make([]byte, 0, len(data)+8))
|
|
|
|
binary.Write(buff, binary.LittleEndian, uint32(len(data)))
|
|
|
|
_, err = conn.Write(append(buff.Bytes(), data...))
|
2014-02-23 16:46:06 +00:00
|
|
|
|
2014-02-25 12:36:40 +00:00
|
|
|
if err == nil {
|
|
|
|
return nil
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// if we got here it errored out, clear out the request
|
|
|
|
self.requestBufferLock.Lock()
|
|
|
|
delete(self.requestBuffer, *request.Id)
|
|
|
|
self.requestBufferLock.Unlock()
|
2014-02-25 20:14:28 +00:00
|
|
|
self.reconnect()
|
2013-12-03 22:19:42 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) readResponses() {
|
|
|
|
message := make([]byte, 0, MAX_RESPONSE_SIZE)
|
|
|
|
buff := bytes.NewBuffer(message)
|
2014-04-29 19:22:56 +00:00
|
|
|
for !self.stopped {
|
2013-12-10 18:59:44 +00:00
|
|
|
buff.Reset()
|
|
|
|
conn := self.getConnection()
|
|
|
|
if conn == nil {
|
2014-03-13 20:05:35 +00:00
|
|
|
time.Sleep(200 * time.Millisecond)
|
2013-12-10 18:59:44 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
var messageSizeU uint32
|
|
|
|
var err error
|
|
|
|
err = binary.Read(conn, binary.LittleEndian, &messageSizeU)
|
|
|
|
if err != nil {
|
2014-03-13 20:05:35 +00:00
|
|
|
log.Error("Error while reading messsage size: %d", err)
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
2013-12-10 18:59:44 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
messageSize := int64(messageSizeU)
|
|
|
|
messageReader := io.LimitReader(conn, messageSize)
|
|
|
|
_, err = io.Copy(buff, messageReader)
|
|
|
|
if err != nil {
|
2014-03-13 20:05:35 +00:00
|
|
|
log.Error("Error while reading message: %d", err)
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
2013-12-10 18:59:44 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
response, err := protocol.DecodeResponse(buff)
|
|
|
|
if err != nil {
|
2013-12-11 17:39:48 +00:00
|
|
|
log.Error("error unmarshaling response: %s", err)
|
2014-03-13 20:05:35 +00:00
|
|
|
time.Sleep(200 * time.Millisecond)
|
2013-12-03 22:19:42 +00:00
|
|
|
} else {
|
2013-12-10 18:59:44 +00:00
|
|
|
self.sendResponse(response)
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) sendResponse(response *protocol.Response) {
|
|
|
|
self.requestBufferLock.RLock()
|
|
|
|
req, ok := self.requestBuffer[*response.RequestId]
|
|
|
|
self.requestBufferLock.RUnlock()
|
2014-08-20 15:59:45 +00:00
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
switch response.GetType() {
|
|
|
|
case protocol.Response_END_STREAM,
|
|
|
|
protocol.Response_WRITE_OK,
|
|
|
|
protocol.Response_HEARTBEAT,
|
|
|
|
protocol.Response_ACCESS_DENIED:
|
|
|
|
// continue and delete the request
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
self.requestBufferLock.Lock()
|
|
|
|
req, ok = self.requestBuffer[*response.RequestId]
|
|
|
|
delete(self.requestBuffer, *response.RequestId)
|
|
|
|
self.requestBufferLock.Unlock()
|
|
|
|
if !ok {
|
|
|
|
return
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
2014-08-20 15:59:45 +00:00
|
|
|
|
2014-08-27 18:16:17 +00:00
|
|
|
log.Debug("ProtobufClient yielding to %s %s", req.r.Name(), response)
|
2014-08-20 15:59:45 +00:00
|
|
|
req.r.Yield(response)
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
2014-02-25 12:36:40 +00:00
|
|
|
func (self *ProtobufClient) reconnect() net.Conn {
|
2014-06-09 21:12:23 +00:00
|
|
|
select {
|
|
|
|
case <-self.reconChan:
|
|
|
|
self.reconGroup.Add(1)
|
|
|
|
defer func() {
|
|
|
|
self.reconGroup.Done()
|
|
|
|
self.reconChan <- struct{}{}
|
|
|
|
}()
|
|
|
|
default:
|
|
|
|
self.reconGroup.Wait()
|
|
|
|
return self.conn
|
|
|
|
}
|
2013-12-03 22:19:42 +00:00
|
|
|
|
2014-02-25 12:36:40 +00:00
|
|
|
if self.conn != nil {
|
|
|
|
self.conn.Close()
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
2014-02-23 22:52:35 +00:00
|
|
|
conn, err := net.DialTimeout("tcp", self.hostAndPort, self.writeTimeout)
|
2014-06-09 21:12:23 +00:00
|
|
|
if err != nil {
|
|
|
|
self.attempts++
|
|
|
|
if self.attempts < 100 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-04-14 23:05:30 +00:00
|
|
|
log.Error("failed to connect to %s %d times", self.hostAndPort, self.attempts)
|
|
|
|
self.attempts = 0
|
|
|
|
}
|
2014-06-09 21:12:23 +00:00
|
|
|
|
|
|
|
self.conn = conn
|
|
|
|
log.Info("connected to %s", self.hostAndPort)
|
|
|
|
return conn
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (self *ProtobufClient) peridicallySweepTimedOutRequests() {
|
|
|
|
for {
|
|
|
|
time.Sleep(time.Minute)
|
|
|
|
self.requestBufferLock.Lock()
|
|
|
|
maxAge := time.Now().Add(-MAX_REQUEST_TIME)
|
|
|
|
for k, req := range self.requestBuffer {
|
|
|
|
if req.timeMade.Before(maxAge) {
|
|
|
|
delete(self.requestBuffer, k)
|
2014-04-11 16:11:04 +00:00
|
|
|
log.Warn("Request timed out: ", req.request)
|
2013-12-03 22:19:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
self.requestBufferLock.Unlock()
|
|
|
|
}
|
|
|
|
}
|