From 6346ab55e7172f50e1e05c6cbd0aa45472a45fa2 Mon Sep 17 00:00:00 2001
From: Cory LaNou
Date: Tue, 26 May 2015 14:51:11 -0600
Subject: [PATCH 1/9] first pass at tcp connection pooling

---
 tcp/channel.go       | 131 +++++++++++++++++++++++
 tcp/channel_test.go  | 247 +++++++++++++++++++++++++++++++++++++++++++
 tcp/client.go        | 102 +++++++++++++++---
 tcp/conn.go          |  22 ++++
 tcp/conn_test.go     |  10 ++
 tcp/listener_test.go |  13 +--
 tcp/pool.go          |  28 +++++
 7 files changed, 525 insertions(+), 28 deletions(-)
 create mode 100644 tcp/channel.go
 create mode 100644 tcp/channel_test.go
 create mode 100644 tcp/conn.go
 create mode 100644 tcp/conn_test.go
 create mode 100644 tcp/pool.go

diff --git a/tcp/channel.go b/tcp/channel.go
new file mode 100644
index 0000000000..b60a363beb
--- /dev/null
+++ b/tcp/channel.go
@@ -0,0 +1,131 @@
+package tcp
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"sync"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+	// storage for our net.Conn connections
+	mu    sync.Mutex
+	conns chan net.Conn
+
+	// net.Conn generator
+	factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (net.Conn, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an
+// initial capacity and a maximum capacity. Factory is used when the initial
+// capacity is greater than zero to fill the pool. A zero initialCap doesn't
+// fill the pool until Get() is called. During a Get(), if there is no
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+	if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+		return nil, errors.New("invalid capacity settings")
+	}
+
+	c := &channelPool{
+		conns:   make(chan net.Conn, maxCap),
+		factory: factory,
+	}
+
+	// create initial connections; if something goes wrong,
+	// close the pool and error out.
+	for i := 0; i < initialCap; i++ {
+		conn, err := factory()
+		if err != nil {
+			c.Close()
+			return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+		}
+		c.conns <- conn
+	}
+
+	return c, nil
+}
+
+func (c *channelPool) getConns() chan net.Conn {
+	c.mu.Lock()
+	conns := c.conns
+	c.mu.Unlock()
+	return conns
+}
+
+// Get implements the Pool interface's Get() method. If there is no
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (net.Conn, error) {
+	conns := c.getConns()
+	if conns == nil {
+		return nil, ErrClosed
+	}
+
+	// wrap our connections with our custom net.Conn implementation (wrapConn
+	// method) that puts the connection back into the pool when it's closed.
+	select {
+	case conn := <-conns:
+		if conn == nil {
+			return nil, ErrClosed
+		}
+
+		return c.wrapConn(conn), nil
+	default:
+		conn, err := c.factory()
+		if err != nil {
+			return nil, err
+		}
+
+		return c.wrapConn(conn), nil
+	}
+}
+
+// put puts the connection back into the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn net.Conn) error {
+	if conn == nil {
+		return errors.New("connection is nil. rejecting")
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conns == nil {
+		// pool is closed, close passed connection
+		return conn.Close()
+	}
+
+	// put the resource back into the pool. If the pool is full, the send
+	// below would block, so the default case runs and the connection is
+	// simply closed instead.
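+	// Note: c.mu is still held across this send (the defer above releases
+	// it on return). That is safe only because the select never blocks: a
+	// select with a default case falls through immediately when the
+	// buffered channel is full, so the critical section stays bounded.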
+ select { + case c.conns <- conn: + return nil + default: + // pool is full, close passed connection + return conn.Close() + } +} + +func (c *channelPool) Close() { + c.mu.Lock() + conns := c.conns + c.conns = nil + c.factory = nil + c.mu.Unlock() + + if conns == nil { + return + } + + close(conns) + for conn := range conns { + conn.Close() + } +} + +func (c *channelPool) Len() int { return len(c.getConns()) } diff --git a/tcp/channel_test.go b/tcp/channel_test.go new file mode 100644 index 0000000000..c1cadae324 --- /dev/null +++ b/tcp/channel_test.go @@ -0,0 +1,247 @@ +package tcp + +import ( + "log" + "math/rand" + "net" + "sync" + "testing" + "time" +) + +var ( + InitialCap = 5 + MaximumCap = 30 + network = "tcp" + address = "127.0.0.1:7777" + factory = func() (net.Conn, error) { return net.Dial(network, address) } +) + +func init() { + // used for factory function + go simpleTCPServer() + time.Sleep(time.Millisecond * 300) // wait until tcp server has been settled + + rand.Seed(time.Now().UTC().UnixNano()) +} + +func TestNew(t *testing.T) { + _, err := newChannelPool() + if err != nil { + t.Errorf("New error: %s", err) + } +} +func TestPool_Get_Impl(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + conn, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + + _, ok := conn.(poolConn) + if !ok { + t.Errorf("Conn is not of type poolConn") + } +} + +func TestPool_Get(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + _, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + + // after one get, current capacity should be lowered by one. + if p.Len() != (InitialCap - 1) { + t.Errorf("Get error. Expecting %d, got %d", + (InitialCap - 1), p.Len()) + } + + // get them all + var wg sync.WaitGroup + for i := 0; i < (InitialCap - 1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + }() + } + wg.Wait() + + if p.Len() != 0 { + t.Errorf("Get error. Expecting %d, got %d", + (InitialCap - 1), p.Len()) + } + + _, err = p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } +} + +func TestPool_Put(t *testing.T) { + p, err := NewChannelPool(0, 30, factory) + if err != nil { + t.Fatal(err) + } + defer p.Close() + + // get/create from the pool + conns := make([]net.Conn, MaximumCap) + for i := 0; i < MaximumCap; i++ { + conn, _ := p.Get() + conns[i] = conn + } + + // now put them all back + for _, conn := range conns { + conn.Close() + } + + if p.Len() != MaximumCap { + t.Errorf("Put error len. Expecting %d, got %d", + 1, p.Len()) + } + + conn, _ := p.Get() + p.Close() // close pool + + conn.Close() // try to put into a full pool + if p.Len() != 0 { + t.Errorf("Put error. Closed pool shouldn't allow to put connections.") + } +} + +func TestPool_UsedCapacity(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + if p.Len() != InitialCap { + t.Errorf("InitialCap error. Expecting %d, got %d", + InitialCap, p.Len()) + } +} + +func TestPool_Close(t *testing.T) { + p, _ := newChannelPool() + + // now close it and test all cases we are expecting. + p.Close() + + c := p.(*channelPool) + + if c.conns != nil { + t.Errorf("Close error, conns channel should be nil") + } + + if c.factory != nil { + t.Errorf("Close error, factory should be nil") + } + + _, err := p.Get() + if err == nil { + t.Errorf("Close error, get conn should return an error") + } + + if p.Len() != 0 { + t.Errorf("Close error used capacity. 
Expecting 0, got %d", p.Len()) + } +} + +func TestPoolConcurrent(t *testing.T) { + p, _ := newChannelPool() + pipe := make(chan net.Conn, 0) + + go func() { + p.Close() + }() + + for i := 0; i < MaximumCap; i++ { + go func() { + conn, _ := p.Get() + + pipe <- conn + }() + + go func() { + conn := <-pipe + if conn == nil { + return + } + conn.Close() + }() + } +} + +func TestPoolWriteRead(t *testing.T) { + p, _ := NewChannelPool(0, 30, factory) + + conn, _ := p.Get() + + msg := "hello" + _, err := conn.Write([]byte(msg)) + if err != nil { + t.Error(err) + } +} + +func TestPoolConcurrent2(t *testing.T) { + p, _ := NewChannelPool(0, 30, factory) + + var wg sync.WaitGroup + + go func() { + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + conn, _ := p.Get() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + conn.Close() + wg.Done() + }(i) + } + }() + + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + conn, _ := p.Get() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + conn.Close() + wg.Done() + }(i) + } + + wg.Wait() +} + +func newChannelPool() (Pool, error) { + return NewChannelPool(InitialCap, MaximumCap, factory) +} + +func simpleTCPServer() { + l, err := net.Listen(network, address) + if err != nil { + log.Fatal(err) + } + defer l.Close() + + for { + conn, err := l.Accept() + if err != nil { + log.Fatal(err) + } + + go func() { + buffer := make([]byte, 256) + conn.Read(buffer) + }() + } +} diff --git a/tcp/client.go b/tcp/client.go index 31b6c63582..e58fecac9a 100644 --- a/tcp/client.go +++ b/tcp/client.go @@ -5,32 +5,91 @@ import ( "fmt" "io" "net" + "strings" + "sync" "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/tsdb" ) +const maxConnections = 500 + +var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections) + +type clientConn struct { + client *Client + addr string +} + +func newClientConn(addr string, c *Client) *clientConn { + return &clientConn{ + addr: addr, + client: c, + } +} +func (c *clientConn) dial() (net.Conn, error) { + if c.client.poolSize() > maxConnections { + return nil, errMaxConnectionsExceeded + } + + conn, err := net.Dial("tcp", c.addr) + if err != nil { + return nil, err + } + return conn, nil +} + type Client struct { - conn net.Conn + pool map[string]Pool + mu sync.RWMutex } func NewClient() *Client { - return &Client{} + return &Client{ + pool: make(map[string]Pool), + } } -func (c *Client) Dial(addr string) error { - conn, err := net.Dial("tcp", addr) +func (c *Client) poolSize() int { + if c.pool == nil { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + var size int + for _, p := range c.pool { + size += p.Len() + } + return size +} + +func (c *Client) dial(addr string) (net.Conn, error) { + addr = strings.ToLower(addr) + // if we don't have a connection pool for that addr yet, create one + c.mu.Lock() + if _, ok := c.pool[addr]; !ok { + c.mu.Unlock() + conn := newClientConn(addr, c) + p, err := NewChannelPool(1, 3, conn.dial) + if err != nil { + return nil, err + } + c.mu.Lock() + c.pool[addr] = p + c.mu.Unlock() + } + return c.pool[addr].Get() +} + +func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) error { + conn, err := c.dial(addr) if err != nil { return err } - - c.conn = conn - return nil -} - -func (c *Client) WriteShard(shardID uint64, points []tsdb.Point) error { var mt byte = writeShardRequestMessage - if err := binary.Write(c.conn, binary.LittleEndian, &mt); err != nil { + if err := binary.Write(conn, 
binary.LittleEndian, &mt); err != nil {
 		return err
 	}
 
@@ -45,26 +104,26 @@ func (c *Client) WriteShard(shardID uint64, points []tsdb.Point) error {
 
 	size := int64(len(b))
 
-	if err := binary.Write(c.conn, binary.LittleEndian, &size); err != nil {
+	if err := binary.Write(conn, binary.LittleEndian, &size); err != nil {
 		return err
 	}
 
-	if _, err := c.conn.Write(b); err != nil {
+	if _, err := conn.Write(b); err != nil {
 		return err
 	}
 
 	// read back our response
-	if err := binary.Read(c.conn, binary.LittleEndian, &mt); err != nil {
+	if err := binary.Read(conn, binary.LittleEndian, &mt); err != nil {
 		return err
 	}
 
-	if err := binary.Read(c.conn, binary.LittleEndian, &size); err != nil {
+	if err := binary.Read(conn, binary.LittleEndian, &size); err != nil {
 		return err
 	}
 
 	message := make([]byte, size)
 
-	reader := io.LimitReader(c.conn, size)
+	reader := io.LimitReader(conn, size)
 	_, err = reader.Read(message)
 	if err != nil {
 		return err
 	}
@@ -83,5 +142,14 @@ func (c *Client) WriteShard(shardID uint64, points []tsdb.Point) error {
 }
 
 func (c *Client) Close() error {
-	return c.conn.Close()
+	if c.pool == nil {
+		return fmt.Errorf("client already closed")
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for _, p := range c.pool {
+		p.Close()
+	}
+	c.pool = nil
+	return nil
 }
diff --git a/tcp/conn.go b/tcp/conn.go
new file mode 100644
index 0000000000..6e288081f2
--- /dev/null
+++ b/tcp/conn.go
@@ -0,0 +1,22 @@
+package tcp
+
+import "net"
+
+// poolConn is a wrapper around net.Conn to modify the behavior of
+// net.Conn's Close() method.
+type poolConn struct {
+	net.Conn
+	c *channelPool
+}
+
+// Close() puts the given connection back into the pool instead of closing it.
+func (p poolConn) Close() error {
+	return p.c.put(p.Conn)
+}
+
+// wrapConn wraps a standard net.Conn in a poolConn.
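+// Callers only ever see the result as a plain net.Conn, so the recycling
+// behavior is transparent: every conn handed out by Get() goes back to the
+// pool on Close() instead of being torn down.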
+func (c *channelPool) wrapConn(conn net.Conn) net.Conn {
+	p := poolConn{c: c}
+	p.Conn = conn
+	return p
+}
diff --git a/tcp/conn_test.go b/tcp/conn_test.go
new file mode 100644
index 0000000000..12f9fdecde
--- /dev/null
+++ b/tcp/conn_test.go
@@ -0,0 +1,10 @@
+package tcp
+
+import (
+	"net"
+	"testing"
+)
+
+func TestConn_Impl(t *testing.T) {
+	var _ net.Conn = new(poolConn)
+}
diff --git a/tcp/listener_test.go b/tcp/listener_test.go
index 650f819eb2..0d9a0f030f 100644
--- a/tcp/listener_test.go
+++ b/tcp/listener_test.go
@@ -108,10 +108,6 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) {
 	}
 
 	client := tcp.NewClient()
-	err := client.Dial(host)
-	if err != nil {
-		t.Fatal(err)
-	}
 
 	now := time.Now()
 
@@ -121,7 +117,7 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) {
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	if err := client.WriteShard(shardID, points); err != nil {
+	if err := client.WriteShard(host, shardID, points); err != nil {
 		t.Fatal(err)
 	}
 
@@ -177,11 +173,6 @@ func TestServer_WriteShardRequestFail(t *testing.T) {
 	}
 
 	client := tcp.NewClient()
-	err := client.Dial(host)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	now := time.Now()
 
 	shardID := uint64(1)
@@ -190,7 +181,7 @@ func TestServer_WriteShardRequestFail(t *testing.T) {
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	if err, exp := client.WriteShard(shardID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
+	if err, exp := client.WriteShard(host, shardID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
 		t.Fatalf("expected error %s, got %v", exp, err)
 	}
 }
diff --git a/tcp/pool.go b/tcp/pool.go
new file mode 100644
index 0000000000..5f3e57b167
--- /dev/null
+++ b/tcp/pool.go
@@ -0,0 +1,28 @@
+// Design is based heavily (or exactly) on the https://github.com/fatih/pool package
+package tcp
+
+import (
+	"errors"
+	"net"
+)
+
+var (
+	// ErrClosed is the error resulting if the pool is closed via pool.Close().
+	ErrClosed = errors.New("pool is closed")
+)
+
+// Pool describes a pool implementation. A pool should have a maximum
+// capacity. An ideal pool is thread-safe and easy to use.
+type Pool interface {
+	// Get returns a new connection from the pool. Closing the connection
+	// puts it back into the Pool. Closing it when the pool is destroyed or
+	// full will be counted as an error.
+	Get() (net.Conn, error)
+
+	// Close closes the pool and all its connections. After Close() the pool
+	// is no longer usable.
+	Close()
+
+	// Len returns the current number of connections of the pool.
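+	// Only idle connections sitting in the pool are counted; connections
+	// currently checked out by callers are not included.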
+ Len() int +} From ee0a67ae368daea6c1669895d735a7fd9234b369 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 26 May 2015 15:04:54 -0600 Subject: [PATCH 2/9] fix package naming --- tcp/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tcp/client.go b/tcp/client.go index e58fecac9a..dfa0bc3771 100644 --- a/tcp/client.go +++ b/tcp/client.go @@ -80,7 +80,10 @@ func (c *Client) dial(addr string) (net.Conn, error) { c.pool[addr] = p c.mu.Unlock() } - return c.pool[addr].Get() + c.mu.Lock() + conn, err := c.pool[addr].Get() + c.mu.Unlock() + return conn, err } func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) error { From 31593e508367891f365c857e387f2df403d1369d Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 26 May 2015 17:14:46 -0600 Subject: [PATCH 3/9] refactoring and deadlock fix --- tcp/client.go | 38 ++++++--------------- tcp/conn.go | 7 +++- tcp/connection_pool.go | 66 ++++++++++++++++++++++++++++++++++++ tcp/listener.go | 43 +++++++++++++----------- tcp/listener_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++ tcp/messages.go | 2 +- 6 files changed, 184 insertions(+), 48 deletions(-) create mode 100644 tcp/connection_pool.go diff --git a/tcp/client.go b/tcp/client.go index dfa0bc3771..85b65e1012 100644 --- a/tcp/client.go +++ b/tcp/client.go @@ -6,7 +6,6 @@ import ( "io" "net" "strings" - "sync" "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/tsdb" @@ -40,13 +39,12 @@ func (c *clientConn) dial() (net.Conn, error) { } type Client struct { - pool map[string]Pool - mu sync.RWMutex + pool *connectionPool } func NewClient() *Client { return &Client{ - pool: make(map[string]Pool), + pool: newConnectionPool(), } } @@ -55,35 +53,22 @@ func (c *Client) poolSize() int { return 0 } - c.mu.RLock() - defer c.mu.RUnlock() - - var size int - for _, p := range c.pool { - size += p.Len() - } - return size + return c.pool.size() } func (c *Client) dial(addr string) (net.Conn, error) { addr = strings.ToLower(addr) // if we don't have a connection pool for that addr yet, create one - c.mu.Lock() - if _, ok := c.pool[addr]; !ok { - c.mu.Unlock() + _, ok := c.pool.getPool(addr) + if !ok { conn := newClientConn(addr, c) p, err := NewChannelPool(1, 3, conn.dial) if err != nil { return nil, err } - c.mu.Lock() - c.pool[addr] = p - c.mu.Unlock() + c.pool.setPool(addr, p) } - c.mu.Lock() - conn, err := c.pool[addr].Get() - c.mu.Unlock() - return conn, err + return c.pool.conn(addr) } func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) error { @@ -91,6 +76,10 @@ func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) er if err != nil { return err } + + // This will return the connection to the data pool + defer conn.Close() + var mt byte = writeShardRequestMessage if err := binary.Write(conn, binary.LittleEndian, &mt); err != nil { return err @@ -148,11 +137,6 @@ func (c *Client) Close() error { if c.pool == nil { return fmt.Errorf("client already closed") } - c.mu.Lock() - defer c.mu.Unlock() - for _, p := range c.pool { - p.Close() - } c.pool = nil return nil } diff --git a/tcp/conn.go b/tcp/conn.go index 6e288081f2..3c82f30cee 100644 --- a/tcp/conn.go +++ b/tcp/conn.go @@ -1,6 +1,10 @@ package tcp -import "net" +import ( + "net" + + "github.com/davecgh/go-spew/spew" +) // poolConn is a wrapper around net.Conn to modify the the behavior of // net.Conn's Close() method. 
@@ -11,6 +15,7 @@ type poolConn struct { // Close() puts the given connects back to the pool instead of closing it. func (p poolConn) Close() error { + spew.Dump("I'm back on the queue!") return p.c.put(p.Conn) } diff --git a/tcp/connection_pool.go b/tcp/connection_pool.go new file mode 100644 index 0000000000..c7bb26873a --- /dev/null +++ b/tcp/connection_pool.go @@ -0,0 +1,66 @@ +package tcp + +import ( + "log" + "net" + "sync" +) + +type connectionPool struct { + mu sync.RWMutex + pool map[string]Pool +} + +func newConnectionPool() *connectionPool { + return &connectionPool{ + pool: make(map[string]Pool), + } +} + +func (c *connectionPool) setPool(addr string, p Pool) { + log.Println("setting pool") + c.mu.Lock() + c.pool[addr] = p + c.mu.Unlock() + log.Println("setting pool complete") +} + +func (c *connectionPool) getPool(addr string) (Pool, bool) { + log.Println("getting pool") + c.mu.Lock() + p, ok := c.pool[addr] + c.mu.Unlock() + log.Println("getting pool complete") + return p, ok +} + +func (c *connectionPool) size() int { + log.Println("getting pool size") + c.mu.RLock() + var size int + for _, p := range c.pool { + size += p.Len() + } + c.mu.RUnlock() + log.Println("getting pool size complete") + return size +} + +func (c *connectionPool) conn(addr string) (net.Conn, error) { + log.Println("getting connection") + c.mu.Lock() + conn, err := c.pool[addr].Get() + c.mu.Unlock() + log.Println("getting connection complete") + return conn, err +} + +func (c *connectionPool) close() error { + log.Println("closing") + c.mu.Lock() + for _, p := range c.pool { + p.Close() + } + c.mu.Unlock() + return nil +} diff --git a/tcp/listener.go b/tcp/listener.go index 62682f0084..147f1c32b5 100644 --- a/tcp/listener.go +++ b/tcp/listener.go @@ -114,40 +114,45 @@ func (s *Server) Close() error { // handleConnection services an individual TCP connection. func (s *Server) handleConnection(conn net.Conn) { - defer func() { - conn.Close() - s.wg.Done() - }() - - messageChannel := make(chan byte) // Start our reader up in a go routine so we don't block checking our close channel go func() { - var messageType byte - err := binary.Read(conn, binary.LittleEndian, &messageType) - if err != nil { - s.Logger.Printf("unable to read message type %s", err) - return + for { + var messageType byte + + err := binary.Read(conn, binary.LittleEndian, &messageType) + if err != nil { + s.Logger.Printf("unable to read message type %s", err) + return + } + s.processMessage(messageType, conn) + + if s.shutdown == nil { + return + } } - messageChannel <- messageType }() for { select { case <-s.shutdown: // Are we shutting down? 
If so, exit + conn.Close() + s.wg.Done() return - case messageType := <-messageChannel: - switch messageType { - case writeShardRequestMessage: - err := s.writeShardRequest(conn) - s.writeShardResponse(conn, err) - return - } default: } } +} +func (s *Server) processMessage(messageType byte, conn net.Conn) { + switch messageType { + case writeShardRequestMessage: + err := s.writeShardRequest(conn) + s.writeShardResponse(conn, err) + return + } + return } func (s *Server) writeShardRequest(conn net.Conn) error { diff --git a/tcp/listener_test.go b/tcp/listener_test.go index 0d9a0f030f..3804f9a330 100644 --- a/tcp/listener_test.go +++ b/tcp/listener_test.go @@ -93,6 +93,7 @@ func TestServer_Close_ErrBindAddressRequired(t *testing.T) { } } + func TestServer_WriteShardRequestSuccess(t *testing.T) { var ( ts = newTestServer(writeShardSuccess) @@ -158,6 +159,81 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) { } } +func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { + var ( + ts = newTestServer(writeShardSuccess) + s = tcp.NewServer(ts) + ) + // Close the server + defer s.Close() + + // Start on a random port + host, e := s.ListenAndServe("127.0.0.1:0") + if e != nil { + t.Fatalf("err does not match. expected %v, got %v", nil, e) + } + + client := tcp.NewClient() + + now := time.Now() + + shardID := uint64(1) + var points []tsdb.Point + points = append(points, tsdb.NewPoint( + "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, + )) + + if err := client.WriteShard(host, shardID, points); err != nil { + t.Fatal(err) + } + + now = time.Now() + + points = append(points, tsdb.NewPoint( + "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, + )) + + if err := client.WriteShard(host, shardID, points[1:]); err != nil { + t.Fatal(err) + } + + if err := client.Close(); err != nil { + t.Fatal(err) + } + + responses, err := ts.ResponseN(1) + if err != nil { + t.Fatal(err) + } + + response := responses[0] + + if shardID != response.shardID { + t.Fatalf("unexpected shardID. 
exp: %d, got %d", shardID, response.shardID) + } + + got := response.points[0] + exp := points[0] + t.Log("got: ", spew.Sdump(got)) + t.Log("exp: ", spew.Sdump(exp)) + + if got.Name() != exp.Name() { + t.Fatal("unexpected name") + } + + if got.Fields()["value"] != exp.Fields()["value"] { + t.Fatal("unexpected fields") + } + + if got.Tags()["host"] != exp.Tags()["host"] { + t.Fatal("unexpected tags") + } + + if got.Time().UnixNano() != exp.Time().UnixNano() { + t.Fatal("unexpected time") + } +} + func TestServer_WriteShardRequestFail(t *testing.T) { var ( ts = newTestServer(writeShardFail) diff --git a/tcp/messages.go b/tcp/messages.go index d6e7ad5e19..31af94cc6e 100644 --- a/tcp/messages.go +++ b/tcp/messages.go @@ -1,6 +1,6 @@ package tcp const ( - writeShardRequestMessage byte = iota + writeShardRequestMessage byte = iota + 1 writeShardResponseMessage ) From 00ce3e5cfc00cbfb682cfad6627305da87ff5182 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 26 May 2015 17:19:51 -0600 Subject: [PATCH 4/9] nil out channel so we can check that if needed --- tcp/listener.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tcp/listener.go b/tcp/listener.go index 147f1c32b5..2cb5f92001 100644 --- a/tcp/listener.go +++ b/tcp/listener.go @@ -106,6 +106,7 @@ func (s *Server) Close() error { } // Shut down all handlers close(s.shutdown) + s.shutdown = nil s.wg.Wait() s.listener = nil From 3dc688cff2b4a521d1e709264c010133c2d00d24 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 26 May 2015 17:30:59 -0600 Subject: [PATCH 5/9] check shutdown properly --- tcp/listener.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tcp/listener.go b/tcp/listener.go index 2cb5f92001..9bb5057ceb 100644 --- a/tcp/listener.go +++ b/tcp/listener.go @@ -106,7 +106,6 @@ func (s *Server) Close() error { } // Shut down all handlers close(s.shutdown) - s.shutdown = nil s.wg.Wait() s.listener = nil @@ -128,8 +127,11 @@ func (s *Server) handleConnection(conn net.Conn) { } s.processMessage(messageType, conn) - if s.shutdown == nil { + select { + case <-s.shutdown: + // Are we shutting down? 
If so, exit return + default: } } }() From 1228de4e7c37e5305a49b5290bb103f74bd70d25 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 27 May 2015 10:02:38 -0600 Subject: [PATCH 6/9] move tcp to cluster --- cluster/client_pool.go | 59 +++++++ {tcp => cluster}/listener.go | 7 +- {tcp => cluster}/listener_test.go | 51 +++--- tcp/client.go => cluster/writer.go | 80 +++++----- tcp/channel.go | 131 --------------- tcp/channel_test.go | 247 ----------------------------- tcp/conn.go | 27 ---- tcp/conn_test.go | 10 -- tcp/connection_pool.go | 66 -------- tcp/messages.go | 6 - tcp/pool.go | 28 ---- 11 files changed, 136 insertions(+), 576 deletions(-) create mode 100644 cluster/client_pool.go rename {tcp => cluster}/listener.go (97%) rename {tcp => cluster}/listener_test.go (80%) rename tcp/client.go => cluster/writer.go (56%) delete mode 100644 tcp/channel.go delete mode 100644 tcp/channel_test.go delete mode 100644 tcp/conn.go delete mode 100644 tcp/conn_test.go delete mode 100644 tcp/connection_pool.go delete mode 100644 tcp/messages.go delete mode 100644 tcp/pool.go diff --git a/cluster/client_pool.go b/cluster/client_pool.go new file mode 100644 index 0000000000..659d171762 --- /dev/null +++ b/cluster/client_pool.go @@ -0,0 +1,59 @@ +package cluster + +import ( + "net" + "sync" + + "github.com/fatih/pool" + "github.com/influxdb/influxdb/meta" +) + +type clientPool struct { + mu sync.RWMutex + pool map[*meta.NodeInfo]pool.Pool +} + +func newClientPool() *clientPool { + return &clientPool{ + pool: make(map[*meta.NodeInfo]pool.Pool), + } +} + +func (c *clientPool) setPool(n *meta.NodeInfo, p pool.Pool) { + c.mu.Lock() + c.pool[n] = p + c.mu.Unlock() +} + +func (c *clientPool) getPool(n *meta.NodeInfo) (pool.Pool, bool) { + c.mu.Lock() + p, ok := c.pool[n] + c.mu.Unlock() + return p, ok +} + +func (c *clientPool) size() int { + c.mu.RLock() + var size int + for _, p := range c.pool { + size += p.Len() + } + c.mu.RUnlock() + return size +} + +func (c *clientPool) conn(n *meta.NodeInfo) (net.Conn, error) { + c.mu.Lock() + conn, err := c.pool[n].Get() + c.mu.Unlock() + return conn, err +} + +func (c *clientPool) close() error { + c.mu.Lock() + for _, p := range c.pool { + p.Close() + } + c.mu.Unlock() + return nil +} diff --git a/tcp/listener.go b/cluster/listener.go similarity index 97% rename from tcp/listener.go rename to cluster/listener.go index 9bb5057ceb..17c25773f7 100644 --- a/tcp/listener.go +++ b/cluster/listener.go @@ -1,4 +1,4 @@ -package tcp +package cluster import ( "encoding/binary" @@ -9,7 +9,6 @@ import ( "os" "sync" - "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/tsdb" ) @@ -171,7 +170,7 @@ func (s *Server) writeShardRequest(conn net.Conn) error { return err } - var wsr cluster.WriteShardRequest + var wsr WriteShardRequest if err := wsr.UnmarshalBinary(message); err != nil { return err } @@ -185,7 +184,7 @@ func (s *Server) writeShardResponse(conn net.Conn, e error) { return } - var wsr cluster.WriteShardResponse + var wsr WriteShardResponse if e != nil { wsr.SetCode(1) wsr.SetMessage(e.Error()) diff --git a/tcp/listener_test.go b/cluster/listener_test.go similarity index 80% rename from tcp/listener_test.go rename to cluster/listener_test.go index 3804f9a330..38067e3827 100644 --- a/tcp/listener_test.go +++ b/cluster/listener_test.go @@ -1,4 +1,4 @@ -package tcp_test +package cluster_test import ( "fmt" @@ -6,10 +6,22 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/influxdb/influxdb/tcp" + "github.com/influxdb/influxdb/cluster" + 
"github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tsdb" ) +type metaStore struct { + host string +} + +func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) { + return &meta.NodeInfo{ + ID: nodeID, + Host: m.host, + }, nil +} + type testServer struct { writeShardFunc func(shardID uint64, points []tsdb.Point) error } @@ -62,7 +74,7 @@ func (testServer) ResponseN(n int) ([]*serverResponse, error) { func TestServer_Close_ErrServerClosed(t *testing.T) { var ( ts testServer - s = tcp.NewServer(ts) + s = cluster.NewServer(ts) ) // Start on a random port @@ -75,7 +87,7 @@ func TestServer_Close_ErrServerClosed(t *testing.T) { s.Close() // Try to close it again - if err := s.Close(); err != tcp.ErrServerClosed { + if err := s.Close(); err != cluster.ErrServerClosed { t.Fatalf("expected an error, got %v", err) } } @@ -83,13 +95,13 @@ func TestServer_Close_ErrServerClosed(t *testing.T) { func TestServer_Close_ErrBindAddressRequired(t *testing.T) { var ( ts testServer - s = tcp.NewServer(ts) + s = cluster.NewServer(ts) ) // Start on a random port _, e := s.ListenAndServe("") if e == nil { - t.Fatalf("exprected error %s, got nil.", tcp.ErrBindAddressRequired) + t.Fatalf("exprected error %s, got nil.", cluster.ErrBindAddressRequired) } } @@ -97,7 +109,7 @@ func TestServer_Close_ErrBindAddressRequired(t *testing.T) { func TestServer_WriteShardRequestSuccess(t *testing.T) { var ( ts = newTestServer(writeShardSuccess) - s = tcp.NewServer(ts) + s = cluster.NewServer(ts) ) // Close the server defer s.Close() @@ -108,21 +120,22 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) { t.Fatalf("err does not match. expected %v, got %v", nil, e) } - client := tcp.NewClient() + writer := cluster.NewWriter(&metaStore{host: host}) now := time.Now() shardID := uint64(1) + ownerID := uint64(2) var points []tsdb.Point points = append(points, tsdb.NewPoint( "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, )) - if err := client.WriteShard(host, shardID, points); err != nil { + if err := writer.Write(shardID, ownerID, points); err != nil { t.Fatal(err) } - if err := client.Close(); err != nil { + if err := writer.Close(); err != nil { t.Fatal(err) } @@ -162,7 +175,7 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) { func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { var ( ts = newTestServer(writeShardSuccess) - s = tcp.NewServer(ts) + s = cluster.NewServer(ts) ) // Close the server defer s.Close() @@ -173,17 +186,18 @@ func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { t.Fatalf("err does not match. 
expected %v, got %v", nil, e) } - client := tcp.NewClient() + writer := cluster.NewWriter(&metaStore{host: host}) now := time.Now() shardID := uint64(1) + ownerID := uint64(2) var points []tsdb.Point points = append(points, tsdb.NewPoint( "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, )) - if err := client.WriteShard(host, shardID, points); err != nil { + if err := writer.Write(shardID, ownerID, points); err != nil { t.Fatal(err) } @@ -193,11 +207,11 @@ func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, )) - if err := client.WriteShard(host, shardID, points[1:]); err != nil { + if err := writer.Write(shardID, ownerID, points[1:]); err != nil { t.Fatal(err) } - if err := client.Close(); err != nil { + if err := writer.Close(); err != nil { t.Fatal(err) } @@ -237,7 +251,7 @@ func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { func TestServer_WriteShardRequestFail(t *testing.T) { var ( ts = newTestServer(writeShardFail) - s = tcp.NewServer(ts) + s = cluster.NewServer(ts) ) // Close the server defer s.Close() @@ -248,16 +262,17 @@ func TestServer_WriteShardRequestFail(t *testing.T) { t.Fatalf("err does not match. expected %v, got %v", nil, e) } - client := tcp.NewClient() + writer := cluster.NewWriter(&metaStore{host: host}) now := time.Now() shardID := uint64(1) + ownerID := uint64(2) var points []tsdb.Point points = append(points, tsdb.NewPoint( "cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now, )) - if err, exp := client.WriteShard(host, shardID, points), "error code 1: failed to write"; err == nil || err.Error() != exp { + if err, exp := writer.Write(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp { t.Fatalf("expected error %s, got %v", exp, err) } } diff --git a/tcp/client.go b/cluster/writer.go similarity index 56% rename from tcp/client.go rename to cluster/writer.go index 85b65e1012..24b02ec9ff 100644 --- a/tcp/client.go +++ b/cluster/writer.go @@ -1,78 +1,81 @@ -package tcp +package cluster import ( "encoding/binary" "fmt" "io" "net" - "strings" - "github.com/influxdb/influxdb/cluster" + "github.com/fatih/pool" + "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tsdb" ) +const ( + writeShardRequestMessage byte = iota + 1 + writeShardResponseMessage +) + const maxConnections = 500 var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections) -type clientConn struct { - client *Client - addr string +type metaStore interface { + Node(id uint64) (ni *meta.NodeInfo, err error) } -func newClientConn(addr string, c *Client) *clientConn { - return &clientConn{ - addr: addr, - client: c, +type connFactory struct { + nodeInfo *meta.NodeInfo + clientPool interface { + size() int } } -func (c *clientConn) dial() (net.Conn, error) { - if c.client.poolSize() > maxConnections { + +func (c *connFactory) dial() (net.Conn, error) { + if c.clientPool.size() > maxConnections { return nil, errMaxConnectionsExceeded } - conn, err := net.Dial("tcp", c.addr) + conn, err := net.Dial("tcp", c.nodeInfo.Host) if err != nil { return nil, err } return conn, nil } -type Client struct { - pool *connectionPool +type Writer struct { + pool *clientPool + metaStore metaStore } -func NewClient() *Client { - return &Client{ - pool: newConnectionPool(), +func NewWriter(m metaStore) *Writer { + return &Writer{ + pool: newClientPool(), + 
metaStore: m, } } -func (c *Client) poolSize() int { - if c.pool == nil { - return 0 +func (c *Writer) dial(nodeID uint64) (net.Conn, error) { + nodeInfo, err := c.metaStore.Node(nodeID) + if err != nil { + return nil, err } - return c.pool.size() -} - -func (c *Client) dial(addr string) (net.Conn, error) { - addr = strings.ToLower(addr) // if we don't have a connection pool for that addr yet, create one - _, ok := c.pool.getPool(addr) + _, ok := c.pool.getPool(nodeInfo) if !ok { - conn := newClientConn(addr, c) - p, err := NewChannelPool(1, 3, conn.dial) + factory := &connFactory{nodeInfo: nodeInfo, clientPool: c.pool} + p, err := pool.NewChannelPool(1, 3, factory.dial) if err != nil { return nil, err } - c.pool.setPool(addr, p) + c.pool.setPool(nodeInfo, p) } - return c.pool.conn(addr) + return c.pool.conn(nodeInfo) } -func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) error { - conn, err := c.dial(addr) +func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error { + conn, err := w.dial(ownerID) if err != nil { return err } @@ -85,7 +88,7 @@ func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) er return err } - var request cluster.WriteShardRequest + var request WriteShardRequest request.SetShardID(shardID) request.AddPoints(points) @@ -121,7 +124,7 @@ func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) er return err } - var response cluster.WriteShardResponse + var response WriteShardResponse if err := response.UnmarshalBinary(message); err != nil { return err } @@ -132,11 +135,10 @@ func (c *Client) WriteShard(addr string, shardID uint64, points []tsdb.Point) er return nil } - -func (c *Client) Close() error { - if c.pool == nil { +func (w *Writer) Close() error { + if w.pool == nil { return fmt.Errorf("client already closed") } - c.pool = nil + w.pool = nil return nil } diff --git a/tcp/channel.go b/tcp/channel.go deleted file mode 100644 index b60a363beb..0000000000 --- a/tcp/channel.go +++ /dev/null @@ -1,131 +0,0 @@ -package tcp - -import ( - "errors" - "fmt" - "net" - "sync" -) - -// channelPool implements the Pool interface based on buffered channels. -type channelPool struct { - // storage for our net.Conn connections - mu sync.Mutex - conns chan net.Conn - - // net.Conn generator - factory Factory -} - -// Factory is a function to create new connections. -type Factory func() (net.Conn, error) - -// NewChannelPool returns a new pool based on buffered channels with an initial -// capacity and maximum capacity. Factory is used when initial capacity is -// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool -// until a new Get() is called. During a Get(), If there is no new connection -// available in the pool, a new connection will be created via the Factory() -// method. -func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { - if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { - return nil, errors.New("invalid capacity settings") - } - - c := &channelPool{ - conns: make(chan net.Conn, maxCap), - factory: factory, - } - - // create initial connections, if something goes wrong, - // just close the pool error out. 
- for i := 0; i < initialCap; i++ { - conn, err := factory() - if err != nil { - c.Close() - return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) - } - c.conns <- conn - } - - return c, nil -} - -func (c *channelPool) getConns() chan net.Conn { - c.mu.Lock() - conns := c.conns - c.mu.Unlock() - return conns -} - -// Get implements the Pool interfaces Get() method. If there is no new -// connection available in the pool, a new connection will be created via the -// Factory() method. -func (c *channelPool) Get() (net.Conn, error) { - conns := c.getConns() - if conns == nil { - return nil, ErrClosed - } - - // wrap our connections with out custom net.Conn implementation (wrapConn - // method) that puts the connection back to the pool if it's closed. - select { - case conn := <-conns: - if conn == nil { - return nil, ErrClosed - } - - return c.wrapConn(conn), nil - default: - conn, err := c.factory() - if err != nil { - return nil, err - } - - return c.wrapConn(conn), nil - } -} - -// put puts the connection back to the pool. If the pool is full or closed, -// conn is simply closed. A nil conn will be rejected. -func (c *channelPool) put(conn net.Conn) error { - if conn == nil { - return errors.New("connection is nil. rejecting") - } - - c.mu.Lock() - defer c.mu.Unlock() - - if c.conns == nil { - // pool is closed, close passed connection - return conn.Close() - } - - // put the resource back into the pool. If the pool is full, this will - // block and the default case will be executed. - select { - case c.conns <- conn: - return nil - default: - // pool is full, close passed connection - return conn.Close() - } -} - -func (c *channelPool) Close() { - c.mu.Lock() - conns := c.conns - c.conns = nil - c.factory = nil - c.mu.Unlock() - - if conns == nil { - return - } - - close(conns) - for conn := range conns { - conn.Close() - } -} - -func (c *channelPool) Len() int { return len(c.getConns()) } diff --git a/tcp/channel_test.go b/tcp/channel_test.go deleted file mode 100644 index c1cadae324..0000000000 --- a/tcp/channel_test.go +++ /dev/null @@ -1,247 +0,0 @@ -package tcp - -import ( - "log" - "math/rand" - "net" - "sync" - "testing" - "time" -) - -var ( - InitialCap = 5 - MaximumCap = 30 - network = "tcp" - address = "127.0.0.1:7777" - factory = func() (net.Conn, error) { return net.Dial(network, address) } -) - -func init() { - // used for factory function - go simpleTCPServer() - time.Sleep(time.Millisecond * 300) // wait until tcp server has been settled - - rand.Seed(time.Now().UTC().UnixNano()) -} - -func TestNew(t *testing.T) { - _, err := newChannelPool() - if err != nil { - t.Errorf("New error: %s", err) - } -} -func TestPool_Get_Impl(t *testing.T) { - p, _ := newChannelPool() - defer p.Close() - - conn, err := p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } - - _, ok := conn.(poolConn) - if !ok { - t.Errorf("Conn is not of type poolConn") - } -} - -func TestPool_Get(t *testing.T) { - p, _ := newChannelPool() - defer p.Close() - - _, err := p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } - - // after one get, current capacity should be lowered by one. - if p.Len() != (InitialCap - 1) { - t.Errorf("Get error. Expecting %d, got %d", - (InitialCap - 1), p.Len()) - } - - // get them all - var wg sync.WaitGroup - for i := 0; i < (InitialCap - 1); i++ { - wg.Add(1) - go func() { - defer wg.Done() - _, err := p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } - }() - } - wg.Wait() - - if p.Len() != 0 { - t.Errorf("Get error. 
Expecting %d, got %d", - (InitialCap - 1), p.Len()) - } - - _, err = p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } -} - -func TestPool_Put(t *testing.T) { - p, err := NewChannelPool(0, 30, factory) - if err != nil { - t.Fatal(err) - } - defer p.Close() - - // get/create from the pool - conns := make([]net.Conn, MaximumCap) - for i := 0; i < MaximumCap; i++ { - conn, _ := p.Get() - conns[i] = conn - } - - // now put them all back - for _, conn := range conns { - conn.Close() - } - - if p.Len() != MaximumCap { - t.Errorf("Put error len. Expecting %d, got %d", - 1, p.Len()) - } - - conn, _ := p.Get() - p.Close() // close pool - - conn.Close() // try to put into a full pool - if p.Len() != 0 { - t.Errorf("Put error. Closed pool shouldn't allow to put connections.") - } -} - -func TestPool_UsedCapacity(t *testing.T) { - p, _ := newChannelPool() - defer p.Close() - - if p.Len() != InitialCap { - t.Errorf("InitialCap error. Expecting %d, got %d", - InitialCap, p.Len()) - } -} - -func TestPool_Close(t *testing.T) { - p, _ := newChannelPool() - - // now close it and test all cases we are expecting. - p.Close() - - c := p.(*channelPool) - - if c.conns != nil { - t.Errorf("Close error, conns channel should be nil") - } - - if c.factory != nil { - t.Errorf("Close error, factory should be nil") - } - - _, err := p.Get() - if err == nil { - t.Errorf("Close error, get conn should return an error") - } - - if p.Len() != 0 { - t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len()) - } -} - -func TestPoolConcurrent(t *testing.T) { - p, _ := newChannelPool() - pipe := make(chan net.Conn, 0) - - go func() { - p.Close() - }() - - for i := 0; i < MaximumCap; i++ { - go func() { - conn, _ := p.Get() - - pipe <- conn - }() - - go func() { - conn := <-pipe - if conn == nil { - return - } - conn.Close() - }() - } -} - -func TestPoolWriteRead(t *testing.T) { - p, _ := NewChannelPool(0, 30, factory) - - conn, _ := p.Get() - - msg := "hello" - _, err := conn.Write([]byte(msg)) - if err != nil { - t.Error(err) - } -} - -func TestPoolConcurrent2(t *testing.T) { - p, _ := NewChannelPool(0, 30, factory) - - var wg sync.WaitGroup - - go func() { - for i := 0; i < 10; i++ { - wg.Add(1) - go func(i int) { - conn, _ := p.Get() - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - conn.Close() - wg.Done() - }(i) - } - }() - - for i := 0; i < 10; i++ { - wg.Add(1) - go func(i int) { - conn, _ := p.Get() - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - conn.Close() - wg.Done() - }(i) - } - - wg.Wait() -} - -func newChannelPool() (Pool, error) { - return NewChannelPool(InitialCap, MaximumCap, factory) -} - -func simpleTCPServer() { - l, err := net.Listen(network, address) - if err != nil { - log.Fatal(err) - } - defer l.Close() - - for { - conn, err := l.Accept() - if err != nil { - log.Fatal(err) - } - - go func() { - buffer := make([]byte, 256) - conn.Read(buffer) - }() - } -} diff --git a/tcp/conn.go b/tcp/conn.go deleted file mode 100644 index 3c82f30cee..0000000000 --- a/tcp/conn.go +++ /dev/null @@ -1,27 +0,0 @@ -package tcp - -import ( - "net" - - "github.com/davecgh/go-spew/spew" -) - -// poolConn is a wrapper around net.Conn to modify the the behavior of -// net.Conn's Close() method. -type poolConn struct { - net.Conn - c *channelPool -} - -// Close() puts the given connects back to the pool instead of closing it. 
-func (p poolConn) Close() error { - spew.Dump("I'm back on the queue!") - return p.c.put(p.Conn) -} - -// newConn wraps a standard net.Conn to a poolConn net.Conn. -func (c *channelPool) wrapConn(conn net.Conn) net.Conn { - p := poolConn{c: c} - p.Conn = conn - return p -} diff --git a/tcp/conn_test.go b/tcp/conn_test.go deleted file mode 100644 index 12f9fdecde..0000000000 --- a/tcp/conn_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package tcp - -import ( - "net" - "testing" -) - -func TestConn_Impl(t *testing.T) { - var _ net.Conn = new(poolConn) -} diff --git a/tcp/connection_pool.go b/tcp/connection_pool.go deleted file mode 100644 index c7bb26873a..0000000000 --- a/tcp/connection_pool.go +++ /dev/null @@ -1,66 +0,0 @@ -package tcp - -import ( - "log" - "net" - "sync" -) - -type connectionPool struct { - mu sync.RWMutex - pool map[string]Pool -} - -func newConnectionPool() *connectionPool { - return &connectionPool{ - pool: make(map[string]Pool), - } -} - -func (c *connectionPool) setPool(addr string, p Pool) { - log.Println("setting pool") - c.mu.Lock() - c.pool[addr] = p - c.mu.Unlock() - log.Println("setting pool complete") -} - -func (c *connectionPool) getPool(addr string) (Pool, bool) { - log.Println("getting pool") - c.mu.Lock() - p, ok := c.pool[addr] - c.mu.Unlock() - log.Println("getting pool complete") - return p, ok -} - -func (c *connectionPool) size() int { - log.Println("getting pool size") - c.mu.RLock() - var size int - for _, p := range c.pool { - size += p.Len() - } - c.mu.RUnlock() - log.Println("getting pool size complete") - return size -} - -func (c *connectionPool) conn(addr string) (net.Conn, error) { - log.Println("getting connection") - c.mu.Lock() - conn, err := c.pool[addr].Get() - c.mu.Unlock() - log.Println("getting connection complete") - return conn, err -} - -func (c *connectionPool) close() error { - log.Println("closing") - c.mu.Lock() - for _, p := range c.pool { - p.Close() - } - c.mu.Unlock() - return nil -} diff --git a/tcp/messages.go b/tcp/messages.go deleted file mode 100644 index 31af94cc6e..0000000000 --- a/tcp/messages.go +++ /dev/null @@ -1,6 +0,0 @@ -package tcp - -const ( - writeShardRequestMessage byte = iota + 1 - writeShardResponseMessage -) diff --git a/tcp/pool.go b/tcp/pool.go deleted file mode 100644 index 5f3e57b167..0000000000 --- a/tcp/pool.go +++ /dev/null @@ -1,28 +0,0 @@ -// Design is based heavily (or exactly) on the https://github.com/fatih/pool package -package tcp - -import ( - "errors" - "net" -) - -var ( - // ErrClosed is the error resulting if the pool is closed via pool.Close(). - ErrClosed = errors.New("pool is closed") -) - -// Pool interface describes a pool implementation. A pool should have maximum -// capacity. An ideal pool is threadsafe and easy to use. -type Pool interface { - // Get returns a new connection from the pool. Closing the connections puts - // it back to the Pool. Closing it when the pool is destroyed or full will - // be counted as an error. - Get() (net.Conn, error) - - // Close closes the pool and all its connections. After Close() the pool is - // no longer usable. - Close() - - // Len returns the current number of connections of the pool. 
- Len() int -} From 4da0e9a93cfd262f9041cfc481db3ba4961f49e1 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 27 May 2015 10:06:04 -0600 Subject: [PATCH 7/9] close client pool --- cluster/client_pool.go | 3 +-- cluster/writer.go | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/client_pool.go b/cluster/client_pool.go index 659d171762..a4b5fc895f 100644 --- a/cluster/client_pool.go +++ b/cluster/client_pool.go @@ -49,11 +49,10 @@ func (c *clientPool) conn(n *meta.NodeInfo) (net.Conn, error) { return conn, err } -func (c *clientPool) close() error { +func (c *clientPool) close() { c.mu.Lock() for _, p := range c.pool { p.Close() } c.mu.Unlock() - return nil } diff --git a/cluster/writer.go b/cluster/writer.go index 24b02ec9ff..db87481ff0 100644 --- a/cluster/writer.go +++ b/cluster/writer.go @@ -139,6 +139,7 @@ func (w *Writer) Close() error { if w.pool == nil { return fmt.Errorf("client already closed") } + w.pool.close() w.pool = nil return nil } From b699938bdb8c926cb95c9ddfa8bbb8351c7bb7f5 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 27 May 2015 10:30:52 -0600 Subject: [PATCH 8/9] make the cluster listener a Opener --- cluster/listener.go | 36 +++++++++++++++++++-------- cluster/listener_test.go | 54 ++++++++++++++++------------------------ 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/cluster/listener.go b/cluster/listener.go index 17c25773f7..ae991ffa65 100644 --- a/cluster/listener.go +++ b/cluster/listener.go @@ -35,30 +35,38 @@ type Server struct { Logger *log.Logger shutdown chan struct{} + + mu sync.RWMutex + // the actual addr the server opens on + addr net.Addr + // used to initially spin up the server, could be a zero port + laddr string } // NewServer returns a new instance of a Server. -func NewServer(w writer) *Server { +func NewServer(w writer, laddr string) *Server { return &Server{ writer: w, + laddr: laddr, Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags), shutdown: make(chan struct{}), } } -// ListenAndServe instructs the Server to start processing connections -// on the given interface. iface must be in the form host:port -// If successful, it returns the host as the first argument -func (s *Server) ListenAndServe(laddr string) (string, error) { - if laddr == "" { // Make sure we have an laddr - return "", ErrBindAddressRequired +// Open instructs the Server to start processing connections +func (s *Server) Open() error { + if s.laddr == "" { // Make sure we have an laddr + return ErrBindAddressRequired } - ln, err := net.Listen("tcp", laddr) + ln, err := net.Listen("tcp", s.laddr) if err != nil { - return "", err + return err } + s.mu.Lock() s.listener = &ln + s.addr = ln.Addr() + s.mu.Unlock() s.Logger.Println("listening on TCP connection", ln.Addr().String()) s.wg.Add(1) @@ -88,8 +96,7 @@ func (s *Server) ListenAndServe(laddr string) (string, error) { } }() - // Return the host we started up on. 
Mostly needed for testing - return ln.Addr().String(), nil + return nil } // Close will close the listener @@ -210,3 +217,10 @@ func (s *Server) writeShardResponse(conn net.Conn, e error) { return } } + +func (s *Server) Addr() net.Addr { + s.mu.RLock() + defer s.mu.RUnlock() + return s.addr + +} diff --git a/cluster/listener_test.go b/cluster/listener_test.go index 38067e3827..9f1c70ff3b 100644 --- a/cluster/listener_test.go +++ b/cluster/listener_test.go @@ -74,12 +74,10 @@ func (testServer) ResponseN(n int) ([]*serverResponse, error) { func TestServer_Close_ErrServerClosed(t *testing.T) { var ( ts testServer - s = cluster.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) - // Start on a random port - _, e := s.ListenAndServe("127.0.0.1:0") - if e != nil { + if e := s.Open(); e != nil { t.Fatalf("err does not match. expected %v, got %v", nil, e) } @@ -95,32 +93,26 @@ func TestServer_Close_ErrServerClosed(t *testing.T) { func TestServer_Close_ErrBindAddressRequired(t *testing.T) { var ( ts testServer - s = cluster.NewServer(ts) + s = cluster.NewServer(ts, "") ) - - // Start on a random port - _, e := s.ListenAndServe("") - if e == nil { + if e := s.Open(); e == nil { t.Fatalf("exprected error %s, got nil.", cluster.ErrBindAddressRequired) } - } func TestServer_WriteShardRequestSuccess(t *testing.T) { var ( ts = newTestServer(writeShardSuccess) - s = cluster.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) - // Close the server - defer s.Close() - - // Start on a random port - host, e := s.ListenAndServe("127.0.0.1:0") + e := s.Open() if e != nil { t.Fatalf("err does not match. expected %v, got %v", nil, e) } + // Close the server + defer s.Close() - writer := cluster.NewWriter(&metaStore{host: host}) + writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}) now := time.Now() @@ -175,18 +167,16 @@ func TestServer_WriteShardRequestSuccess(t *testing.T) { func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { var ( ts = newTestServer(writeShardSuccess) - s = cluster.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) + // Start on a random port + if e := s.Open(); e != nil { + t.Fatalf("err does not match. expected %v, got %v", nil, e) + } // Close the server defer s.Close() - // Start on a random port - host, e := s.ListenAndServe("127.0.0.1:0") - if e != nil { - t.Fatalf("err does not match. expected %v, got %v", nil, e) - } - - writer := cluster.NewWriter(&metaStore{host: host}) + writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}) now := time.Now() @@ -251,18 +241,16 @@ func TestServer_WriteShardRequestMultipleSuccess(t *testing.T) { func TestServer_WriteShardRequestFail(t *testing.T) { var ( ts = newTestServer(writeShardFail) - s = cluster.NewServer(ts) + s = cluster.NewServer(ts, "127.0.0.1:0") ) + // Start on a random port + if e := s.Open(); e != nil { + t.Fatalf("err does not match. expected %v, got %v", nil, e) + } // Close the server defer s.Close() - // Start on a random port - host, e := s.ListenAndServe("127.0.0.1:0") - if e != nil { - t.Fatalf("err does not match. 
expected %v, got %v", nil, e) - } - - writer := cluster.NewWriter(&metaStore{host: host}) + writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}) now := time.Now() shardID := uint64(1) From 5c1d407d5eb9d7e054596ca8d322b07afcdacad0 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 27 May 2015 13:18:03 -0600 Subject: [PATCH 9/9] pool map key is now nodeID, always get a fresh nodeInfo when dialing --- cluster/client_pool.go | 17 ++++++++--------- cluster/writer.go | 23 ++++++++++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/cluster/client_pool.go b/cluster/client_pool.go index a4b5fc895f..b518ebd5cf 100644 --- a/cluster/client_pool.go +++ b/cluster/client_pool.go @@ -5,29 +5,28 @@ import ( "sync" "github.com/fatih/pool" - "github.com/influxdb/influxdb/meta" ) type clientPool struct { mu sync.RWMutex - pool map[*meta.NodeInfo]pool.Pool + pool map[uint64]pool.Pool } func newClientPool() *clientPool { return &clientPool{ - pool: make(map[*meta.NodeInfo]pool.Pool), + pool: make(map[uint64]pool.Pool), } } -func (c *clientPool) setPool(n *meta.NodeInfo, p pool.Pool) { +func (c *clientPool) setPool(nodeID uint64, p pool.Pool) { c.mu.Lock() - c.pool[n] = p + c.pool[nodeID] = p c.mu.Unlock() } -func (c *clientPool) getPool(n *meta.NodeInfo) (pool.Pool, bool) { +func (c *clientPool) getPool(nodeID uint64) (pool.Pool, bool) { c.mu.Lock() - p, ok := c.pool[n] + p, ok := c.pool[nodeID] c.mu.Unlock() return p, ok } @@ -42,9 +41,9 @@ func (c *clientPool) size() int { return size } -func (c *clientPool) conn(n *meta.NodeInfo) (net.Conn, error) { +func (c *clientPool) conn(nodeID uint64) (net.Conn, error) { c.mu.Lock() - conn, err := c.pool[n].Get() + conn, err := c.pool[nodeID].Get() c.mu.Unlock() return conn, err } diff --git a/cluster/writer.go b/cluster/writer.go index db87481ff0..06ec6c828c 100644 --- a/cluster/writer.go +++ b/cluster/writer.go @@ -25,7 +25,8 @@ type metaStore interface { } type connFactory struct { - nodeInfo *meta.NodeInfo + metaStore metaStore + nodeID uint64 clientPool interface { size() int } @@ -36,7 +37,12 @@ func (c *connFactory) dial() (net.Conn, error) { return nil, errMaxConnectionsExceeded } - conn, err := net.Dial("tcp", c.nodeInfo.Host) + nodeInfo, err := c.metaStore.Node(c.nodeID) + if err != nil { + return nil, err + } + + conn, err := net.Dial("tcp", nodeInfo.Host) if err != nil { return nil, err } @@ -56,22 +62,17 @@ func NewWriter(m metaStore) *Writer { } func (c *Writer) dial(nodeID uint64) (net.Conn, error) { - nodeInfo, err := c.metaStore.Node(nodeID) - if err != nil { - return nil, err - } - // if we don't have a connection pool for that addr yet, create one - _, ok := c.pool.getPool(nodeInfo) + _, ok := c.pool.getPool(nodeID) if !ok { - factory := &connFactory{nodeInfo: nodeInfo, clientPool: c.pool} + factory := &connFactory{nodeID: nodeID, metaStore: c.metaStore, clientPool: c.pool} p, err := pool.NewChannelPool(1, 3, factory.dial) if err != nil { return nil, err } - c.pool.setPool(nodeInfo, p) + c.pool.setPool(nodeID, p) } - return c.pool.conn(nodeInfo) + return c.pool.conn(nodeID) } func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
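
End-to-end, the series leaves the cluster package with a small surface: a Server that listens for shard writes, and a Writer that keeps one connection pool per node ID and dials lazily. The sketch below shows how the two halves fit together after patch 9. It is illustrative only, not part of the series: it lives outside the package, the shardWriter and metaStore stubs mirror the ones in listener_test.go (the writer interface NewServer accepts is unexported, but the tests show WriteShard(shardID uint64, points []tsdb.Point) error satisfies it), and error handling is pared down.

    package main

    import (
    	"log"
    	"time"

    	"github.com/influxdb/influxdb/cluster"
    	"github.com/influxdb/influxdb/meta"
    	"github.com/influxdb/influxdb/tsdb"
    )

    // shardWriter satisfies the interface cluster.NewServer expects,
    // mirroring the testServer in listener_test.go.
    type shardWriter struct{}

    func (shardWriter) WriteShard(shardID uint64, points []tsdb.Point) error {
    	log.Printf("shard %d: %d points", shardID, len(points))
    	return nil
    }

    // metaStore resolves a node ID to its host, as in the tests.
    type metaStore struct{ host string }

    func (m *metaStore) Node(id uint64) (*meta.NodeInfo, error) {
    	return &meta.NodeInfo{ID: id, Host: m.host}, nil
    }

    func main() {
    	// Open a server on a random port and point a writer at it.
    	s := cluster.NewServer(shardWriter{}, "127.0.0.1:0")
    	if err := s.Open(); err != nil {
    		log.Fatal(err)
    	}
    	defer s.Close()

    	w := cluster.NewWriter(&metaStore{host: s.Addr().String()})
    	defer w.Close()

    	points := []tsdb.Point{tsdb.NewPoint(
    		"cpu", tsdb.Tags{"host": "server01"},
    		map[string]interface{}{"value": int64(100)}, time.Now(),
    	)}

    	// shard 1 owned by node 2; the writer dials (and pools) on demand,
    	// and Close() on the pooled conn recycles it for the next Write.
    	if err := w.Write(1, 2, points); err != nil {
    		log.Fatal(err)
    	}
    }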