From c916256ac98432806f247f9f020fad3205dc8fdc Mon Sep 17 00:00:00 2001
From: Ben Johnson
Date: Sat, 30 May 2015 10:11:23 -0600
Subject: [PATCH] Rename cluster.Writer to cluster.ShardWriter.

---
 cluster/coordinator.go                        |   6 +-
 cluster/coordinator_test.go                   |   2 +-
 cluster/query_coordinator.go                  |  14 --
 cluster/query_coordinator_test.go             |   1 -
 cluster/rpc.go                                |   4 +-
 cluster/{writer.go => shard_writer.go}        | 135 +++++++++---------
 .../{writer_test.go => shard_writer_test.go}  | 134 ++++++++---------
 cmd/influxd/main.go                           |   7 +-
 cmd/influxd/run/server.go                     |  74 ++++++----
 tsdb/store.go                                 |   3 +-
 10 files changed, 186 insertions(+), 194 deletions(-)
 delete mode 100644 cluster/query_coordinator.go
 delete mode 100644 cluster/query_coordinator_test.go
 rename cluster/{writer.go => shard_writer.go} (74%)
 rename cluster/{writer_test.go => shard_writer_test.go} (51%)

diff --git a/cluster/coordinator.go b/cluster/coordinator.go
index 7a93c77bc8..9c92f3403b 100644
--- a/cluster/coordinator.go
+++ b/cluster/coordinator.go
@@ -10,7 +10,7 @@ import (
 	"github.com/influxdb/influxdb/tsdb"
 )
 
-const defaultWriteTimeout = 5 * time.Second
+const DefaultWriteTimeout = 5 * time.Second
 
 // ConsistencyLevel represent a required replication criteria before a write can
 // be returned as successful
@@ -163,9 +163,9 @@ func (c *Coordinator) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
 	return mapping, nil
 }
 
-// Write is coordinates multiple writes across local and remote data nodes
+// WritePoints coordinates multiple writes across local and remote data nodes
 // according the request consistency level
-func (c *Coordinator) Write(p *WritePointsRequest) error {
+func (c *Coordinator) WritePoints(p *WritePointsRequest) error {
 	shardMappings, err := c.MapShards(p)
 	if err != nil {
 		return err
diff --git a/cluster/coordinator_test.go b/cluster/coordinator_test.go
index 879ba40673..4a1923ca4b 100644
--- a/cluster/coordinator_test.go
+++ b/cluster/coordinator_test.go
@@ -251,7 +251,7 @@ func TestCoordinatorWrite(t *testing.T) {
 			Store: store,
 		}
 
-		if err := c.Write(pr); err != test.expErr {
+		if err := c.WritePoints(pr); err != test.expErr {
 			t.Errorf("Coordinator.Write(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
 		}
 	}
diff --git a/cluster/query_coordinator.go b/cluster/query_coordinator.go
deleted file mode 100644
index 52825628c0..0000000000
--- a/cluster/query_coordinator.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package cluster
-
-import (
-	"github.com/influxdb/influxdb/tsdb"
-)
-
-// QueryCoordinator provides an interface for translating queries to the
-// appropriate metastore or data node function calls.
-type QueryCoordinator struct {
-	MetaStore interface {
-		//...
-	}
-	indexes map[string]*tsdb.DatabaseIndex
-}
diff --git a/cluster/query_coordinator_test.go b/cluster/query_coordinator_test.go
deleted file mode 100644
index f8474fb746..0000000000
--- a/cluster/query_coordinator_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package cluster_test
diff --git a/cluster/rpc.go b/cluster/rpc.go
index 77bfe9dbf4..54be3fccfd 100644
--- a/cluster/rpc.go
+++ b/cluster/rpc.go
@@ -50,7 +50,7 @@ func (w *WriteShardRequest) SetShardID(id uint64) {
 }
 
 func (w *WriteShardRequest) Points() []tsdb.Point {
-	return w.unmarhalPoints()
+	return w.unmarshalPoints()
 }
 
 func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
@@ -125,7 +125,7 @@ func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error {
 	return nil
 }
 
-func (w *WriteShardRequest) unmarhalPoints() []tsdb.Point {
+func (w *WriteShardRequest) unmarshalPoints() []tsdb.Point {
 	points := make([]tsdb.Point, len(w.pb.GetPoints()))
 	for i, p := range w.pb.GetPoints() {
 		pt := tsdb.NewPoint(
diff --git a/cluster/writer.go b/cluster/shard_writer.go
similarity index 74%
rename from cluster/writer.go
rename to cluster/shard_writer.go
index fe5e6abecd..ffe5c62915 100644
--- a/cluster/writer.go
+++ b/cluster/shard_writer.go
@@ -17,83 +17,35 @@ const (
 	writeShardResponseMessage
 )
 
-const (
-	maxConnections = 500
-	maxRetries     = 3
-)
+// ShardWriter writes a set of points to a shard.
+type ShardWriter struct {
+	pool    *clientPool
+	timeout time.Duration
 
-var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
-
-type metaStore interface {
-	Node(id uint64) (ni *meta.NodeInfo, err error)
-}
-
-type connFactory struct {
-	metaStore metaStore
-	nodeID    uint64
-	timeout   time.Duration
-	clientPool interface {
-		size() int
+	MetaStore interface {
+		Node(id uint64) (ni *meta.NodeInfo, err error)
 	}
 }
 
-func (c *connFactory) dial() (net.Conn, error) {
-	if c.clientPool.size() > maxConnections {
-		return nil, errMaxConnectionsExceeded
-	}
-
-	nodeInfo, err := c.metaStore.Node(c.nodeID)
-	if err != nil {
-		return nil, err
-	}
-
-	conn, err := net.DialTimeout("tcp", nodeInfo.Host, c.timeout)
-	if err != nil {
-		return nil, err
-	}
-	return conn, nil
-}
-
-type Writer struct {
-	pool      *clientPool
-	metaStore metaStore
-	timeout   time.Duration
-}
-
-func NewWriter(m metaStore, timeout time.Duration) *Writer {
-	return &Writer{
-		pool:      newClientPool(),
-		metaStore: m,
-		timeout:   timeout,
+// NewShardWriter returns a new instance of ShardWriter.
+func NewShardWriter(timeout time.Duration) *ShardWriter {
+	return &ShardWriter{
+		pool:    newClientPool(),
+		timeout: timeout,
 	}
 }
 
-func (c *Writer) dial(nodeID uint64) (net.Conn, error) {
-	// if we don't have a connection pool for that addr yet, create one
-	_, ok := c.pool.getPool(nodeID)
-	if !ok {
-		factory := &connFactory{nodeID: nodeID, metaStore: c.metaStore, clientPool: c.pool, timeout: c.timeout}
-		p, err := pool.NewChannelPool(1, 3, factory.dial)
-		if err != nil {
-			return nil, err
-		}
-		c.pool.setPool(nodeID, p)
-	}
-	return c.pool.conn(nodeID)
-}
-
-func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
+func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
 	c, err := w.dial(ownerID)
 	if err != nil {
 		return err
 	}
+
 	conn, ok := c.(*pool.PoolConn)
 	if !ok {
 		panic("wrong connection type")
 	}
-
-	// This will return the connection to the data pool
-	defer conn.Close()
+	defer conn.Close() // return to pool
 
 	conn.SetWriteDeadline(time.Now().Add(w.timeout))
 	var mt byte = writeShardRequestMessage
@@ -102,15 +54,16 @@
 		return err
 	}
 
+	// Build write request.
 	var request WriteShardRequest
 	request.SetShardID(shardID)
 	request.AddPoints(points)
 
+	// Marshal into protocol buffers.
 	b, err := request.MarshalBinary()
 	if err != nil {
 		return err
 	}
-
 	size := int64(len(b))
 
 	conn.SetWriteDeadline(time.Now().Add(w.timeout))
@@ -160,7 +113,23 @@
 	return nil
 }
 
-func (w *Writer) Close() error {
+func (c *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
+	// If we don't have a connection pool for that addr yet, create one
+	_, ok := c.pool.getPool(nodeID)
+	if !ok {
+		factory := &connFactory{nodeID: nodeID, clientPool: c.pool, timeout: c.timeout}
+		factory.metaStore = c.MetaStore
+
+		p, err := pool.NewChannelPool(1, 3, factory.dial)
+		if err != nil {
+			return nil, err
+		}
+		c.pool.setPool(nodeID, p)
+	}
+	return c.pool.conn(nodeID)
+}
+
+func (w *ShardWriter) Close() error {
 	if w.pool == nil {
 		return fmt.Errorf("client already closed")
 	}
@@ -168,3 +137,41 @@
 	w.pool = nil
 	return nil
 }
+
+const (
+	maxConnections = 500
+	maxRetries     = 3
+)
+
+var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
+
+type connFactory struct {
+	nodeID  uint64
+	timeout time.Duration
+
+	clientPool interface {
+		size() int
+	}
+
+	metaStore interface {
+		Node(id uint64) (ni *meta.NodeInfo, err error)
+	}
+}
+
+func (c *connFactory) dial() (net.Conn, error) {
+	if c.clientPool.size() > maxConnections {
+		return nil, errMaxConnectionsExceeded
+	}
+
+	nodeInfo, err := c.metaStore.Node(c.nodeID)
+	if err != nil {
+		return nil, err
+	}
+
+	conn, err := net.DialTimeout("tcp", nodeInfo.Host, c.timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	return conn, nil
+}
diff --git a/cluster/writer_test.go b/cluster/shard_writer_test.go
similarity index 51%
rename from cluster/writer_test.go
rename to cluster/shard_writer_test.go
index dac74f1bd4..0cbd306a93 100644
--- a/cluster/writer_test.go
+++ b/cluster/shard_writer_test.go
@@ -12,71 +12,53 @@ import (
 	"github.com/influxdb/influxdb/tsdb"
 )
 
-func Test_WriteShardRequestSuccess(t *testing.T) {
-	var (
-		ts = newTestServer(writeShardSuccess)
-		s  = cluster.NewServer(ts, "127.0.0.1:0")
-	)
-	e := s.Open()
-	if e != nil {
-		t.Fatalf("err does not match. expected %v, got %v", nil, e)
+// Ensure the shard writer can successfully write a single request.
+func TestShardWriter_WriteShard_Success(t *testing.T) {
+	ts := newTestServer(writeShardSuccess)
+	s := cluster.NewServer(ts, "127.0.0.1:0")
+	if err := s.Open(); err != nil {
+		t.Fatal(err)
 	}
-	// Close the server
 	defer s.Close()
 
-	writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
+	w := cluster.NewShardWriter(time.Minute)
+	w.MetaStore = &metaStore{host: s.Addr().String()}
 
+	// Build a single point.
 	now := time.Now()
-
-	shardID := uint64(1)
-	ownerID := uint64(2)
+	shardID, ownerID := uint64(1), uint64(2)
 	var points []tsdb.Point
-	points = append(points, tsdb.NewPoint(
-		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
-	))
+	points = append(points, tsdb.NewPoint("cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now))
 
-	if err := writer.Write(shardID, ownerID, points); err != nil {
-		t.Fatal(err)
-	}
-
-	if err := writer.Close(); err != nil {
+	// Write to shard and close.
+	if err := w.WriteShard(shardID, ownerID, points); err != nil {
+		t.Fatal(err)
+	} else if err := w.Close(); err != nil {
 		t.Fatal(err)
 	}
 
+	// Validate response.
 	responses, err := ts.ResponseN(1)
 	if err != nil {
 		t.Fatal(err)
+	} else if responses[0].shardID != 1 {
+		t.Fatalf("unexpected shard id: %d", responses[0].shardID)
 	}
-	response := responses[0]
-
-	if shardID != response.shardID {
-		t.Fatalf("unexpected shardID. exp: %d, got %d", shardID, response.shardID)
-	}
-
-	got := response.points[0]
-	exp := points[0]
-	t.Log("got: ", spew.Sdump(got))
-	t.Log("exp: ", spew.Sdump(exp))
-
-	if got.Name() != exp.Name() {
-		t.Fatal("unexpected name")
-	}
-
-	if got.Fields()["value"] != exp.Fields()["value"] {
-		t.Fatal("unexpected fields")
-	}
-
-	if got.Tags()["host"] != exp.Tags()["host"] {
-		t.Fatal("unexpected tags")
-	}
-
-	if got.Time().UnixNano() != exp.Time().UnixNano() {
-		t.Fatal("unexpected time")
+	// Validate point.
+	if p := responses[0].points[0]; p.Name() != "cpu" {
+		t.Fatalf("unexpected name: %s", p.Name())
+	} else if p.Fields()["value"] != int64(100) {
+		t.Fatalf("unexpected 'value' field: %d", p.Fields()["value"])
+	} else if p.Tags()["host"] != "server01" {
+		t.Fatalf("unexpected 'host' tag: %s", p.Tags()["host"])
+	} else if p.Time().UnixNano() != now.UnixNano() {
+		t.Fatalf("unexpected time: %s", p.Time())
 	}
 }
 
-func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
+// Ensure the shard writer can successfully write multiple requests.
+func TestShardWriter_WriteShard_MultipleSuccess(t *testing.T) {
 	var (
 		ts = newTestServer(writeShardSuccess)
 		s  = cluster.NewServer(ts, "127.0.0.1:0")
@@ -88,7 +70,8 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
 	// Close the server
 	defer s.Close()
 
-	writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
+	w := cluster.NewShardWriter(time.Minute)
+	w.MetaStore = &metaStore{host: s.Addr().String()}
 
 	now := time.Now()
@@ -99,7 +82,7 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	if err := writer.Write(shardID, ownerID, points); err != nil {
+	if err := w.WriteShard(shardID, ownerID, points); err != nil {
 		t.Fatal(err)
 	}
 
@@ -109,11 +92,11 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	if err := writer.Write(shardID, ownerID, points[1:]); err != nil {
+	if err := w.WriteShard(shardID, ownerID, points[1:]); err != nil {
 		t.Fatal(err)
 	}
 
-	if err := writer.Close(); err != nil {
+	if err := w.Close(); err != nil {
 		t.Fatal(err)
 	}
 
@@ -149,19 +132,18 @@
 		t.Fatal("unexpected time")
 	}
 }
 
-func Test_WriteShardRequestError(t *testing.T) {
-	var (
-		ts = newTestServer(writeShardFail)
-		s  = cluster.NewServer(ts, "127.0.0.1:0")
-	)
-	// Start on a random port
-	if e := s.Open(); e != nil {
-		t.Fatalf("err does not match. expected %v, got %v", nil, e)
+
+// Ensure the shard writer returns an error when the server fails to accept the write.
+func TestShardWriter_WriteShard_Error(t *testing.T) {
+	ts := newTestServer(writeShardFail)
+	s := cluster.NewServer(ts, "127.0.0.1:0")
+	if err := s.Open(); err != nil {
+		t.Fatal(err)
 	}
-	// Close the server
 	defer s.Close()
 
-	writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
+	w := cluster.NewShardWriter(time.Minute)
+	w.MetaStore = &metaStore{host: s.Addr().String()}
 
 	now := time.Now()
 	shardID := uint64(1)
 	ownerID := uint64(2)
 	var points []tsdb.Point
 	points = append(points, tsdb.NewPoint(
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	if err, exp := writer.Write(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
+	if err, exp := w.WriteShard(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
 		t.Fatalf("expected error %s, got %v", exp, err)
 	}
 }
 
-func Test_WriterDialTimeout(t *testing.T) {
-	var (
-		ts = newTestServer(writeShardSuccess)
-		s  = cluster.NewServer(ts, "127.0.0.1:0")
-	)
-	// Start on a random port
-	if e := s.Open(); e != nil {
-		t.Fatalf("err does not match. expected %v, got %v", nil, e)
+
+// Ensure the shard writer returns an error when dialing times out.
+func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
+	ts := newTestServer(writeShardSuccess)
+	s := cluster.NewServer(ts, "127.0.0.1:0")
+	if err := s.Open(); err != nil {
+		t.Fatal(err)
 	}
-	// Close the server
 	defer s.Close()
 
-	writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Nanosecond)
+	w := cluster.NewShardWriter(time.Nanosecond)
+	w.MetaStore = &metaStore{host: s.Addr().String()}
 
 	now := time.Now()
 	shardID := uint64(1)
 	ownerID := uint64(2)
 	var points []tsdb.Point
 	points = append(points, tsdb.NewPoint(
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	if err, exp := writer.Write(shardID, ownerID, points), "i/o timeout"; err == nil || !strings.Contains(err.Error(), exp) {
+	if err, exp := w.WriteShard(shardID, ownerID, points), "i/o timeout"; err == nil || !strings.Contains(err.Error(), exp) {
 		t.Fatalf("expected error %v, to contain %s", err, exp)
 	}
 }
 
-func Test_WriterReadTimeout(t *testing.T) {
+// Ensure the shard writer returns an error when reading times out.
+func TestShardWriter_Write_ErrReadTimeout(t *testing.T) {
 	ln, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	writer := cluster.NewWriter(&metaStore{host: ln.Addr().String()}, time.Millisecond)
+	w := cluster.NewShardWriter(time.Millisecond)
+	w.MetaStore = &metaStore{host: ln.Addr().String()}
 
 	now := time.Now()
 	shardID := uint64(1)
 	ownerID := uint64(2)
 	var points []tsdb.Point
 	points = append(points, tsdb.NewPoint(
 		"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
 	))
 
-	err = writer.Write(shardID, ownerID, points)
+	err = w.WriteShard(shardID, ownerID, points)
 	if err == nil {
 		t.Fatal("expected read io timeout error")
 	}
diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go
index cd6b3ec75b..d272a645d9 100644
--- a/cmd/influxd/main.go
+++ b/cmd/influxd/main.go
@@ -25,6 +25,9 @@ func main() {
 		fmt.Println(err)
 		os.Exit(1)
 	}
+
+	// Wait indefinitely.
+	<-(chan struct{})(nil)
 }
 
 // Main represents the program execution.
@@ -77,9 +80,11 @@
 		if err := help.NewCommand().Run(args...); err != nil {
 			return fmt.Errorf("help: %s", err)
 		}
+	default:
+		return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
 	}
 
-	return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
+	return nil
 }
 
 // ParseCommandName extracts the command name and args from the args list.
diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index 07f863d2b2..06c5a3398a 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -8,15 +8,20 @@ import (
 	"github.com/influxdb/influxdb/meta"
 	"github.com/influxdb/influxdb/services/admin"
 	"github.com/influxdb/influxdb/services/collectd"
 	"github.com/influxdb/influxdb/services/graphite"
 	"github.com/influxdb/influxdb/services/httpd"
 	"github.com/influxdb/influxdb/services/opentsdb"
 	"github.com/influxdb/influxdb/tsdb"
 )
 
+// Server represents a container for the metadata and storage data and services.
+// It is built using a Config and it manages the startup and shutdown of all
+// services in the proper order.
 type Server struct {
-	MetaStore *meta.Store
-	TSDBStore *tsdb.Store
+	MetaStore     *meta.Store
+	TSDBStore     *tsdb.Store
+	QueryExecutor *tsdb.QueryExecutor
+	PointsWriter  tsdb.PointsWriter
 
 	Services []Service
 }
@@ -29,39 +34,49 @@ func NewServer(c *Config, joinURLs string) *Server {
 		TSDBStore: tsdb.NewStore(c.Data.Dir),
 	}
 
-	// Add cluster Service
-	s.Services = append(s.Services, cluster.NewService(c.Cluster))
-
-	// Add admin Service
-	if c.Admin.Enabled {
-		s.Services = append(s.Services, admin.NewService(c.Admin))
-	}
-
-	// HTTP API Service
-	if c.HTTPD.Enabled {
-		s.Services = append(s.Services, httpd.NewService(c.HTTPD))
-	}
-
-	// Graphite services
+	// Append services.
+	s.appendClusterService(c.Cluster)
+	s.appendAdminService(c.Admin)
+	s.appendHTTPDService(c.HTTPD)
+	s.appendCollectdService(c.Collectd)
+	s.appendOpenTSDBService(c.OpenTSDB)
 	for _, g := range c.Graphites {
-		if g.Enabled {
-			s.Services = append(s.Services, graphite.NewService(g))
-		}
-	}
-
-	// Collectd service
-	if c.Collectd.Enabled {
-		s.Services = append(s.Services, collectd.NewService(c.Collectd))
-	}
-
-	// OpenTSDB services
-	if c.OpenTSDB.Enabled {
-		s.Services = append(s.Services, opentsdb.NewService(c.OpenTSDB))
+		s.appendGraphiteService(g)
 	}
 
 	return s
 }
 
+func (s *Server) appendClusterService(c cluster.Config) {
+	srv := cluster.NewService(c)
+	s.Services = append(s.Services, srv)
+}
+
+func (s *Server) appendAdminService(c admin.Config) {
+	srv := admin.NewService(c)
+	s.Services = append(s.Services, srv)
+}
+
+func (s *Server) appendHTTPDService(c httpd.Config) {
+	srv := httpd.NewService(c)
+	s.Services = append(s.Services, srv)
+}
+
+func (s *Server) appendCollectdService(c collectd.Config) {
+	srv := collectd.NewService(c)
+	s.Services = append(s.Services, srv)
+}
+
+func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
+	srv := opentsdb.NewService(c)
+	s.Services = append(s.Services, srv)
+}
+
+func (s *Server) appendGraphiteService(c graphite.Config) {
+	srv := graphite.NewService(c)
+	s.Services = append(s.Services, srv)
+}
+
 // Open opens the meta and data store and all services.
 func (s *Server) Open() error {
 	if err := func() error {
diff --git a/tsdb/store.go b/tsdb/store.go
index 43df666542..41735510c8 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -24,10 +24,9 @@ var (
 )
 
 type Store struct {
+	mu   sync.RWMutex
 	path string
 
-	mu sync.RWMutex
-
 	databaseIndexes map[string]*DatabaseIndex
 	shards map[uint64]*Shard