diff --git a/cluster/client_pool.go b/cluster/client_pool.go new file mode 100644 index 0000000000..b518ebd5cf --- /dev/null +++ b/cluster/client_pool.go @@ -0,0 +1,57 @@ +package cluster + +import ( + "net" + "sync" + + "github.com/fatih/pool" +) + +type clientPool struct { + mu sync.RWMutex + pool map[uint64]pool.Pool +} + +func newClientPool() *clientPool { + return &clientPool{ + pool: make(map[uint64]pool.Pool), + } +} + +func (c *clientPool) setPool(nodeID uint64, p pool.Pool) { + c.mu.Lock() + c.pool[nodeID] = p + c.mu.Unlock() +} + +func (c *clientPool) getPool(nodeID uint64) (pool.Pool, bool) { + c.mu.Lock() + p, ok := c.pool[nodeID] + c.mu.Unlock() + return p, ok +} + +func (c *clientPool) size() int { + c.mu.RLock() + var size int + for _, p := range c.pool { + size += p.Len() + } + c.mu.RUnlock() + return size +} + +func (c *clientPool) conn(nodeID uint64) (net.Conn, error) { + c.mu.Lock() + conn, err := c.pool[nodeID].Get() + c.mu.Unlock() + return conn, err +} + +func (c *clientPool) close() { + c.mu.Lock() + for _, p := range c.pool { + p.Close() + } + c.mu.Unlock() +} diff --git a/tcp/listener.go b/cluster/listener.go similarity index 73% rename from tcp/listener.go rename to cluster/listener.go index 62682f0084..ae991ffa65 100644 --- a/tcp/listener.go +++ b/cluster/listener.go @@ -1,4 +1,4 @@ -package tcp +package cluster import ( "encoding/binary" @@ -9,7 +9,6 @@ import ( "os" "sync" - "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/tsdb" ) @@ -36,30 +35,38 @@ type Server struct { Logger *log.Logger shutdown chan struct{} + + mu sync.RWMutex + // the actual addr the server opens on + addr net.Addr + // used to initially spin up the server, could be a zero port + laddr string } // NewServer returns a new instance of a Server. -func NewServer(w writer) *Server { +func NewServer(w writer, laddr string) *Server { return &Server{ writer: w, + laddr: laddr, Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags), shutdown: make(chan struct{}), } } -// ListenAndServe instructs the Server to start processing connections -// on the given interface. iface must be in the form host:port -// If successful, it returns the host as the first argument -func (s *Server) ListenAndServe(laddr string) (string, error) { - if laddr == "" { // Make sure we have an laddr - return "", ErrBindAddressRequired +// Open instructs the Server to start processing connections +func (s *Server) Open() error { + if s.laddr == "" { // Make sure we have an laddr + return ErrBindAddressRequired } - ln, err := net.Listen("tcp", laddr) + ln, err := net.Listen("tcp", s.laddr) if err != nil { - return "", err + return err } + s.mu.Lock() s.listener = &ln + s.addr = ln.Addr() + s.mu.Unlock() s.Logger.Println("listening on TCP connection", ln.Addr().String()) s.wg.Add(1) @@ -89,8 +96,7 @@ func (s *Server) ListenAndServe(laddr string) (string, error) { } }() - // Return the host we started up on. Mostly needed for testing - return ln.Addr().String(), nil + return nil } // Close will close the listener @@ -114,40 +120,48 @@ func (s *Server) Close() error { // handleConnection services an individual TCP connection. func (s *Server) handleConnection(conn net.Conn) { - defer func() { - conn.Close() - s.wg.Done() - }() - - messageChannel := make(chan byte) // Start our reader up in a go routine so we don't block checking our close channel go func() { - var messageType byte - err := binary.Read(conn, binary.LittleEndian, &messageType) - if err != nil { - s.Logger.Printf("unable to read message type %s", err) - return + for { + var messageType byte + + err := binary.Read(conn, binary.LittleEndian, &messageType) + if err != nil { + s.Logger.Printf("unable to read message type %s", err) + return + } + s.processMessage(messageType, conn) + + select { + case <-s.shutdown: + // Are we shutting down? If so, exit + return + default: + } } - messageChannel <- messageType }() for { select { case <-s.shutdown: // Are we shutting down? If so, exit + conn.Close() + s.wg.Done() return - case messageType := <-messageChannel: - switch messageType { - case writeShardRequestMessage: - err := s.writeShardRequest(conn) - s.writeShardResponse(conn, err) - return - } default: } } +} +func (s *Server) processMessage(messageType byte, conn net.Conn) { + switch messageType { + case writeShardRequestMessage: + err := s.writeShardRequest(conn) + s.writeShardResponse(conn, err) + return + } + return } func (s *Server) writeShardRequest(conn net.Conn) error { @@ -163,7 +177,7 @@ func (s *Server) writeShardRequest(conn net.Conn) error { return err } - var wsr cluster.WriteShardRequest + var wsr WriteShardRequest if err := wsr.UnmarshalBinary(message); err != nil { return err } @@ -177,7 +191,7 @@ func (s *Server) writeShardResponse(conn net.Conn, e error) { return } - var wsr cluster.WriteShardResponse + var wsr WriteShardResponse if e != nil { wsr.SetCode(1) wsr.SetMessage(e.Error()) @@ -203,3 +217,10 @@ func (s *Server) writeShardResponse(conn net.Conn, e error) { return } } + +func (s *Server) Addr() net.Addr { + s.mu.RLock() + defer s.mu.RUnlock() + return s.addr + +} diff --git a/tcp/listener_test.go b/cluster/listener_test.go similarity index 55% rename from tcp/listener_test.go rename to cluster/listener_test.go index 650f819eb2..9f1c70ff3b 100644 --- a/tcp/listener_test.go +++ b/cluster/listener_test.go @@ -1,4 +1,4 @@ -package tcp_test +package cluster_test import ( "fmt" @@ -6,10 +6,22 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/influxdb/influxdb/tcp" + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tsdb" ) +type metaStore struct { + host string +} + +func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) { + return &meta.NodeInfo{ + ID: nodeID, + Host: m.host, + }, nil +} + type testServer struct { writeShardFunc func(shardID uint64, points []tsdb.Point) error } @@ -62,12 +74,10 @@ func (testServer) ResponseN(n int) ([]*serverResponse, error) { func TestServer_Close_ErrServerClosed(t *testing.T) { var ( ts testServer - s = tcp.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) - // Start on a random port - _, e := s.ListenAndServe("127.0.0.1:0") - if e != nil { + if e := s.Open(); e != nil { t.Fatalf("err does not match. expected %v, got %v", nil, e) } @@ -75,7 +85,7 @@ func TestServer_Close_ErrServerClosed(t *testing.T) { s.Close() // Try to close it again - if err := s.Close(); err != tcp.ErrServerClosed { + if err := s.Close(); err != cluster.ErrServerClosed { t.Fatalf("expected an error, got %v", err) } } @@ -83,49 +93,115 @@ func TestServer_Close_ErrServerClosed(t *testing.T) { func TestServer_Close_ErrBindAddressRequired(t *testing.T) { var ( ts testServer - s = tcp.NewServer(ts) + s = cluster.NewServer(ts, "") ) - - // Start on a random port - _, e := s.ListenAndServe("") - if e == nil { - t.Fatalf("exprected error %s, got nil.", tcp.ErrBindAddressRequired) + if e := s.Open(); e == nil { + t.Fatalf("exprected error %s, got nil.", cluster.ErrBindAddressRequired) } - } + func TestServer_WriteShardRequestSuccess(t *testing.T) { var ( ts = newTestServer(writeShardSuccess) - s = tcp.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) - // Close the server - defer s.Close() - - // Start on a random port - host, e := s.ListenAndServe("127.0.0.1:0") + e := s.Open() if e != nil { t.Fatalf("err does not match. expected %v, got %v", nil, e) } + // Close the server + defer s.Close() - client := tcp.NewClient() - err := client.Dial(host) - if err != nil { - t.Fatal(err) - } + writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}) now := time.Now() shardID := uint64(1) + ownerID := uint64(2) var points []tsdb.Point points = append(points, tsdb.NewPoint( "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, )) - if err := client.WriteShard(shardID, points); err != nil { + if err := writer.Write(shardID, ownerID, points); err != nil { t.Fatal(err) } - if err := client.Close(); err != nil { + if err := writer.Close(); err != nil { + t.Fatal(err) + } + + responses, err := ts.ResponseN(1) + if err != nil { + t.Fatal(err) + } + + response := responses[0] + + if shardID != response.shardID { + t.Fatalf("unexpected shardID. exp: %d, got %d", shardID, response.shardID) + } + + got := response.points[0] + exp := points[0] + t.Log("got: ", spew.Sdump(got)) + t.Log("exp: ", spew.Sdump(exp)) + + if got.Name() != exp.Name() { + t.Fatal("unexpected name") + } + + if got.Fields()["value"] != exp.Fields()["value"] { + t.Fatal("unexpected fields") + } + + if got.Tags()["host"] != exp.Tags()["host"] { + t.Fatal("unexpected tags") + } + + if got.Time().UnixNano() != exp.Time().UnixNano() { + t.Fatal("unexpected time") + } +} + +func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { + var ( + ts = newTestServer(writeShardSuccess) + s = cluster.NewServer(ts, "127.0.0.1:0") + ) + // Start on a random port + if e := s.Open(); e != nil { + t.Fatalf("err does not match. expected %v, got %v", nil, e) + } + // Close the server + defer s.Close() + + writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}) + + now := time.Now() + + shardID := uint64(1) + ownerID := uint64(2) + var points []tsdb.Point + points = append(points, tsdb.NewPoint( + "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, + )) + + if err := writer.Write(shardID, ownerID, points); err != nil { + t.Fatal(err) + } + + now = time.Now() + + points = append(points, tsdb.NewPoint( + "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, + )) + + if err := writer.Write(shardID, ownerID, points[1:]); err != nil { + t.Fatal(err) + } + + if err := writer.Close(); err != nil { t.Fatal(err) } @@ -165,32 +241,26 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) { func TestServer_WriteShardRequestFail(t *testing.T) { var ( ts = newTestServer(writeShardFail) - s = tcp.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) + // Start on a random port + if e := s.Open(); e != nil { + t.Fatalf("err does not match. expected %v, got %v", nil, e) + } // Close the server defer s.Close() - // Start on a random port - host, e := s.ListenAndServe("127.0.0.1:0") - if e != nil { - t.Fatalf("err does not match. expected %v, got %v", nil, e) - } - - client := tcp.NewClient() - err := client.Dial(host) - if err != nil { - t.Fatal(err) - } - + writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}) now := time.Now() shardID := uint64(1) + ownerID := uint64(2) var points []tsdb.Point points = append(points, tsdb.NewPoint( "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, )) - if err, exp := client.WriteShard(shardID, points), "error code 1: failed to write"; err == nil || err.Error() != exp { + if err, exp := writer.Write(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp { t.Fatalf("expected error %s, got %v", exp, err) } } diff --git a/cluster/writer.go b/cluster/writer.go new file mode 100644 index 0000000000..06ec6c828c --- /dev/null +++ b/cluster/writer.go @@ -0,0 +1,146 @@ +package cluster + +import ( + "encoding/binary" + "fmt" + "io" + "net" + + "github.com/fatih/pool" + "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/tsdb" +) + +const ( + writeShardRequestMessage byte = iota + 1 + writeShardResponseMessage +) + +const maxConnections = 500 + +var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections) + +type metaStore interface { + Node(id uint64) (ni *meta.NodeInfo, err error) +} + +type connFactory struct { + metaStore metaStore + nodeID uint64 + clientPool interface { + size() int + } +} + +func (c *connFactory) dial() (net.Conn, error) { + if c.clientPool.size() > maxConnections { + return nil, errMaxConnectionsExceeded + } + + nodeInfo, err := c.metaStore.Node(c.nodeID) + if err != nil { + return nil, err + } + + conn, err := net.Dial("tcp", nodeInfo.Host) + if err != nil { + return nil, err + } + return conn, nil +} + +type Writer struct { + pool *clientPool + metaStore metaStore +} + +func NewWriter(m metaStore) *Writer { + return &Writer{ + pool: newClientPool(), + metaStore: m, + } +} + +func (c *Writer) dial(nodeID uint64) (net.Conn, error) { + // if we don't have a connection pool for that addr yet, create one + _, ok := c.pool.getPool(nodeID) + if !ok { + factory := &connFactory{nodeID: nodeID, metaStore: c.metaStore, clientPool: c.pool} + p, err := pool.NewChannelPool(1, 3, factory.dial) + if err != nil { + return nil, err + } + c.pool.setPool(nodeID, p) + } + return c.pool.conn(nodeID) +} + +func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error { + conn, err := w.dial(ownerID) + if err != nil { + return err + } + + // This will return the connection to the data pool + defer conn.Close() + + var mt byte = writeShardRequestMessage + if err := binary.Write(conn, binary.LittleEndian, &mt); err != nil { + return err + } + + var request WriteShardRequest + request.SetShardID(shardID) + request.AddPoints(points) + + b, err := request.MarshalBinary() + if err != nil { + return err + } + + size := int64(len(b)) + + if err := binary.Write(conn, binary.LittleEndian, &size); err != nil { + return err + } + + if _, err := conn.Write(b); err != nil { + return err + } + + // read back our response + if err := binary.Read(conn, binary.LittleEndian, &mt); err != nil { + return err + } + + if err := binary.Read(conn, binary.LittleEndian, &size); err != nil { + return err + } + + message := make([]byte, size) + + reader := io.LimitReader(conn, size) + _, err = reader.Read(message) + if err != nil { + return err + } + + var response WriteShardResponse + if err := response.UnmarshalBinary(message); err != nil { + return err + } + + if response.Code() != 0 { + return fmt.Errorf("error code %d: %s", response.Code(), response.Message()) + } + + return nil +} +func (w *Writer) Close() error { + if w.pool == nil { + return fmt.Errorf("client already closed") + } + w.pool.close() + w.pool = nil + return nil +} diff --git a/tcp/client.go b/tcp/client.go deleted file mode 100644 index 31b6c63582..0000000000 --- a/tcp/client.go +++ /dev/null @@ -1,87 +0,0 @@ -package tcp - -import ( - "encoding/binary" - "fmt" - "io" - "net" - - "github.com/influxdb/influxdb/cluster" - "github.com/influxdb/influxdb/tsdb" -) - -type Client struct { - conn net.Conn -} - -func NewClient() *Client { - return &Client{} -} - -func (c *Client) Dial(addr string) error { - conn, err := net.Dial("tcp", addr) - if err != nil { - return err - } - - c.conn = conn - return nil -} - -func (c *Client) WriteShard(shardID uint64, points []tsdb.Point) error { - var mt byte = writeShardRequestMessage - if err := binary.Write(c.conn, binary.LittleEndian, &mt); err != nil { - return err - } - - var request cluster.WriteShardRequest - request.SetShardID(shardID) - request.AddPoints(points) - - b, err := request.MarshalBinary() - if err != nil { - return err - } - - size := int64(len(b)) - - if err := binary.Write(c.conn, binary.LittleEndian, &size); err != nil { - return err - } - - if _, err := c.conn.Write(b); err != nil { - return err - } - - // read back our response - if err := binary.Read(c.conn, binary.LittleEndian, &mt); err != nil { - return err - } - - if err := binary.Read(c.conn, binary.LittleEndian, &size); err != nil { - return err - } - - message := make([]byte, size) - - reader := io.LimitReader(c.conn, size) - _, err = reader.Read(message) - if err != nil { - return err - } - - var response cluster.WriteShardResponse - if err := response.UnmarshalBinary(message); err != nil { - return err - } - - if response.Code() != 0 { - return fmt.Errorf("error code %d: %s", response.Code(), response.Message()) - } - - return nil -} - -func (c *Client) Close() error { - return c.conn.Close() -} diff --git a/tcp/messages.go b/tcp/messages.go deleted file mode 100644 index d6e7ad5e19..0000000000 --- a/tcp/messages.go +++ /dev/null @@ -1,6 +0,0 @@ -package tcp - -const ( - writeShardRequestMessage byte = iota - writeShardResponseMessage -)