Merge pull request #2661 from influxdb/tcp-connection-pool

ClusterWriter/ClusterListener with connection pooling.
pull/2706/head
Cory LaNou 2015-05-27 13:36:37 -06:00
commit 372cb28023
6 changed files with 369 additions and 168 deletions
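
Taken together, the change replaces the one-shot tcp.Client with a pooled cluster.Writer: callers hand the writer a metaStore that can resolve a node ID to a host, and the writer dials and reuses TCP connections per owner node. A minimal usage sketch, assuming a metaStore stub like the one added to the tests below (staticStore, the address, and the IDs are illustrative, not part of this PR):

package main

import (
	"log"
	"time"

	"github.com/influxdb/influxdb/cluster"
	"github.com/influxdb/influxdb/meta"
	"github.com/influxdb/influxdb/tsdb"
)

// staticStore is a hypothetical metaStore: every node ID resolves to one host.
type staticStore struct{ host string }

func (s *staticStore) Node(id uint64) (*meta.NodeInfo, error) {
	return &meta.NodeInfo{ID: id, Host: s.host}, nil
}

func main() {
	// The writer lazily opens a small connection pool per owner node.
	w := cluster.NewWriter(&staticStore{host: "127.0.0.1:8088"})
	defer w.Close()

	points := []tsdb.Point{tsdb.NewPoint(
		"cpu", tsdb.Tags{"host": "server01"},
		map[string]interface{}{"value": int64(100)}, time.Now(),
	)}

	// shardID is the target shard; ownerID is the node that owns it.
	if err := w.Write(1, 2, points); err != nil {
		log.Fatal(err)
	}
}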

cluster/client_pool.go (new file, 57 lines)

@@ -0,0 +1,57 @@
package cluster
import (
"net"
"sync"
"github.com/fatih/pool"
)
type clientPool struct {
mu sync.RWMutex
pool map[uint64]pool.Pool
}
func newClientPool() *clientPool {
return &clientPool{
pool: make(map[uint64]pool.Pool),
}
}
func (c *clientPool) setPool(nodeID uint64, p pool.Pool) {
c.mu.Lock()
c.pool[nodeID] = p
c.mu.Unlock()
}
func (c *clientPool) getPool(nodeID uint64) (pool.Pool, bool) {
c.mu.Lock()
p, ok := c.pool[nodeID]
c.mu.Unlock()
return p, ok
}
func (c *clientPool) size() int {
c.mu.RLock()
var size int
for _, p := range c.pool {
size += p.Len()
}
c.mu.RUnlock()
return size
}
func (c *clientPool) conn(nodeID uint64) (net.Conn, error) {
c.mu.Lock()
conn, err := c.pool[nodeID].Get()
c.mu.Unlock()
return conn, err
}
func (c *clientPool) close() {
c.mu.Lock()
for _, p := range c.pool {
p.Close()
}
c.mu.Unlock()
}
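
The clientPool above is just a mutex-guarded map from node ID to a fatih/pool connection pool; the pools themselves are created lazily by the Writer added later in this PR. The sketch below is purely illustrative and package-internal (the helpers are unexported), but it shows the wiring, using pool.NewChannelPool with the same bounds Writer.dial uses:

package cluster

import (
	"log"
	"net"

	"github.com/fatih/pool"
)

// examplePoolWiring is a hypothetical sketch of how a per-node pool is built
// and registered; Writer.dial does the equivalent lazily on first write.
func examplePoolWiring() {
	factory := func() (net.Conn, error) {
		// The real factory (connFactory.dial) resolves the host through the
		// metaStore and refuses to dial once maxConnections is reached.
		return net.Dial("tcp", "127.0.0.1:8088") // address is made up
	}

	p, err := pool.NewChannelPool(1, 3, factory) // 1 idle conn, cap of 3
	if err != nil {
		log.Fatal(err)
	}

	cp := newClientPool()
	cp.setPool(1, p)        // register the pool under node ID 1
	conn, err := cp.conn(1) // check a connection out of node 1's pool
	if err != nil {
		log.Fatal(err)
	}
	conn.Close() // fatih/pool wraps the conn; Close returns it to the pool
}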

@@ -1,4 +1,4 @@
package tcp
package cluster
import (
"encoding/binary"
@@ -9,7 +9,6 @@ import (
"os"
"sync"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)
@@ -36,30 +35,38 @@ type Server struct {
Logger *log.Logger
shutdown chan struct{}
mu sync.RWMutex
// the actual addr the server opens on
addr net.Addr
// used to initially spin up the server, could be a zero port
laddr string
}
// NewServer returns a new instance of a Server.
func NewServer(w writer) *Server {
func NewServer(w writer, laddr string) *Server {
return &Server{
writer: w,
laddr: laddr,
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
shutdown: make(chan struct{}),
}
}
// ListenAndServe instructs the Server to start processing connections
// on the given interface. iface must be in the form host:port
// If successful, it returns the host as the first argument
func (s *Server) ListenAndServe(laddr string) (string, error) {
if laddr == "" { // Make sure we have an laddr
return "", ErrBindAddressRequired
// Open instructs the Server to start processing connections
func (s *Server) Open() error {
if s.laddr == "" { // Make sure we have an laddr
return ErrBindAddressRequired
}
ln, err := net.Listen("tcp", laddr)
ln, err := net.Listen("tcp", s.laddr)
if err != nil {
return "", err
return err
}
s.mu.Lock()
s.listener = &ln
s.addr = ln.Addr()
s.mu.Unlock()
s.Logger.Println("listening on TCP connection", ln.Addr().String())
s.wg.Add(1)
@@ -89,8 +96,7 @@ func (s *Server) ListenAndServe(laddr string) (string, error) {
}
}()
// Return the host we started up on. Mostly needed for testing
return ln.Addr().String(), nil
return nil
}
// Close will close the listener
@@ -114,40 +120,48 @@ func (s *Server) Close() error {
// handleConnection services an individual TCP connection.
func (s *Server) handleConnection(conn net.Conn) {
defer func() {
conn.Close()
s.wg.Done()
}()
messageChannel := make(chan byte)
// Start our reader up in a go routine so we don't block checking our close channel
go func() {
var messageType byte
err := binary.Read(conn, binary.LittleEndian, &messageType)
if err != nil {
s.Logger.Printf("unable to read message type %s", err)
return
for {
var messageType byte
err := binary.Read(conn, binary.LittleEndian, &messageType)
if err != nil {
s.Logger.Printf("unable to read message type %s", err)
return
}
s.processMessage(messageType, conn)
select {
case <-s.shutdown:
// Are we shutting down? If so, exit
return
default:
}
}
messageChannel <- messageType
}()
for {
select {
case <-s.shutdown:
// Are we shutting down? If so, exit
conn.Close()
s.wg.Done()
return
case messageType := <-messageChannel:
switch messageType {
case writeShardRequestMessage:
err := s.writeShardRequest(conn)
s.writeShardResponse(conn, err)
return
}
default:
}
}
}
func (s *Server) processMessage(messageType byte, conn net.Conn) {
switch messageType {
case writeShardRequestMessage:
err := s.writeShardRequest(conn)
s.writeShardResponse(conn, err)
return
}
return
}
func (s *Server) writeShardRequest(conn net.Conn) error {
@@ -163,7 +177,7 @@ func (s *Server) writeShardRequest(conn net.Conn) error {
return err
}
var wsr cluster.WriteShardRequest
var wsr WriteShardRequest
if err := wsr.UnmarshalBinary(message); err != nil {
return err
}
@@ -177,7 +191,7 @@ func (s *Server) writeShardResponse(conn net.Conn, e error) {
return
}
var wsr cluster.WriteShardResponse
var wsr WriteShardResponse
if e != nil {
wsr.SetCode(1)
wsr.SetMessage(e.Error())
@@ -203,3 +217,10 @@ func (s *Server) writeShardResponse(conn net.Conn, e error) {
return
}
}
func (s *Server) Addr() net.Addr {
s.mu.RLock()
defer s.mu.RUnlock()
return s.addr
}
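
Both ends of the connection now speak the same simple framing: a one-byte message type, an int64 length in little-endian order, then the marshaled request or response. The helpers below are not in the PR; they just restate that framing in one place (the server and Writer inline the equivalent reads and writes, using an io.LimitReader where this sketch uses io.ReadFull):

package cluster

import (
	"encoding/binary"
	"io"
)

// writeFrame is a hypothetical helper mirroring how Writer.Write sends a
// request: message type, little-endian int64 length, then the payload bytes.
func writeFrame(w io.Writer, msgType byte, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, &msgType); err != nil {
		return err
	}
	size := int64(len(payload))
	if err := binary.Write(w, binary.LittleEndian, &size); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame is the matching hypothetical reader; the server does the same
// work split across handleConnection and writeShardRequest.
func readFrame(r io.Reader) (byte, []byte, error) {
	var msgType byte
	if err := binary.Read(r, binary.LittleEndian, &msgType); err != nil {
		return 0, nil, err
	}
	var size int64
	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
		return 0, nil, err
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return 0, nil, err
	}
	return msgType, payload, nil
}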

@@ -1,4 +1,4 @@
package tcp_test
package cluster_test
import (
"fmt"
@@ -6,10 +6,22 @@ import (
"time"
"github.com/davecgh/go-spew/spew"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)
type metaStore struct {
host string
}
func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
return &meta.NodeInfo{
ID: nodeID,
Host: m.host,
}, nil
}
type testServer struct {
writeShardFunc func(shardID uint64, points []tsdb.Point) error
}
@@ -62,12 +74,10 @@ func (testServer) ResponseN(n int) ([]*serverResponse, error) {
func TestServer_Close_ErrServerClosed(t *testing.T) {
var (
ts testServer
s = tcp.NewServer(ts)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
// Start on a random port
_, e := s.ListenAndServe("127.0.0.1:0")
if e != nil {
if e := s.Open(); e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
@@ -75,7 +85,7 @@ func TestServer_Close_ErrServerClosed(t *testing.T) {
s.Close()
// Try to close it again
if err := s.Close(); err != tcp.ErrServerClosed {
if err := s.Close(); err != cluster.ErrServerClosed {
t.Fatalf("expected an error, got %v", err)
}
}
@@ -83,49 +93,115 @@ func TestServer_Close_ErrServerClosed(t *testing.T) {
func TestServer_Close_ErrBindAddressRequired(t *testing.T) {
var (
ts testServer
s = tcp.NewServer(ts)
s = cluster.NewServer(ts, "")
)
// Start on a random port
_, e := s.ListenAndServe("")
if e == nil {
t.Fatalf("exprected error %s, got nil.", tcp.ErrBindAddressRequired)
if e := s.Open(); e == nil {
t.Fatalf("exprected error %s, got nil.", cluster.ErrBindAddressRequired)
}
}
func TestServer_WriteShardRequestSuccess(t *testing.T) {
var (
ts = newTestServer(writeShardSuccess)
s = tcp.NewServer(ts)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
// Close the server
defer s.Close()
// Start on a random port
host, e := s.ListenAndServe("127.0.0.1:0")
e := s.Open()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
// Close the server
defer s.Close()
client := tcp.NewClient()
err := client.Dial(host)
if err != nil {
t.Fatal(err)
}
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()})
now := time.Now()
shardID := uint64(1)
ownerID := uint64(2)
var points []tsdb.Point
points = append(points, tsdb.NewPoint(
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err := client.WriteShard(shardID, points); err != nil {
if err := writer.Write(shardID, ownerID, points); err != nil {
t.Fatal(err)
}
if err := client.Close(); err != nil {
if err := writer.Close(); err != nil {
t.Fatal(err)
}
responses, err := ts.ResponseN(1)
if err != nil {
t.Fatal(err)
}
response := responses[0]
if shardID != response.shardID {
t.Fatalf("unexpected shardID. exp: %d, got %d", shardID, response.shardID)
}
got := response.points[0]
exp := points[0]
t.Log("got: ", spew.Sdump(got))
t.Log("exp: ", spew.Sdump(exp))
if got.Name() != exp.Name() {
t.Fatal("unexpected name")
}
if got.Fields()["value"] != exp.Fields()["value"] {
t.Fatal("unexpected fields")
}
if got.Tags()["host"] != exp.Tags()["host"] {
t.Fatal("unexpected tags")
}
if got.Time().UnixNano() != exp.Time().UnixNano() {
t.Fatal("unexpected time")
}
}
func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) {
var (
ts = newTestServer(writeShardSuccess)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
// Start on a random port
if e := s.Open(); e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
// Close the server
defer s.Close()
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()})
now := time.Now()
shardID := uint64(1)
ownerID := uint64(2)
var points []tsdb.Point
points = append(points, tsdb.NewPoint(
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err := writer.Write(shardID, ownerID, points); err != nil {
t.Fatal(err)
}
now = time.Now()
points = append(points, tsdb.NewPoint(
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err := writer.Write(shardID, ownerID, points[1:]); err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
@@ -165,32 +241,26 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) {
func TestServer_WriteShardRequestFail(t *testing.T) {
var (
ts = newTestServer(writeShardFail)
s = tcp.NewServer(ts)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
// Start on a random port
if e := s.Open(); e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
// Close the server
defer s.Close()
// Start on a random port
host, e := s.ListenAndServe("127.0.0.1:0")
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
client := tcp.NewClient()
err := client.Dial(host)
if err != nil {
t.Fatal(err)
}
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()})
now := time.Now()
shardID := uint64(1)
ownerID := uint64(2)
var points []tsdb.Point
points = append(points, tsdb.NewPoint(
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err, exp := client.WriteShard(shardID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
if err, exp := writer.Write(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
t.Fatalf("expected error %s, got %v", exp, err)
}
}

cluster/writer.go (new file, 146 lines)

@@ -0,0 +1,146 @@
package cluster
import (
"encoding/binary"
"fmt"
"io"
"net"
"github.com/fatih/pool"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)
const (
writeShardRequestMessage byte = iota + 1
writeShardResponseMessage
)
const maxConnections = 500
var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
type metaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
}
type connFactory struct {
metaStore metaStore
nodeID uint64
clientPool interface {
size() int
}
}
func (c *connFactory) dial() (net.Conn, error) {
if c.clientPool.size() > maxConnections {
return nil, errMaxConnectionsExceeded
}
nodeInfo, err := c.metaStore.Node(c.nodeID)
if err != nil {
return nil, err
}
conn, err := net.Dial("tcp", nodeInfo.Host)
if err != nil {
return nil, err
}
return conn, nil
}
type Writer struct {
pool *clientPool
metaStore metaStore
}
func NewWriter(m metaStore) *Writer {
return &Writer{
pool: newClientPool(),
metaStore: m,
}
}
func (c *Writer) dial(nodeID uint64) (net.Conn, error) {
// if we don't have a connection pool for that addr yet, create one
_, ok := c.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, metaStore: c.metaStore, clientPool: c.pool}
p, err := pool.NewChannelPool(1, 3, factory.dial)
if err != nil {
return nil, err
}
c.pool.setPool(nodeID, p)
}
return c.pool.conn(nodeID)
}
func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
conn, err := w.dial(ownerID)
if err != nil {
return err
}
// This will return the connection to the data pool
defer conn.Close()
var mt byte = writeShardRequestMessage
if err := binary.Write(conn, binary.LittleEndian, &mt); err != nil {
return err
}
var request WriteShardRequest
request.SetShardID(shardID)
request.AddPoints(points)
b, err := request.MarshalBinary()
if err != nil {
return err
}
size := int64(len(b))
if err := binary.Write(conn, binary.LittleEndian, &size); err != nil {
return err
}
if _, err := conn.Write(b); err != nil {
return err
}
// read back our response
if err := binary.Read(conn, binary.LittleEndian, &mt); err != nil {
return err
}
if err := binary.Read(conn, binary.LittleEndian, &size); err != nil {
return err
}
message := make([]byte, size)
reader := io.LimitReader(conn, size)
_, err = reader.Read(message)
if err != nil {
return err
}
var response WriteShardResponse
if err := response.UnmarshalBinary(message); err != nil {
return err
}
if response.Code() != 0 {
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}
return nil
}
func (w *Writer) Close() error {
if w.pool == nil {
return fmt.Errorf("client already closed")
}
w.pool.close()
w.pool = nil
return nil
}
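
Two details of the Writer are easy to miss: conn.Close() inside Write does not tear the TCP connection down but returns it to that node's pool, and connFactory.dial caps the total number of pooled connections across all nodes at maxConnections (500). A small, hypothetical helper built on top of this API (replicateToOwners is not part of the PR; owner IDs would normally come from the meta store):

package example

import (
	"fmt"

	"github.com/influxdb/influxdb/cluster"
	"github.com/influxdb/influxdb/tsdb"
)

// replicateToOwners fans one batch of points out to every owner of a shard.
// Each distinct ownerID gets its own small connection pool inside the Writer,
// so repeated writes to the same node reuse existing TCP connections.
func replicateToOwners(w *cluster.Writer, shardID uint64, owners []uint64, points []tsdb.Point) error {
	for _, ownerID := range owners {
		if err := w.Write(shardID, ownerID, points); err != nil {
			return fmt.Errorf("write to node %d: %v", ownerID, err)
		}
	}
	return nil
}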

@@ -1,87 +0,0 @@
package tcp
import (
"encoding/binary"
"fmt"
"io"
"net"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)
type Client struct {
conn net.Conn
}
func NewClient() *Client {
return &Client{}
}
func (c *Client) Dial(addr string) error {
conn, err := net.Dial("tcp", addr)
if err != nil {
return err
}
c.conn = conn
return nil
}
func (c *Client) WriteShard(shardID uint64, points []tsdb.Point) error {
var mt byte = writeShardRequestMessage
if err := binary.Write(c.conn, binary.LittleEndian, &mt); err != nil {
return err
}
var request cluster.WriteShardRequest
request.SetShardID(shardID)
request.AddPoints(points)
b, err := request.MarshalBinary()
if err != nil {
return err
}
size := int64(len(b))
if err := binary.Write(c.conn, binary.LittleEndian, &size); err != nil {
return err
}
if _, err := c.conn.Write(b); err != nil {
return err
}
// read back our response
if err := binary.Read(c.conn, binary.LittleEndian, &mt); err != nil {
return err
}
if err := binary.Read(c.conn, binary.LittleEndian, &size); err != nil {
return err
}
message := make([]byte, size)
reader := io.LimitReader(c.conn, size)
_, err = reader.Read(message)
if err != nil {
return err
}
var response cluster.WriteShardResponse
if err := response.UnmarshalBinary(message); err != nil {
return err
}
if response.Code() != 0 {
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}
return nil
}
func (c *Client) Close() error {
return c.conn.Close()
}

@@ -1,6 +0,0 @@
package tcp
const (
writeShardRequestMessage byte = iota
writeShardResponseMessage
)
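
One small change worth noting as these constants move into cluster/writer.go: they now start at iota + 1 rather than iota, presumably so that a zero byte on the wire never matches a valid message type:

// cluster/writer.go (new values)
const (
	writeShardRequestMessage  byte = iota + 1 // 1
	writeShardResponseMessage                 // 2
)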