diff --git a/CHANGELOG.md b/CHANGELOG.md index 4da3f03713..1d264a6478 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,17 @@ ## v1.0.0 [unreleased] +### Release Notes + +* Config option `[cluster]` has been replaced with `[coordinator]` + ### Features - [#3541](https://github.com/influxdata/influxdb/issues/3451): Update SHOW FIELD KEYS to return the field type with the field key. +### Bugfixes + +- [#6604](https://github.com/influxdata/influxdb/pull/6604): Remove old cluster code + ## v0.13.0 [unreleased] ### Release Notes diff --git a/cluster/balancer.go b/cluster/balancer.go deleted file mode 100644 index cb565f2388..0000000000 --- a/cluster/balancer.go +++ /dev/null @@ -1,69 +0,0 @@ -package cluster - -import ( - "math/rand" - - "github.com/influxdata/influxdb/services/meta" -) - -// Balancer represents a load-balancing algorithm for a set of nodes -type Balancer interface { - // Next returns the next Node according to the balancing method - // or nil if there are no nodes available - Next() *meta.NodeInfo -} - -type nodeBalancer struct { - nodes []meta.NodeInfo // data nodes to balance between - p int // current node index -} - -// NewNodeBalancer create a shuffled, round-robin balancer so that -// multiple instances will return nodes in randomized order and each -// each returned node will be repeated in a cycle -func NewNodeBalancer(nodes []meta.NodeInfo) Balancer { - // make a copy of the node slice so we can randomize it - // without affecting the original instance as well as ensure - // that each Balancer returns nodes in a different order - b := &nodeBalancer{} - - b.nodes = make([]meta.NodeInfo, len(nodes)) - copy(b.nodes, nodes) - - b.shuffle() - return b -} - -// shuffle randomizes the ordering the balancers available nodes -func (b *nodeBalancer) shuffle() { - for i := range b.nodes { - j := rand.Intn(i + 1) - b.nodes[i], b.nodes[j] = b.nodes[j], b.nodes[i] - } -} - -// online returns a slice of the nodes that are online -func (b *nodeBalancer) online() []meta.NodeInfo { - return b.nodes -} - -// Next returns the next available nodes -func (b *nodeBalancer) Next() *meta.NodeInfo { - // only use online nodes - up := b.online() - - // no nodes online - if len(up) == 0 { - return nil - } - - // rollover back to the beginning - if b.p >= len(up) { - b.p = 0 - } - - d := &up[b.p] - b.p++ - - return d -} diff --git a/cluster/balancer_test.go b/cluster/balancer_test.go deleted file mode 100644 index 0e52d2381a..0000000000 --- a/cluster/balancer_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package cluster_test - -import ( - "fmt" - "testing" - - "github.com/influxdata/influxdb/cluster" - "github.com/influxdata/influxdb/services/meta" -) - -func NewNodes() []meta.NodeInfo { - var nodes []meta.NodeInfo - for i := 1; i <= 2; i++ { - nodes = append(nodes, meta.NodeInfo{ - ID: uint64(i), - Host: fmt.Sprintf("localhost:999%d", i), - }) - } - return nodes -} - -func TestBalancerEmptyNodes(t *testing.T) { - b := cluster.NewNodeBalancer([]meta.NodeInfo{}) - got := b.Next() - if got != nil { - t.Errorf("expected nil, got %v", got) - } -} - -func TestBalancerUp(t *testing.T) { - nodes := NewNodes() - b := cluster.NewNodeBalancer(nodes) - - // First node in randomized round-robin order - first := b.Next() - if first == nil { - t.Errorf("expected datanode, got %v", first) - } - - // Second node in randomized round-robin order - second := b.Next() - if second == nil { - t.Errorf("expected datanode, got %v", second) - } - - // Should never get the same node in order twice - if first.ID == second.ID { - t.Errorf("expected first != second. got %v = %v", first.ID, second.ID) - } -} - -/* -func TestBalancerDown(t *testing.T) { - nodes := NewNodes() - b := cluster.NewNodeBalancer(nodes) - - nodes[0].Down() - - // First node in randomized round-robin order - first := b.Next() - if first == nil { - t.Errorf("expected datanode, got %v", first) - } - - // Second node should rollover to the first up node - second := b.Next() - if second == nil { - t.Errorf("expected datanode, got %v", second) - } - - // Health node should be returned each time - if first.ID != 2 && first.ID != second.ID { - t.Errorf("expected first != second. got %v = %v", first.ID, second.ID) - } -} -*/ - -/* -func TestBalancerBackUp(t *testing.T) { - nodes := newDataNodes() - b := cluster.NewNodeBalancer(nodes) - - nodes[0].Down() - - for i := 0; i < 3; i++ { - got := b.Next() - if got == nil { - t.Errorf("expected datanode, got %v", got) - } - - if exp := uint64(2); got.ID != exp { - t.Errorf("wrong node id: exp %v, got %v", exp, got.ID) - } - } - - nodes[0].Up() - - // First node in randomized round-robin order - first := b.Next() - if first == nil { - t.Errorf("expected datanode, got %v", first) - } - - // Second node should rollover to the first up node - second := b.Next() - if second == nil { - t.Errorf("expected datanode, got %v", second) - } - - // Should get both nodes returned - if first.ID == second.ID { - t.Errorf("expected first != second. got %v = %v", first.ID, second.ID) - } -} -*/ diff --git a/cluster/client_pool.go b/cluster/client_pool.go deleted file mode 100644 index fed7e18e0e..0000000000 --- a/cluster/client_pool.go +++ /dev/null @@ -1,57 +0,0 @@ -package cluster - -import ( - "net" - "sync" - - "gopkg.in/fatih/pool.v2" -) - -type clientPool struct { - mu sync.RWMutex - pool map[uint64]pool.Pool -} - -func newClientPool() *clientPool { - return &clientPool{ - pool: make(map[uint64]pool.Pool), - } -} - -func (c *clientPool) setPool(nodeID uint64, p pool.Pool) { - c.mu.Lock() - c.pool[nodeID] = p - c.mu.Unlock() -} - -func (c *clientPool) getPool(nodeID uint64) (pool.Pool, bool) { - c.mu.RLock() - p, ok := c.pool[nodeID] - c.mu.RUnlock() - return p, ok -} - -func (c *clientPool) size() int { - c.mu.RLock() - var size int - for _, p := range c.pool { - size += p.Len() - } - c.mu.RUnlock() - return size -} - -func (c *clientPool) conn(nodeID uint64) (net.Conn, error) { - c.mu.RLock() - conn, err := c.pool[nodeID].Get() - c.mu.RUnlock() - return conn, err -} - -func (c *clientPool) close() { - c.mu.Lock() - for _, p := range c.pool { - p.Close() - } - c.mu.Unlock() -} diff --git a/cluster/cluster.go b/cluster/cluster.go deleted file mode 100644 index ed37c240f5..0000000000 --- a/cluster/cluster.go +++ /dev/null @@ -1 +0,0 @@ -package cluster // import "github.com/influxdata/influxdb/cluster" diff --git a/cluster/config.go b/cluster/config.go deleted file mode 100644 index 3c3ea94a7f..0000000000 --- a/cluster/config.go +++ /dev/null @@ -1,64 +0,0 @@ -package cluster - -import ( - "time" - - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/toml" -) - -const ( - // DefaultWriteTimeout is the default timeout for a complete write to succeed. - DefaultWriteTimeout = 5 * time.Second - - // DefaultShardWriterTimeout is the default timeout set on shard writers. - DefaultShardWriterTimeout = 5 * time.Second - - // DefaultShardMapperTimeout is the default timeout set on shard mappers. - DefaultShardMapperTimeout = 5 * time.Second - - // DefaultMaxRemoteWriteConnections is the maximum number of open connections - // that will be available for remote writes to another host. - DefaultMaxRemoteWriteConnections = 3 - - // DefaultMaxConcurrentQueries is the maximum number of running queries. - // A value of zero will make the maximum query limit unlimited. - DefaultMaxConcurrentQueries = 0 - - // DefaultMaxSelectPointN is the maximum number of points a SELECT can process. - // A value of zero will make the maximum point count unlimited. - DefaultMaxSelectPointN = 0 - - // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run. - // A value of zero will make the maximum series count unlimited. - DefaultMaxSelectSeriesN = 0 -) - -// Config represents the configuration for the clustering service. -type Config struct { - ForceRemoteShardMapping bool `toml:"force-remote-mapping"` - WriteTimeout toml.Duration `toml:"write-timeout"` - ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"` - MaxRemoteWriteConnections int `toml:"max-remote-write-connections"` - ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"` - MaxConcurrentQueries int `toml:"max-concurrent-queries"` - QueryTimeout toml.Duration `toml:"query-timeout"` - LogQueriesAfter toml.Duration `toml:"log-queries-after"` - MaxSelectPointN int `toml:"max-select-point"` - MaxSelectSeriesN int `toml:"max-select-series"` - MaxSelectBucketsN int `toml:"max-select-buckets"` -} - -// NewConfig returns an instance of Config with defaults. -func NewConfig() Config { - return Config{ - WriteTimeout: toml.Duration(DefaultWriteTimeout), - ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout), - ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout), - QueryTimeout: toml.Duration(influxql.DefaultQueryTimeout), - MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections, - MaxConcurrentQueries: DefaultMaxConcurrentQueries, - MaxSelectPointN: DefaultMaxSelectPointN, - MaxSelectSeriesN: DefaultMaxSelectSeriesN, - } -} diff --git a/cluster/internal/data.pb.go b/cluster/internal/data.pb.go deleted file mode 100644 index 4fd41e029a..0000000000 --- a/cluster/internal/data.pb.go +++ /dev/null @@ -1,575 +0,0 @@ -// Code generated by protoc-gen-gogo. -// source: internal/data.proto -// DO NOT EDIT! - -/* -Package cluster is a generated protocol buffer package. - -It is generated from these files: - internal/data.proto - -It has these top-level messages: - WriteShardRequest - WriteShardResponse - ExecuteStatementRequest - ExecuteStatementResponse - CreateIteratorRequest - CreateIteratorResponse - IteratorStats - FieldDimensionsRequest - FieldDimensionsResponse - SeriesKeysRequest - SeriesKeysResponse - ExpandSourcesRequest - ExpandSourcesResponse - RemoteMonitorRequest - RemoteMonitorResponse - BackupShardRequest - BackupShardResponse - CopyShardRequest - CopyShardResponse -*/ -package cluster - -import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -type WriteShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` - Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"` - Database *string `protobuf:"bytes,3,opt,name=Database" json:"Database,omitempty"` - RetentionPolicy *string `protobuf:"bytes,4,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *WriteShardRequest) Reset() { *m = WriteShardRequest{} } -func (m *WriteShardRequest) String() string { return proto.CompactTextString(m) } -func (*WriteShardRequest) ProtoMessage() {} - -func (m *WriteShardRequest) GetShardID() uint64 { - if m != nil && m.ShardID != nil { - return *m.ShardID - } - return 0 -} - -func (m *WriteShardRequest) GetPoints() [][]byte { - if m != nil { - return m.Points - } - return nil -} - -func (m *WriteShardRequest) GetDatabase() string { - if m != nil && m.Database != nil { - return *m.Database - } - return "" -} - -func (m *WriteShardRequest) GetRetentionPolicy() string { - if m != nil && m.RetentionPolicy != nil { - return *m.RetentionPolicy - } - return "" -} - -type WriteShardResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *WriteShardResponse) Reset() { *m = WriteShardResponse{} } -func (m *WriteShardResponse) String() string { return proto.CompactTextString(m) } -func (*WriteShardResponse) ProtoMessage() {} - -func (m *WriteShardResponse) GetCode() int32 { - if m != nil && m.Code != nil { - return *m.Code - } - return 0 -} - -func (m *WriteShardResponse) GetMessage() string { - if m != nil && m.Message != nil { - return *m.Message - } - return "" -} - -type ExecuteStatementRequest struct { - Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"` - Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *ExecuteStatementRequest) Reset() { *m = ExecuteStatementRequest{} } -func (m *ExecuteStatementRequest) String() string { return proto.CompactTextString(m) } -func (*ExecuteStatementRequest) ProtoMessage() {} - -func (m *ExecuteStatementRequest) GetStatement() string { - if m != nil && m.Statement != nil { - return *m.Statement - } - return "" -} - -func (m *ExecuteStatementRequest) GetDatabase() string { - if m != nil && m.Database != nil { - return *m.Database - } - return "" -} - -type ExecuteStatementResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *ExecuteStatementResponse) Reset() { *m = ExecuteStatementResponse{} } -func (m *ExecuteStatementResponse) String() string { return proto.CompactTextString(m) } -func (*ExecuteStatementResponse) ProtoMessage() {} - -func (m *ExecuteStatementResponse) GetCode() int32 { - if m != nil && m.Code != nil { - return *m.Code - } - return 0 -} - -func (m *ExecuteStatementResponse) GetMessage() string { - if m != nil && m.Message != nil { - return *m.Message - } - return "" -} - -type CreateIteratorRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} } -func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) } -func (*CreateIteratorRequest) ProtoMessage() {} - -func (m *CreateIteratorRequest) GetShardIDs() []uint64 { - if m != nil { - return m.ShardIDs - } - return nil -} - -func (m *CreateIteratorRequest) GetOpt() []byte { - if m != nil { - return m.Opt - } - return nil -} - -type CreateIteratorResponse struct { - Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"` - Type *int32 `protobuf:"varint,2,req,name=Type" json:"Type,omitempty"` - Stats *IteratorStats `protobuf:"bytes,3,opt,name=Stats" json:"Stats,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} } -func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) } -func (*CreateIteratorResponse) ProtoMessage() {} - -func (m *CreateIteratorResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -func (m *CreateIteratorResponse) GetType() int32 { - if m != nil && m.Type != nil { - return *m.Type - } - return 0 -} - -func (m *CreateIteratorResponse) GetStats() *IteratorStats { - if m != nil { - return m.Stats - } - return nil -} - -type IteratorStats struct { - SeriesN *int64 `protobuf:"varint,1,opt,name=SeriesN" json:"SeriesN,omitempty"` - PointN *int64 `protobuf:"varint,2,opt,name=PointN" json:"PointN,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *IteratorStats) Reset() { *m = IteratorStats{} } -func (m *IteratorStats) String() string { return proto.CompactTextString(m) } -func (*IteratorStats) ProtoMessage() {} - -func (m *IteratorStats) GetSeriesN() int64 { - if m != nil && m.SeriesN != nil { - return *m.SeriesN - } - return 0 -} - -func (m *IteratorStats) GetPointN() int64 { - if m != nil && m.PointN != nil { - return *m.PointN - } - return 0 -} - -type FieldDimensionsRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} } -func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) } -func (*FieldDimensionsRequest) ProtoMessage() {} - -func (m *FieldDimensionsRequest) GetShardIDs() []uint64 { - if m != nil { - return m.ShardIDs - } - return nil -} - -func (m *FieldDimensionsRequest) GetSources() []byte { - if m != nil { - return m.Sources - } - return nil -} - -type FieldDimensionsResponse struct { - Fields []string `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"` - Dimensions []string `protobuf:"bytes,2,rep,name=Dimensions" json:"Dimensions,omitempty"` - Err *string `protobuf:"bytes,3,opt,name=Err" json:"Err,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} } -func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) } -func (*FieldDimensionsResponse) ProtoMessage() {} - -func (m *FieldDimensionsResponse) GetFields() []string { - if m != nil { - return m.Fields - } - return nil -} - -func (m *FieldDimensionsResponse) GetDimensions() []string { - if m != nil { - return m.Dimensions - } - return nil -} - -func (m *FieldDimensionsResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -type SeriesKeysRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} } -func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) } -func (*SeriesKeysRequest) ProtoMessage() {} - -func (m *SeriesKeysRequest) GetShardIDs() []uint64 { - if m != nil { - return m.ShardIDs - } - return nil -} - -func (m *SeriesKeysRequest) GetOpt() []byte { - if m != nil { - return m.Opt - } - return nil -} - -type SeriesKeysResponse struct { - SeriesList []byte `protobuf:"bytes,1,opt,name=SeriesList" json:"SeriesList,omitempty"` - Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} } -func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) } -func (*SeriesKeysResponse) ProtoMessage() {} - -func (m *SeriesKeysResponse) GetSeriesList() []byte { - if m != nil { - return m.SeriesList - } - return nil -} - -func (m *SeriesKeysResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -type ExpandSourcesRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} } -func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) } -func (*ExpandSourcesRequest) ProtoMessage() {} - -func (m *ExpandSourcesRequest) GetShardIDs() []uint64 { - if m != nil { - return m.ShardIDs - } - return nil -} - -func (m *ExpandSourcesRequest) GetSources() []byte { - if m != nil { - return m.Sources - } - return nil -} - -type ExpandSourcesResponse struct { - Sources []byte `protobuf:"bytes,1,req,name=Sources" json:"Sources,omitempty"` - Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} } -func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) } -func (*ExpandSourcesResponse) ProtoMessage() {} - -func (m *ExpandSourcesResponse) GetSources() []byte { - if m != nil { - return m.Sources - } - return nil -} - -func (m *ExpandSourcesResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -type RemoteMonitorRequest struct { - RemoteAddrs []string `protobuf:"bytes,1,rep,name=RemoteAddrs" json:"RemoteAddrs,omitempty"` - NodeID *string `protobuf:"bytes,2,req,name=NodeID" json:"NodeID,omitempty"` - Username *string `protobuf:"bytes,3,req,name=Username" json:"Username,omitempty"` - Password *string `protobuf:"bytes,4,req,name=Password" json:"Password,omitempty"` - ClusterID *uint64 `protobuf:"varint,5,req,name=ClusterID" json:"ClusterID,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} } -func (m *RemoteMonitorRequest) String() string { return proto.CompactTextString(m) } -func (*RemoteMonitorRequest) ProtoMessage() {} - -func (m *RemoteMonitorRequest) GetRemoteAddrs() []string { - if m != nil { - return m.RemoteAddrs - } - return nil -} - -func (m *RemoteMonitorRequest) GetNodeID() string { - if m != nil && m.NodeID != nil { - return *m.NodeID - } - return "" -} - -func (m *RemoteMonitorRequest) GetUsername() string { - if m != nil && m.Username != nil { - return *m.Username - } - return "" -} - -func (m *RemoteMonitorRequest) GetPassword() string { - if m != nil && m.Password != nil { - return *m.Password - } - return "" -} - -func (m *RemoteMonitorRequest) GetClusterID() uint64 { - if m != nil && m.ClusterID != nil { - return *m.ClusterID - } - return 0 -} - -type RemoteMonitorResponse struct { - Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *RemoteMonitorResponse) Reset() { *m = RemoteMonitorResponse{} } -func (m *RemoteMonitorResponse) String() string { return proto.CompactTextString(m) } -func (*RemoteMonitorResponse) ProtoMessage() {} - -func (m *RemoteMonitorResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -type BackupShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` - Since *int64 `protobuf:"varint,2,opt,name=Since" json:"Since,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *BackupShardRequest) Reset() { *m = BackupShardRequest{} } -func (m *BackupShardRequest) String() string { return proto.CompactTextString(m) } -func (*BackupShardRequest) ProtoMessage() {} - -func (m *BackupShardRequest) GetShardID() uint64 { - if m != nil && m.ShardID != nil { - return *m.ShardID - } - return 0 -} - -func (m *BackupShardRequest) GetSince() int64 { - if m != nil && m.Since != nil { - return *m.Since - } - return 0 -} - -type BackupShardResponse struct { - Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *BackupShardResponse) Reset() { *m = BackupShardResponse{} } -func (m *BackupShardResponse) String() string { return proto.CompactTextString(m) } -func (*BackupShardResponse) ProtoMessage() {} - -func (m *BackupShardResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -type CopyShardRequest struct { - Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"` - Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` - Policy *string `protobuf:"bytes,3,req,name=Policy" json:"Policy,omitempty"` - ShardID *uint64 `protobuf:"varint,4,req,name=ShardID" json:"ShardID,omitempty"` - Since *int64 `protobuf:"varint,5,opt,name=Since" json:"Since,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CopyShardRequest) Reset() { *m = CopyShardRequest{} } -func (m *CopyShardRequest) String() string { return proto.CompactTextString(m) } -func (*CopyShardRequest) ProtoMessage() {} - -func (m *CopyShardRequest) GetHost() string { - if m != nil && m.Host != nil { - return *m.Host - } - return "" -} - -func (m *CopyShardRequest) GetDatabase() string { - if m != nil && m.Database != nil { - return *m.Database - } - return "" -} - -func (m *CopyShardRequest) GetPolicy() string { - if m != nil && m.Policy != nil { - return *m.Policy - } - return "" -} - -func (m *CopyShardRequest) GetShardID() uint64 { - if m != nil && m.ShardID != nil { - return *m.ShardID - } - return 0 -} - -func (m *CopyShardRequest) GetSince() int64 { - if m != nil && m.Since != nil { - return *m.Since - } - return 0 -} - -type CopyShardResponse struct { - Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CopyShardResponse) Reset() { *m = CopyShardResponse{} } -func (m *CopyShardResponse) String() string { return proto.CompactTextString(m) } -func (*CopyShardResponse) ProtoMessage() {} - -func (m *CopyShardResponse) GetErr() string { - if m != nil && m.Err != nil { - return *m.Err - } - return "" -} - -func init() { - proto.RegisterType((*WriteShardRequest)(nil), "cluster.WriteShardRequest") - proto.RegisterType((*WriteShardResponse)(nil), "cluster.WriteShardResponse") - proto.RegisterType((*ExecuteStatementRequest)(nil), "cluster.ExecuteStatementRequest") - proto.RegisterType((*ExecuteStatementResponse)(nil), "cluster.ExecuteStatementResponse") - proto.RegisterType((*CreateIteratorRequest)(nil), "cluster.CreateIteratorRequest") - proto.RegisterType((*CreateIteratorResponse)(nil), "cluster.CreateIteratorResponse") - proto.RegisterType((*IteratorStats)(nil), "cluster.IteratorStats") - proto.RegisterType((*FieldDimensionsRequest)(nil), "cluster.FieldDimensionsRequest") - proto.RegisterType((*FieldDimensionsResponse)(nil), "cluster.FieldDimensionsResponse") - proto.RegisterType((*SeriesKeysRequest)(nil), "cluster.SeriesKeysRequest") - proto.RegisterType((*SeriesKeysResponse)(nil), "cluster.SeriesKeysResponse") - proto.RegisterType((*ExpandSourcesRequest)(nil), "cluster.ExpandSourcesRequest") - proto.RegisterType((*ExpandSourcesResponse)(nil), "cluster.ExpandSourcesResponse") - proto.RegisterType((*RemoteMonitorRequest)(nil), "cluster.RemoteMonitorRequest") - proto.RegisterType((*RemoteMonitorResponse)(nil), "cluster.RemoteMonitorResponse") - proto.RegisterType((*BackupShardRequest)(nil), "cluster.BackupShardRequest") - proto.RegisterType((*BackupShardResponse)(nil), "cluster.BackupShardResponse") - proto.RegisterType((*CopyShardRequest)(nil), "cluster.CopyShardRequest") - proto.RegisterType((*CopyShardResponse)(nil), "cluster.CopyShardResponse") -} diff --git a/cluster/internal/data.proto b/cluster/internal/data.proto deleted file mode 100644 index 656ff70e3a..0000000000 --- a/cluster/internal/data.proto +++ /dev/null @@ -1,104 +0,0 @@ -package cluster; - -message WriteShardRequest { - required uint64 ShardID = 1; - repeated bytes Points = 2; - optional string Database = 3; - optional string RetentionPolicy = 4; -} - -message WriteShardResponse { - required int32 Code = 1; - optional string Message = 2; -} - -message ExecuteStatementRequest { - required string Statement = 1; - required string Database = 2; -} - -message ExecuteStatementResponse { - required int32 Code = 1; - optional string Message = 2; -} - -message CreateIteratorRequest { - repeated uint64 ShardIDs = 1; - required bytes Opt = 2; -} - -message CreateIteratorResponse { - optional string Err = 1; - required int32 Type = 2; - - optional IteratorStats Stats = 3; -} - -message IteratorStats { - optional int64 SeriesN = 1; - optional int64 PointN = 2; -} - -message FieldDimensionsRequest { - repeated uint64 ShardIDs = 1; - required bytes Sources = 2; -} - -message FieldDimensionsResponse { - repeated string Fields = 1; - repeated string Dimensions = 2; - optional string Err = 3; -} - -message SeriesKeysRequest { - repeated uint64 ShardIDs = 1; - required bytes Opt = 2; -} - -message SeriesKeysResponse { - optional bytes SeriesList = 1; - optional string Err = 2; -} - -message ExpandSourcesRequest { - repeated uint64 ShardIDs = 1; - required bytes Sources = 2; -} - -message ExpandSourcesResponse { - required bytes Sources = 1; - optional string Err = 2; -} - -message RemoteMonitorRequest { - repeated string RemoteAddrs = 1; - required string NodeID = 2; - required string Username = 3; - required string Password = 4; - required uint64 ClusterID = 5; -} - -message RemoteMonitorResponse { - optional string Err = 1; -} - -message BackupShardRequest { - required uint64 ShardID = 1; - optional int64 Since = 2; -} - -message BackupShardResponse { - optional string Err = 2; -} - -message CopyShardRequest { - required string Host = 1; - required string Database = 2; - required string Policy = 3; - required uint64 ShardID = 4; - optional int64 Since = 5; -} - -message CopyShardResponse { - optional string Err = 2; -} diff --git a/cluster/pool.go b/cluster/pool.go deleted file mode 100644 index 33182c704d..0000000000 --- a/cluster/pool.go +++ /dev/null @@ -1,189 +0,0 @@ -package cluster - -import ( - "errors" - "fmt" - "net" - "sync" - "sync/atomic" - "time" - - "gopkg.in/fatih/pool.v2" -) - -// boundedPool implements the Pool interface based on buffered channels. -type boundedPool struct { - // storage for our net.Conn connections - mu sync.Mutex - conns chan net.Conn - - timeout time.Duration - total int32 - // net.Conn generator - factory Factory -} - -// Factory is a function to create new connections. -type Factory func() (net.Conn, error) - -// NewBoundedPool returns a new pool based on buffered channels with an initial -// capacity, maximum capacity and timeout to wait for a connection from the pool. -// Factory is used when initial capacity is -// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool -// until a new Get() is called. During a Get(), If there is no new connection -// available in the pool and total connections is less than the max, a new connection -// will be created via the Factory() method. Othewise, the call will block until -// a connection is available or the timeout is reached. -func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error) { - if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { - return nil, errors.New("invalid capacity settings") - } - - c := &boundedPool{ - conns: make(chan net.Conn, maxCap), - factory: factory, - timeout: timeout, - } - - // create initial connections, if something goes wrong, - // just close the pool error out. - for i := 0; i < initialCap; i++ { - conn, err := factory() - if err != nil { - c.Close() - return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) - } - c.conns <- conn - atomic.AddInt32(&c.total, 1) - } - - return c, nil -} - -func (c *boundedPool) getConns() chan net.Conn { - c.mu.Lock() - conns := c.conns - c.mu.Unlock() - return conns -} - -// Get implements the Pool interfaces Get() method. If there is no new -// connection available in the pool, a new connection will be created via the -// Factory() method. -func (c *boundedPool) Get() (net.Conn, error) { - conns := c.getConns() - if conns == nil { - return nil, pool.ErrClosed - } - - // Try and grab a connection from the pool - select { - case conn := <-conns: - if conn == nil { - return nil, pool.ErrClosed - } - return c.wrapConn(conn), nil - default: - // Could not get connection, can we create a new one? - if atomic.LoadInt32(&c.total) < int32(cap(conns)) { - conn, err := c.factory() - if err != nil { - return nil, err - } - atomic.AddInt32(&c.total, 1) - - return c.wrapConn(conn), nil - } - } - - // The pool was empty and we couldn't create a new one to - // retry until one is free or we timeout - select { - case conn := <-conns: - if conn == nil { - return nil, pool.ErrClosed - } - return c.wrapConn(conn), nil - case <-time.After(c.timeout): - return nil, fmt.Errorf("timed out waiting for free connection") - } - -} - -// put puts the connection back to the pool. If the pool is full or closed, -// conn is simply closed. A nil conn will be rejected. -func (c *boundedPool) put(conn net.Conn) error { - if conn == nil { - return errors.New("connection is nil. rejecting") - } - - c.mu.Lock() - defer c.mu.Unlock() - - if c.conns == nil { - // pool is closed, close passed connection - return conn.Close() - } - - // put the resource back into the pool. If the pool is full, this will - // block and the default case will be executed. - select { - case c.conns <- conn: - return nil - default: - // pool is full, close passed connection - atomic.AddInt32(&c.total, -1) - return conn.Close() - } -} - -func (c *boundedPool) Close() { - c.mu.Lock() - conns := c.conns - c.conns = nil - c.factory = nil - c.mu.Unlock() - - if conns == nil { - return - } - - close(conns) - for conn := range conns { - conn.Close() - } -} - -func (c *boundedPool) Len() int { return len(c.getConns()) } - -// newConn wraps a standard net.Conn to a poolConn net.Conn. -func (c *boundedPool) wrapConn(conn net.Conn) net.Conn { - p := &pooledConn{c: c} - p.Conn = conn - return p -} - -// pooledConn is a wrapper around net.Conn to modify the the behavior of -// net.Conn's Close() method. -type pooledConn struct { - net.Conn - c *boundedPool - unusable bool -} - -// Close() puts the given connects back to the pool instead of closing it. -func (p pooledConn) Close() error { - if p.unusable { - if p.Conn != nil { - return p.Conn.Close() - } - return nil - } - return p.c.put(p.Conn) -} - -// MarkUnusable() marks the connection not usable any more, to let the pool close it instead of returning it to pool. -func (p *pooledConn) MarkUnusable() { - p.unusable = true - atomic.AddInt32(&p.c.total, -1) -} diff --git a/cluster/rpc.go b/cluster/rpc.go deleted file mode 100644 index 6be04b09c0..0000000000 --- a/cluster/rpc.go +++ /dev/null @@ -1,644 +0,0 @@ -package cluster - -import ( - "errors" - "fmt" - "time" - - "github.com/gogo/protobuf/proto" - internal "github.com/influxdata/influxdb/cluster/internal" - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/models" -) - -//go:generate protoc --gogo_out=. internal/data.proto - -// WritePointsRequest represents a request to write point data to the cluster -type WritePointsRequest struct { - Database string - RetentionPolicy string - Points []models.Point -} - -// AddPoint adds a point to the WritePointRequest with field key 'value' -func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) { - pt, err := models.NewPoint( - name, tags, map[string]interface{}{"value": value}, timestamp, - ) - if err != nil { - return - } - w.Points = append(w.Points, pt) -} - -// WriteShardRequest represents the a request to write a slice of points to a shard -type WriteShardRequest struct { - pb internal.WriteShardRequest -} - -// WriteShardResponse represents the response returned from a remote WriteShardRequest call -type WriteShardResponse struct { - pb internal.WriteShardResponse -} - -// SetShardID sets the ShardID -func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id } - -// ShardID gets the ShardID -func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() } - -func (w *WriteShardRequest) SetDatabase(db string) { w.pb.Database = &db } - -func (w *WriteShardRequest) SetRetentionPolicy(rp string) { w.pb.RetentionPolicy = &rp } - -func (w *WriteShardRequest) Database() string { return w.pb.GetDatabase() } - -func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionPolicy() } - -// Points returns the time series Points -func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() } - -// AddPoint adds a new time series point -func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) { - pt, err := models.NewPoint( - name, tags, map[string]interface{}{"value": value}, timestamp, - ) - if err != nil { - return - } - w.AddPoints([]models.Point{pt}) -} - -// AddPoints adds a new time series point -func (w *WriteShardRequest) AddPoints(points []models.Point) { - for _, p := range points { - b, err := p.MarshalBinary() - if err != nil { - // A error here means that we create a point higher in the stack that we could - // not marshal to a byte slice. If that happens, the endpoint that created that - // point needs to be fixed. - panic(fmt.Sprintf("failed to marshal point: `%v`: %v", p, err)) - } - w.pb.Points = append(w.pb.Points, b) - } -} - -// MarshalBinary encodes the object to a binary format. -func (w *WriteShardRequest) MarshalBinary() ([]byte, error) { - return proto.Marshal(&w.pb) -} - -// UnmarshalBinary populates WritePointRequest from a binary format. -func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &w.pb); err != nil { - return err - } - return nil -} - -func (w *WriteShardRequest) unmarshalPoints() []models.Point { - points := make([]models.Point, len(w.pb.GetPoints())) - for i, p := range w.pb.GetPoints() { - pt, err := models.NewPointFromBytes(p) - if err != nil { - // A error here means that one node created a valid point and sent us an - // unparseable version. We could log and drop the point and allow - // anti-entropy to resolve the discrepancy, but this shouldn't ever happen. - panic(fmt.Sprintf("failed to parse point: `%v`: %v", string(p), err)) - } - - points[i] = pt - } - return points -} - -// SetCode sets the Code -func (w *WriteShardResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) } - -// SetMessage sets the Message -func (w *WriteShardResponse) SetMessage(message string) { w.pb.Message = &message } - -// Code returns the Code -func (w *WriteShardResponse) Code() int { return int(w.pb.GetCode()) } - -// Message returns the Message -func (w *WriteShardResponse) Message() string { return w.pb.GetMessage() } - -// MarshalBinary encodes the object to a binary format. -func (w *WriteShardResponse) MarshalBinary() ([]byte, error) { - return proto.Marshal(&w.pb) -} - -// UnmarshalBinary populates WritePointRequest from a binary format. -func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &w.pb); err != nil { - return err - } - return nil -} - -// ExecuteStatementRequest represents the a request to execute a statement on a node. -type ExecuteStatementRequest struct { - pb internal.ExecuteStatementRequest -} - -// Statement returns the InfluxQL statement. -func (r *ExecuteStatementRequest) Statement() string { return r.pb.GetStatement() } - -// SetStatement sets the InfluxQL statement. -func (r *ExecuteStatementRequest) SetStatement(statement string) { - r.pb.Statement = proto.String(statement) -} - -// Database returns the database name. -func (r *ExecuteStatementRequest) Database() string { return r.pb.GetDatabase() } - -// SetDatabase sets the database name. -func (r *ExecuteStatementRequest) SetDatabase(database string) { r.pb.Database = proto.String(database) } - -// MarshalBinary encodes the object to a binary format. -func (r *ExecuteStatementRequest) MarshalBinary() ([]byte, error) { - return proto.Marshal(&r.pb) -} - -// UnmarshalBinary populates ExecuteStatementRequest from a binary format. -func (r *ExecuteStatementRequest) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &r.pb); err != nil { - return err - } - return nil -} - -// ExecuteStatementResponse represents the response returned from a remote ExecuteStatementRequest call. -type ExecuteStatementResponse struct { - pb internal.WriteShardResponse -} - -// Code returns the response code. -func (w *ExecuteStatementResponse) Code() int { return int(w.pb.GetCode()) } - -// SetCode sets the Code -func (w *ExecuteStatementResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) } - -// Message returns the repsonse message. -func (w *ExecuteStatementResponse) Message() string { return w.pb.GetMessage() } - -// SetMessage sets the Message -func (w *ExecuteStatementResponse) SetMessage(message string) { w.pb.Message = &message } - -// MarshalBinary encodes the object to a binary format. -func (w *ExecuteStatementResponse) MarshalBinary() ([]byte, error) { - return proto.Marshal(&w.pb) -} - -// UnmarshalBinary populates ExecuteStatementResponse from a binary format. -func (w *ExecuteStatementResponse) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &w.pb); err != nil { - return err - } - return nil -} - -// CreateIteratorRequest represents a request to create a remote iterator. -type CreateIteratorRequest struct { - ShardIDs []uint64 - Opt influxql.IteratorOptions -} - -// MarshalBinary encodes r to a binary format. -func (r *CreateIteratorRequest) MarshalBinary() ([]byte, error) { - buf, err := r.Opt.MarshalBinary() - if err != nil { - return nil, err - } - return proto.Marshal(&internal.CreateIteratorRequest{ - ShardIDs: r.ShardIDs, - Opt: buf, - }) -} - -// UnmarshalBinary decodes data into r. -func (r *CreateIteratorRequest) UnmarshalBinary(data []byte) error { - var pb internal.CreateIteratorRequest - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.ShardIDs = pb.GetShardIDs() - if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil { - return err - } - return nil -} - -// CreateIteratorResponse represents a response from remote iterator creation. -type CreateIteratorResponse struct { - Err error - Type influxql.DataType - Stats influxql.IteratorStats -} - -// MarshalBinary encodes r to a binary format. -func (r *CreateIteratorResponse) MarshalBinary() ([]byte, error) { - var pb internal.CreateIteratorResponse - if r.Err != nil { - pb.Err = proto.String(r.Err.Error()) - } - pb.Type = proto.Int32(int32(r.Type)) - pb.Stats = &internal.IteratorStats{ - SeriesN: proto.Int64(int64(r.Stats.SeriesN)), - PointN: proto.Int64(int64(r.Stats.PointN)), - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes data into r. -func (r *CreateIteratorResponse) UnmarshalBinary(data []byte) error { - var pb internal.CreateIteratorResponse - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - if pb.Err != nil { - r.Err = errors.New(pb.GetErr()) - } - r.Type = influxql.DataType(pb.GetType()) - if stats := pb.GetStats(); stats != nil { - r.Stats.SeriesN = int(stats.GetSeriesN()) - r.Stats.PointN = int(stats.GetPointN()) - } - return nil -} - -// FieldDimensionsRequest represents a request to retrieve unique fields & dimensions. -type FieldDimensionsRequest struct { - ShardIDs []uint64 - Sources influxql.Sources -} - -// MarshalBinary encodes r to a binary format. -func (r *FieldDimensionsRequest) MarshalBinary() ([]byte, error) { - buf, err := r.Sources.MarshalBinary() - if err != nil { - return nil, err - } - return proto.Marshal(&internal.FieldDimensionsRequest{ - ShardIDs: r.ShardIDs, - Sources: buf, - }) -} - -// UnmarshalBinary decodes data into r. -func (r *FieldDimensionsRequest) UnmarshalBinary(data []byte) error { - var pb internal.FieldDimensionsRequest - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.ShardIDs = pb.GetShardIDs() - if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil { - return err - } - return nil -} - -// FieldDimensionsResponse represents a response from remote iterator creation. -type FieldDimensionsResponse struct { - Fields map[string]struct{} - Dimensions map[string]struct{} - Err error -} - -// MarshalBinary encodes r to a binary format. -func (r *FieldDimensionsResponse) MarshalBinary() ([]byte, error) { - var pb internal.FieldDimensionsResponse - - pb.Fields = make([]string, 0, len(r.Fields)) - for k := range r.Fields { - pb.Fields = append(pb.Fields, k) - } - - pb.Dimensions = make([]string, 0, len(r.Dimensions)) - for k := range r.Dimensions { - pb.Dimensions = append(pb.Dimensions, k) - } - - if r.Err != nil { - pb.Err = proto.String(r.Err.Error()) - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes data into r. -func (r *FieldDimensionsResponse) UnmarshalBinary(data []byte) error { - var pb internal.FieldDimensionsResponse - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.Fields = make(map[string]struct{}, len(pb.GetFields())) - for _, s := range pb.GetFields() { - r.Fields[s] = struct{}{} - } - - r.Dimensions = make(map[string]struct{}, len(pb.GetDimensions())) - for _, s := range pb.GetDimensions() { - r.Dimensions[s] = struct{}{} - } - - if pb.Err != nil { - r.Err = errors.New(pb.GetErr()) - } - return nil -} - -// SeriesKeysRequest represents a request to retrieve a list of series keys. -type SeriesKeysRequest struct { - ShardIDs []uint64 - Opt influxql.IteratorOptions -} - -// MarshalBinary encodes r to a binary format. -func (r *SeriesKeysRequest) MarshalBinary() ([]byte, error) { - buf, err := r.Opt.MarshalBinary() - if err != nil { - return nil, err - } - return proto.Marshal(&internal.SeriesKeysRequest{ - ShardIDs: r.ShardIDs, - Opt: buf, - }) -} - -// UnmarshalBinary decodes data into r. -func (r *SeriesKeysRequest) UnmarshalBinary(data []byte) error { - var pb internal.SeriesKeysRequest - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.ShardIDs = pb.GetShardIDs() - if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil { - return err - } - return nil -} - -// SeriesKeysResponse represents a response from retrieving series keys. -type SeriesKeysResponse struct { - SeriesList influxql.SeriesList - Err error -} - -// MarshalBinary encodes r to a binary format. -func (r *SeriesKeysResponse) MarshalBinary() ([]byte, error) { - var pb internal.SeriesKeysResponse - - buf, err := r.SeriesList.MarshalBinary() - if err != nil { - return nil, err - } - pb.SeriesList = buf - - if r.Err != nil { - pb.Err = proto.String(r.Err.Error()) - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes data into r. -func (r *SeriesKeysResponse) UnmarshalBinary(data []byte) error { - var pb internal.SeriesKeysResponse - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - if err := r.SeriesList.UnmarshalBinary(pb.GetSeriesList()); err != nil { - return err - } - - if pb.Err != nil { - r.Err = errors.New(pb.GetErr()) - } - - return nil -} - -// ExpandSourcesRequest represents a request to expand regex sources. -type ExpandSourcesRequest struct { - ShardIDs []uint64 - Sources influxql.Sources -} - -// MarshalBinary encodes r to a binary format. -func (r *ExpandSourcesRequest) MarshalBinary() ([]byte, error) { - buf, err := r.Sources.MarshalBinary() - if err != nil { - return nil, err - } - return proto.Marshal(&internal.ExpandSourcesRequest{ - ShardIDs: r.ShardIDs, - Sources: buf, - }) -} - -// UnmarshalBinary decodes data into r. -func (r *ExpandSourcesRequest) UnmarshalBinary(data []byte) error { - var pb internal.ExpandSourcesRequest - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.ShardIDs = pb.GetShardIDs() - if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil { - return err - } - return nil -} - -// ExpandSourcesResponse represents a response from source expansion. -type ExpandSourcesResponse struct { - Sources influxql.Sources - Err error -} - -// MarshalBinary encodes r to a binary format. -func (r *ExpandSourcesResponse) MarshalBinary() ([]byte, error) { - var pb internal.ExpandSourcesResponse - buf, err := r.Sources.MarshalBinary() - if err != nil { - return nil, err - } - pb.Sources = buf - - if r.Err != nil { - pb.Err = proto.String(r.Err.Error()) - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes data into r. -func (r *ExpandSourcesResponse) UnmarshalBinary(data []byte) error { - var pb internal.ExpandSourcesResponse - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil { - return err - } - - if pb.Err != nil { - r.Err = errors.New(pb.GetErr()) - } - return nil -} - -// RemoteMonitorRequest represents a request to configure a -// monitor.Monitor to write to a remote database. -type RemoteMonitorRequest struct { - pb internal.RemoteMonitorRequest -} - -func (m *RemoteMonitorRequest) SetRemoteAddrs(s []string) { - m.pb.RemoteAddrs = s -} - -func (m *RemoteMonitorRequest) SetNodeID(s string) { - m.pb.NodeID = &s -} - -func (m *RemoteMonitorRequest) SetUsername(s string) { - m.pb.Username = &s -} - -func (m *RemoteMonitorRequest) SetPassword(s string) { - m.pb.Password = &s -} - -func (m *RemoteMonitorRequest) SetClusterID(v uint64) { - m.pb.ClusterID = &v -} - -// MarshalBinary encodes the object to a binary format. -func (r *RemoteMonitorRequest) MarshalBinary() ([]byte, error) { - return proto.Marshal(&r.pb) -} - -// UnmarshalBinary populates WritePointRequest from a binary format. -func (r *RemoteMonitorRequest) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &r.pb); err != nil { - return err - } - return nil -} - -// RemoteMonitorResponse represents a response from source expansion. -type RemoteMonitorResponse struct { - Err error -} - -// MarshalBinary encodes r to a binary format. -func (r *RemoteMonitorResponse) MarshalBinary() ([]byte, error) { - var pb internal.RemoteMonitorResponse - if r.Err != nil { - pb.Err = proto.String(r.Err.Error()) - } - return proto.Marshal(&pb) -} - -// UnmarshalBinary decodes data into r. -func (r *RemoteMonitorResponse) UnmarshalBinary(data []byte) error { - var pb internal.RemoteMonitorResponse - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - if pb.Err != nil { - r.Err = errors.New(pb.GetErr()) - } - return nil -} - -// BackupShardRequest represents a request to stream a backup of a single shard. -type BackupShardRequest struct { - ShardID uint64 - Since time.Time -} - -// MarshalBinary encodes r to a binary format. -func (r *BackupShardRequest) MarshalBinary() ([]byte, error) { - return proto.Marshal(&internal.BackupShardRequest{ - ShardID: proto.Uint64(r.ShardID), - Since: proto.Int64(r.Since.UnixNano()), - }) -} - -// UnmarshalBinary decodes data into r. -func (r *BackupShardRequest) UnmarshalBinary(data []byte) error { - var pb internal.BackupShardRequest - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.ShardID = pb.GetShardID() - r.Since = time.Unix(0, pb.GetSince()) - return nil -} - -// CopyShardRequest represents a request to copy a shard from another host. -type CopyShardRequest struct { - Host string - Database string - Policy string - ShardID uint64 - Since time.Time -} - -// MarshalBinary encodes r to a binary format. -func (r *CopyShardRequest) MarshalBinary() ([]byte, error) { - return proto.Marshal(&internal.CopyShardRequest{ - Host: proto.String(r.Host), - Database: proto.String(r.Database), - Policy: proto.String(r.Policy), - ShardID: proto.Uint64(r.ShardID), - Since: proto.Int64(r.Since.UnixNano()), - }) -} - -// UnmarshalBinary decodes data into r. -func (r *CopyShardRequest) UnmarshalBinary(data []byte) error { - var pb internal.CopyShardRequest - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - - r.Host = pb.GetHost() - r.Database = pb.GetDatabase() - r.Policy = pb.GetPolicy() - r.ShardID = pb.GetShardID() - r.Since = time.Unix(0, pb.GetSince()) - return nil -} - -// CopyShardResponse represents a response from a shard Copy. -type CopyShardResponse struct { - Err error -} - -func (r *CopyShardResponse) MarshalBinary() ([]byte, error) { - var pb internal.CopyShardResponse - if r.Err != nil { - pb.Err = proto.String(r.Err.Error()) - } - return proto.Marshal(&pb) -} - -func (r *CopyShardResponse) UnmarshalBinary(data []byte) error { - var pb internal.CopyShardResponse - - if err := proto.Unmarshal(data, &pb); err != nil { - return err - } - if pb.Err != nil { - r.Err = errors.New(pb.GetErr()) - } - return nil -} diff --git a/cluster/rpc_test.go b/cluster/rpc_test.go deleted file mode 100644 index 54393edb46..0000000000 --- a/cluster/rpc_test.go +++ /dev/null @@ -1,139 +0,0 @@ -package cluster - -import ( - "errors" - "reflect" - "testing" - "time" - - "github.com/davecgh/go-spew/spew" - "github.com/influxdata/influxdb/influxql" -) - -func TestWriteShardRequestBinary(t *testing.T) { - sr := &WriteShardRequest{} - - sr.SetShardID(uint64(1)) - if exp := uint64(1); sr.ShardID() != exp { - t.Fatalf("ShardID mismatch: got %v, exp %v", sr.ShardID(), exp) - } - - sr.AddPoint("cpu", 1.0, time.Unix(0, 0), map[string]string{"host": "serverA"}) - sr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil) - sr.AddPoint("cpu_load", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil) - - b, err := sr.MarshalBinary() - if err != nil { - t.Fatalf("WritePointsRequest.MarshalBinary() failed: %v", err) - } - if len(b) == 0 { - t.Fatalf("WritePointsRequest.MarshalBinary() returned 0 bytes") - } - - got := &WriteShardRequest{} - if err := got.UnmarshalBinary(b); err != nil { - t.Fatalf("WritePointsRequest.UnmarshalMarshalBinary() failed: %v", err) - } - - if got.ShardID() != sr.ShardID() { - t.Errorf("ShardID mismatch: got %v, exp %v", got.ShardID(), sr.ShardID()) - } - - if len(got.Points()) != len(sr.Points()) { - t.Errorf("Points count mismatch: got %v, exp %v", len(got.Points()), len(sr.Points())) - } - - srPoints := sr.Points() - gotPoints := got.Points() - for i, p := range srPoints { - g := gotPoints[i] - - if g.Name() != p.Name() { - t.Errorf("Point %d name mismatch: got %v, exp %v", i, g.Name(), p.Name()) - } - - if !g.Time().Equal(p.Time()) { - t.Errorf("Point %d time mismatch: got %v, exp %v", i, g.Time(), p.Time()) - } - - if g.HashID() != p.HashID() { - t.Errorf("Point #%d HashID() mismatch: got %v, exp %v", i, g.HashID(), p.HashID()) - } - - for k, v := range p.Tags() { - if g.Tags()[k] != v { - t.Errorf("Point #%d tag mismatch: got %v, exp %v", i, k, v) - } - } - - if len(p.Fields()) != len(g.Fields()) { - t.Errorf("Point %d field count mismatch: got %v, exp %v", i, len(g.Fields()), len(p.Fields())) - } - - for j, f := range p.Fields() { - if g.Fields()[j] != f { - t.Errorf("Point %d field mismatch: got %v, exp %v", i, g.Fields()[j], f) - } - } - } -} - -func TestWriteShardResponseBinary(t *testing.T) { - sr := &WriteShardResponse{} - sr.SetCode(10) - sr.SetMessage("foo") - b, err := sr.MarshalBinary() - - if exp := 10; sr.Code() != exp { - t.Fatalf("Code mismatch: got %v, exp %v", sr.Code(), exp) - } - - if exp := "foo"; sr.Message() != exp { - t.Fatalf("Message mismatch: got %v, exp %v", sr.Message(), exp) - } - - if err != nil { - t.Fatalf("WritePointsResponse.MarshalBinary() failed: %v", err) - } - if len(b) == 0 { - t.Fatalf("WritePointsResponse.MarshalBinary() returned 0 bytes") - } - - got := &WriteShardResponse{} - if err := got.UnmarshalBinary(b); err != nil { - t.Fatalf("WritePointsResponse.UnmarshalMarshalBinary() failed: %v", err) - } - - if got.Code() != sr.Code() { - t.Errorf("Code mismatch: got %v, exp %v", got.Code(), sr.Code()) - } - - if got.Message() != sr.Message() { - t.Errorf("Message mismatch: got %v, exp %v", got.Message(), sr.Message()) - } - -} - -// Ensure series list response can be marshaled into and out of a binary format. -func TestSeriesKeysResponse_MarshalBinary(t *testing.T) { - resp := &SeriesKeysResponse{ - SeriesList: []influxql.Series{ - {Name: "cpu", Aux: []influxql.DataType{influxql.Float}}, - }, - Err: errors.New("marker"), - } - - // Marshal to binary. - buf, err := resp.MarshalBinary() - if err != nil { - t.Fatal(err) - } - - // Unmarshal back to an object. - var other SeriesKeysResponse - if err := other.UnmarshalBinary(buf); err != nil { - t.Fatal(err) - } else if !reflect.DeepEqual(&other, resp) { - t.Fatalf("unexpected response: %s", spew.Sdump(other)) - } -} diff --git a/cluster/service.go b/cluster/service.go deleted file mode 100644 index 8cdc5e6c7a..0000000000 --- a/cluster/service.go +++ /dev/null @@ -1,843 +0,0 @@ -package cluster - -import ( - "encoding" - "encoding/binary" - "errors" - "expvar" - "fmt" - "io" - "log" - "net" - "os" - "strings" - "sync" - "time" - - "github.com/influxdata/influxdb/monitor" - - "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/tsdb" -) - -// MaxMessageSize defines how large a message can be before we reject it -const MaxMessageSize = 1024 * 1024 * 1024 // 1GB - -// MuxHeader is the header byte used in the TCP mux. -const MuxHeader = 2 - -// Statistics maintained by the cluster package -const ( - writeShardReq = "writeShardReq" - writeShardPointsReq = "writeShardPointsReq" - writeShardFail = "writeShardFail" - - createIteratorReq = "createIteratorReq" - createIteratorResp = "createIteratorResp" - - fieldDimensionsReq = "fieldDimensionsReq" - fieldDimensionsResp = "fieldDimensionsResp" - - seriesKeysReq = "seriesKeysReq" - seriesKeysResp = "seriesKeysResp" - - expandSourcesReq = "expandSourcesReq" - expandSourcesReqexpandSourcesResp = "expandSourcesResp" - - backupShardReq = "backupShardReq" - backupShardReqbackupShardResp = "backupShardResp" - - copyShardReq = "copyShardReq" - copyShardResp = "copyShardResp" -) - -const ( - writeShardRequestMessage byte = iota + 1 - writeShardResponseMessage - - executeStatementRequestMessage - executeStatementResponseMessage - - createIteratorRequestMessage - createIteratorResponseMessage - - fieldDimensionsRequestMessage - fieldDimensionsResponseMessage - - seriesKeysRequestMessage - seriesKeysResponseMessage - - remoteMonitorRequestMessage - remoteMonitorResponseMessage - - expandSourcesRequestMessage - expandSourcesResponseMessage - - backupShardRequestMessage - backupShardResponseMessage - - copyShardRequestMessage - copyShardResponseMessage -) - -// BackupTimeout is the time before a connection times out when performing a backup. -const BackupTimeout = 30 * time.Second - -// Service processes data received over raw TCP connections. -type Service struct { - mu sync.RWMutex - - wg sync.WaitGroup - closing chan struct{} - - Listener net.Listener - - TSDBStore TSDBStore - Monitor *monitor.Monitor - - Logger *log.Logger - statMap *expvar.Map -} - -// NewService returns a new instance of Service. -func NewService(c Config) *Service { - return &Service{ - closing: make(chan struct{}), - Logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags), - statMap: influxdb.NewStatistics("cluster", "cluster", nil), - } -} - -// Open opens the network listener and begins serving requests. -func (s *Service) Open() error { - - s.Logger.Println("Starting cluster service") - // Begin serving conections. - s.wg.Add(1) - go s.serve() - - return nil -} - -// SetLogOutput sets the writer to which all logs are written. It must not be -// called after Open is called. -func (s *Service) SetLogOutput(w io.Writer) { - s.Logger = log.New(w, "[cluster] ", log.LstdFlags) -} - -// serve accepts connections from the listener and handles them. -func (s *Service) serve() { - defer s.wg.Done() - - for { - // Check if the service is shutting down. - select { - case <-s.closing: - return - default: - } - - // Accept the next connection. - conn, err := s.Listener.Accept() - if err != nil { - if strings.Contains(err.Error(), "connection closed") { - s.Logger.Printf("cluster service accept error: %s", err) - return - } - s.Logger.Printf("accept error: %s", err) - continue - } - - // Delegate connection handling to a separate goroutine. - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.handleConn(conn) - }() - } -} - -// Close shuts down the listener and waits for all connections to finish. -func (s *Service) Close() error { - if s.Listener != nil { - s.Listener.Close() - } - - // Shut down all handlers. - close(s.closing) - s.wg.Wait() - - return nil -} - -// handleConn services an individual TCP connection. -func (s *Service) handleConn(conn net.Conn) { - // Ensure connection is closed when service is closed. - closing := make(chan struct{}) - defer close(closing) - go func() { - select { - case <-closing: - case <-s.closing: - } - conn.Close() - }() - - s.Logger.Printf("accept remote connection from %v\n", conn.RemoteAddr()) - defer func() { - s.Logger.Printf("close remote connection from %v\n", conn.RemoteAddr()) - }() - for { - // Read type-length-value. - typ, err := ReadType(conn) - if err != nil { - if strings.HasSuffix(err.Error(), "EOF") { - return - } - s.Logger.Printf("unable to read type: %s", err) - return - } - - // Delegate message processing by type. - switch typ { - case writeShardRequestMessage: - buf, err := ReadLV(conn) - if err != nil { - s.Logger.Printf("unable to read length-value: %s", err) - return - } - - s.statMap.Add(writeShardReq, 1) - err = s.processWriteShardRequest(buf) - if err != nil { - s.Logger.Printf("process write shard error: %s", err) - } - s.writeShardResponse(conn, err) - case executeStatementRequestMessage: - buf, err := ReadLV(conn) - if err != nil { - s.Logger.Printf("unable to read length-value: %s", err) - return - } - - err = s.processExecuteStatementRequest(buf) - if err != nil { - s.Logger.Printf("process execute statement error: %s", err) - } - s.writeShardResponse(conn, err) - case createIteratorRequestMessage: - s.statMap.Add(createIteratorReq, 1) - s.processCreateIteratorRequest(conn) - return - case fieldDimensionsRequestMessage: - s.statMap.Add(fieldDimensionsReq, 1) - s.processFieldDimensionsRequest(conn) - return - case seriesKeysRequestMessage: - s.statMap.Add(seriesKeysReq, 1) - s.processSeriesKeysRequest(conn) - return - case remoteMonitorRequestMessage: - buf, err := ReadLV(conn) - if err != nil { - s.Logger.Printf("unable to read length-value: %s", err) - return - } - - if err = s.processRemoteMonitorRequest(buf); err != nil { - s.Logger.Printf("process write shard error: %s", err) - } - s.writeRemoteMonitorResponse(conn, err) - case expandSourcesRequestMessage: - s.statMap.Add(expandSourcesReq, 1) - s.processExpandSourcesRequest(conn) - return - case backupShardRequestMessage: - s.statMap.Add(backupShardReq, 1) - s.processBackupShardRequest(conn) - return - case copyShardRequestMessage: - s.statMap.Add(copyShardReq, 1) - s.processCopyShardRequest(conn) - return - default: - s.Logger.Printf("cluster service message type not found: %d", typ) - } - } -} - -func (s *Service) processExecuteStatementRequest(buf []byte) error { - // Unmarshal the request. - var req ExecuteStatementRequest - if err := req.UnmarshalBinary(buf); err != nil { - return err - } - - // Parse the InfluxQL statement. - stmt, err := influxql.ParseStatement(req.Statement()) - if err != nil { - return err - } - - return s.executeStatement(stmt, req.Database()) -} - -func (s *Service) executeStatement(stmt influxql.Statement, database string) error { - switch t := stmt.(type) { - case *influxql.DeleteSeriesStatement: - return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition) - case *influxql.DropDatabaseStatement: - return s.TSDBStore.DeleteDatabase(t.Name) - case *influxql.DropMeasurementStatement: - return s.TSDBStore.DeleteMeasurement(database, t.Name) - case *influxql.DropSeriesStatement: - return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition) - case *influxql.DropRetentionPolicyStatement: - return s.TSDBStore.DeleteRetentionPolicy(database, t.Name) - case *influxql.DropShardStatement: - return s.TSDBStore.DeleteShard(t.ID) - default: - return fmt.Errorf("%q should not be executed across a cluster", stmt.String()) - } -} - -func (s *Service) processWriteShardRequest(buf []byte) error { - // Build request - var req WriteShardRequest - if err := req.UnmarshalBinary(buf); err != nil { - return err - } - - points := req.Points() - s.statMap.Add(writeShardPointsReq, int64(len(points))) - err := s.TSDBStore.WriteToShard(req.ShardID(), points) - - // We may have received a write for a shard that we don't have locally because the - // sending node may have just created the shard (via the metastore) and the write - // arrived before the local store could create the shard. In this case, we need - // to check the metastore to determine what database and retention policy this - // shard should reside within. - if err == tsdb.ErrShardNotFound { - db, rp := req.Database(), req.RetentionPolicy() - if db == "" || rp == "" { - s.Logger.Printf("drop write request: shard=%d. no database or rentention policy received", req.ShardID()) - return nil - } - - err = s.TSDBStore.CreateShard(req.Database(), req.RetentionPolicy(), req.ShardID()) - if err != nil { - s.statMap.Add(writeShardFail, 1) - return fmt.Errorf("create shard %d: %s", req.ShardID(), err) - } - - err = s.TSDBStore.WriteToShard(req.ShardID(), points) - if err != nil { - s.statMap.Add(writeShardFail, 1) - return fmt.Errorf("write shard %d: %s", req.ShardID(), err) - } - } - - if err != nil { - s.statMap.Add(writeShardFail, 1) - return fmt.Errorf("write shard %d: %s", req.ShardID(), err) - } - - return nil -} - -func (s *Service) writeShardResponse(w io.Writer, e error) { - // Build response. - var resp WriteShardResponse - if e != nil { - resp.SetCode(1) - resp.SetMessage(e.Error()) - } else { - resp.SetCode(0) - } - - // Marshal response to binary. - buf, err := resp.MarshalBinary() - if err != nil { - s.Logger.Printf("error marshalling shard response: %s", err) - return - } - - // Write to connection. - if err := WriteTLV(w, writeShardResponseMessage, buf); err != nil { - s.Logger.Printf("write shard response error: %s", err) - } -} - -func (s *Service) processCreateIteratorRequest(conn net.Conn) { - defer conn.Close() - - var itr influxql.Iterator - if err := func() error { - // Parse request. - var req CreateIteratorRequest - if err := DecodeLV(conn, &req); err != nil { - return err - } - - sh, ok := s.TSDBStore.(ShardIteratorCreator) - if !ok { - return errors.New("unable to access a specific shard with this tsdb store") - } - - // Collect iterator creators for each shard. - ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) - for _, shardID := range req.ShardIDs { - ic := sh.ShardIteratorCreator(shardID) - if ic == nil { - continue - } - ics = append(ics, ic) - } - - // Return immediately if there are no iterator creators. - if len(ics) == 0 { - return nil - } - - // Generate a single iterator from all shards. - i, err := influxql.IteratorCreators(ics).CreateIterator(req.Opt) - if err != nil { - return err - } - itr = i - - return nil - }(); err != nil { - s.Logger.Printf("error reading CreateIterator request: %s", err) - EncodeTLV(conn, createIteratorResponseMessage, &CreateIteratorResponse{Err: err}) - return - } - - resp := CreateIteratorResponse{} - if itr != nil { - switch itr.(type) { - case influxql.FloatIterator: - resp.Type = influxql.Float - case influxql.IntegerIterator: - resp.Type = influxql.Integer - case influxql.StringIterator: - resp.Type = influxql.String - case influxql.BooleanIterator: - resp.Type = influxql.Boolean - } - resp.Stats = itr.Stats() - } - - // Encode success response. - if err := EncodeTLV(conn, createIteratorResponseMessage, &resp); err != nil { - s.Logger.Printf("error writing CreateIterator response: %s", err) - return - } - - // Exit if no iterator was produced. - if itr == nil { - return - } - - // Stream iterator to connection. - if err := influxql.NewIteratorEncoder(conn).EncodeIterator(itr); err != nil { - s.Logger.Printf("error encoding CreateIterator iterator: %s", err) - return - } -} - -func (s *Service) processFieldDimensionsRequest(conn net.Conn) { - var fields, dimensions map[string]struct{} - if err := func() error { - // Parse request. - var req FieldDimensionsRequest - if err := DecodeLV(conn, &req); err != nil { - return err - } - - sh, ok := s.TSDBStore.(ShardIteratorCreator) - if !ok { - return errors.New("unable to access a specific shard with this tsdb store") - } - - // Collect iterator creators for each shard. - ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) - for _, shardID := range req.ShardIDs { - ic := sh.ShardIteratorCreator(shardID) - if ic == nil { - return nil - } - ics = append(ics, ic) - } - - // Generate a single iterator from all shards. - f, d, err := influxql.IteratorCreators(ics).FieldDimensions(req.Sources) - if err != nil { - return err - } - fields, dimensions = f, d - - return nil - }(); err != nil { - s.Logger.Printf("error reading FieldDimensions request: %s", err) - EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{Err: err}) - return - } - - // Encode success response. - if err := EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{ - Fields: fields, - Dimensions: dimensions, - }); err != nil { - s.Logger.Printf("error writing FieldDimensions response: %s", err) - return - } -} - -func (s *Service) processSeriesKeysRequest(conn net.Conn) { - var seriesList influxql.SeriesList - if err := func() error { - // Parse request. - var req SeriesKeysRequest - if err := DecodeLV(conn, &req); err != nil { - return err - } - - sh, ok := s.TSDBStore.(ShardIteratorCreator) - if !ok { - return errors.New("unable to access a specific shard with this tsdb store") - } - - // Collect iterator creators for each shard. - ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) - for _, shardID := range req.ShardIDs { - ic := sh.ShardIteratorCreator(shardID) - if ic == nil { - return nil - } - ics = append(ics, ic) - } - - // Generate a single iterator from all shards. - a, err := influxql.IteratorCreators(ics).SeriesKeys(req.Opt) - if err != nil { - return err - } - seriesList = a - - return nil - }(); err != nil { - s.Logger.Printf("error reading SeriesKeys request: %s", err) - EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{Err: err}) - return - } - - // Encode success response. - if err := EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{ - SeriesList: seriesList, - }); err != nil { - s.Logger.Printf("error writing SeriesKeys response: %s", err) - return - } -} - -func (s *Service) processRemoteMonitorRequest(buf []byte) error { - // Unmarshal the request. - var req RemoteMonitorRequest - if err := req.UnmarshalBinary(buf); err != nil { - return err - } - - // Process the request - var remoteAddr string - if len(req.pb.GetRemoteAddrs()) > 0 { - remoteAddr = req.pb.GetRemoteAddrs()[0] - } - return s.Monitor.SetRemoteWriter(monitor.RemoteWriterConfig{ - RemoteAddr: remoteAddr, - NodeID: req.pb.GetNodeID(), - Username: req.pb.GetUsername(), - Password: req.pb.GetPassword(), - ClusterID: req.pb.GetClusterID(), - }) -} - -func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) { - // Build response. - var resp RemoteMonitorResponse - if e != nil { - resp.Err = e - } - - // Marshal response to binary. - buf, err := resp.MarshalBinary() - if err != nil { - s.Logger.Printf("error marshalling remote monitor response: %s", err) - return - } - - // Write to connection. - if err := WriteTLV(w, remoteMonitorResponseMessage, buf); err != nil { - s.Logger.Printf("write remote monitor response error: %s", err) - } -} - -func (s *Service) processExpandSourcesRequest(conn net.Conn) { - var sources influxql.Sources - if err := func() error { - // Parse request. - var req ExpandSourcesRequest - if err := DecodeLV(conn, &req); err != nil { - return err - } - - // Collect iterator creators for each shard. - ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) - for _, shardID := range req.ShardIDs { - ic := s.TSDBStore.ShardIteratorCreator(shardID) - if ic == nil { - return nil - } - ics = append(ics, ic) - } - - // Expand sources from all shards. - a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources) - if err != nil { - return err - } - sources = a - - return nil - }(); err != nil { - s.Logger.Printf("error reading ExpandSources request: %s", err) - EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err}) - return - } - - // Encode success response. - if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{ - Sources: sources, - }); err != nil { - s.Logger.Printf("error writing ExpandSources response: %s", err) - return - } -} - -func (s *Service) processBackupShardRequest(conn net.Conn) { - if err := func() error { - // Parse request. - var req BackupShardRequest - if err := DecodeLV(conn, &req); err != nil { - return err - } - - // Backup from local shard to the connection. - if err := s.TSDBStore.BackupShard(req.ShardID, req.Since, conn); err != nil { - return err - } - - return nil - }(); err != nil { - s.Logger.Printf("error processing BackupShardRequest: %s", err) - return - } -} - -func (s *Service) processCopyShardRequest(conn net.Conn) { - if err := func() error { - // Parse request. - var req CopyShardRequest - if err := DecodeLV(conn, &req); err != nil { - return err - } - - // Begin streaming backup from remote server. - r, err := s.backupRemoteShard(req.Host, req.ShardID, req.Since) - if err != nil { - return err - } - defer r.Close() - - // Create shard if it doesn't exist. - if err := s.TSDBStore.CreateShard(req.Database, req.Policy, req.ShardID); err != nil { - return err - } - - // Restore to local shard. - if err := s.TSDBStore.RestoreShard(req.ShardID, r); err != nil { - return err - } - - return nil - }(); err != nil { - s.Logger.Printf("error reading CopyShard request: %s", err) - EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{Err: err}) - return - } - - // Encode success response. - if err := EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{}); err != nil { - s.Logger.Printf("error writing CopyShard response: %s", err) - return - } -} - -// backupRemoteShard connects to a cluster service on a remote host and streams a shard. -func (s *Service) backupRemoteShard(host string, shardID uint64, since time.Time) (io.ReadCloser, error) { - conn, err := net.Dial("tcp", host) - if err != nil { - return nil, err - } - conn.SetDeadline(time.Now().Add(BackupTimeout)) - - if err := func() error { - // Write the cluster multiplexing header byte - if _, err := conn.Write([]byte{MuxHeader}); err != nil { - return err - } - - // Write backup request. - if err := EncodeTLV(conn, backupShardResponseMessage, &BackupShardRequest{ - ShardID: shardID, - Since: since, - }); err != nil { - return fmt.Errorf("error writing BackupShardRequest: %s", err) - } - - return nil - }(); err != nil { - conn.Close() - return nil, err - } - - // Return the connection which will stream the rest of the backup. - return conn, nil -} - -// ReadTLV reads a type-length-value record from r. -func ReadTLV(r io.Reader) (byte, []byte, error) { - typ, err := ReadType(r) - if err != nil { - return 0, nil, err - } - - buf, err := ReadLV(r) - if err != nil { - return 0, nil, err - } - return typ, buf, err -} - -// ReadType reads the type from a TLV record. -func ReadType(r io.Reader) (byte, error) { - var typ [1]byte - if _, err := io.ReadFull(r, typ[:]); err != nil { - return 0, fmt.Errorf("read message type: %s", err) - } - return typ[0], nil -} - -// ReadLV reads the length-value from a TLV record. -func ReadLV(r io.Reader) ([]byte, error) { - // Read the size of the message. - var sz int64 - if err := binary.Read(r, binary.BigEndian, &sz); err != nil { - return nil, fmt.Errorf("read message size: %s", err) - } - - if sz >= MaxMessageSize { - return nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz) - } - - // Read the value. - buf := make([]byte, sz) - if _, err := io.ReadFull(r, buf); err != nil { - return nil, fmt.Errorf("read message value: %s", err) - } - - return buf, nil -} - -// WriteTLV writes a type-length-value record to w. -func WriteTLV(w io.Writer, typ byte, buf []byte) error { - if err := WriteType(w, typ); err != nil { - return err - } - if err := WriteLV(w, buf); err != nil { - return err - } - return nil -} - -// WriteType writes the type in a TLV record to w. -func WriteType(w io.Writer, typ byte) error { - if _, err := w.Write([]byte{typ}); err != nil { - return fmt.Errorf("write message type: %s", err) - } - return nil -} - -// WriteLV writes the length-value in a TLV record to w. -func WriteLV(w io.Writer, buf []byte) error { - // Write the size of the message. - if err := binary.Write(w, binary.BigEndian, int64(len(buf))); err != nil { - return fmt.Errorf("write message size: %s", err) - } - - // Write the value. - if _, err := w.Write(buf); err != nil { - return fmt.Errorf("write message value: %s", err) - } - return nil -} - -// EncodeTLV encodes v to a binary format and writes the record-length-value record to w. -func EncodeTLV(w io.Writer, typ byte, v encoding.BinaryMarshaler) error { - if err := WriteType(w, typ); err != nil { - return err - } - if err := EncodeLV(w, v); err != nil { - return err - } - return nil -} - -// EncodeLV encodes v to a binary format and writes the length-value record to w. -func EncodeLV(w io.Writer, v encoding.BinaryMarshaler) error { - buf, err := v.MarshalBinary() - if err != nil { - return err - } - - if err := WriteLV(w, buf); err != nil { - return err - } - return nil -} - -// DecodeTLV reads the type-length-value record from r and unmarshals it into v. -func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) { - typ, err = ReadType(r) - if err != nil { - return 0, err - } - if err := DecodeLV(r, v); err != nil { - return 0, err - } - return typ, nil -} - -// DecodeLV reads the length-value record from r and unmarshals it into v. -func DecodeLV(r io.Reader, v encoding.BinaryUnmarshaler) error { - buf, err := ReadLV(r) - if err != nil { - return err - } - - if err := v.UnmarshalBinary(buf); err != nil { - return err - } - return nil -} diff --git a/cluster/service_test.go b/cluster/service_test.go deleted file mode 100644 index 743632ba17..0000000000 --- a/cluster/service_test.go +++ /dev/null @@ -1,174 +0,0 @@ -package cluster_test - -import ( - "fmt" - "io" - "net" - "time" - - "github.com/influxdata/influxdb/cluster" - "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/services/meta" - "github.com/influxdata/influxdb/tcp" -) - -type metaClient struct { - host string -} - -func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) { - return &meta.NodeInfo{ - ID: nodeID, - TCPHost: m.host, - }, nil -} - -func (m *metaClient) ShardOwner(shardID uint64) (db, rp string, sgi *meta.ShardGroupInfo) { - return "db", "rp", &meta.ShardGroupInfo{} -} - -type testService struct { - nodeID uint64 - ln net.Listener - muxln net.Listener - responses chan *serviceResponse - - TSDBStore TSDBStore -} - -func newTestWriteService(f func(shardID uint64, points []models.Point) error) testService { - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - panic(err) - } - - mux := tcp.NewMux() - muxln := mux.Listen(cluster.MuxHeader) - go mux.Serve(ln) - - s := testService{ - ln: ln, - muxln: muxln, - } - s.TSDBStore.WriteToShardFn = f - s.responses = make(chan *serviceResponse, 1024) - return s -} - -func (ts *testService) Close() { - if ts.ln != nil { - ts.ln.Close() - } -} - -type serviceResponses []serviceResponse -type serviceResponse struct { - shardID uint64 - ownerID uint64 - points []models.Point -} - -func (ts *testService) writeShardSuccess(shardID uint64, points []models.Point) error { - ts.responses <- &serviceResponse{ - shardID: shardID, - points: points, - } - return nil -} - -func writeShardFail(shardID uint64, points []models.Point) error { - return fmt.Errorf("failed to write") -} - -func writeShardSlow(shardID uint64, points []models.Point) error { - time.Sleep(1 * time.Second) - return nil -} - -func (ts *testService) ResponseN(n int) ([]*serviceResponse, error) { - var a []*serviceResponse - for { - select { - case r := <-ts.responses: - a = append(a, r) - if len(a) == n { - return a, nil - } - case <-time.After(time.Second): - return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a)) - } - } -} - -// Service is a test wrapper for cluster.Service. -type Service struct { - *cluster.Service - - ln net.Listener - TSDBStore TSDBStore -} - -// NewService returns a new instance of Service. -func NewService() *Service { - s := &Service{ - Service: cluster.NewService(cluster.Config{}), - } - s.Service.TSDBStore = &s.TSDBStore - return s -} - -// MustOpenService returns a new, open service on a random port. Panic on error. -func MustOpenService() *Service { - s := NewService() - s.ln = MustListen("tcp", "127.0.0.1:0") - s.Listener = &muxListener{s.ln} - if err := s.Open(); err != nil { - panic(err) - } - return s -} - -// Close closes the listener and waits for the service to close. -func (s *Service) Close() error { - if s.ln != nil { - s.ln.Close() - } - return s.Service.Close() -} - -// Addr returns the network address of the service. -func (s *Service) Addr() net.Addr { return s.ln.Addr() } - -// muxListener is a net.Listener implementation that strips off the first byte. -// This is used to simulate the listener from pkg/mux. -type muxListener struct { - net.Listener -} - -// Accept accepts the next connection and removes the first byte. -func (ln *muxListener) Accept() (net.Conn, error) { - conn, err := ln.Listener.Accept() - if err != nil { - return nil, err - } - - var buf [1]byte - if _, err := io.ReadFull(conn, buf[:]); err != nil { - conn.Close() - return nil, err - } else if buf[0] != cluster.MuxHeader { - conn.Close() - panic(fmt.Sprintf("unexpected mux header byte: %d", buf[0])) - } - - return conn, nil -} - -// MustListen opens a listener. Panic on error. -func MustListen(network, laddr string) net.Listener { - ln, err := net.Listen(network, laddr) - if err != nil { - panic(err) - } - return ln -} diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index c137676c39..27b6f35f42 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -14,7 +14,7 @@ import ( "time" "github.com/BurntSushi/toml" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/services/admin" "github.com/influxdata/influxdb/services/collectd" @@ -41,11 +41,11 @@ const ( // Config represents the configuration format for the influxd binary. type Config struct { - Meta *meta.Config `toml:"meta"` - Data tsdb.Config `toml:"data"` - Cluster cluster.Config `toml:"cluster"` - Retention retention.Config `toml:"retention"` - Precreator precreator.Config `toml:"shard-precreation"` + Meta *meta.Config `toml:"meta"` + Data tsdb.Config `toml:"data"` + Coordinator coordinator.Config `toml:"coordinator"` + Retention retention.Config `toml:"retention"` + Precreator precreator.Config `toml:"shard-precreation"` Admin admin.Config `toml:"admin"` Monitor monitor.Config `toml:"monitor"` @@ -76,7 +76,7 @@ func NewConfig() *Config { c := &Config{} c.Meta = meta.NewConfig() c.Data = tsdb.NewConfig() - c.Cluster = cluster.NewConfig() + c.Coordinator = coordinator.NewConfig() c.Precreator = precreator.NewConfig() c.Admin = admin.NewConfig() @@ -140,6 +140,16 @@ func (c *Config) FromToml(input string) error { log.Printf("deprecated config option %s replaced with %s; %s will not be supported in a future release\n", in, out, in) return out }) + + // Replace deprecated [cluster] with [coordinator] + re = regexp.MustCompile(`(?m)^\s*\[(cluster)\]`) + input = re.ReplaceAllStringFunc(input, func(in string) string { + in = strings.TrimSpace(in) + out := "[coordinator]" + log.Printf("deprecated config option %s replaced with %s; %s will not be supported in a future release\n", in, out, in) + return out + }) + _, err := toml.Decode(input, c) return err } diff --git a/cmd/influxd/run/config_test.go b/cmd/influxd/run/config_test.go index 725977ba02..030088e263 100644 --- a/cmd/influxd/run/config_test.go +++ b/cmd/influxd/run/config_test.go @@ -252,6 +252,9 @@ bind-address = ":1000" [opentsdb] bind-address = ":2000" + +[cluster] +max-select-point = 100 `); err != nil { t.Fatal(err) } @@ -261,5 +264,8 @@ bind-address = ":2000" t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[0].BindAddress) } else if c.OpenTSDBInputs[0].BindAddress != ":2000" { t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress) + } else if c.Coordinator.MaxSelectPointN != 100 { + t.Fatalf("unexpected coordinator max select points: %s", c.Coordinator.MaxSelectPointN) + } } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index f157623001..33b094fac4 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -12,7 +12,7 @@ import ( "time" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" @@ -58,13 +58,12 @@ type Server struct { TSDBStore *tsdb.Store QueryExecutor *influxql.QueryExecutor - PointsWriter *cluster.PointsWriter + PointsWriter *coordinator.PointsWriter Subscriber *subscriber.Service Services []Service // These references are required for the tcp muxer. - ClusterService *cluster.Service SnapshotterService *snapshotter.Service Monitor *monitor.Monitor @@ -165,25 +164,25 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.Subscriber = subscriber.NewService(c.Subscriber) // Initialize points writer. - s.PointsWriter = cluster.NewPointsWriter() - s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout) + s.PointsWriter = coordinator.NewPointsWriter() + s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout) s.PointsWriter.TSDBStore = s.TSDBStore s.PointsWriter.Subscriber = s.Subscriber // Initialize query executor. s.QueryExecutor = influxql.NewQueryExecutor() - s.QueryExecutor.StatementExecutor = &cluster.StatementExecutor{ + s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{ MetaClient: s.MetaClient, - TSDBStore: cluster.LocalTSDBStore{Store: s.TSDBStore}, + TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore}, Monitor: s.Monitor, PointsWriter: s.PointsWriter, - MaxSelectPointN: c.Cluster.MaxSelectPointN, - MaxSelectSeriesN: c.Cluster.MaxSelectSeriesN, - MaxSelectBucketsN: c.Cluster.MaxSelectBucketsN, + MaxSelectPointN: c.Coordinator.MaxSelectPointN, + MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN, + MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN, } - s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout) - s.QueryExecutor.LogQueriesAfter = time.Duration(c.Cluster.LogQueriesAfter) - s.QueryExecutor.MaxConcurrentQueries = c.Cluster.MaxConcurrentQueries + s.QueryExecutor.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout) + s.QueryExecutor.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter) + s.QueryExecutor.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries if c.Data.QueryLogEnabled { s.QueryExecutor.Logger = log.New(os.Stderr, "[query] ", log.LstdFlags) } @@ -197,14 +196,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { return s, nil } -func (s *Server) appendClusterService(c cluster.Config) { - srv := cluster.NewService(c) - srv.TSDBStore = cluster.LocalTSDBStore{Store: s.TSDBStore} - srv.Monitor = s.Monitor - s.Services = append(s.Services, srv) - s.ClusterService = srv -} - func (s *Server) appendSnapshotterService() { srv := snapshotter.NewService() srv.TSDBStore = s.TSDBStore @@ -241,7 +232,6 @@ func (s *Server) Open() error { // Append services. s.appendMonitorService() - s.appendClusterService(s.config.Cluster) s.appendPrecreatorService(s.config.Precreator) s.appendSnapshotterService() s.appendAdminService(s.config.Admin) @@ -270,7 +260,6 @@ func (s *Server) Open() error { s.PointsWriter.MetaClient = s.MetaClient s.Monitor.MetaClient = s.MetaClient - s.ClusterService.Listener = mux.Listen(cluster.MuxHeader) s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader) // Configure logging for all services and clients. @@ -283,7 +272,6 @@ func (s *Server) Open() error { for _, svc := range s.Services { svc.SetLogOutput(w) } - s.ClusterService.SetLogOutput(w) s.SnapshotterService.SetLogOutput(w) s.Monitor.SetLogOutput(w) @@ -502,12 +490,12 @@ type tcpaddr struct{ host string } func (a *tcpaddr) Network() string { return "tcp" } func (a *tcpaddr) String() string { return a.host } -// monitorPointsWriter is a wrapper around `cluster.PointsWriter` that helps +// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps // to prevent a circular dependency between the `cluster` and `monitor` packages. -type monitorPointsWriter cluster.PointsWriter +type monitorPointsWriter coordinator.PointsWriter func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error { - return (*cluster.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points) + return (*coordinator.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points) } func (s *Server) remoteAddr(addr string) string { diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 79ae29f47b..31f795bc82 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -230,8 +230,7 @@ func NewConfig() *run.Config { c := run.NewConfig() c.BindAddress = "127.0.0.1:0" c.ReportingDisabled = true - c.Cluster.ShardWriterTimeout = toml.Duration(30 * time.Second) - c.Cluster.WriteTimeout = toml.Duration(30 * time.Second) + c.Coordinator.WriteTimeout = toml.Duration(30 * time.Second) c.Meta.Dir = MustTempFile() if !testing.Verbose() { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 108e1aed13..fbbc8ace22 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/models" ) @@ -6097,7 +6097,7 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) { case <-done: return default: - wpr := &cluster.WritePointsRequest{ + wpr := &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp0", } diff --git a/coordinator/config.go b/coordinator/config.go new file mode 100644 index 0000000000..add06d2e9d --- /dev/null +++ b/coordinator/config.go @@ -0,0 +1,47 @@ +package coordinator + +import ( + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/toml" +) + +const ( + // DefaultWriteTimeout is the default timeout for a complete write to succeed. + DefaultWriteTimeout = 10 * time.Second + + // DefaultMaxConcurrentQueries is the maximum number of running queries. + // A value of zero will make the maximum query limit unlimited. + DefaultMaxConcurrentQueries = 0 + + // DefaultMaxSelectPointN is the maximum number of points a SELECT can process. + // A value of zero will make the maximum point count unlimited. + DefaultMaxSelectPointN = 0 + + // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run. + // A value of zero will make the maximum series count unlimited. + DefaultMaxSelectSeriesN = 0 +) + +// Config represents the configuration for the clustering service. +type Config struct { + WriteTimeout toml.Duration `toml:"write-timeout"` + MaxConcurrentQueries int `toml:"max-concurrent-queries"` + QueryTimeout toml.Duration `toml:"query-timeout"` + LogQueriesAfter toml.Duration `toml:"log-queries-after"` + MaxSelectPointN int `toml:"max-select-point"` + MaxSelectSeriesN int `toml:"max-select-series"` + MaxSelectBucketsN int `toml:"max-select-buckets"` +} + +// NewConfig returns an instance of Config with defaults. +func NewConfig() Config { + return Config{ + WriteTimeout: toml.Duration(DefaultWriteTimeout), + QueryTimeout: toml.Duration(influxql.DefaultQueryTimeout), + MaxConcurrentQueries: DefaultMaxConcurrentQueries, + MaxSelectPointN: DefaultMaxSelectPointN, + MaxSelectSeriesN: DefaultMaxSelectSeriesN, + } +} diff --git a/cluster/config_test.go b/coordinator/config_test.go similarity index 50% rename from cluster/config_test.go rename to coordinator/config_test.go index ed3bdf8c3c..2f21436739 100644 --- a/cluster/config_test.go +++ b/coordinator/config_test.go @@ -1,27 +1,24 @@ -package cluster_test +package coordinator_test import ( "testing" "time" "github.com/BurntSushi/toml" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" ) func TestConfig_Parse(t *testing.T) { // Parse configuration. - var c cluster.Config + var c coordinator.Config if _, err := toml.Decode(` -shard-writer-timeout = "10s" write-timeout = "20s" `, &c); err != nil { t.Fatal(err) } // Validate configuration. - if time.Duration(c.ShardWriterTimeout) != 10*time.Second { - t.Fatalf("unexpected shard-writer timeout: %s", c.ShardWriterTimeout) - } else if time.Duration(c.WriteTimeout) != 20*time.Second { + if time.Duration(c.WriteTimeout) != 20*time.Second { t.Fatalf("unexpected write timeout s: %s", c.WriteTimeout) } } diff --git a/cluster/meta_client.go b/coordinator/meta_client.go similarity index 98% rename from cluster/meta_client.go rename to coordinator/meta_client.go index 44056e49ff..64ae32c225 100644 --- a/cluster/meta_client.go +++ b/coordinator/meta_client.go @@ -1,4 +1,4 @@ -package cluster +package coordinator import ( "time" diff --git a/cluster/meta_client_test.go b/coordinator/meta_client_test.go similarity index 99% rename from cluster/meta_client_test.go rename to coordinator/meta_client_test.go index 1e3a8260a4..92ce941845 100644 --- a/cluster/meta_client_test.go +++ b/coordinator/meta_client_test.go @@ -1,4 +1,4 @@ -package cluster_test +package coordinator_test import ( "time" diff --git a/cluster/points_writer.go b/coordinator/points_writer.go similarity index 91% rename from cluster/points_writer.go rename to coordinator/points_writer.go index a93d8a0287..c4ba382022 100644 --- a/cluster/points_writer.go +++ b/coordinator/points_writer.go @@ -1,4 +1,4 @@ -package cluster +package coordinator import ( "errors" @@ -79,6 +79,24 @@ type PointsWriter struct { statMap *expvar.Map } +// WritePointsRequest represents a request to write point data to the cluster +type WritePointsRequest struct { + Database string + RetentionPolicy string + Points []models.Point +} + +// AddPoint adds a point to the WritePointRequest with field key 'value' +func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) { + pt, err := models.NewPoint( + name, tags, map[string]interface{}{"value": value}, timestamp, + ) + if err != nil { + return + } + w.Points = append(w.Points, pt) +} + // NewPointsWriter returns a new instance of PointsWriter for a node. func NewPointsWriter() *PointsWriter { return &PointsWriter{ @@ -234,10 +252,16 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency w.statMap.Add(statSubWriteDrop, 1) } + timeout := time.NewTimer(w.WriteTimeout) + defer timeout.Stop() for range shardMappings.Points { select { case <-w.closing: return ErrWriteFailed + case <-timeout.C: + w.statMap.Add(statWriteTimeout, 1) + // return timeout error to caller + return ErrTimeout case err := <-ch: if err != nil { return err diff --git a/cluster/points_writer_test.go b/coordinator/points_writer_test.go similarity index 93% rename from cluster/points_writer_test.go rename to coordinator/points_writer_test.go index eb299d35c8..5bd7eb13db 100644 --- a/cluster/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -1,4 +1,4 @@ -package cluster_test +package coordinator_test import ( "fmt" @@ -9,7 +9,7 @@ import ( "time" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" ) @@ -30,15 +30,15 @@ func TestPointsWriter_MapShards_One(t *testing.T) { return &rp.ShardGroups[0], nil } - c := cluster.PointsWriter{MetaClient: ms} - pr := &cluster.WritePointsRequest{ + c := coordinator.PointsWriter{MetaClient: ms} + pr := &coordinator.WritePointsRequest{ Database: "mydb", RetentionPolicy: "myrp", } pr.AddPoint("cpu", 1.0, time.Now(), nil) var ( - shardMappings *cluster.ShardMapping + shardMappings *coordinator.ShardMapping err error ) if shardMappings, err = c.MapShards(pr); err != nil { @@ -79,8 +79,8 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) { panic("should not get here") } - c := cluster.PointsWriter{MetaClient: ms} - pr := &cluster.WritePointsRequest{ + c := coordinator.PointsWriter{MetaClient: ms} + pr := &coordinator.WritePointsRequest{ Database: "mydb", RetentionPolicy: "myrp", } @@ -92,7 +92,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) { pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil) var ( - shardMappings *cluster.ShardMapping + shardMappings *coordinator.ShardMapping err error ) if shardMappings, err = c.MapShards(pr); err != nil { @@ -150,7 +150,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { for _, test := range tests { - pr := &cluster.WritePointsRequest{ + pr := &coordinator.WritePointsRequest{ Database: test.database, RetentionPolicy: test.retentionPolicy, } @@ -163,7 +163,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { // copy to prevent data race theTest := test - sm := cluster.NewShardMapping() + sm := coordinator.NewShardMapping() sm.MapPoint( &meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ {NodeID: 1}, @@ -186,7 +186,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { }}, pr.Points[2]) - // Local cluster.Node ShardWriter + // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel var mu sync.Mutex sw := &fakeShardWriter{ @@ -217,13 +217,13 @@ func TestPointsWriter_WritePoints(t *testing.T) { } ms.NodeIDFn = func() uint64 { return 1 } - subPoints := make(chan *cluster.WritePointsRequest, 1) + subPoints := make(chan *coordinator.WritePointsRequest, 1) sub := Subscriber{} - sub.PointsFn = func() chan<- *cluster.WritePointsRequest { + sub.PointsFn = func() chan<- *coordinator.WritePointsRequest { return subPoints } - c := cluster.NewPointsWriter() + c := coordinator.NewPointsWriter() c.MetaClient = ms c.ShardWriter = sw c.TSDBStore = store @@ -337,10 +337,10 @@ func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *met } type Subscriber struct { - PointsFn func() chan<- *cluster.WritePointsRequest + PointsFn func() chan<- *coordinator.WritePointsRequest } -func (s Subscriber) Points() chan<- *cluster.WritePointsRequest { +func (s Subscriber) Points() chan<- *coordinator.WritePointsRequest { return s.PointsFn() } diff --git a/cluster/statement_executor.go b/coordinator/statement_executor.go similarity index 99% rename from cluster/statement_executor.go rename to coordinator/statement_executor.go index 56f777bbe6..4b65c99b0e 100644 --- a/cluster/statement_executor.go +++ b/coordinator/statement_executor.go @@ -1,4 +1,4 @@ -package cluster +package coordinator import ( "bytes" diff --git a/cluster/statement_executor_test.go b/coordinator/statement_executor_test.go similarity index 97% rename from cluster/statement_executor_test.go rename to coordinator/statement_executor_test.go index b2c2bdd0d5..18ac941bba 100644 --- a/cluster/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -1,4 +1,4 @@ -package cluster_test +package coordinator_test import ( "bytes" @@ -11,7 +11,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" @@ -159,13 +159,13 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) { } } -// QueryExecutor is a test wrapper for cluster.QueryExecutor. +// QueryExecutor is a test wrapper for coordinator.QueryExecutor. type QueryExecutor struct { *influxql.QueryExecutor MetaClient MetaClient TSDBStore TSDBStore - StatementExecutor *cluster.StatementExecutor + StatementExecutor *coordinator.StatementExecutor LogOutput bytes.Buffer } @@ -175,7 +175,7 @@ func NewQueryExecutor() *QueryExecutor { e := &QueryExecutor{ QueryExecutor: influxql.NewQueryExecutor(), } - e.StatementExecutor = &cluster.StatementExecutor{ + e.StatementExecutor = &coordinator.StatementExecutor{ MetaClient: &e.MetaClient, TSDBStore: &e.TSDBStore, } @@ -202,7 +202,7 @@ func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-ch return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, false, make(chan struct{})) } -// TSDBStore is a mockable implementation of cluster.TSDBStore. +// TSDBStore is a mockable implementation of coordinator.TSDBStore. type TSDBStore struct { CreateShardFn func(database, policy string, shardID uint64) error WriteToShardFn func(shardID uint64, points []models.Point) error diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index ad08df55a0..ccb6b2c49f 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" @@ -395,7 +395,7 @@ func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) { return nil, meta.ErrServiceUnavailable } -// Databases returns a list of database info about each database in the cluster. +// Databases returns a list of database info about each database in the coordinator. func (ms *MetaClient) Databases() []meta.DatabaseInfo { ms.mu.RLock() defer ms.mu.RUnlock() @@ -506,7 +506,7 @@ func NewQueryExecutor(t *testing.T) *QueryExecutor { // PointsWriter is a mock points writer. type PointsWriter struct { - WritePointsFn func(p *cluster.WritePointsRequest) error + WritePointsFn func(p *coordinator.WritePointsRequest) error Err error PointsPerSecond int t *testing.T @@ -521,7 +521,7 @@ func NewPointsWriter(t *testing.T) *PointsWriter { } // WritePoints mocks writing points. -func (pw *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error { +func (pw *PointsWriter) WritePoints(p *coordinator.WritePointsRequest) error { // If the test set a callback, call it. if pw.WritePointsFn != nil { if err := pw.WritePointsFn(p); err != nil { diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 7c283689d8..1e6c071bbe 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -11,7 +11,7 @@ import ( "sync" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/services/meta" ) @@ -24,7 +24,7 @@ const ( // PointsWriter is an interface for writing points to a subscription destination. // Only WritePoints() needs to be satisfied. type PointsWriter interface { - WritePoints(p *cluster.WritePointsRequest) error + WritePoints(p *coordinator.WritePointsRequest) error } // unique set that identifies a given subscription @@ -46,7 +46,7 @@ type Service struct { NewPointsWriter func(u url.URL) (PointsWriter, error) Logger *log.Logger statMap *expvar.Map - points chan *cluster.WritePointsRequest + points chan *coordinator.WritePointsRequest wg sync.WaitGroup closed bool closing chan struct{} @@ -60,7 +60,7 @@ func NewService(c Config) *Service { NewPointsWriter: newPointsWriter, Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), - points: make(chan *cluster.WritePointsRequest), + points: make(chan *coordinator.WritePointsRequest), closed: true, closing: make(chan struct{}), } @@ -214,7 +214,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st } // Points returns a channel into which write point requests can be sent. -func (s *Service) Points() chan<- *cluster.WritePointsRequest { +func (s *Service) Points() chan<- *coordinator.WritePointsRequest { return s.points } @@ -253,7 +253,7 @@ type balancewriter struct { i int } -func (b *balancewriter) WritePoints(p *cluster.WritePointsRequest) error { +func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error { var lastErr error for range b.writers { // round robin through destinations. diff --git a/services/subscriber/service_test.go b/services/subscriber/service_test.go index c4d9da7da9..0339c89e01 100644 --- a/services/subscriber/service_test.go +++ b/services/subscriber/service_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/subscriber" ) @@ -24,10 +24,10 @@ func (m MetaClient) WaitForDataChanged() chan struct{} { } type Subscription struct { - WritePointsFn func(*cluster.WritePointsRequest) error + WritePointsFn func(*coordinator.WritePointsRequest) error } -func (s Subscription) WritePoints(p *cluster.WritePointsRequest) error { +func (s Subscription) WritePoints(p *coordinator.WritePointsRequest) error { return s.WritePointsFn(p) } @@ -53,11 +53,11 @@ func TestService_IgnoreNonMatch(t *testing.T) { } } - prs := make(chan *cluster.WritePointsRequest, 2) + prs := make(chan *coordinator.WritePointsRequest, 2) urls := make(chan url.URL, 2) newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) { sub := Subscription{} - sub.WritePointsFn = func(p *cluster.WritePointsRequest) error { + sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error { prs <- p return nil } @@ -88,11 +88,11 @@ func TestService_IgnoreNonMatch(t *testing.T) { } // Write points that don't match any subscription. - s.Points() <- &cluster.WritePointsRequest{ + s.Points() <- &coordinator.WritePointsRequest{ Database: "db1", RetentionPolicy: "rp0", } - s.Points() <- &cluster.WritePointsRequest{ + s.Points() <- &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp2", } @@ -128,11 +128,11 @@ func TestService_ModeALL(t *testing.T) { } } - prs := make(chan *cluster.WritePointsRequest, 2) + prs := make(chan *coordinator.WritePointsRequest, 2) urls := make(chan url.URL, 2) newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) { sub := Subscription{} - sub.WritePointsFn = func(p *cluster.WritePointsRequest) error { + sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error { prs <- p return nil } @@ -163,7 +163,7 @@ func TestService_ModeALL(t *testing.T) { } // Write points that match subscription with mode ALL - expPR := &cluster.WritePointsRequest{ + expPR := &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp0", } @@ -171,7 +171,7 @@ func TestService_ModeALL(t *testing.T) { // Should get pr back twice for i := 0; i < 2; i++ { - var pr *cluster.WritePointsRequest + var pr *coordinator.WritePointsRequest select { case pr = <-prs: case <-time.After(10 * time.Millisecond): @@ -206,11 +206,11 @@ func TestService_ModeANY(t *testing.T) { } } - prs := make(chan *cluster.WritePointsRequest, 2) + prs := make(chan *coordinator.WritePointsRequest, 2) urls := make(chan url.URL, 2) newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) { sub := Subscription{} - sub.WritePointsFn = func(p *cluster.WritePointsRequest) error { + sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error { prs <- p return nil } @@ -240,14 +240,14 @@ func TestService_ModeANY(t *testing.T) { } } // Write points that match subscription with mode ANY - expPR := &cluster.WritePointsRequest{ + expPR := &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp0", } s.Points() <- expPR // Validate we get the pr back just once - var pr *cluster.WritePointsRequest + var pr *coordinator.WritePointsRequest select { case pr = <-prs: case <-time.After(10 * time.Millisecond): @@ -294,11 +294,11 @@ func TestService_Multiple(t *testing.T) { } } - prs := make(chan *cluster.WritePointsRequest, 4) + prs := make(chan *coordinator.WritePointsRequest, 4) urls := make(chan url.URL, 4) newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) { sub := Subscription{} - sub.WritePointsFn = func(p *cluster.WritePointsRequest) error { + sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error { prs <- p return nil } @@ -329,24 +329,24 @@ func TestService_Multiple(t *testing.T) { } // Write points that don't match any subscription. - s.Points() <- &cluster.WritePointsRequest{ + s.Points() <- &coordinator.WritePointsRequest{ Database: "db1", RetentionPolicy: "rp0", } - s.Points() <- &cluster.WritePointsRequest{ + s.Points() <- &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp2", } // Write points that match subscription with mode ANY - expPR := &cluster.WritePointsRequest{ + expPR := &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp0", } s.Points() <- expPR // Validate we get the pr back just once - var pr *cluster.WritePointsRequest + var pr *coordinator.WritePointsRequest select { case pr = <-prs: case <-time.After(10 * time.Millisecond): @@ -364,7 +364,7 @@ func TestService_Multiple(t *testing.T) { } // Write points that match subscription with mode ALL - expPR = &cluster.WritePointsRequest{ + expPR = &coordinator.WritePointsRequest{ Database: "db0", RetentionPolicy: "rp1", } diff --git a/services/subscriber/udp.go b/services/subscriber/udp.go index ba7405c0b9..0723ad19dc 100644 --- a/services/subscriber/udp.go +++ b/services/subscriber/udp.go @@ -3,7 +3,7 @@ package subscriber import ( "net" - "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/coordinator" ) // UDP supports writing points over UDP using the line protocol. @@ -17,7 +17,7 @@ func NewUDP(addr string) *UDP { } // WritePoints writes points over UDP transport. -func (u *UDP) WritePoints(p *cluster.WritePointsRequest) (err error) { +func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error) { var addr *net.UDPAddr var con *net.UDPConn addr, err = net.ResolveUDPAddr("udp", u.addr) diff --git a/toml/toml_test.go b/toml/toml_test.go index bc8db22ccd..4ce4f058a9 100644 --- a/toml/toml_test.go +++ b/toml/toml_test.go @@ -33,7 +33,7 @@ func TestSize_UnmarshalText_GB(t *testing.T) { func TestConfig_Encode(t *testing.T) { var c run.Config - c.Cluster.WriteTimeout = itoml.Duration(time.Minute) + c.Coordinator.WriteTimeout = itoml.Duration(time.Minute) buf := new(bytes.Buffer) if err := toml.NewEncoder(buf).Encode(&c); err != nil { t.Fatal("Failed to encode: ", err)