Rename cluster.Writer to cluster.ShardWriter.

pull/2713/head
Ben Johnson 2015-05-30 10:11:23 -06:00
parent 016b43e52e
commit c916256ac9
10 changed files with 186 additions and 194 deletions

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdb/influxdb/tsdb"
)
const defaultWriteTimeout = 5 * time.Second
const DefaultWriteTimeout = 5 * time.Second
// ConsistencyLevel represent a required replication criteria before a write can
// be returned as successful
@ -163,9 +163,9 @@ func (c *Coordinator) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
return mapping, nil
}
// Write is coordinates multiple writes across local and remote data nodes
// WritePoints is coordinates multiple writes across local and remote data nodes
// according the request consistency level
func (c *Coordinator) Write(p *WritePointsRequest) error {
func (c *Coordinator) WritePoints(p *WritePointsRequest) error {
shardMappings, err := c.MapShards(p)
if err != nil {
return err

View File

@ -251,7 +251,7 @@ func TestCoordinatorWrite(t *testing.T) {
Store: store,
}
if err := c.Write(pr); err != test.expErr {
if err := c.WritePoints(pr); err != test.expErr {
t.Errorf("Coordinator.Write(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
}
}

View File

@ -1,14 +0,0 @@
package cluster
import (
"github.com/influxdb/influxdb/tsdb"
)
// QueryCoordinator provides an interface for translating queries to the
// appropriate metastore or data node function calls.
type QueryCoordinator struct {
MetaStore interface {
//...
}
indexes map[string]*tsdb.DatabaseIndex
}

View File

@ -1 +0,0 @@
package cluster_test

View File

@ -50,7 +50,7 @@ func (w *WriteShardRequest) SetShardID(id uint64) {
}
func (w *WriteShardRequest) Points() []tsdb.Point {
return w.unmarhalPoints()
return w.unmarshalPoints()
}
func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
@ -125,7 +125,7 @@ func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error {
return nil
}
func (w *WriteShardRequest) unmarhalPoints() []tsdb.Point {
func (w *WriteShardRequest) unmarshalPoints() []tsdb.Point {
points := make([]tsdb.Point, len(w.pb.GetPoints()))
for i, p := range w.pb.GetPoints() {
pt := tsdb.NewPoint(

View File

@ -17,83 +17,35 @@ const (
writeShardResponseMessage
)
const (
maxConnections = 500
maxRetries = 3
)
// ShardWriter writes a set of points to a shard.
type ShardWriter struct {
pool *clientPool
timeout time.Duration
var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
type metaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
}
type connFactory struct {
metaStore metaStore
nodeID uint64
timeout time.Duration
clientPool interface {
size() int
MetaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
}
}
func (c *connFactory) dial() (net.Conn, error) {
if c.clientPool.size() > maxConnections {
return nil, errMaxConnectionsExceeded
}
nodeInfo, err := c.metaStore.Node(c.nodeID)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", nodeInfo.Host, c.timeout)
if err != nil {
return nil, err
}
return conn, nil
}
type Writer struct {
pool *clientPool
metaStore metaStore
timeout time.Duration
}
func NewWriter(m metaStore, timeout time.Duration) *Writer {
return &Writer{
pool: newClientPool(),
metaStore: m,
timeout: timeout,
// NewShardWriter returns a new instance of ShardWriter.
func NewShardWriter(timeout time.Duration) *ShardWriter {
return &ShardWriter{
pool: newClientPool(),
timeout: timeout,
}
}
func (c *Writer) dial(nodeID uint64) (net.Conn, error) {
// if we don't have a connection pool for that addr yet, create one
_, ok := c.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, metaStore: c.metaStore, clientPool: c.pool, timeout: c.timeout}
p, err := pool.NewChannelPool(1, 3, factory.dial)
if err != nil {
return nil, err
}
c.pool.setPool(nodeID, p)
}
return c.pool.conn(nodeID)
}
func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
c, err := w.dial(ownerID)
if err != nil {
return err
}
conn, ok := c.(*pool.PoolConn)
if !ok {
panic("wrong connection type")
}
// This will return the connection to the data pool
defer conn.Close()
defer conn.Close() // return to pool
conn.SetWriteDeadline(time.Now().Add(w.timeout))
var mt byte = writeShardRequestMessage
@ -102,15 +54,16 @@ func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
return err
}
// Build write request.
var request WriteShardRequest
request.SetShardID(shardID)
request.AddPoints(points)
// Marshal into protocol buffers.
b, err := request.MarshalBinary()
if err != nil {
return err
}
size := int64(len(b))
conn.SetWriteDeadline(time.Now().Add(w.timeout))
@ -160,7 +113,23 @@ func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
return nil
}
func (w *Writer) Close() error {
func (c *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
// If we don't have a connection pool for that addr yet, create one
_, ok := c.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, clientPool: c.pool, timeout: c.timeout}
factory.metaStore = c.MetaStore
p, err := pool.NewChannelPool(1, 3, factory.dial)
if err != nil {
return nil, err
}
c.pool.setPool(nodeID, p)
}
return c.pool.conn(nodeID)
}
func (w *ShardWriter) Close() error {
if w.pool == nil {
return fmt.Errorf("client already closed")
}
@ -168,3 +137,41 @@ func (w *Writer) Close() error {
w.pool = nil
return nil
}
const (
maxConnections = 500
maxRetries = 3
)
var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
type connFactory struct {
nodeID uint64
timeout time.Duration
clientPool interface {
size() int
}
metaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
}
}
func (c *connFactory) dial() (net.Conn, error) {
if c.clientPool.size() > maxConnections {
return nil, errMaxConnectionsExceeded
}
nodeInfo, err := c.metaStore.Node(c.nodeID)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", nodeInfo.Host, c.timeout)
if err != nil {
return nil, err
}
return conn, nil
}

View File

@ -12,71 +12,53 @@ import (
"github.com/influxdb/influxdb/tsdb"
)
func Test_WriteShardRequestSuccess(t *testing.T) {
var (
ts = newTestServer(writeShardSuccess)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
e := s.Open()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
// Ensure the shard writer can successful write a single request.
func TestShardWriter_WriteShard_Success(t *testing.T) {
ts := newTestServer(writeShardSuccess)
s := cluster.NewServer(ts, "127.0.0.1:0")
if err := s.Open(); err != nil {
t.Fatal(err)
}
// Close the server
defer s.Close()
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
w := cluster.NewShardWriter(time.Minute)
w.MetaStore = &metaStore{host: s.Addr().String()}
// Build a single point.
now := time.Now()
shardID := uint64(1)
ownerID := uint64(2)
shardID, ownerID := uint64(1), uint64(2)
var points []tsdb.Point
points = append(points, tsdb.NewPoint(
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
points = append(points, tsdb.NewPoint("cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now))
if err := writer.Write(shardID, ownerID, points); err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
// Write to shard and close.
if err := w.WriteShard(shardID, ownerID, points); err != nil {
t.Fatal(err)
} else if err := w.Close(); err != nil {
t.Fatal(err)
}
// Validate response.
responses, err := ts.ResponseN(1)
if err != nil {
t.Fatal(err)
} else if responses[0].shardID != 1 {
t.Fatalf("unexpected shard id: %d", responses[0].shardID)
}
response := responses[0]
if shardID != response.shardID {
t.Fatalf("unexpected shardID. exp: %d, got %d", shardID, response.shardID)
}
got := response.points[0]
exp := points[0]
t.Log("got: ", spew.Sdump(got))
t.Log("exp: ", spew.Sdump(exp))
if got.Name() != exp.Name() {
t.Fatal("unexpected name")
}
if got.Fields()["value"] != exp.Fields()["value"] {
t.Fatal("unexpected fields")
}
if got.Tags()["host"] != exp.Tags()["host"] {
t.Fatal("unexpected tags")
}
if got.Time().UnixNano() != exp.Time().UnixNano() {
t.Fatal("unexpected time")
// Validate point.
if p := responses[0].points[0]; p.Name() != "cpu" {
t.Fatalf("unexpected name: %s", p.Name())
} else if p.Fields()["value"] != int64(100) {
t.Fatalf("unexpected 'value' field: %d", p.Fields()["value"])
} else if p.Tags()["host"] != "server01" {
t.Fatalf("unexpected 'host' tag: %s", p.Tags()["host"])
} else if p.Time().UnixNano() != now.UnixNano() {
t.Fatalf("unexpected time: %s", p.Time())
}
}
func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
// Ensure the shard writer can successful write multiple requests.
func TestShardWriter_WriteShard_MultipleSuccess(t *testing.T) {
var (
ts = newTestServer(writeShardSuccess)
s = cluster.NewServer(ts, "127.0.0.1:0")
@ -88,7 +70,8 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
// Close the server
defer s.Close()
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
w := cluster.NewShardWriter(time.Minute)
w.MetaStore = &metaStore{host: s.Addr().String()}
now := time.Now()
@ -99,7 +82,7 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err := writer.Write(shardID, ownerID, points); err != nil {
if err := w.WriteShard(shardID, ownerID, points); err != nil {
t.Fatal(err)
}
@ -109,11 +92,11 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err := writer.Write(shardID, ownerID, points[1:]); err != nil {
if err := w.WriteShard(shardID, ownerID, points[1:]); err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
if err := w.Close(); err != nil {
t.Fatal(err)
}
@ -149,19 +132,18 @@ func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
t.Fatal("unexpected time")
}
}
func Test_WriteShardRequestError(t *testing.T) {
var (
ts = newTestServer(writeShardFail)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
// Start on a random port
if e := s.Open(); e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
// Ensure the shard writer returns an error when the server fails to accept the write.
func TestShardWriter_WriteShard_Error(t *testing.T) {
ts := newTestServer(writeShardFail)
s := cluster.NewServer(ts, "127.0.0.1:0")
if err := s.Open(); err != nil {
t.Fatal(err)
}
// Close the server
defer s.Close()
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
w := cluster.NewShardWriter(time.Minute)
w.MetaStore = &metaStore{host: s.Addr().String()}
now := time.Now()
shardID := uint64(1)
@ -171,24 +153,22 @@ func Test_WriteShardRequestError(t *testing.T) {
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err, exp := writer.Write(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
if err, exp := w.WriteShard(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
t.Fatalf("expected error %s, got %v", exp, err)
}
}
func Test_WriterDialTimeout(t *testing.T) {
var (
ts = newTestServer(writeShardSuccess)
s = cluster.NewServer(ts, "127.0.0.1:0")
)
// Start on a random port
if e := s.Open(); e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
// Ensure the shard writer returns an error when dialing times out.
func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
ts := newTestServer(writeShardSuccess)
s := cluster.NewServer(ts, "127.0.0.1:0")
if err := s.Open(); err != nil {
t.Fatal(err)
}
// Close the server
defer s.Close()
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Nanosecond)
w := cluster.NewShardWriter(time.Nanosecond)
w.MetaStore = &metaStore{host: s.Addr().String()}
now := time.Now()
shardID := uint64(1)
@ -198,18 +178,20 @@ func Test_WriterDialTimeout(t *testing.T) {
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
if err, exp := writer.Write(shardID, ownerID, points), "i/o timeout"; err == nil || !strings.Contains(err.Error(), exp) {
if err, exp := w.WriteShard(shardID, ownerID, points), "i/o timeout"; err == nil || !strings.Contains(err.Error(), exp) {
t.Fatalf("expected error %v, to contain %s", err, exp)
}
}
func Test_WriterReadTimeout(t *testing.T) {
// Ensure the shard writer returns an error when reading times out.
func TestShardWriter_Write_ErrReadTimeout(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
writer := cluster.NewWriter(&metaStore{host: ln.Addr().String()}, time.Millisecond)
w := cluster.NewShardWriter(time.Millisecond)
w.MetaStore = &metaStore{host: ln.Addr().String()}
now := time.Now()
shardID := uint64(1)
@ -219,7 +201,7 @@ func Test_WriterReadTimeout(t *testing.T) {
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
))
err = writer.Write(shardID, ownerID, points)
err = w.WriteShard(shardID, ownerID, points)
if err == nil {
t.Fatal("expected read io timeout error")
}

View File

@ -25,6 +25,9 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
// Wait indefinitely.
<-(chan struct{})(nil)
}
// Main represents the program execution.
@ -77,9 +80,11 @@ func (m *Main) Run(args ...string) error {
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help: %s", err)
}
default:
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
}
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
return nil
}
// ParseCommandName extracts the command name and args from the args list.

View File

@ -8,15 +8,19 @@ import (
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/admin"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/tsdb"
)
// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
type Server struct {
MetaStore *meta.Store
TSDBStore *tsdb.Store
MetaStore *meta.Store
TSDBStore *tsdb.Store
QueryExecutor *tsdb.QueryExecutor
PointsWriter tsdb.PointsWriter
Services []Service
}
@ -29,39 +33,49 @@ func NewServer(c *Config, joinURLs string) *Server {
TSDBStore: tsdb.NewStore(c.Data.Dir),
}
// Add cluster Service
s.Services = append(s.Services, cluster.NewService(c.Cluster))
// Add admin Service
if c.Admin.Enabled {
s.Services = append(s.Services, admin.NewService(c.Admin))
}
// HTTP API Service
if c.HTTPD.Enabled {
s.Services = append(s.Services, httpd.NewService(c.HTTPD))
}
// Graphite services
// Append services.
s.appendClusterService(c.Cluster)
s.appendAdminService(c.Admin)
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
for _, g := range c.Graphites {
if g.Enabled {
s.Services = append(s.Services, graphite.NewService(g))
}
}
// Collectd service
if c.Collectd.Enabled {
s.Services = append(s.Services, collectd.NewService(c.Collectd))
}
// OpenTSDB services
if c.OpenTSDB.Enabled {
s.Services = append(s.Services, opentsdb.NewService(c.OpenTSDB))
s.appendGraphiteServices(g)
}
return s
}
func (s *Server) appendClusterService(c cluster.Config) {
srv := cluster.NewService(c)
s.Services = append(s.Services, srv)
}
func (s *Server) appendAdminService(c admin.Config) {
srv := admin.NewService(c)
s.Services = append(s.Services, srv)
}
func (s *Server) appendHTTPDService(c httpd.Config) {
srv := httpd.NewService(c)
s.Services = append(s.Services, srv)
}
func (s *Server) appendCollectdService(c collectd.Config) {
srv := collectd.NewService(c)
s.Services = append(s.Services, srv)
}
func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
srv := opentsdb.NewService(c)
s.Services = append(s.Services, srv)
}
func (s *Server) appendGraphiteService(c graphite.Config) {
srv := graphite.NewService(c)
s.Services = append(s.Services, srv)
}
// Open opens the meta and data store and all services.
func (s *Server) Open() error {
if err := func() error {

View File

@ -24,10 +24,9 @@ var (
)
type Store struct {
mu sync.RWMutex
path string
mu sync.RWMutex
databaseIndexes map[string]*DatabaseIndex
shards map[uint64]*Shard