2015-05-27 16:02:38 +00:00
|
|
|
package cluster
|
2015-05-21 22:26:20 +00:00
|
|
|
|
|
|
|
import (
|
2015-05-22 20:53:42 +00:00
|
|
|
"fmt"
|
2015-05-21 22:26:20 +00:00
|
|
|
"net"
|
2015-05-27 20:40:19 +00:00
|
|
|
"time"
|
2015-05-21 22:26:20 +00:00
|
|
|
|
2015-05-27 16:02:38 +00:00
|
|
|
"github.com/influxdb/influxdb/meta"
|
2015-05-23 04:23:01 +00:00
|
|
|
"github.com/influxdb/influxdb/tsdb"
|
2015-05-27 21:25:08 +00:00
|
|
|
"gopkg.in/fatih/pool.v2"
|
2015-05-21 22:26:20 +00:00
|
|
|
)
|
|
|
|
|
2015-05-27 16:02:38 +00:00
|
|
|
const (
|
|
|
|
writeShardRequestMessage byte = iota + 1
|
|
|
|
writeShardResponseMessage
|
|
|
|
)
|
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
// ShardWriter writes a set of points to a shard.
|
|
|
|
type ShardWriter struct {
|
|
|
|
pool *clientPool
|
|
|
|
timeout time.Duration
|
2015-05-26 20:51:11 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
MetaStore interface {
|
|
|
|
Node(id uint64) (ni *meta.NodeInfo, err error)
|
2015-05-26 20:51:11 +00:00
|
|
|
}
|
|
|
|
}
|
2015-05-27 16:02:38 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
// NewShardWriter returns a new instance of ShardWriter.
|
|
|
|
func NewShardWriter(timeout time.Duration) *ShardWriter {
|
|
|
|
return &ShardWriter{
|
|
|
|
pool: newClientPool(),
|
|
|
|
timeout: timeout,
|
2015-05-26 20:51:11 +00:00
|
|
|
}
|
2015-05-21 22:26:20 +00:00
|
|
|
}
|
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
|
2015-05-27 21:25:08 +00:00
|
|
|
c, err := w.dial(ownerID)
|
2015-05-26 20:51:11 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-05-30 16:11:23 +00:00
|
|
|
|
2015-05-27 21:25:08 +00:00
|
|
|
conn, ok := c.(*pool.PoolConn)
|
|
|
|
if !ok {
|
|
|
|
panic("wrong connection type")
|
|
|
|
}
|
2015-05-30 16:11:23 +00:00
|
|
|
defer conn.Close() // return to pool
|
2015-05-26 23:14:46 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
// Build write request.
|
2015-05-27 16:02:38 +00:00
|
|
|
var request WriteShardRequest
|
2015-05-22 20:53:42 +00:00
|
|
|
request.SetShardID(shardID)
|
|
|
|
request.AddPoints(points)
|
2015-05-21 22:26:20 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
// Marshal into protocol buffers.
|
2015-05-30 20:00:46 +00:00
|
|
|
buf, err := request.MarshalBinary()
|
2015-05-21 22:26:20 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// Write request.
|
2015-05-27 20:40:19 +00:00
|
|
|
conn.SetWriteDeadline(time.Now().Add(w.timeout))
|
2015-05-30 20:00:46 +00:00
|
|
|
if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
|
2015-05-27 21:25:08 +00:00
|
|
|
conn.MarkUnusable()
|
2015-05-21 22:26:20 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// Read the response.
|
2015-05-27 20:40:19 +00:00
|
|
|
conn.SetReadDeadline(time.Now().Add(w.timeout))
|
2015-05-30 20:00:46 +00:00
|
|
|
_, buf, err = ReadTLV(conn)
|
2015-05-22 20:53:42 +00:00
|
|
|
if err != nil {
|
2015-05-27 21:25:08 +00:00
|
|
|
conn.MarkUnusable()
|
2015-05-22 20:53:42 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// Unmarshal response.
|
2015-05-27 16:02:38 +00:00
|
|
|
var response WriteShardResponse
|
2015-05-30 20:00:46 +00:00
|
|
|
if err := response.UnmarshalBinary(buf); err != nil {
|
2015-05-22 20:53:42 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if response.Code() != 0 {
|
|
|
|
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
|
|
|
|
}
|
|
|
|
|
2015-05-21 22:26:20 +00:00
|
|
|
return nil
|
|
|
|
}
|
2015-05-27 20:40:19 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
func (c *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
|
|
|
|
// If we don't have a connection pool for that addr yet, create one
|
|
|
|
_, ok := c.pool.getPool(nodeID)
|
|
|
|
if !ok {
|
|
|
|
factory := &connFactory{nodeID: nodeID, clientPool: c.pool, timeout: c.timeout}
|
|
|
|
factory.metaStore = c.MetaStore
|
|
|
|
|
|
|
|
p, err := pool.NewChannelPool(1, 3, factory.dial)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
c.pool.setPool(nodeID, p)
|
|
|
|
}
|
|
|
|
return c.pool.conn(nodeID)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *ShardWriter) Close() error {
|
2015-05-27 16:02:38 +00:00
|
|
|
if w.pool == nil {
|
2015-05-26 20:51:11 +00:00
|
|
|
return fmt.Errorf("client already closed")
|
|
|
|
}
|
2015-05-27 16:06:04 +00:00
|
|
|
w.pool.close()
|
2015-05-27 16:02:38 +00:00
|
|
|
w.pool = nil
|
2015-05-26 20:51:11 +00:00
|
|
|
return nil
|
2015-05-21 22:26:20 +00:00
|
|
|
}
|
2015-05-30 16:11:23 +00:00
|
|
|
|
|
|
|
const (
|
|
|
|
maxConnections = 500
|
|
|
|
maxRetries = 3
|
|
|
|
)
|
|
|
|
|
|
|
|
var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
|
|
|
|
|
|
|
|
type connFactory struct {
|
|
|
|
nodeID uint64
|
|
|
|
timeout time.Duration
|
|
|
|
|
|
|
|
clientPool interface {
|
|
|
|
size() int
|
|
|
|
}
|
|
|
|
|
|
|
|
metaStore interface {
|
|
|
|
Node(id uint64) (ni *meta.NodeInfo, err error)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *connFactory) dial() (net.Conn, error) {
|
|
|
|
if c.clientPool.size() > maxConnections {
|
|
|
|
return nil, errMaxConnectionsExceeded
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
ni, err := c.metaStore.Node(c.nodeID)
|
2015-05-30 16:11:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
conn, err := net.DialTimeout("tcp", ni.Host, c.timeout)
|
2015-05-30 16:11:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return conn, nil
|
|
|
|
}
|