Merge pull request #6604 from influxdata/jw-cluster

Remove old cluster code
pull/6613/head
Jason Wilder 2016-05-11 16:57:06 -06:00
commit 590b3f9fc7
31 changed files with 185 additions and 3080 deletions

View File

@@ -1,9 +1,17 @@
## v1.0.0 [unreleased]
### Release Notes
* Config option `[cluster]` has been replaced with `[coordinator]`
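  For example, a config file that previously contained a `[cluster]` section now uses the same keys under the new section name (a minimal sketch; the keys shown are taken from the coordinator config and are assumed to carry over unchanged):
  [coordinator]
  write-timeout = "5s"
  max-concurrent-queries = 0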
### Features
- [#3541](https://github.com/influxdata/influxdb/issues/3451): Update SHOW FIELD KEYS to return the field type with the field key.
### Bugfixes
- [#6604](https://github.com/influxdata/influxdb/pull/6604): Remove old cluster code
## v0.13.0 [unreleased]
### Release Notes

View File

@@ -1,69 +0,0 @@
package cluster
import (
"math/rand"
"github.com/influxdata/influxdb/services/meta"
)
// Balancer represents a load-balancing algorithm for a set of nodes
type Balancer interface {
// Next returns the next Node according to the balancing method
// or nil if there are no nodes available
Next() *meta.NodeInfo
}
type nodeBalancer struct {
nodes []meta.NodeInfo // data nodes to balance between
p int // current node index
}
// NewNodeBalancer creates a shuffled, round-robin balancer so that
// multiple instances will return nodes in a randomized order and
// each returned node will be repeated in a cycle.
func NewNodeBalancer(nodes []meta.NodeInfo) Balancer {
// make a copy of the node slice so we can randomize it
// without affecting the original instance as well as ensure
// that each Balancer returns nodes in a different order
b := &nodeBalancer{}
b.nodes = make([]meta.NodeInfo, len(nodes))
copy(b.nodes, nodes)
b.shuffle()
return b
}
// shuffle randomizes the ordering of the balancer's available nodes.
func (b *nodeBalancer) shuffle() {
for i := range b.nodes {
j := rand.Intn(i + 1)
b.nodes[i], b.nodes[j] = b.nodes[j], b.nodes[i]
}
}
// online returns a slice of the nodes that are online
func (b *nodeBalancer) online() []meta.NodeInfo {
return b.nodes
}
// Next returns the next available node.
func (b *nodeBalancer) Next() *meta.NodeInfo {
// only use online nodes
up := b.online()
// no nodes online
if len(up) == 0 {
return nil
}
// rollover back to the beginning
if b.p >= len(up) {
b.p = 0
}
d := &up[b.p]
b.p++
return d
}
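
A minimal usage sketch of the balancer above (hypothetical hosts, mirroring the test file that follows): the node order is shuffled once at construction and then repeats in a cycle.

package cluster_test

import (
	"fmt"

	"github.com/influxdata/influxdb/cluster"
	"github.com/influxdata/influxdb/services/meta"
)

// ExampleNodeBalancer is a hypothetical illustration, not part of the original file.
func ExampleNodeBalancer() {
	nodes := []meta.NodeInfo{
		{ID: 1, Host: "localhost:9991"},
		{ID: 2, Host: "localhost:9992"},
	}
	b := cluster.NewNodeBalancer(nodes)
	for i := 0; i < 4; i++ {
		fmt.Println(b.Next().ID) // e.g. 2, 1, 2, 1: shuffled once, then cycled
	}
}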

View File

@@ -1,115 +0,0 @@
package cluster_test
import (
"fmt"
"testing"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/services/meta"
)
func NewNodes() []meta.NodeInfo {
var nodes []meta.NodeInfo
for i := 1; i <= 2; i++ {
nodes = append(nodes, meta.NodeInfo{
ID: uint64(i),
Host: fmt.Sprintf("localhost:999%d", i),
})
}
return nodes
}
func TestBalancerEmptyNodes(t *testing.T) {
b := cluster.NewNodeBalancer([]meta.NodeInfo{})
got := b.Next()
if got != nil {
t.Errorf("expected nil, got %v", got)
}
}
func TestBalancerUp(t *testing.T) {
nodes := NewNodes()
b := cluster.NewNodeBalancer(nodes)
// First node in randomized round-robin order
first := b.Next()
if first == nil {
t.Errorf("expected datanode, got %v", first)
}
// Second node in randomized round-robin order
second := b.Next()
if second == nil {
t.Errorf("expected datanode, got %v", second)
}
// Should never get the same node in order twice
if first.ID == second.ID {
t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
}
}
/*
func TestBalancerDown(t *testing.T) {
nodes := NewNodes()
b := cluster.NewNodeBalancer(nodes)
nodes[0].Down()
// First node in randomized round-robin order
first := b.Next()
if first == nil {
t.Errorf("expected datanode, got %v", first)
}
// Second node should rollover to the first up node
second := b.Next()
if second == nil {
t.Errorf("expected datanode, got %v", second)
}
// Healthy node should be returned each time
if first.ID != 2 && first.ID != second.ID {
t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
}
}
*/
/*
func TestBalancerBackUp(t *testing.T) {
nodes := newDataNodes()
b := cluster.NewNodeBalancer(nodes)
nodes[0].Down()
for i := 0; i < 3; i++ {
got := b.Next()
if got == nil {
t.Errorf("expected datanode, got %v", got)
}
if exp := uint64(2); got.ID != exp {
t.Errorf("wrong node id: exp %v, got %v", exp, got.ID)
}
}
nodes[0].Up()
// First node in randomized round-robin order
first := b.Next()
if first == nil {
t.Errorf("expected datanode, got %v", first)
}
// Second node should rollover to the first up node
second := b.Next()
if second == nil {
t.Errorf("expected datanode, got %v", second)
}
// Should get both nodes returned
if first.ID == second.ID {
t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
}
}
*/

View File

@@ -1,57 +0,0 @@
package cluster
import (
"net"
"sync"
"gopkg.in/fatih/pool.v2"
)
type clientPool struct {
mu sync.RWMutex
pool map[uint64]pool.Pool
}
func newClientPool() *clientPool {
return &clientPool{
pool: make(map[uint64]pool.Pool),
}
}
func (c *clientPool) setPool(nodeID uint64, p pool.Pool) {
c.mu.Lock()
c.pool[nodeID] = p
c.mu.Unlock()
}
func (c *clientPool) getPool(nodeID uint64) (pool.Pool, bool) {
c.mu.RLock()
p, ok := c.pool[nodeID]
c.mu.RUnlock()
return p, ok
}
func (c *clientPool) size() int {
c.mu.RLock()
var size int
for _, p := range c.pool {
size += p.Len()
}
c.mu.RUnlock()
return size
}
func (c *clientPool) conn(nodeID uint64) (net.Conn, error) {
c.mu.RLock()
conn, err := c.pool[nodeID].Get()
c.mu.RUnlock()
return conn, err
}
func (c *clientPool) close() {
c.mu.Lock()
for _, p := range c.pool {
p.Close()
}
c.mu.Unlock()
}
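
The clientPool above only maps node IDs to pool.Pool instances; creating those pools is left to the caller. A minimal sketch (hypothetical helper inside package cluster, reusing the net and time imports already present in this package) of wiring it to the NewBoundedPool constructor that appears later in this diff:

// registerNode is a hypothetical helper: it builds a bounded connection pool
// for one data node and registers it with the clientPool.
func registerNode(cp *clientPool, nodeID uint64, addr string) error {
	factory := func() (net.Conn, error) {
		return net.Dial("tcp", addr)
	}
	// One idle connection up front, at most three in total,
	// and wait up to five seconds for a free connection.
	p, err := NewBoundedPool(1, 3, 5*time.Second, factory)
	if err != nil {
		return err
	}
	cp.setPool(nodeID, p)
	return nil
}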

View File

@@ -1 +0,0 @@
package cluster // import "github.com/influxdata/influxdb/cluster"

View File

@@ -1,64 +0,0 @@
package cluster
import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/toml"
)
const (
// DefaultWriteTimeout is the default timeout for a complete write to succeed.
DefaultWriteTimeout = 5 * time.Second
// DefaultShardWriterTimeout is the default timeout set on shard writers.
DefaultShardWriterTimeout = 5 * time.Second
// DefaultShardMapperTimeout is the default timeout set on shard mappers.
DefaultShardMapperTimeout = 5 * time.Second
// DefaultMaxRemoteWriteConnections is the maximum number of open connections
// that will be available for remote writes to another host.
DefaultMaxRemoteWriteConnections = 3
// DefaultMaxConcurrentQueries is the maximum number of running queries.
// A value of zero will make the maximum query limit unlimited.
DefaultMaxConcurrentQueries = 0
// DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
// A value of zero will make the maximum point count unlimited.
DefaultMaxSelectPointN = 0
// DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
// A value of zero will make the maximum series count unlimited.
DefaultMaxSelectSeriesN = 0
)
// Config represents the configuration for the clustering service.
type Config struct {
ForceRemoteShardMapping bool `toml:"force-remote-mapping"`
WriteTimeout toml.Duration `toml:"write-timeout"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
MaxRemoteWriteConnections int `toml:"max-remote-write-connections"`
ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
QueryTimeout toml.Duration `toml:"query-timeout"`
LogQueriesAfter toml.Duration `toml:"log-queries-after"`
MaxSelectPointN int `toml:"max-select-point"`
MaxSelectSeriesN int `toml:"max-select-series"`
MaxSelectBucketsN int `toml:"max-select-buckets"`
}
// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
WriteTimeout: toml.Duration(DefaultWriteTimeout),
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout),
QueryTimeout: toml.Duration(influxql.DefaultQueryTimeout),
MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections,
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
MaxSelectPointN: DefaultMaxSelectPointN,
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
}
}
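
A minimal sketch (hypothetical, same package and imports as the file above) of adjusting these defaults in code; the toml struct tags map each field to its key in the `[cluster]` section of the config file:

// exampleConfig is a hypothetical illustration, not part of the original file.
func exampleConfig() Config {
	c := NewConfig()
	c.WriteTimeout = toml.Duration(10 * time.Second) // "write-timeout"
	c.MaxConcurrentQueries = 4                       // "max-concurrent-queries"; 0 means unlimited
	return c
}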

View File

@@ -1,575 +0,0 @@
// Code generated by protoc-gen-gogo.
// source: internal/data.proto
// DO NOT EDIT!
/*
Package cluster is a generated protocol buffer package.
It is generated from these files:
internal/data.proto
It has these top-level messages:
WriteShardRequest
WriteShardResponse
ExecuteStatementRequest
ExecuteStatementResponse
CreateIteratorRequest
CreateIteratorResponse
IteratorStats
FieldDimensionsRequest
FieldDimensionsResponse
SeriesKeysRequest
SeriesKeysResponse
ExpandSourcesRequest
ExpandSourcesResponse
RemoteMonitorRequest
RemoteMonitorResponse
BackupShardRequest
BackupShardResponse
CopyShardRequest
CopyShardResponse
*/
package cluster
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type WriteShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
Database *string `protobuf:"bytes,3,opt,name=Database" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,4,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *WriteShardRequest) Reset() { *m = WriteShardRequest{} }
func (m *WriteShardRequest) String() string { return proto.CompactTextString(m) }
func (*WriteShardRequest) ProtoMessage() {}
func (m *WriteShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
func (m *WriteShardRequest) GetPoints() [][]byte {
if m != nil {
return m.Points
}
return nil
}
func (m *WriteShardRequest) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *WriteShardRequest) GetRetentionPolicy() string {
if m != nil && m.RetentionPolicy != nil {
return *m.RetentionPolicy
}
return ""
}
type WriteShardResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *WriteShardResponse) Reset() { *m = WriteShardResponse{} }
func (m *WriteShardResponse) String() string { return proto.CompactTextString(m) }
func (*WriteShardResponse) ProtoMessage() {}
func (m *WriteShardResponse) GetCode() int32 {
if m != nil && m.Code != nil {
return *m.Code
}
return 0
}
func (m *WriteShardResponse) GetMessage() string {
if m != nil && m.Message != nil {
return *m.Message
}
return ""
}
type ExecuteStatementRequest struct {
Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ExecuteStatementRequest) Reset() { *m = ExecuteStatementRequest{} }
func (m *ExecuteStatementRequest) String() string { return proto.CompactTextString(m) }
func (*ExecuteStatementRequest) ProtoMessage() {}
func (m *ExecuteStatementRequest) GetStatement() string {
if m != nil && m.Statement != nil {
return *m.Statement
}
return ""
}
func (m *ExecuteStatementRequest) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
type ExecuteStatementResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ExecuteStatementResponse) Reset() { *m = ExecuteStatementResponse{} }
func (m *ExecuteStatementResponse) String() string { return proto.CompactTextString(m) }
func (*ExecuteStatementResponse) ProtoMessage() {}
func (m *ExecuteStatementResponse) GetCode() int32 {
if m != nil && m.Code != nil {
return *m.Code
}
return 0
}
func (m *ExecuteStatementResponse) GetMessage() string {
if m != nil && m.Message != nil {
return *m.Message
}
return ""
}
type CreateIteratorRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} }
func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorRequest) ProtoMessage() {}
func (m *CreateIteratorRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *CreateIteratorRequest) GetOpt() []byte {
if m != nil {
return m.Opt
}
return nil
}
type CreateIteratorResponse struct {
Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"`
Type *int32 `protobuf:"varint,2,req,name=Type" json:"Type,omitempty"`
Stats *IteratorStats `protobuf:"bytes,3,opt,name=Stats" json:"Stats,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} }
func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorResponse) ProtoMessage() {}
func (m *CreateIteratorResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
func (m *CreateIteratorResponse) GetType() int32 {
if m != nil && m.Type != nil {
return *m.Type
}
return 0
}
func (m *CreateIteratorResponse) GetStats() *IteratorStats {
if m != nil {
return m.Stats
}
return nil
}
type IteratorStats struct {
SeriesN *int64 `protobuf:"varint,1,opt,name=SeriesN" json:"SeriesN,omitempty"`
PointN *int64 `protobuf:"varint,2,opt,name=PointN" json:"PointN,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
func (*IteratorStats) ProtoMessage() {}
func (m *IteratorStats) GetSeriesN() int64 {
if m != nil && m.SeriesN != nil {
return *m.SeriesN
}
return 0
}
func (m *IteratorStats) GetPointN() int64 {
if m != nil && m.PointN != nil {
return *m.PointN
}
return 0
}
type FieldDimensionsRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} }
func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsRequest) ProtoMessage() {}
func (m *FieldDimensionsRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *FieldDimensionsRequest) GetSources() []byte {
if m != nil {
return m.Sources
}
return nil
}
type FieldDimensionsResponse struct {
Fields []string `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"`
Dimensions []string `protobuf:"bytes,2,rep,name=Dimensions" json:"Dimensions,omitempty"`
Err *string `protobuf:"bytes,3,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} }
func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsResponse) ProtoMessage() {}
func (m *FieldDimensionsResponse) GetFields() []string {
if m != nil {
return m.Fields
}
return nil
}
func (m *FieldDimensionsResponse) GetDimensions() []string {
if m != nil {
return m.Dimensions
}
return nil
}
func (m *FieldDimensionsResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type SeriesKeysRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} }
func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysRequest) ProtoMessage() {}
func (m *SeriesKeysRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *SeriesKeysRequest) GetOpt() []byte {
if m != nil {
return m.Opt
}
return nil
}
type SeriesKeysResponse struct {
SeriesList []byte `protobuf:"bytes,1,opt,name=SeriesList" json:"SeriesList,omitempty"`
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} }
func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysResponse) ProtoMessage() {}
func (m *SeriesKeysResponse) GetSeriesList() []byte {
if m != nil {
return m.SeriesList
}
return nil
}
func (m *SeriesKeysResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type ExpandSourcesRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} }
func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesRequest) ProtoMessage() {}
func (m *ExpandSourcesRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *ExpandSourcesRequest) GetSources() []byte {
if m != nil {
return m.Sources
}
return nil
}
type ExpandSourcesResponse struct {
Sources []byte `protobuf:"bytes,1,req,name=Sources" json:"Sources,omitempty"`
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} }
func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesResponse) ProtoMessage() {}
func (m *ExpandSourcesResponse) GetSources() []byte {
if m != nil {
return m.Sources
}
return nil
}
func (m *ExpandSourcesResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type RemoteMonitorRequest struct {
RemoteAddrs []string `protobuf:"bytes,1,rep,name=RemoteAddrs" json:"RemoteAddrs,omitempty"`
NodeID *string `protobuf:"bytes,2,req,name=NodeID" json:"NodeID,omitempty"`
Username *string `protobuf:"bytes,3,req,name=Username" json:"Username,omitempty"`
Password *string `protobuf:"bytes,4,req,name=Password" json:"Password,omitempty"`
ClusterID *uint64 `protobuf:"varint,5,req,name=ClusterID" json:"ClusterID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} }
func (m *RemoteMonitorRequest) String() string { return proto.CompactTextString(m) }
func (*RemoteMonitorRequest) ProtoMessage() {}
func (m *RemoteMonitorRequest) GetRemoteAddrs() []string {
if m != nil {
return m.RemoteAddrs
}
return nil
}
func (m *RemoteMonitorRequest) GetNodeID() string {
if m != nil && m.NodeID != nil {
return *m.NodeID
}
return ""
}
func (m *RemoteMonitorRequest) GetUsername() string {
if m != nil && m.Username != nil {
return *m.Username
}
return ""
}
func (m *RemoteMonitorRequest) GetPassword() string {
if m != nil && m.Password != nil {
return *m.Password
}
return ""
}
func (m *RemoteMonitorRequest) GetClusterID() uint64 {
if m != nil && m.ClusterID != nil {
return *m.ClusterID
}
return 0
}
type RemoteMonitorResponse struct {
Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *RemoteMonitorResponse) Reset() { *m = RemoteMonitorResponse{} }
func (m *RemoteMonitorResponse) String() string { return proto.CompactTextString(m) }
func (*RemoteMonitorResponse) ProtoMessage() {}
func (m *RemoteMonitorResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type BackupShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Since *int64 `protobuf:"varint,2,opt,name=Since" json:"Since,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *BackupShardRequest) Reset() { *m = BackupShardRequest{} }
func (m *BackupShardRequest) String() string { return proto.CompactTextString(m) }
func (*BackupShardRequest) ProtoMessage() {}
func (m *BackupShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
func (m *BackupShardRequest) GetSince() int64 {
if m != nil && m.Since != nil {
return *m.Since
}
return 0
}
type BackupShardResponse struct {
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *BackupShardResponse) Reset() { *m = BackupShardResponse{} }
func (m *BackupShardResponse) String() string { return proto.CompactTextString(m) }
func (*BackupShardResponse) ProtoMessage() {}
func (m *BackupShardResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type CopyShardRequest struct {
Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,3,req,name=Policy" json:"Policy,omitempty"`
ShardID *uint64 `protobuf:"varint,4,req,name=ShardID" json:"ShardID,omitempty"`
Since *int64 `protobuf:"varint,5,opt,name=Since" json:"Since,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CopyShardRequest) Reset() { *m = CopyShardRequest{} }
func (m *CopyShardRequest) String() string { return proto.CompactTextString(m) }
func (*CopyShardRequest) ProtoMessage() {}
func (m *CopyShardRequest) GetHost() string {
if m != nil && m.Host != nil {
return *m.Host
}
return ""
}
func (m *CopyShardRequest) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *CopyShardRequest) GetPolicy() string {
if m != nil && m.Policy != nil {
return *m.Policy
}
return ""
}
func (m *CopyShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
func (m *CopyShardRequest) GetSince() int64 {
if m != nil && m.Since != nil {
return *m.Since
}
return 0
}
type CopyShardResponse struct {
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CopyShardResponse) Reset() { *m = CopyShardResponse{} }
func (m *CopyShardResponse) String() string { return proto.CompactTextString(m) }
func (*CopyShardResponse) ProtoMessage() {}
func (m *CopyShardResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
func init() {
proto.RegisterType((*WriteShardRequest)(nil), "cluster.WriteShardRequest")
proto.RegisterType((*WriteShardResponse)(nil), "cluster.WriteShardResponse")
proto.RegisterType((*ExecuteStatementRequest)(nil), "cluster.ExecuteStatementRequest")
proto.RegisterType((*ExecuteStatementResponse)(nil), "cluster.ExecuteStatementResponse")
proto.RegisterType((*CreateIteratorRequest)(nil), "cluster.CreateIteratorRequest")
proto.RegisterType((*CreateIteratorResponse)(nil), "cluster.CreateIteratorResponse")
proto.RegisterType((*IteratorStats)(nil), "cluster.IteratorStats")
proto.RegisterType((*FieldDimensionsRequest)(nil), "cluster.FieldDimensionsRequest")
proto.RegisterType((*FieldDimensionsResponse)(nil), "cluster.FieldDimensionsResponse")
proto.RegisterType((*SeriesKeysRequest)(nil), "cluster.SeriesKeysRequest")
proto.RegisterType((*SeriesKeysResponse)(nil), "cluster.SeriesKeysResponse")
proto.RegisterType((*ExpandSourcesRequest)(nil), "cluster.ExpandSourcesRequest")
proto.RegisterType((*ExpandSourcesResponse)(nil), "cluster.ExpandSourcesResponse")
proto.RegisterType((*RemoteMonitorRequest)(nil), "cluster.RemoteMonitorRequest")
proto.RegisterType((*RemoteMonitorResponse)(nil), "cluster.RemoteMonitorResponse")
proto.RegisterType((*BackupShardRequest)(nil), "cluster.BackupShardRequest")
proto.RegisterType((*BackupShardResponse)(nil), "cluster.BackupShardResponse")
proto.RegisterType((*CopyShardRequest)(nil), "cluster.CopyShardRequest")
proto.RegisterType((*CopyShardResponse)(nil), "cluster.CopyShardResponse")
}

View File

@@ -1,104 +0,0 @@
package cluster;
message WriteShardRequest {
required uint64 ShardID = 1;
repeated bytes Points = 2;
optional string Database = 3;
optional string RetentionPolicy = 4;
}
message WriteShardResponse {
required int32 Code = 1;
optional string Message = 2;
}
message ExecuteStatementRequest {
required string Statement = 1;
required string Database = 2;
}
message ExecuteStatementResponse {
required int32 Code = 1;
optional string Message = 2;
}
message CreateIteratorRequest {
repeated uint64 ShardIDs = 1;
required bytes Opt = 2;
}
message CreateIteratorResponse {
optional string Err = 1;
required int32 Type = 2;
optional IteratorStats Stats = 3;
}
message IteratorStats {
optional int64 SeriesN = 1;
optional int64 PointN = 2;
}
message FieldDimensionsRequest {
repeated uint64 ShardIDs = 1;
required bytes Sources = 2;
}
message FieldDimensionsResponse {
repeated string Fields = 1;
repeated string Dimensions = 2;
optional string Err = 3;
}
message SeriesKeysRequest {
repeated uint64 ShardIDs = 1;
required bytes Opt = 2;
}
message SeriesKeysResponse {
optional bytes SeriesList = 1;
optional string Err = 2;
}
message ExpandSourcesRequest {
repeated uint64 ShardIDs = 1;
required bytes Sources = 2;
}
message ExpandSourcesResponse {
required bytes Sources = 1;
optional string Err = 2;
}
message RemoteMonitorRequest {
repeated string RemoteAddrs = 1;
required string NodeID = 2;
required string Username = 3;
required string Password = 4;
required uint64 ClusterID = 5;
}
message RemoteMonitorResponse {
optional string Err = 1;
}
message BackupShardRequest {
required uint64 ShardID = 1;
optional int64 Since = 2;
}
message BackupShardResponse {
optional string Err = 2;
}
message CopyShardRequest {
required string Host = 1;
required string Database = 2;
required string Policy = 3;
required uint64 ShardID = 4;
optional int64 Since = 5;
}
message CopyShardResponse {
optional string Err = 2;
}

View File

@@ -1,189 +0,0 @@
package cluster
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"gopkg.in/fatih/pool.v2"
)
// boundedPool implements the Pool interface based on buffered channels.
type boundedPool struct {
// storage for our net.Conn connections
mu sync.Mutex
conns chan net.Conn
timeout time.Duration
total int32
// net.Conn generator
factory Factory
}
// Factory is a function to create new connections.
type Factory func() (net.Conn, error)
// NewBoundedPool returns a new pool based on buffered channels with an initial
// capacity, maximum capacity and timeout to wait for a connection from the pool.
// Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), if there is no connection
// available in the pool and the total number of connections is less than the max,
// a new connection will be created via the Factory() method. Otherwise, the call
// will block until a connection is available or the timeout is reached.
func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}
c := &boundedPool{
conns: make(chan net.Conn, maxCap),
factory: factory,
timeout: timeout,
}
// Create initial connections. If something goes wrong,
// just close the pool and error out.
for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- conn
atomic.AddInt32(&c.total, 1)
}
return c, nil
}
func (c *boundedPool) getConns() chan net.Conn {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}
// Get implements the Pool interface's Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *boundedPool) Get() (net.Conn, error) {
conns := c.getConns()
if conns == nil {
return nil, pool.ErrClosed
}
// Try and grab a connection from the pool
select {
case conn := <-conns:
if conn == nil {
return nil, pool.ErrClosed
}
return c.wrapConn(conn), nil
default:
// Could not get connection, can we create a new one?
if atomic.LoadInt32(&c.total) < int32(cap(conns)) {
conn, err := c.factory()
if err != nil {
return nil, err
}
atomic.AddInt32(&c.total, 1)
return c.wrapConn(conn), nil
}
}
// The pool was empty and we couldn't create a new one, so
// retry until one is free or we time out.
select {
case conn := <-conns:
if conn == nil {
return nil, pool.ErrClosed
}
return c.wrapConn(conn), nil
case <-time.After(c.timeout):
return nil, fmt.Errorf("timed out waiting for free connection")
}
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *boundedPool) put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.conns == nil {
// pool is closed, close passed connection
return conn.Close()
}
// Put the resource back into the pool. If the pool is full, the send would
// block, so the default case is executed instead.
select {
case c.conns <- conn:
return nil
default:
// pool is full, close passed connection
atomic.AddInt32(&c.total, -1)
return conn.Close()
}
}
func (c *boundedPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for conn := range conns {
conn.Close()
}
}
func (c *boundedPool) Len() int { return len(c.getConns()) }
// wrapConn wraps a standard net.Conn in a pooledConn.
func (c *boundedPool) wrapConn(conn net.Conn) net.Conn {
p := &pooledConn{c: c}
p.Conn = conn
return p
}
// pooledConn is a wrapper around net.Conn to modify the behavior of
// net.Conn's Close() method.
type pooledConn struct {
net.Conn
c *boundedPool
unusable bool
}
// Close() puts the given connection back into the pool instead of closing it.
func (p pooledConn) Close() error {
if p.unusable {
if p.Conn != nil {
return p.Conn.Close()
}
return nil
}
return p.c.put(p.Conn)
}
// MarkUnusable() marks the connection as no longer usable, so the pool closes it instead of returning it to the pool.
func (p *pooledConn) MarkUnusable() {
p.unusable = true
atomic.AddInt32(&p.c.total, -1)
}
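
A minimal usage sketch of the bounded pool above (hypothetical address and helper, same package and imports as this file): Get() hands out wrapped connections, so Close() normally returns them to the pool; a connection that hit an I/O error can be marked unusable first so it is really closed.

// examplePoolUsage is a hypothetical illustration, not part of the original file.
func examplePoolUsage() error {
	p, err := NewBoundedPool(1, 3, 5*time.Second, func() (net.Conn, error) {
		return net.Dial("tcp", "localhost:8088")
	})
	if err != nil {
		return err
	}
	defer p.Close()

	conn, err := p.Get()
	if err != nil {
		return err
	}
	if _, err := conn.Write([]byte("ping")); err != nil {
		// Drop a broken connection instead of recycling it.
		if pc, ok := conn.(*pooledConn); ok {
			pc.MarkUnusable()
		}
	}
	return conn.Close() // returns to the pool unless marked unusable
}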

View File

@@ -1,644 +0,0 @@
package cluster
import (
"errors"
"fmt"
"time"
"github.com/gogo/protobuf/proto"
internal "github.com/influxdata/influxdb/cluster/internal"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
)
//go:generate protoc --gogo_out=. internal/data.proto
// WritePointsRequest represents a request to write point data to the cluster
type WritePointsRequest struct {
Database string
RetentionPolicy string
Points []models.Point
}
// AddPoint adds a point to the WritePointsRequest with field key 'value'
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
pt, err := models.NewPoint(
name, tags, map[string]interface{}{"value": value}, timestamp,
)
if err != nil {
return
}
w.Points = append(w.Points, pt)
}
// WriteShardRequest represents a request to write a slice of points to a shard
type WriteShardRequest struct {
pb internal.WriteShardRequest
}
// WriteShardResponse represents the response returned from a remote WriteShardRequest call
type WriteShardResponse struct {
pb internal.WriteShardResponse
}
// SetShardID sets the ShardID
func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id }
// ShardID gets the ShardID
func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }
func (w *WriteShardRequest) SetDatabase(db string) { w.pb.Database = &db }
func (w *WriteShardRequest) SetRetentionPolicy(rp string) { w.pb.RetentionPolicy = &rp }
func (w *WriteShardRequest) Database() string { return w.pb.GetDatabase() }
func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionPolicy() }
// Points returns the time series Points
func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }
// AddPoint adds a new time series point
func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
pt, err := models.NewPoint(
name, tags, map[string]interface{}{"value": value}, timestamp,
)
if err != nil {
return
}
w.AddPoints([]models.Point{pt})
}
// AddPoints adds new time series points
func (w *WriteShardRequest) AddPoints(points []models.Point) {
for _, p := range points {
b, err := p.MarshalBinary()
if err != nil {
// An error here means that we created a point higher in the stack that we could
// not marshal to a byte slice. If that happens, the endpoint that created that
// point needs to be fixed.
panic(fmt.Sprintf("failed to marshal point: `%v`: %v", p, err))
}
w.pb.Points = append(w.pb.Points, b)
}
}
// MarshalBinary encodes the object to a binary format.
func (w *WriteShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&w.pb)
}
// UnmarshalBinary populates WriteShardRequest from a binary format.
func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &w.pb); err != nil {
return err
}
return nil
}
func (w *WriteShardRequest) unmarshalPoints() []models.Point {
points := make([]models.Point, len(w.pb.GetPoints()))
for i, p := range w.pb.GetPoints() {
pt, err := models.NewPointFromBytes(p)
if err != nil {
// An error here means that one node created a valid point and sent us an
// unparseable version. We could log and drop the point and allow
// anti-entropy to resolve the discrepancy, but this shouldn't ever happen.
panic(fmt.Sprintf("failed to parse point: `%v`: %v", string(p), err))
}
points[i] = pt
}
return points
}
// SetCode sets the Code
func (w *WriteShardResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) }
// SetMessage sets the Message
func (w *WriteShardResponse) SetMessage(message string) { w.pb.Message = &message }
// Code returns the Code
func (w *WriteShardResponse) Code() int { return int(w.pb.GetCode()) }
// Message returns the Message
func (w *WriteShardResponse) Message() string { return w.pb.GetMessage() }
// MarshalBinary encodes the object to a binary format.
func (w *WriteShardResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(&w.pb)
}
// UnmarshalBinary populates WriteShardResponse from a binary format.
func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &w.pb); err != nil {
return err
}
return nil
}
// ExecuteStatementRequest represents a request to execute a statement on a node.
type ExecuteStatementRequest struct {
pb internal.ExecuteStatementRequest
}
// Statement returns the InfluxQL statement.
func (r *ExecuteStatementRequest) Statement() string { return r.pb.GetStatement() }
// SetStatement sets the InfluxQL statement.
func (r *ExecuteStatementRequest) SetStatement(statement string) {
r.pb.Statement = proto.String(statement)
}
// Database returns the database name.
func (r *ExecuteStatementRequest) Database() string { return r.pb.GetDatabase() }
// SetDatabase sets the database name.
func (r *ExecuteStatementRequest) SetDatabase(database string) { r.pb.Database = proto.String(database) }
// MarshalBinary encodes the object to a binary format.
func (r *ExecuteStatementRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&r.pb)
}
// UnmarshalBinary populates ExecuteStatementRequest from a binary format.
func (r *ExecuteStatementRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &r.pb); err != nil {
return err
}
return nil
}
// ExecuteStatementResponse represents the response returned from a remote ExecuteStatementRequest call.
type ExecuteStatementResponse struct {
pb internal.WriteShardResponse
}
// Code returns the response code.
func (w *ExecuteStatementResponse) Code() int { return int(w.pb.GetCode()) }
// SetCode sets the Code
func (w *ExecuteStatementResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) }
// Message returns the response message.
func (w *ExecuteStatementResponse) Message() string { return w.pb.GetMessage() }
// SetMessage sets the Message
func (w *ExecuteStatementResponse) SetMessage(message string) { w.pb.Message = &message }
// MarshalBinary encodes the object to a binary format.
func (w *ExecuteStatementResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(&w.pb)
}
// UnmarshalBinary populates ExecuteStatementResponse from a binary format.
func (w *ExecuteStatementResponse) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &w.pb); err != nil {
return err
}
return nil
}
// CreateIteratorRequest represents a request to create a remote iterator.
type CreateIteratorRequest struct {
ShardIDs []uint64
Opt influxql.IteratorOptions
}
// MarshalBinary encodes r to a binary format.
func (r *CreateIteratorRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Opt.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.CreateIteratorRequest{
ShardIDs: r.ShardIDs,
Opt: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *CreateIteratorRequest) UnmarshalBinary(data []byte) error {
var pb internal.CreateIteratorRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil {
return err
}
return nil
}
// CreateIteratorResponse represents a response from remote iterator creation.
type CreateIteratorResponse struct {
Err error
Type influxql.DataType
Stats influxql.IteratorStats
}
// MarshalBinary encodes r to a binary format.
func (r *CreateIteratorResponse) MarshalBinary() ([]byte, error) {
var pb internal.CreateIteratorResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
pb.Type = proto.Int32(int32(r.Type))
pb.Stats = &internal.IteratorStats{
SeriesN: proto.Int64(int64(r.Stats.SeriesN)),
PointN: proto.Int64(int64(r.Stats.PointN)),
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *CreateIteratorResponse) UnmarshalBinary(data []byte) error {
var pb internal.CreateIteratorResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
r.Type = influxql.DataType(pb.GetType())
if stats := pb.GetStats(); stats != nil {
r.Stats.SeriesN = int(stats.GetSeriesN())
r.Stats.PointN = int(stats.GetPointN())
}
return nil
}
// FieldDimensionsRequest represents a request to retrieve unique fields & dimensions.
type FieldDimensionsRequest struct {
ShardIDs []uint64
Sources influxql.Sources
}
// MarshalBinary encodes r to a binary format.
func (r *FieldDimensionsRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Sources.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.FieldDimensionsRequest{
ShardIDs: r.ShardIDs,
Sources: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *FieldDimensionsRequest) UnmarshalBinary(data []byte) error {
var pb internal.FieldDimensionsRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil {
return err
}
return nil
}
// FieldDimensionsResponse represents a response from a field dimensions request.
type FieldDimensionsResponse struct {
Fields map[string]struct{}
Dimensions map[string]struct{}
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *FieldDimensionsResponse) MarshalBinary() ([]byte, error) {
var pb internal.FieldDimensionsResponse
pb.Fields = make([]string, 0, len(r.Fields))
for k := range r.Fields {
pb.Fields = append(pb.Fields, k)
}
pb.Dimensions = make([]string, 0, len(r.Dimensions))
for k := range r.Dimensions {
pb.Dimensions = append(pb.Dimensions, k)
}
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *FieldDimensionsResponse) UnmarshalBinary(data []byte) error {
var pb internal.FieldDimensionsResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.Fields = make(map[string]struct{}, len(pb.GetFields()))
for _, s := range pb.GetFields() {
r.Fields[s] = struct{}{}
}
r.Dimensions = make(map[string]struct{}, len(pb.GetDimensions()))
for _, s := range pb.GetDimensions() {
r.Dimensions[s] = struct{}{}
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
// SeriesKeysRequest represents a request to retrieve a list of series keys.
type SeriesKeysRequest struct {
ShardIDs []uint64
Opt influxql.IteratorOptions
}
// MarshalBinary encodes r to a binary format.
func (r *SeriesKeysRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Opt.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.SeriesKeysRequest{
ShardIDs: r.ShardIDs,
Opt: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *SeriesKeysRequest) UnmarshalBinary(data []byte) error {
var pb internal.SeriesKeysRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil {
return err
}
return nil
}
// SeriesKeysResponse represents a response from retrieving series keys.
type SeriesKeysResponse struct {
SeriesList influxql.SeriesList
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *SeriesKeysResponse) MarshalBinary() ([]byte, error) {
var pb internal.SeriesKeysResponse
buf, err := r.SeriesList.MarshalBinary()
if err != nil {
return nil, err
}
pb.SeriesList = buf
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *SeriesKeysResponse) UnmarshalBinary(data []byte) error {
var pb internal.SeriesKeysResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if err := r.SeriesList.UnmarshalBinary(pb.GetSeriesList()); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
// ExpandSourcesRequest represents a request to expand regex sources.
type ExpandSourcesRequest struct {
ShardIDs []uint64
Sources influxql.Sources
}
// MarshalBinary encodes r to a binary format.
func (r *ExpandSourcesRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Sources.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.ExpandSourcesRequest{
ShardIDs: r.ShardIDs,
Sources: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *ExpandSourcesRequest) UnmarshalBinary(data []byte) error {
var pb internal.ExpandSourcesRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil {
return err
}
return nil
}
// ExpandSourcesResponse represents a response from source expansion.
type ExpandSourcesResponse struct {
Sources influxql.Sources
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *ExpandSourcesResponse) MarshalBinary() ([]byte, error) {
var pb internal.ExpandSourcesResponse
buf, err := r.Sources.MarshalBinary()
if err != nil {
return nil, err
}
pb.Sources = buf
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *ExpandSourcesResponse) UnmarshalBinary(data []byte) error {
var pb internal.ExpandSourcesResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
// RemoteMonitorRequest represents a request to configure a
// monitor.Monitor to write to a remote database.
type RemoteMonitorRequest struct {
pb internal.RemoteMonitorRequest
}
func (m *RemoteMonitorRequest) SetRemoteAddrs(s []string) {
m.pb.RemoteAddrs = s
}
func (m *RemoteMonitorRequest) SetNodeID(s string) {
m.pb.NodeID = &s
}
func (m *RemoteMonitorRequest) SetUsername(s string) {
m.pb.Username = &s
}
func (m *RemoteMonitorRequest) SetPassword(s string) {
m.pb.Password = &s
}
func (m *RemoteMonitorRequest) SetClusterID(v uint64) {
m.pb.ClusterID = &v
}
// MarshalBinary encodes the object to a binary format.
func (r *RemoteMonitorRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&r.pb)
}
// UnmarshalBinary populates RemoteMonitorRequest from a binary format.
func (r *RemoteMonitorRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &r.pb); err != nil {
return err
}
return nil
}
// RemoteMonitorResponse represents a response to a remote monitor request.
type RemoteMonitorResponse struct {
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *RemoteMonitorResponse) MarshalBinary() ([]byte, error) {
var pb internal.RemoteMonitorResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *RemoteMonitorResponse) UnmarshalBinary(data []byte) error {
var pb internal.RemoteMonitorResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
// BackupShardRequest represents a request to stream a backup of a single shard.
type BackupShardRequest struct {
ShardID uint64
Since time.Time
}
// MarshalBinary encodes r to a binary format.
func (r *BackupShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.BackupShardRequest{
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}
// UnmarshalBinary decodes data into r.
func (r *BackupShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.BackupShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}
// CopyShardRequest represents a request to copy a shard from another host.
type CopyShardRequest struct {
Host string
Database string
Policy string
ShardID uint64
Since time.Time
}
// MarshalBinary encodes r to a binary format.
func (r *CopyShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.CopyShardRequest{
Host: proto.String(r.Host),
Database: proto.String(r.Database),
Policy: proto.String(r.Policy),
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}
// UnmarshalBinary decodes data into r.
func (r *CopyShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.Host = pb.GetHost()
r.Database = pb.GetDatabase()
r.Policy = pb.GetPolicy()
r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}
// CopyShardResponse represents a response from a shard Copy.
type CopyShardResponse struct {
Err error
}
func (r *CopyShardResponse) MarshalBinary() ([]byte, error) {
var pb internal.CopyShardResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
func (r *CopyShardResponse) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}

View File

@@ -1,139 +0,0 @@
package cluster
import (
"errors"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/influxql"
)
func TestWriteShardRequestBinary(t *testing.T) {
sr := &WriteShardRequest{}
sr.SetShardID(uint64(1))
if exp := uint64(1); sr.ShardID() != exp {
t.Fatalf("ShardID mismatch: got %v, exp %v", sr.ShardID(), exp)
}
sr.AddPoint("cpu", 1.0, time.Unix(0, 0), map[string]string{"host": "serverA"})
sr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
sr.AddPoint("cpu_load", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
b, err := sr.MarshalBinary()
if err != nil {
t.Fatalf("WritePointsRequest.MarshalBinary() failed: %v", err)
}
if len(b) == 0 {
t.Fatalf("WritePointsRequest.MarshalBinary() returned 0 bytes")
}
got := &WriteShardRequest{}
if err := got.UnmarshalBinary(b); err != nil {
t.Fatalf("WritePointsRequest.UnmarshalMarshalBinary() failed: %v", err)
}
if got.ShardID() != sr.ShardID() {
t.Errorf("ShardID mismatch: got %v, exp %v", got.ShardID(), sr.ShardID())
}
if len(got.Points()) != len(sr.Points()) {
t.Errorf("Points count mismatch: got %v, exp %v", len(got.Points()), len(sr.Points()))
}
srPoints := sr.Points()
gotPoints := got.Points()
for i, p := range srPoints {
g := gotPoints[i]
if g.Name() != p.Name() {
t.Errorf("Point %d name mismatch: got %v, exp %v", i, g.Name(), p.Name())
}
if !g.Time().Equal(p.Time()) {
t.Errorf("Point %d time mismatch: got %v, exp %v", i, g.Time(), p.Time())
}
if g.HashID() != p.HashID() {
t.Errorf("Point #%d HashID() mismatch: got %v, exp %v", i, g.HashID(), p.HashID())
}
for k, v := range p.Tags() {
if g.Tags()[k] != v {
t.Errorf("Point #%d tag mismatch: got %v, exp %v", i, k, v)
}
}
if len(p.Fields()) != len(g.Fields()) {
t.Errorf("Point %d field count mismatch: got %v, exp %v", i, len(g.Fields()), len(p.Fields()))
}
for j, f := range p.Fields() {
if g.Fields()[j] != f {
t.Errorf("Point %d field mismatch: got %v, exp %v", i, g.Fields()[j], f)
}
}
}
}
func TestWriteShardResponseBinary(t *testing.T) {
sr := &WriteShardResponse{}
sr.SetCode(10)
sr.SetMessage("foo")
b, err := sr.MarshalBinary()
if exp := 10; sr.Code() != exp {
t.Fatalf("Code mismatch: got %v, exp %v", sr.Code(), exp)
}
if exp := "foo"; sr.Message() != exp {
t.Fatalf("Message mismatch: got %v, exp %v", sr.Message(), exp)
}
if err != nil {
t.Fatalf("WritePointsResponse.MarshalBinary() failed: %v", err)
}
if len(b) == 0 {
t.Fatalf("WritePointsResponse.MarshalBinary() returned 0 bytes")
}
got := &WriteShardResponse{}
if err := got.UnmarshalBinary(b); err != nil {
t.Fatalf("WritePointsResponse.UnmarshalMarshalBinary() failed: %v", err)
}
if got.Code() != sr.Code() {
t.Errorf("Code mismatch: got %v, exp %v", got.Code(), sr.Code())
}
if got.Message() != sr.Message() {
t.Errorf("Message mismatch: got %v, exp %v", got.Message(), sr.Message())
}
}
// Ensure series list response can be marshaled into and out of a binary format.
func TestSeriesKeysResponse_MarshalBinary(t *testing.T) {
resp := &SeriesKeysResponse{
SeriesList: []influxql.Series{
{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
},
Err: errors.New("marker"),
}
// Marshal to binary.
buf, err := resp.MarshalBinary()
if err != nil {
t.Fatal(err)
}
// Unmarshal back to an object.
var other SeriesKeysResponse
if err := other.UnmarshalBinary(buf); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(&other, resp) {
t.Fatalf("unexpected response: %s", spew.Sdump(other))
}
}

View File

@@ -1,843 +0,0 @@
package cluster
import (
"encoding"
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"
"time"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/tsdb"
)
// MaxMessageSize defines how large a message can be before we reject it
const MaxMessageSize = 1024 * 1024 * 1024 // 1GB
// MuxHeader is the header byte used in the TCP mux.
const MuxHeader = 2
// Statistics maintained by the cluster package
const (
writeShardReq = "writeShardReq"
writeShardPointsReq = "writeShardPointsReq"
writeShardFail = "writeShardFail"
createIteratorReq = "createIteratorReq"
createIteratorResp = "createIteratorResp"
fieldDimensionsReq = "fieldDimensionsReq"
fieldDimensionsResp = "fieldDimensionsResp"
seriesKeysReq = "seriesKeysReq"
seriesKeysResp = "seriesKeysResp"
expandSourcesReq = "expandSourcesReq"
expandSourcesResp = "expandSourcesResp"
backupShardReq = "backupShardReq"
backupShardResp = "backupShardResp"
copyShardReq = "copyShardReq"
copyShardResp = "copyShardResp"
)
const (
writeShardRequestMessage byte = iota + 1
writeShardResponseMessage
executeStatementRequestMessage
executeStatementResponseMessage
createIteratorRequestMessage
createIteratorResponseMessage
fieldDimensionsRequestMessage
fieldDimensionsResponseMessage
seriesKeysRequestMessage
seriesKeysResponseMessage
remoteMonitorRequestMessage
remoteMonitorResponseMessage
expandSourcesRequestMessage
expandSourcesResponseMessage
backupShardRequestMessage
backupShardResponseMessage
copyShardRequestMessage
copyShardResponseMessage
)
// BackupTimeout is the time before a connection times out when performing a backup.
const BackupTimeout = 30 * time.Second
// Service processes data received over raw TCP connections.
type Service struct {
mu sync.RWMutex
wg sync.WaitGroup
closing chan struct{}
Listener net.Listener
TSDBStore TSDBStore
Monitor *monitor.Monitor
Logger *log.Logger
statMap *expvar.Map
}
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
return &Service{
closing: make(chan struct{}),
Logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags),
statMap: influxdb.NewStatistics("cluster", "cluster", nil),
}
}
// Open opens the network listener and begins serving requests.
func (s *Service) Open() error {
s.Logger.Println("Starting cluster service")
// Begin serving connections.
s.wg.Add(1)
go s.serve()
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[cluster] ", log.LstdFlags)
}
// serve accepts connections from the listener and handles them.
func (s *Service) serve() {
defer s.wg.Done()
for {
// Check if the service is shutting down.
select {
case <-s.closing:
return
default:
}
// Accept the next connection.
conn, err := s.Listener.Accept()
if err != nil {
if strings.Contains(err.Error(), "connection closed") {
s.Logger.Printf("cluster service accept error: %s", err)
return
}
s.Logger.Printf("accept error: %s", err)
continue
}
// Delegate connection handling to a separate goroutine.
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.handleConn(conn)
}()
}
}
// Close shuts down the listener and waits for all connections to finish.
func (s *Service) Close() error {
if s.Listener != nil {
s.Listener.Close()
}
// Shut down all handlers.
close(s.closing)
s.wg.Wait()
return nil
}
// handleConn services an individual TCP connection.
func (s *Service) handleConn(conn net.Conn) {
// Ensure connection is closed when service is closed.
closing := make(chan struct{})
defer close(closing)
go func() {
select {
case <-closing:
case <-s.closing:
}
conn.Close()
}()
s.Logger.Printf("accept remote connection from %v\n", conn.RemoteAddr())
defer func() {
s.Logger.Printf("close remote connection from %v\n", conn.RemoteAddr())
}()
for {
// Read type-length-value.
typ, err := ReadType(conn)
if err != nil {
if strings.HasSuffix(err.Error(), "EOF") {
return
}
s.Logger.Printf("unable to read type: %s", err)
return
}
// Delegate message processing by type.
switch typ {
case writeShardRequestMessage:
buf, err := ReadLV(conn)
if err != nil {
s.Logger.Printf("unable to read length-value: %s", err)
return
}
s.statMap.Add(writeShardReq, 1)
err = s.processWriteShardRequest(buf)
if err != nil {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeShardResponse(conn, err)
case executeStatementRequestMessage:
buf, err := ReadLV(conn)
if err != nil {
s.Logger.Printf("unable to read length-value: %s", err)
return
}
err = s.processExecuteStatementRequest(buf)
if err != nil {
s.Logger.Printf("process execute statement error: %s", err)
}
s.writeShardResponse(conn, err)
case createIteratorRequestMessage:
s.statMap.Add(createIteratorReq, 1)
s.processCreateIteratorRequest(conn)
return
case fieldDimensionsRequestMessage:
s.statMap.Add(fieldDimensionsReq, 1)
s.processFieldDimensionsRequest(conn)
return
case seriesKeysRequestMessage:
s.statMap.Add(seriesKeysReq, 1)
s.processSeriesKeysRequest(conn)
return
case remoteMonitorRequestMessage:
buf, err := ReadLV(conn)
if err != nil {
s.Logger.Printf("unable to read length-value: %s", err)
return
}
if err = s.processRemoteMonitorRequest(buf); err != nil {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeRemoteMonitorResponse(conn, err)
case expandSourcesRequestMessage:
s.statMap.Add(expandSourcesReq, 1)
s.processExpandSourcesRequest(conn)
return
case backupShardRequestMessage:
s.statMap.Add(backupShardReq, 1)
s.processBackupShardRequest(conn)
return
case copyShardRequestMessage:
s.statMap.Add(copyShardReq, 1)
s.processCopyShardRequest(conn)
return
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
}
}
func (s *Service) processExecuteStatementRequest(buf []byte) error {
// Unmarshal the request.
var req ExecuteStatementRequest
if err := req.UnmarshalBinary(buf); err != nil {
return err
}
// Parse the InfluxQL statement.
stmt, err := influxql.ParseStatement(req.Statement())
if err != nil {
return err
}
return s.executeStatement(stmt, req.Database())
}
func (s *Service) executeStatement(stmt influxql.Statement, database string) error {
switch t := stmt.(type) {
case *influxql.DeleteSeriesStatement:
return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition)
case *influxql.DropDatabaseStatement:
return s.TSDBStore.DeleteDatabase(t.Name)
case *influxql.DropMeasurementStatement:
return s.TSDBStore.DeleteMeasurement(database, t.Name)
case *influxql.DropSeriesStatement:
return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition)
case *influxql.DropRetentionPolicyStatement:
return s.TSDBStore.DeleteRetentionPolicy(database, t.Name)
case *influxql.DropShardStatement:
return s.TSDBStore.DeleteShard(t.ID)
default:
return fmt.Errorf("%q should not be executed across a cluster", stmt.String())
}
}
func (s *Service) processWriteShardRequest(buf []byte) error {
// Build request
var req WriteShardRequest
if err := req.UnmarshalBinary(buf); err != nil {
return err
}
points := req.Points()
s.statMap.Add(writeShardPointsReq, int64(len(points)))
err := s.TSDBStore.WriteToShard(req.ShardID(), points)
// We may have received a write for a shard that we don't have locally because the
// sending node may have just created the shard (via the metastore) and the write
// arrived before the local store could create the shard. In this case, we need
// to check the metastore to determine what database and retention policy this
// shard should reside within.
if err == tsdb.ErrShardNotFound {
db, rp := req.Database(), req.RetentionPolicy()
if db == "" || rp == "" {
s.Logger.Printf("drop write request: shard=%d. no database or rentention policy received", req.ShardID())
return nil
}
err = s.TSDBStore.CreateShard(req.Database(), req.RetentionPolicy(), req.ShardID())
if err != nil {
s.statMap.Add(writeShardFail, 1)
return fmt.Errorf("create shard %d: %s", req.ShardID(), err)
}
err = s.TSDBStore.WriteToShard(req.ShardID(), points)
if err != nil {
s.statMap.Add(writeShardFail, 1)
return fmt.Errorf("write shard %d: %s", req.ShardID(), err)
}
}
if err != nil {
s.statMap.Add(writeShardFail, 1)
return fmt.Errorf("write shard %d: %s", req.ShardID(), err)
}
return nil
}
func (s *Service) writeShardResponse(w io.Writer, e error) {
// Build response.
var resp WriteShardResponse
if e != nil {
resp.SetCode(1)
resp.SetMessage(e.Error())
} else {
resp.SetCode(0)
}
// Marshal response to binary.
buf, err := resp.MarshalBinary()
if err != nil {
s.Logger.Printf("error marshalling shard response: %s", err)
return
}
// Write to connection.
if err := WriteTLV(w, writeShardResponseMessage, buf); err != nil {
s.Logger.Printf("write shard response error: %s", err)
}
}
func (s *Service) processCreateIteratorRequest(conn net.Conn) {
defer conn.Close()
var itr influxql.Iterator
if err := func() error {
// Parse request.
var req CreateIteratorRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
sh, ok := s.TSDBStore.(ShardIteratorCreator)
if !ok {
return errors.New("unable to access a specific shard with this tsdb store")
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := sh.ShardIteratorCreator(shardID)
if ic == nil {
continue
}
ics = append(ics, ic)
}
// Return immediately if there are no iterator creators.
if len(ics) == 0 {
return nil
}
// Generate a single iterator from all shards.
i, err := influxql.IteratorCreators(ics).CreateIterator(req.Opt)
if err != nil {
return err
}
itr = i
return nil
}(); err != nil {
s.Logger.Printf("error reading CreateIterator request: %s", err)
EncodeTLV(conn, createIteratorResponseMessage, &CreateIteratorResponse{Err: err})
return
}
resp := CreateIteratorResponse{}
if itr != nil {
switch itr.(type) {
case influxql.FloatIterator:
resp.Type = influxql.Float
case influxql.IntegerIterator:
resp.Type = influxql.Integer
case influxql.StringIterator:
resp.Type = influxql.String
case influxql.BooleanIterator:
resp.Type = influxql.Boolean
}
resp.Stats = itr.Stats()
}
// Encode success response.
if err := EncodeTLV(conn, createIteratorResponseMessage, &resp); err != nil {
s.Logger.Printf("error writing CreateIterator response: %s", err)
return
}
// Exit if no iterator was produced.
if itr == nil {
return
}
// Stream iterator to connection.
if err := influxql.NewIteratorEncoder(conn).EncodeIterator(itr); err != nil {
s.Logger.Printf("error encoding CreateIterator iterator: %s", err)
return
}
}
func (s *Service) processFieldDimensionsRequest(conn net.Conn) {
var fields, dimensions map[string]struct{}
if err := func() error {
// Parse request.
var req FieldDimensionsRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
sh, ok := s.TSDBStore.(ShardIteratorCreator)
if !ok {
return errors.New("unable to access a specific shard with this tsdb store")
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := sh.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Generate a single iterator from all shards.
f, d, err := influxql.IteratorCreators(ics).FieldDimensions(req.Sources)
if err != nil {
return err
}
fields, dimensions = f, d
return nil
}(); err != nil {
s.Logger.Printf("error reading FieldDimensions request: %s", err)
EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{
Fields: fields,
Dimensions: dimensions,
}); err != nil {
s.Logger.Printf("error writing FieldDimensions response: %s", err)
return
}
}
func (s *Service) processSeriesKeysRequest(conn net.Conn) {
var seriesList influxql.SeriesList
if err := func() error {
// Parse request.
var req SeriesKeysRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
sh, ok := s.TSDBStore.(ShardIteratorCreator)
if !ok {
return errors.New("unable to access a specific shard with this tsdb store")
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := sh.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Generate a single iterator from all shards.
a, err := influxql.IteratorCreators(ics).SeriesKeys(req.Opt)
if err != nil {
return err
}
seriesList = a
return nil
}(); err != nil {
s.Logger.Printf("error reading SeriesKeys request: %s", err)
EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{
SeriesList: seriesList,
}); err != nil {
s.Logger.Printf("error writing SeriesKeys response: %s", err)
return
}
}
func (s *Service) processRemoteMonitorRequest(buf []byte) error {
// Unmarshal the request.
var req RemoteMonitorRequest
if err := req.UnmarshalBinary(buf); err != nil {
return err
}
// Process the request
var remoteAddr string
if len(req.pb.GetRemoteAddrs()) > 0 {
remoteAddr = req.pb.GetRemoteAddrs()[0]
}
return s.Monitor.SetRemoteWriter(monitor.RemoteWriterConfig{
RemoteAddr: remoteAddr,
NodeID: req.pb.GetNodeID(),
Username: req.pb.GetUsername(),
Password: req.pb.GetPassword(),
ClusterID: req.pb.GetClusterID(),
})
}
func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) {
// Build response.
var resp RemoteMonitorResponse
if e != nil {
resp.Err = e
}
// Marshal response to binary.
buf, err := resp.MarshalBinary()
if err != nil {
s.Logger.Printf("error marshalling remote monitor response: %s", err)
return
}
// Write to connection.
if err := WriteTLV(w, remoteMonitorResponseMessage, buf); err != nil {
s.Logger.Printf("write remote monitor response error: %s", err)
}
}
func (s *Service) processExpandSourcesRequest(conn net.Conn) {
var sources influxql.Sources
if err := func() error {
// Parse request.
var req ExpandSourcesRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Expand sources from all shards.
a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources)
if err != nil {
return err
}
sources = a
return nil
}(); err != nil {
s.Logger.Printf("error reading ExpandSources request: %s", err)
EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{
Sources: sources,
}); err != nil {
s.Logger.Printf("error writing ExpandSources response: %s", err)
return
}
}
func (s *Service) processBackupShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req BackupShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Backup from local shard to the connection.
if err := s.TSDBStore.BackupShard(req.ShardID, req.Since, conn); err != nil {
return err
}
return nil
}(); err != nil {
s.Logger.Printf("error processing BackupShardRequest: %s", err)
return
}
}
func (s *Service) processCopyShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req CopyShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Begin streaming backup from remote server.
r, err := s.backupRemoteShard(req.Host, req.ShardID, req.Since)
if err != nil {
return err
}
defer r.Close()
// Create shard if it doesn't exist.
if err := s.TSDBStore.CreateShard(req.Database, req.Policy, req.ShardID); err != nil {
return err
}
// Restore to local shard.
if err := s.TSDBStore.RestoreShard(req.ShardID, r); err != nil {
return err
}
return nil
}(); err != nil {
s.Logger.Printf("error reading CopyShard request: %s", err)
EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{}); err != nil {
s.Logger.Printf("error writing CopyShard response: %s", err)
return
}
}
// backupRemoteShard connects to a cluster service on a remote host and streams a shard.
func (s *Service) backupRemoteShard(host string, shardID uint64, since time.Time) (io.ReadCloser, error) {
conn, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(BackupTimeout))
if err := func() error {
// Write the cluster multiplexing header byte
if _, err := conn.Write([]byte{MuxHeader}); err != nil {
return err
}
// Write backup request.
if err := EncodeTLV(conn, backupShardRequestMessage, &BackupShardRequest{
ShardID: shardID,
Since: since,
}); err != nil {
return fmt.Errorf("error writing BackupShardRequest: %s", err)
}
return nil
}(); err != nil {
conn.Close()
return nil, err
}
// Return the connection which will stream the rest of the backup.
return conn, nil
}
// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
typ, err := ReadType(r)
if err != nil {
return 0, nil, err
}
buf, err := ReadLV(r)
if err != nil {
return 0, nil, err
}
return typ, buf, err
}
// ReadType reads the type from a TLV record.
func ReadType(r io.Reader) (byte, error) {
var typ [1]byte
if _, err := io.ReadFull(r, typ[:]); err != nil {
return 0, fmt.Errorf("read message type: %s", err)
}
return typ[0], nil
}
// ReadLV reads the length-value from a TLV record.
func ReadLV(r io.Reader) ([]byte, error) {
// Read the size of the message.
var sz int64
if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
return nil, fmt.Errorf("read message size: %s", err)
}
if sz >= MaxMessageSize {
return nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz)
}
// Read the value.
buf := make([]byte, sz)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("read message value: %s", err)
}
return buf, nil
}
// WriteTLV writes a type-length-value record to w.
func WriteTLV(w io.Writer, typ byte, buf []byte) error {
if err := WriteType(w, typ); err != nil {
return err
}
if err := WriteLV(w, buf); err != nil {
return err
}
return nil
}
// WriteType writes the type in a TLV record to w.
func WriteType(w io.Writer, typ byte) error {
if _, err := w.Write([]byte{typ}); err != nil {
return fmt.Errorf("write message type: %s", err)
}
return nil
}
// WriteLV writes the length-value in a TLV record to w.
func WriteLV(w io.Writer, buf []byte) error {
// Write the size of the message.
if err := binary.Write(w, binary.BigEndian, int64(len(buf))); err != nil {
return fmt.Errorf("write message size: %s", err)
}
// Write the value.
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write message value: %s", err)
}
return nil
}
// EncodeTLV encodes v to a binary format and writes the type-length-value record to w.
func EncodeTLV(w io.Writer, typ byte, v encoding.BinaryMarshaler) error {
if err := WriteType(w, typ); err != nil {
return err
}
if err := EncodeLV(w, v); err != nil {
return err
}
return nil
}
// EncodeLV encodes v to a binary format and writes the length-value record to w.
func EncodeLV(w io.Writer, v encoding.BinaryMarshaler) error {
buf, err := v.MarshalBinary()
if err != nil {
return err
}
if err := WriteLV(w, buf); err != nil {
return err
}
return nil
}
// DecodeTLV reads the type-length-value record from r and unmarshals it into v.
func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) {
typ, err = ReadType(r)
if err != nil {
return 0, err
}
if err := DecodeLV(r, v); err != nil {
return 0, err
}
return typ, nil
}
// DecodeLV reads the length-value record from r and unmarshals it into v.
func DecodeLV(r io.Reader, v encoding.BinaryUnmarshaler) error {
buf, err := ReadLV(r)
if err != nil {
return err
}
if err := v.UnmarshalBinary(buf); err != nil {
return err
}
return nil
}
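
Taken together, the helpers above define the wire framing for every cluster message: a one-byte type, an eight-byte big-endian length, then the payload. A minimal round-trip sketch, assuming these helpers and the writeShardRequestMessage constant are in scope; the in-memory buffer and payload are illustrative only, not part of this change:

package cluster

import (
	"bytes"
	"fmt"
)

// tlvRoundTrip is a minimal sketch: it frames a payload with WriteTLV and
// reads it back with ReadTLV, the same way handleConn does on a live conn.
func tlvRoundTrip() error {
	var buf bytes.Buffer // stands in for a net.Conn

	// Write a record: 1-byte type, 8-byte big-endian length, then the value.
	if err := WriteTLV(&buf, writeShardRequestMessage, []byte("payload")); err != nil {
		return err
	}

	// Read it back: type first, then length-value.
	typ, body, err := ReadTLV(&buf)
	if err != nil {
		return err
	}
	fmt.Printf("type=%d value=%q\n", typ, body)
	return nil
}

On a real connection the type byte is what handleConn switches on, and the length check against MaxMessageSize protects the reader from oversized frames.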

View File

@ -1,174 +0,0 @@
package cluster_test
import (
"fmt"
"io"
"net"
"time"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tcp"
)
type metaClient struct {
host string
}
func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) {
return &meta.NodeInfo{
ID: nodeID,
TCPHost: m.host,
}, nil
}
func (m *metaClient) ShardOwner(shardID uint64) (db, rp string, sgi *meta.ShardGroupInfo) {
return "db", "rp", &meta.ShardGroupInfo{}
}
type testService struct {
nodeID uint64
ln net.Listener
muxln net.Listener
responses chan *serviceResponse
TSDBStore TSDBStore
}
func newTestWriteService(f func(shardID uint64, points []models.Point) error) testService {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
mux := tcp.NewMux()
muxln := mux.Listen(cluster.MuxHeader)
go mux.Serve(ln)
s := testService{
ln: ln,
muxln: muxln,
}
s.TSDBStore.WriteToShardFn = f
s.responses = make(chan *serviceResponse, 1024)
return s
}
func (ts *testService) Close() {
if ts.ln != nil {
ts.ln.Close()
}
}
type serviceResponses []serviceResponse
type serviceResponse struct {
shardID uint64
ownerID uint64
points []models.Point
}
func (ts *testService) writeShardSuccess(shardID uint64, points []models.Point) error {
ts.responses <- &serviceResponse{
shardID: shardID,
points: points,
}
return nil
}
func writeShardFail(shardID uint64, points []models.Point) error {
return fmt.Errorf("failed to write")
}
func writeShardSlow(shardID uint64, points []models.Point) error {
time.Sleep(1 * time.Second)
return nil
}
func (ts *testService) ResponseN(n int) ([]*serviceResponse, error) {
var a []*serviceResponse
for {
select {
case r := <-ts.responses:
a = append(a, r)
if len(a) == n {
return a, nil
}
case <-time.After(time.Second):
return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a))
}
}
}
// Service is a test wrapper for cluster.Service.
type Service struct {
*cluster.Service
ln net.Listener
TSDBStore TSDBStore
}
// NewService returns a new instance of Service.
func NewService() *Service {
s := &Service{
Service: cluster.NewService(cluster.Config{}),
}
s.Service.TSDBStore = &s.TSDBStore
return s
}
// MustOpenService returns a new, open service on a random port. Panic on error.
func MustOpenService() *Service {
s := NewService()
s.ln = MustListen("tcp", "127.0.0.1:0")
s.Listener = &muxListener{s.ln}
if err := s.Open(); err != nil {
panic(err)
}
return s
}
// Close closes the listener and waits for the service to close.
func (s *Service) Close() error {
if s.ln != nil {
s.ln.Close()
}
return s.Service.Close()
}
// Addr returns the network address of the service.
func (s *Service) Addr() net.Addr { return s.ln.Addr() }
// muxListener is a net.Listener implementation that strips off the first byte.
// This is used to simulate the listener from pkg/mux.
type muxListener struct {
net.Listener
}
// Accept accepts the next connection and removes the first byte.
func (ln *muxListener) Accept() (net.Conn, error) {
conn, err := ln.Listener.Accept()
if err != nil {
return nil, err
}
var buf [1]byte
if _, err := io.ReadFull(conn, buf[:]); err != nil {
conn.Close()
return nil, err
} else if buf[0] != cluster.MuxHeader {
conn.Close()
panic(fmt.Sprintf("unexpected mux header byte: %d", buf[0]))
}
return conn, nil
}
// MustListen opens a listener. Panic on error.
func MustListen(network, laddr string) net.Listener {
ln, err := net.Listen(network, laddr)
if err != nil {
panic(err)
}
return ln
}

View File

@ -14,7 +14,7 @@ import (
"time"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/admin"
"github.com/influxdata/influxdb/services/collectd"
@ -41,11 +41,11 @@ const (
// Config represents the configuration format for the influxd binary.
type Config struct {
Meta *meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`
Meta *meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Coordinator coordinator.Config `toml:"coordinator"`
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`
Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
@ -76,7 +76,7 @@ func NewConfig() *Config {
c := &Config{}
c.Meta = meta.NewConfig()
c.Data = tsdb.NewConfig()
c.Cluster = cluster.NewConfig()
c.Coordinator = coordinator.NewConfig()
c.Precreator = precreator.NewConfig()
c.Admin = admin.NewConfig()
@ -140,6 +140,16 @@ func (c *Config) FromToml(input string) error {
log.Printf("deprecated config option %s replaced with %s; %s will not be supported in a future release\n", in, out, in)
return out
})
// Replace deprecated [cluster] with [coordinator]
re = regexp.MustCompile(`(?m)^\s*\[(cluster)\]`)
input = re.ReplaceAllStringFunc(input, func(in string) string {
in = strings.TrimSpace(in)
out := "[coordinator]"
log.Printf("deprecated config option %s replaced with %s; %s will not be supported in a future release\n", in, out, in)
return out
})
_, err := toml.Decode(input, c)
return err
}
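
The substitution above runs before toml.Decode, so an existing influxdb.conf that still carries a [cluster] section keeps working. A minimal sketch of just that rewrite, assuming the same regular expression; the sample config below is illustrative:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	input := `
[meta]
dir = "/var/lib/influxdb/meta"

[cluster]
max-select-point = 100
`
	// Same pattern as Config.FromToml: match a [cluster] section header at
	// the start of a line and rewrite it to [coordinator].
	re := regexp.MustCompile(`(?m)^\s*\[(cluster)\]`)
	out := re.ReplaceAllString(input, "[coordinator]")
	fmt.Print(out)
	// After decoding, the options land in Config.Coordinator rather than the
	// removed Config.Cluster field.
}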

View File

@ -252,6 +252,9 @@ bind-address = ":1000"
[opentsdb]
bind-address = ":2000"
[cluster]
max-select-point = 100
`); err != nil {
t.Fatal(err)
}
@ -261,5 +264,8 @@ bind-address = ":2000"
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[0].BindAddress)
} else if c.OpenTSDBInputs[0].BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress)
} else if c.Coordinator.MaxSelectPointN != 100 {
t.Fatalf("unexpected coordinator max select points: %s", c.Coordinator.MaxSelectPointN)
}
}

View File

@ -12,7 +12,7 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
@ -58,13 +58,12 @@ type Server struct {
TSDBStore *tsdb.Store
QueryExecutor *influxql.QueryExecutor
PointsWriter *cluster.PointsWriter
PointsWriter *coordinator.PointsWriter
Subscriber *subscriber.Service
Services []Service
// These references are required for the tcp muxer.
ClusterService *cluster.Service
SnapshotterService *snapshotter.Service
Monitor *monitor.Monitor
@ -165,25 +164,25 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.Subscriber = subscriber.NewService(c.Subscriber)
// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout)
s.PointsWriter = coordinator.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
s.PointsWriter.TSDBStore = s.TSDBStore
s.PointsWriter.Subscriber = s.Subscriber
// Initialize query executor.
s.QueryExecutor = influxql.NewQueryExecutor()
s.QueryExecutor.StatementExecutor = &cluster.StatementExecutor{
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
MetaClient: s.MetaClient,
TSDBStore: cluster.LocalTSDBStore{Store: s.TSDBStore},
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Cluster.MaxSelectPointN,
MaxSelectSeriesN: c.Cluster.MaxSelectSeriesN,
MaxSelectBucketsN: c.Cluster.MaxSelectBucketsN,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
}
s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout)
s.QueryExecutor.LogQueriesAfter = time.Duration(c.Cluster.LogQueriesAfter)
s.QueryExecutor.MaxConcurrentQueries = c.Cluster.MaxConcurrentQueries
s.QueryExecutor.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
s.QueryExecutor.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
s.QueryExecutor.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries
if c.Data.QueryLogEnabled {
s.QueryExecutor.Logger = log.New(os.Stderr, "[query] ", log.LstdFlags)
}
@ -197,14 +196,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
return s, nil
}
func (s *Server) appendClusterService(c cluster.Config) {
srv := cluster.NewService(c)
srv.TSDBStore = cluster.LocalTSDBStore{Store: s.TSDBStore}
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
s.ClusterService = srv
}
func (s *Server) appendSnapshotterService() {
srv := snapshotter.NewService()
srv.TSDBStore = s.TSDBStore
@ -241,7 +232,6 @@ func (s *Server) Open() error {
// Append services.
s.appendMonitorService()
s.appendClusterService(s.config.Cluster)
s.appendPrecreatorService(s.config.Precreator)
s.appendSnapshotterService()
s.appendAdminService(s.config.Admin)
@ -270,7 +260,6 @@ func (s *Server) Open() error {
s.PointsWriter.MetaClient = s.MetaClient
s.Monitor.MetaClient = s.MetaClient
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
// Configure logging for all services and clients.
@ -283,7 +272,6 @@ func (s *Server) Open() error {
for _, svc := range s.Services {
svc.SetLogOutput(w)
}
s.ClusterService.SetLogOutput(w)
s.SnapshotterService.SetLogOutput(w)
s.Monitor.SetLogOutput(w)
@ -502,12 +490,12 @@ type tcpaddr struct{ host string }
func (a *tcpaddr) Network() string { return "tcp" }
func (a *tcpaddr) String() string { return a.host }
// monitorPointsWriter is a wrapper around `cluster.PointsWriter` that helps
// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
// to prevent a circular dependency between the `cluster` and `monitor` packages.
type monitorPointsWriter cluster.PointsWriter
type monitorPointsWriter coordinator.PointsWriter
func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
return (*cluster.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points)
return (*coordinator.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points)
}
func (s *Server) remoteAddr(addr string) string {

View File

@ -230,8 +230,7 @@ func NewConfig() *run.Config {
c := run.NewConfig()
c.BindAddress = "127.0.0.1:0"
c.ReportingDisabled = true
c.Cluster.ShardWriterTimeout = toml.Duration(30 * time.Second)
c.Cluster.WriteTimeout = toml.Duration(30 * time.Second)
c.Coordinator.WriteTimeout = toml.Duration(30 * time.Second)
c.Meta.Dir = MustTempFile()
if !testing.Verbose() {

View File

@ -9,7 +9,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/models"
)
@ -6097,7 +6097,7 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
case <-done:
return
default:
wpr := &cluster.WritePointsRequest{
wpr := &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp0",
}

coordinator/config.go (new file, 47 lines added)
View File

@ -0,0 +1,47 @@
package coordinator
import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/toml"
)
const (
// DefaultWriteTimeout is the default timeout for a complete write to succeed.
DefaultWriteTimeout = 10 * time.Second
// DefaultMaxConcurrentQueries is the maximum number of running queries.
// A value of zero will make the maximum query limit unlimited.
DefaultMaxConcurrentQueries = 0
// DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
// A value of zero will make the maximum point count unlimited.
DefaultMaxSelectPointN = 0
// DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
// A value of zero will make the maximum series count unlimited.
DefaultMaxSelectSeriesN = 0
)
// Config represents the configuration for the clustering service.
type Config struct {
WriteTimeout toml.Duration `toml:"write-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
QueryTimeout toml.Duration `toml:"query-timeout"`
LogQueriesAfter toml.Duration `toml:"log-queries-after"`
MaxSelectPointN int `toml:"max-select-point"`
MaxSelectSeriesN int `toml:"max-select-series"`
MaxSelectBucketsN int `toml:"max-select-buckets"`
}
// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
WriteTimeout: toml.Duration(DefaultWriteTimeout),
QueryTimeout: toml.Duration(influxql.DefaultQueryTimeout),
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
MaxSelectPointN: DefaultMaxSelectPointN,
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
}
}
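
For orientation, the defaults above can be serialized to see which keys the new section accepts. A minimal sketch, assuming the coordinator and BurntSushi toml packages are importable at the paths used elsewhere in this change:

package main

import (
	"os"

	"github.com/BurntSushi/toml"
	"github.com/influxdata/influxdb/coordinator"
)

func main() {
	// Encode the default coordinator config; the emitted keys are the ones
	// accepted under the [coordinator] section of influxdb.conf.
	c := coordinator.NewConfig()
	if err := toml.NewEncoder(os.Stdout).Encode(&c); err != nil {
		panic(err)
	}
}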

View File

@ -1,27 +1,24 @@
package cluster_test
package coordinator_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
)
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c cluster.Config
var c coordinator.Config
if _, err := toml.Decode(`
shard-writer-timeout = "10s"
write-timeout = "20s"
`, &c); err != nil {
t.Fatal(err)
}
// Validate configuration.
if time.Duration(c.ShardWriterTimeout) != 10*time.Second {
t.Fatalf("unexpected shard-writer timeout: %s", c.ShardWriterTimeout)
} else if time.Duration(c.WriteTimeout) != 20*time.Second {
if time.Duration(c.WriteTimeout) != 20*time.Second {
t.Fatalf("unexpected write timeout s: %s", c.WriteTimeout)
}
}

View File

@ -1,4 +1,4 @@
package cluster
package coordinator
import (
"time"

View File

@ -1,4 +1,4 @@
package cluster_test
package coordinator_test
import (
"time"

View File

@ -1,4 +1,4 @@
package cluster
package coordinator
import (
"errors"
@ -79,6 +79,24 @@ type PointsWriter struct {
statMap *expvar.Map
}
// WritePointsRequest represents a request to write point data to the cluster
type WritePointsRequest struct {
Database string
RetentionPolicy string
Points []models.Point
}
// AddPoint adds a point to the WritePointRequest with field key 'value'
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
pt, err := models.NewPoint(
name, tags, map[string]interface{}{"value": value}, timestamp,
)
if err != nil {
return
}
w.Points = append(w.Points, pt)
}
// NewPointsWriter returns a new instance of PointsWriter for a node.
func NewPointsWriter() *PointsWriter {
return &PointsWriter{
@ -234,10 +252,16 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
w.statMap.Add(statSubWriteDrop, 1)
}
timeout := time.NewTimer(w.WriteTimeout)
defer timeout.Stop()
for range shardMappings.Points {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout.C:
w.statMap.Add(statWriteTimeout, 1)
// return timeout error to caller
return ErrTimeout
case err := <-ch:
if err != nil {
return err

View File

@ -1,4 +1,4 @@
package cluster_test
package coordinator_test
import (
"fmt"
@ -9,7 +9,7 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
)
@ -30,15 +30,15 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
return &rp.ShardGroups[0], nil
}
c := cluster.PointsWriter{MetaClient: ms}
pr := &cluster.WritePointsRequest{
c := coordinator.PointsWriter{MetaClient: ms}
pr := &coordinator.WritePointsRequest{
Database: "mydb",
RetentionPolicy: "myrp",
}
pr.AddPoint("cpu", 1.0, time.Now(), nil)
var (
shardMappings *cluster.ShardMapping
shardMappings *coordinator.ShardMapping
err error
)
if shardMappings, err = c.MapShards(pr); err != nil {
@ -79,8 +79,8 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
panic("should not get here")
}
c := cluster.PointsWriter{MetaClient: ms}
pr := &cluster.WritePointsRequest{
c := coordinator.PointsWriter{MetaClient: ms}
pr := &coordinator.WritePointsRequest{
Database: "mydb",
RetentionPolicy: "myrp",
}
@ -92,7 +92,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
var (
shardMappings *cluster.ShardMapping
shardMappings *coordinator.ShardMapping
err error
)
if shardMappings, err = c.MapShards(pr); err != nil {
@ -150,7 +150,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
for _, test := range tests {
pr := &cluster.WritePointsRequest{
pr := &coordinator.WritePointsRequest{
Database: test.database,
RetentionPolicy: test.retentionPolicy,
}
@ -163,7 +163,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
// copy to prevent data race
theTest := test
sm := cluster.NewShardMapping()
sm := coordinator.NewShardMapping()
sm.MapPoint(
&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
{NodeID: 1},
@ -186,7 +186,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
}},
pr.Points[2])
// Local cluster.Node ShardWriter
// Local coordinator.Node ShardWriter
// lock on the write increment since these functions get called in parallel
var mu sync.Mutex
sw := &fakeShardWriter{
@ -217,13 +217,13 @@ func TestPointsWriter_WritePoints(t *testing.T) {
}
ms.NodeIDFn = func() uint64 { return 1 }
subPoints := make(chan *cluster.WritePointsRequest, 1)
subPoints := make(chan *coordinator.WritePointsRequest, 1)
sub := Subscriber{}
sub.PointsFn = func() chan<- *cluster.WritePointsRequest {
sub.PointsFn = func() chan<- *coordinator.WritePointsRequest {
return subPoints
}
c := cluster.NewPointsWriter()
c := coordinator.NewPointsWriter()
c.MetaClient = ms
c.ShardWriter = sw
c.TSDBStore = store
@ -337,10 +337,10 @@ func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *met
}
type Subscriber struct {
PointsFn func() chan<- *cluster.WritePointsRequest
PointsFn func() chan<- *coordinator.WritePointsRequest
}
func (s Subscriber) Points() chan<- *cluster.WritePointsRequest {
func (s Subscriber) Points() chan<- *coordinator.WritePointsRequest {
return s.PointsFn()
}

View File

@ -1,4 +1,4 @@
package cluster
package coordinator
import (
"bytes"

View File

@ -1,4 +1,4 @@
package cluster_test
package coordinator_test
import (
"bytes"
@ -11,7 +11,7 @@ import (
"time"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
@ -159,13 +159,13 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
}
}
// QueryExecutor is a test wrapper for cluster.QueryExecutor.
// QueryExecutor is a test wrapper for coordinator.QueryExecutor.
type QueryExecutor struct {
*influxql.QueryExecutor
MetaClient MetaClient
TSDBStore TSDBStore
StatementExecutor *cluster.StatementExecutor
StatementExecutor *coordinator.StatementExecutor
LogOutput bytes.Buffer
}
@ -175,7 +175,7 @@ func NewQueryExecutor() *QueryExecutor {
e := &QueryExecutor{
QueryExecutor: influxql.NewQueryExecutor(),
}
e.StatementExecutor = &cluster.StatementExecutor{
e.StatementExecutor = &coordinator.StatementExecutor{
MetaClient: &e.MetaClient,
TSDBStore: &e.TSDBStore,
}
@ -202,7 +202,7 @@ func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-ch
return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, false, make(chan struct{}))
}
// TSDBStore is a mockable implementation of cluster.TSDBStore.
// TSDBStore is a mockable implementation of coordinator.TSDBStore.
type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
WriteToShardFn func(shardID uint64, points []models.Point) error

View File

@ -9,7 +9,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
@ -395,7 +395,7 @@ func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) {
return nil, meta.ErrServiceUnavailable
}
// Databases returns a list of database info about each database in the cluster.
// Databases returns a list of database info about each database in the coordinator.
func (ms *MetaClient) Databases() []meta.DatabaseInfo {
ms.mu.RLock()
defer ms.mu.RUnlock()
@ -506,7 +506,7 @@ func NewQueryExecutor(t *testing.T) *QueryExecutor {
// PointsWriter is a mock points writer.
type PointsWriter struct {
WritePointsFn func(p *cluster.WritePointsRequest) error
WritePointsFn func(p *coordinator.WritePointsRequest) error
Err error
PointsPerSecond int
t *testing.T
@ -521,7 +521,7 @@ func NewPointsWriter(t *testing.T) *PointsWriter {
}
// WritePoints mocks writing points.
func (pw *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
func (pw *PointsWriter) WritePoints(p *coordinator.WritePointsRequest) error {
// If the test set a callback, call it.
if pw.WritePointsFn != nil {
if err := pw.WritePointsFn(p); err != nil {

View File

@ -11,7 +11,7 @@ import (
"sync"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/services/meta"
)
@ -24,7 +24,7 @@ const (
// PointsWriter is an interface for writing points to a subscription destination.
// Only WritePoints() needs to be satisfied.
type PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
WritePoints(p *coordinator.WritePointsRequest) error
}
// unique set that identifies a given subscription
@ -46,7 +46,7 @@ type Service struct {
NewPointsWriter func(u url.URL) (PointsWriter, error)
Logger *log.Logger
statMap *expvar.Map
points chan *cluster.WritePointsRequest
points chan *coordinator.WritePointsRequest
wg sync.WaitGroup
closed bool
closing chan struct{}
@ -60,7 +60,7 @@ func NewService(c Config) *Service {
NewPointsWriter: newPointsWriter,
Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
statMap: influxdb.NewStatistics("subscriber", "subscriber", nil),
points: make(chan *cluster.WritePointsRequest),
points: make(chan *coordinator.WritePointsRequest),
closed: true,
closing: make(chan struct{}),
}
@ -214,7 +214,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
}
// Points returns a channel into which write point requests can be sent.
func (s *Service) Points() chan<- *cluster.WritePointsRequest {
func (s *Service) Points() chan<- *coordinator.WritePointsRequest {
return s.points
}
@ -253,7 +253,7 @@ type balancewriter struct {
i int
}
func (b *balancewriter) WritePoints(p *cluster.WritePointsRequest) error {
func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
var lastErr error
for range b.writers {
// round robin through destinations.

View File

@ -5,7 +5,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/services/subscriber"
)
@ -24,10 +24,10 @@ func (m MetaClient) WaitForDataChanged() chan struct{} {
}
type Subscription struct {
WritePointsFn func(*cluster.WritePointsRequest) error
WritePointsFn func(*coordinator.WritePointsRequest) error
}
func (s Subscription) WritePoints(p *cluster.WritePointsRequest) error {
func (s Subscription) WritePoints(p *coordinator.WritePointsRequest) error {
return s.WritePointsFn(p)
}
@ -53,11 +53,11 @@ func TestService_IgnoreNonMatch(t *testing.T) {
}
}
prs := make(chan *cluster.WritePointsRequest, 2)
prs := make(chan *coordinator.WritePointsRequest, 2)
urls := make(chan url.URL, 2)
newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
sub := Subscription{}
sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
prs <- p
return nil
}
@ -88,11 +88,11 @@ func TestService_IgnoreNonMatch(t *testing.T) {
}
// Write points that don't match any subscription.
s.Points() <- &cluster.WritePointsRequest{
s.Points() <- &coordinator.WritePointsRequest{
Database: "db1",
RetentionPolicy: "rp0",
}
s.Points() <- &cluster.WritePointsRequest{
s.Points() <- &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp2",
}
@ -128,11 +128,11 @@ func TestService_ModeALL(t *testing.T) {
}
}
prs := make(chan *cluster.WritePointsRequest, 2)
prs := make(chan *coordinator.WritePointsRequest, 2)
urls := make(chan url.URL, 2)
newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
sub := Subscription{}
sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
prs <- p
return nil
}
@ -163,7 +163,7 @@ func TestService_ModeALL(t *testing.T) {
}
// Write points that match subscription with mode ALL
expPR := &cluster.WritePointsRequest{
expPR := &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp0",
}
@ -171,7 +171,7 @@ func TestService_ModeALL(t *testing.T) {
// Should get pr back twice
for i := 0; i < 2; i++ {
var pr *cluster.WritePointsRequest
var pr *coordinator.WritePointsRequest
select {
case pr = <-prs:
case <-time.After(10 * time.Millisecond):
@ -206,11 +206,11 @@ func TestService_ModeANY(t *testing.T) {
}
}
prs := make(chan *cluster.WritePointsRequest, 2)
prs := make(chan *coordinator.WritePointsRequest, 2)
urls := make(chan url.URL, 2)
newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
sub := Subscription{}
sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
prs <- p
return nil
}
@ -240,14 +240,14 @@ func TestService_ModeANY(t *testing.T) {
}
}
// Write points that match subscription with mode ANY
expPR := &cluster.WritePointsRequest{
expPR := &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp0",
}
s.Points() <- expPR
// Validate we get the pr back just once
var pr *cluster.WritePointsRequest
var pr *coordinator.WritePointsRequest
select {
case pr = <-prs:
case <-time.After(10 * time.Millisecond):
@ -294,11 +294,11 @@ func TestService_Multiple(t *testing.T) {
}
}
prs := make(chan *cluster.WritePointsRequest, 4)
prs := make(chan *coordinator.WritePointsRequest, 4)
urls := make(chan url.URL, 4)
newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
sub := Subscription{}
sub.WritePointsFn = func(p *cluster.WritePointsRequest) error {
sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
prs <- p
return nil
}
@ -329,24 +329,24 @@ func TestService_Multiple(t *testing.T) {
}
// Write points that don't match any subscription.
s.Points() <- &cluster.WritePointsRequest{
s.Points() <- &coordinator.WritePointsRequest{
Database: "db1",
RetentionPolicy: "rp0",
}
s.Points() <- &cluster.WritePointsRequest{
s.Points() <- &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp2",
}
// Write points that match subscription with mode ANY
expPR := &cluster.WritePointsRequest{
expPR := &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp0",
}
s.Points() <- expPR
// Validate we get the pr back just once
var pr *cluster.WritePointsRequest
var pr *coordinator.WritePointsRequest
select {
case pr = <-prs:
case <-time.After(10 * time.Millisecond):
@ -364,7 +364,7 @@ func TestService_Multiple(t *testing.T) {
}
// Write points that match subscription with mode ALL
expPR = &cluster.WritePointsRequest{
expPR = &coordinator.WritePointsRequest{
Database: "db0",
RetentionPolicy: "rp1",
}

View File

@ -3,7 +3,7 @@ package subscriber
import (
"net"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/coordinator"
)
// UDP supports writing points over UDP using the line protocol.
@ -17,7 +17,7 @@ func NewUDP(addr string) *UDP {
}
// WritePoints writes points over UDP transport.
func (u *UDP) WritePoints(p *cluster.WritePointsRequest) (err error) {
func (u *UDP) WritePoints(p *coordinator.WritePointsRequest) (err error) {
var addr *net.UDPAddr
var con *net.UDPConn
addr, err = net.ResolveUDPAddr("udp", u.addr)

View File

@ -33,7 +33,7 @@ func TestSize_UnmarshalText_GB(t *testing.T) {
func TestConfig_Encode(t *testing.T) {
var c run.Config
c.Cluster.WriteTimeout = itoml.Duration(time.Minute)
c.Coordinator.WriteTimeout = itoml.Duration(time.Minute)
buf := new(bytes.Buffer)
if err := toml.NewEncoder(buf).Encode(&c); err != nil {
t.Fatal("Failed to encode: ", err)