Merge pull request #5811 from benbjohnson/remote-exec-2

Remote Execution
Ben Johnson 2016-02-25 09:05:17 -07:00
commit eaed2aadcf
30 changed files with 3000 additions and 991 deletions

View File

@@ -11,25 +11,27 @@ It is generated from these files:
It has these top-level messages:
WriteShardRequest
WriteShardResponse
MapShardRequest
MapShardResponse
ExecuteStatementRequest
ExecuteStatementResponse
CreateIteratorRequest
CreateIteratorResponse
FieldDimensionsRequest
FieldDimensionsResponse
SeriesKeysRequest
SeriesKeysResponse
*/
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type WriteShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -52,8 +54,8 @@ func (m *WriteShardRequest) GetPoints() [][]byte {
}
type WriteShardResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -75,89 +77,9 @@ func (m *WriteShardResponse) GetMessage() string {
return ""
}
type MapShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"`
ChunkSize *int32 `protobuf:"varint,3,req,name=ChunkSize" json:"ChunkSize,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *MapShardRequest) Reset() { *m = MapShardRequest{} }
func (m *MapShardRequest) String() string { return proto.CompactTextString(m) }
func (*MapShardRequest) ProtoMessage() {}
func (m *MapShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
func (m *MapShardRequest) GetQuery() string {
if m != nil && m.Query != nil {
return *m.Query
}
return ""
}
func (m *MapShardRequest) GetChunkSize() int32 {
if m != nil && m.ChunkSize != nil {
return *m.ChunkSize
}
return 0
}
type MapShardResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
Data []byte `protobuf:"bytes,3,opt,name=Data" json:"Data,omitempty"`
TagSets []string `protobuf:"bytes,4,rep,name=TagSets" json:"TagSets,omitempty"`
Fields []string `protobuf:"bytes,5,rep,name=Fields" json:"Fields,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *MapShardResponse) Reset() { *m = MapShardResponse{} }
func (m *MapShardResponse) String() string { return proto.CompactTextString(m) }
func (*MapShardResponse) ProtoMessage() {}
func (m *MapShardResponse) GetCode() int32 {
if m != nil && m.Code != nil {
return *m.Code
}
return 0
}
func (m *MapShardResponse) GetMessage() string {
if m != nil && m.Message != nil {
return *m.Message
}
return ""
}
func (m *MapShardResponse) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *MapShardResponse) GetTagSets() []string {
if m != nil {
return m.TagSets
}
return nil
}
func (m *MapShardResponse) GetFields() []string {
if m != nil {
return m.Fields
}
return nil
}
type ExecuteStatementRequest struct {
Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
Statement *string `protobuf:"bytes,1,req" json:"Statement,omitempty"`
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -180,8 +102,8 @@ func (m *ExecuteStatementRequest) GetDatabase() string {
}
type ExecuteStatementResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -203,11 +125,149 @@ func (m *ExecuteStatementResponse) GetMessage() string {
return ""
}
func init() {
proto.RegisterType((*WriteShardRequest)(nil), "internal.WriteShardRequest")
proto.RegisterType((*WriteShardResponse)(nil), "internal.WriteShardResponse")
proto.RegisterType((*MapShardRequest)(nil), "internal.MapShardRequest")
proto.RegisterType((*MapShardResponse)(nil), "internal.MapShardResponse")
proto.RegisterType((*ExecuteStatementRequest)(nil), "internal.ExecuteStatementRequest")
proto.RegisterType((*ExecuteStatementResponse)(nil), "internal.ExecuteStatementResponse")
type CreateIteratorRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} }
func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorRequest) ProtoMessage() {}
func (m *CreateIteratorRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *CreateIteratorRequest) GetOpt() []byte {
if m != nil {
return m.Opt
}
return nil
}
type CreateIteratorResponse struct {
Err *string `protobuf:"bytes,1,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} }
func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorResponse) ProtoMessage() {}
func (m *CreateIteratorResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type FieldDimensionsRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} }
func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsRequest) ProtoMessage() {}
func (m *FieldDimensionsRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *FieldDimensionsRequest) GetSources() []byte {
if m != nil {
return m.Sources
}
return nil
}
type FieldDimensionsResponse struct {
Fields []string `protobuf:"bytes,1,rep" json:"Fields,omitempty"`
Dimensions []string `protobuf:"bytes,2,rep" json:"Dimensions,omitempty"`
Err *string `protobuf:"bytes,3,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} }
func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsResponse) ProtoMessage() {}
func (m *FieldDimensionsResponse) GetFields() []string {
if m != nil {
return m.Fields
}
return nil
}
func (m *FieldDimensionsResponse) GetDimensions() []string {
if m != nil {
return m.Dimensions
}
return nil
}
func (m *FieldDimensionsResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type SeriesKeysRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} }
func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysRequest) ProtoMessage() {}
func (m *SeriesKeysRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *SeriesKeysRequest) GetOpt() []byte {
if m != nil {
return m.Opt
}
return nil
}
type SeriesKeysResponse struct {
SeriesList []byte `protobuf:"bytes,1,opt" json:"SeriesList,omitempty"`
Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} }
func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysResponse) ProtoMessage() {}
func (m *SeriesKeysResponse) GetSeriesList() []byte {
if m != nil {
return m.SeriesList
}
return nil
}
func (m *SeriesKeysResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
func init() {
}
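The generated proto2 getters above are nil-safe: each one checks the receiver and the field pointer before dereferencing, which is why callers elsewhere in this PR never need presence checks. A minimal standalone sketch, illustrative only, using nothing beyond the generated internal package shown above:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/cluster/internal"
)

func main() {
	// A nil request still yields zero values instead of panicking,
	// because the generated getters guard against nil receivers.
	var req *internal.CreateIteratorRequest
	fmt.Println(req.GetShardIDs(), req.GetOpt()) // [] []
}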

View File

@@ -2,34 +2,51 @@ package internal;
message WriteShardRequest {
required uint64 ShardID = 1;
repeated bytes Points = 2;
}
message WriteShardResponse {
required int32 Code = 1;
optional string Message = 2;
}
message MapShardRequest {
required uint64 ShardID = 1;
required string Query = 2;
required int32 ChunkSize = 3;
}
message MapShardResponse {
required int32 Code = 1;
optional string Message = 2;
optional bytes Data = 3;
repeated string TagSets = 4;
repeated string Fields = 5;
}
message ExecuteStatementRequest {
required string Statement = 1;
required string Database = 2;
}
message ExecuteStatementResponse {
required int32 Code = 1;
optional string Message = 2;
}
message CreateIteratorRequest {
repeated uint64 ShardIDs = 1;
required bytes Opt = 2;
}
message CreateIteratorResponse {
optional string Err = 1;
}
message FieldDimensionsRequest {
repeated uint64 ShardIDs = 1;
required bytes Sources = 2;
}
message FieldDimensionsResponse {
repeated string Fields = 1;
repeated string Dimensions = 2;
optional string Err = 3;
}
message SeriesKeysRequest {
repeated uint64 ShardIDs = 1;
required bytes Opt = 2;
}
message SeriesKeysResponse {
optional bytes SeriesList = 1;
optional string Err = 2;
}
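Each of these messages is paired with a Go wrapper (in cluster/rpc.go, later in this diff) implementing MarshalBinary/UnmarshalBinary; influxql.IteratorOptions and influxql.Sources travel as opaque bytes inside the required bytes fields. A minimal round-trip sketch, assuming a zero-value influxql.IteratorOptions marshals cleanly:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/cluster"
)

func main() {
	req := cluster.CreateIteratorRequest{ShardIDs: []uint64{100, 200}}

	// Opt is serialized to bytes first, then wrapped in the proto message.
	buf, err := req.MarshalBinary()
	if err != nil {
		panic(err)
	}

	var out cluster.CreateIteratorRequest
	if err := out.UnmarshalBinary(buf); err != nil {
		panic(err)
	}
	fmt.Println(out.ShardIDs) // [100 200]
}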

View File

@@ -1,46 +0,0 @@
package cluster
import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)
// IteratorCreator is responsible for creating iterators for queries.
// Iterators can be created for the local node or can be retrieved remotely.
type IteratorCreator struct {
MetaStore interface {
NodeID() uint64
Node(id uint64) (ni *meta.NodeInfo, err error)
}
TSDBStore influxql.IteratorCreator
// Duration before idle remote iterators are disconnected.
Timeout time.Duration
// Treats all shards as remote. Useful for testing.
ForceRemoteMapping bool
pool *clientPool
}
// NewIteratorCreator returns a new instance of IteratorCreator.
func NewIteratorCreator() *IteratorCreator {
return &IteratorCreator{
pool: newClientPool(),
}
}
// CreateIterator creates an iterator from local and remote shards.
func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
// FIXME(benbjohnson): Integrate remote execution.
return ic.TSDBStore.CreateIterator(opt)
}
// FieldDimensions returns the unique fields and dimensions across a list of sources.
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
// FIXME(benbjohnson): Integrate remote execution.
return ic.TSDBStore.FieldDimensions(sources)
}

cluster/meta_client.go (new file, 40 lines)
View File

@@ -0,0 +1,40 @@
package cluster
import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)
// MetaClient is an interface for accessing meta data.
type MetaClient interface {
CreateContinuousQuery(database, name, query string) error
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
CreateSubscription(database, rp, name, mode string, destinations []string) error
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
Database(name string) (*meta.DatabaseInfo, error)
Databases() ([]meta.DatabaseInfo, error)
DataNode(id uint64) (*meta.NodeInfo, error)
DataNodes() ([]meta.NodeInfo, error)
DeleteDataNode(id uint64) error
DeleteMetaNode(id uint64) error
DropContinuousQuery(database, name string) error
DropDatabase(name string) error
DropRetentionPolicy(database, name string) error
DropSubscription(database, rp, name string) error
DropUser(name string) error
MetaNodes() ([]meta.NodeInfo, error)
RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
SetAdminPrivilege(username string, admin bool) error
SetDefaultRetentionPolicy(database, name string) error
SetPrivilege(username, database string, p influxql.Privilege) error
ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) error
UpdateUser(name, password string) error
UserPrivilege(username, database string) (*influxql.Privilege, error)
UserPrivileges(username string) (map[string]influxql.Privilege, error)
Users() []meta.UserInfo
}

cluster/meta_client_test.go (new file, 160 lines)
View File

@@ -0,0 +1,160 @@
package cluster_test
import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)
// MetaClient is a mockable implementation of cluster.MetaClient.
type MetaClient struct {
CreateContinuousQueryFn func(database, name, query string) error
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicyFn func(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
DatabasesFn func() ([]meta.DatabaseInfo, error)
DataNodeFn func(id uint64) (*meta.NodeInfo, error)
DataNodesFn func() ([]meta.NodeInfo, error)
DeleteDataNodeFn func(id uint64) error
DeleteMetaNodeFn func(id uint64) error
DropContinuousQueryFn func(database, name string) error
DropDatabaseFn func(name string) error
DropRetentionPolicyFn func(database, name string) error
DropSubscriptionFn func(database, rp, name string) error
DropUserFn func(name string) error
MetaNodesFn func() ([]meta.NodeInfo, error)
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
SetAdminPrivilegeFn func(username string, admin bool) error
SetDefaultRetentionPolicyFn func(database, name string) error
SetPrivilegeFn func(username, database string, p influxql.Privilege) error
ShardsByTimeRangeFn func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error
UpdateUserFn func(name, password string) error
UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error)
UsersFn func() []meta.UserInfo
}
func (c *MetaClient) CreateContinuousQuery(database, name, query string) error {
return c.CreateContinuousQueryFn(database, name, query)
}
func (c *MetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseFn(name)
}
func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseWithRetentionPolicyFn(name, rpi)
}
func (c *MetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, rpi)
}
func (c *MetaClient) CreateSubscription(database, rp, name, mode string, destinations []string) error {
return c.CreateSubscriptionFn(database, rp, name, mode, destinations)
}
func (c *MetaClient) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) {
return c.CreateUserFn(name, password, admin)
}
func (c *MetaClient) Database(name string) (*meta.DatabaseInfo, error) {
return c.DatabaseFn(name)
}
func (c *MetaClient) Databases() ([]meta.DatabaseInfo, error) {
return c.DatabasesFn()
}
func (c *MetaClient) DataNode(id uint64) (*meta.NodeInfo, error) {
return c.DataNodeFn(id)
}
func (c *MetaClient) DataNodes() ([]meta.NodeInfo, error) {
return c.DataNodesFn()
}
func (c *MetaClient) DeleteDataNode(id uint64) error {
return c.DeleteDataNodeFn(id)
}
func (c *MetaClient) DeleteMetaNode(id uint64) error {
return c.DeleteMetaNodeFn(id)
}
func (c *MetaClient) DropContinuousQuery(database, name string) error {
return c.DropContinuousQueryFn(database, name)
}
func (c *MetaClient) DropDatabase(name string) error {
return c.DropDatabaseFn(name)
}
func (c *MetaClient) DropRetentionPolicy(database, name string) error {
return c.DropRetentionPolicyFn(database, name)
}
func (c *MetaClient) DropSubscription(database, rp, name string) error {
return c.DropSubscriptionFn(database, rp, name)
}
func (c *MetaClient) DropUser(name string) error {
return c.DropUserFn(name)
}
func (c *MetaClient) MetaNodes() ([]meta.NodeInfo, error) {
return c.MetaNodesFn()
}
func (c *MetaClient) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) {
return c.RetentionPolicyFn(database, name)
}
func (c *MetaClient) SetAdminPrivilege(username string, admin bool) error {
return c.SetAdminPrivilegeFn(username, admin)
}
func (c *MetaClient) SetDefaultRetentionPolicy(database, name string) error {
return c.SetDefaultRetentionPolicyFn(database, name)
}
func (c *MetaClient) SetPrivilege(username, database string, p influxql.Privilege) error {
return c.SetPrivilegeFn(username, database, p)
}
func (c *MetaClient) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
return c.ShardsByTimeRangeFn(sources, tmin, tmax)
}
func (c *MetaClient) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) error {
return c.UpdateRetentionPolicyFn(database, name, rpu)
}
func (c *MetaClient) UpdateUser(name, password string) error {
return c.UpdateUserFn(name, password)
}
func (c *MetaClient) UserPrivilege(username, database string) (*influxql.Privilege, error) {
return c.UserPrivilegeFn(username, database)
}
func (c *MetaClient) UserPrivileges(username string) (map[string]influxql.Privilege, error) {
return c.UserPrivilegesFn(username)
}
func (c *MetaClient) Users() []meta.UserInfo {
return c.UsersFn()
}
// DefaultMetaClientDatabaseFn returns a single database (db0) with a retention policy.
func DefaultMetaClientDatabaseFn(name string) (*meta.DatabaseInfo, error) {
return &meta.DatabaseInfo{
Name: DefaultDatabase,
DefaultRetentionPolicy: DefaultRetentionPolicy,
}, nil
}
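Not part of this PR, but a common companion to a hand-written mock like this: a compile-time assertion keeps the mock in sync with the interface it mimics (it requires importing github.com/influxdata/influxdb/cluster in this test package):

// Ensure the mock satisfies cluster.MetaClient at compile time.
var _ cluster.MetaClient = (*MetaClient)(nil)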

View File

@@ -13,9 +13,11 @@ import (
"github.com/influxdata/influxdb/services/meta"
)
// TODO(benbjohnson): Rewrite tests to use cluster_test.MetaClient.
// Ensures the points writer maps a single point to a single shard.
func TestPointsWriter_MapShards_One(t *testing.T) {
ms := MetaClient{}
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
ms.NodeIDFn = func() uint64 { return 1 }
@@ -50,7 +52,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
// Ensures the points writer maps multiple points across shard group boundaries.
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
ms := MetaClient{}
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
@@ -304,7 +306,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
},
}
ms := NewMetaClient()
ms := NewPointsWriterMetaClient()
ms.DatabaseFn = func(database string) (*meta.DatabaseInfo, error) {
return nil, nil
}
@@ -374,8 +376,8 @@ func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64
return f.CreateShardfn(database, retentionPolicy, shardID)
}
func NewMetaClient() *MetaClient {
ms := &MetaClient{}
func NewPointsWriterMetaClient() *PointsWriterMetaClient {
ms := &PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
@@ -403,7 +405,7 @@ func NewMetaClient() *MetaClient {
return ms
}
type MetaClient struct {
type PointsWriterMetaClient struct {
NodeIDFn func() uint64
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
@@ -411,21 +413,21 @@ type MetaClient struct {
ShardOwnerFn func(shardID uint64) (string, string, *meta.ShardGroupInfo)
}
func (m MetaClient) NodeID() uint64 { return m.NodeIDFn() }
func (m PointsWriterMetaClient) NodeID() uint64 { return m.NodeIDFn() }
func (m MetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
func (m PointsWriterMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
return m.RetentionPolicyFn(database, name)
}
func (m MetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
func (m PointsWriterMetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
}
func (m MetaClient) Database(database string) (*meta.DatabaseInfo, error) {
func (m PointsWriterMetaClient) Database(database string) (*meta.DatabaseInfo, error) {
return m.DatabaseFn(database)
}
func (m MetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) {
func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) {
return m.ShardOwnerFn(shardID)
}

View File

@@ -8,6 +8,8 @@ import (
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"sort"
"strconv"
"time"
@@ -17,16 +19,18 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)
// A QueryExecutor is responsible for processing an influxql.Query and
// executing all of the statements within, on nodes in a cluster.
type QueryExecutor struct {
MetaClient *meta.Client
// Reference to local node.
Node *influxdb.Node
MetaClient MetaClient
// TSDB storage for local node.
TSDBStore *tsdb.Store
TSDBStore TSDBStore
// Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
Monitor *monitor.Monitor
@@ -37,6 +41,9 @@ type QueryExecutor struct {
// Used for executing meta statements on all data nodes.
MetaExecutor *MetaExecutor
// Remote execution timeout
Timeout time.Duration
// Output of all logging.
// Defaults to discarding all log output.
LogOutput io.Writer
@@ -54,6 +61,7 @@ const (
// NewQueryExecutor returns a new instance of QueryExecutor.
func NewQueryExecutor() *QueryExecutor {
return &QueryExecutor{
Timeout: DefaultShardMapperTimeout,
LogOutput: ioutil.Discard,
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
}
@@ -424,22 +432,21 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
// Remove "time" from fields list.
stmt.RewriteTimeFields()
// Filter only shards that contain date range.
shardIDs, err := e.MetaClient.ShardIDsByTimeRange(stmt.Sources, opt.MinTime, opt.MaxTime)
// Create an iterator creator based on the shards in the cluster.
ic, err := e.iteratorCreator(stmt, &opt)
if err != nil {
return err
}
shards := e.TSDBStore.Shards(shardIDs)
// Rewrite wildcards, if any exist.
tmp, err := stmt.RewriteWildcards(tsdb.Shards(shards))
tmp, err := stmt.RewriteWildcards(ic)
if err != nil {
return err
}
stmt = tmp
// Create a set of iterators from a selection.
itrs, err := influxql.Select(stmt, tsdb.Shards(shards), &opt)
itrs, err := influxql.Select(stmt, ic, &opt)
if err != nil {
return err
}
@@ -507,6 +514,70 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
return nil
}
// iteratorCreator returns a new instance of IteratorCreator based on stmt.
func (e *QueryExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Retrieve a list of shard IDs.
shards, err := e.MetaClient.ShardsByTimeRange(stmt.Sources, opt.MinTime, opt.MaxTime)
if err != nil {
return nil, err
}
// Map shards to nodes.
shardIDsByNodeID := make(map[uint64][]uint64)
for _, si := range shards {
// Always assign to local node if it has the shard.
// Otherwise randomly select a remote node.
var nodeID uint64
if si.OwnedBy(e.Node.ID) {
nodeID = e.Node.ID
} else if len(si.Owners) > 0 {
nodeID = si.Owners[rand.Intn(len(si.Owners))].NodeID
} else {
// This should not occur but if the shard has no owners then
// we don't want this to panic by trying to randomly select a node.
continue
}
// Group the shard ID under its assigned node.
shardIDsByNodeID[nodeID] = append(shardIDsByNodeID[nodeID], si.ID)
}
// Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0)
if err := func() error {
for nodeID, shardIDs := range shardIDsByNodeID {
// Sort shard IDs so we get more predictable execution.
sort.Sort(uint64Slice(shardIDs))
// Create iterator creators from TSDB if local.
if nodeID == e.Node.ID {
for _, shardID := range shardIDs {
ic := e.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
continue
}
ics = append(ics, ic)
}
continue
}
// Otherwise create iterator creator remotely.
dialer := &NodeDialer{
MetaClient: e.MetaClient,
Timeout: e.Timeout,
}
ics = append(ics, newRemoteIteratorCreator(dialer, nodeID, shardIDs))
}
return nil
}(); err != nil {
influxql.IteratorCreators(ics).Close()
return nil, err
}
return influxql.IteratorCreators(ics), nil
}
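The mapping loop above is the heart of the fan-out: a shard owned by the local node is always served locally, and any other shard goes to a randomly chosen owner, which spreads query load across replicas. A standalone sketch of just that rule (shard, assign, and owns are illustrative names, not InfluxDB types):

package main

import (
	"fmt"
	"math/rand"
)

// shard is an illustrative stand-in for meta.ShardInfo.
type shard struct {
	id     uint64
	owners []uint64 // owning node IDs
}

// assign groups shard IDs by node: local if owned locally, else a random owner.
func assign(local uint64, shards []shard) map[uint64][]uint64 {
	m := make(map[uint64][]uint64)
	for _, s := range shards {
		var nodeID uint64
		switch {
		case owns(s.owners, local):
			nodeID = local
		case len(s.owners) > 0:
			nodeID = s.owners[rand.Intn(len(s.owners))]
		default:
			continue // unowned shard: skip rather than panic
		}
		m[nodeID] = append(m[nodeID], s.id)
	}
	return m
}

func owns(owners []uint64, id uint64) bool {
	for _, o := range owners {
		if o == id {
			return true
		}
	}
	return false
}

func main() {
	m := assign(1, []shard{
		{id: 100, owners: []uint64{1, 2}},
		{id: 200, owners: []uint64{2, 3}},
	})
	fmt.Println(m[1]) // always contains 100: the local node owns shard 100
}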
func (e *QueryExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
dis, err := e.MetaClient.Databases()
if err != nil {
@@ -896,6 +967,147 @@ type IntoWriteRequest struct {
Points []models.Point
}
// remoteIteratorCreator creates iterators for remote shards.
type remoteIteratorCreator struct {
dialer *NodeDialer
nodeID uint64
shardIDs []uint64
}
// newRemoteIteratorCreator returns a new instance of remoteIteratorCreator for a remote shard.
func newRemoteIteratorCreator(dialer *NodeDialer, nodeID uint64, shardIDs []uint64) *remoteIteratorCreator {
return &remoteIteratorCreator{
dialer: dialer,
nodeID: nodeID,
shardIDs: shardIDs,
}
}
// CreateIterator creates a remote streaming iterator.
func (ic *remoteIteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
conn, err := ic.dialer.DialNode(ic.nodeID)
if err != nil {
return nil, err
}
if err := func() error {
// Write request.
if err := EncodeTLV(conn, createIteratorRequestMessage, &CreateIteratorRequest{
ShardIDs: ic.shardIDs,
Opt: opt,
}); err != nil {
return err
}
// Read the response.
var resp CreateIteratorResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
return err
} else if resp.Err != nil {
return resp.Err
}
return nil
}(); err != nil {
conn.Close()
return nil, err
}
return influxql.NewReaderIterator(conn)
}
// FieldDimensions returns the unique fields and dimensions across a list of sources.
func (ic *remoteIteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
conn, err := ic.dialer.DialNode(ic.nodeID)
if err != nil {
return nil, nil, err
}
defer conn.Close()
// Write request.
if err := EncodeTLV(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{
ShardIDs: ic.shardIDs,
Sources: sources,
}); err != nil {
return nil, nil, err
}
// Read the response.
var resp FieldDimensionsResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
return nil, nil, err
}
return resp.Fields, resp.Dimensions, resp.Err
}
// SeriesKeys returns a list of series keys from the underlying shard.
func (ic *remoteIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
conn, err := ic.dialer.DialNode(ic.nodeID)
if err != nil {
return nil, err
}
defer conn.Close()
// Write request.
if err := EncodeTLV(conn, seriesKeysRequestMessage, &SeriesKeysRequest{
ShardIDs: ic.shardIDs,
Opt: opt,
}); err != nil {
return nil, err
}
// Read the response.
var resp SeriesKeysResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
return nil, err
}
return resp.SeriesList, resp.Err
}
// NodeDialer dials connections to a given node.
type NodeDialer struct {
MetaClient MetaClient
Timeout time.Duration
}
// DialNode returns a connection to a node.
func (d *NodeDialer) DialNode(nodeID uint64) (net.Conn, error) {
ni, err := d.MetaClient.DataNode(nodeID)
if err != nil {
return nil, err
}
conn, err := net.Dial("tcp", ni.TCPHost)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(d.Timeout))
// Write the cluster multiplexing header byte
if _, err := conn.Write([]byte{MuxHeader}); err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
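For context, the single MuxHeader byte written above is what lets one TCP port carry several protocols on the accepting side. A rough sketch of wiring the service behind the multiplexer, assuming the tcp.NewMux/Listen/Serve API and cluster.NewService/NewConfig as they exist in this repository:

package main

import (
	"net"

	"github.com/influxdata/influxdb/cluster"
	"github.com/influxdata/influxdb/tcp"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	// Connections whose first byte equals cluster.MuxHeader are routed
	// to the cluster service; other header bytes go to other listeners.
	mux := tcp.NewMux()
	s := cluster.NewService(cluster.NewConfig())
	s.Listener = mux.Listen(cluster.MuxHeader)
	go mux.Serve(ln)

	if err := s.Open(); err != nil {
		panic(err)
	}
	defer s.Close()
}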
// TSDBStore is an interface for accessing the time series data store.
type TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []models.Point) error
DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}
// joinUint64 returns a comma-delimited string of uint64 numbers.
func joinUint64(a []uint64) string {
var buf bytes.Buffer
@@ -966,3 +1178,9 @@ func (s stringSet) intersect(o stringSet) stringSet {
}
return ns
}
type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
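uint64Slice exists because sort.Slice did not land in the standard library until Go 1.8, well after this commit, so a small sort.Interface implementation was the idiomatic way to sort a []uint64. A standalone illustration (the type is redeclared here because the original is unexported):

package main

import (
	"fmt"
	"sort"
)

type uint64Slice []uint64

func (a uint64Slice) Len() int           { return len(a) }
func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }

func main() {
	ids := uint64Slice{200, 100, 150}
	sort.Sort(ids)
	fmt.Println(ids) // [100 150 200]
}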

View File

@@ -0,0 +1,311 @@
package cluster_test
import (
"bytes"
"io"
"os"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
)
const (
// DefaultDatabase is the default database name used in tests.
DefaultDatabase = "db0"
// DefaultRetentionPolicy is the default retention policy name used in tests.
DefaultRetentionPolicy = "rp0"
)
// Ensure query executor can execute a simple SELECT statement.
func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
e := DefaultQueryExecutor()
// The meta client should return a single shard owned by the local node.
e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
return []meta.ShardInfo{{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}}}, nil
}
// The TSDB store should return an IteratorCreator for shard.
// This IteratorCreator returns a single iterator with "value" in the aux fields.
e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator {
if id != 100 {
t.Fatalf("unexpected shard id: %d", id)
}
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}},
{Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}},
}}, nil
}
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
return map[string]struct{}{"value": struct{}{}}, nil, nil
}
ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
return influxql.SeriesList{
{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
}, nil
}
return &ic
}
// Verify all results from the query.
if a := ReadAllResults(e.ExecuteQuery(`SELECT * FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{
{
StatementID: 0,
Series: []*models.Row{{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
{time.Unix(0, 0).UTC(), float64(100)},
{time.Unix(1, 0).UTC(), float64(200)},
},
}},
},
}) {
t.Fatalf("unexpected results: %s", spew.Sdump(a))
}
}
// Ensure query executor can execute a distributed SELECT statement.
func TestQueryExecutor_ExecuteQuery_SelectStatement_Remote(t *testing.T) {
// Local executor.
e := DefaultQueryExecutor()
// Start a second service.
s := MustOpenService()
defer s.Close()
// Mock the remote service to create an iterator.
s.TSDBStore.ShardIteratorCreatorFn = func(shardID uint64) influxql.IteratorCreator {
if shardID != 200 {
t.Fatalf("unexpected remote shard id: %d", shardID)
}
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: int64(0 * time.Second), Value: 20},
}}, nil
}
return &ic
}
// Two shards are returned. One local and one remote.
e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
return []meta.ShardInfo{
{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}},
{ID: 200, Owners: []meta.ShardOwner{{NodeID: 1}}},
}, nil
}
// The meta client should return node data for the remote node.
e.MetaClient.DataNodeFn = func(id uint64) (*meta.NodeInfo, error) {
return &meta.NodeInfo{ID: 1, TCPHost: s.Addr().String()}, nil
}
// The local node should return a single iterator.
e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator {
if id != 100 {
t.Fatalf("unexpected shard id: %d", id)
}
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: int64(0 * time.Second), Value: 10},
}}, nil
}
return &ic
}
// Verify all results from the query.
if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{
{
StatementID: 0,
Series: []*models.Row{{
Name: "cpu",
Columns: []string{"time", "count"},
Values: [][]interface{}{
{time.Unix(0, 0).UTC(), float64(30)},
},
}},
},
}) {
t.Fatalf("unexpected results: %s", spew.Sdump(a))
}
}
// QueryExecutor is a test wrapper for cluster.QueryExecutor.
type QueryExecutor struct {
*cluster.QueryExecutor
MetaClient MetaClient
TSDBStore TSDBStore
LogOutput bytes.Buffer
}
// NewQueryExecutor returns a new instance of QueryExecutor.
// This query executor always has a node id of 0.
func NewQueryExecutor() *QueryExecutor {
e := &QueryExecutor{
QueryExecutor: cluster.NewQueryExecutor(),
}
e.Node = &influxdb.Node{ID: 0}
e.QueryExecutor.MetaClient = &e.MetaClient
e.QueryExecutor.TSDBStore = &e.TSDBStore
e.QueryExecutor.LogOutput = &e.LogOutput
if testing.Verbose() {
e.QueryExecutor.LogOutput = io.MultiWriter(e.QueryExecutor.LogOutput, os.Stderr)
}
return e
}
// DefaultQueryExecutor returns a QueryExecutor with a database (db0) and retention policy (rp0).
func DefaultQueryExecutor() *QueryExecutor {
e := NewQueryExecutor()
e.MetaClient.DatabaseFn = DefaultMetaClientDatabaseFn
e.TSDBStore.ExpandSourcesFn = DefaultTSDBStoreExpandSourcesFn
return e
}
// ExecuteQuery parses query and executes against the database.
func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-chan *influxql.Result {
return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, make(chan struct{}))
}
// TSDBStore is a mockable implementation of cluster.TSDBStore.
type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
WriteToShardFn func(shardID uint64, points []models.Point) error
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowSeriesStatementFn func(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}
func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error {
return s.CreateShardFn(database, policy, shardID)
}
func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error {
return s.WriteToShardFn(shardID, points)
}
func (s *TSDBStore) DeleteDatabase(name string) error {
return s.DeleteDatabaseFn(name)
}
func (s *TSDBStore) DeleteMeasurement(database, name string) error {
return s.DeleteMeasurementFn(database, name)
}
func (s *TSDBStore) DeleteRetentionPolicy(database, name string) error {
return s.DeleteRetentionPolicyFn(database, name)
}
func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
return s.DeleteSeriesFn(database, sources, condition)
}
func (s *TSDBStore) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) {
return s.ExecuteShowFieldKeysStatementFn(stmt, database)
}
func (s *TSDBStore) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
return s.ExecuteShowSeriesStatementFn(stmt, database)
}
func (s *TSDBStore) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
return s.ExecuteShowTagValuesStatementFn(stmt, database)
}
func (s *TSDBStore) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return s.ExpandSourcesFn(sources)
}
func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
return s.ShardIteratorCreatorFn(id)
}
// DefaultTSDBStoreExpandSourcesFn expands a single source using the default database & retention policy.
func DefaultTSDBStoreExpandSourcesFn(sources influxql.Sources) (influxql.Sources, error) {
return influxql.Sources{&influxql.Measurement{
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
Name: sources[0].(*influxql.Measurement).Name},
}, nil
}
// MustParseQuery parses s into a query. Panic on error.
func MustParseQuery(s string) *influxql.Query {
q, err := influxql.ParseQuery(s)
if err != nil {
panic(err)
}
return q
}
// ReadAllResults reads all results from c and returns as a slice.
func ReadAllResults(c <-chan *influxql.Result) []*influxql.Result {
var a []*influxql.Result
for result := range c {
a = append(a, result)
}
return a
}
// IteratorCreator is a mockable implementation of IteratorCreator.
type IteratorCreator struct {
CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error)
FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error)
}
func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return ic.CreateIteratorFn(opt)
}
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
return ic.FieldDimensionsFn(sources)
}
func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
return ic.SeriesKeysFn(opt)
}
// FloatIterator represents an iterator that reads from a slice.
type FloatIterator struct {
Points []influxql.FloatPoint
}
// Close is a no-op.
func (itr *FloatIterator) Close() error { return nil }
// Next returns the next value and shifts it off the beginning of the points slice.
func (itr *FloatIterator) Next() *influxql.FloatPoint {
if len(itr.Points) == 0 {
return nil
}
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v
}

View File

@@ -1,108 +1,18 @@
package cluster
import (
"errors"
"fmt"
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/cluster/internal"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
)
//go:generate protoc --gogo_out=. internal/data.proto
// MapShardRequest represents the request to map a remote shard for a query.
type MapShardRequest struct {
pb internal.MapShardRequest
}
// ShardID of the map request
func (m *MapShardRequest) ShardID() uint64 { return m.pb.GetShardID() }
// Query returns the Shard map request's query
func (m *MapShardRequest) Query() string { return m.pb.GetQuery() }
// ChunkSize returns Shard map request's chunk size
func (m *MapShardRequest) ChunkSize() int32 { return m.pb.GetChunkSize() }
// SetShardID sets the map request's shard id
func (m *MapShardRequest) SetShardID(id uint64) { m.pb.ShardID = &id }
// SetQuery sets the Shard map request's Query
func (m *MapShardRequest) SetQuery(query string) { m.pb.Query = &query }
// SetChunkSize sets the Shard map request's chunk size
func (m *MapShardRequest) SetChunkSize(chunkSize int32) { m.pb.ChunkSize = &chunkSize }
// MarshalBinary encodes the object to a binary format.
func (m *MapShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&m.pb)
}
// UnmarshalBinary populates MapShardRequest from a binary format.
func (m *MapShardRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &m.pb); err != nil {
return err
}
return nil
}
// MapShardResponse represents the response returned from a remote MapShardRequest call
type MapShardResponse struct {
pb internal.MapShardResponse
}
// NewMapShardResponse returns a new MapShardResponse with the given code and message
func NewMapShardResponse(code int, message string) *MapShardResponse {
m := &MapShardResponse{}
m.SetCode(code)
m.SetMessage(message)
return m
}
// Code returns the Shard map response's code
func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) }
// Message returns the Shard map response's Message
func (r *MapShardResponse) Message() string { return r.pb.GetMessage() }
// TagSets returns Shard map response's tag sets
func (r *MapShardResponse) TagSets() []string { return r.pb.GetTagSets() }
// Fields returns the Shard map response's Fields
func (r *MapShardResponse) Fields() []string { return r.pb.GetFields() }
// Data returns the Shard map response's Data
func (r *MapShardResponse) Data() []byte { return r.pb.GetData() }
// SetCode sets the Shard map response's code
func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) }
// SetMessage sets Shard map response's message
func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message }
// SetTagSets sets Shard map response's tagsets
func (r *MapShardResponse) SetTagSets(tagsets []string) { r.pb.TagSets = tagsets }
// SetFields sets the Shard map response's Fields
func (r *MapShardResponse) SetFields(fields []string) { r.pb.Fields = fields }
// SetData sets the Shard map response's Data
func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data }
// MarshalBinary encodes the object to a binary format.
func (r *MapShardResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(&r.pb)
}
// UnmarshalBinary populates MapShardResponse from a binary format.
func (r *MapShardResponse) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &r.pb); err != nil {
return err
}
return nil
}
// WritePointsRequest represents a request to write point data to the cluster
type WritePointsRequest struct {
Database string
@@ -281,3 +191,202 @@ func (w *ExecuteStatementResponse) UnmarshalBinary(buf []byte) error {
}
return nil
}
// CreateIteratorRequest represents a request to create a remote iterator.
type CreateIteratorRequest struct {
ShardIDs []uint64
Opt influxql.IteratorOptions
}
// MarshalBinary encodes r to a binary format.
func (r *CreateIteratorRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Opt.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.CreateIteratorRequest{
ShardIDs: r.ShardIDs,
Opt: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *CreateIteratorRequest) UnmarshalBinary(data []byte) error {
var pb internal.CreateIteratorRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil {
return err
}
return nil
}
// CreateIteratorResponse represents a response from remote iterator creation.
type CreateIteratorResponse struct {
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *CreateIteratorResponse) MarshalBinary() ([]byte, error) {
var pb internal.CreateIteratorResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *CreateIteratorResponse) UnmarshalBinary(data []byte) error {
var pb internal.CreateIteratorResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
// FieldDimensionsRequest represents a request to retrieve unique fields & dimensions.
type FieldDimensionsRequest struct {
ShardIDs []uint64
Sources influxql.Sources
}
// MarshalBinary encodes r to a binary format.
func (r *FieldDimensionsRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Sources.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.FieldDimensionsRequest{
ShardIDs: r.ShardIDs,
Sources: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *FieldDimensionsRequest) UnmarshalBinary(data []byte) error {
var pb internal.FieldDimensionsRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil {
return err
}
return nil
}
// FieldDimensionsResponse represents a response from a remote FieldDimensions call.
type FieldDimensionsResponse struct {
Fields map[string]struct{}
Dimensions map[string]struct{}
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *FieldDimensionsResponse) MarshalBinary() ([]byte, error) {
var pb internal.FieldDimensionsResponse
pb.Fields = make([]string, 0, len(r.Fields))
for k := range r.Fields {
pb.Fields = append(pb.Fields, k)
}
pb.Dimensions = make([]string, 0, len(r.Dimensions))
for k := range r.Dimensions {
pb.Dimensions = append(pb.Dimensions, k)
}
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *FieldDimensionsResponse) UnmarshalBinary(data []byte) error {
var pb internal.FieldDimensionsResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.Fields = make(map[string]struct{}, len(pb.GetFields()))
for _, s := range pb.GetFields() {
r.Fields[s] = struct{}{}
}
r.Dimensions = make(map[string]struct{}, len(pb.GetDimensions()))
for _, s := range pb.GetDimensions() {
r.Dimensions[s] = struct{}{}
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
// SeriesKeysRequest represents a request to retrieve a list of series keys.
type SeriesKeysRequest struct {
ShardIDs []uint64
Opt influxql.IteratorOptions
}
// MarshalBinary encodes r to a binary format.
func (r *SeriesKeysRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Opt.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.SeriesKeysRequest{
ShardIDs: r.ShardIDs,
Opt: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *SeriesKeysRequest) UnmarshalBinary(data []byte) error {
var pb internal.SeriesKeysRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil {
return err
}
return nil
}
// SeriesKeysResponse represents a response from retrieving series keys.
type SeriesKeysResponse struct {
SeriesList influxql.SeriesList
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *SeriesKeysResponse) MarshalBinary() ([]byte, error) {
// Fixed: the original mistakenly reused internal.CreateIteratorResponse here,
// silently dropping SeriesList. Assumes influxql.SeriesList implements
// encoding.BinaryMarshaler.
var pb internal.SeriesKeysResponse
buf, err := r.SeriesList.MarshalBinary()
if err != nil {
return nil, err
}
pb.SeriesList = buf
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *SeriesKeysResponse) UnmarshalBinary(data []byte) error {
var pb internal.SeriesKeysResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.SeriesList != nil {
if err := r.SeriesList.UnmarshalBinary(pb.GetSeriesList()); err != nil {
return err
}
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
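Note the wire shape of FieldDimensionsResponse above: the map[string]struct{} sets become repeated strings on the wire and are rebuilt as sets on the way back in, so membership survives but ordering does not. A minimal round-trip sketch:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/cluster"
)

func main() {
	resp := cluster.FieldDimensionsResponse{
		Fields:     map[string]struct{}{"value": {}},
		Dimensions: map[string]struct{}{"host": {}},
	}

	buf, err := resp.MarshalBinary()
	if err != nil {
		panic(err)
	}

	var out cluster.FieldDimensionsResponse
	if err := out.UnmarshalBinary(buf); err != nil {
		panic(err)
	}
	fmt.Println(len(out.Fields), len(out.Dimensions)) // 1 1
}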

View File

@@ -1,6 +1,7 @@
package cluster
import (
"encoding"
"encoding/binary"
"expvar"
"fmt"
@@ -13,7 +14,6 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)
@@ -29,8 +29,15 @@ const (
writeShardReq = "writeShardReq"
writeShardPointsReq = "writeShardPointsReq"
writeShardFail = "writeShardFail"
mapShardReq = "mapShardReq"
mapShardResp = "mapShardResp"
createIteratorReq = "createIteratorReq"
createIteratorResp = "createIteratorResp"
fieldDimensionsReq = "fieldDimensionsReq"
fieldDimensionsResp = "fieldDimensionsResp"
seriesKeysReq = "seriesKeysReq"
seriesKeysResp = "seriesKeysResp"
)
// Service processes data received over raw TCP connections.
@@ -46,14 +53,7 @@ type Service struct {
ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
}
TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []models.Point) error
DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteSeries(database string, source []influxql.Source, condition influxql.Expr) error
DeleteRetentionPolicy(database, name string) error
}
TSDBStore TSDBStore
Logger *log.Logger
statMap *expvar.Map
@@ -148,42 +148,51 @@ func (s *Service) handleConn(conn net.Conn) {
}()
for {
// Read type-length-value.
typ, buf, err := ReadTLV(conn)
typ, err := ReadType(conn)
if err != nil {
if strings.HasSuffix(err.Error(), "EOF") {
return
}
s.Logger.Printf("unable to read type-length-value %s", err)
s.Logger.Printf("unable to read type: %s", err)
return
}
// Delegate message processing by type.
switch typ {
case writeShardRequestMessage:
buf, err := ReadLV(conn)
if err != nil {
s.Logger.Printf("unable to read length-value: %s", err)
return
}
s.statMap.Add(writeShardReq, 1)
err := s.processWriteShardRequest(buf)
err = s.processWriteShardRequest(buf)
if err != nil {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeShardResponse(conn, err)
case mapShardRequestMessage:
s.statMap.Add(mapShardReq, 1)
panic("FIXME(benbjohnson: integrate remote execution with iterators")
/*
err := s.processMapShardRequest(conn, buf)
if err != nil {
s.Logger.Printf("process map shard error: %s", err)
if err := writeMapShardResponseMessage(conn, NewMapShardResponse(1, err.Error())); err != nil {
s.Logger.Printf("process map shard error writing response: %s", err.Error())
}
}
*/
case executeStatementRequestMessage:
err := s.processExecuteStatementRequest(buf)
buf, err := ReadLV(conn)
if err != nil {
s.Logger.Printf("unable to read length-value: %s", err)
return
}
err = s.processExecuteStatementRequest(buf)
if err != nil {
s.Logger.Printf("process execute statement error: %s", err)
}
s.writeShardResponse(conn, err)
case createIteratorRequestMessage:
s.statMap.Add(createIteratorReq, 1)
s.processCreateIteratorRequest(conn)
case fieldDimensionsRequestMessage:
s.statMap.Add(fieldDimensionsReq, 1)
s.processFieldDimensionsRequest(conn)
case seriesKeysRequestMessage:
s.statMap.Add(seriesKeysReq, 1)
s.processSeriesKeysRequest(conn)
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
@@ -287,120 +296,210 @@ func (s *Service) writeShardResponse(w io.Writer, e error) {
}
}
/*
func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
// Decode request
var req MapShardRequest
if err := req.UnmarshalBinary(buf); err != nil {
return err
}
func (s *Service) processCreateIteratorRequest(conn net.Conn) {
defer conn.Close()
// Parse the statement.
q, err := influxql.ParseQuery(req.Query())
if err != nil {
return fmt.Errorf("processing map shard: %s", err)
} else if len(q.Statements) != 1 {
return fmt.Errorf("processing map shard: expected 1 statement but got %d", len(q.Statements))
}
m, err := s.TSDBStore.CreateMapper(req.ShardID(), q.Statements[0], int(req.ChunkSize()))
if err != nil {
return fmt.Errorf("create mapper: %s", err)
}
if m == nil {
return writeMapShardResponseMessage(w, NewMapShardResponse(0, ""))
}
if err := m.Open(); err != nil {
return fmt.Errorf("mapper open: %s", err)
}
defer m.Close()
var metaSent bool
for {
var resp MapShardResponse
if !metaSent {
resp.SetTagSets(m.TagSets())
resp.SetFields(m.Fields())
metaSent = true
}
chunk, err := m.NextChunk()
if err != nil {
return fmt.Errorf("next chunk: %s", err)
}
// NOTE: Even if the chunk is nil, we still need to send one
// empty response to let the other side know we're out of data.
if chunk != nil {
b, err := json.Marshal(chunk)
if err != nil {
return fmt.Errorf("encoding: %s", err)
}
resp.SetData(b)
}
// Write to connection.
resp.SetCode(0)
if err := writeMapShardResponseMessage(w, &resp); err != nil {
var itr influxql.Iterator
if err := func() error {
// Parse request.
var req CreateIteratorRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
s.statMap.Add(mapShardResp, 1)
if chunk == nil {
// All mapper data sent.
return nil
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Generate a single iterator from all shards.
i, err := influxql.IteratorCreators(ics).CreateIterator(req.Opt)
if err != nil {
return err
}
itr = i
return nil
}(); err != nil {
// Guard: itr is nil when the request failed before an iterator was built.
if itr != nil {
itr.Close()
}
s.Logger.Printf("error reading CreateIterator request: %s", err)
EncodeTLV(conn, createIteratorResponseMessage, &CreateIteratorResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, createIteratorResponseMessage, &CreateIteratorResponse{}); err != nil {
s.Logger.Printf("error writing CreateIterator response: %s", err)
return
}
// Exit if no iterator was produced.
if itr == nil {
return
}
// Stream iterator to connection.
if err := influxql.NewIteratorEncoder(conn).EncodeIterator(itr); err != nil {
s.Logger.Printf("error encoding CreateIterator iterator: %s", err)
return
}
}
*/
func writeMapShardResponseMessage(w io.Writer, msg *MapShardResponse) error {
buf, err := msg.MarshalBinary()
if err != nil {
return err
func (s *Service) processFieldDimensionsRequest(conn net.Conn) {
var fields, dimensions map[string]struct{}
if err := func() error {
// Parse request.
var req FieldDimensionsRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Generate a single iterator from all shards.
f, d, err := influxql.IteratorCreators(ics).FieldDimensions(req.Sources)
if err != nil {
return err
}
fields, dimensions = f, d
return nil
}(); err != nil {
s.Logger.Printf("error reading FieldDimensions request: %s", err)
EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{
Fields: fields,
Dimensions: dimensions,
}); err != nil {
s.Logger.Printf("error writing FieldDimensions response: %s", err)
return
}
}
func (s *Service) processSeriesKeysRequest(conn net.Conn) {
var seriesList influxql.SeriesList
if err := func() error {
// Parse request.
var req SeriesKeysRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Generate a single iterator from all shards.
a, err := influxql.IteratorCreators(ics).SeriesKeys(req.Opt)
if err != nil {
return err
}
seriesList = a
return nil
}(); err != nil {
s.Logger.Printf("error reading SeriesKeys request: %s", err)
EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{
SeriesList: seriesList,
}); err != nil {
s.Logger.Printf("error writing SeriesKeys response: %s", err)
return
}
}
// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
typ, err := ReadType(r)
if err != nil {
return 0, nil, err
}
buf, err := ReadLV(r)
if err != nil {
return 0, nil, err
}
return typ, buf, err
}
// ReadType reads the type from a TLV record.
func ReadType(r io.Reader) (byte, error) {
var typ [1]byte
if _, err := io.ReadFull(r, typ[:]); err != nil {
return 0, fmt.Errorf("read message type: %s", err)
}
return typ[0], nil
}
// ReadLV reads the length-value from a TLV record.
func ReadLV(r io.Reader) ([]byte, error) {
// Read the size of the message.
var sz int64
if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
return nil, fmt.Errorf("read message size: %s", err)
}
if sz >= MaxMessageSize {
return nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz)
}
// Read the value.
buf := make([]byte, sz)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("read message value: %s", err)
}
return buf, nil
}
// WriteTLV writes a type-length-value record to w.
func WriteTLV(w io.Writer, typ byte, buf []byte) error {
if err := WriteType(w, typ); err != nil {
return err
}
if err := WriteLV(w, buf); err != nil {
return err
}
return nil
}
// WriteType writes the type in a TLV record to w.
func WriteType(w io.Writer, typ byte) error {
if _, err := w.Write([]byte{typ}); err != nil {
return fmt.Errorf("write message type: %s", err)
}
return nil
}
// WriteLV writes the length-value in a TLV record to w.
func WriteLV(w io.Writer, buf []byte) error {
// Write the size of the message.
if err := binary.Write(w, binary.BigEndian, int64(len(buf))); err != nil {
return fmt.Errorf("write message size: %s", err)
@ -410,6 +509,54 @@ func WriteTLV(w io.Writer, typ byte, buf []byte) error {
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write message value: %s", err)
}
return nil
}
// EncodeTLV encodes v to a binary format and writes the type-length-value record to w.
func EncodeTLV(w io.Writer, typ byte, v encoding.BinaryMarshaler) error {
if err := WriteType(w, typ); err != nil {
return err
}
if err := EncodeLV(w, v); err != nil {
return err
}
return nil
}
// EncodeLV encodes v to a binary format and writes the length-value record to w.
func EncodeLV(w io.Writer, v encoding.BinaryMarshaler) error {
buf, err := v.MarshalBinary()
if err != nil {
return err
}
if err := WriteLV(w, buf); err != nil {
return err
}
return nil
}
// DecodeTLV reads the type-length-value record from r and unmarshals it into v.
func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) {
typ, err = ReadType(r)
if err != nil {
return 0, err
}
if err := DecodeLV(r, v); err != nil {
return 0, err
}
return typ, nil
}
// DecodeLV reads the length-value record from r and unmarshals it into v.
func DecodeLV(r io.Reader, v encoding.BinaryUnmarshaler) error {
buf, err := ReadLV(r)
if err != nil {
return err
}
if err := v.UnmarshalBinary(buf); err != nil {
return err
}
return nil
}
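// For illustration: a standalone sketch that round-trips one record through
// an in-memory buffer using only the exported helpers above.
package main

import (
	"bytes"
	"fmt"

	"github.com/influxdata/influxdb/cluster"
)

func main() {
	var buf bytes.Buffer

	// Write a record: one type byte, an int64 big-endian length, then the value.
	if err := cluster.WriteTLV(&buf, 0x01, []byte("hello")); err != nil {
		panic(err)
	}

	// Read it back; the type byte and payload round-trip unchanged.
	typ, value, err := cluster.ReadTLV(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("type=%d value=%q\n", typ, value) // type=1 value="hello"
}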

View File

@ -2,11 +2,11 @@ package cluster_test
import (
"fmt"
"io"
"net"
"time"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tcp"
@ -24,15 +24,11 @@ func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) {
}
type testService struct {
nodeID uint64
ln net.Listener
muxln net.Listener
writeShardFunc func(shardID uint64, points []models.Point) error
createShardFunc func(database, policy string, shardID uint64) error
deleteDatabaseFunc func(database string) error
deleteMeasurementFunc func(database, name string) error
deleteSeriesFunc func(database string, sources []influxql.Source, condition influxql.Expr) error
deleteRetentionPolicyFunc func(database, name string) error
nodeID uint64
ln net.Listener
muxln net.Listener
TSDBStore TSDBStore
}
func newTestWriteService(f func(shardID uint64, points []models.Point) error) testService {
@ -45,11 +41,12 @@ func newTestWriteService(f func(shardID uint64, points []models.Point) error) te
muxln := mux.Listen(cluster.MuxHeader)
go mux.Serve(ln)
return testService{
writeShardFunc: f,
ln: ln,
muxln: muxln,
}
s := testService{
ln: ln,
muxln: muxln,
}
s.TSDBStore.WriteToShardFn = f
return s
}
func (ts *testService) Close() {
@ -65,30 +62,6 @@ type serviceResponse struct {
points []models.Point
}
func (t testService) WriteToShard(shardID uint64, points []models.Point) error {
return t.writeShardFunc(shardID, points)
}
func (t testService) CreateShard(database, policy string, shardID uint64) error {
return t.createShardFunc(database, policy, shardID)
}
func (t testService) DeleteDatabase(database string) error {
return t.deleteDatabaseFunc(database)
}
func (t testService) DeleteMeasurement(database, name string) error {
return t.deleteMeasurementFunc(database, name)
}
func (t testService) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
return t.deleteSeriesFunc(database, sources, condition)
}
func (t testService) DeleteRetentionPolicy(database, name string) error {
return t.deleteRetentionPolicyFunc(database, name)
}
func writeShardSuccess(shardID uint64, points []models.Point) error {
responses <- &serviceResponse{
shardID: shardID,
@ -122,3 +95,76 @@ func (testService) ResponseN(n int) ([]*serviceResponse, error) {
}
}
}
// Service is a test wrapper for cluster.Service.
type Service struct {
*cluster.Service
ln net.Listener
TSDBStore TSDBStore
}
// NewService returns a new instance of Service.
func NewService() *Service {
s := &Service{
Service: cluster.NewService(cluster.Config{}),
}
s.Service.TSDBStore = &s.TSDBStore
return s
}
// MustOpenService returns a new, open service on a random port. Panics on error.
func MustOpenService() *Service {
s := NewService()
s.ln = MustListen("tcp", "127.0.0.1:0")
s.Listener = &muxListener{s.ln}
if err := s.Open(); err != nil {
panic(err)
}
return s
}
// Close closes the listener and waits for the service to close.
func (s *Service) Close() error {
if s.ln != nil {
s.ln.Close()
}
return s.Service.Close()
}
// Addr returns the network address of the service.
func (s *Service) Addr() net.Addr { return s.ln.Addr() }
// muxListener is a net.Listener implementation that strips off the first byte.
// This is used to simulate the listener from pkg/mux.
type muxListener struct {
net.Listener
}
// Accept accepts the next connection and removes the first byte.
func (ln *muxListener) Accept() (net.Conn, error) {
conn, err := ln.Listener.Accept()
if err != nil {
return nil, err
}
var buf [1]byte
if _, err := io.ReadFull(conn, buf[:]); err != nil {
conn.Close()
return nil, err
} else if buf[0] != cluster.MuxHeader {
conn.Close()
panic(fmt.Sprintf("unexpected mux header byte: %d", buf[0]))
}
return conn, nil
}
// MustListen opens a listener. Panics on error.
func MustListen(network, laddr string) net.Listener {
ln, err := net.Listen(network, laddr)
if err != nil {
panic(err)
}
return ln
}
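// For illustration: a hedged sketch of driving the harness end to end; the
// mux header write mirrors what muxListener strips on Accept. ExampleService
// is illustrative and not part of this change.
func ExampleService() {
	s := MustOpenService()
	defer s.Close()

	// Dial the service and send the one-byte mux header that the
	// muxListener consumes before handing the connection to the service.
	conn, err := net.Dial("tcp", s.Addr().String())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	if _, err := conn.Write([]byte{cluster.MuxHeader}); err != nil {
		panic(err)
	}
	// The connection now speaks the TLV protocol shown earlier.
}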

View File

@ -1,264 +0,0 @@
package cluster
/*
import (
"encoding/json"
"fmt"
"net"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)
// ShardMapper is responsible for providing mappers for requested shards. It is
// responsible for creating those mappers from the local store, or reaching
// out to another node on the cluster.
type ShardMapper struct {
ForceRemoteMapping bool // All shards treated as remote. Useful for testing.
Node *influxdb.Node
MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
}
TSDBStore interface {
// CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error)
}
timeout time.Duration
pool *clientPool
}
// NewShardMapper returns a mapper of local and remote shards.
func NewShardMapper(timeout time.Duration) *ShardMapper {
return &ShardMapper{
pool: newClientPool(),
timeout: timeout,
}
}
// CreateMapper returns a Mapper for the given shard ID.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
// Create a remote mapper if the local node doesn't own the shard.
if !sh.OwnedBy(s.Node.ID) || s.ForceRemoteMapping {
// Pick a node in a pseudo-random manner.
conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(s.timeout))
return NewRemoteMapper(conn, sh.ID, stmt, chunkSize), nil
}
// If it is local then return the mapper from the store.
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}
return m, nil
}
func (s *ShardMapper) dial(nodeID uint64) (net.Conn, error) {
ni, err := s.MetaClient.DataNode(nodeID)
if err != nil {
return nil, err
}
conn, err := net.Dial("tcp", ni.TCPHost)
if err != nil {
return nil, err
}
// Write the cluster multiplexing header byte
conn.Write([]byte{MuxHeader})
return conn, nil
}
// RemoteMapper implements the tsdb.Mapper interface. It connects to a remote node,
// sends a query, and interprets the stream of data that comes back.
type RemoteMapper struct {
shardID uint64
stmt influxql.Statement
chunkSize int
tagsets []string
fields []string
conn net.Conn
bufferedResponse *MapShardResponse
// unmarshallers []tsdb.UnmarshalFunc // Mapping-specific unmarshal functions.
}
// NewRemoteMapper returns a new remote mapper using the given connection.
func NewRemoteMapper(c net.Conn, shardID uint64, stmt influxql.Statement, chunkSize int) *RemoteMapper {
return &RemoteMapper{
conn: c,
shardID: shardID,
stmt: stmt,
chunkSize: chunkSize,
}
}
// Open connects to the remote node and starts receiving data.
func (r *RemoteMapper) Open() (err error) {
defer func() {
if err != nil {
r.conn.Close()
}
}()
// Build Map request.
var request MapShardRequest
request.SetShardID(r.shardID)
request.SetQuery(r.stmt.String())
request.SetChunkSize(int32(r.chunkSize))
// Marshal into protocol buffers.
buf, err := request.MarshalBinary()
if err != nil {
return err
}
// Write request.
if err := WriteTLV(r.conn, mapShardRequestMessage, buf); err != nil {
return err
}
// Read the response.
_, buf, err = ReadTLV(r.conn)
if err != nil {
return err
}
// Unmarshal response.
r.bufferedResponse = &MapShardResponse{}
if err := r.bufferedResponse.UnmarshalBinary(buf); err != nil {
return err
}
if r.bufferedResponse.Code() != 0 {
return fmt.Errorf("error code %d: %s", r.bufferedResponse.Code(), r.bufferedResponse.Message())
}
// Decode the first response to get the TagSets.
r.tagsets = r.bufferedResponse.TagSets()
r.fields = r.bufferedResponse.Fields()
// Set up each mapping function for this statement.
if stmt, ok := r.stmt.(*influxql.SelectStatement); ok {
for _, c := range stmt.FunctionCalls() {
fn, err := tsdb.InitializeUnmarshaller(c)
if err != nil {
return err
}
r.unmarshallers = append(r.unmarshallers, fn)
}
}
return nil
}
// TagSets returns the TagSets
func (r *RemoteMapper) TagSets() []string {
return r.tagsets
}
// Fields returns RemoteMapper's Fields
func (r *RemoteMapper) Fields() []string {
return r.fields
}
// NextChunk returns the next chunk read from the remote node to the client.
func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
var response *MapShardResponse
if r.bufferedResponse != nil {
response = r.bufferedResponse
r.bufferedResponse = nil
} else {
response = &MapShardResponse{}
// Read the response.
_, buf, err := ReadTLV(r.conn)
if err != nil {
return nil, err
}
// Unmarshal response.
if err := response.UnmarshalBinary(buf); err != nil {
return nil, err
}
if response.Code() != 0 {
return nil, fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}
}
if response.Data() == nil {
return nil, nil
}
moj := &tsdb.MapperOutputJSON{}
if err := json.Unmarshal(response.Data(), moj); err != nil {
return nil, err
}
mvj := []*tsdb.MapperValueJSON{}
if err := json.Unmarshal(moj.Values, &mvj); err != nil {
return nil, err
}
// Prep the non-JSON version of Mapper output.
mo := &tsdb.MapperOutput{
Name: moj.Name,
Tags: moj.Tags,
Fields: moj.Fields,
CursorKey: moj.CursorKey,
}
if len(mvj) == 1 && len(mvj[0].AggData) > 0 {
// The MapperValue is carrying aggregate data, so run it through the
// custom unmarshallers for the map functions through which the data
// was mapped.
aggValues := []interface{}{}
for i, b := range mvj[0].AggData {
v, err := r.unmarshallers[i](b)
if err != nil {
return nil, err
}
aggValues = append(aggValues, v)
}
mo.Values = []*tsdb.MapperValue{&tsdb.MapperValue{
Time: mvj[0].Time,
Value: aggValues,
Tags: mvj[0].Tags,
}}
} else {
// Must be raw data instead.
for _, v := range mvj {
var rawValue interface{}
if err := json.Unmarshal(v.RawData, &rawValue); err != nil {
return nil, err
}
mo.Values = append(mo.Values, &tsdb.MapperValue{
Time: v.Time,
Value: rawValue,
Tags: v.Tags,
})
}
}
return mo, nil
}
// Close the Mapper
func (r *RemoteMapper) Close() {
r.conn.Close()
}
*/

View File

@ -1,112 +0,0 @@
package cluster
/*
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net"
"testing"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/tsdb"
)
// remoteShardResponder implements the remoteShardConn interface.
type remoteShardResponder struct {
net.Conn
t *testing.T
rxBytes []byte
buffer *bytes.Buffer
}
func newRemoteShardResponder(outputs []*tsdb.MapperOutput, tagsets []string) *remoteShardResponder {
r := &remoteShardResponder{}
a := make([]byte, 0, 1024)
r.buffer = bytes.NewBuffer(a)
// Pump the outputs in the buffer for later reading.
for _, o := range outputs {
resp := &MapShardResponse{}
resp.SetCode(0)
if o != nil {
d, _ := json.Marshal(o)
resp.SetData(d)
resp.SetTagSets(tagsets)
}
g, _ := resp.MarshalBinary()
WriteTLV(r.buffer, mapShardResponseMessage, g)
}
return r
}
func (r remoteShardResponder) Close() error { return nil }
func (r remoteShardResponder) Read(p []byte) (n int, err error) {
return io.ReadFull(r.buffer, p)
}
func (r remoteShardResponder) Write(p []byte) (n int, err error) {
if r.rxBytes == nil {
r.rxBytes = make([]byte, 0)
}
r.rxBytes = append(r.rxBytes, p...)
return len(p), nil
}
// Ensure a RemoteMapper can process valid responses from a remote shard.
func TestShardWriter_RemoteMapper_Success(t *testing.T) {
expTagSets := []string{"tagsetA"}
expOutput := &tsdb.MapperOutput{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
}
c := newRemoteShardResponder([]*tsdb.MapperOutput{expOutput, nil}, expTagSets)
r := NewRemoteMapper(c, 1234, mustParseStmt("SELECT * FROM CPU"), 10)
if err := r.Open(); err != nil {
t.Fatalf("failed to open remote mapper: %s", err.Error())
}
if r.TagSets()[0] != expTagSets[0] {
t.Fatalf("incorrect tagsets received, exp %v, got %v", expTagSets, r.TagSets())
}
// Get first chunk from mapper.
chunk, err := r.NextChunk()
if err != nil {
t.Fatalf("failed to get next chunk from mapper: %s", err.Error())
}
output, ok := chunk.(*tsdb.MapperOutput)
if !ok {
t.Fatal("chunk is not of expected type")
}
if output.Name != "cpu" {
t.Fatalf("received output incorrect, exp: %v, got %v", expOutput, output)
}
// Next chunk should be nil, indicating no more data.
chunk, err = r.NextChunk()
if err != nil {
t.Fatalf("failed to get next chunk from mapper: %s", err.Error())
}
if chunk != nil {
t.Fatal("received more chunks when none expected")
}
}
// mustParseStmt parses a single statement or panics.
func mustParseStmt(stmt string) influxql.Statement {
q, err := influxql.ParseQuery(stmt)
if err != nil {
panic(err)
} else if len(q.Statements) != 1 {
panic(fmt.Sprintf("expected 1 statement but got %d", len(q.Statements)))
}
return q.Statements[0]
}
*/

View File

@ -12,10 +12,18 @@ import (
const (
writeShardRequestMessage byte = iota + 1
writeShardResponseMessage
mapShardRequestMessage
mapShardResponseMessage
executeStatementRequestMessage
executeStatementResponseMessage
createIteratorRequestMessage
createIteratorResponseMessage
fieldDimensionsRequestMessage
fieldDimensionsResponseMessage
seriesKeysRequestMessage
seriesKeysResponseMessage
)
// ShardWriter writes a set of points to a shard.

View File

@ -16,7 +16,7 @@ func TestShardWriter_WriteShard_Success(t *testing.T) {
ts := newTestWriteService(writeShardSuccess)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
s.TSDBStore = &ts.TSDBStore
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -63,7 +63,7 @@ func TestShardWriter_WriteShard_Multiple(t *testing.T) {
ts := newTestWriteService(writeShardSuccess)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
s.TSDBStore = &ts.TSDBStore
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -112,7 +112,7 @@ func TestShardWriter_WriteShard_Error(t *testing.T) {
ts := newTestWriteService(writeShardFail)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
s.TSDBStore = &ts.TSDBStore
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -140,7 +140,7 @@ func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
ts := newTestWriteService(writeShardSuccess)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
s.TSDBStore = &ts.TSDBStore
if err := s.Open(); err != nil {
t.Fatal(err)
}
@ -195,7 +195,7 @@ func TestShardWriter_Write_PoolMax(t *testing.T) {
ShardWriterTimeout: toml.Duration(100 * time.Millisecond),
})
s.Listener = ts.muxln
s.TSDBStore = ts
s.TSDBStore = &ts.TSDBStore
if err := s.Open(); err != nil {
t.Fatal(err)
}

View File

@ -62,13 +62,12 @@ type Server struct {
MetaClient *meta.Client
MetaService *meta.Service
TSDBStore *tsdb.Store
QueryExecutor *cluster.QueryExecutor
PointsWriter *cluster.PointsWriter
ShardWriter *cluster.ShardWriter
IteratorCreator *cluster.IteratorCreator
HintedHandoff *hh.Service
Subscriber *subscriber.Service
TSDBStore *tsdb.Store
QueryExecutor *cluster.QueryExecutor
PointsWriter *cluster.PointsWriter
ShardWriter *cluster.ShardWriter
HintedHandoff *hh.Service
Subscriber *subscriber.Service
Services []Service
@ -435,6 +434,8 @@ func (s *Server) Open() error {
}
}
s.QueryExecutor.Node = s.Node
s.Subscriber.MetaClient = s.MetaClient
s.ShardWriter.MetaClient = s.MetaClient
s.HintedHandoff.MetaClient = s.MetaClient

View File

@ -9,6 +9,9 @@ import (
"strconv"
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql/internal"
)
// DataType represents the primitive data types available in InfluxQL.
@ -321,6 +324,33 @@ func (a Sources) String() string {
return buf.String()
}
// MarshalBinary encodes a list of sources to a binary format.
func (a Sources) MarshalBinary() ([]byte, error) {
var pb internal.Measurements
pb.Items = make([]*internal.Measurement, len(a))
for i, source := range a {
pb.Items[i] = encodeMeasurement(source.(*Measurement))
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes binary data into a list of sources.
func (a *Sources) UnmarshalBinary(buf []byte) error {
var pb internal.Measurements
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
*a = make(Sources, len(pb.GetItems()))
for i := range pb.GetItems() {
mm, err := decodeMeasurement(pb.GetItems()[i])
if err != nil {
return err
}
(*a)[i] = mm
}
return nil
}
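// For illustration: a standalone round trip of the two methods above.
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/influxql"
)

func main() {
	src := influxql.Sources{
		&influxql.Measurement{Database: "db0", RetentionPolicy: "rp0", Name: "cpu"},
	}

	// Encode the sources to the internal.Measurements protobuf.
	buf, err := src.MarshalBinary()
	if err != nil {
		panic(err)
	}

	// Decode into a fresh Sources value; the measurements survive intact.
	var other influxql.Sources
	if err := other.UnmarshalBinary(buf); err != nil {
		panic(err)
	}
	fmt.Println(other.String())
}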
// IsSystemName returns true if name is an internal system name.
// System names are prefixed with an underscore.
func IsSystemName(name string) bool { return strings.HasPrefix(name, "_") }
@ -2799,6 +2829,38 @@ func (m *Measurement) String() string {
return buf.String()
}
func encodeMeasurement(mm *Measurement) *internal.Measurement {
pb := &internal.Measurement{
Database: proto.String(mm.Database),
RetentionPolicy: proto.String(mm.RetentionPolicy),
Name: proto.String(mm.Name),
IsTarget: proto.Bool(mm.IsTarget),
}
if mm.Regex != nil {
pb.Regex = proto.String(mm.Regex.String())
}
return pb
}
func decodeMeasurement(pb *internal.Measurement) (*Measurement, error) {
mm := &Measurement{
Database: pb.GetDatabase(),
RetentionPolicy: pb.GetRetentionPolicy(),
Name: pb.GetName(),
IsTarget: pb.GetIsTarget(),
}
if pb.Regex != nil {
regex, err := regexp.Compile(pb.GetRegex())
if err != nil {
return nil, fmt.Errorf("invalid binary measurement regex: value=%q, err=%s", pb.GetRegex(), err)
}
mm.Regex = &RegexLiteral{Val: regex}
}
return mm, nil
}
// VarRef represents a reference to a variable.
type VarRef struct {
Val string

View File

@ -11,29 +11,31 @@ It is generated from these files:
It has these top-level messages:
Point
Aux
IteratorOptions
Measurements
Measurement
Interval
*/
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type Point struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"`
Time *int64 `protobuf:"varint,3,req,name=Time" json:"Time,omitempty"`
Nil *bool `protobuf:"varint,4,req,name=Nil" json:"Nil,omitempty"`
Aux []*Aux `protobuf:"bytes,5,rep,name=Aux" json:"Aux,omitempty"`
Aggregated *uint32 `protobuf:"varint,6,opt,name=Aggregated" json:"Aggregated,omitempty"`
FloatValue *float64 `protobuf:"fixed64,7,opt,name=FloatValue" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,8,opt,name=IntegerValue" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,9,opt,name=StringValue" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,10,opt,name=BooleanValue" json:"BooleanValue,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Tags *string `protobuf:"bytes,2,req" json:"Tags,omitempty"`
Time *int64 `protobuf:"varint,3,req" json:"Time,omitempty"`
Nil *bool `protobuf:"varint,4,req" json:"Nil,omitempty"`
Aux []*Aux `protobuf:"bytes,5,rep" json:"Aux,omitempty"`
Aggregated *uint32 `protobuf:"varint,6,opt" json:"Aggregated,omitempty"`
FloatValue *float64 `protobuf:"fixed64,7,opt" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,8,opt" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,9,opt" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,10,opt" json:"BooleanValue,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -112,11 +114,11 @@ func (m *Point) GetBooleanValue() bool {
}
type Aux struct {
DataType *int32 `protobuf:"varint,1,req,name=DataType" json:"DataType,omitempty"`
FloatValue *float64 `protobuf:"fixed64,2,opt,name=FloatValue" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,3,opt,name=IntegerValue" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,4,opt,name=StringValue" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,5,opt,name=BooleanValue" json:"BooleanValue,omitempty"`
DataType *int32 `protobuf:"varint,1,req" json:"DataType,omitempty"`
FloatValue *float64 `protobuf:"fixed64,2,opt" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,3,opt" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,4,opt" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,5,opt" json:"BooleanValue,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -159,7 +161,229 @@ func (m *Aux) GetBooleanValue() bool {
return false
}
func init() {
proto.RegisterType((*Point)(nil), "internal.Point")
proto.RegisterType((*Aux)(nil), "internal.Aux")
}
type IteratorOptions struct {
Expr *string `protobuf:"bytes,1,opt" json:"Expr,omitempty"`
Aux []string `protobuf:"bytes,2,rep" json:"Aux,omitempty"`
Sources []*Measurement `protobuf:"bytes,3,rep" json:"Sources,omitempty"`
Interval *Interval `protobuf:"bytes,4,opt" json:"Interval,omitempty"`
Dimensions []string `protobuf:"bytes,5,rep" json:"Dimensions,omitempty"`
Fill *int32 `protobuf:"varint,6,opt" json:"Fill,omitempty"`
FillValue *float64 `protobuf:"fixed64,7,opt" json:"FillValue,omitempty"`
Condition *string `protobuf:"bytes,8,opt" json:"Condition,omitempty"`
StartTime *int64 `protobuf:"varint,9,opt" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,10,opt" json:"EndTime,omitempty"`
Ascending *bool `protobuf:"varint,11,opt" json:"Ascending,omitempty"`
Limit *int64 `protobuf:"varint,12,opt" json:"Limit,omitempty"`
Offset *int64 `protobuf:"varint,13,opt" json:"Offset,omitempty"`
SLimit *int64 `protobuf:"varint,14,opt" json:"SLimit,omitempty"`
SOffset *int64 `protobuf:"varint,15,opt" json:"SOffset,omitempty"`
Dedupe *bool `protobuf:"varint,16,opt" json:"Dedupe,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *IteratorOptions) Reset() { *m = IteratorOptions{} }
func (m *IteratorOptions) String() string { return proto.CompactTextString(m) }
func (*IteratorOptions) ProtoMessage() {}
func (m *IteratorOptions) GetExpr() string {
if m != nil && m.Expr != nil {
return *m.Expr
}
return ""
}
func (m *IteratorOptions) GetAux() []string {
if m != nil {
return m.Aux
}
return nil
}
func (m *IteratorOptions) GetSources() []*Measurement {
if m != nil {
return m.Sources
}
return nil
}
func (m *IteratorOptions) GetInterval() *Interval {
if m != nil {
return m.Interval
}
return nil
}
func (m *IteratorOptions) GetDimensions() []string {
if m != nil {
return m.Dimensions
}
return nil
}
func (m *IteratorOptions) GetFill() int32 {
if m != nil && m.Fill != nil {
return *m.Fill
}
return 0
}
func (m *IteratorOptions) GetFillValue() float64 {
if m != nil && m.FillValue != nil {
return *m.FillValue
}
return 0
}
func (m *IteratorOptions) GetCondition() string {
if m != nil && m.Condition != nil {
return *m.Condition
}
return ""
}
func (m *IteratorOptions) GetStartTime() int64 {
if m != nil && m.StartTime != nil {
return *m.StartTime
}
return 0
}
func (m *IteratorOptions) GetEndTime() int64 {
if m != nil && m.EndTime != nil {
return *m.EndTime
}
return 0
}
func (m *IteratorOptions) GetAscending() bool {
if m != nil && m.Ascending != nil {
return *m.Ascending
}
return false
}
func (m *IteratorOptions) GetLimit() int64 {
if m != nil && m.Limit != nil {
return *m.Limit
}
return 0
}
func (m *IteratorOptions) GetOffset() int64 {
if m != nil && m.Offset != nil {
return *m.Offset
}
return 0
}
func (m *IteratorOptions) GetSLimit() int64 {
if m != nil && m.SLimit != nil {
return *m.SLimit
}
return 0
}
func (m *IteratorOptions) GetSOffset() int64 {
if m != nil && m.SOffset != nil {
return *m.SOffset
}
return 0
}
func (m *IteratorOptions) GetDedupe() bool {
if m != nil && m.Dedupe != nil {
return *m.Dedupe
}
return false
}
type Measurements struct {
Items []*Measurement `protobuf:"bytes,1,rep" json:"Items,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Measurements) Reset() { *m = Measurements{} }
func (m *Measurements) String() string { return proto.CompactTextString(m) }
func (*Measurements) ProtoMessage() {}
func (m *Measurements) GetItems() []*Measurement {
if m != nil {
return m.Items
}
return nil
}
type Measurement struct {
Database *string `protobuf:"bytes,1,opt" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,2,opt" json:"RetentionPolicy,omitempty"`
Name *string `protobuf:"bytes,3,opt" json:"Name,omitempty"`
Regex *string `protobuf:"bytes,4,opt" json:"Regex,omitempty"`
IsTarget *bool `protobuf:"varint,5,opt" json:"IsTarget,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Measurement) Reset() { *m = Measurement{} }
func (m *Measurement) String() string { return proto.CompactTextString(m) }
func (*Measurement) ProtoMessage() {}
func (m *Measurement) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *Measurement) GetRetentionPolicy() string {
if m != nil && m.RetentionPolicy != nil {
return *m.RetentionPolicy
}
return ""
}
func (m *Measurement) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
func (m *Measurement) GetRegex() string {
if m != nil && m.Regex != nil {
return *m.Regex
}
return ""
}
func (m *Measurement) GetIsTarget() bool {
if m != nil && m.IsTarget != nil {
return *m.IsTarget
}
return false
}
type Interval struct {
Duration *int64 `protobuf:"varint,1,opt" json:"Duration,omitempty"`
Offset *int64 `protobuf:"varint,2,opt" json:"Offset,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Interval) Reset() { *m = Interval{} }
func (m *Interval) String() string { return proto.CompactTextString(m) }
func (*Interval) ProtoMessage() {}
func (m *Interval) GetDuration() int64 {
if m != nil && m.Duration != nil {
return *m.Duration
}
return 0
}
func (m *Interval) GetOffset() int64 {
if m != nil && m.Offset != nil {
return *m.Offset
}
return 0
}
func init() {
}

View File

@ -21,3 +21,39 @@ message Aux {
optional string StringValue = 4;
optional bool BooleanValue = 5;
}
message IteratorOptions {
optional string Expr = 1;
repeated string Aux = 2;
repeated Measurement Sources = 3;
optional Interval Interval = 4;
repeated string Dimensions = 5;
optional int32 Fill = 6;
optional double FillValue = 7;
optional string Condition = 8;
optional int64 StartTime = 9;
optional int64 EndTime = 10;
optional bool Ascending = 11;
optional int64 Limit = 12;
optional int64 Offset = 13;
optional int64 SLimit = 14;
optional int64 SOffset = 15;
optional bool Dedupe = 16;
}
message Measurements {
repeated Measurement Items = 1;
}
message Measurement {
optional string Database = 1;
optional string RetentionPolicy = 2;
optional string Name = 3;
optional string Regex = 4;
optional bool IsTarget = 5;
}
message Interval {
optional int64 Duration = 1;
optional int64 Offset = 2;
}

View File

@ -10,6 +10,7 @@ import (
"container/heap"
"errors"
"fmt"
"io"
"log"
"sort"
"sync"
@ -909,6 +910,52 @@ func (itr *floatDedupeIterator) Next() *FloatPoint {
}
}
// floatReaderIterator represents an iterator that streams from a reader.
type floatReaderIterator struct {
r io.Reader
dec *FloatPointDecoder
first *FloatPoint
}
// newFloatReaderIterator returns a new instance of floatReaderIterator.
func newFloatReaderIterator(r io.Reader, first *FloatPoint) *floatReaderIterator {
return &floatReaderIterator{
r: r,
dec: NewFloatPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *floatReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *floatReaderIterator) Next() *FloatPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &FloatPoint{}
if err := itr.dec.DecodeFloatPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// IntegerIterator represents a stream of integer points.
type IntegerIterator interface {
Iterator
@ -1798,6 +1845,52 @@ func (itr *integerDedupeIterator) Next() *IntegerPoint {
}
}
// integerReaderIterator represents an iterator that streams from a reader.
type integerReaderIterator struct {
r io.Reader
dec *IntegerPointDecoder
first *IntegerPoint
}
// newIntegerReaderIterator returns a new instance of integerReaderIterator.
func newIntegerReaderIterator(r io.Reader, first *IntegerPoint) *integerReaderIterator {
return &integerReaderIterator{
r: r,
dec: NewIntegerPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *integerReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *integerReaderIterator) Next() *IntegerPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &IntegerPoint{}
if err := itr.dec.DecodeIntegerPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// StringIterator represents a stream of string points.
type StringIterator interface {
Iterator
@ -2687,6 +2780,52 @@ func (itr *stringDedupeIterator) Next() *StringPoint {
}
}
// stringReaderIterator represents an iterator that streams from a reader.
type stringReaderIterator struct {
r io.Reader
dec *StringPointDecoder
first *StringPoint
}
// newStringReaderIterator returns a new instance of stringReaderIterator.
func newStringReaderIterator(r io.Reader, first *StringPoint) *stringReaderIterator {
return &stringReaderIterator{
r: r,
dec: NewStringPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *stringReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *stringReaderIterator) Next() *StringPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &StringPoint{}
if err := itr.dec.DecodeStringPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// BooleanIterator represents a stream of boolean points.
type BooleanIterator interface {
Iterator
@ -3575,3 +3714,143 @@ func (itr *booleanDedupeIterator) Next() *BooleanPoint {
return p
}
}
// booleanReaderIterator represents an iterator that streams from a reader.
type booleanReaderIterator struct {
r io.Reader
dec *BooleanPointDecoder
first *BooleanPoint
}
// newBooleanReaderIterator returns a new instance of booleanReaderIterator.
func newBooleanReaderIterator(r io.Reader, first *BooleanPoint) *booleanReaderIterator {
return &booleanReaderIterator{
r: r,
dec: NewBooleanPointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *booleanReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *booleanReaderIterator) Next() *BooleanPoint {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &BooleanPoint{}
if err := itr.dec.DecodeBooleanPoint(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
w io.Writer
}
// NewIteratorEncoder returns a new IteratorEncoder that writes to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
return &IteratorEncoder{w: w}
}
// EncodeIterator encodes and writes all of itr's points to the underlying writer.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
switch itr := itr.(type) {
case FloatIterator:
return enc.encodeFloatIterator(itr)
case IntegerIterator:
return enc.encodeIntegerIterator(itr)
case StringIterator:
return enc.encodeStringIterator(itr)
case BooleanIterator:
return enc.encodeBooleanIterator(itr)
default:
panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
}
}
// encodeFloatIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeFloatIterator(itr FloatIterator) error {
penc := NewFloatPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeFloatPoint(p); err != nil {
return err
}
}
}
// encodeIntegerIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeIntegerIterator(itr IntegerIterator) error {
penc := NewIntegerPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeIntegerPoint(p); err != nil {
return err
}
}
}
// encodeStringIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeStringIterator(itr StringIterator) error {
penc := NewStringPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeStringPoint(p); err != nil {
return err
}
}
}
// encodeBooleanIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeBooleanIterator(itr BooleanIterator) error {
penc := NewBooleanPointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.EncodeBooleanPoint(p); err != nil {
return err
}
}
}

View File

@ -4,6 +4,7 @@ import (
"container/heap"
"errors"
"fmt"
"io"
"sort"
"sync"
"log"
@ -908,4 +909,97 @@ func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point {
}
}
// {{.name}}ReaderIterator represents an iterator that streams from a reader.
type {{.name}}ReaderIterator struct {
r io.Reader
dec *{{.Name}}PointDecoder
first *{{.Name}}Point
}
// new{{.Name}}ReaderIterator returns a new instance of {{.name}}ReaderIterator.
func new{{.Name}}ReaderIterator(r io.Reader, first *{{.Name}}Point) *{{.name}}ReaderIterator {
return &{{.name}}ReaderIterator{
r: r,
dec: New{{.Name}}PointDecoder(r),
first: first,
}
}
// Close closes the underlying reader, if applicable.
func (itr *{{.name}}ReaderIterator) Close() error {
if r, ok := itr.r.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
// Next returns the next point from the iterator.
func (itr *{{.name}}ReaderIterator) Next() *{{.Name}}Point {
// Send first point if it hasn't been sent yet.
if itr.first != nil {
p := itr.first
itr.first = nil
return p
}
// OPTIMIZE(benbjohnson): Reuse point on iterator.
// Unmarshal next point.
p := &{{.Name}}Point{}
if err := itr.dec.Decode{{.Name}}Point(p); err == io.EOF {
return nil
} else if err != nil {
log.Printf("error reading iterator point: %s", err)
return nil
}
return p
}
{{end}}
// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
w io.Writer
}
// NewIteratorEncoder returns a new IteratorEncoder that writes to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
return &IteratorEncoder{w: w}
}
// EncodeIterator encodes and writes all of itr's points to the underlying writer.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
switch itr := itr.(type) {
case FloatIterator:
return enc.encodeFloatIterator(itr)
case IntegerIterator:
return enc.encodeIntegerIterator(itr)
case StringIterator:
return enc.encodeStringIterator(itr)
case BooleanIterator:
return enc.encodeBooleanIterator(itr)
default:
panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
}
}
{{range .}}
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
penc := New{{.Name}}PointEncoder(enc.w)
for {
// Retrieve the next point from the iterator.
p := itr.Next()
if p == nil {
return nil
}
// Write the point to the point encoder.
if err := penc.Encode{{.Name}}Point(p); err != nil {
return err
}
}
}
{{end}}

View File

@ -3,8 +3,13 @@ package influxql
import (
"errors"
"fmt"
"io"
"sort"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql/internal"
)
// ErrUnknownCall is returned when operating on an unknown function call.
@ -390,6 +395,27 @@ func drainIterator(itr Iterator) {
}
}
// NewReaderIterator returns an iterator that streams from a reader.
func NewReaderIterator(r io.Reader) (Iterator, error) {
var p Point
if err := NewPointDecoder(r).DecodePoint(&p); err != nil {
return nil, err
}
switch p := p.(type) {
case *FloatPoint:
return newFloatReaderIterator(r, p), nil
case *IntegerPoint:
return newIntegerReaderIterator(r, p), nil
case *StringPoint:
return newStringReaderIterator(r, p), nil
case *BooleanPoint:
return newBooleanReaderIterator(r, p), nil
default:
panic(fmt.Sprintf("unsupported point for reader iterator: %T", p))
}
}
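// For illustration: pairing NewReaderIterator with the IteratorEncoder from
// the iterator code above. sliceIterator is a hypothetical in-memory
// FloatIterator defined only for this sketch.
package main

import (
	"bytes"
	"fmt"

	"github.com/influxdata/influxdb/influxql"
)

type sliceIterator struct {
	points []influxql.FloatPoint
}

func (itr *sliceIterator) Close() error { return nil }

func (itr *sliceIterator) Next() *influxql.FloatPoint {
	if len(itr.points) == 0 {
		return nil
	}
	p := &itr.points[0]
	itr.points = itr.points[1:]
	return p
}

func main() {
	var buf bytes.Buffer

	// Stream two points into the buffer, the same framing the cluster
	// service uses when streaming an iterator over a connection.
	src := &sliceIterator{points: []influxql.FloatPoint{
		{Name: "cpu", Time: 0, Value: 1.5},
		{Name: "cpu", Time: 10, Value: 2.5},
	}}
	if err := influxql.NewIteratorEncoder(&buf).EncodeIterator(src); err != nil {
		panic(err)
	}

	// Rebuild a typed iterator on the receiving side; the first decoded
	// point determines the concrete iterator type.
	itr, err := influxql.NewReaderIterator(&buf)
	if err != nil {
		panic(err)
	}
	defer itr.Close()

	fitr := itr.(influxql.FloatIterator)
	for p := fitr.Next(); p != nil; p = fitr.Next() {
		fmt.Println(p.Name, p.Time, p.Value)
	}
}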
// IteratorCreator represents an interface for objects that can create Iterators.
type IteratorCreator interface {
// Creates a simple iterator for use in an InfluxQL query.
@ -402,6 +428,104 @@ type IteratorCreator interface {
SeriesKeys(opt IteratorOptions) (SeriesList, error)
}
// IteratorCreators represents a list of iterator creators.
type IteratorCreators []IteratorCreator
// Close closes all iterator creators that implement io.Closer.
func (a IteratorCreators) Close() error {
for _, ic := range a {
if ic, ok := ic.(io.Closer); ok {
ic.Close()
}
}
return nil
}
// CreateIterator returns a single combined iterator from multiple iterator creators.
func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error) {
// Create iterators for each shard.
// Ensure that they are closed if an error occurs.
itrs := make([]Iterator, 0, len(a))
if err := func() error {
for _, ic := range a {
itr, err := ic.CreateIterator(opt)
if err != nil {
return err
}
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
Iterators(itrs).Close()
return nil, err
}
// Merge into a single iterator.
if opt.MergeSorted() {
return NewSortedMergeIterator(itrs, opt), nil
}
itr := NewMergeIterator(itrs, opt)
if opt.Expr != nil {
if expr, ok := opt.Expr.(*Call); ok && expr.Name == "count" {
opt.Expr = &Call{
Name: "sum",
Args: expr.Args,
}
}
}
return NewCallIterator(itr, opt)
}
// FieldDimensions returns unique fields and dimensions from multiple iterator creators.
func (a IteratorCreators) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
fields = make(map[string]struct{})
dimensions = make(map[string]struct{})
for _, ic := range a {
f, d, err := ic.FieldDimensions(sources)
if err != nil {
return nil, nil, err
}
for k := range f {
fields[k] = struct{}{}
}
for k := range d {
dimensions[k] = struct{}{}
}
}
return
}
// SeriesKeys returns a list of series in all iterator creators in a.
// If a series exists in multiple creators in a, all instances will be combined
// into a single Series by calling Combine on it.
func (a IteratorCreators) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
seriesMap := make(map[string]Series)
for _, sh := range a {
series, err := sh.SeriesKeys(opt)
if err != nil {
return nil, err
}
for _, s := range series {
cur, ok := seriesMap[s.ID()]
if ok {
cur.Combine(&s)
seriesMap[s.ID()] = cur
} else {
seriesMap[s.ID()] = s
}
}
}
seriesList := make([]Series, 0, len(seriesMap))
for _, s := range seriesMap {
seriesList = append(seriesList, s)
}
sort.Sort(SeriesList(seriesList))
return SeriesList(seriesList), nil
}
// IteratorOptions is an object passed to CreateIterator to specify creation options.
type IteratorOptions struct {
// Expression to iterate for.
@ -547,6 +671,118 @@ func (opt IteratorOptions) DerivativeInterval() Interval {
return Interval{Duration: time.Second}
}
// MarshalBinary encodes opt into a binary format.
func (opt *IteratorOptions) MarshalBinary() ([]byte, error) {
return proto.Marshal(encodeIteratorOptions(opt))
}
// UnmarshalBinary decodes from a binary format in to opt.
func (opt *IteratorOptions) UnmarshalBinary(buf []byte) error {
var pb internal.IteratorOptions
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
other, err := decodeIteratorOptions(&pb)
if err != nil {
return err
}
*opt = *other
return nil
}
func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions {
pb := &internal.IteratorOptions{
Aux: opt.Aux,
Interval: encodeInterval(opt.Interval),
Dimensions: opt.Dimensions,
Fill: proto.Int32(int32(opt.Fill)),
StartTime: proto.Int64(opt.StartTime),
EndTime: proto.Int64(opt.EndTime),
Ascending: proto.Bool(opt.Ascending),
Limit: proto.Int64(int64(opt.Limit)),
Offset: proto.Int64(int64(opt.Offset)),
SLimit: proto.Int64(int64(opt.SLimit)),
SOffset: proto.Int64(int64(opt.SOffset)),
Dedupe: proto.Bool(opt.Dedupe),
}
// Set expression, if set.
if opt.Expr != nil {
pb.Expr = proto.String(opt.Expr.String())
}
// Convert and encode sources to measurements.
sources := make([]*internal.Measurement, len(opt.Sources))
for i, source := range opt.Sources {
mm := source.(*Measurement)
sources[i] = encodeMeasurement(mm)
}
pb.Sources = sources
// Fill value can only be a number. Set it if available.
if v, ok := opt.FillValue.(float64); ok {
pb.FillValue = proto.Float64(v)
}
// Set condition, if set.
if opt.Condition != nil {
pb.Condition = proto.String(opt.Condition.String())
}
return pb
}
func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, error) {
opt := &IteratorOptions{
Aux: pb.GetAux(),
Interval: decodeInterval(pb.GetInterval()),
Dimensions: pb.GetDimensions(),
Fill: FillOption(pb.GetFill()),
FillValue: pb.GetFillValue(),
StartTime: pb.GetStartTime(),
EndTime: pb.GetEndTime(),
Ascending: pb.GetAscending(),
Limit: int(pb.GetLimit()),
Offset: int(pb.GetOffset()),
SLimit: int(pb.GetSLimit()),
SOffset: int(pb.GetSOffset()),
Dedupe: pb.GetDedupe(),
}
// Set expression, if set.
if pb.Expr != nil {
expr, err := ParseExpr(pb.GetExpr())
if err != nil {
return nil, err
}
opt.Expr = expr
}
// Convert and encode sources to measurements.
sources := make([]Source, len(pb.GetSources()))
for i, source := range pb.GetSources() {
mm, err := decodeMeasurement(source)
if err != nil {
return nil, err
}
sources[i] = mm
}
opt.Sources = sources
// Set condition, if set.
if pb.Condition != nil {
expr, err := ParseExpr(pb.GetCondition())
if err != nil {
return nil, err
}
opt.Condition = expr
}
return opt, nil
}
// selectInfo represents an object that stores info about select fields.
type selectInfo struct {
calls map[*Call]struct{}
@ -623,6 +859,20 @@ type Interval struct {
// IsZero returns true if the interval has no duration.
func (i Interval) IsZero() bool { return i.Duration == 0 }
func encodeInterval(i Interval) *internal.Interval {
return &internal.Interval{
Duration: proto.Int64(i.Duration.Nanoseconds()),
Offset: proto.Int64(i.Offset.Nanoseconds()),
}
}
func decodeInterval(pb *internal.Interval) Interval {
return Interval{
Duration: time.Duration(pb.GetDuration()),
Offset: time.Duration(pb.GetOffset()),
}
}
// reduceOptions represents options for performing reductions on windows of points.
type reduceOptions struct {
startTime int64

View File

@ -4,6 +4,8 @@ import (
"fmt"
"math"
"math/rand"
"reflect"
"regexp"
"testing"
"time"
@ -860,6 +862,70 @@ func TestIteratorOptions_DerivativeInterval_Call(t *testing.T) {
}
}
// Ensure iterator options can be marshaled to and from a binary format.
func TestIteratorOptions_MarshalBinary(t *testing.T) {
opt := &influxql.IteratorOptions{
Expr: MustParseExpr("count(value)"),
Aux: []string{"a", "b", "c"},
Sources: []influxql.Source{
&influxql.Measurement{Database: "db0", RetentionPolicy: "rp0", Name: "mm0"},
},
Interval: influxql.Interval{
Duration: 1 * time.Hour,
Offset: 20 * time.Minute,
},
Dimensions: []string{"region", "host"},
Fill: influxql.NumberFill,
FillValue: float64(100),
Condition: MustParseExpr(`foo = 'bar'`),
StartTime: 1000,
EndTime: 2000,
Ascending: true,
Limit: 100,
Offset: 200,
SLimit: 300,
SOffset: 400,
Dedupe: true,
}
// Marshal to binary.
buf, err := opt.MarshalBinary()
if err != nil {
t.Fatal(err)
}
// Unmarshal back to an object.
var other influxql.IteratorOptions
if err := other.UnmarshalBinary(buf); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(&other, opt) {
t.Fatalf("unexpected options: %s", spew.Sdump(other))
}
}
// Ensure iterator options with a regex measurement can be marshaled.
func TestIteratorOptions_MarshalBinary_Measurement_Regex(t *testing.T) {
opt := &influxql.IteratorOptions{
Sources: []influxql.Source{
&influxql.Measurement{Database: "db1", RetentionPolicy: "rp2", Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`series.+`)}},
},
}
// Marshal to binary.
buf, err := opt.MarshalBinary()
if err != nil {
t.Fatal(err)
}
// Unmarshal back to an object.
var other influxql.IteratorOptions
if err := other.UnmarshalBinary(buf); err != nil {
t.Fatal(err)
} else if v := other.Sources[0].(*influxql.Measurement).Regex.Val.String(); v != `series.+` {
t.Fatalf("unexpected measurement regex: %s", v)
}
}
// IteratorCreator is a mockable implementation of SelectStatementExecutor.IteratorCreator.
type IteratorCreator struct {
CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error)

View File

@ -7,6 +7,9 @@
package influxql
import (
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql/internal"
)
@ -123,6 +126,70 @@ func floatPointsSortBy(points []FloatPoint, cmp func(a, b *FloatPoint) bool) *fl
}
}
// FloatPointEncoder encodes FloatPoint points to a writer.
type FloatPointEncoder struct {
w io.Writer
}
// NewFloatPointEncoder returns a new instance of FloatPointEncoder that writes to w.
func NewFloatPointEncoder(w io.Writer) *FloatPointEncoder {
return &FloatPointEncoder{w: w}
}
// EncodeFloatPoint marshals and writes p to the underlying writer.
func (enc *FloatPointEncoder) EncodeFloatPoint(p *FloatPoint) error {
// Marshal to bytes.
buf, err := proto.Marshal(encodeFloatPoint(p))
if err != nil {
return err
}
// Write the length.
if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
// Write the encoded point.
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}
// FloatPointDecoder decodes FloatPoint points from a reader.
type FloatPointDecoder struct {
r io.Reader
}
// NewFloatPointDecoder returns a new instance of FloatPointDecoder that reads from r.
func NewFloatPointDecoder(r io.Reader) *FloatPointDecoder {
return &FloatPointDecoder{r: r}
}
// DecodeFloatPoint reads from the underlying reader and unmarshals into p.
func (dec *FloatPointDecoder) DecodeFloatPoint(p *FloatPoint) error {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
*p = *decodeFloatPoint(&pb)
return nil
}
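// For illustration: a standalone sketch of the encoder/decoder pair above.
// Frames are a uint32 big-endian length followed by the protobuf point.
package main

import (
	"bytes"
	"fmt"

	"github.com/influxdata/influxdb/influxql"
)

func main() {
	var buf bytes.Buffer

	enc := influxql.NewFloatPointEncoder(&buf)
	if err := enc.EncodeFloatPoint(&influxql.FloatPoint{Name: "cpu", Time: 1, Value: 3.5}); err != nil {
		panic(err)
	}

	var p influxql.FloatPoint
	dec := influxql.NewFloatPointDecoder(&buf)
	if err := dec.DecodeFloatPoint(&p); err != nil {
		panic(err)
	}
	fmt.Println(p.Name, p.Time, p.Value) // cpu 1 3.5
}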
// IntegerPoint represents a point with a int64 value.
// DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT.
// See TestPoint_Fields in influxql/point_test.go for more details.
@ -235,6 +302,70 @@ func integerPointsSortBy(points []IntegerPoint, cmp func(a, b *IntegerPoint) boo
}
}
// IntegerPointEncoder encodes IntegerPoint points to a writer.
type IntegerPointEncoder struct {
w io.Writer
}
// NewIntegerPointEncoder returns a new instance of IntegerPointEncoder that writes to w.
func NewIntegerPointEncoder(w io.Writer) *IntegerPointEncoder {
return &IntegerPointEncoder{w: w}
}
// EncodeIntegerPoint marshals and writes p to the underlying writer.
func (enc *IntegerPointEncoder) EncodeIntegerPoint(p *IntegerPoint) error {
// Marshal to bytes.
buf, err := proto.Marshal(encodeIntegerPoint(p))
if err != nil {
return err
}
// Write the length.
if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
// Write the encoded point.
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}
// IntegerPointDecoder decodes IntegerPoint points from a reader.
type IntegerPointDecoder struct {
r io.Reader
}
// NewIntegerPointDecoder returns a new instance of IntegerPointDecoder that reads from r.
func NewIntegerPointDecoder(r io.Reader) *IntegerPointDecoder {
return &IntegerPointDecoder{r: r}
}
// DecodeIntegerPoint reads from the underlying reader and unmarshals into p.
func (dec *IntegerPointDecoder) DecodeIntegerPoint(p *IntegerPoint) error {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
*p = *decodeIntegerPoint(&pb)
return nil
}
// StringPoint represents a point with a string value.
// DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT.
// See TestPoint_Fields in influxql/point_test.go for more details.
@ -347,6 +478,70 @@ func stringPointsSortBy(points []StringPoint, cmp func(a, b *StringPoint) bool)
}
}
// StringPointEncoder encodes StringPoint points to a writer.
type StringPointEncoder struct {
w io.Writer
}
// NewStringPointEncoder returns a new instance of StringPointEncoder that writes to w.
func NewStringPointEncoder(w io.Writer) *StringPointEncoder {
return &StringPointEncoder{w: w}
}
// EncodeStringPoint marshals and writes p to the underlying writer.
func (enc *StringPointEncoder) EncodeStringPoint(p *StringPoint) error {
// Marshal to bytes.
buf, err := proto.Marshal(encodeStringPoint(p))
if err != nil {
return err
}
// Write the length.
if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
// Write the encoded point.
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}
// StringPointDecoder decodes StringPoint points from a reader.
type StringPointDecoder struct {
r io.Reader
}
// NewStringPointDecoder returns a new instance of StringPointDecoder that reads from r.
func NewStringPointDecoder(r io.Reader) *StringPointDecoder {
return &StringPointDecoder{r: r}
}
// DecodeStringPoint reads from the underlying reader and unmarshals into p.
func (dec *StringPointDecoder) DecodeStringPoint(p *StringPoint) error {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
*p = *decodeStringPoint(&pb)
return nil
}
// BooleanPoint represents a point with a bool value.
// DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT.
// See TestPoint_Fields in influxql/point_test.go for more details.
@ -458,3 +653,67 @@ func booleanPointsSortBy(points []BooleanPoint, cmp func(a, b *BooleanPoint) boo
cmp: cmp,
}
}
// BooleanPointEncoder encodes BooleanPoint points to a writer.
type BooleanPointEncoder struct {
w io.Writer
}
// NewBooleanPointEncoder returns a new instance of BooleanPointEncoder that writes to w.
func NewBooleanPointEncoder(w io.Writer) *BooleanPointEncoder {
return &BooleanPointEncoder{w: w}
}
// EncodeBooleanPoint marshals and writes p to the underlying writer.
func (enc *BooleanPointEncoder) EncodeBooleanPoint(p *BooleanPoint) error {
// Marshal to bytes.
buf, err := proto.Marshal(encodeBooleanPoint(p))
if err != nil {
return err
}
// Write the length.
if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
// Write the encoded point.
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}
// BooleanPointDecoder decodes BooleanPoint points from a reader.
type BooleanPointDecoder struct {
r io.Reader
}
// NewBooleanPointDecoder returns a new instance of BooleanPointDecoder that reads from r.
func NewBooleanPointDecoder(r io.Reader) *BooleanPointDecoder {
return &BooleanPointDecoder{r: r}
}
// DecodeBooleanPoint reads from the underlying reader and unmarshals into p.
func (dec *BooleanPointDecoder) DecodeBooleanPoint(p *BooleanPoint) error {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
*p = *decodeBooleanPoint(&pb)
return nil
}

View File

@ -1,6 +1,9 @@
package influxql
import (
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql/internal"
)
@ -129,4 +132,69 @@ func {{.name}}PointsSortBy(points []{{.Name}}Point, cmp func(a, b *{{.Name}}Poin
}
}
// {{.Name}}PointEncoder encodes {{.Name}}Point points to a writer.
type {{.Name}}PointEncoder struct {
w io.Writer
}
// New{{.Name}}PointEncoder returns a new instance of {{.Name}}PointEncoder that writes to w.
func New{{.Name}}PointEncoder(w io.Writer) *{{.Name}}PointEncoder {
return &{{.Name}}PointEncoder{w: w}
}
// Encode{{.Name}}Point marshals and writes p to the underlying writer.
func (enc *{{.Name}}PointEncoder) Encode{{.Name}}Point(p *{{.Name}}Point) error {
// Marshal to bytes.
buf, err := proto.Marshal(encode{{.Name}}Point(p))
if err != nil {
return err
}
// Write the length.
if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
// Write the encoded point.
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}
// {{.Name}}PointDecoder decodes {{.Name}}Point points from a reader.
type {{.Name}}PointDecoder struct {
r io.Reader
}
// New{{.Name}}PointDecoder returns a new instance of {{.Name}}PointDecoder that reads from r.
func New{{.Name}}PointDecoder(r io.Reader) *{{.Name}}PointDecoder {
return &{{.Name}}PointDecoder{r: r}
}
// Decode{{.Name}}Point reads from the underlying reader and unmarshals into p.
func (dec *{{.Name}}PointDecoder) Decode{{.Name}}Point(p *{{.Name}}Point) error {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
*p = *decode{{.Name}}Point(&pb)
return nil
}
{{end}}
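{{/* Note (not from this commit): this template is expanded once per point
type (Float, Integer, String, Boolean) by the package's code generation step,
yielding the concrete encoders and decoders in the generated file above. */}}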

View File

@ -1,6 +1,9 @@
package influxql
import (
"bytes"
"encoding/binary"
"io"
"sort"
"github.com/gogo/protobuf/proto"
@ -164,7 +167,26 @@ func encodeTags(m map[string]string) []byte {
}
// decodeTags parses an identifier into a map of tags.
func decodeTags(id []byte) map[string]string { panic("FIXME: implement") }
func decodeTags(id []byte) map[string]string {
a := bytes.Split(id, []byte{'\x00'})
// There must be an even number of segments; drop any odd trailing segment.
if len(a) > 0 && len(a)%2 == 1 {
a = a[:len(a)-1]
}
// Return nil if there are no segments.
if len(a) == 0 {
return nil
}
// Decode key/value tags.
m := make(map[string]string)
for i := 0; i < len(a); i += 2 {
m[string(a[i])] = string(a[i+1])
}
return m
}
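// Illustrative sketch (not from this commit) of the identifier layout
// decodeTags consumes: keys and values alternate, separated by NUL bytes,
// assuming encodeTags emits the matching layout:
//
//	m := decodeTags([]byte("host\x00serverA\x00region\x00uswest"))
//	// m == map[string]string{"host": "serverA", "region": "uswest"}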
func encodeAux(aux []interface{}) []*internal.Aux {
pb := make([]*internal.Aux, len(aux))
@ -227,3 +249,46 @@ func decodeAux(pb []*internal.Aux) []interface{} {
}
return aux
}
// PointDecoder decodes generic points from a reader.
type PointDecoder struct {
r io.Reader
}
// NewPointDecoder returns a new instance of PointDecoder that reads from r.
func NewPointDecoder(r io.Reader) *PointDecoder {
return &PointDecoder{r: r}
}
// DecodePoint reads from the underlying reader and unmarshals into p.
func (dec *PointDecoder) DecodePoint(p *Point) error {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
if pb.IntegerValue != nil {
*p = decodeIntegerPoint(&pb)
} else if pb.StringValue != nil {
*p = decodeStringPoint(&pb)
} else if pb.BooleanValue != nil {
*p = decodeBooleanPoint(&pb)
} else {
*p = decodeFloatPoint(&pb)
}
return nil
}
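// Usage sketch (illustrative, not from this commit): decode a mixed stream
// and branch on the concrete type. This assumes Point is the interface
// satisfied by *FloatPoint, *IntegerPoint, *StringPoint, and *BooleanPoint,
// per the decode calls above.
//
//	var p Point
//	if err := NewPointDecoder(r).DecodePoint(&p); err != nil {
//		return err
//	}
//	switch p := p.(type) {
//	case *FloatPoint:
//		// p.Value is a float64
//	case *StringPoint:
//		// p.Value is a string
//	}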

View File

@ -675,9 +675,9 @@ func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.T
return groups, nil
}
// ShardIDsByTimeRange returns a slice of shards that may contain data in the time range.
func (c *Client) ShardIDsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []uint64, err error) {
m := make(map[uint64]struct{})
// ShardsByTimeRange returns a slice of shards that may contain data in the time range.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) {
m := make(map[*ShardInfo]struct{})
for _, src := range sources {
mm, ok := src.(*influxql.Measurement)
if !ok {
@ -689,15 +689,15 @@ func (c *Client) ShardIDsByTimeRange(sources influxql.Sources, tmin, tmax time.T
return nil, err
}
for _, g := range groups {
for _, sh := range g.Shards {
m[sh.ID] = struct{}{}
for i := range g.Shards {
m[&g.Shards[i]] = struct{}{}
}
}
}
a = make([]uint64, 0, len(m))
for k := range m {
a = append(a, k)
a = make([]ShardInfo, 0, len(m))
for sh := range m {
a = append(a, *sh)
}
return a, nil
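// Usage sketch (illustrative, not from this commit): collect the candidate
// shards for a query before fanning out iterator creation; the statement and
// time bounds here are hypothetical.
//
//	shards, err := c.ShardsByTimeRange(stmt.Sources, tmin, tmax)
//	if err != nil {
//		return err
//	}
//	for _, sh := range shards {
//		_ = sh.ID // each ShardInfo value still carries its shard ID
//	}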

View File

@ -384,9 +384,30 @@ func (s *Shard) WriteTo(w io.Writer) (int64, error) {
// CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
if influxql.Sources(opt.Sources).HasSystemSource() {
return s.createSystemIterator(opt)
}
return s.engine.CreateIterator(opt)
}
// createSystemIterator returns an iterator for a system source.
func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Only support a single system source.
if len(opt.Sources) > 1 {
return nil, errors.New("cannot select from multiple system sources")
}
m := opt.Sources[0].(*influxql.Measurement)
switch m.Name {
case "_measurements":
return NewMeasurementIterator(s, opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
default:
return nil, fmt.Errorf("unknown system source: %s", m.Name)
}
}
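// Usage sketch (illustrative, not from this commit): system sources are
// addressed like ordinary measurements, so listing measurement names is an
// iterator over "_measurements":
//
//	itr, err := sh.CreateIterator(influxql.IteratorOptions{
//		Sources: []influxql.Source{&influxql.Measurement{Name: "_measurements"}},
//	})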
// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
fields = make(map[string]struct{})
@ -416,78 +437,6 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions ma
// SeriesKeys returns a list of series in the shard.
func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
return s.engine.SeriesKeys(opt)
}
// Shards represents a sortable list of shards.
type Shards []*Shard
func (a Shards) Len() int { return len(a) }
func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id }
func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// CreateIterator returns a single combined iterator for the shards.
func (a Shards) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
if influxql.Sources(opt.Sources).HasSystemSource() {
return a.createSystemIterator(opt)
}
// Create iterators for each shard.
// Ensure that they are closed if an error occurs.
itrs := make([]influxql.Iterator, 0, len(a))
if err := func() error {
for _, sh := range a {
itr, err := sh.CreateIterator(opt)
if err != nil {
return err
}
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
influxql.Iterators(itrs).Close()
return nil, err
}
// Merge into a single iterator.
if opt.MergeSorted() {
return influxql.NewSortedMergeIterator(itrs, opt), nil
}
itr := influxql.NewMergeIterator(itrs, opt)
if opt.Expr != nil {
if expr, ok := opt.Expr.(*influxql.Call); ok && expr.Name == "count" {
opt.Expr = &influxql.Call{
Name: "sum",
Args: expr.Args,
}
}
}
return influxql.NewCallIterator(itr, opt)
}
// createSystemIterator returns an iterator for a system source.
func (a Shards) createSystemIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Only support a single system source.
if len(opt.Sources) > 1 {
return nil, errors.New("cannot select from multiple system sources")
}
m := opt.Sources[0].(*influxql.Measurement)
switch m.Name {
case "_measurements":
return a.createMeasurementsIterator(opt)
case "_tagKeys":
return a.createTagKeysIterator(opt)
default:
return nil, fmt.Errorf("unknown system source: %s", m.Name)
}
}
// SeriesKeys returns a list of series in all shards in a. If a series
// exists in multiple shards in a, all instances will be combined into a single
// Series by calling Combine on it.
func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
if influxql.Sources(opt.Sources).HasSystemSource() {
// Only support a single system source.
if len(opt.Sources) > 1 {
@ -497,88 +446,15 @@ func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e
return []influxql.Series{{Aux: []influxql.DataType{influxql.String}}}, nil
}
seriesMap := make(map[string]influxql.Series)
for _, sh := range a {
series, err := sh.SeriesKeys(opt)
if err != nil {
return nil, err
}
for _, s := range series {
cur, ok := seriesMap[s.ID()]
if ok {
cur.Combine(&s)
} else {
seriesMap[s.ID()] = s
}
}
}
seriesList := make([]influxql.Series, 0, len(seriesMap))
for _, s := range seriesMap {
seriesList = append(seriesList, s)
}
sort.Sort(influxql.SeriesList(seriesList))
return influxql.SeriesList(seriesList), nil
return s.engine.SeriesKeys(opt)
}
// createMeasurementsIterator returns an iterator for all measurement names.
func (a Shards) createMeasurementsIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
itrs := make([]influxql.Iterator, 0, len(a))
if err := func() error {
for _, sh := range a {
itr, err := NewMeasurementIterator(sh, opt)
if err != nil {
return err
}
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
influxql.Iterators(itrs).Close()
return nil, err
}
return influxql.NewMergeIterator(itrs, opt), nil
}
// Shards represents a sortable list of shards.
type Shards []*Shard
// createTagKeysIterator returns an iterator for all tag keys across measurements.
func (a Shards) createTagKeysIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
itrs := make([]influxql.Iterator, 0, len(a))
if err := func() error {
for _, sh := range a {
itr, err := NewTagKeysIterator(sh, opt)
if err != nil {
return err
}
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
influxql.Iterators(itrs).Close()
return nil, err
}
return influxql.NewMergeIterator(itrs, opt), nil
}
// FieldDimensions returns the unique fields and dimensions across a list of sources.
func (a Shards) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
fields = make(map[string]struct{})
dimensions = make(map[string]struct{})
for _, sh := range a {
f, d, err := sh.FieldDimensions(sources)
if err != nil {
return nil, nil, err
}
for k := range f {
fields[k] = struct{}{}
}
for k := range d {
dimensions[k] = struct{}{}
}
}
return
}
func (a Shards) Len() int { return len(a) }
func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id }
func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// MeasurementFields holds the fields of a measurement and their codec.
type MeasurementFields struct {
@ -909,6 +785,24 @@ func (f *FieldCodec) FieldByName(name string) *Field {
return f.fieldsByName[name]
}
// shardIteratorCreator creates iterators for a local shard.
// This simply wraps the shard so that Close() does not close the underlying shard.
type shardIteratorCreator struct {
sh *Shard
}
func (ic *shardIteratorCreator) Close() error { return nil }
func (ic *shardIteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return ic.sh.CreateIterator(opt)
}
func (ic *shardIteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
return ic.sh.FieldDimensions(sources)
}
func (ic *shardIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
return ic.sh.SeriesKeys(opt)
}
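// Design note (not from this commit): this wrapper lets a *Shard be handed
// out as an influxql.IteratorCreator without transferring ownership; the
// no-op Close means callers may close the creator freely while the store
// retains control of the shard's lifecycle.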
// MeasurementIterator represents a string iterator that emits all measurement names in a shard.
type MeasurementIterator struct {
mms Measurements

View File

@ -299,8 +299,16 @@ func (s *Store) deleteShard(shardID uint64) error {
return nil
}
// DeleteDatabase will close all shards associated with a database and
// remove the directory and files from disk.
// ShardIteratorCreator returns an iterator creator for a shard.
func (s *Store) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
sh := s.Shard(id)
if sh == nil {
return nil
}
return &shardIteratorCreator{sh: sh}
}
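// Usage sketch (illustrative, not from this commit): look up a creator by
// shard ID and guard against shards that are not loaded locally.
//
//	ic := s.ShardIteratorCreator(id)
//	if ic == nil {
//		// shard not present in this store; fall back to a remote creator
//	}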
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string) error {
s.mu.Lock()
defer s.mu.Unlock()

View File

@ -215,8 +215,15 @@ func TestShards_CreateIterator(t *testing.T) {
`cpu,host=serverC value=3 60`,
)
// Retrieve shards and convert to iterator creators.
shards := s.Shards([]uint64{0, 1})
ics := make(influxql.IteratorCreators, len(shards))
for i := range ics {
ics[i] = shards[i]
}
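// (Note, not from this commit: each *tsdb.Shard satisfies
// influxql.IteratorCreator directly, so the conversion above is a plain
// interface assignment.)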
// Create iterator.
itr, err := tsdb.Shards(s.Shards([]uint64{0, 1})).CreateIterator(influxql.IteratorOptions{
itr, err := ics.CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},