diff --git a/cluster/internal/data.pb.go b/cluster/internal/data.pb.go index 667cea9fb1..54c44951cd 100644 --- a/cluster/internal/data.pb.go +++ b/cluster/internal/data.pb.go @@ -19,23 +19,23 @@ It has these top-level messages: FieldDimensionsResponse SeriesKeysRequest SeriesKeysResponse + ExpandSourcesRequest + ExpandSourcesResponse */ package internal import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = fmt.Errorf var _ = math.Inf type WriteShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` - Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"` - Database *string `protobuf:"bytes,3,opt,name=Database" json:"Database,omitempty"` - RetentionPolicy *string `protobuf:"bytes,4,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` + ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"` + Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"` + Database *string `protobuf:"bytes,3,opt" json:"Database,omitempty"` + RetentionPolicy *string `protobuf:"bytes,4,opt" json:"RetentionPolicy,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -72,8 +72,8 @@ func (m *WriteShardRequest) GetRetentionPolicy() string { } type WriteShardResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` + Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"` + Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -96,8 +96,8 @@ func (m *WriteShardResponse) GetMessage() string { } type ExecuteStatementRequest struct { - Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"` - Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + Statement *string `protobuf:"bytes,1,req" json:"Statement,omitempty"` + Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -120,8 +120,8 @@ func (m *ExecuteStatementRequest) GetDatabase() string { } type ExecuteStatementResponse struct { - Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` + Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"` + Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -144,8 +144,8 @@ func (m *ExecuteStatementResponse) GetMessage() string { } type CreateIteratorRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"` + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -168,7 +168,7 @@ func (m *CreateIteratorRequest) GetOpt() []byte { } type CreateIteratorResponse struct { - Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"` + Err *string `protobuf:"bytes,1,opt" json:"Err,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -184,8 +184,8 @@ func (m *CreateIteratorResponse) GetErr() string { } type FieldDimensionsRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"` + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -208,9 +208,9 @@ func (m *FieldDimensionsRequest) GetSources() []byte { } type FieldDimensionsResponse struct { - Fields []string `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"` - Dimensions []string `protobuf:"bytes,2,rep,name=Dimensions" json:"Dimensions,omitempty"` - Err *string `protobuf:"bytes,3,opt,name=Err" json:"Err,omitempty"` + Fields []string `protobuf:"bytes,1,rep" json:"Fields,omitempty"` + Dimensions []string `protobuf:"bytes,2,rep" json:"Dimensions,omitempty"` + Err *string `protobuf:"bytes,3,opt" json:"Err,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -240,8 +240,8 @@ func (m *FieldDimensionsResponse) GetErr() string { } type SeriesKeysRequest struct { - ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"` - Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"` + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -264,8 +264,8 @@ func (m *SeriesKeysRequest) GetOpt() []byte { } type SeriesKeysResponse struct { - SeriesList []byte `protobuf:"bytes,1,opt,name=SeriesList" json:"SeriesList,omitempty"` - Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` + SeriesList []byte `protobuf:"bytes,1,opt" json:"SeriesList,omitempty"` + Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -287,15 +287,53 @@ func (m *SeriesKeysResponse) GetErr() string { return "" } -func init() { - proto.RegisterType((*WriteShardRequest)(nil), "internal.WriteShardRequest") - proto.RegisterType((*WriteShardResponse)(nil), "internal.WriteShardResponse") - proto.RegisterType((*ExecuteStatementRequest)(nil), "internal.ExecuteStatementRequest") - proto.RegisterType((*ExecuteStatementResponse)(nil), "internal.ExecuteStatementResponse") - proto.RegisterType((*CreateIteratorRequest)(nil), "internal.CreateIteratorRequest") - proto.RegisterType((*CreateIteratorResponse)(nil), "internal.CreateIteratorResponse") - proto.RegisterType((*FieldDimensionsRequest)(nil), "internal.FieldDimensionsRequest") - proto.RegisterType((*FieldDimensionsResponse)(nil), "internal.FieldDimensionsResponse") - proto.RegisterType((*SeriesKeysRequest)(nil), "internal.SeriesKeysRequest") - proto.RegisterType((*SeriesKeysResponse)(nil), "internal.SeriesKeysResponse") +type ExpandSourcesRequest struct { + ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"` + Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} } +func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) } +func (*ExpandSourcesRequest) ProtoMessage() {} + +func (m *ExpandSourcesRequest) GetShardIDs() []uint64 { + if m != nil { + return m.ShardIDs + } + return nil +} + +func (m *ExpandSourcesRequest) GetSources() []byte { + if m != nil { + return m.Sources + } + return nil +} + +type ExpandSourcesResponse struct { + Sources []byte `protobuf:"bytes,1,req" json:"Sources,omitempty"` + Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} } +func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) } +func (*ExpandSourcesResponse) ProtoMessage() {} + +func (m *ExpandSourcesResponse) GetSources() []byte { + if m != nil { + return m.Sources + } + return nil +} + +func (m *ExpandSourcesResponse) GetErr() string { + if m != nil && m.Err != nil { + return *m.Err + } + return "" +} + +func init() { } diff --git a/cluster/internal/data.proto b/cluster/internal/data.proto index 97e686e306..a1579035e0 100644 --- a/cluster/internal/data.proto +++ b/cluster/internal/data.proto @@ -52,3 +52,12 @@ message SeriesKeysResponse { optional string Err = 2; } +message ExpandSourcesRequest { + repeated uint64 ShardIDs = 1; + required bytes Sources = 2; +} + +message ExpandSourcesResponse { + required bytes Sources = 1; + optional string Err = 2; +} diff --git a/cluster/query_executor.go b/cluster/query_executor.go index 937b41594c..e8e83928a1 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -415,13 +415,6 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c opt.MinTime = time.Unix(0, 0) } - // Expand regex sources to their actual source names. - sources, err := e.TSDBStore.ExpandSources(stmt.Sources) - if err != nil { - return err - } - stmt.Sources = sources - // Convert DISTINCT into a call. stmt.RewriteDistinct() @@ -434,6 +427,15 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c return err } + // Expand regex sources to their actual source names. + if stmt.Sources.HasRegex() { + sources, err := ic.ExpandSources(stmt.Sources) + if err != nil { + return err + } + stmt.Sources = sources + } + // Rewrite wildcards, if any exist. tmp, err := stmt.RewriteWildcards(ic) if err != nil { @@ -1056,6 +1058,30 @@ func (ic *remoteIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influ return resp.SeriesList, resp.Err } +// ExpandSources expands regex sources on a remote iterator creator. +func (ic *remoteIteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { + conn, err := ic.dialer.DialNode(ic.nodeID) + if err != nil { + return nil, err + } + defer conn.Close() + + // Write request. + if err := EncodeTLV(conn, expandSourcesRequestMessage, &ExpandSourcesRequest{ + ShardIDs: ic.shardIDs, + Sources: sources, + }); err != nil { + return nil, err + } + + // Read the response. + var resp ExpandSourcesResponse + if _, err := DecodeTLV(conn, &resp); err != nil { + return nil, err + } + return resp.Sources, resp.Err +} + // NodeDialer dials connections to a given node. type NodeDialer struct { MetaClient MetaClient diff --git a/cluster/query_executor_test.go b/cluster/query_executor_test.go index f14a810c18..6a171cad10 100644 --- a/cluster/query_executor_test.go +++ b/cluster/query_executor_test.go @@ -275,6 +275,7 @@ type IteratorCreator struct { CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error) FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error) + ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) } func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { @@ -289,6 +290,10 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se return ic.SeriesKeysFn(opt) } +func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { + return ic.ExpandSourcesFn(sources) +} + // FloatIterator is a represents an iterator that reads from a slice. type FloatIterator struct { Points []influxql.FloatPoint diff --git a/cluster/rpc.go b/cluster/rpc.go index 8710e89980..814615af3c 100644 --- a/cluster/rpc.go +++ b/cluster/rpc.go @@ -411,3 +411,72 @@ func (r *SeriesKeysResponse) UnmarshalBinary(data []byte) error { return nil } + +// ExpandSourcesRequest represents a request to expand regex sources. +type ExpandSourcesRequest struct { + ShardIDs []uint64 + Sources influxql.Sources +} + +// MarshalBinary encodes r to a binary format. +func (r *ExpandSourcesRequest) MarshalBinary() ([]byte, error) { + buf, err := r.Sources.MarshalBinary() + if err != nil { + return nil, err + } + return proto.Marshal(&internal.ExpandSourcesRequest{ + ShardIDs: r.ShardIDs, + Sources: buf, + }) +} + +// UnmarshalBinary decodes data into r. +func (r *ExpandSourcesRequest) UnmarshalBinary(data []byte) error { + var pb internal.ExpandSourcesRequest + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.ShardIDs = pb.GetShardIDs() + if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil { + return err + } + return nil +} + +// ExpandSourcesResponse represents a response from source expansion. +type ExpandSourcesResponse struct { + Sources influxql.Sources + Err error +} + +// MarshalBinary encodes r to a binary format. +func (r *ExpandSourcesResponse) MarshalBinary() ([]byte, error) { + var pb internal.ExpandSourcesResponse + buf, err := r.Sources.MarshalBinary() + if err != nil { + return nil, err + } + pb.Sources = buf + + if r.Err != nil { + pb.Err = proto.String(r.Err.Error()) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes data into r. +func (r *ExpandSourcesResponse) UnmarshalBinary(data []byte) error { + var pb internal.ExpandSourcesResponse + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + if err := r.Sources.UnmarshalBinary(pb.GetSources()); err != nil { + return err + } + + if pb.Err != nil { + r.Err = errors.New(pb.GetErr()) + } + return nil +} diff --git a/cluster/service.go b/cluster/service.go index a24d569a1f..cedc5a76f1 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -38,6 +38,9 @@ const ( seriesKeysReq = "seriesKeysReq" seriesKeysResp = "seriesKeysResp" + + expandSourcesReq = "expandSourcesReq" + expandSourcesResp = "expandSourcesResp" ) // Service processes data received over raw TCP connections. @@ -196,6 +199,10 @@ func (s *Service) handleConn(conn net.Conn) { s.statMap.Add(seriesKeysReq, 1) s.processSeriesKeysRequest(conn) return + case expandSourcesRequestMessage: + s.statMap.Add(expandSourcesReq, 1) + s.processExpandSourcesRequest(conn) + return default: s.Logger.Printf("cluster service message type not found: %d", typ) } @@ -439,6 +446,48 @@ func (s *Service) processSeriesKeysRequest(conn net.Conn) { } } +func (s *Service) processExpandSourcesRequest(conn net.Conn) { + var sources influxql.Sources + if err := func() error { + // Parse request. + var req ExpandSourcesRequest + if err := DecodeLV(conn, &req); err != nil { + return err + } + + // Collect iterator creators for each shard. + ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) + for _, shardID := range req.ShardIDs { + ic := s.TSDBStore.ShardIteratorCreator(shardID) + if ic == nil { + return nil + } + ics = append(ics, ic) + } + + // Expand sources from all shards. + a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources) + if err != nil { + return err + } + sources = a + + return nil + }(); err != nil { + s.Logger.Printf("error reading ExpandSources request: %s", err) + EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err}) + return + } + + // Encode success response. + if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{ + Sources: sources, + }); err != nil { + s.Logger.Printf("error writing ExpandSources response: %s", err) + return + } +} + // ReadTLV reads a type-length-value record from r. func ReadTLV(r io.Reader) (byte, []byte, error) { typ, err := ReadType(r) diff --git a/cluster/shard_writer.go b/cluster/shard_writer.go index 9a44137a8e..4a83f8b97d 100644 --- a/cluster/shard_writer.go +++ b/cluster/shard_writer.go @@ -24,6 +24,9 @@ const ( seriesKeysRequestMessage seriesKeysResponseMessage + + expandSourcesRequestMessage + expandSourcesResponseMessage ) // ShardWriter writes a set of points to a shard. diff --git a/influxql/ast.go b/influxql/ast.go index cbfb1a2c04..59fa62da6e 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -309,6 +309,19 @@ func (a Sources) HasSystemSource() bool { return false } +// HasRegex returns true if any of the sources are regex measurements. +func (a Sources) HasRegex() bool { + for _, s := range a { + switch s := s.(type) { + case *Measurement: + if s.Regex != nil { + return true + } + } + } + return false +} + // String returns a string representation of a Sources array. func (a Sources) String() string { var buf bytes.Buffer diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index 03c7c5f580..74c19fef47 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -21,25 +21,23 @@ It has these top-level messages: package internal import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = fmt.Errorf var _ = math.Inf type Point struct { - Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` - Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"` - Time *int64 `protobuf:"varint,3,req,name=Time" json:"Time,omitempty"` - Nil *bool `protobuf:"varint,4,req,name=Nil" json:"Nil,omitempty"` - Aux []*Aux `protobuf:"bytes,5,rep,name=Aux" json:"Aux,omitempty"` - Aggregated *uint32 `protobuf:"varint,6,opt,name=Aggregated" json:"Aggregated,omitempty"` - FloatValue *float64 `protobuf:"fixed64,7,opt,name=FloatValue" json:"FloatValue,omitempty"` - IntegerValue *int64 `protobuf:"varint,8,opt,name=IntegerValue" json:"IntegerValue,omitempty"` - StringValue *string `protobuf:"bytes,9,opt,name=StringValue" json:"StringValue,omitempty"` - BooleanValue *bool `protobuf:"varint,10,opt,name=BooleanValue" json:"BooleanValue,omitempty"` + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Tags *string `protobuf:"bytes,2,req" json:"Tags,omitempty"` + Time *int64 `protobuf:"varint,3,req" json:"Time,omitempty"` + Nil *bool `protobuf:"varint,4,req" json:"Nil,omitempty"` + Aux []*Aux `protobuf:"bytes,5,rep" json:"Aux,omitempty"` + Aggregated *uint32 `protobuf:"varint,6,opt" json:"Aggregated,omitempty"` + FloatValue *float64 `protobuf:"fixed64,7,opt" json:"FloatValue,omitempty"` + IntegerValue *int64 `protobuf:"varint,8,opt" json:"IntegerValue,omitempty"` + StringValue *string `protobuf:"bytes,9,opt" json:"StringValue,omitempty"` + BooleanValue *bool `protobuf:"varint,10,opt" json:"BooleanValue,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -118,11 +116,11 @@ func (m *Point) GetBooleanValue() bool { } type Aux struct { - DataType *int32 `protobuf:"varint,1,req,name=DataType" json:"DataType,omitempty"` - FloatValue *float64 `protobuf:"fixed64,2,opt,name=FloatValue" json:"FloatValue,omitempty"` - IntegerValue *int64 `protobuf:"varint,3,opt,name=IntegerValue" json:"IntegerValue,omitempty"` - StringValue *string `protobuf:"bytes,4,opt,name=StringValue" json:"StringValue,omitempty"` - BooleanValue *bool `protobuf:"varint,5,opt,name=BooleanValue" json:"BooleanValue,omitempty"` + DataType *int32 `protobuf:"varint,1,req" json:"DataType,omitempty"` + FloatValue *float64 `protobuf:"fixed64,2,opt" json:"FloatValue,omitempty"` + IntegerValue *int64 `protobuf:"varint,3,opt" json:"IntegerValue,omitempty"` + StringValue *string `protobuf:"bytes,4,opt" json:"StringValue,omitempty"` + BooleanValue *bool `protobuf:"varint,5,opt" json:"BooleanValue,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -166,22 +164,22 @@ func (m *Aux) GetBooleanValue() bool { } type IteratorOptions struct { - Expr *string `protobuf:"bytes,1,opt,name=Expr" json:"Expr,omitempty"` - Aux []string `protobuf:"bytes,2,rep,name=Aux" json:"Aux,omitempty"` - Sources []*Measurement `protobuf:"bytes,3,rep,name=Sources" json:"Sources,omitempty"` - Interval *Interval `protobuf:"bytes,4,opt,name=Interval" json:"Interval,omitempty"` - Dimensions []string `protobuf:"bytes,5,rep,name=Dimensions" json:"Dimensions,omitempty"` - Fill *int32 `protobuf:"varint,6,opt,name=Fill" json:"Fill,omitempty"` - FillValue *float64 `protobuf:"fixed64,7,opt,name=FillValue" json:"FillValue,omitempty"` - Condition *string `protobuf:"bytes,8,opt,name=Condition" json:"Condition,omitempty"` - StartTime *int64 `protobuf:"varint,9,opt,name=StartTime" json:"StartTime,omitempty"` - EndTime *int64 `protobuf:"varint,10,opt,name=EndTime" json:"EndTime,omitempty"` - Ascending *bool `protobuf:"varint,11,opt,name=Ascending" json:"Ascending,omitempty"` - Limit *int64 `protobuf:"varint,12,opt,name=Limit" json:"Limit,omitempty"` - Offset *int64 `protobuf:"varint,13,opt,name=Offset" json:"Offset,omitempty"` - SLimit *int64 `protobuf:"varint,14,opt,name=SLimit" json:"SLimit,omitempty"` - SOffset *int64 `protobuf:"varint,15,opt,name=SOffset" json:"SOffset,omitempty"` - Dedupe *bool `protobuf:"varint,16,opt,name=Dedupe" json:"Dedupe,omitempty"` + Expr *string `protobuf:"bytes,1,opt" json:"Expr,omitempty"` + Aux []string `protobuf:"bytes,2,rep" json:"Aux,omitempty"` + Sources []*Measurement `protobuf:"bytes,3,rep" json:"Sources,omitempty"` + Interval *Interval `protobuf:"bytes,4,opt" json:"Interval,omitempty"` + Dimensions []string `protobuf:"bytes,5,rep" json:"Dimensions,omitempty"` + Fill *int32 `protobuf:"varint,6,opt" json:"Fill,omitempty"` + FillValue *float64 `protobuf:"fixed64,7,opt" json:"FillValue,omitempty"` + Condition *string `protobuf:"bytes,8,opt" json:"Condition,omitempty"` + StartTime *int64 `protobuf:"varint,9,opt" json:"StartTime,omitempty"` + EndTime *int64 `protobuf:"varint,10,opt" json:"EndTime,omitempty"` + Ascending *bool `protobuf:"varint,11,opt" json:"Ascending,omitempty"` + Limit *int64 `protobuf:"varint,12,opt" json:"Limit,omitempty"` + Offset *int64 `protobuf:"varint,13,opt" json:"Offset,omitempty"` + SLimit *int64 `protobuf:"varint,14,opt" json:"SLimit,omitempty"` + SOffset *int64 `protobuf:"varint,15,opt" json:"SOffset,omitempty"` + Dedupe *bool `protobuf:"varint,16,opt" json:"Dedupe,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -302,7 +300,7 @@ func (m *IteratorOptions) GetDedupe() bool { } type Measurements struct { - Items []*Measurement `protobuf:"bytes,1,rep,name=Items" json:"Items,omitempty"` + Items []*Measurement `protobuf:"bytes,1,rep" json:"Items,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -318,11 +316,11 @@ func (m *Measurements) GetItems() []*Measurement { } type Measurement struct { - Database *string `protobuf:"bytes,1,opt,name=Database" json:"Database,omitempty"` - RetentionPolicy *string `protobuf:"bytes,2,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` - Name *string `protobuf:"bytes,3,opt,name=Name" json:"Name,omitempty"` - Regex *string `protobuf:"bytes,4,opt,name=Regex" json:"Regex,omitempty"` - IsTarget *bool `protobuf:"varint,5,opt,name=IsTarget" json:"IsTarget,omitempty"` + Database *string `protobuf:"bytes,1,opt" json:"Database,omitempty"` + RetentionPolicy *string `protobuf:"bytes,2,opt" json:"RetentionPolicy,omitempty"` + Name *string `protobuf:"bytes,3,opt" json:"Name,omitempty"` + Regex *string `protobuf:"bytes,4,opt" json:"Regex,omitempty"` + IsTarget *bool `protobuf:"varint,5,opt" json:"IsTarget,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -366,8 +364,8 @@ func (m *Measurement) GetIsTarget() bool { } type Interval struct { - Duration *int64 `protobuf:"varint,1,opt,name=Duration" json:"Duration,omitempty"` - Offset *int64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"` + Duration *int64 `protobuf:"varint,1,opt" json:"Duration,omitempty"` + Offset *int64 `protobuf:"varint,2,opt" json:"Offset,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -390,9 +388,9 @@ func (m *Interval) GetOffset() int64 { } type Series struct { - Name *string `protobuf:"bytes,1,opt,name=Name" json:"Name,omitempty"` - Tags []byte `protobuf:"bytes,2,opt,name=Tags" json:"Tags,omitempty"` - Aux []uint32 `protobuf:"varint,3,rep,name=Aux" json:"Aux,omitempty"` + Name *string `protobuf:"bytes,1,opt" json:"Name,omitempty"` + Tags []byte `protobuf:"bytes,2,opt" json:"Tags,omitempty"` + Aux []uint32 `protobuf:"varint,3,rep" json:"Aux,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -422,7 +420,7 @@ func (m *Series) GetAux() []uint32 { } type SeriesList struct { - Items []*Series `protobuf:"bytes,1,rep,name=Items" json:"Items,omitempty"` + Items []*Series `protobuf:"bytes,1,rep" json:"Items,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -438,12 +436,4 @@ func (m *SeriesList) GetItems() []*Series { } func init() { - proto.RegisterType((*Point)(nil), "internal.Point") - proto.RegisterType((*Aux)(nil), "internal.Aux") - proto.RegisterType((*IteratorOptions)(nil), "internal.IteratorOptions") - proto.RegisterType((*Measurements)(nil), "internal.Measurements") - proto.RegisterType((*Measurement)(nil), "internal.Measurement") - proto.RegisterType((*Interval)(nil), "internal.Interval") - proto.RegisterType((*Series)(nil), "internal.Series") - proto.RegisterType((*SeriesList)(nil), "internal.SeriesList") } diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index a19ba34581..2754407635 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -612,6 +612,10 @@ func (itr *floatAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) return nil, errors.New("not implemented") } +func (itr *floatAuxIterator) ExpandSources(sources Sources) (Sources, error) { + return nil, errors.New("not implemented") +} + func (itr *floatAuxIterator) stream() { for { // Read next point. @@ -1750,6 +1754,10 @@ func (itr *integerAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, erro return nil, errors.New("not implemented") } +func (itr *integerAuxIterator) ExpandSources(sources Sources) (Sources, error) { + return nil, errors.New("not implemented") +} + func (itr *integerAuxIterator) stream() { for { // Read next point. @@ -2888,6 +2896,10 @@ func (itr *stringAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error return nil, errors.New("not implemented") } +func (itr *stringAuxIterator) ExpandSources(sources Sources) (Sources, error) { + return nil, errors.New("not implemented") +} + func (itr *stringAuxIterator) stream() { for { // Read next point. @@ -4026,6 +4038,10 @@ func (itr *booleanAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, erro return nil, errors.New("not implemented") } +func (itr *booleanAuxIterator) ExpandSources(sources Sources) (Sources, error) { + return nil, errors.New("not implemented") +} + func (itr *booleanAuxIterator) stream() { for { // Read next point. diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 661e489300..b4d72f2166 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -611,7 +611,11 @@ func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, return nil, errors.New("not implemented") } -func (itr *{{$k.name}}AuxIterator) stream() { +func (itr *{{.name}}AuxIterator) ExpandSources(sources Sources) (Sources, error) { + return nil, errors.New("not implemented") +} + +func (itr *{{.name}}AuxIterator) stream() { for { // Read next point. p := itr.input.Next() diff --git a/influxql/iterator.go b/influxql/iterator.go index cee4cc3ae6..b2bdc44482 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -444,6 +444,9 @@ type IteratorCreator interface { // Returns the series keys that will be returned by this iterator. SeriesKeys(opt IteratorOptions) (SeriesList, error) + + // Expands regex sources to all matching sources. + ExpandSources(sources Sources) (Sources, error) } // IteratorCreators represents a list of iterator creators. @@ -544,6 +547,42 @@ func (a IteratorCreators) SeriesKeys(opt IteratorOptions) (SeriesList, error) { return SeriesList(seriesList), nil } +// ExpandSources expands sources across all iterator creators and returns a unique result. +func (a IteratorCreators) ExpandSources(sources Sources) (Sources, error) { + m := make(map[string]Source) + + for _, ic := range a { + expanded, err := ic.ExpandSources(sources) + if err != nil { + return nil, err + } + + for _, src := range expanded { + switch src := src.(type) { + case *Measurement: + m[src.String()] = src + default: + return nil, fmt.Errorf("IteratorCreators.ExpandSources: unsupported source type: %T", src) + } + } + } + + // Convert set to sorted slice. + names := make([]string, 0, len(m)) + for name := range m { + names = append(names, name) + } + sort.Strings(names) + + // Convert set to a list of Sources. + sorted := make(Sources, 0, len(m)) + for _, name := range names { + sorted = append(sorted, m[name]) + } + + return sorted, nil +} + // IteratorOptions is an object passed to CreateIterator to specify creation options. type IteratorOptions struct { // Expression to iterate for. diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 668d7e47df..5d9ca71d3d 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -959,6 +959,7 @@ type IteratorCreator struct { CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error) FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error) + ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) } func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) { @@ -1010,6 +1011,10 @@ func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.Se return influxql.SeriesList(seriesList), nil } +func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { + return ic.ExpandSourcesFn(sources) +} + // Test implementation of influxql.FloatIterator type FloatIterator struct { Points []influxql.FloatPoint diff --git a/tsdb/shard.go b/tsdb/shard.go index 3b9b58d5c3..8f236f6948 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -491,6 +491,53 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e return s.engine.SeriesKeys(opt) } +// ExpandSources expands regex sources and removes duplicates. +// NOTE: sources must be normalized (db and rp set) before calling this function. +func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { + // Use a map as a set to prevent duplicates. + set := map[string]influxql.Source{} + + // Iterate all sources, expanding regexes when they're found. + for _, source := range sources { + switch src := source.(type) { + case *influxql.Measurement: + // Add non-regex measurements directly to the set. + if src.Regex == nil { + set[src.String()] = src + continue + } + + // Loop over matching measurements. + for _, m := range s.index.MeasurementsByRegex(src.Regex.Val) { + other := &influxql.Measurement{ + Database: src.Database, + RetentionPolicy: src.RetentionPolicy, + Name: m.Name, + } + set[other.String()] = other + } + + default: + return nil, fmt.Errorf("expandSources: unsupported source type: %T", source) + } + } + + // Convert set to sorted slice. + names := make([]string, 0, len(set)) + for name := range set { + names = append(names, name) + } + sort.Strings(names) + + // Convert set to a list of Sources. + expanded := make(influxql.Sources, 0, len(set)) + for _, name := range names { + expanded = append(expanded, set[name]) + } + + return expanded, nil +} + // Shards represents a sortable list of shards. type Shards []*Shard @@ -844,6 +891,9 @@ func (ic *shardIteratorCreator) FieldDimensions(sources influxql.Sources) (field func (ic *shardIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) { return ic.sh.SeriesKeys(opt) } +func (ic *shardIteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { + return ic.sh.ExpandSources(sources) +} func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { fn := func(m *Measurement) []string { diff --git a/tsdb/store.go b/tsdb/store.go index 51266de99d..eb67902e95 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -486,6 +486,15 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { // DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { + // Expand regex expressions in the FROM clause. + a, err := s.ExpandSources(sources) + if err != nil { + return err + } else if sources != nil && len(sources) != 0 && len(a) == 0 { + return nil + } + sources = a + s.mu.RLock() defer s.mu.RUnlock() @@ -495,15 +504,6 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi return nil } - // Expand regex expressions in the FROM clause. - a, err := s.expandSources(sources) - if err != nil { - return err - } else if sources != nil && len(sources) != 0 && len(a) == 0 { - return nil - } - sources = a - measurements, err := measurementsFromSourcesOrDB(db, sources...) if err != nil { return err @@ -566,63 +566,21 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error { return nil } -// ExpandSources expands regex sources and removes duplicates. -// NOTE: sources must be normalized (db and rp set) before calling this function. +// ExpandSources expands sources against all local shards. func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { - s.mu.RLock() - defer s.mu.RUnlock() - return s.expandSources(sources) + return s.IteratorCreators().ExpandSources(sources) } -func (s *Store) expandSources(sources influxql.Sources) (influxql.Sources, error) { - // Use a map as a set to prevent duplicates. - set := map[string]influxql.Source{} +// IteratorCreators returns a set of all local shards as iterator creators. +func (s *Store) IteratorCreators() influxql.IteratorCreators { + s.mu.Lock() + defer s.mu.Unlock() - // Iterate all sources, expanding regexes when they're found. - for _, source := range sources { - switch src := source.(type) { - case *influxql.Measurement: - // Add non-regex measurements directly to the set. - if src.Regex == nil { - set[src.String()] = src - continue - } - - // Lookup the database. - db := s.databaseIndexes[src.Database] - if db == nil { - return nil, nil - } - - // Loop over matching measurements. - for _, m := range db.MeasurementsByRegex(src.Regex.Val) { - other := &influxql.Measurement{ - Database: src.Database, - RetentionPolicy: src.RetentionPolicy, - Name: m.Name, - } - set[other.String()] = other - } - - default: - return nil, fmt.Errorf("expandSources: unsupported source type: %T", source) - } + a := make(influxql.IteratorCreators, 0, len(s.shards)) + for _, sh := range s.shards { + a = append(a, sh) } - - // Convert set to sorted slice. - names := make([]string, 0, len(set)) - for name := range set { - names = append(names, name) - } - sort.Strings(names) - - // Convert set to a list of Sources. - expanded := make(influxql.Sources, 0, len(set)) - for _, name := range names { - expanded = append(expanded, set[name]) - } - - return expanded, nil + return a } // WriteToShard writes a list of points to a shard identified by its ID.