add support for remote expansion of regex sources

This commit moves the `tsdb.Store.ExpandSources()` function onto
the `influxql.IteratorCreator` interface and adds support for issuing
source expansion requests across a cluster.
pull/5994/head
Ben Johnson 2016-03-04 11:01:41 -07:00
parent 31a964e890
commit beda072426
15 changed files with 431 additions and 157 deletions
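
To illustrate the new flow, here is a minimal sketch of the call path as seen by query execution (it mirrors the executeSelectStatement change further down; the package name, helper name, and import path are assumptions for the example, not part of this commit):

package sketch // illustration only

import "github.com/influxdata/influxdb/influxql"

// expandStatementSources is a hypothetical helper showing the new flow:
// regex sources are expanded through the IteratorCreator, which may fan the
// request out to remote nodes in a cluster.
func expandStatementSources(stmt *influxql.SelectStatement, ic influxql.IteratorCreator) error {
	if !stmt.Sources.HasRegex() {
		return nil
	}
	sources, err := ic.ExpandSources(stmt.Sources)
	if err != nil {
		return err
	}
	stmt.Sources = sources // e.g. /cpu.*/ expands to cpu, cpu_load, ...
	return nil
}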


@ -19,23 +19,23 @@ It has these top-level messages:
FieldDimensionsResponse
SeriesKeysRequest
SeriesKeysResponse
ExpandSourcesRequest
ExpandSourcesResponse
*/
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type WriteShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
Database *string `protobuf:"bytes,3,opt,name=Database" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,4,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"`
Database *string `protobuf:"bytes,3,opt" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,4,opt" json:"RetentionPolicy,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -72,8 +72,8 @@ func (m *WriteShardRequest) GetRetentionPolicy() string {
}
type WriteShardResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -96,8 +96,8 @@ func (m *WriteShardResponse) GetMessage() string {
}
type ExecuteStatementRequest struct {
Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
Statement *string `protobuf:"bytes,1,req" json:"Statement,omitempty"`
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -120,8 +120,8 @@ func (m *ExecuteStatementRequest) GetDatabase() string {
}
type ExecuteStatementResponse struct {
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -144,8 +144,8 @@ func (m *ExecuteStatementResponse) GetMessage() string {
}
type CreateIteratorRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -168,7 +168,7 @@ func (m *CreateIteratorRequest) GetOpt() []byte {
}
type CreateIteratorResponse struct {
Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"`
Err *string `protobuf:"bytes,1,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -184,8 +184,8 @@ func (m *CreateIteratorResponse) GetErr() string {
}
type FieldDimensionsRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"`
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -208,9 +208,9 @@ func (m *FieldDimensionsRequest) GetSources() []byte {
}
type FieldDimensionsResponse struct {
Fields []string `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"`
Dimensions []string `protobuf:"bytes,2,rep,name=Dimensions" json:"Dimensions,omitempty"`
Err *string `protobuf:"bytes,3,opt,name=Err" json:"Err,omitempty"`
Fields []string `protobuf:"bytes,1,rep" json:"Fields,omitempty"`
Dimensions []string `protobuf:"bytes,2,rep" json:"Dimensions,omitempty"`
Err *string `protobuf:"bytes,3,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -240,8 +240,8 @@ func (m *FieldDimensionsResponse) GetErr() string {
}
type SeriesKeysRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -264,8 +264,8 @@ func (m *SeriesKeysRequest) GetOpt() []byte {
}
type SeriesKeysResponse struct {
SeriesList []byte `protobuf:"bytes,1,opt,name=SeriesList" json:"SeriesList,omitempty"`
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
SeriesList []byte `protobuf:"bytes,1,opt" json:"SeriesList,omitempty"`
Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -287,15 +287,53 @@ func (m *SeriesKeysResponse) GetErr() string {
return ""
}
func init() {
proto.RegisterType((*WriteShardRequest)(nil), "internal.WriteShardRequest")
proto.RegisterType((*WriteShardResponse)(nil), "internal.WriteShardResponse")
proto.RegisterType((*ExecuteStatementRequest)(nil), "internal.ExecuteStatementRequest")
proto.RegisterType((*ExecuteStatementResponse)(nil), "internal.ExecuteStatementResponse")
proto.RegisterType((*CreateIteratorRequest)(nil), "internal.CreateIteratorRequest")
proto.RegisterType((*CreateIteratorResponse)(nil), "internal.CreateIteratorResponse")
proto.RegisterType((*FieldDimensionsRequest)(nil), "internal.FieldDimensionsRequest")
proto.RegisterType((*FieldDimensionsResponse)(nil), "internal.FieldDimensionsResponse")
proto.RegisterType((*SeriesKeysRequest)(nil), "internal.SeriesKeysRequest")
proto.RegisterType((*SeriesKeysResponse)(nil), "internal.SeriesKeysResponse")
type ExpandSourcesRequest struct {
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} }
func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesRequest) ProtoMessage() {}
func (m *ExpandSourcesRequest) GetShardIDs() []uint64 {
if m != nil {
return m.ShardIDs
}
return nil
}
func (m *ExpandSourcesRequest) GetSources() []byte {
if m != nil {
return m.Sources
}
return nil
}
type ExpandSourcesResponse struct {
Sources []byte `protobuf:"bytes,1,req" json:"Sources,omitempty"`
Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} }
func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesResponse) ProtoMessage() {}
func (m *ExpandSourcesResponse) GetSources() []byte {
if m != nil {
return m.Sources
}
return nil
}
func (m *ExpandSourcesResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
func init() {
}


@ -52,3 +52,12 @@ message SeriesKeysResponse {
optional string Err = 2;
}
message ExpandSourcesRequest {
repeated uint64 ShardIDs = 1;
required bytes Sources = 2;
}
message ExpandSourcesResponse {
required bytes Sources = 1;
optional string Err = 2;
}


@ -415,13 +415,6 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
opt.MinTime = time.Unix(0, 0)
}
// Expand regex sources to their actual source names.
sources, err := e.TSDBStore.ExpandSources(stmt.Sources)
if err != nil {
return err
}
stmt.Sources = sources
// Convert DISTINCT into a call.
stmt.RewriteDistinct()
@ -434,6 +427,15 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
return err
}
// Expand regex sources to their actual source names.
if stmt.Sources.HasRegex() {
sources, err := ic.ExpandSources(stmt.Sources)
if err != nil {
return err
}
stmt.Sources = sources
}
// Rewrite wildcards, if any exist.
tmp, err := stmt.RewriteWildcards(ic)
if err != nil {
@ -1056,6 +1058,30 @@ func (ic *remoteIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influ
return resp.SeriesList, resp.Err
}
// ExpandSources expands regex sources on a remote iterator creator.
func (ic *remoteIteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
conn, err := ic.dialer.DialNode(ic.nodeID)
if err != nil {
return nil, err
}
defer conn.Close()
// Write request.
if err := EncodeTLV(conn, expandSourcesRequestMessage, &ExpandSourcesRequest{
ShardIDs: ic.shardIDs,
Sources: sources,
}); err != nil {
return nil, err
}
// Read the response.
var resp ExpandSourcesResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
return nil, err
}
return resp.Sources, resp.Err
}
// NodeDialer dials connections to a given node.
type NodeDialer struct {
MetaClient MetaClient


@ -275,6 +275,7 @@ type IteratorCreator struct {
CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error)
FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
}
func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
@ -289,6 +290,10 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se
return ic.SeriesKeysFn(opt)
}
func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return ic.ExpandSourcesFn(sources)
}
// FloatIterator is a represents an iterator that reads from a slice.
type FloatIterator struct {
Points []influxql.FloatPoint


@ -411,3 +411,72 @@ func (r *SeriesKeysResponse) UnmarshalBinary(data []byte) error {
return nil
}
// ExpandSourcesRequest represents a request to expand regex sources.
type ExpandSourcesRequest struct {
ShardIDs []uint64
Sources influxql.Sources
}
// MarshalBinary encodes r to a binary format.
func (r *ExpandSourcesRequest) MarshalBinary() ([]byte, error) {
buf, err := r.Sources.MarshalBinary()
if err != nil {
return nil, err
}
return proto.Marshal(&internal.ExpandSourcesRequest{
ShardIDs: r.ShardIDs,
Sources: buf,
})
}
// UnmarshalBinary decodes data into r.
func (r *ExpandSourcesRequest) UnmarshalBinary(data []byte) error {
var pb internal.ExpandSourcesRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardIDs = pb.GetShardIDs()
if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil {
return err
}
return nil
}
// ExpandSourcesResponse represents a response from source expansion.
type ExpandSourcesResponse struct {
Sources influxql.Sources
Err error
}
// MarshalBinary encodes r to a binary format.
func (r *ExpandSourcesResponse) MarshalBinary() ([]byte, error) {
var pb internal.ExpandSourcesResponse
buf, err := r.Sources.MarshalBinary()
if err != nil {
return nil, err
}
pb.Sources = buf
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes data into r.
func (r *ExpandSourcesResponse) UnmarshalBinary(data []byte) error {
var pb internal.ExpandSourcesResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}


@ -38,6 +38,9 @@ const (
seriesKeysReq = "seriesKeysReq"
seriesKeysResp = "seriesKeysResp"
expandSourcesReq = "expandSourcesReq"
expandSourcesResp = "expandSourcesResp"
)
// Service processes data received over raw TCP connections.
@ -196,6 +199,10 @@ func (s *Service) handleConn(conn net.Conn) {
s.statMap.Add(seriesKeysReq, 1)
s.processSeriesKeysRequest(conn)
return
case expandSourcesRequestMessage:
s.statMap.Add(expandSourcesReq, 1)
s.processExpandSourcesRequest(conn)
return
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
@ -439,6 +446,48 @@ func (s *Service) processSeriesKeysRequest(conn net.Conn) {
}
}
func (s *Service) processExpandSourcesRequest(conn net.Conn) {
var sources influxql.Sources
if err := func() error {
// Parse request.
var req ExpandSourcesRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Expand sources from all shards.
a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources)
if err != nil {
return err
}
sources = a
return nil
}(); err != nil {
s.Logger.Printf("error reading ExpandSources request: %s", err)
EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{
Sources: sources,
}); err != nil {
s.Logger.Printf("error writing ExpandSources response: %s", err)
return
}
}
// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
typ, err := ReadType(r)


@ -24,6 +24,9 @@ const (
seriesKeysRequestMessage
seriesKeysResponseMessage
expandSourcesRequestMessage
expandSourcesResponseMessage
)
// ShardWriter writes a set of points to a shard.


@ -309,6 +309,19 @@ func (a Sources) HasSystemSource() bool {
return false
}
// HasRegex returns true if any of the sources are regex measurements.
func (a Sources) HasRegex() bool {
for _, s := range a {
switch s := s.(type) {
case *Measurement:
if s.Regex != nil {
return true
}
}
}
return false
}
// String returns a string representation of a Sources array.
func (a Sources) String() string {
var buf bytes.Buffer
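
For illustration (not part of the diff), the new HasRegex helper above reports true as soon as any source in the FROM clause is a regex measurement; the package name and import path below are assumptions:

package sketch // illustration only

import (
	"fmt"
	"regexp"

	"github.com/influxdata/influxdb/influxql"
)

func hasRegexExample() {
	// A FROM clause mixing a plain measurement and a regex measurement.
	sources := influxql.Sources{
		&influxql.Measurement{Name: "cpu"},
		&influxql.Measurement{Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`mem.*`)}},
	}
	fmt.Println(sources.HasRegex()) // true: the second source is a regex
}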


@ -21,25 +21,23 @@ It has these top-level messages:
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type Point struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"`
Time *int64 `protobuf:"varint,3,req,name=Time" json:"Time,omitempty"`
Nil *bool `protobuf:"varint,4,req,name=Nil" json:"Nil,omitempty"`
Aux []*Aux `protobuf:"bytes,5,rep,name=Aux" json:"Aux,omitempty"`
Aggregated *uint32 `protobuf:"varint,6,opt,name=Aggregated" json:"Aggregated,omitempty"`
FloatValue *float64 `protobuf:"fixed64,7,opt,name=FloatValue" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,8,opt,name=IntegerValue" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,9,opt,name=StringValue" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,10,opt,name=BooleanValue" json:"BooleanValue,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Tags *string `protobuf:"bytes,2,req" json:"Tags,omitempty"`
Time *int64 `protobuf:"varint,3,req" json:"Time,omitempty"`
Nil *bool `protobuf:"varint,4,req" json:"Nil,omitempty"`
Aux []*Aux `protobuf:"bytes,5,rep" json:"Aux,omitempty"`
Aggregated *uint32 `protobuf:"varint,6,opt" json:"Aggregated,omitempty"`
FloatValue *float64 `protobuf:"fixed64,7,opt" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,8,opt" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,9,opt" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,10,opt" json:"BooleanValue,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -118,11 +116,11 @@ func (m *Point) GetBooleanValue() bool {
}
type Aux struct {
DataType *int32 `protobuf:"varint,1,req,name=DataType" json:"DataType,omitempty"`
FloatValue *float64 `protobuf:"fixed64,2,opt,name=FloatValue" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,3,opt,name=IntegerValue" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,4,opt,name=StringValue" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,5,opt,name=BooleanValue" json:"BooleanValue,omitempty"`
DataType *int32 `protobuf:"varint,1,req" json:"DataType,omitempty"`
FloatValue *float64 `protobuf:"fixed64,2,opt" json:"FloatValue,omitempty"`
IntegerValue *int64 `protobuf:"varint,3,opt" json:"IntegerValue,omitempty"`
StringValue *string `protobuf:"bytes,4,opt" json:"StringValue,omitempty"`
BooleanValue *bool `protobuf:"varint,5,opt" json:"BooleanValue,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -166,22 +164,22 @@ func (m *Aux) GetBooleanValue() bool {
}
type IteratorOptions struct {
Expr *string `protobuf:"bytes,1,opt,name=Expr" json:"Expr,omitempty"`
Aux []string `protobuf:"bytes,2,rep,name=Aux" json:"Aux,omitempty"`
Sources []*Measurement `protobuf:"bytes,3,rep,name=Sources" json:"Sources,omitempty"`
Interval *Interval `protobuf:"bytes,4,opt,name=Interval" json:"Interval,omitempty"`
Dimensions []string `protobuf:"bytes,5,rep,name=Dimensions" json:"Dimensions,omitempty"`
Fill *int32 `protobuf:"varint,6,opt,name=Fill" json:"Fill,omitempty"`
FillValue *float64 `protobuf:"fixed64,7,opt,name=FillValue" json:"FillValue,omitempty"`
Condition *string `protobuf:"bytes,8,opt,name=Condition" json:"Condition,omitempty"`
StartTime *int64 `protobuf:"varint,9,opt,name=StartTime" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,10,opt,name=EndTime" json:"EndTime,omitempty"`
Ascending *bool `protobuf:"varint,11,opt,name=Ascending" json:"Ascending,omitempty"`
Limit *int64 `protobuf:"varint,12,opt,name=Limit" json:"Limit,omitempty"`
Offset *int64 `protobuf:"varint,13,opt,name=Offset" json:"Offset,omitempty"`
SLimit *int64 `protobuf:"varint,14,opt,name=SLimit" json:"SLimit,omitempty"`
SOffset *int64 `protobuf:"varint,15,opt,name=SOffset" json:"SOffset,omitempty"`
Dedupe *bool `protobuf:"varint,16,opt,name=Dedupe" json:"Dedupe,omitempty"`
Expr *string `protobuf:"bytes,1,opt" json:"Expr,omitempty"`
Aux []string `protobuf:"bytes,2,rep" json:"Aux,omitempty"`
Sources []*Measurement `protobuf:"bytes,3,rep" json:"Sources,omitempty"`
Interval *Interval `protobuf:"bytes,4,opt" json:"Interval,omitempty"`
Dimensions []string `protobuf:"bytes,5,rep" json:"Dimensions,omitempty"`
Fill *int32 `protobuf:"varint,6,opt" json:"Fill,omitempty"`
FillValue *float64 `protobuf:"fixed64,7,opt" json:"FillValue,omitempty"`
Condition *string `protobuf:"bytes,8,opt" json:"Condition,omitempty"`
StartTime *int64 `protobuf:"varint,9,opt" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,10,opt" json:"EndTime,omitempty"`
Ascending *bool `protobuf:"varint,11,opt" json:"Ascending,omitempty"`
Limit *int64 `protobuf:"varint,12,opt" json:"Limit,omitempty"`
Offset *int64 `protobuf:"varint,13,opt" json:"Offset,omitempty"`
SLimit *int64 `protobuf:"varint,14,opt" json:"SLimit,omitempty"`
SOffset *int64 `protobuf:"varint,15,opt" json:"SOffset,omitempty"`
Dedupe *bool `protobuf:"varint,16,opt" json:"Dedupe,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -302,7 +300,7 @@ func (m *IteratorOptions) GetDedupe() bool {
}
type Measurements struct {
Items []*Measurement `protobuf:"bytes,1,rep,name=Items" json:"Items,omitempty"`
Items []*Measurement `protobuf:"bytes,1,rep" json:"Items,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -318,11 +316,11 @@ func (m *Measurements) GetItems() []*Measurement {
}
type Measurement struct {
Database *string `protobuf:"bytes,1,opt,name=Database" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,2,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
Name *string `protobuf:"bytes,3,opt,name=Name" json:"Name,omitempty"`
Regex *string `protobuf:"bytes,4,opt,name=Regex" json:"Regex,omitempty"`
IsTarget *bool `protobuf:"varint,5,opt,name=IsTarget" json:"IsTarget,omitempty"`
Database *string `protobuf:"bytes,1,opt" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,2,opt" json:"RetentionPolicy,omitempty"`
Name *string `protobuf:"bytes,3,opt" json:"Name,omitempty"`
Regex *string `protobuf:"bytes,4,opt" json:"Regex,omitempty"`
IsTarget *bool `protobuf:"varint,5,opt" json:"IsTarget,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -366,8 +364,8 @@ func (m *Measurement) GetIsTarget() bool {
}
type Interval struct {
Duration *int64 `protobuf:"varint,1,opt,name=Duration" json:"Duration,omitempty"`
Offset *int64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"`
Duration *int64 `protobuf:"varint,1,opt" json:"Duration,omitempty"`
Offset *int64 `protobuf:"varint,2,opt" json:"Offset,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -390,9 +388,9 @@ func (m *Interval) GetOffset() int64 {
}
type Series struct {
Name *string `protobuf:"bytes,1,opt,name=Name" json:"Name,omitempty"`
Tags []byte `protobuf:"bytes,2,opt,name=Tags" json:"Tags,omitempty"`
Aux []uint32 `protobuf:"varint,3,rep,name=Aux" json:"Aux,omitempty"`
Name *string `protobuf:"bytes,1,opt" json:"Name,omitempty"`
Tags []byte `protobuf:"bytes,2,opt" json:"Tags,omitempty"`
Aux []uint32 `protobuf:"varint,3,rep" json:"Aux,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -422,7 +420,7 @@ func (m *Series) GetAux() []uint32 {
}
type SeriesList struct {
Items []*Series `protobuf:"bytes,1,rep,name=Items" json:"Items,omitempty"`
Items []*Series `protobuf:"bytes,1,rep" json:"Items,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -438,12 +436,4 @@ func (m *SeriesList) GetItems() []*Series {
}
func init() {
proto.RegisterType((*Point)(nil), "internal.Point")
proto.RegisterType((*Aux)(nil), "internal.Aux")
proto.RegisterType((*IteratorOptions)(nil), "internal.IteratorOptions")
proto.RegisterType((*Measurements)(nil), "internal.Measurements")
proto.RegisterType((*Measurement)(nil), "internal.Measurement")
proto.RegisterType((*Interval)(nil), "internal.Interval")
proto.RegisterType((*Series)(nil), "internal.Series")
proto.RegisterType((*SeriesList)(nil), "internal.SeriesList")
}


@ -612,6 +612,10 @@ func (itr *floatAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error)
return nil, errors.New("not implemented")
}
func (itr *floatAuxIterator) ExpandSources(sources Sources) (Sources, error) {
return nil, errors.New("not implemented")
}
func (itr *floatAuxIterator) stream() {
for {
// Read next point.
@ -1750,6 +1754,10 @@ func (itr *integerAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, erro
return nil, errors.New("not implemented")
}
func (itr *integerAuxIterator) ExpandSources(sources Sources) (Sources, error) {
return nil, errors.New("not implemented")
}
func (itr *integerAuxIterator) stream() {
for {
// Read next point.
@ -2888,6 +2896,10 @@ func (itr *stringAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error
return nil, errors.New("not implemented")
}
func (itr *stringAuxIterator) ExpandSources(sources Sources) (Sources, error) {
return nil, errors.New("not implemented")
}
func (itr *stringAuxIterator) stream() {
for {
// Read next point.
@ -4026,6 +4038,10 @@ func (itr *booleanAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, erro
return nil, errors.New("not implemented")
}
func (itr *booleanAuxIterator) ExpandSources(sources Sources) (Sources, error) {
return nil, errors.New("not implemented")
}
func (itr *booleanAuxIterator) stream() {
for {
// Read next point.


@ -611,7 +611,11 @@ func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList,
return nil, errors.New("not implemented")
}
func (itr *{{$k.name}}AuxIterator) stream() {
func (itr *{{.name}}AuxIterator) ExpandSources(sources Sources) (Sources, error) {
return nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) stream() {
for {
// Read next point.
p := itr.input.Next()


@ -444,6 +444,9 @@ type IteratorCreator interface {
// Returns the series keys that will be returned by this iterator.
SeriesKeys(opt IteratorOptions) (SeriesList, error)
// Expands regex sources to all matching sources.
ExpandSources(sources Sources) (Sources, error)
}
// IteratorCreators represents a list of iterator creators.
@ -544,6 +547,42 @@ func (a IteratorCreators) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
return SeriesList(seriesList), nil
}
// ExpandSources expands sources across all iterator creators and returns a unique result.
func (a IteratorCreators) ExpandSources(sources Sources) (Sources, error) {
m := make(map[string]Source)
for _, ic := range a {
expanded, err := ic.ExpandSources(sources)
if err != nil {
return nil, err
}
for _, src := range expanded {
switch src := src.(type) {
case *Measurement:
m[src.String()] = src
default:
return nil, fmt.Errorf("IteratorCreators.ExpandSources: unsupported source type: %T", src)
}
}
}
// Convert set to sorted slice.
names := make([]string, 0, len(m))
for name := range m {
names = append(names, name)
}
sort.Strings(names)
// Convert set to a list of Sources.
sorted := make(Sources, 0, len(m))
for _, name := range names {
sorted = append(sorted, m[name])
}
return sorted, nil
}
// IteratorOptions is an object passed to CreateIterator to specify creation options.
type IteratorOptions struct {
// Expression to iterate for.
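
For illustration (not part of the diff), IteratorCreators.ExpandSources merges the per-creator results, drops duplicates, and returns them sorted by name. The mock below is an assumption modeled on the test IteratorCreator in the next file; only ExpandSources is implemented because nothing else is called:

package sketch // illustration only

import (
	"fmt"

	"github.com/influxdata/influxdb/influxql"
)

// mockIC satisfies influxql.IteratorCreator via the embedded interface; the
// embedded methods are never invoked in this example.
type mockIC struct {
	influxql.IteratorCreator
	sources influxql.Sources
}

func (m *mockIC) ExpandSources(_ influxql.Sources) (influxql.Sources, error) {
	return m.sources, nil
}

func expandAcrossCreatorsExample() {
	// Two shards both match "cpu"; the combined result is deduplicated and sorted.
	ics := influxql.IteratorCreators{
		&mockIC{sources: influxql.Sources{
			&influxql.Measurement{Name: "cpu"},
			&influxql.Measurement{Name: "mem"},
		}},
		&mockIC{sources: influxql.Sources{
			&influxql.Measurement{Name: "cpu"},
		}},
	}
	expanded, _ := ics.ExpandSources(nil)
	fmt.Println(expanded) // cpu, mem (duplicate "cpu" removed)
}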


@ -959,6 +959,7 @@ type IteratorCreator struct {
CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error)
FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
}
func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
@ -1010,6 +1011,10 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se
return influxql.SeriesList(seriesList), nil
}
func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return ic.ExpandSourcesFn(sources)
}
// Test implementation of influxql.FloatIterator
type FloatIterator struct {
Points []influxql.FloatPoint


@ -491,6 +491,53 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e
return s.engine.SeriesKeys(opt)
}
// ExpandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates.
set := map[string]influxql.Source{}
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
// Add non-regex measurements directly to the set.
if src.Regex == nil {
set[src.String()] = src
continue
}
// Loop over matching measurements.
for _, m := range s.index.MeasurementsByRegex(src.Regex.Val) {
other := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Name: m.Name,
}
set[other.String()] = other
}
default:
return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
}
}
// Convert set to sorted slice.
names := make([]string, 0, len(set))
for name := range set {
names = append(names, name)
}
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
}
// Shards represents a sortable list of shards.
type Shards []*Shard
@ -844,6 +891,9 @@ func (ic *shardIteratorCreator) FieldDimensions(sources influxql.Sources) (field
func (ic *shardIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
return ic.sh.SeriesKeys(opt)
}
func (ic *shardIteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
return ic.sh.ExpandSources(sources)
}
func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
fn := func(m *Measurement) []string {


@ -486,6 +486,15 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
// DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
// Expand regex expressions in the FROM clause.
a, err := s.ExpandSources(sources)
if err != nil {
return err
} else if sources != nil && len(sources) != 0 && len(a) == 0 {
return nil
}
sources = a
s.mu.RLock()
defer s.mu.RUnlock()
@ -495,15 +504,6 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
return nil
}
// Expand regex expressions in the FROM clause.
a, err := s.expandSources(sources)
if err != nil {
return err
} else if sources != nil && len(sources) != 0 && len(a) == 0 {
return nil
}
sources = a
measurements, err := measurementsFromSourcesOrDB(db, sources...)
if err != nil {
return err
@ -566,63 +566,21 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error {
return nil
}
// ExpandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
// ExpandSources expands sources against all local shards.
func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.expandSources(sources)
return s.IteratorCreators().ExpandSources(sources)
}
func (s *Store) expandSources(sources influxql.Sources) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates.
set := map[string]influxql.Source{}
// IteratorCreators returns a set of all local shards as iterator creators.
func (s *Store) IteratorCreators() influxql.IteratorCreators {
s.mu.Lock()
defer s.mu.Unlock()
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
// Add non-regex measurements directly to the set.
if src.Regex == nil {
set[src.String()] = src
continue
}
// Lookup the database.
db := s.databaseIndexes[src.Database]
if db == nil {
return nil, nil
}
// Loop over matching measurements.
for _, m := range db.MeasurementsByRegex(src.Regex.Val) {
other := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Name: m.Name,
}
set[other.String()] = other
}
default:
return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
}
a := make(influxql.IteratorCreators, 0, len(s.shards))
for _, sh := range s.shards {
a = append(a, sh)
}
// Convert set to sorted slice.
names := make([]string, 0, len(set))
for name := range set {
names = append(names, name)
}
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
return a
}
// WriteToShard writes a list of points to a shard identified by its ID.