diff --git a/cluster/internal/data.pb.go b/cluster/internal/data.pb.go index 316116f4e0..f000aad978 100644 --- a/cluster/internal/data.pb.go +++ b/cluster/internal/data.pb.go @@ -11,25 +11,27 @@ It is generated from these files: It has these top-level messages: WriteShardRequest WriteShardResponse - MapShardRequest - MapShardResponse ExecuteStatementRequest ExecuteStatementResponse + CreateIteratorRequest + CreateIteratorResponse + FieldDimensionsRequest + FieldDimensionsResponse + SeriesKeysRequest + SeriesKeysResponse */ package internal import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = fmt.Errorf var _ = math.Inf type WriteShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` - Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"` + ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"` + Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -52,8 +54,8 @@ func (m *WriteShardRequest) GetPoints() [][]byte { } type WriteShardResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` + Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"` + Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -75,89 +77,9 @@ func (m *WriteShardResponse) GetMessage() string { return "" } -type MapShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` - Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"` - ChunkSize *int32 `protobuf:"varint,3,req,name=ChunkSize" json:"ChunkSize,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *MapShardRequest) Reset() { *m = MapShardRequest{} } -func (m *MapShardRequest) String() string { return proto.CompactTextString(m) } -func (*MapShardRequest) ProtoMessage() {} - -func (m *MapShardRequest) GetShardID() uint64 { - if m != nil && m.ShardID != nil { - return *m.ShardID - } - return 0 -} - -func (m *MapShardRequest) GetQuery() string { - if m != nil && m.Query != nil { - return *m.Query - } - return "" -} - -func (m *MapShardRequest) GetChunkSize() int32 { - if m != nil && m.ChunkSize != nil { - return *m.ChunkSize - } - return 0 -} - -type MapShardResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` - Data []byte `protobuf:"bytes,3,opt,name=Data" json:"Data,omitempty"` - TagSets []string `protobuf:"bytes,4,rep,name=TagSets" json:"TagSets,omitempty"` - Fields []string `protobuf:"bytes,5,rep,name=Fields" json:"Fields,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *MapShardResponse) Reset() { *m = MapShardResponse{} } -func (m *MapShardResponse) String() string { return proto.CompactTextString(m) } -func (*MapShardResponse) ProtoMessage() {} - -func (m *MapShardResponse) GetCode() int32 { - if m != nil && m.Code != nil { - return *m.Code - } - return 0 -} - -func (m *MapShardResponse) GetMessage() string { - if m != nil && m.Message != nil { - return *m.Message - } - return "" -} - -func (m *MapShardResponse) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func (m *MapShardResponse) GetTagSets() []string { - if m != nil { - return m.TagSets - } - return nil -} - -func (m *MapShardResponse) GetFields() []string { - if m != nil { - return m.Fields - } - return nil -} - type ExecuteStatementRequest struct { - Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"` - Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + Statement *string `protobuf:"bytes,1,req" json:"Statement,omitempty"` + Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -180,8 +102,8 @@ func (m *ExecuteStatementRequest) GetDatabase() string { } type ExecuteStatementResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` + Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"` + Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -203,11 +125,149 @@ func (m *ExecuteStatementResponse) GetMessage() string { return "" } -func init() { - proto.RegisterType((*WriteShardRequest)(nil), "internal.WriteShardRequest") - proto.RegisterType((*WriteShardResponse)(nil), "internal.WriteShardResponse") - proto.RegisterType((*MapShardRequest)(nil), "internal.MapShardRequest") - proto.RegisterType((*MapShardResponse)(nil), "internal.MapShardResponse") - proto.RegisterType((*ExecuteStatementRequest)(nil), "internal.ExecuteStatementRequest") - proto.RegisterType((*ExecuteStatementResponse)(nil), "internal.ExecuteStatementResponse") +type CreateIteratorRequest struct { + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} } +func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) } +func (*CreateIteratorRequest) ProtoMessage() {} + +func (m *CreateIteratorRequest) GetShardIDs() []uint64 { + if m != nil { + return m.ShardIDs + } + return nil +} + +func (m *CreateIteratorRequest) GetOpt() []byte { + if m != nil { + return m.Opt + } + return nil +} + +type CreateIteratorResponse struct { + Err *string `protobuf:"bytes,1,opt" json:"Err,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} } +func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) } +func (*CreateIteratorResponse) ProtoMessage() {} + +func (m *CreateIteratorResponse) GetErr() string { + if m != nil && m.Err != nil { + return *m.Err + } + return "" +} + +type FieldDimensionsRequest struct { + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} } +func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) } +func (*FieldDimensionsRequest) ProtoMessage() {} + +func (m *FieldDimensionsRequest) GetShardIDs() []uint64 { + if m != nil { + return m.ShardIDs + } + return nil +} + +func (m *FieldDimensionsRequest) GetSources() []byte { + if m != nil { + return m.Sources + } + return nil +} + +type FieldDimensionsResponse struct { + Fields []string `protobuf:"bytes,1,rep" json:"Fields,omitempty"` + Dimensions []string `protobuf:"bytes,2,rep" json:"Dimensions,omitempty"` + Err *string `protobuf:"bytes,3,opt" json:"Err,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} } +func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) } +func (*FieldDimensionsResponse) ProtoMessage() {} + +func (m *FieldDimensionsResponse) GetFields() []string { + if m != nil { + return m.Fields + } + return nil +} + +func (m *FieldDimensionsResponse) GetDimensions() []string { + if m != nil { + return m.Dimensions + } + return nil +} + +func (m *FieldDimensionsResponse) GetErr() string { + if m != nil && m.Err != nil { + return *m.Err + } + return "" +} + +type SeriesKeysRequest struct { + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} } +func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) } +func (*SeriesKeysRequest) ProtoMessage() {} + +func (m *SeriesKeysRequest) GetShardIDs() []uint64 { + if m != nil { + return m.ShardIDs + } + return nil +} + +func (m *SeriesKeysRequest) GetOpt() []byte { + if m != nil { + return m.Opt + } + return nil +} + +type SeriesKeysResponse struct { + SeriesList []byte `protobuf:"bytes,1,opt" json:"SeriesList,omitempty"` + Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} } +func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) } +func (*SeriesKeysResponse) ProtoMessage() {} + +func (m *SeriesKeysResponse) GetSeriesList() []byte { + if m != nil { + return m.SeriesList + } + return nil +} + +func (m *SeriesKeysResponse) GetErr() string { + if m != nil && m.Err != nil { + return *m.Err + } + return "" +} + +func init() { } diff --git a/cluster/internal/data.proto b/cluster/internal/data.proto index 9b798cbd7a..bae9d4855a 100644 --- a/cluster/internal/data.proto +++ b/cluster/internal/data.proto @@ -2,34 +2,51 @@ package internal; message WriteShardRequest { required uint64 ShardID = 1; - repeated bytes Points = 2; + repeated bytes Points = 2; } message WriteShardResponse { - required int32 Code = 1; + required int32 Code = 1; optional string Message = 2; } -message MapShardRequest { - required uint64 ShardID = 1; - required string Query = 2; - required int32 ChunkSize = 3; -} - -message MapShardResponse { - required int32 Code = 1; - optional string Message = 2; - optional bytes Data = 3; - repeated string TagSets = 4; - repeated string Fields = 5; -} - message ExecuteStatementRequest { required string Statement = 1; - required string Database = 2; + required string Database = 2; } message ExecuteStatementResponse { - required int32 Code = 1; + required int32 Code = 1; optional string Message = 2; } + +message CreateIteratorRequest { + repeated uint64 ShardIDs = 1; + required bytes Opt = 2; +} + +message CreateIteratorResponse { + optional string Err = 1; +} + +message FieldDimensionsRequest { + repeated uint64 ShardIDs = 1; + required bytes Sources = 2; +} + +message FieldDimensionsResponse { + repeated string Fields = 1; + repeated string Dimensions = 2; + optional string Err = 3; +} + +message SeriesKeysRequest { + repeated uint64 ShardIDs = 1; + required bytes Opt = 2; +} + +message SeriesKeysResponse { + optional bytes SeriesList = 1; + optional string Err = 2; +} + diff --git a/cluster/iterator_creator.go b/cluster/iterator_creator.go deleted file mode 100644 index b0cc610588..0000000000 --- a/cluster/iterator_creator.go +++ /dev/null @@ -1,46 +0,0 @@ -package cluster - -import ( - "time" - - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/services/meta" -) - -// IteratorCreator is responsible for creating iterators for queries. -// Iterators can be created for the local node or can be retrieved remotely. -type IteratorCreator struct { - MetaStore interface { - NodeID() uint64 - Node(id uint64) (ni *meta.NodeInfo, err error) - } - - TSDBStore influxql.IteratorCreator - - // Duration before idle remote iterators are disconnected. - Timeout time.Duration - - // Treats all shards as remote. Useful for testing. - ForceRemoteMapping bool - - pool *clientPool -} - -// NewIteratorCreator returns a new instance of IteratorCreator. -func NewIteratorCreator() *IteratorCreator { - return &IteratorCreator{ - pool: newClientPool(), - } -} - -// CreateIterator creates an iterator from local and remote shards. -func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { - // FIXME(benbjohnson): Integrate remote execution. - return ic.TSDBStore.CreateIterator(opt) -} - -// FieldDimensions returns the unique fields and dimensions across a list of sources. -func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { - // FIXME(benbjohnson): Integrate remote execution. - return ic.TSDBStore.FieldDimensions(sources) -} diff --git a/cluster/meta_client.go b/cluster/meta_client.go new file mode 100644 index 0000000000..dfdb7a09d7 --- /dev/null +++ b/cluster/meta_client.go @@ -0,0 +1,40 @@ +package cluster + +import ( + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/services/meta" +) + +// MetaClient is an interface for accessing meta data. +type MetaClient interface { + CreateContinuousQuery(database, name, query string) error + CreateDatabase(name string) (*meta.DatabaseInfo, error) + CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) + CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) + CreateSubscription(database, rp, name, mode string, destinations []string) error + CreateUser(name, password string, admin bool) (*meta.UserInfo, error) + Database(name string) (*meta.DatabaseInfo, error) + Databases() ([]meta.DatabaseInfo, error) + DataNode(id uint64) (*meta.NodeInfo, error) + DataNodes() ([]meta.NodeInfo, error) + DeleteDataNode(id uint64) error + DeleteMetaNode(id uint64) error + DropContinuousQuery(database, name string) error + DropDatabase(name string) error + DropRetentionPolicy(database, name string) error + DropSubscription(database, rp, name string) error + DropUser(name string) error + MetaNodes() ([]meta.NodeInfo, error) + RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) + SetAdminPrivilege(username string, admin bool) error + SetDefaultRetentionPolicy(database, name string) error + SetPrivilege(username, database string, p influxql.Privilege) error + ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) + UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) error + UpdateUser(name, password string) error + UserPrivilege(username, database string) (*influxql.Privilege, error) + UserPrivileges(username string) (map[string]influxql.Privilege, error) + Users() []meta.UserInfo +} diff --git a/cluster/meta_client_test.go b/cluster/meta_client_test.go new file mode 100644 index 0000000000..ebaff4c8db --- /dev/null +++ b/cluster/meta_client_test.go @@ -0,0 +1,160 @@ +package cluster_test + +import ( + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/services/meta" +) + +// MetaClient is a mockable implementation of cluster.MetaClient. +type MetaClient struct { + CreateContinuousQueryFn func(database, name, query string) error + CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) + CreateDatabaseWithRetentionPolicyFn func(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) + CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) + CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error + CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) + DatabaseFn func(name string) (*meta.DatabaseInfo, error) + DatabasesFn func() ([]meta.DatabaseInfo, error) + DataNodeFn func(id uint64) (*meta.NodeInfo, error) + DataNodesFn func() ([]meta.NodeInfo, error) + DeleteDataNodeFn func(id uint64) error + DeleteMetaNodeFn func(id uint64) error + DropContinuousQueryFn func(database, name string) error + DropDatabaseFn func(name string) error + DropRetentionPolicyFn func(database, name string) error + DropSubscriptionFn func(database, rp, name string) error + DropUserFn func(name string) error + MetaNodesFn func() ([]meta.NodeInfo, error) + RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error) + SetAdminPrivilegeFn func(username string, admin bool) error + SetDefaultRetentionPolicyFn func(database, name string) error + SetPrivilegeFn func(username, database string, p influxql.Privilege) error + ShardsByTimeRangeFn func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) + UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error + UpdateUserFn func(name, password string) error + UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) + UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error) + UsersFn func() []meta.UserInfo +} + +func (c *MetaClient) CreateContinuousQuery(database, name, query string) error { + return c.CreateContinuousQueryFn(database, name, query) +} + +func (c *MetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) { + return c.CreateDatabaseFn(name) +} + +func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) { + return c.CreateDatabaseWithRetentionPolicyFn(name, rpi) +} + +func (c *MetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) { + return c.CreateRetentionPolicyFn(database, rpi) +} + +func (c *MetaClient) CreateSubscription(database, rp, name, mode string, destinations []string) error { + return c.CreateSubscriptionFn(database, rp, name, mode, destinations) +} + +func (c *MetaClient) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) { + return c.CreateUserFn(name, password, admin) +} + +func (c *MetaClient) Database(name string) (*meta.DatabaseInfo, error) { + return c.DatabaseFn(name) +} + +func (c *MetaClient) Databases() ([]meta.DatabaseInfo, error) { + return c.DatabasesFn() +} + +func (c *MetaClient) DataNode(id uint64) (*meta.NodeInfo, error) { + return c.DataNodeFn(id) +} + +func (c *MetaClient) DataNodes() ([]meta.NodeInfo, error) { + return c.DataNodesFn() +} + +func (c *MetaClient) DeleteDataNode(id uint64) error { + return c.DeleteDataNodeFn(id) +} + +func (c *MetaClient) DeleteMetaNode(id uint64) error { + return c.DeleteMetaNodeFn(id) +} + +func (c *MetaClient) DropContinuousQuery(database, name string) error { + return c.DropContinuousQueryFn(database, name) +} + +func (c *MetaClient) DropDatabase(name string) error { + return c.DropDatabaseFn(name) +} + +func (c *MetaClient) DropRetentionPolicy(database, name string) error { + return c.DropRetentionPolicyFn(database, name) +} + +func (c *MetaClient) DropSubscription(database, rp, name string) error { + return c.DropSubscriptionFn(database, rp, name) +} + +func (c *MetaClient) DropUser(name string) error { + return c.DropUserFn(name) +} + +func (c *MetaClient) MetaNodes() ([]meta.NodeInfo, error) { + return c.MetaNodesFn() +} + +func (c *MetaClient) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) { + return c.RetentionPolicyFn(database, name) +} + +func (c *MetaClient) SetAdminPrivilege(username string, admin bool) error { + return c.SetAdminPrivilegeFn(username, admin) +} + +func (c *MetaClient) SetDefaultRetentionPolicy(database, name string) error { + return c.SetDefaultRetentionPolicyFn(database, name) +} + +func (c *MetaClient) SetPrivilege(username, database string, p influxql.Privilege) error { + return c.SetPrivilegeFn(username, database, p) +} + +func (c *MetaClient) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) { + return c.ShardsByTimeRangeFn(sources, tmin, tmax) +} + +func (c *MetaClient) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) error { + return c.UpdateRetentionPolicyFn(database, name, rpu) +} + +func (c *MetaClient) UpdateUser(name, password string) error { + return c.UpdateUserFn(name, password) +} + +func (c *MetaClient) UserPrivilege(username, database string) (*influxql.Privilege, error) { + return c.UserPrivilegeFn(username, database) +} + +func (c *MetaClient) UserPrivileges(username string) (map[string]influxql.Privilege, error) { + return c.UserPrivilegesFn(username) +} + +func (c *MetaClient) Users() []meta.UserInfo { + return c.UsersFn() +} + +// DefaultMetaClientDatabaseFn returns a single database (db0) with a retention policy. +func DefaultMetaClientDatabaseFn(name string) (*meta.DatabaseInfo, error) { + return &meta.DatabaseInfo{ + Name: DefaultDatabase, + DefaultRetentionPolicy: DefaultRetentionPolicy, + }, nil +} diff --git a/cluster/points_writer_test.go b/cluster/points_writer_test.go index 482b5ecb9b..c0cc0f1fbf 100644 --- a/cluster/points_writer_test.go +++ b/cluster/points_writer_test.go @@ -13,9 +13,11 @@ import ( "github.com/influxdata/influxdb/services/meta" ) +// TODO(benbjohnson): Rewrite tests to use cluster_test.MetaClient. + // Ensures the points writer maps a single point to a single shard. func TestPointsWriter_MapShards_One(t *testing.T) { - ms := MetaClient{} + ms := PointsWriterMetaClient{} rp := NewRetentionPolicy("myp", time.Hour, 3) ms.NodeIDFn = func() uint64 { return 1 } @@ -50,7 +52,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) { // Ensures the points writer maps a multiple points across shard group boundaries. func TestPointsWriter_MapShards_Multiple(t *testing.T) { - ms := MetaClient{} + ms := PointsWriterMetaClient{} rp := NewRetentionPolicy("myp", time.Hour, 3) AttachShardGroupInfo(rp, []meta.ShardOwner{ {NodeID: 1}, @@ -304,7 +306,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { }, } - ms := NewMetaClient() + ms := NewPointsWriterMetaClient() ms.DatabaseFn = func(database string) (*meta.DatabaseInfo, error) { return nil, nil } @@ -374,8 +376,8 @@ func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64 return f.CreateShardfn(database, retentionPolicy, shardID) } -func NewMetaClient() *MetaClient { - ms := &MetaClient{} +func NewPointsWriterMetaClient() *PointsWriterMetaClient { + ms := &PointsWriterMetaClient{} rp := NewRetentionPolicy("myp", time.Hour, 3) AttachShardGroupInfo(rp, []meta.ShardOwner{ {NodeID: 1}, @@ -403,7 +405,7 @@ func NewMetaClient() *MetaClient { return ms } -type MetaClient struct { +type PointsWriterMetaClient struct { NodeIDFn func() uint64 RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error) CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) @@ -411,21 +413,21 @@ type MetaClient struct { ShardOwnerFn func(shardID uint64) (string, string, *meta.ShardGroupInfo) } -func (m MetaClient) NodeID() uint64 { return m.NodeIDFn() } +func (m PointsWriterMetaClient) NodeID() uint64 { return m.NodeIDFn() } -func (m MetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) { +func (m PointsWriterMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) { return m.RetentionPolicyFn(database, name) } -func (m MetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { +func (m PointsWriterMetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp) } -func (m MetaClient) Database(database string) (*meta.DatabaseInfo, error) { +func (m PointsWriterMetaClient) Database(database string) (*meta.DatabaseInfo, error) { return m.DatabaseFn(database) } -func (m MetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) { +func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) { return m.ShardOwnerFn(shardID) } diff --git a/cluster/query_executor.go b/cluster/query_executor.go index 6172a75658..07d16a4669 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -8,6 +8,8 @@ import ( "io" "io/ioutil" "log" + "math/rand" + "net" "sort" "strconv" "time" @@ -17,16 +19,18 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/services/meta" - "github.com/influxdata/influxdb/tsdb" ) // A QueryExecutor is responsible for processing a influxql.Query and // executing all of the statements within, on nodes in a cluster. type QueryExecutor struct { - MetaClient *meta.Client + // Reference to local node. + Node *influxdb.Node + + MetaClient MetaClient // TSDB storage for local node. - TSDBStore *tsdb.Store + TSDBStore TSDBStore // Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS. Monitor *monitor.Monitor @@ -37,6 +41,9 @@ type QueryExecutor struct { // Used for executing meta statements on all data nodes. MetaExecutor *MetaExecutor + // Remote execution timeout + Timeout time.Duration + // Output of all logging. // Defaults to discarding all log output. LogOutput io.Writer @@ -54,6 +61,7 @@ const ( // NewQueryExecutor returns a new instance of QueryExecutor. func NewQueryExecutor() *QueryExecutor { return &QueryExecutor{ + Timeout: DefaultShardMapperTimeout, LogOutput: ioutil.Discard, statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil), } @@ -424,22 +432,21 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c // Remove "time" from fields list. stmt.RewriteTimeFields() - // Filter only shards that contain date range. - shardIDs, err := e.MetaClient.ShardIDsByTimeRange(stmt.Sources, opt.MinTime, opt.MaxTime) + // Create an iterator creator based on the shards in the cluster. + ic, err := e.iteratorCreator(stmt, &opt) if err != nil { return err } - shards := e.TSDBStore.Shards(shardIDs) // Rewrite wildcards, if any exist. - tmp, err := stmt.RewriteWildcards(tsdb.Shards(shards)) + tmp, err := stmt.RewriteWildcards(ic) if err != nil { return err } stmt = tmp // Create a set of iterators from a selection. - itrs, err := influxql.Select(stmt, tsdb.Shards(shards), &opt) + itrs, err := influxql.Select(stmt, ic, &opt) if err != nil { return err } @@ -507,6 +514,70 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c return nil } +// iteratorCreator returns a new instance of IteratorCreator based on stmt. +func (e *QueryExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) { + // Retrieve a list of shard IDs. + shards, err := e.MetaClient.ShardsByTimeRange(stmt.Sources, opt.MinTime, opt.MaxTime) + if err != nil { + return nil, err + } + + // Map shards to nodes. + shardIDsByNodeID := make(map[uint64][]uint64) + for _, si := range shards { + // Always assign to local node if it has the shard. + // Otherwise randomly select a remote node. + var nodeID uint64 + if si.OwnedBy(e.Node.ID) { + nodeID = e.Node.ID + } else if len(si.Owners) > 0 { + nodeID = si.Owners[rand.Intn(len(si.Owners))].NodeID + } else { + // This should not occur but if the shard has no owners then + // we don't want this to panic by trying to randomly select a node. + continue + } + + // Otherwise assign it to a remote shard randomly. + shardIDsByNodeID[nodeID] = append(shardIDsByNodeID[nodeID], si.ID) + } + + // Generate iterators for each node. + ics := make([]influxql.IteratorCreator, 0) + if err := func() error { + for nodeID, shardIDs := range shardIDsByNodeID { + // Sort shard IDs so we get more predicable execution. + sort.Sort(uint64Slice(shardIDs)) + + // Create iterator creators from TSDB if local. + if nodeID == e.Node.ID { + for _, shardID := range shardIDs { + ic := e.TSDBStore.ShardIteratorCreator(shardID) + if ic == nil { + continue + } + ics = append(ics, ic) + } + continue + } + + // Otherwise create iterator creator remotely. + dialer := &NodeDialer{ + MetaClient: e.MetaClient, + Timeout: e.Timeout, + } + ics = append(ics, newRemoteIteratorCreator(dialer, nodeID, shardIDs)) + } + + return nil + }(); err != nil { + influxql.IteratorCreators(ics).Close() + return nil, err + } + + return influxql.IteratorCreators(ics), nil +} + func (e *QueryExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) { dis, err := e.MetaClient.Databases() if err != nil { @@ -896,6 +967,147 @@ type IntoWriteRequest struct { Points []models.Point } +// remoteIteratorCreator creates iterators for remote shards. +type remoteIteratorCreator struct { + dialer *NodeDialer + nodeID uint64 + shardIDs []uint64 +} + +// newRemoteIteratorCreator returns a new instance of remoteIteratorCreator for a remote shard. +func newRemoteIteratorCreator(dialer *NodeDialer, nodeID uint64, shardIDs []uint64) *remoteIteratorCreator { + return &remoteIteratorCreator{ + dialer: dialer, + nodeID: nodeID, + shardIDs: shardIDs, + } +} + +// CreateIterator creates a remote streaming iterator. +func (ic *remoteIteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + conn, err := ic.dialer.DialNode(ic.nodeID) + if err != nil { + return nil, err + } + + if err := func() error { + // Write request. + if err := EncodeTLV(conn, createIteratorRequestMessage, &CreateIteratorRequest{ + ShardIDs: ic.shardIDs, + Opt: opt, + }); err != nil { + return err + } + + // Read the response. + var resp CreateIteratorResponse + if _, err := DecodeTLV(conn, &resp); err != nil { + return err + } else if resp.Err != nil { + return err + } + + return nil + }(); err != nil { + conn.Close() + return nil, err + } + + return influxql.NewReaderIterator(conn) +} + +// FieldDimensions returns the unique fields and dimensions across a list of sources. +func (ic *remoteIteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { + conn, err := ic.dialer.DialNode(ic.nodeID) + if err != nil { + return nil, nil, err + } + defer conn.Close() + + // Write request. + if err := EncodeTLV(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{ + ShardIDs: ic.shardIDs, + Sources: sources, + }); err != nil { + return nil, nil, err + } + + // Read the response. + var resp FieldDimensionsResponse + if _, err := DecodeTLV(conn, &resp); err != nil { + return nil, nil, err + } + return resp.Fields, resp.Dimensions, resp.Err +} + +// SeriesKeys returns a list of series keys from the underlying shard. +func (ic *remoteIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { + conn, err := ic.dialer.DialNode(ic.nodeID) + if err != nil { + return nil, err + } + defer conn.Close() + + // Write request. + if err := EncodeTLV(conn, seriesKeysRequestMessage, &SeriesKeysRequest{ + ShardIDs: ic.shardIDs, + Opt: opt, + }); err != nil { + return nil, err + } + + // Read the response. + var resp SeriesKeysResponse + if _, err := DecodeTLV(conn, &resp); err != nil { + return nil, err + } + return resp.SeriesList, resp.Err +} + +// NodeDialer dials connections to a given node. +type NodeDialer struct { + MetaClient MetaClient + Timeout time.Duration +} + +// DialNode returns a connection to a node. +func (d *NodeDialer) DialNode(nodeID uint64) (net.Conn, error) { + ni, err := d.MetaClient.DataNode(nodeID) + if err != nil { + return nil, err + } + + conn, err := net.Dial("tcp", ni.TCPHost) + if err != nil { + return nil, err + } + conn.SetDeadline(time.Now().Add(d.Timeout)) + + // Write the cluster multiplexing header byte + if _, err := conn.Write([]byte{MuxHeader}); err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + +// TSDBStore is an interface for accessing the time series data store. +type TSDBStore interface { + CreateShard(database, policy string, shardID uint64) error + WriteToShard(shardID uint64, points []models.Point) error + + DeleteDatabase(name string) error + DeleteMeasurement(database, name string) error + DeleteRetentionPolicy(database, name string) error + DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error + ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) + ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) + ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) + ExpandSources(sources influxql.Sources) (influxql.Sources, error) + ShardIteratorCreator(id uint64) influxql.IteratorCreator +} + // joinUint64 returns a comma-delimited string of uint64 numbers. func joinUint64(a []uint64) string { var buf bytes.Buffer @@ -966,3 +1178,9 @@ func (s stringSet) intersect(o stringSet) stringSet { } return ns } + +type uint64Slice []uint64 + +func (a uint64Slice) Len() int { return len(a) } +func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } diff --git a/cluster/query_executor_test.go b/cluster/query_executor_test.go new file mode 100644 index 0000000000..c7bab87a4b --- /dev/null +++ b/cluster/query_executor_test.go @@ -0,0 +1,311 @@ +package cluster_test + +import ( + "bytes" + "io" + "os" + "reflect" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/cluster" + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/services/meta" +) + +const ( + // DefaultDatabase is the default database name used in tests. + DefaultDatabase = "db0" + + // DefaultRetentionPolicy is the default retention policy name used in tests. + DefaultRetentionPolicy = "rp0" +) + +// Ensure query executor can execute a simple SELECT statement. +func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) { + e := DefaultQueryExecutor() + + // The meta client should return a single shard owned by the local node. + e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) { + return []meta.ShardInfo{{ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}}}, nil + } + + // The TSDB store should return an IteratorCreator for shard. + // This IteratorCreator returns a single iterator with "value" in the aux fields. + e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator { + if id != 100 { + t.Fatalf("unexpected shard id: %d", id) + } + + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}, + {Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}}, + }}, nil + } + ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { + return map[string]struct{}{"value": struct{}{}}, nil, nil + } + ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) { + return influxql.SeriesList{ + {Name: "cpu", Aux: []influxql.DataType{influxql.Float}}, + }, nil + } + return &ic + } + + // Verify all results from the query. + if a := ReadAllResults(e.ExecuteQuery(`SELECT * FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{ + { + StatementID: 0, + Series: []*models.Row{{ + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + {time.Unix(0, 0).UTC(), float64(100)}, + {time.Unix(1, 0).UTC(), float64(200)}, + }, + }}, + }, + }) { + t.Fatalf("unexpected results: %s", spew.Sdump(a)) + } +} + +// Ensure query executor can execute a distributed SELECT statement. +func TestQueryExecutor_ExecuteQuery_SelectStatement_Remote(t *testing.T) { + // Local executor. + e := DefaultQueryExecutor() + + // Start a second service. + s := MustOpenService() + defer s.Close() + + // Mock the remote service to create an iterator. + s.TSDBStore.ShardIteratorCreatorFn = func(shardID uint64) influxql.IteratorCreator { + if shardID != 200 { + t.Fatalf("unexpected remote shard id: %d", shardID) + } + + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: int64(0 * time.Second), Value: 20}, + }}, nil + } + return &ic + } + + // Two shards are returned. One local and one remote. + e.MetaClient.ShardsByTimeRangeFn = func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) { + return []meta.ShardInfo{ + {ID: 100, Owners: []meta.ShardOwner{{NodeID: 0}}}, + {ID: 200, Owners: []meta.ShardOwner{{NodeID: 1}}}, + }, nil + } + + // The meta client should return node data for the remote node. + e.MetaClient.DataNodeFn = func(id uint64) (*meta.NodeInfo, error) { + return &meta.NodeInfo{ID: 1, TCPHost: s.Addr().String()}, nil + } + + // The local node should return a single iterator. + e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator { + if id != 100 { + t.Fatalf("unexpected shard id: %d", id) + } + + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: int64(0 * time.Second), Value: 10}, + }}, nil + } + return &ic + } + + // Verify all results from the query. + if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{ + { + StatementID: 0, + Series: []*models.Row{{ + Name: "cpu", + Columns: []string{"time", "count"}, + Values: [][]interface{}{ + {time.Unix(0, 0).UTC(), float64(30)}, + }, + }}, + }, + }) { + t.Fatalf("unexpected results: %s", spew.Sdump(a)) + } +} + +// QueryExecutor is a test wrapper for cluster.QueryExecutor. +type QueryExecutor struct { + *cluster.QueryExecutor + + MetaClient MetaClient + TSDBStore TSDBStore + LogOutput bytes.Buffer +} + +// NewQueryExecutor returns a new instance of QueryExecutor. +// This query executor always has a node id of 0. +func NewQueryExecutor() *QueryExecutor { + e := &QueryExecutor{ + QueryExecutor: cluster.NewQueryExecutor(), + } + e.Node = &influxdb.Node{ID: 0} + e.QueryExecutor.MetaClient = &e.MetaClient + e.QueryExecutor.TSDBStore = &e.TSDBStore + + e.QueryExecutor.LogOutput = &e.LogOutput + if testing.Verbose() { + e.QueryExecutor.LogOutput = io.MultiWriter(e.QueryExecutor.LogOutput, os.Stderr) + } + + return e +} + +// DefaultQueryExecutor returns a QueryExecutor with a database (db0) and retention policy (rp0). +func DefaultQueryExecutor() *QueryExecutor { + e := NewQueryExecutor() + e.MetaClient.DatabaseFn = DefaultMetaClientDatabaseFn + e.TSDBStore.ExpandSourcesFn = DefaultTSDBStoreExpandSourcesFn + return e +} + +// ExecuteQuery parses query and executes against the database. +func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-chan *influxql.Result { + return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, make(chan struct{})) +} + +// TSDBStore is a mockable implementation of cluster.TSDBStore. +type TSDBStore struct { + CreateShardFn func(database, policy string, shardID uint64) error + WriteToShardFn func(shardID uint64, points []models.Point) error + + DeleteDatabaseFn func(name string) error + DeleteMeasurementFn func(database, name string) error + DeleteRetentionPolicyFn func(database, name string) error + DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error + ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) + ExecuteShowSeriesStatementFn func(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) + ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) + ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) + ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator +} + +func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error { + return s.CreateShardFn(database, policy, shardID) +} + +func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error { + return s.WriteToShardFn(shardID, points) +} + +func (s *TSDBStore) DeleteDatabase(name string) error { + return s.DeleteDatabaseFn(name) +} + +func (s *TSDBStore) DeleteMeasurement(database, name string) error { + return s.DeleteMeasurementFn(database, name) +} + +func (s *TSDBStore) DeleteRetentionPolicy(database, name string) error { + return s.DeleteRetentionPolicyFn(database, name) +} + +func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { + return s.DeleteSeriesFn(database, sources, condition) +} + +func (s *TSDBStore) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) { + return s.ExecuteShowFieldKeysStatementFn(stmt, database) +} + +func (s *TSDBStore) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) { + return s.ExecuteShowSeriesStatementFn(stmt, database) +} + +func (s *TSDBStore) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) { + return s.ExecuteShowTagValuesStatementFn(stmt, database) +} + +func (s *TSDBStore) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { + return s.ExpandSourcesFn(sources) +} + +func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator { + return s.ShardIteratorCreatorFn(id) +} + +// DefaultTSDBStoreExpandSourcesFn expands a single source using the default database & retention policy. +func DefaultTSDBStoreExpandSourcesFn(sources influxql.Sources) (influxql.Sources, error) { + return influxql.Sources{&influxql.Measurement{ + Database: DefaultDatabase, + RetentionPolicy: DefaultRetentionPolicy, + Name: sources[0].(*influxql.Measurement).Name}, + }, nil +} + +// MustParseQuery parses s into a query. Panic on error. +func MustParseQuery(s string) *influxql.Query { + q, err := influxql.ParseQuery(s) + if err != nil { + panic(err) + } + return q +} + +// ReadAllResults reads all results from c and returns as a slice. +func ReadAllResults(c <-chan *influxql.Result) []*influxql.Result { + var a []*influxql.Result + for result := range c { + a = append(a, result) + } + return a +} + +// IteratorCreator is a mockable implementation of IteratorCreator. +type IteratorCreator struct { + CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error) + FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) + SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error) +} + +func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return ic.CreateIteratorFn(opt) +} + +func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { + return ic.FieldDimensionsFn(sources) +} + +func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { + return ic.SeriesKeysFn(opt) +} + +// FloatIterator is a represents an iterator that reads from a slice. +type FloatIterator struct { + Points []influxql.FloatPoint +} + +// Close is a no-op. +func (itr *FloatIterator) Close() error { return nil } + +// Next returns the next value and shifts it off the beginning of the points slice. +func (itr *FloatIterator) Next() *influxql.FloatPoint { + if len(itr.Points) == 0 { + return nil + } + + v := &itr.Points[0] + itr.Points = itr.Points[1:] + return v +} diff --git a/cluster/rpc.go b/cluster/rpc.go index d16598c840..01cbaf2df7 100644 --- a/cluster/rpc.go +++ b/cluster/rpc.go @@ -1,108 +1,18 @@ package cluster import ( + "errors" "fmt" "time" "github.com/gogo/protobuf/proto" "github.com/influxdata/influxdb/cluster/internal" + "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" ) //go:generate protoc --gogo_out=. internal/data.proto -// MapShardRequest represents the request to map a remote shard for a query. -type MapShardRequest struct { - pb internal.MapShardRequest -} - -// ShardID of the map request -func (m *MapShardRequest) ShardID() uint64 { return m.pb.GetShardID() } - -// Query returns the Shard map request's query -func (m *MapShardRequest) Query() string { return m.pb.GetQuery() } - -// ChunkSize returns Shard map request's chunk size -func (m *MapShardRequest) ChunkSize() int32 { return m.pb.GetChunkSize() } - -// SetShardID sets the map request's shard id -func (m *MapShardRequest) SetShardID(id uint64) { m.pb.ShardID = &id } - -// SetQuery sets the Shard map request's Query -func (m *MapShardRequest) SetQuery(query string) { m.pb.Query = &query } - -// SetChunkSize sets the Shard map request's chunk size -func (m *MapShardRequest) SetChunkSize(chunkSize int32) { m.pb.ChunkSize = &chunkSize } - -// MarshalBinary encodes the object to a binary format. -func (m *MapShardRequest) MarshalBinary() ([]byte, error) { - return proto.Marshal(&m.pb) -} - -// UnmarshalBinary populates MapShardRequest from a binary format. -func (m *MapShardRequest) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &m.pb); err != nil { - return err - } - return nil -} - -// MapShardResponse represents the response returned from a remote MapShardRequest call -type MapShardResponse struct { - pb internal.MapShardResponse -} - -// NewMapShardResponse returns the response returned from a remote MapShardRequest call -func NewMapShardResponse(code int, message string) *MapShardResponse { - m := &MapShardResponse{} - m.SetCode(code) - m.SetMessage(message) - return m -} - -// Code returns the Shard map response's code -func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) } - -// Message returns the the Shard map response's Message -func (r *MapShardResponse) Message() string { return r.pb.GetMessage() } - -// TagSets returns Shard map response's tag sets -func (r *MapShardResponse) TagSets() []string { return r.pb.GetTagSets() } - -// Fields returns the Shard map response's Fields -func (r *MapShardResponse) Fields() []string { return r.pb.GetFields() } - -// Data returns the Shard map response's Data -func (r *MapShardResponse) Data() []byte { return r.pb.GetData() } - -// SetCode sets the Shard map response's code -func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) } - -// SetMessage sets Shard map response's message -func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message } - -// SetTagSets sets Shard map response's tagsets -func (r *MapShardResponse) SetTagSets(tagsets []string) { r.pb.TagSets = tagsets } - -// SetFields sets the Shard map response's Fields -func (r *MapShardResponse) SetFields(fields []string) { r.pb.Fields = fields } - -// SetData sets the Shard map response's Data -func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data } - -// MarshalBinary encodes the object to a binary format. -func (r *MapShardResponse) MarshalBinary() ([]byte, error) { - return proto.Marshal(&r.pb) -} - -// UnmarshalBinary populates WritePointRequest from a binary format. -func (r *MapShardResponse) UnmarshalBinary(buf []byte) error { - if err := proto.Unmarshal(buf, &r.pb); err != nil { - return err - } - return nil -} - // WritePointsRequest represents a request to write point data to the cluster type WritePointsRequest struct { Database string @@ -281,3 +191,202 @@ func (w *ExecuteStatementResponse) UnmarshalBinary(buf []byte) error { } return nil } + +// CreateIteratorRequest represents a request to create a remote iterator. +type CreateIteratorRequest struct { + ShardIDs []uint64 + Opt influxql.IteratorOptions +} + +// MarshalBinary encodes r to a binary format. +func (r *CreateIteratorRequest) MarshalBinary() ([]byte, error) { + buf, err := r.Opt.MarshalBinary() + if err != nil { + return nil, err + } + return proto.Marshal(&internal.CreateIteratorRequest{ + ShardIDs: r.ShardIDs, + Opt: buf, + }) +} + +// UnmarshalBinary decodes data into r. +func (r *CreateIteratorRequest) UnmarshalBinary(data []byte) error { + var pb internal.CreateIteratorRequest + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.ShardIDs = pb.GetShardIDs() + if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil { + return err + } + return nil +} + +// CreateIteratorResponse represents a response from remote iterator creation. +type CreateIteratorResponse struct { + Err error +} + +// MarshalBinary encodes r to a binary format. +func (r *CreateIteratorResponse) MarshalBinary() ([]byte, error) { + var pb internal.CreateIteratorResponse + if r.Err != nil { + pb.Err = proto.String(r.Err.Error()) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes data into r. +func (r *CreateIteratorResponse) UnmarshalBinary(data []byte) error { + var pb internal.CreateIteratorResponse + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + if pb.Err != nil { + r.Err = errors.New(pb.GetErr()) + } + return nil +} + +// FieldDimensionsRequest represents a request to retrieve unique fields & dimensions. +type FieldDimensionsRequest struct { + ShardIDs []uint64 + Sources influxql.Sources +} + +// MarshalBinary encodes r to a binary format. +func (r *FieldDimensionsRequest) MarshalBinary() ([]byte, error) { + buf, err := r.Sources.MarshalBinary() + if err != nil { + return nil, err + } + return proto.Marshal(&internal.FieldDimensionsRequest{ + ShardIDs: r.ShardIDs, + Sources: buf, + }) +} + +// UnmarshalBinary decodes data into r. +func (r *FieldDimensionsRequest) UnmarshalBinary(data []byte) error { + var pb internal.FieldDimensionsRequest + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.ShardIDs = pb.GetShardIDs() + if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil { + return err + } + return nil +} + +// FieldDimensionsResponse represents a response from remote iterator creation. +type FieldDimensionsResponse struct { + Fields map[string]struct{} + Dimensions map[string]struct{} + Err error +} + +// MarshalBinary encodes r to a binary format. +func (r *FieldDimensionsResponse) MarshalBinary() ([]byte, error) { + var pb internal.FieldDimensionsResponse + + pb.Fields = make([]string, 0, len(r.Fields)) + for k := range r.Fields { + pb.Fields = append(pb.Fields, k) + } + + pb.Dimensions = make([]string, 0, len(r.Dimensions)) + for k := range r.Dimensions { + pb.Dimensions = append(pb.Dimensions, k) + } + + if r.Err != nil { + pb.Err = proto.String(r.Err.Error()) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes data into r. +func (r *FieldDimensionsResponse) UnmarshalBinary(data []byte) error { + var pb internal.FieldDimensionsResponse + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.Fields = make(map[string]struct{}, len(pb.GetFields())) + for _, s := range pb.GetFields() { + r.Fields[s] = struct{}{} + } + + r.Dimensions = make(map[string]struct{}, len(pb.GetDimensions())) + for _, s := range pb.GetDimensions() { + r.Dimensions[s] = struct{}{} + } + + if pb.Err != nil { + r.Err = errors.New(pb.GetErr()) + } + return nil +} + +// SeriesKeysRequest represents a request to retrieve a list of series keys. +type SeriesKeysRequest struct { + ShardIDs []uint64 + Opt influxql.IteratorOptions +} + +// MarshalBinary encodes r to a binary format. +func (r *SeriesKeysRequest) MarshalBinary() ([]byte, error) { + buf, err := r.Opt.MarshalBinary() + if err != nil { + return nil, err + } + return proto.Marshal(&internal.SeriesKeysRequest{ + ShardIDs: r.ShardIDs, + Opt: buf, + }) +} + +// UnmarshalBinary decodes data into r. +func (r *SeriesKeysRequest) UnmarshalBinary(data []byte) error { + var pb internal.SeriesKeysRequest + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.ShardIDs = pb.GetShardIDs() + if err := r.Opt.UnmarshalBinary(pb.GetOpt()); err != nil { + return err + } + return nil +} + +// SeriesKeysResponse represents a response from retrieving series keys. +type SeriesKeysResponse struct { + SeriesList influxql.SeriesList + Err error +} + +// MarshalBinary encodes r to a binary format. +func (r *SeriesKeysResponse) MarshalBinary() ([]byte, error) { + var pb internal.CreateIteratorResponse + if r.Err != nil { + pb.Err = proto.String(r.Err.Error()) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes data into r. +func (r *SeriesKeysResponse) UnmarshalBinary(data []byte) error { + var pb internal.CreateIteratorResponse + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + if pb.Err != nil { + r.Err = errors.New(pb.GetErr()) + } + return nil +} diff --git a/cluster/service.go b/cluster/service.go index 8747f02f6d..bdd1ce57a6 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -1,6 +1,7 @@ package cluster import ( + "encoding" "encoding/binary" "expvar" "fmt" @@ -13,7 +14,6 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" ) @@ -29,8 +29,15 @@ const ( writeShardReq = "writeShardReq" writeShardPointsReq = "writeShardPointsReq" writeShardFail = "writeShardFail" - mapShardReq = "mapShardReq" - mapShardResp = "mapShardResp" + + createIteratorReq = "createIteratorReq" + createIteratorResp = "createIteratorResp" + + fieldDimensionsReq = "fieldDimensionsReq" + fieldDimensionsResp = "fieldDimensionsResp" + + seriesKeysReq = "seriesKeysReq" + seriesKeysResp = "seriesKeysResp" ) // Service processes data received over raw TCP connections. @@ -46,14 +53,7 @@ type Service struct { ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) } - TSDBStore interface { - CreateShard(database, policy string, shardID uint64) error - WriteToShard(shardID uint64, points []models.Point) error - DeleteDatabase(name string) error - DeleteMeasurement(database, name string) error - DeleteSeries(database string, source []influxql.Source, condition influxql.Expr) error - DeleteRetentionPolicy(database, name string) error - } + TSDBStore TSDBStore Logger *log.Logger statMap *expvar.Map @@ -148,42 +148,51 @@ func (s *Service) handleConn(conn net.Conn) { }() for { // Read type-length-value. - typ, buf, err := ReadTLV(conn) + typ, err := ReadType(conn) if err != nil { if strings.HasSuffix(err.Error(), "EOF") { return } - s.Logger.Printf("unable to read type-length-value %s", err) + s.Logger.Printf("unable to read type: %s", err) return } // Delegate message processing by type. switch typ { case writeShardRequestMessage: + buf, err := ReadLV(conn) + if err != nil { + s.Logger.Printf("unable to read length-value: %s", err) + return + } + s.statMap.Add(writeShardReq, 1) - err := s.processWriteShardRequest(buf) + err = s.processWriteShardRequest(buf) if err != nil { s.Logger.Printf("process write shard error: %s", err) } s.writeShardResponse(conn, err) - case mapShardRequestMessage: - s.statMap.Add(mapShardReq, 1) - panic("FIXME(benbjohnson: integrate remote execution with iterators") - /* - err := s.processMapShardRequest(conn, buf) - if err != nil { - s.Logger.Printf("process map shard error: %s", err) - if err := writeMapShardResponseMessage(conn, NewMapShardResponse(1, err.Error())); err != nil { - s.Logger.Printf("process map shard error writing response: %s", err.Error()) - } - } - */ case executeStatementRequestMessage: - err := s.processExecuteStatementRequest(buf) + buf, err := ReadLV(conn) + if err != nil { + s.Logger.Printf("unable to read length-value: %s", err) + return + } + + err = s.processExecuteStatementRequest(buf) if err != nil { s.Logger.Printf("process execute statement error: %s", err) } s.writeShardResponse(conn, err) + case createIteratorRequestMessage: + s.statMap.Add(createIteratorReq, 1) + s.processCreateIteratorRequest(conn) + case fieldDimensionsRequestMessage: + s.statMap.Add(fieldDimensionsReq, 1) + s.processFieldDimensionsRequest(conn) + case seriesKeysRequestMessage: + s.statMap.Add(seriesKeysReq, 1) + s.processSeriesKeysRequest(conn) default: s.Logger.Printf("cluster service message type not found: %d", typ) } @@ -287,120 +296,210 @@ func (s *Service) writeShardResponse(w io.Writer, e error) { } } -/* -func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error { - // Decode request - var req MapShardRequest - if err := req.UnmarshalBinary(buf); err != nil { - return err - } +func (s *Service) processCreateIteratorRequest(conn net.Conn) { + defer conn.Close() - // Parse the statement. - q, err := influxql.ParseQuery(req.Query()) - if err != nil { - return fmt.Errorf("processing map shard: %s", err) - } else if len(q.Statements) != 1 { - return fmt.Errorf("processing map shard: expected 1 statement but got %d", len(q.Statements)) - } - - m, err := s.TSDBStore.CreateMapper(req.ShardID(), q.Statements[0], int(req.ChunkSize())) - if err != nil { - return fmt.Errorf("create mapper: %s", err) - } - if m == nil { - return writeMapShardResponseMessage(w, NewMapShardResponse(0, "")) - } - - if err := m.Open(); err != nil { - return fmt.Errorf("mapper open: %s", err) - } - defer m.Close() - - var metaSent bool - for { - var resp MapShardResponse - - if !metaSent { - resp.SetTagSets(m.TagSets()) - resp.SetFields(m.Fields()) - metaSent = true - } - - chunk, err := m.NextChunk() - if err != nil { - return fmt.Errorf("next chunk: %s", err) - } - - // NOTE: Even if the chunk is nil, we still need to send one - // empty response to let the other side know we're out of data. - - if chunk != nil { - b, err := json.Marshal(chunk) - if err != nil { - return fmt.Errorf("encoding: %s", err) - } - resp.SetData(b) - } - - // Write to connection. - resp.SetCode(0) - if err := writeMapShardResponseMessage(w, &resp); err != nil { + var itr influxql.Iterator + if err := func() error { + // Parse request. + var req CreateIteratorRequest + if err := DecodeLV(conn, &req); err != nil { return err } - s.statMap.Add(mapShardResp, 1) - if chunk == nil { - // All mapper data sent. - return nil + // Collect iterator creators for each shard. + ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) + for _, shardID := range req.ShardIDs { + ic := s.TSDBStore.ShardIteratorCreator(shardID) + if ic == nil { + return nil + } + ics = append(ics, ic) } + + // Generate a single iterator from all shards. + i, err := influxql.IteratorCreators(ics).CreateIterator(req.Opt) + if err != nil { + return err + } + itr = i + + return nil + }(); err != nil { + itr.Close() + s.Logger.Printf("error reading CreateIterator request: %s", err) + EncodeTLV(conn, createIteratorResponseMessage, &CreateIteratorResponse{Err: err}) + return + } + + // Encode success response. + if err := EncodeTLV(conn, createIteratorResponseMessage, &CreateIteratorResponse{}); err != nil { + s.Logger.Printf("error writing CreateIterator response: %s", err) + return + } + + // Exit if no iterator was produced. + if itr == nil { + return + } + + // Stream iterator to connection. + if err := influxql.NewIteratorEncoder(conn).EncodeIterator(itr); err != nil { + s.Logger.Printf("error encoding CreateIterator iterator: %s", err) + return } } -*/ -func writeMapShardResponseMessage(w io.Writer, msg *MapShardResponse) error { - buf, err := msg.MarshalBinary() - if err != nil { - return err +func (s *Service) processFieldDimensionsRequest(conn net.Conn) { + var fields, dimensions map[string]struct{} + if err := func() error { + // Parse request. + var req FieldDimensionsRequest + if err := DecodeLV(conn, &req); err != nil { + return err + } + + // Collect iterator creators for each shard. + ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) + for _, shardID := range req.ShardIDs { + ic := s.TSDBStore.ShardIteratorCreator(shardID) + if ic == nil { + return nil + } + ics = append(ics, ic) + } + + // Generate a single iterator from all shards. + f, d, err := influxql.IteratorCreators(ics).FieldDimensions(req.Sources) + if err != nil { + return err + } + fields, dimensions = f, d + + return nil + }(); err != nil { + s.Logger.Printf("error reading FieldDimensions request: %s", err) + EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{Err: err}) + return + } + + // Encode success response. + if err := EncodeTLV(conn, fieldDimensionsResponseMessage, &FieldDimensionsResponse{ + Fields: fields, + Dimensions: dimensions, + }); err != nil { + s.Logger.Printf("error writing FieldDimensions response: %s", err) + return + } +} + +func (s *Service) processSeriesKeysRequest(conn net.Conn) { + var seriesList influxql.SeriesList + if err := func() error { + // Parse request. + var req SeriesKeysRequest + if err := DecodeLV(conn, &req); err != nil { + return err + } + + // Collect iterator creators for each shard. + ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) + for _, shardID := range req.ShardIDs { + ic := s.TSDBStore.ShardIteratorCreator(shardID) + if ic == nil { + return nil + } + ics = append(ics, ic) + } + + // Generate a single iterator from all shards. + a, err := influxql.IteratorCreators(ics).SeriesKeys(req.Opt) + if err != nil { + return err + } + seriesList = a + + return nil + }(); err != nil { + s.Logger.Printf("error reading SeriesKeys request: %s", err) + EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{Err: err}) + return + } + + // Encode success response. + if err := EncodeTLV(conn, seriesKeysResponseMessage, &SeriesKeysResponse{ + SeriesList: seriesList, + }); err != nil { + s.Logger.Printf("error writing SeriesKeys response: %s", err) + return } - return WriteTLV(w, mapShardResponseMessage, buf) } // ReadTLV reads a type-length-value record from r. func ReadTLV(r io.Reader) (byte, []byte, error) { - var typ [1]byte - if _, err := io.ReadFull(r, typ[:]); err != nil { - return 0, nil, fmt.Errorf("read message type: %s", err) + typ, err := ReadType(r) + if err != nil { + return 0, nil, err } + buf, err := ReadLV(r) + if err != nil { + return 0, nil, err + } + return typ, buf, err +} + +// ReadType reads the type from a TLV record. +func ReadType(r io.Reader) (byte, error) { + var typ [1]byte + if _, err := io.ReadFull(r, typ[:]); err != nil { + return 0, fmt.Errorf("read message type: %s", err) + } + return typ[0], nil +} + +// ReadLV reads the length-value from a TLV record. +func ReadLV(r io.Reader) ([]byte, error) { // Read the size of the message. var sz int64 if err := binary.Read(r, binary.BigEndian, &sz); err != nil { - return 0, nil, fmt.Errorf("read message size: %s", err) - } - - if sz == 0 { - return 0, nil, fmt.Errorf("invalid message size: %d", sz) + return nil, fmt.Errorf("read message size: %s", err) } if sz >= MaxMessageSize { - return 0, nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz) + return nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz) } // Read the value. buf := make([]byte, sz) if _, err := io.ReadFull(r, buf); err != nil { - return 0, nil, fmt.Errorf("read message value: %s", err) + return nil, fmt.Errorf("read message value: %s", err) } - return typ[0], buf, nil + return buf, nil } // WriteTLV writes a type-length-value record to w. func WriteTLV(w io.Writer, typ byte, buf []byte) error { + if err := WriteType(w, typ); err != nil { + return err + } + if err := WriteLV(w, buf); err != nil { + return err + } + return nil +} + +// WriteType writes the type in a TLV record to w. +func WriteType(w io.Writer, typ byte) error { if _, err := w.Write([]byte{typ}); err != nil { return fmt.Errorf("write message type: %s", err) } + return nil +} +// WriteLV writes the length-value in a TLV record to w. +func WriteLV(w io.Writer, buf []byte) error { // Write the size of the message. if err := binary.Write(w, binary.BigEndian, int64(len(buf))); err != nil { return fmt.Errorf("write message size: %s", err) @@ -410,6 +509,54 @@ func WriteTLV(w io.Writer, typ byte, buf []byte) error { if _, err := w.Write(buf); err != nil { return fmt.Errorf("write message value: %s", err) } - + return nil +} + +// EncodeTLV encodes v to a binary format and writes the record-length-value record to w. +func EncodeTLV(w io.Writer, typ byte, v encoding.BinaryMarshaler) error { + if err := WriteType(w, typ); err != nil { + return err + } + if err := EncodeLV(w, v); err != nil { + return err + } + return nil +} + +// EncodeLV encodes v to a binary format and writes the length-value record to w. +func EncodeLV(w io.Writer, v encoding.BinaryMarshaler) error { + buf, err := v.MarshalBinary() + if err != nil { + return err + } + + if err := WriteLV(w, buf); err != nil { + return err + } + return nil +} + +// DecodeTLV reads the type-length-value record from r and unmarshals it into v. +func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) { + typ, err = ReadType(r) + if err != nil { + return 0, err + } + if err := DecodeLV(r, v); err != nil { + return 0, err + } + return typ, nil +} + +// DecodeLV reads the length-value record from r and unmarshals it into v. +func DecodeLV(r io.Reader, v encoding.BinaryUnmarshaler) error { + buf, err := ReadLV(r) + if err != nil { + return err + } + + if err := v.UnmarshalBinary(buf); err != nil { + return err + } return nil } diff --git a/cluster/service_test.go b/cluster/service_test.go index a31a855469..68ecf43fc9 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -2,11 +2,11 @@ package cluster_test import ( "fmt" + "io" "net" "time" "github.com/influxdata/influxdb/cluster" - "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tcp" @@ -24,15 +24,11 @@ func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) { } type testService struct { - nodeID uint64 - ln net.Listener - muxln net.Listener - writeShardFunc func(shardID uint64, points []models.Point) error - createShardFunc func(database, policy string, shardID uint64) error - deleteDatabaseFunc func(database string) error - deleteMeasurementFunc func(database, name string) error - deleteSeriesFunc func(database string, sources []influxql.Source, condition influxql.Expr) error - deleteRetentionPolicyFunc func(database, name string) error + nodeID uint64 + ln net.Listener + muxln net.Listener + + TSDBStore TSDBStore } func newTestWriteService(f func(shardID uint64, points []models.Point) error) testService { @@ -45,11 +41,12 @@ func newTestWriteService(f func(shardID uint64, points []models.Point) error) te muxln := mux.Listen(cluster.MuxHeader) go mux.Serve(ln) - return testService{ - writeShardFunc: f, - ln: ln, - muxln: muxln, + s := testService{ + ln: ln, + muxln: muxln, } + s.TSDBStore.WriteToShardFn = f + return s } func (ts *testService) Close() { @@ -65,30 +62,6 @@ type serviceResponse struct { points []models.Point } -func (t testService) WriteToShard(shardID uint64, points []models.Point) error { - return t.writeShardFunc(shardID, points) -} - -func (t testService) CreateShard(database, policy string, shardID uint64) error { - return t.createShardFunc(database, policy, shardID) -} - -func (t testService) DeleteDatabase(database string) error { - return t.deleteDatabaseFunc(database) -} - -func (t testService) DeleteMeasurement(database, name string) error { - return t.deleteMeasurementFunc(database, name) -} - -func (t testService) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { - return t.deleteSeriesFunc(database, sources, condition) -} - -func (t testService) DeleteRetentionPolicy(database, name string) error { - return t.deleteRetentionPolicyFunc(database, name) -} - func writeShardSuccess(shardID uint64, points []models.Point) error { responses <- &serviceResponse{ shardID: shardID, @@ -122,3 +95,76 @@ func (testService) ResponseN(n int) ([]*serviceResponse, error) { } } } + +// Service is a test wrapper for cluster.Service. +type Service struct { + *cluster.Service + + ln net.Listener + TSDBStore TSDBStore +} + +// NewService returns a new instance of Service. +func NewService() *Service { + s := &Service{ + Service: cluster.NewService(cluster.Config{}), + } + s.Service.TSDBStore = &s.TSDBStore + return s +} + +// MustOpenService returns a new, open service on a random port. Panic on error. +func MustOpenService() *Service { + s := NewService() + s.ln = MustListen("tcp", "127.0.0.1:0") + s.Listener = &muxListener{s.ln} + if err := s.Open(); err != nil { + panic(err) + } + return s +} + +// Close closes the listener and waits for the service to close. +func (s *Service) Close() error { + if s.ln != nil { + s.ln.Close() + } + return s.Service.Close() +} + +// Addr returns the network address of the service. +func (s *Service) Addr() net.Addr { return s.ln.Addr() } + +// muxListener is a net.Listener implementation that strips off the first byte. +// This is used to simulate the listener from pkg/mux. +type muxListener struct { + net.Listener +} + +// Accept accepts the next connection and removes the first byte. +func (ln *muxListener) Accept() (net.Conn, error) { + conn, err := ln.Listener.Accept() + if err != nil { + return nil, err + } + + var buf [1]byte + if _, err := io.ReadFull(conn, buf[:]); err != nil { + conn.Close() + return nil, err + } else if buf[0] != cluster.MuxHeader { + conn.Close() + panic(fmt.Sprintf("unexpected mux header byte: %d", buf[0])) + } + + return conn, nil +} + +// MustListen opens a listener. Panic on error. +func MustListen(network, laddr string) net.Listener { + ln, err := net.Listen(network, laddr) + if err != nil { + panic(err) + } + return ln +} diff --git a/cluster/shard_mapper.go b/cluster/shard_mapper.go deleted file mode 100644 index 7f757f4230..0000000000 --- a/cluster/shard_mapper.go +++ /dev/null @@ -1,264 +0,0 @@ -package cluster - -/* -import ( - "encoding/json" - "fmt" - "net" - "time" - - "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/services/meta" - "github.com/influxdata/influxdb/tsdb" -) - -// ShardMapper is responsible for providing mappers for requested shards. It is -// responsible for creating those mappers from the local store, or reaching -// out to another node on the cluster. -type ShardMapper struct { - ForceRemoteMapping bool // All shards treated as remote. Useful for testing. - - Node *influxdb.Node - - MetaClient interface { - DataNode(id uint64) (ni *meta.NodeInfo, err error) - } - - TSDBStore interface { - // CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) - } - - timeout time.Duration - pool *clientPool -} - -// NewShardMapper returns a mapper of local and remote shards. -func NewShardMapper(timeout time.Duration) *ShardMapper { - return &ShardMapper{ - pool: newClientPool(), - timeout: timeout, - } -} - -// CreateMapper returns a Mapper for the given shard ID. -func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) { - // Create a remote mapper if the local node doesn't own the shard. - if !sh.OwnedBy(s.Node.ID) || s.ForceRemoteMapping { - // Pick a node in a pseudo-random manner. - conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID) - if err != nil { - return nil, err - } - conn.SetDeadline(time.Now().Add(s.timeout)) - - return NewRemoteMapper(conn, sh.ID, stmt, chunkSize), nil - } - - // If it is local then return the mapper from the store. - m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize) - if err != nil { - return nil, err - } - return m, nil -} - -func (s *ShardMapper) dial(nodeID uint64) (net.Conn, error) { - ni, err := s.MetaClient.DataNode(nodeID) - if err != nil { - return nil, err - } - conn, err := net.Dial("tcp", ni.TCPHost) - if err != nil { - return nil, err - } - - // Write the cluster multiplexing header byte - conn.Write([]byte{MuxHeader}) - - return conn, nil -} - -// RemoteMapper implements the tsdb.Mapper interface. It connects to a remote node, -// sends a query, and interprets the stream of data that comes back. -type RemoteMapper struct { - shardID uint64 - stmt influxql.Statement - chunkSize int - - tagsets []string - fields []string - - conn net.Conn - bufferedResponse *MapShardResponse - - // unmarshallers []tsdb.UnmarshalFunc // Mapping-specific unmarshal functions. -} - -// NewRemoteMapper returns a new remote mapper using the given connection. -func NewRemoteMapper(c net.Conn, shardID uint64, stmt influxql.Statement, chunkSize int) *RemoteMapper { - return &RemoteMapper{ - conn: c, - shardID: shardID, - stmt: stmt, - chunkSize: chunkSize, - } -} - -// Open connects to the remote node and starts receiving data. -func (r *RemoteMapper) Open() (err error) { - defer func() { - if err != nil { - r.conn.Close() - } - }() - - // Build Map request. - var request MapShardRequest - request.SetShardID(r.shardID) - request.SetQuery(r.stmt.String()) - request.SetChunkSize(int32(r.chunkSize)) - - // Marshal into protocol buffers. - buf, err := request.MarshalBinary() - if err != nil { - return err - } - - // Write request. - if err := WriteTLV(r.conn, mapShardRequestMessage, buf); err != nil { - return err - } - - // Read the response. - _, buf, err = ReadTLV(r.conn) - if err != nil { - return err - } - - // Unmarshal response. - r.bufferedResponse = &MapShardResponse{} - if err := r.bufferedResponse.UnmarshalBinary(buf); err != nil { - return err - } - - if r.bufferedResponse.Code() != 0 { - return fmt.Errorf("error code %d: %s", r.bufferedResponse.Code(), r.bufferedResponse.Message()) - } - - // Decode the first response to get the TagSets. - r.tagsets = r.bufferedResponse.TagSets() - r.fields = r.bufferedResponse.Fields() - - // Set up each mapping function for this statement. - if stmt, ok := r.stmt.(*influxql.SelectStatement); ok { - for _, c := range stmt.FunctionCalls() { - fn, err := tsdb.InitializeUnmarshaller(c) - if err != nil { - return err - } - r.unmarshallers = append(r.unmarshallers, fn) - } - } - - return nil -} - -// TagSets returns the TagSets -func (r *RemoteMapper) TagSets() []string { - return r.tagsets -} - -// Fields returns RemoteMapper's Fields -func (r *RemoteMapper) Fields() []string { - return r.fields -} - -// NextChunk returns the next chunk read from the remote node to the client. -func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) { - var response *MapShardResponse - if r.bufferedResponse != nil { - response = r.bufferedResponse - r.bufferedResponse = nil - } else { - response = &MapShardResponse{} - - // Read the response. - _, buf, err := ReadTLV(r.conn) - if err != nil { - return nil, err - } - - // Unmarshal response. - if err := response.UnmarshalBinary(buf); err != nil { - return nil, err - } - - if response.Code() != 0 { - return nil, fmt.Errorf("error code %d: %s", response.Code(), response.Message()) - } - } - - if response.Data() == nil { - return nil, nil - } - - moj := &tsdb.MapperOutputJSON{} - if err := json.Unmarshal(response.Data(), moj); err != nil { - return nil, err - } - mvj := []*tsdb.MapperValueJSON{} - if err := json.Unmarshal(moj.Values, &mvj); err != nil { - return nil, err - } - - // Prep the non-JSON version of Mapper output. - mo := &tsdb.MapperOutput{ - Name: moj.Name, - Tags: moj.Tags, - Fields: moj.Fields, - CursorKey: moj.CursorKey, - } - - if len(mvj) == 1 && len(mvj[0].AggData) > 0 { - // The MapperValue is carrying aggregate data, so run it through the - // custom unmarshallers for the map functions through which the data - // was mapped. - aggValues := []interface{}{} - for i, b := range mvj[0].AggData { - v, err := r.unmarshallers[i](b) - if err != nil { - return nil, err - } - aggValues = append(aggValues, v) - } - mo.Values = []*tsdb.MapperValue{&tsdb.MapperValue{ - Time: mvj[0].Time, - Value: aggValues, - Tags: mvj[0].Tags, - }} - } else { - // Must be raw data instead. - for _, v := range mvj { - var rawValue interface{} - if err := json.Unmarshal(v.RawData, &rawValue); err != nil { - return nil, err - } - - mo.Values = append(mo.Values, &tsdb.MapperValue{ - Time: v.Time, - Value: rawValue, - Tags: v.Tags, - }) - } - } - - return mo, nil -} - -// Close the Mapper -func (r *RemoteMapper) Close() { - r.conn.Close() -} - -*/ diff --git a/cluster/shard_mapper_test.go b/cluster/shard_mapper_test.go deleted file mode 100644 index 94e4c0def9..0000000000 --- a/cluster/shard_mapper_test.go +++ /dev/null @@ -1,112 +0,0 @@ -package cluster - -/* -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net" - "testing" - - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/tsdb" -) - -// remoteShardResponder implements the remoteShardConn interface. -type remoteShardResponder struct { - net.Conn - t *testing.T - rxBytes []byte - - buffer *bytes.Buffer -} - -func newRemoteShardResponder(outputs []*tsdb.MapperOutput, tagsets []string) *remoteShardResponder { - r := &remoteShardResponder{} - a := make([]byte, 0, 1024) - r.buffer = bytes.NewBuffer(a) - - // Pump the outputs in the buffer for later reading. - for _, o := range outputs { - resp := &MapShardResponse{} - resp.SetCode(0) - if o != nil { - d, _ := json.Marshal(o) - resp.SetData(d) - resp.SetTagSets(tagsets) - } - - g, _ := resp.MarshalBinary() - WriteTLV(r.buffer, mapShardResponseMessage, g) - } - - return r -} - -func (r remoteShardResponder) Close() error { return nil } -func (r remoteShardResponder) Read(p []byte) (n int, err error) { - return io.ReadFull(r.buffer, p) -} - -func (r remoteShardResponder) Write(p []byte) (n int, err error) { - if r.rxBytes == nil { - r.rxBytes = make([]byte, 0) - } - r.rxBytes = append(r.rxBytes, p...) - return len(p), nil -} - -// Ensure a RemoteMapper can process valid responses from a remote shard. -func TestShardWriter_RemoteMapper_Success(t *testing.T) { - expTagSets := []string{"tagsetA"} - expOutput := &tsdb.MapperOutput{ - Name: "cpu", - Tags: map[string]string{"host": "serverA"}, - } - - c := newRemoteShardResponder([]*tsdb.MapperOutput{expOutput, nil}, expTagSets) - - r := NewRemoteMapper(c, 1234, mustParseStmt("SELECT * FROM CPU"), 10) - if err := r.Open(); err != nil { - t.Fatalf("failed to open remote mapper: %s", err.Error()) - } - - if r.TagSets()[0] != expTagSets[0] { - t.Fatalf("incorrect tagsets received, exp %v, got %v", expTagSets, r.TagSets()) - } - - // Get first chunk from mapper. - chunk, err := r.NextChunk() - if err != nil { - t.Fatalf("failed to get next chunk from mapper: %s", err.Error()) - } - output, ok := chunk.(*tsdb.MapperOutput) - if !ok { - t.Fatal("chunk is not of expected type") - } - if output.Name != "cpu" { - t.Fatalf("received output incorrect, exp: %v, got %v", expOutput, output) - } - - // Next chunk should be nil, indicating no more data. - chunk, err = r.NextChunk() - if err != nil { - t.Fatalf("failed to get next chunk from mapper: %s", err.Error()) - } - if chunk != nil { - t.Fatal("received more chunks when none expected") - } -} - -// mustParseStmt parses a single statement or panics. -func mustParseStmt(stmt string) influxql.Statement { - q, err := influxql.ParseQuery(stmt) - if err != nil { - panic(err) - } else if len(q.Statements) != 1 { - panic(fmt.Sprintf("expected 1 statement but got %d", len(q.Statements))) - } - return q.Statements[0] -} -*/ diff --git a/cluster/shard_writer.go b/cluster/shard_writer.go index 80a4289585..59e001a48f 100644 --- a/cluster/shard_writer.go +++ b/cluster/shard_writer.go @@ -12,10 +12,18 @@ import ( const ( writeShardRequestMessage byte = iota + 1 writeShardResponseMessage - mapShardRequestMessage - mapShardResponseMessage + executeStatementRequestMessage executeStatementResponseMessage + + createIteratorRequestMessage + createIteratorResponseMessage + + fieldDimensionsRequestMessage + fieldDimensionsResponseMessage + + seriesKeysRequestMessage + seriesKeysResponseMessage ) // ShardWriter writes a set of points to a shard. diff --git a/cluster/shard_writer_test.go b/cluster/shard_writer_test.go index fab8e6bc82..b7fbfa266f 100644 --- a/cluster/shard_writer_test.go +++ b/cluster/shard_writer_test.go @@ -16,7 +16,7 @@ func TestShardWriter_WriteShard_Success(t *testing.T) { ts := newTestWriteService(writeShardSuccess) s := cluster.NewService(cluster.Config{}) s.Listener = ts.muxln - s.TSDBStore = ts + s.TSDBStore = &ts.TSDBStore if err := s.Open(); err != nil { t.Fatal(err) } @@ -63,7 +63,7 @@ func TestShardWriter_WriteShard_Multiple(t *testing.T) { ts := newTestWriteService(writeShardSuccess) s := cluster.NewService(cluster.Config{}) s.Listener = ts.muxln - s.TSDBStore = ts + s.TSDBStore = &ts.TSDBStore if err := s.Open(); err != nil { t.Fatal(err) } @@ -112,7 +112,7 @@ func TestShardWriter_WriteShard_Error(t *testing.T) { ts := newTestWriteService(writeShardFail) s := cluster.NewService(cluster.Config{}) s.Listener = ts.muxln - s.TSDBStore = ts + s.TSDBStore = &ts.TSDBStore if err := s.Open(); err != nil { t.Fatal(err) } @@ -140,7 +140,7 @@ func TestShardWriter_Write_ErrDialTimeout(t *testing.T) { ts := newTestWriteService(writeShardSuccess) s := cluster.NewService(cluster.Config{}) s.Listener = ts.muxln - s.TSDBStore = ts + s.TSDBStore = &ts.TSDBStore if err := s.Open(); err != nil { t.Fatal(err) } @@ -195,7 +195,7 @@ func TestShardWriter_Write_PoolMax(t *testing.T) { ShardWriterTimeout: toml.Duration(100 * time.Millisecond), }) s.Listener = ts.muxln - s.TSDBStore = ts + s.TSDBStore = &ts.TSDBStore if err := s.Open(); err != nil { t.Fatal(err) } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index f8a38c1829..b1d90c2d57 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -62,13 +62,12 @@ type Server struct { MetaClient *meta.Client MetaService *meta.Service - TSDBStore *tsdb.Store - QueryExecutor *cluster.QueryExecutor - PointsWriter *cluster.PointsWriter - ShardWriter *cluster.ShardWriter - IteratorCreator *cluster.IteratorCreator - HintedHandoff *hh.Service - Subscriber *subscriber.Service + TSDBStore *tsdb.Store + QueryExecutor *cluster.QueryExecutor + PointsWriter *cluster.PointsWriter + ShardWriter *cluster.ShardWriter + HintedHandoff *hh.Service + Subscriber *subscriber.Service Services []Service @@ -435,6 +434,8 @@ func (s *Server) Open() error { } } + s.QueryExecutor.Node = s.Node + s.Subscriber.MetaClient = s.MetaClient s.ShardWriter.MetaClient = s.MetaClient s.HintedHandoff.MetaClient = s.MetaClient diff --git a/influxql/ast.go b/influxql/ast.go index 3c221ab93f..08e8b76eff 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -9,6 +9,9 @@ import ( "strconv" "strings" "time" + + "github.com/gogo/protobuf/proto" + "github.com/influxdata/influxdb/influxql/internal" ) // DataType represents the primitive data types available in InfluxQL. @@ -321,6 +324,33 @@ func (a Sources) String() string { return buf.String() } +// MarshalBinary encodes a list of sources to a binary format. +func (a Sources) MarshalBinary() ([]byte, error) { + var pb internal.Measurements + pb.Items = make([]*internal.Measurement, len(a)) + for i, source := range a { + pb.Items[i] = encodeMeasurement(source.(*Measurement)) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes binary data into a list of sources. +func (a *Sources) UnmarshalBinary(buf []byte) error { + var pb internal.Measurements + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + *a = make(Sources, len(pb.GetItems())) + for i := range pb.GetItems() { + mm, err := decodeMeasurement(pb.GetItems()[i]) + if err != nil { + return err + } + (*a)[i] = mm + } + return nil +} + // IsSystemName returns true if name is an internal system name. // System names are prefixed with an underscore. func IsSystemName(name string) bool { return strings.HasPrefix(name, "_") } @@ -2799,6 +2829,38 @@ func (m *Measurement) String() string { return buf.String() } +func encodeMeasurement(mm *Measurement) *internal.Measurement { + pb := &internal.Measurement{ + Database: proto.String(mm.Database), + RetentionPolicy: proto.String(mm.RetentionPolicy), + Name: proto.String(mm.Name), + IsTarget: proto.Bool(mm.IsTarget), + } + if mm.Regex != nil { + pb.Regex = proto.String(mm.Regex.String()) + } + return pb +} + +func decodeMeasurement(pb *internal.Measurement) (*Measurement, error) { + mm := &Measurement{ + Database: pb.GetDatabase(), + RetentionPolicy: pb.GetRetentionPolicy(), + Name: pb.GetName(), + IsTarget: pb.GetIsTarget(), + } + + if pb.Regex != nil { + regex, err := regexp.Compile(pb.GetRegex()) + if err != nil { + return nil, fmt.Errorf("invalid binary measurement regex: value=%q, err=%s", pb.GetRegex(), err) + } + mm.Regex = &RegexLiteral{Val: regex} + } + + return mm, nil +} + // VarRef represents a reference to a variable. type VarRef struct { Val string diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index 0af9e6efb1..70ab1b8305 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -11,29 +11,31 @@ It is generated from these files: It has these top-level messages: Point Aux + IteratorOptions + Measurements + Measurement + Interval */ package internal import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = fmt.Errorf var _ = math.Inf type Point struct { - Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` - Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"` - Time *int64 `protobuf:"varint,3,req,name=Time" json:"Time,omitempty"` - Nil *bool `protobuf:"varint,4,req,name=Nil" json:"Nil,omitempty"` - Aux []*Aux `protobuf:"bytes,5,rep,name=Aux" json:"Aux,omitempty"` - Aggregated *uint32 `protobuf:"varint,6,opt,name=Aggregated" json:"Aggregated,omitempty"` - FloatValue *float64 `protobuf:"fixed64,7,opt,name=FloatValue" json:"FloatValue,omitempty"` - IntegerValue *int64 `protobuf:"varint,8,opt,name=IntegerValue" json:"IntegerValue,omitempty"` - StringValue *string `protobuf:"bytes,9,opt,name=StringValue" json:"StringValue,omitempty"` - BooleanValue *bool `protobuf:"varint,10,opt,name=BooleanValue" json:"BooleanValue,omitempty"` + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Tags *string `protobuf:"bytes,2,req" json:"Tags,omitempty"` + Time *int64 `protobuf:"varint,3,req" json:"Time,omitempty"` + Nil *bool `protobuf:"varint,4,req" json:"Nil,omitempty"` + Aux []*Aux `protobuf:"bytes,5,rep" json:"Aux,omitempty"` + Aggregated *uint32 `protobuf:"varint,6,opt" json:"Aggregated,omitempty"` + FloatValue *float64 `protobuf:"fixed64,7,opt" json:"FloatValue,omitempty"` + IntegerValue *int64 `protobuf:"varint,8,opt" json:"IntegerValue,omitempty"` + StringValue *string `protobuf:"bytes,9,opt" json:"StringValue,omitempty"` + BooleanValue *bool `protobuf:"varint,10,opt" json:"BooleanValue,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -112,11 +114,11 @@ func (m *Point) GetBooleanValue() bool { } type Aux struct { - DataType *int32 `protobuf:"varint,1,req,name=DataType" json:"DataType,omitempty"` - FloatValue *float64 `protobuf:"fixed64,2,opt,name=FloatValue" json:"FloatValue,omitempty"` - IntegerValue *int64 `protobuf:"varint,3,opt,name=IntegerValue" json:"IntegerValue,omitempty"` - StringValue *string `protobuf:"bytes,4,opt,name=StringValue" json:"StringValue,omitempty"` - BooleanValue *bool `protobuf:"varint,5,opt,name=BooleanValue" json:"BooleanValue,omitempty"` + DataType *int32 `protobuf:"varint,1,req" json:"DataType,omitempty"` + FloatValue *float64 `protobuf:"fixed64,2,opt" json:"FloatValue,omitempty"` + IntegerValue *int64 `protobuf:"varint,3,opt" json:"IntegerValue,omitempty"` + StringValue *string `protobuf:"bytes,4,opt" json:"StringValue,omitempty"` + BooleanValue *bool `protobuf:"varint,5,opt" json:"BooleanValue,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -159,7 +161,229 @@ func (m *Aux) GetBooleanValue() bool { return false } -func init() { - proto.RegisterType((*Point)(nil), "internal.Point") - proto.RegisterType((*Aux)(nil), "internal.Aux") +type IteratorOptions struct { + Expr *string `protobuf:"bytes,1,opt" json:"Expr,omitempty"` + Aux []string `protobuf:"bytes,2,rep" json:"Aux,omitempty"` + Sources []*Measurement `protobuf:"bytes,3,rep" json:"Sources,omitempty"` + Interval *Interval `protobuf:"bytes,4,opt" json:"Interval,omitempty"` + Dimensions []string `protobuf:"bytes,5,rep" json:"Dimensions,omitempty"` + Fill *int32 `protobuf:"varint,6,opt" json:"Fill,omitempty"` + FillValue *float64 `protobuf:"fixed64,7,opt" json:"FillValue,omitempty"` + Condition *string `protobuf:"bytes,8,opt" json:"Condition,omitempty"` + StartTime *int64 `protobuf:"varint,9,opt" json:"StartTime,omitempty"` + EndTime *int64 `protobuf:"varint,10,opt" json:"EndTime,omitempty"` + Ascending *bool `protobuf:"varint,11,opt" json:"Ascending,omitempty"` + Limit *int64 `protobuf:"varint,12,opt" json:"Limit,omitempty"` + Offset *int64 `protobuf:"varint,13,opt" json:"Offset,omitempty"` + SLimit *int64 `protobuf:"varint,14,opt" json:"SLimit,omitempty"` + SOffset *int64 `protobuf:"varint,15,opt" json:"SOffset,omitempty"` + Dedupe *bool `protobuf:"varint,16,opt" json:"Dedupe,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *IteratorOptions) Reset() { *m = IteratorOptions{} } +func (m *IteratorOptions) String() string { return proto.CompactTextString(m) } +func (*IteratorOptions) ProtoMessage() {} + +func (m *IteratorOptions) GetExpr() string { + if m != nil && m.Expr != nil { + return *m.Expr + } + return "" +} + +func (m *IteratorOptions) GetAux() []string { + if m != nil { + return m.Aux + } + return nil +} + +func (m *IteratorOptions) GetSources() []*Measurement { + if m != nil { + return m.Sources + } + return nil +} + +func (m *IteratorOptions) GetInterval() *Interval { + if m != nil { + return m.Interval + } + return nil +} + +func (m *IteratorOptions) GetDimensions() []string { + if m != nil { + return m.Dimensions + } + return nil +} + +func (m *IteratorOptions) GetFill() int32 { + if m != nil && m.Fill != nil { + return *m.Fill + } + return 0 +} + +func (m *IteratorOptions) GetFillValue() float64 { + if m != nil && m.FillValue != nil { + return *m.FillValue + } + return 0 +} + +func (m *IteratorOptions) GetCondition() string { + if m != nil && m.Condition != nil { + return *m.Condition + } + return "" +} + +func (m *IteratorOptions) GetStartTime() int64 { + if m != nil && m.StartTime != nil { + return *m.StartTime + } + return 0 +} + +func (m *IteratorOptions) GetEndTime() int64 { + if m != nil && m.EndTime != nil { + return *m.EndTime + } + return 0 +} + +func (m *IteratorOptions) GetAscending() bool { + if m != nil && m.Ascending != nil { + return *m.Ascending + } + return false +} + +func (m *IteratorOptions) GetLimit() int64 { + if m != nil && m.Limit != nil { + return *m.Limit + } + return 0 +} + +func (m *IteratorOptions) GetOffset() int64 { + if m != nil && m.Offset != nil { + return *m.Offset + } + return 0 +} + +func (m *IteratorOptions) GetSLimit() int64 { + if m != nil && m.SLimit != nil { + return *m.SLimit + } + return 0 +} + +func (m *IteratorOptions) GetSOffset() int64 { + if m != nil && m.SOffset != nil { + return *m.SOffset + } + return 0 +} + +func (m *IteratorOptions) GetDedupe() bool { + if m != nil && m.Dedupe != nil { + return *m.Dedupe + } + return false +} + +type Measurements struct { + Items []*Measurement `protobuf:"bytes,1,rep" json:"Items,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Measurements) Reset() { *m = Measurements{} } +func (m *Measurements) String() string { return proto.CompactTextString(m) } +func (*Measurements) ProtoMessage() {} + +func (m *Measurements) GetItems() []*Measurement { + if m != nil { + return m.Items + } + return nil +} + +type Measurement struct { + Database *string `protobuf:"bytes,1,opt" json:"Database,omitempty"` + RetentionPolicy *string `protobuf:"bytes,2,opt" json:"RetentionPolicy,omitempty"` + Name *string `protobuf:"bytes,3,opt" json:"Name,omitempty"` + Regex *string `protobuf:"bytes,4,opt" json:"Regex,omitempty"` + IsTarget *bool `protobuf:"varint,5,opt" json:"IsTarget,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Measurement) Reset() { *m = Measurement{} } +func (m *Measurement) String() string { return proto.CompactTextString(m) } +func (*Measurement) ProtoMessage() {} + +func (m *Measurement) GetDatabase() string { + if m != nil && m.Database != nil { + return *m.Database + } + return "" +} + +func (m *Measurement) GetRetentionPolicy() string { + if m != nil && m.RetentionPolicy != nil { + return *m.RetentionPolicy + } + return "" +} + +func (m *Measurement) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} + +func (m *Measurement) GetRegex() string { + if m != nil && m.Regex != nil { + return *m.Regex + } + return "" +} + +func (m *Measurement) GetIsTarget() bool { + if m != nil && m.IsTarget != nil { + return *m.IsTarget + } + return false +} + +type Interval struct { + Duration *int64 `protobuf:"varint,1,opt" json:"Duration,omitempty"` + Offset *int64 `protobuf:"varint,2,opt" json:"Offset,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Interval) Reset() { *m = Interval{} } +func (m *Interval) String() string { return proto.CompactTextString(m) } +func (*Interval) ProtoMessage() {} + +func (m *Interval) GetDuration() int64 { + if m != nil && m.Duration != nil { + return *m.Duration + } + return 0 +} + +func (m *Interval) GetOffset() int64 { + if m != nil && m.Offset != nil { + return *m.Offset + } + return 0 +} + +func init() { } diff --git a/influxql/internal/internal.proto b/influxql/internal/internal.proto index 1d05ea2112..b03057e719 100644 --- a/influxql/internal/internal.proto +++ b/influxql/internal/internal.proto @@ -21,3 +21,39 @@ message Aux { optional string StringValue = 4; optional bool BooleanValue = 5; } + +message IteratorOptions { + optional string Expr = 1; + repeated string Aux = 2; + repeated Measurement Sources = 3; + optional Interval Interval = 4; + repeated string Dimensions = 5; + optional int32 Fill = 6; + optional double FillValue = 7; + optional string Condition = 8; + optional int64 StartTime = 9; + optional int64 EndTime = 10; + optional bool Ascending = 11; + optional int64 Limit = 12; + optional int64 Offset = 13; + optional int64 SLimit = 14; + optional int64 SOffset = 15; + optional bool Dedupe = 16; +} + +message Measurements { + repeated Measurement Items = 1; +} + +message Measurement { + optional string Database = 1; + optional string RetentionPolicy = 2; + optional string Name = 3; + optional string Regex = 4; + optional bool IsTarget = 5; +} + +message Interval { + optional int64 Duration = 1; + optional int64 Offset = 2; +} diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 20dc122381..9da25a01b6 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -10,6 +10,7 @@ import ( "container/heap" "errors" "fmt" + "io" "log" "sort" "sync" @@ -909,6 +910,52 @@ func (itr *floatDedupeIterator) Next() *FloatPoint { } } +// floatReaderIterator represents an iterator that streams from a reader. +type floatReaderIterator struct { + r io.Reader + dec *FloatPointDecoder + first *FloatPoint +} + +// newFloatReaderIterator returns a new instance of floatReaderIterator. +func newFloatReaderIterator(r io.Reader, first *FloatPoint) *floatReaderIterator { + return &floatReaderIterator{ + r: r, + dec: NewFloatPointDecoder(r), + first: first, + } +} + +// Close closes the underlying reader, if applicable. +func (itr *floatReaderIterator) Close() error { + if r, ok := itr.r.(io.ReadCloser); ok { + return r.Close() + } + return nil +} + +// Next returns the next point from the iterator. +func (itr *floatReaderIterator) Next() *FloatPoint { + // Send first point if it hasn't been sent yet. + if itr.first != nil { + p := itr.first + itr.first = nil + return p + } + + // OPTIMIZE(benbjohnson): Reuse point on iterator. + + // Unmarshal next point. + p := &FloatPoint{} + if err := itr.dec.DecodeFloatPoint(p); err == io.EOF { + return nil + } else if err != nil { + log.Printf("error reading iterator point: %s", err) + return nil + } + return p +} + // IntegerIterator represents a stream of integer points. type IntegerIterator interface { Iterator @@ -1798,6 +1845,52 @@ func (itr *integerDedupeIterator) Next() *IntegerPoint { } } +// integerReaderIterator represents an iterator that streams from a reader. +type integerReaderIterator struct { + r io.Reader + dec *IntegerPointDecoder + first *IntegerPoint +} + +// newIntegerReaderIterator returns a new instance of integerReaderIterator. +func newIntegerReaderIterator(r io.Reader, first *IntegerPoint) *integerReaderIterator { + return &integerReaderIterator{ + r: r, + dec: NewIntegerPointDecoder(r), + first: first, + } +} + +// Close closes the underlying reader, if applicable. +func (itr *integerReaderIterator) Close() error { + if r, ok := itr.r.(io.ReadCloser); ok { + return r.Close() + } + return nil +} + +// Next returns the next point from the iterator. +func (itr *integerReaderIterator) Next() *IntegerPoint { + // Send first point if it hasn't been sent yet. + if itr.first != nil { + p := itr.first + itr.first = nil + return p + } + + // OPTIMIZE(benbjohnson): Reuse point on iterator. + + // Unmarshal next point. + p := &IntegerPoint{} + if err := itr.dec.DecodeIntegerPoint(p); err == io.EOF { + return nil + } else if err != nil { + log.Printf("error reading iterator point: %s", err) + return nil + } + return p +} + // StringIterator represents a stream of string points. type StringIterator interface { Iterator @@ -2687,6 +2780,52 @@ func (itr *stringDedupeIterator) Next() *StringPoint { } } +// stringReaderIterator represents an iterator that streams from a reader. +type stringReaderIterator struct { + r io.Reader + dec *StringPointDecoder + first *StringPoint +} + +// newStringReaderIterator returns a new instance of stringReaderIterator. +func newStringReaderIterator(r io.Reader, first *StringPoint) *stringReaderIterator { + return &stringReaderIterator{ + r: r, + dec: NewStringPointDecoder(r), + first: first, + } +} + +// Close closes the underlying reader, if applicable. +func (itr *stringReaderIterator) Close() error { + if r, ok := itr.r.(io.ReadCloser); ok { + return r.Close() + } + return nil +} + +// Next returns the next point from the iterator. +func (itr *stringReaderIterator) Next() *StringPoint { + // Send first point if it hasn't been sent yet. + if itr.first != nil { + p := itr.first + itr.first = nil + return p + } + + // OPTIMIZE(benbjohnson): Reuse point on iterator. + + // Unmarshal next point. + p := &StringPoint{} + if err := itr.dec.DecodeStringPoint(p); err == io.EOF { + return nil + } else if err != nil { + log.Printf("error reading iterator point: %s", err) + return nil + } + return p +} + // BooleanIterator represents a stream of boolean points. type BooleanIterator interface { Iterator @@ -3575,3 +3714,143 @@ func (itr *booleanDedupeIterator) Next() *BooleanPoint { return p } } + +// booleanReaderIterator represents an iterator that streams from a reader. +type booleanReaderIterator struct { + r io.Reader + dec *BooleanPointDecoder + first *BooleanPoint +} + +// newBooleanReaderIterator returns a new instance of booleanReaderIterator. +func newBooleanReaderIterator(r io.Reader, first *BooleanPoint) *booleanReaderIterator { + return &booleanReaderIterator{ + r: r, + dec: NewBooleanPointDecoder(r), + first: first, + } +} + +// Close closes the underlying reader, if applicable. +func (itr *booleanReaderIterator) Close() error { + if r, ok := itr.r.(io.ReadCloser); ok { + return r.Close() + } + return nil +} + +// Next returns the next point from the iterator. +func (itr *booleanReaderIterator) Next() *BooleanPoint { + // Send first point if it hasn't been sent yet. + if itr.first != nil { + p := itr.first + itr.first = nil + return p + } + + // OPTIMIZE(benbjohnson): Reuse point on iterator. + + // Unmarshal next point. + p := &BooleanPoint{} + if err := itr.dec.DecodeBooleanPoint(p); err == io.EOF { + return nil + } else if err != nil { + log.Printf("error reading iterator point: %s", err) + return nil + } + return p +} + +// IteratorEncoder is an encoder for encoding an iterator's points to w. +type IteratorEncoder struct { + w io.Writer +} + +// NewIteratorEncoder encodes an iterator's points to w. +func NewIteratorEncoder(w io.Writer) *IteratorEncoder { + return &IteratorEncoder{w: w} +} + +// Encode encodes and writes all of itr's points to the underlying writer. +func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error { + switch itr := itr.(type) { + case FloatIterator: + return enc.encodeFloatIterator(itr) + case IntegerIterator: + return enc.encodeIntegerIterator(itr) + case StringIterator: + return enc.encodeStringIterator(itr) + case BooleanIterator: + return enc.encodeBooleanIterator(itr) + default: + panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr)) + } +} + +// encodeFloatIterator encodes all points from itr to the underlying writer. +func (enc *IteratorEncoder) encodeFloatIterator(itr FloatIterator) error { + penc := NewFloatPointEncoder(enc.w) + for { + // Retrieve the next point from the iterator. + p := itr.Next() + if p == nil { + return nil + } + + // Write the point to the point encoder. + if err := penc.EncodeFloatPoint(p); err != nil { + return err + } + } +} + +// encodeIntegerIterator encodes all points from itr to the underlying writer. +func (enc *IteratorEncoder) encodeIntegerIterator(itr IntegerIterator) error { + penc := NewIntegerPointEncoder(enc.w) + for { + // Retrieve the next point from the iterator. + p := itr.Next() + if p == nil { + return nil + } + + // Write the point to the point encoder. + if err := penc.EncodeIntegerPoint(p); err != nil { + return err + } + } +} + +// encodeStringIterator encodes all points from itr to the underlying writer. +func (enc *IteratorEncoder) encodeStringIterator(itr StringIterator) error { + penc := NewStringPointEncoder(enc.w) + for { + // Retrieve the next point from the iterator. + p := itr.Next() + if p == nil { + return nil + } + + // Write the point to the point encoder. + if err := penc.EncodeStringPoint(p); err != nil { + return err + } + } +} + +// encodeBooleanIterator encodes all points from itr to the underlying writer. +func (enc *IteratorEncoder) encodeBooleanIterator(itr BooleanIterator) error { + penc := NewBooleanPointEncoder(enc.w) + for { + // Retrieve the next point from the iterator. + p := itr.Next() + if p == nil { + return nil + } + + // Write the point to the point encoder. + if err := penc.EncodeBooleanPoint(p); err != nil { + return err + } + } +} diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 391e2cf993..5dbe07a286 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -4,6 +4,7 @@ import ( "container/heap" "errors" "fmt" + "io" "sort" "sync" "log" @@ -908,4 +909,97 @@ func (itr *{{.name}}DedupeIterator) Next() *{{.Name}}Point { } } +// {{.name}}ReaderIterator represents an iterator that streams from a reader. +type {{.name}}ReaderIterator struct { + r io.Reader + dec *{{.Name}}PointDecoder + first *{{.Name}}Point +} + +// new{{.Name}}ReaderIterator returns a new instance of {{.name}}ReaderIterator. +func new{{.Name}}ReaderIterator(r io.Reader, first *{{.Name}}Point) *{{.name}}ReaderIterator { + return &{{.name}}ReaderIterator{ + r: r, + dec: New{{.Name}}PointDecoder(r), + first: first, + } +} + +// Close closes the underlying reader, if applicable. +func (itr *{{.name}}ReaderIterator) Close() error { + if r, ok := itr.r.(io.ReadCloser); ok { + return r.Close() + } + return nil +} + +// Next returns the next point from the iterator. +func (itr *{{.name}}ReaderIterator) Next() *{{.Name}}Point { + // Send first point if it hasn't been sent yet. + if itr.first != nil { + p := itr.first + itr.first = nil + return p + } + + // OPTIMIZE(benbjohnson): Reuse point on iterator. + + // Unmarshal next point. + p := &{{.Name}}Point{} + if err := itr.dec.Decode{{.Name}}Point(p); err == io.EOF { + return nil + } else if err != nil { + log.Printf("error reading iterator point: %s", err) + return nil + } + return p +} + +{{end}} + + +// IteratorEncoder is an encoder for encoding an iterator's points to w. +type IteratorEncoder struct { + w io.Writer +} + +// NewIteratorEncoder encodes an iterator's points to w. +func NewIteratorEncoder(w io.Writer) *IteratorEncoder { + return &IteratorEncoder{w: w} +} + +// Encode encodes and writes all of itr's points to the underlying writer. +func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error { + switch itr := itr.(type) { + case FloatIterator: + return enc.encodeFloatIterator(itr) + case IntegerIterator: + return enc.encodeIntegerIterator(itr) + case StringIterator: + return enc.encodeStringIterator(itr) + case BooleanIterator: + return enc.encodeBooleanIterator(itr) + default: + panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr)) + } +} + +{{range .}} +// encode{{.Name}}Iterator encodes all points from itr to the underlying writer. +func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error { + penc := New{{.Name}}PointEncoder(enc.w) + for { + // Retrieve the next point from the iterator. + p := itr.Next() + if p == nil { + return nil + } + + // Write the point to the point encoder. + if err := penc.Encode{{.Name}}Point(p); err != nil { + return err + } + } +} + {{end}} diff --git a/influxql/iterator.go b/influxql/iterator.go index 756858a136..c00bdf37f8 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -3,8 +3,13 @@ package influxql import ( "errors" "fmt" + "io" + "sort" "sync" "time" + + "github.com/gogo/protobuf/proto" + "github.com/influxdata/influxdb/influxql/internal" ) // ErrUnknownCall is returned when operating on an unknown function call. @@ -390,6 +395,27 @@ func drainIterator(itr Iterator) { } } +// NewReaderIterator returns an iterator that streams from a reader. +func NewReaderIterator(r io.Reader) (Iterator, error) { + var p Point + if err := NewPointDecoder(r).DecodePoint(&p); err != nil { + return nil, err + } + + switch p := p.(type) { + case *FloatPoint: + return newFloatReaderIterator(r, p), nil + case *IntegerPoint: + return newIntegerReaderIterator(r, p), nil + case *StringPoint: + return newStringReaderIterator(r, p), nil + case *BooleanPoint: + return newBooleanReaderIterator(r, p), nil + default: + panic(fmt.Sprintf("unsupported point for reader iterator: %T", p)) + } +} + // IteratorCreator represents an interface for objects that can create Iterators. type IteratorCreator interface { // Creates a simple iterator for use in an InfluxQL query. @@ -402,6 +428,104 @@ type IteratorCreator interface { SeriesKeys(opt IteratorOptions) (SeriesList, error) } +// IteratorCreators represents a list of iterator creators. +type IteratorCreators []IteratorCreator + +// Close closes all iterator creators that implement io.Closer. +func (a IteratorCreators) Close() error { + for _, ic := range a { + if ic, ok := ic.(io.Closer); ok { + ic.Close() + } + } + return nil +} + +// CreateIterator returns a single combined iterator from multiple iterator creators. +func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error) { + // Create iterators for each shard. + // Ensure that they are closed if an error occurs. + itrs := make([]Iterator, 0, len(a)) + if err := func() error { + for _, ic := range a { + itr, err := ic.CreateIterator(opt) + if err != nil { + return err + } + itrs = append(itrs, itr) + } + return nil + }(); err != nil { + Iterators(itrs).Close() + return nil, err + } + + // Merge into a single iterator. + if opt.MergeSorted() { + return NewSortedMergeIterator(itrs, opt), nil + } + + itr := NewMergeIterator(itrs, opt) + if opt.Expr != nil { + if expr, ok := opt.Expr.(*Call); ok && expr.Name == "count" { + opt.Expr = &Call{ + Name: "sum", + Args: expr.Args, + } + } + } + return NewCallIterator(itr, opt) +} + +// FieldDimensions returns unique fields and dimensions from multiple iterator creators. +func (a IteratorCreators) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) { + fields = make(map[string]struct{}) + dimensions = make(map[string]struct{}) + + for _, ic := range a { + f, d, err := ic.FieldDimensions(sources) + if err != nil { + return nil, nil, err + } + for k := range f { + fields[k] = struct{}{} + } + for k := range d { + dimensions[k] = struct{}{} + } + } + return +} + +// SeriesKeys returns a list of series in all iterator creators in a. +// If a series exists in multiple creators in a, all instances will be combined +// into a single Series by calling Combine on it. +func (a IteratorCreators) SeriesKeys(opt IteratorOptions) (SeriesList, error) { + seriesMap := make(map[string]Series) + for _, sh := range a { + series, err := sh.SeriesKeys(opt) + if err != nil { + return nil, err + } + + for _, s := range series { + cur, ok := seriesMap[s.ID()] + if ok { + cur.Combine(&s) + } else { + seriesMap[s.ID()] = s + } + } + } + + seriesList := make([]Series, 0, len(seriesMap)) + for _, s := range seriesMap { + seriesList = append(seriesList, s) + } + sort.Sort(SeriesList(seriesList)) + return SeriesList(seriesList), nil +} + // IteratorOptions is an object passed to CreateIterator to specify creation options. type IteratorOptions struct { // Expression to iterate for. @@ -547,6 +671,118 @@ func (opt IteratorOptions) DerivativeInterval() Interval { return Interval{Duration: time.Second} } +// MarshalBinary encodes opt into a binary format. +func (opt *IteratorOptions) MarshalBinary() ([]byte, error) { + return proto.Marshal(encodeIteratorOptions(opt)) +} + +// UnmarshalBinary decodes from a binary format in to opt. +func (opt *IteratorOptions) UnmarshalBinary(buf []byte) error { + var pb internal.IteratorOptions + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + + other, err := decodeIteratorOptions(&pb) + if err != nil { + return err + } + *opt = *other + + return nil +} + +func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions { + pb := &internal.IteratorOptions{ + Aux: opt.Aux, + Interval: encodeInterval(opt.Interval), + Dimensions: opt.Dimensions, + Fill: proto.Int32(int32(opt.Fill)), + StartTime: proto.Int64(opt.StartTime), + EndTime: proto.Int64(opt.EndTime), + Ascending: proto.Bool(opt.Ascending), + Limit: proto.Int64(int64(opt.Limit)), + Offset: proto.Int64(int64(opt.Offset)), + SLimit: proto.Int64(int64(opt.SLimit)), + SOffset: proto.Int64(int64(opt.SOffset)), + Dedupe: proto.Bool(opt.Dedupe), + } + + // Set expression, if set. + if opt.Expr != nil { + pb.Expr = proto.String(opt.Expr.String()) + } + + // Convert and encode sources to measurements. + sources := make([]*internal.Measurement, len(opt.Sources)) + for i, source := range opt.Sources { + mm := source.(*Measurement) + sources[i] = encodeMeasurement(mm) + } + pb.Sources = sources + + // Fill value can only be a number. Set it if available. + if v, ok := opt.FillValue.(float64); ok { + pb.FillValue = proto.Float64(v) + } + + // Set condition, if set. + if opt.Condition != nil { + pb.Condition = proto.String(opt.Condition.String()) + } + + return pb +} + +func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, error) { + opt := &IteratorOptions{ + Aux: pb.GetAux(), + Interval: decodeInterval(pb.GetInterval()), + Dimensions: pb.GetDimensions(), + Fill: FillOption(pb.GetFill()), + FillValue: pb.GetFillValue(), + StartTime: pb.GetStartTime(), + EndTime: pb.GetEndTime(), + Ascending: pb.GetAscending(), + Limit: int(pb.GetLimit()), + Offset: int(pb.GetOffset()), + SLimit: int(pb.GetSLimit()), + SOffset: int(pb.GetSOffset()), + Dedupe: pb.GetDedupe(), + } + + // Set expression, if set. + if pb.Expr != nil { + expr, err := ParseExpr(pb.GetExpr()) + if err != nil { + return nil, err + } + opt.Expr = expr + } + + // Convert and encode sources to measurements. + sources := make([]Source, len(pb.GetSources())) + for i, source := range pb.GetSources() { + mm, err := decodeMeasurement(source) + if err != nil { + return nil, err + } + sources[i] = mm + } + opt.Sources = sources + + // Set condition, if set. + if pb.Condition != nil { + expr, err := ParseExpr(pb.GetCondition()) + if err != nil { + return nil, err + } + opt.Condition = expr + } + + return opt, nil +} + // selectInfo represents an object that stores info about select fields. type selectInfo struct { calls map[*Call]struct{} @@ -623,6 +859,20 @@ type Interval struct { // IsZero returns true if the interval has no duration. func (i Interval) IsZero() bool { return i.Duration == 0 } +func encodeInterval(i Interval) *internal.Interval { + return &internal.Interval{ + Duration: proto.Int64(i.Duration.Nanoseconds()), + Offset: proto.Int64(i.Offset.Nanoseconds()), + } +} + +func decodeInterval(pb *internal.Interval) Interval { + return Interval{ + Duration: time.Duration(pb.GetDuration()), + Offset: time.Duration(pb.GetOffset()), + } +} + // reduceOptions represents options for performing reductions on windows of points. type reduceOptions struct { startTime int64 diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 0c38d22765..3849781440 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -4,6 +4,8 @@ import ( "fmt" "math" "math/rand" + "reflect" + "regexp" "testing" "time" @@ -860,6 +862,70 @@ func TestIteratorOptions_DerivativeInterval_Call(t *testing.T) { } } +// Ensure iterator options can be marshaled to and from a binary format. +func TestIteratorOptions_MarshalBinary(t *testing.T) { + opt := &influxql.IteratorOptions{ + Expr: MustParseExpr("count(value)"), + Aux: []string{"a", "b", "c"}, + Sources: []influxql.Source{ + &influxql.Measurement{Database: "db0", RetentionPolicy: "rp0", Name: "mm0"}, + }, + Interval: influxql.Interval{ + Duration: 1 * time.Hour, + Offset: 20 * time.Minute, + }, + Dimensions: []string{"region", "host"}, + Fill: influxql.NumberFill, + FillValue: float64(100), + Condition: MustParseExpr(`foo = 'bar'`), + StartTime: 1000, + EndTime: 2000, + Ascending: true, + Limit: 100, + Offset: 200, + SLimit: 300, + SOffset: 400, + Dedupe: true, + } + + // Marshal to binary. + buf, err := opt.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + // Unmarshal back to an object. + var other influxql.IteratorOptions + if err := other.UnmarshalBinary(buf); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(&other, opt) { + t.Fatalf("unexpected options: %s", spew.Sdump(other)) + } +} + +// Ensure iterator options with a regex measurement can be marshaled. +func TestIteratorOptions_MarshalBinary_Measurement_Regex(t *testing.T) { + opt := &influxql.IteratorOptions{ + Sources: []influxql.Source{ + &influxql.Measurement{Database: "db1", RetentionPolicy: "rp2", Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`series.+`)}}, + }, + } + + // Marshal to binary. + buf, err := opt.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + // Unmarshal back to an object. + var other influxql.IteratorOptions + if err := other.UnmarshalBinary(buf); err != nil { + t.Fatal(err) + } else if v := other.Sources[0].(*influxql.Measurement).Regex.Val.String(); v != `/series.+/` { + t.Fatalf("unexpected measurement regex: %s", v) + } +} + // IteratorCreator is a mockable implementation of SelectStatementExecutor.IteratorCreator. type IteratorCreator struct { CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error) diff --git a/influxql/point.gen.go b/influxql/point.gen.go index 74edb74ab9..517bd3e13e 100644 --- a/influxql/point.gen.go +++ b/influxql/point.gen.go @@ -7,6 +7,9 @@ package influxql import ( + "encoding/binary" + "io" + "github.com/gogo/protobuf/proto" "github.com/influxdata/influxdb/influxql/internal" ) @@ -123,6 +126,70 @@ func floatPointsSortBy(points []FloatPoint, cmp func(a, b *FloatPoint) bool) *fl } } +// NewFloatPointEncoder encodes FloatPoint points to a writer. +type FloatPointEncoder struct { + w io.Writer +} + +// NewFloatPointEncoder returns a new instance of FloatPointEncoder that writes to w. +func NewFloatPointEncoder(w io.Writer) *FloatPointEncoder { + return &FloatPointEncoder{w: w} +} + +// EncodeFloatPoint marshals and writes p to the underlying writer. +func (enc *FloatPointEncoder) EncodeFloatPoint(p *FloatPoint) error { + // Marshal to bytes. + buf, err := proto.Marshal(encodeFloatPoint(p)) + if err != nil { + return err + } + + // Write the length. + if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil { + return err + } + + // Write the encoded point. + if _, err := enc.w.Write(buf); err != nil { + return err + } + return nil +} + +// NewFloatPointDecoder decodes FloatPoint points from a reader. +type FloatPointDecoder struct { + r io.Reader +} + +// NewFloatPointDecoder returns a new instance of FloatPointDecoder that reads from r. +func NewFloatPointDecoder(r io.Reader) *FloatPointDecoder { + return &FloatPointDecoder{r: r} +} + +// DecodeFloatPoint reads from the underlying reader and unmarshals into p. +func (dec *FloatPointDecoder) DecodeFloatPoint(p *FloatPoint) error { + // Read length. + var sz uint32 + if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { + return err + } + + // Read point data. + buf := make([]byte, sz) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return err + } + + // Unmarshal into point. + var pb internal.Point + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + *p = *decodeFloatPoint(&pb) + + return nil +} + // IntegerPoint represents a point with a int64 value. // DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. // See TestPoint_Fields in influxql/point_test.go for more details. @@ -235,6 +302,70 @@ func integerPointsSortBy(points []IntegerPoint, cmp func(a, b *IntegerPoint) boo } } +// NewIntegerPointEncoder encodes IntegerPoint points to a writer. +type IntegerPointEncoder struct { + w io.Writer +} + +// NewIntegerPointEncoder returns a new instance of IntegerPointEncoder that writes to w. +func NewIntegerPointEncoder(w io.Writer) *IntegerPointEncoder { + return &IntegerPointEncoder{w: w} +} + +// EncodeIntegerPoint marshals and writes p to the underlying writer. +func (enc *IntegerPointEncoder) EncodeIntegerPoint(p *IntegerPoint) error { + // Marshal to bytes. + buf, err := proto.Marshal(encodeIntegerPoint(p)) + if err != nil { + return err + } + + // Write the length. + if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil { + return err + } + + // Write the encoded point. + if _, err := enc.w.Write(buf); err != nil { + return err + } + return nil +} + +// NewIntegerPointDecoder decodes IntegerPoint points from a reader. +type IntegerPointDecoder struct { + r io.Reader +} + +// NewIntegerPointDecoder returns a new instance of IntegerPointDecoder that reads from r. +func NewIntegerPointDecoder(r io.Reader) *IntegerPointDecoder { + return &IntegerPointDecoder{r: r} +} + +// DecodeIntegerPoint reads from the underlying reader and unmarshals into p. +func (dec *IntegerPointDecoder) DecodeIntegerPoint(p *IntegerPoint) error { + // Read length. + var sz uint32 + if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { + return err + } + + // Read point data. + buf := make([]byte, sz) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return err + } + + // Unmarshal into point. + var pb internal.Point + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + *p = *decodeIntegerPoint(&pb) + + return nil +} + // StringPoint represents a point with a string value. // DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. // See TestPoint_Fields in influxql/point_test.go for more details. @@ -347,6 +478,70 @@ func stringPointsSortBy(points []StringPoint, cmp func(a, b *StringPoint) bool) } } +// NewStringPointEncoder encodes StringPoint points to a writer. +type StringPointEncoder struct { + w io.Writer +} + +// NewStringPointEncoder returns a new instance of StringPointEncoder that writes to w. +func NewStringPointEncoder(w io.Writer) *StringPointEncoder { + return &StringPointEncoder{w: w} +} + +// EncodeStringPoint marshals and writes p to the underlying writer. +func (enc *StringPointEncoder) EncodeStringPoint(p *StringPoint) error { + // Marshal to bytes. + buf, err := proto.Marshal(encodeStringPoint(p)) + if err != nil { + return err + } + + // Write the length. + if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil { + return err + } + + // Write the encoded point. + if _, err := enc.w.Write(buf); err != nil { + return err + } + return nil +} + +// NewStringPointDecoder decodes StringPoint points from a reader. +type StringPointDecoder struct { + r io.Reader +} + +// NewStringPointDecoder returns a new instance of StringPointDecoder that reads from r. +func NewStringPointDecoder(r io.Reader) *StringPointDecoder { + return &StringPointDecoder{r: r} +} + +// DecodeStringPoint reads from the underlying reader and unmarshals into p. +func (dec *StringPointDecoder) DecodeStringPoint(p *StringPoint) error { + // Read length. + var sz uint32 + if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { + return err + } + + // Read point data. + buf := make([]byte, sz) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return err + } + + // Unmarshal into point. + var pb internal.Point + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + *p = *decodeStringPoint(&pb) + + return nil +} + // BooleanPoint represents a point with a bool value. // DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. // See TestPoint_Fields in influxql/point_test.go for more details. @@ -458,3 +653,67 @@ func booleanPointsSortBy(points []BooleanPoint, cmp func(a, b *BooleanPoint) boo cmp: cmp, } } + +// NewBooleanPointEncoder encodes BooleanPoint points to a writer. +type BooleanPointEncoder struct { + w io.Writer +} + +// NewBooleanPointEncoder returns a new instance of BooleanPointEncoder that writes to w. +func NewBooleanPointEncoder(w io.Writer) *BooleanPointEncoder { + return &BooleanPointEncoder{w: w} +} + +// EncodeBooleanPoint marshals and writes p to the underlying writer. +func (enc *BooleanPointEncoder) EncodeBooleanPoint(p *BooleanPoint) error { + // Marshal to bytes. + buf, err := proto.Marshal(encodeBooleanPoint(p)) + if err != nil { + return err + } + + // Write the length. + if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil { + return err + } + + // Write the encoded point. + if _, err := enc.w.Write(buf); err != nil { + return err + } + return nil +} + +// NewBooleanPointDecoder decodes BooleanPoint points from a reader. +type BooleanPointDecoder struct { + r io.Reader +} + +// NewBooleanPointDecoder returns a new instance of BooleanPointDecoder that reads from r. +func NewBooleanPointDecoder(r io.Reader) *BooleanPointDecoder { + return &BooleanPointDecoder{r: r} +} + +// DecodeBooleanPoint reads from the underlying reader and unmarshals into p. +func (dec *BooleanPointDecoder) DecodeBooleanPoint(p *BooleanPoint) error { + // Read length. + var sz uint32 + if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { + return err + } + + // Read point data. + buf := make([]byte, sz) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return err + } + + // Unmarshal into point. + var pb internal.Point + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + *p = *decodeBooleanPoint(&pb) + + return nil +} diff --git a/influxql/point.gen.go.tmpl b/influxql/point.gen.go.tmpl index 0d830ec603..648dc9d96d 100644 --- a/influxql/point.gen.go.tmpl +++ b/influxql/point.gen.go.tmpl @@ -1,6 +1,9 @@ package influxql import ( + "encoding/binary" + "io" + "github.com/gogo/protobuf/proto" "github.com/influxdata/influxdb/influxql/internal" ) @@ -129,4 +132,69 @@ func {{.name}}PointsSortBy(points []{{.Name}}Point, cmp func(a, b *{{.Name}}Poin } } +// New{{.Name}}PointEncoder encodes {{.Name}}Point points to a writer. +type {{.Name}}PointEncoder struct { + w io.Writer +} + +// New{{.Name}}PointEncoder returns a new instance of {{.Name}}PointEncoder that writes to w. +func New{{.Name}}PointEncoder(w io.Writer) *{{.Name}}PointEncoder { + return &{{.Name}}PointEncoder{w: w} +} + +// Encode{{.Name}}Point marshals and writes p to the underlying writer. +func (enc *{{.Name}}PointEncoder) Encode{{.Name}}Point(p *{{.Name}}Point) error { + // Marshal to bytes. + buf, err := proto.Marshal(encode{{.Name}}Point(p)) + if err != nil { + return err + } + + // Write the length. + if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil { + return err + } + + // Write the encoded point. + if _, err := enc.w.Write(buf); err != nil { + return err + } + return nil +} + + +// New{{.Name}}PointDecoder decodes {{.Name}}Point points from a reader. +type {{.Name}}PointDecoder struct { + r io.Reader +} + +// New{{.Name}}PointDecoder returns a new instance of {{.Name}}PointDecoder that reads from r. +func New{{.Name}}PointDecoder(r io.Reader) *{{.Name}}PointDecoder { + return &{{.Name}}PointDecoder{r: r} +} + +// Decode{{.Name}}Point reads from the underlying reader and unmarshals into p. +func (dec *{{.Name}}PointDecoder) Decode{{.Name}}Point(p *{{.Name}}Point) error { + // Read length. + var sz uint32 + if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { + return err + } + + // Read point data. + buf := make([]byte, sz) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return err + } + + // Unmarshal into point. + var pb internal.Point + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + *p = *decode{{.Name}}Point(&pb) + + return nil +} + {{end}} diff --git a/influxql/point.go b/influxql/point.go index ba64b7351e..fc098b8ced 100644 --- a/influxql/point.go +++ b/influxql/point.go @@ -1,6 +1,9 @@ package influxql import ( + "bytes" + "encoding/binary" + "io" "sort" "github.com/gogo/protobuf/proto" @@ -164,7 +167,26 @@ func encodeTags(m map[string]string) []byte { } // decodeTags parses an identifier into a map of tags. -func decodeTags(id []byte) map[string]string { panic("FIXME: implement") } +func decodeTags(id []byte) map[string]string { + a := bytes.Split(id, []byte{'\x00'}) + + // There must be an even number of segments. + if len(a) > 0 && len(a)%2 == 1 { + a = a[:len(a)-1] + } + + // Return nil if there are no segments. + if len(a) == 0 { + return nil + } + + // Decode key/value tags. + m := make(map[string]string) + for i := 0; i < len(a); i += 2 { + m[string(a[i])] = string(a[i+1]) + } + return m +} func encodeAux(aux []interface{}) []*internal.Aux { pb := make([]*internal.Aux, len(aux)) @@ -227,3 +249,46 @@ func decodeAux(pb []*internal.Aux) []interface{} { } return aux } + +// NewPointDecoder decodes generic points from a reader. +type PointDecoder struct { + r io.Reader +} + +// NewPointDecoder returns a new instance of PointDecoder that reads from r. +func NewPointDecoder(r io.Reader) *PointDecoder { + return &PointDecoder{r: r} +} + +// DecodePoint reads from the underlying reader and unmarshals into p. +func (dec *PointDecoder) DecodePoint(p *Point) error { + // Read length. + var sz uint32 + if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil { + return err + } + + // Read point data. + buf := make([]byte, sz) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return err + } + + // Unmarshal into point. + var pb internal.Point + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + + if pb.IntegerValue != nil { + *p = decodeIntegerPoint(&pb) + } else if pb.StringValue != nil { + *p = decodeStringPoint(&pb) + } else if pb.BooleanValue != nil { + *p = decodeBooleanPoint(&pb) + } else { + *p = decodeFloatPoint(&pb) + } + + return nil +} diff --git a/services/meta/client.go b/services/meta/client.go index f500898020..c93c880e15 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -675,9 +675,9 @@ func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.T return groups, nil } -// ShardIDsByTimeRange returns a slice of shards that may contain data in the time range. -func (c *Client) ShardIDsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []uint64, err error) { - m := make(map[uint64]struct{}) +// ShardsByTimeRange returns a slice of shards that may contain data in the time range. +func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) { + m := make(map[*ShardInfo]struct{}) for _, src := range sources { mm, ok := src.(*influxql.Measurement) if !ok { @@ -689,15 +689,15 @@ func (c *Client) ShardIDsByTimeRange(sources influxql.Sources, tmin, tmax time.T return nil, err } for _, g := range groups { - for _, sh := range g.Shards { - m[sh.ID] = struct{}{} + for i := range g.Shards { + m[&g.Shards[i]] = struct{}{} } } } - a = make([]uint64, 0, len(m)) - for k := range m { - a = append(a, k) + a = make([]ShardInfo, 0, len(m)) + for sh := range m { + a = append(a, *sh) } return a, nil diff --git a/tsdb/shard.go b/tsdb/shard.go index 941c2dd741..3fdfb97679 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -384,9 +384,30 @@ func (s *Shard) WriteTo(w io.Writer) (int64, error) { // CreateIterator returns an iterator for the data in the shard. func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + if influxql.Sources(opt.Sources).HasSystemSource() { + return s.createSystemIterator(opt) + } return s.engine.CreateIterator(opt) } +// createSystemIterator returns an iterator for a system source. +func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + // Only support a single system source. + if len(opt.Sources) > 1 { + return nil, errors.New("cannot select from multiple system sources") + } + + m := opt.Sources[0].(*influxql.Measurement) + switch m.Name { + case "_measurements": + return NewMeasurementIterator(s, opt) + case "_tagKeys": + return NewTagKeysIterator(s, opt) + default: + return nil, fmt.Errorf("unknown system source: %s", m.Name) + } +} + // FieldDimensions returns unique sets of fields and dimensions across a list of sources. func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { fields = make(map[string]struct{}) @@ -416,78 +437,6 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions ma // SeriesKeys returns a list of series in the shard. func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { - return s.engine.SeriesKeys(opt) -} - -// Shards represents a sortable list of shards. -type Shards []*Shard - -func (a Shards) Len() int { return len(a) } -func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id } -func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -// CreateIterator returns a single combined iterator for the shards. -func (a Shards) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { - if influxql.Sources(opt.Sources).HasSystemSource() { - return a.createSystemIterator(opt) - } - - // Create iterators for each shard. - // Ensure that they are closed if an error occurs. - itrs := make([]influxql.Iterator, 0, len(a)) - if err := func() error { - for _, sh := range a { - itr, err := sh.CreateIterator(opt) - if err != nil { - return err - } - itrs = append(itrs, itr) - } - return nil - }(); err != nil { - influxql.Iterators(itrs).Close() - return nil, err - } - - // Merge into a single iterator. - if opt.MergeSorted() { - return influxql.NewSortedMergeIterator(itrs, opt), nil - } - - itr := influxql.NewMergeIterator(itrs, opt) - if opt.Expr != nil { - if expr, ok := opt.Expr.(*influxql.Call); ok && expr.Name == "count" { - opt.Expr = &influxql.Call{ - Name: "sum", - Args: expr.Args, - } - } - } - return influxql.NewCallIterator(itr, opt) -} - -// createSystemIterator returns an iterator for a system source. -func (a Shards) createSystemIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { - // Only support a single system source. - if len(opt.Sources) > 1 { - return nil, errors.New("cannot select from multiple system sources") - } - - m := opt.Sources[0].(*influxql.Measurement) - switch m.Name { - case "_measurements": - return a.createMeasurementsIterator(opt) - case "_tagKeys": - return a.createTagKeysIterator(opt) - default: - return nil, fmt.Errorf("unknown system source: %s", m.Name) - } -} - -// SeriesKeys returns a list of series in in all shards in a. If a series -// exists in multiple shards in a, all instances will be combined into a single -// Series by calling Combine on it. -func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { if influxql.Sources(opt.Sources).HasSystemSource() { // Only support a single system source. if len(opt.Sources) > 1 { @@ -497,88 +446,15 @@ func (a Shards) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e return []influxql.Series{{Aux: []influxql.DataType{influxql.String}}}, nil } - seriesMap := make(map[string]influxql.Series) - for _, sh := range a { - series, err := sh.SeriesKeys(opt) - if err != nil { - return nil, err - } - - for _, s := range series { - cur, ok := seriesMap[s.ID()] - if ok { - cur.Combine(&s) - } else { - seriesMap[s.ID()] = s - } - } - } - - seriesList := make([]influxql.Series, 0, len(seriesMap)) - for _, s := range seriesMap { - seriesList = append(seriesList, s) - } - sort.Sort(influxql.SeriesList(seriesList)) - return influxql.SeriesList(seriesList), nil + return s.engine.SeriesKeys(opt) } -// createMeasurementsIterator returns an iterator for all measurement names. -func (a Shards) createMeasurementsIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { - itrs := make([]influxql.Iterator, 0, len(a)) - if err := func() error { - for _, sh := range a { - itr, err := NewMeasurementIterator(sh, opt) - if err != nil { - return err - } - itrs = append(itrs, itr) - } - return nil - }(); err != nil { - influxql.Iterators(itrs).Close() - return nil, err - } - return influxql.NewMergeIterator(itrs, opt), nil -} +// Shards represents a sortable list of shards. +type Shards []*Shard -// createTagKeysIterator returns an iterator for all tag keys across measurements. -func (a Shards) createTagKeysIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { - itrs := make([]influxql.Iterator, 0, len(a)) - if err := func() error { - for _, sh := range a { - itr, err := NewTagKeysIterator(sh, opt) - if err != nil { - return err - } - itrs = append(itrs, itr) - } - return nil - }(); err != nil { - influxql.Iterators(itrs).Close() - return nil, err - } - return influxql.NewMergeIterator(itrs, opt), nil -} - -// FieldDimensions returns the unique fields and dimensions across a list of sources. -func (a Shards) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { - fields = make(map[string]struct{}) - dimensions = make(map[string]struct{}) - - for _, sh := range a { - f, d, err := sh.FieldDimensions(sources) - if err != nil { - return nil, nil, err - } - for k := range f { - fields[k] = struct{}{} - } - for k := range d { - dimensions[k] = struct{}{} - } - } - return -} +func (a Shards) Len() int { return len(a) } +func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id } +func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // MeasurementFields holds the fields of a measurement and their codec. type MeasurementFields struct { @@ -909,6 +785,24 @@ func (f *FieldCodec) FieldByName(name string) *Field { return f.fieldsByName[name] } +// shardIteratorCreator creates iterators for a local shard. +// This simply wraps the shard so that Close() does not close the underlying shard. +type shardIteratorCreator struct { + sh *Shard +} + +func (ic *shardIteratorCreator) Close() error { return nil } + +func (ic *shardIteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return ic.sh.CreateIterator(opt) +} +func (ic *shardIteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) { + return ic.sh.FieldDimensions(sources) +} +func (ic *shardIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { + return ic.sh.SeriesKeys(opt) +} + // MeasurementIterator represents a string iterator that emits all measurement names in a shard. type MeasurementIterator struct { mms Measurements diff --git a/tsdb/store.go b/tsdb/store.go index f18ec0b08a..b142d13263 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -299,8 +299,16 @@ func (s *Store) deleteShard(shardID uint64) error { return nil } -// DeleteDatabase will close all shards associated with a database and -// remove the directory and files from disk. +// ShardIteratorCreator returns an iterator creator for a shard. +func (s *Store) ShardIteratorCreator(id uint64) influxql.IteratorCreator { + sh := s.Shard(id) + if sh == nil { + return nil + } + return &shardIteratorCreator{sh: sh} +} + +// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk. func (s *Store) DeleteDatabase(name string) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/tsdb/store_test.go b/tsdb/store_test.go index a6ee8f758c..97c45bafb5 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -215,8 +215,15 @@ func TestShards_CreateIterator(t *testing.T) { `cpu,host=serverC value=3 60`, ) + // Retrieve shards and convert to iterator creators. + shards := s.Shards([]uint64{0, 1}) + ics := make(influxql.IteratorCreators, len(shards)) + for i := range ics { + ics[i] = shards[i] + } + // Create iterator. - itr, err := tsdb.Shards(s.Shards([]uint64{0, 1})).CreateIterator(influxql.IteratorOptions{ + itr, err := ics.CreateIterator(influxql.IteratorOptions{ Expr: influxql.MustParseExpr(`value`), Dimensions: []string{"host"}, Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},