Support cast syntax for selecting a specific type
Casting syntax is done with the PostgreSQL syntax `field1::float` to specify which type should be used when selecting a field. You can also do `field1::field` or `tag1::tag` to specify that a field or tag should be selected. This makes it possible to select a tag when a field key and a tag key conflict with each other in a measurement. It also means it's possible to choose a field with a specific type if multiple shards disagree. If no types are given, the same ordering for how a type is chosen is used to determine which type to return. The FieldDimensions method has been updated to return the data type for the fields that get returned. The SeriesKeys function has also been removed since it is no longer needed. SeriesKeys was originally used for the fill iterator, but then expanded to be used by auxiliary iterators for determining the channel iterator types. The fill iterator doesn't need it anymore and the auxiliary types are better served by FieldDimensions implementing that functionality, so SeriesKeys is no longer needed. Fixes #6519.pull/6529/head
parent
274647b5b2
commit
23f6a706bb
cmd/influxd/run
|
@ -10,6 +10,7 @@
|
|||
- [#6609](https://github.com/influxdata/influxdb/pull/6609): Add support for JWT token authentication.
|
||||
- [#6559](https://github.com/influxdata/influxdb/issues/6559): Teach the http service how to enforce connection limits.
|
||||
- [#6623](https://github.com/influxdata/influxdb/pull/6623): Speed up drop database
|
||||
- [#6519](https://github.com/influxdata/influxdb/issues/6519): Support cast syntax for selecting a specific type.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -3422,7 +3422,7 @@ func TestServer_Query_AggregateSelectors(t *testing.T) {
|
|||
name: "baseline",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT * FROM network`,
|
||||
exp: `{"results":[{"series":[{"name":"network","columns":["time","core","host","region","rx","tx"],"values":[["2000-01-01T00:00:00Z",2,"server01","west",10,20],["2000-01-01T00:00:10Z",3,"server02","west",40,50],["2000-01-01T00:00:20Z",4,"server03","east",40,55],["2000-01-01T00:00:30Z",1,"server04","east",40,60],["2000-01-01T00:00:40Z",2,"server05","west",50,70],["2000-01-01T00:00:50Z",3,"server06","east",50,40],["2000-01-01T00:01:00Z",4,"server07","west",70,30],["2000-01-01T00:01:10Z",1,"server08","east",90,10],["2000-01-01T00:01:20Z",2,"server09","east",5,4]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"name":"network","columns":["time","core","core_1","host","region","rx","tx"],"values":[["2000-01-01T00:00:00Z",2,"1","server01","west",10,20],["2000-01-01T00:00:10Z",3,"2","server02","west",40,50],["2000-01-01T00:00:20Z",4,"3","server03","east",40,55],["2000-01-01T00:00:30Z",1,"4","server04","east",40,60],["2000-01-01T00:00:40Z",2,"1","server05","west",50,70],["2000-01-01T00:00:50Z",3,"2","server06","east",50,40],["2000-01-01T00:01:00Z",4,"3","server07","west",70,30],["2000-01-01T00:01:10Z",1,"4","server08","east",90,10],["2000-01-01T00:01:20Z",2,"1","server09","east",5,4]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "max - baseline 30s",
|
||||
|
@ -4291,10 +4291,10 @@ func TestServer_Query_WildcardExpansion(t *testing.T) {
|
|||
exp: `{"results":[{"series":[{"name":"wildcard","columns":["time","c","h","region","value"],"values":[["2000-01-01T00:00:00Z",80,"A","us-east",10],["2000-01-01T00:00:10Z",90,"B","us-east",20],["2000-01-01T00:00:20Z",70,"B","us-west",30],["2000-01-01T00:00:30Z",60,"A","us-east",40]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "duplicate tag and field key, always favor field over tag",
|
||||
name: "duplicate tag and field key",
|
||||
command: `SELECT * FROM dupnames`,
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
exp: `{"results":[{"series":[{"name":"dupnames","columns":["time","day","region","value"],"values":[["2000-01-01T00:00:00Z",3,"us-east",10],["2000-01-01T00:00:10Z",2,"us-east",20],["2000-01-01T00:00:20Z",1,"us-west",30]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"name":"dupnames","columns":["time","day","day_1","region","value"],"values":[["2000-01-01T00:00:00Z",3,"1","us-east",10],["2000-01-01T00:00:10Z",2,"2","us-east",20],["2000-01-01T00:00:20Z",1,"3","us-west",30]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
|
|
|
@ -416,7 +416,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
|
|||
}
|
||||
|
||||
// Rewrite wildcards, if any exist.
|
||||
tmp, err := stmt.RewriteWildcards(ic)
|
||||
tmp, err := stmt.RewriteFields(ic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -48,13 +48,8 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
|
|||
{Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}},
|
||||
}}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
return map[string]struct{}{"value": struct{}{}}, nil, nil
|
||||
}
|
||||
ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
return influxql.SeriesList{
|
||||
{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
|
||||
}, nil
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
|
||||
}
|
||||
return &ic
|
||||
}
|
||||
|
@ -99,13 +94,8 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectSeriesN(t *testing.T) {
|
|||
stats: influxql.IteratorStats{SeriesN: 2},
|
||||
}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
return map[string]struct{}{"value": struct{}{}}, nil, nil
|
||||
}
|
||||
ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
return influxql.SeriesList{
|
||||
{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
|
||||
}, nil
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
|
||||
}
|
||||
e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator { return &ic }
|
||||
|
||||
|
@ -138,13 +128,8 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
|
|||
Points: []influxql.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}},
|
||||
}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
return map[string]struct{}{"value": struct{}{}}, nil, nil
|
||||
}
|
||||
ic.SeriesKeysFn = func(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
return influxql.SeriesList{
|
||||
{Name: "cpu", Aux: []influxql.DataType{influxql.Float}},
|
||||
}, nil
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
|
||||
}
|
||||
e.TSDBStore.ShardIteratorCreatorFn = func(id uint64) influxql.IteratorCreator { return &ic }
|
||||
|
||||
|
@ -303,8 +288,7 @@ func ReadAllResults(c <-chan *influxql.Result) []*influxql.Result {
|
|||
// IteratorCreator is a mockable implementation of IteratorCreator.
|
||||
type IteratorCreator struct {
|
||||
CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error)
|
||||
FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
|
||||
SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error)
|
||||
FieldDimensionsFn func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
|
||||
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
|
||||
}
|
||||
|
||||
|
@ -312,14 +296,10 @@ func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxq
|
|||
return ic.CreateIteratorFn(opt)
|
||||
}
|
||||
|
||||
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
return ic.FieldDimensionsFn(sources)
|
||||
}
|
||||
|
||||
func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
return ic.SeriesKeysFn(opt)
|
||||
}
|
||||
|
||||
func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
return ic.ExpandSourcesFn(sources)
|
||||
}
|
||||
|
|
185
influxql/ast.go
185
influxql/ast.go
|
@ -32,6 +32,10 @@ const (
|
|||
Time = 5
|
||||
// Duration means the data type is a duration of time.
|
||||
Duration = 6
|
||||
// Tag means the data type is a tag.
|
||||
Tag = 7
|
||||
// AnyField means the data type is any field.
|
||||
AnyField = 8
|
||||
)
|
||||
|
||||
// InspectDataType returns the data type of a given value.
|
||||
|
@ -76,6 +80,10 @@ func (d DataType) String() string {
|
|||
return "time"
|
||||
case Duration:
|
||||
return "duration"
|
||||
case Tag:
|
||||
return "tag"
|
||||
case AnyField:
|
||||
return "field"
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
@ -1012,10 +1020,36 @@ func cloneSource(s Source) Source {
|
|||
}
|
||||
}
|
||||
|
||||
// RewriteWildcards returns the re-written form of the select statement. Any wildcard query
|
||||
// RewriteFields returns the re-written form of the select statement. Any wildcard query
|
||||
// fields are replaced with the supplied fields, and any wildcard GROUP BY fields are replaced
|
||||
// with the supplied dimensions.
|
||||
func (s *SelectStatement) RewriteWildcards(ic IteratorCreator) (*SelectStatement, error) {
|
||||
// with the supplied dimensions. Any fields with no type specifier are rewritten with the
|
||||
// appropriate type.
|
||||
func (s *SelectStatement) RewriteFields(ic IteratorCreator) (*SelectStatement, error) {
|
||||
// Retrieve a list of unique field and dimensions.
|
||||
fieldSet, dimensionSet, err := ic.FieldDimensions(s.Sources)
|
||||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
// Rewrite all variable references in the fields with their types if one
|
||||
// hasn't been specified.
|
||||
rewrite := func(n Node) {
|
||||
ref, ok := n.(*VarRef)
|
||||
if !ok || (ref.Type != Unknown && ref.Type != AnyField) {
|
||||
return
|
||||
}
|
||||
|
||||
if typ, ok := fieldSet[ref.Val]; ok {
|
||||
ref.Type = typ
|
||||
} else if ref.Type != AnyField {
|
||||
if _, ok := dimensionSet[ref.Val]; ok {
|
||||
ref.Type = Tag
|
||||
}
|
||||
}
|
||||
}
|
||||
WalkFunc(s.Fields, rewrite)
|
||||
WalkFunc(s.Condition, rewrite)
|
||||
|
||||
// Ignore if there are no wildcards.
|
||||
hasFieldWildcard := s.HasFieldWildcard()
|
||||
hasDimensionWildcard := s.HasDimensionWildcard()
|
||||
|
@ -1023,12 +1057,6 @@ func (s *SelectStatement) RewriteWildcards(ic IteratorCreator) (*SelectStatement
|
|||
return s, nil
|
||||
}
|
||||
|
||||
// Retrieve a list of unique field and dimensions.
|
||||
fieldSet, dimensionSet, err := ic.FieldDimensions(s.Sources)
|
||||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
// If there are no dimension wildcards then merge dimensions to fields.
|
||||
if !hasDimensionWildcard {
|
||||
// Remove the dimensions present in the group by so they don't get added as fields.
|
||||
|
@ -1040,13 +1068,23 @@ func (s *SelectStatement) RewriteWildcards(ic IteratorCreator) (*SelectStatement
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
for k := range dimensionSet {
|
||||
fieldSet[k] = struct{}{}
|
||||
}
|
||||
dimensionSet = nil
|
||||
}
|
||||
fields := stringSetSlice(fieldSet)
|
||||
|
||||
// Sort the field and dimension names for wildcard expansion.
|
||||
var fields []VarRef
|
||||
if len(fieldSet) > 0 {
|
||||
fields = make([]VarRef, 0, len(fieldSet))
|
||||
for name, typ := range fieldSet {
|
||||
fields = append(fields, VarRef{Val: name, Type: typ})
|
||||
}
|
||||
if !hasDimensionWildcard {
|
||||
for name := range dimensionSet {
|
||||
fields = append(fields, VarRef{Val: name, Type: Tag})
|
||||
}
|
||||
dimensionSet = nil
|
||||
}
|
||||
sort.Sort(VarRefs(fields))
|
||||
}
|
||||
dimensions := stringSetSlice(dimensionSet)
|
||||
|
||||
other := s.Clone()
|
||||
|
@ -1056,10 +1094,15 @@ func (s *SelectStatement) RewriteWildcards(ic IteratorCreator) (*SelectStatement
|
|||
// Allocate a slice assuming there is exactly one wildcard for efficiency.
|
||||
rwFields := make(Fields, 0, len(s.Fields)+len(fields)-1)
|
||||
for _, f := range s.Fields {
|
||||
switch f.Expr.(type) {
|
||||
switch expr := f.Expr.(type) {
|
||||
case *Wildcard:
|
||||
for _, name := range fields {
|
||||
rwFields = append(rwFields, &Field{Expr: &VarRef{Val: name}})
|
||||
for _, ref := range fields {
|
||||
if expr.Type == FIELD && ref.Type == Tag {
|
||||
continue
|
||||
} else if expr.Type == TAG && ref.Type != Tag {
|
||||
continue
|
||||
}
|
||||
rwFields = append(rwFields, &Field{Expr: &VarRef{Val: ref.Val, Type: ref.Type}})
|
||||
}
|
||||
default:
|
||||
rwFields = append(rwFields, f)
|
||||
|
@ -1874,15 +1917,13 @@ func walkNames(exp Expr) []string {
|
|||
case *VarRef:
|
||||
return []string{expr.Val}
|
||||
case *Call:
|
||||
if len(expr.Args) == 0 {
|
||||
return nil
|
||||
var a []string
|
||||
for _, expr := range expr.Args {
|
||||
if ref, ok := expr.(*VarRef); ok {
|
||||
a = append(a, ref.Val)
|
||||
}
|
||||
}
|
||||
lit, ok := expr.Args[0].(*VarRef)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return []string{lit.Val}
|
||||
return a
|
||||
case *BinaryExpr:
|
||||
var ret []string
|
||||
ret = append(ret, walkNames(expr.LHS)...)
|
||||
|
@ -1895,21 +1936,46 @@ func walkNames(exp Expr) []string {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ExprNames returns a list of non-"time" field names from an expression.
|
||||
func ExprNames(expr Expr) []string {
|
||||
m := make(map[string]struct{})
|
||||
for _, name := range walkNames(expr) {
|
||||
if name == "time" {
|
||||
continue
|
||||
// walkRefs will walk the Expr and return the database fields
|
||||
func walkRefs(exp Expr) []VarRef {
|
||||
switch expr := exp.(type) {
|
||||
case *VarRef:
|
||||
return []VarRef{*expr}
|
||||
case *Call:
|
||||
var a []VarRef
|
||||
for _, expr := range expr.Args {
|
||||
if ref, ok := expr.(*VarRef); ok {
|
||||
a = append(a, *ref)
|
||||
}
|
||||
}
|
||||
m[name] = struct{}{}
|
||||
return a
|
||||
case *BinaryExpr:
|
||||
var ret []VarRef
|
||||
ret = append(ret, walkRefs(expr.LHS)...)
|
||||
ret = append(ret, walkRefs(expr.RHS)...)
|
||||
return ret
|
||||
case *ParenExpr:
|
||||
return walkRefs(expr.Expr)
|
||||
}
|
||||
|
||||
a := make([]string, 0, len(m))
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExprNames returns a list of non-"time" field names from an expression.
|
||||
func ExprNames(expr Expr) []VarRef {
|
||||
m := make(map[VarRef]struct{})
|
||||
for _, ref := range walkRefs(expr) {
|
||||
if ref.Val == "time" {
|
||||
continue
|
||||
}
|
||||
m[ref] = struct{}{}
|
||||
}
|
||||
|
||||
a := make([]VarRef, 0, len(m))
|
||||
for k := range m {
|
||||
a = append(a, k)
|
||||
}
|
||||
sort.Strings(a)
|
||||
sort.Sort(VarRefs(a))
|
||||
|
||||
return a
|
||||
}
|
||||
|
@ -2957,12 +3023,36 @@ func decodeMeasurement(pb *internal.Measurement) (*Measurement, error) {
|
|||
|
||||
// VarRef represents a reference to a variable.
|
||||
type VarRef struct {
|
||||
Val string
|
||||
Val string
|
||||
Type DataType
|
||||
}
|
||||
|
||||
// String returns a string representation of the variable reference.
|
||||
func (r *VarRef) String() string {
|
||||
return QuoteIdent(r.Val)
|
||||
buf := bytes.NewBufferString(QuoteIdent(r.Val))
|
||||
if r.Type != Unknown {
|
||||
buf.WriteString("::")
|
||||
buf.WriteString(r.Type.String())
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
type VarRefs []VarRef
|
||||
|
||||
func (a VarRefs) Len() int { return len(a) }
|
||||
func (a VarRefs) Less(i, j int) bool {
|
||||
if a[i].Val != a[j].Val {
|
||||
return a[i].Val < a[j].Val
|
||||
}
|
||||
return a[i].Type < a[j].Type
|
||||
}
|
||||
func (a VarRefs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a VarRefs) Strings() []string {
|
||||
s := make([]string, len(a))
|
||||
for i, ref := range a {
|
||||
s[i] = ref.Val
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Call represents a function call.
|
||||
|
@ -3238,10 +3328,21 @@ func CloneRegexLiteral(r *RegexLiteral) *RegexLiteral {
|
|||
}
|
||||
|
||||
// Wildcard represents a wild card expression.
|
||||
type Wildcard struct{}
|
||||
type Wildcard struct {
|
||||
Type Token
|
||||
}
|
||||
|
||||
// String returns a string representation of the wildcard.
|
||||
func (e *Wildcard) String() string { return "*" }
|
||||
func (e *Wildcard) String() string {
|
||||
switch e.Type {
|
||||
case FIELD:
|
||||
return "*::field"
|
||||
case TAG:
|
||||
return "*::tag"
|
||||
default:
|
||||
return "*"
|
||||
}
|
||||
}
|
||||
|
||||
// CloneExpr returns a deep copy of the expression.
|
||||
func CloneExpr(expr Expr) Expr {
|
||||
|
@ -3276,7 +3377,7 @@ func CloneExpr(expr Expr) Expr {
|
|||
case *TimeLiteral:
|
||||
return &TimeLiteral{Val: expr.Val}
|
||||
case *VarRef:
|
||||
return &VarRef{Val: expr.Val}
|
||||
return &VarRef{Val: expr.Val, Type: expr.Type}
|
||||
case *Wildcard:
|
||||
return &Wildcard{}
|
||||
}
|
||||
|
@ -4180,14 +4281,14 @@ func reduceParenExpr(expr *ParenExpr, valuer Valuer) Expr {
|
|||
func reduceVarRef(expr *VarRef, valuer Valuer) Expr {
|
||||
// Ignore if there is no valuer.
|
||||
if valuer == nil {
|
||||
return &VarRef{Val: expr.Val}
|
||||
return &VarRef{Val: expr.Val, Type: expr.Type}
|
||||
}
|
||||
|
||||
// Retrieve the value of the ref.
|
||||
// Ignore if the value doesn't exist.
|
||||
v, ok := valuer.Value(expr.Val)
|
||||
if !ok {
|
||||
return &VarRef{Val: expr.Val}
|
||||
return &VarRef{Val: expr.Val, Type: expr.Type}
|
||||
}
|
||||
|
||||
// Return the value as a literal.
|
||||
|
|
|
@ -44,6 +44,7 @@ func TestDataType_String(t *testing.T) {
|
|||
{influxql.String, "string"},
|
||||
{influxql.Time, "time"},
|
||||
{influxql.Duration, "duration"},
|
||||
{influxql.Tag, "tag"},
|
||||
{influxql.Unknown, "unknown"},
|
||||
} {
|
||||
if v := tt.typ.String(); tt.v != v {
|
||||
|
@ -334,8 +335,8 @@ func TestSelectStatement_HasWildcard(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test SELECT statement wildcard rewrite.
|
||||
func TestSelectStatement_RewriteWildcards(t *testing.T) {
|
||||
// Test SELECT statement field rewrite.
|
||||
func TestSelectStatement_RewriteFields(t *testing.T) {
|
||||
var tests = []struct {
|
||||
stmt string
|
||||
rewrite string
|
||||
|
@ -349,7 +350,7 @@ func TestSelectStatement_RewriteWildcards(t *testing.T) {
|
|||
// Query wildcard
|
||||
{
|
||||
stmt: `SELECT * FROM cpu`,
|
||||
rewrite: `SELECT host, region, value1, value2 FROM cpu`,
|
||||
rewrite: `SELECT host::tag, region::tag, value1::float, value2::integer FROM cpu`,
|
||||
},
|
||||
|
||||
// Parser fundamentally prohibits multiple query sources
|
||||
|
@ -357,19 +358,19 @@ func TestSelectStatement_RewriteWildcards(t *testing.T) {
|
|||
// Query wildcard with explicit
|
||||
{
|
||||
stmt: `SELECT *,value1 FROM cpu`,
|
||||
rewrite: `SELECT host, region, value1, value2, value1 FROM cpu`,
|
||||
rewrite: `SELECT host::tag, region::tag, value1::float, value2::integer, value1::float FROM cpu`,
|
||||
},
|
||||
|
||||
// Query multiple wildcards
|
||||
{
|
||||
stmt: `SELECT *,* FROM cpu`,
|
||||
rewrite: `SELECT host, region, value1, value2, host, region, value1, value2 FROM cpu`,
|
||||
rewrite: `SELECT host::tag, region::tag, value1::float, value2::integer, host::tag, region::tag, value1::float, value2::integer FROM cpu`,
|
||||
},
|
||||
|
||||
// Query wildcards with group by
|
||||
{
|
||||
stmt: `SELECT * FROM cpu GROUP BY host`,
|
||||
rewrite: `SELECT region, value1, value2 FROM cpu GROUP BY host`,
|
||||
rewrite: `SELECT region::tag, value1::float, value2::integer FROM cpu GROUP BY host`,
|
||||
},
|
||||
|
||||
// No GROUP BY wildcards
|
||||
|
@ -396,7 +397,7 @@ func TestSelectStatement_RewriteWildcards(t *testing.T) {
|
|||
rewrite: `SELECT mean(value) FROM cpu WHERE time < now() GROUP BY host, region, time(1m)`,
|
||||
},
|
||||
|
||||
// GROUP BY wildarde with fill
|
||||
// GROUP BY wildcard with fill
|
||||
{
|
||||
stmt: `SELECT mean(value) FROM cpu where time < now() GROUP BY *,time(1m) fill(0)`,
|
||||
rewrite: `SELECT mean(value) FROM cpu WHERE time < now() GROUP BY host, region, time(1m) fill(0)`,
|
||||
|
@ -417,7 +418,7 @@ func TestSelectStatement_RewriteWildcards(t *testing.T) {
|
|||
// Combo
|
||||
{
|
||||
stmt: `SELECT * FROM cpu GROUP BY *`,
|
||||
rewrite: `SELECT value1, value2 FROM cpu GROUP BY host, region`,
|
||||
rewrite: `SELECT value1::float, value2::integer FROM cpu GROUP BY host, region`,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -429,14 +430,14 @@ func TestSelectStatement_RewriteWildcards(t *testing.T) {
|
|||
}
|
||||
|
||||
var ic IteratorCreator
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
fields = map[string]struct{}{"value1": struct{}{}, "value2": struct{}{}}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
fields = map[string]influxql.DataType{"value1": influxql.Float, "value2": influxql.Integer}
|
||||
dimensions = map[string]struct{}{"host": struct{}{}, "region": struct{}{}}
|
||||
return
|
||||
}
|
||||
|
||||
// Rewrite statement.
|
||||
rw, err := stmt.(*influxql.SelectStatement).RewriteWildcards(&ic)
|
||||
rw, err := stmt.(*influxql.SelectStatement).RewriteFields(&ic)
|
||||
if err != nil {
|
||||
t.Errorf("%d. %q: error: %s", i, tt.stmt, err)
|
||||
} else if rw == nil {
|
||||
|
|
|
@ -18,6 +18,7 @@ It has these top-level messages:
|
|||
IteratorStats
|
||||
Series
|
||||
SeriesList
|
||||
VarRef
|
||||
*/
|
||||
package influxql
|
||||
|
||||
|
@ -30,10 +31,6 @@ var _ = proto.Marshal
|
|||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
const _ = proto.GoGoProtoPackageIsVersion1
|
||||
|
||||
type Point struct {
|
||||
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
|
||||
Tags *string `protobuf:"bytes,2,req,name=Tags" json:"Tags,omitempty"`
|
||||
|
@ -49,10 +46,9 @@ type Point struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Point) Reset() { *m = Point{} }
|
||||
func (m *Point) String() string { return proto.CompactTextString(m) }
|
||||
func (*Point) ProtoMessage() {}
|
||||
func (*Point) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} }
|
||||
func (m *Point) Reset() { *m = Point{} }
|
||||
func (m *Point) String() string { return proto.CompactTextString(m) }
|
||||
func (*Point) ProtoMessage() {}
|
||||
|
||||
func (m *Point) GetName() string {
|
||||
if m != nil && m.Name != nil {
|
||||
|
@ -140,10 +136,9 @@ type Aux struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Aux) Reset() { *m = Aux{} }
|
||||
func (m *Aux) String() string { return proto.CompactTextString(m) }
|
||||
func (*Aux) ProtoMessage() {}
|
||||
func (*Aux) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} }
|
||||
func (m *Aux) Reset() { *m = Aux{} }
|
||||
func (m *Aux) String() string { return proto.CompactTextString(m) }
|
||||
func (*Aux) ProtoMessage() {}
|
||||
|
||||
func (m *Aux) GetDataType() int32 {
|
||||
if m != nil && m.DataType != nil {
|
||||
|
@ -183,6 +178,7 @@ func (m *Aux) GetBooleanValue() bool {
|
|||
type IteratorOptions struct {
|
||||
Expr *string `protobuf:"bytes,1,opt,name=Expr" json:"Expr,omitempty"`
|
||||
Aux []string `protobuf:"bytes,2,rep,name=Aux" json:"Aux,omitempty"`
|
||||
Fields []*VarRef `protobuf:"bytes,17,rep,name=Fields" json:"Fields,omitempty"`
|
||||
Sources []*Measurement `protobuf:"bytes,3,rep,name=Sources" json:"Sources,omitempty"`
|
||||
Interval *Interval `protobuf:"bytes,4,opt,name=Interval" json:"Interval,omitempty"`
|
||||
Dimensions []string `protobuf:"bytes,5,rep,name=Dimensions" json:"Dimensions,omitempty"`
|
||||
|
@ -200,10 +196,9 @@ type IteratorOptions struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *IteratorOptions) Reset() { *m = IteratorOptions{} }
|
||||
func (m *IteratorOptions) String() string { return proto.CompactTextString(m) }
|
||||
func (*IteratorOptions) ProtoMessage() {}
|
||||
func (*IteratorOptions) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} }
|
||||
func (m *IteratorOptions) Reset() { *m = IteratorOptions{} }
|
||||
func (m *IteratorOptions) String() string { return proto.CompactTextString(m) }
|
||||
func (*IteratorOptions) ProtoMessage() {}
|
||||
|
||||
func (m *IteratorOptions) GetExpr() string {
|
||||
if m != nil && m.Expr != nil {
|
||||
|
@ -219,6 +214,13 @@ func (m *IteratorOptions) GetAux() []string {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *IteratorOptions) GetFields() []*VarRef {
|
||||
if m != nil {
|
||||
return m.Fields
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *IteratorOptions) GetSources() []*Measurement {
|
||||
if m != nil {
|
||||
return m.Sources
|
||||
|
@ -322,10 +324,9 @@ type Measurements struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Measurements) Reset() { *m = Measurements{} }
|
||||
func (m *Measurements) String() string { return proto.CompactTextString(m) }
|
||||
func (*Measurements) ProtoMessage() {}
|
||||
func (*Measurements) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{3} }
|
||||
func (m *Measurements) Reset() { *m = Measurements{} }
|
||||
func (m *Measurements) String() string { return proto.CompactTextString(m) }
|
||||
func (*Measurements) ProtoMessage() {}
|
||||
|
||||
func (m *Measurements) GetItems() []*Measurement {
|
||||
if m != nil {
|
||||
|
@ -343,10 +344,9 @@ type Measurement struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Measurement) Reset() { *m = Measurement{} }
|
||||
func (m *Measurement) String() string { return proto.CompactTextString(m) }
|
||||
func (*Measurement) ProtoMessage() {}
|
||||
func (*Measurement) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{4} }
|
||||
func (m *Measurement) Reset() { *m = Measurement{} }
|
||||
func (m *Measurement) String() string { return proto.CompactTextString(m) }
|
||||
func (*Measurement) ProtoMessage() {}
|
||||
|
||||
func (m *Measurement) GetDatabase() string {
|
||||
if m != nil && m.Database != nil {
|
||||
|
@ -389,10 +389,9 @@ type Interval struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Interval) Reset() { *m = Interval{} }
|
||||
func (m *Interval) String() string { return proto.CompactTextString(m) }
|
||||
func (*Interval) ProtoMessage() {}
|
||||
func (*Interval) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} }
|
||||
func (m *Interval) Reset() { *m = Interval{} }
|
||||
func (m *Interval) String() string { return proto.CompactTextString(m) }
|
||||
func (*Interval) ProtoMessage() {}
|
||||
|
||||
func (m *Interval) GetDuration() int64 {
|
||||
if m != nil && m.Duration != nil {
|
||||
|
@ -414,10 +413,9 @@ type IteratorStats struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
|
||||
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
|
||||
func (*IteratorStats) ProtoMessage() {}
|
||||
func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{6} }
|
||||
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
|
||||
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
|
||||
func (*IteratorStats) ProtoMessage() {}
|
||||
|
||||
func (m *IteratorStats) GetSeriesN() int64 {
|
||||
if m != nil && m.SeriesN != nil {
|
||||
|
@ -440,10 +438,9 @@ type Series struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Series) Reset() { *m = Series{} }
|
||||
func (m *Series) String() string { return proto.CompactTextString(m) }
|
||||
func (*Series) ProtoMessage() {}
|
||||
func (*Series) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{7} }
|
||||
func (m *Series) Reset() { *m = Series{} }
|
||||
func (m *Series) String() string { return proto.CompactTextString(m) }
|
||||
func (*Series) ProtoMessage() {}
|
||||
|
||||
func (m *Series) GetName() string {
|
||||
if m != nil && m.Name != nil {
|
||||
|
@ -471,10 +468,9 @@ type SeriesList struct {
|
|||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *SeriesList) Reset() { *m = SeriesList{} }
|
||||
func (m *SeriesList) String() string { return proto.CompactTextString(m) }
|
||||
func (*SeriesList) ProtoMessage() {}
|
||||
func (*SeriesList) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{8} }
|
||||
func (m *SeriesList) Reset() { *m = SeriesList{} }
|
||||
func (m *SeriesList) String() string { return proto.CompactTextString(m) }
|
||||
func (*SeriesList) ProtoMessage() {}
|
||||
|
||||
func (m *SeriesList) GetItems() []*Series {
|
||||
if m != nil {
|
||||
|
@ -483,6 +479,30 @@ func (m *SeriesList) GetItems() []*Series {
|
|||
return nil
|
||||
}
|
||||
|
||||
type VarRef struct {
|
||||
Val *string `protobuf:"bytes,1,req,name=Val" json:"Val,omitempty"`
|
||||
Type *int32 `protobuf:"varint,2,opt,name=Type" json:"Type,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *VarRef) Reset() { *m = VarRef{} }
|
||||
func (m *VarRef) String() string { return proto.CompactTextString(m) }
|
||||
func (*VarRef) ProtoMessage() {}
|
||||
|
||||
func (m *VarRef) GetVal() string {
|
||||
if m != nil && m.Val != nil {
|
||||
return *m.Val
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *VarRef) GetType() int32 {
|
||||
if m != nil && m.Type != nil {
|
||||
return *m.Type
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Point)(nil), "influxql.Point")
|
||||
proto.RegisterType((*Aux)(nil), "influxql.Aux")
|
||||
|
@ -493,44 +513,5 @@ func init() {
|
|||
proto.RegisterType((*IteratorStats)(nil), "influxql.IteratorStats")
|
||||
proto.RegisterType((*Series)(nil), "influxql.Series")
|
||||
proto.RegisterType((*SeriesList)(nil), "influxql.SeriesList")
|
||||
}
|
||||
|
||||
var fileDescriptorInternal = []byte{
|
||||
// 569 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x53, 0xdf, 0x6e, 0xda, 0x3c,
|
||||
0x14, 0x57, 0x48, 0x53, 0x92, 0x13, 0x52, 0xf8, 0xfc, 0x6d, 0x22, 0xda, 0xcd, 0x50, 0x34, 0x4d,
|
||||
0x5c, 0x6c, 0x6c, 0x43, 0x7b, 0x01, 0x36, 0x5a, 0x09, 0xa9, 0xa3, 0x55, 0x41, 0xbb, 0xf7, 0xc0,
|
||||
0x44, 0x96, 0x4c, 0xcc, 0x6c, 0x67, 0xa2, 0x8f, 0xb8, 0x67, 0xd9, 0x4b, 0xec, 0xd8, 0x49, 0x0a,
|
||||
0x54, 0xec, 0x2e, 0xe7, 0x77, 0x8e, 0x73, 0x7e, 0x7f, 0x6c, 0xe8, 0xf3, 0xc2, 0x30, 0x55, 0x50,
|
||||
0xf1, 0xa1, 0xf9, 0x18, 0xed, 0x94, 0x34, 0x92, 0x84, 0xbc, 0xd8, 0x88, 0x72, 0xff, 0x53, 0x64,
|
||||
0x7f, 0x3c, 0x08, 0xee, 0x25, 0xb6, 0x49, 0x07, 0x2e, 0xe6, 0x74, 0xcb, 0x52, 0x6f, 0xd0, 0x1a,
|
||||
0x46, 0xb6, 0x5a, 0xd2, 0x5c, 0xa7, 0xad, 0xa7, 0x8a, 0x63, 0xcf, 0xc7, 0xca, 0x27, 0x31, 0xf8,
|
||||
0x73, 0x2e, 0xd2, 0x0b, 0x2c, 0x42, 0xf2, 0x0a, 0xfc, 0x49, 0xb9, 0x4f, 0x83, 0x81, 0x3f, 0x8c,
|
||||
0xc7, 0xc9, 0xa8, 0xf9, 0xf1, 0x08, 0x41, 0x42, 0x00, 0x26, 0x79, 0xae, 0x58, 0x4e, 0x0d, 0x5b,
|
||||
0xa7, 0x97, 0x03, 0x6f, 0x98, 0x58, 0xec, 0x46, 0x48, 0x6a, 0xbe, 0x53, 0x51, 0xb2, 0xb4, 0x8d,
|
||||
0x98, 0x47, 0x5e, 0x40, 0x67, 0x86, 0x04, 0x73, 0xa6, 0x2a, 0x34, 0x44, 0xd4, 0x27, 0xff, 0x43,
|
||||
0xbc, 0x30, 0x8a, 0x17, 0x79, 0x05, 0x46, 0x08, 0x46, 0x76, 0xf4, 0x8b, 0x94, 0x82, 0xd1, 0xa2,
|
||||
0x42, 0x01, 0xd1, 0x90, 0xbc, 0x85, 0x60, 0x61, 0xa8, 0xd1, 0x69, 0x8c, 0x65, 0x3c, 0xee, 0x1f,
|
||||
0x68, 0xcc, 0x50, 0x37, 0x35, 0x52, 0xb9, 0x76, 0x26, 0x1c, 0x59, 0xd2, 0x83, 0x70, 0x4a, 0x0d,
|
||||
0x5d, 0x3e, 0xee, 0x2a, 0xb9, 0xc1, 0x33, 0x56, 0xad, 0xb3, 0xac, 0xfc, 0x73, 0xac, 0x2e, 0xce,
|
||||
0xb2, 0x0a, 0x2c, 0xab, 0xec, 0x77, 0x0b, 0xba, 0xcd, 0xfe, 0xbb, 0x9d, 0xe1, 0xb2, 0xd0, 0xd6,
|
||||
0xc9, 0xeb, 0xfd, 0x4e, 0xe1, 0x5a, 0x7b, 0x2e, 0xae, 0xcc, 0x6b, 0xa1, 0x79, 0x11, 0x8a, 0x68,
|
||||
0x2f, 0x64, 0xa9, 0x56, 0x4c, 0xe3, 0x2a, 0xeb, 0xe6, 0xcb, 0x83, 0x8c, 0x6f, 0x8c, 0xea, 0x52,
|
||||
0xb1, 0x2d, 0xc3, 0xa0, 0xde, 0x40, 0x68, 0x79, 0xa9, 0x5f, 0x54, 0xb8, 0xf5, 0xf1, 0x98, 0x1c,
|
||||
0xe9, 0xad, 0x3b, 0x56, 0xd1, 0x14, 0x23, 0x2b, 0xb4, 0x5d, 0xeb, 0xe2, 0x71, 0x31, 0xde, 0x70,
|
||||
0x21, 0x5c, 0x12, 0x01, 0xf9, 0x0f, 0x22, 0x5b, 0x1d, 0x07, 0x81, 0xd0, 0x57, 0x59, 0xac, 0xb9,
|
||||
0xe5, 0xea, 0x52, 0x88, 0x2c, 0x84, 0xde, 0x29, 0xe3, 0xf2, 0x8f, 0x9c, 0x05, 0x5d, 0x68, 0x5f,
|
||||
0x17, 0x6b, 0x07, 0x80, 0x03, 0x70, 0x66, 0xa2, 0x57, 0x0c, 0x0f, 0x16, 0xb9, 0x8b, 0x20, 0x24,
|
||||
0x09, 0x04, 0xb7, 0x7c, 0xcb, 0x4d, 0xda, 0x71, 0x13, 0x57, 0x70, 0x79, 0xb7, 0xd9, 0x68, 0x66,
|
||||
0xd2, 0xa4, 0xa9, 0x17, 0x55, 0xff, 0xaa, 0xf9, 0xe5, 0xa2, 0x1e, 0xe8, 0x36, 0x03, 0x53, 0xb6,
|
||||
0x2e, 0x31, 0xa0, 0x9e, 0xf3, 0xf2, 0x33, 0x74, 0x8e, 0x3c, 0xd0, 0x68, 0x42, 0x80, 0xd6, 0x6e,
|
||||
0x35, 0x1a, 0xf9, 0x6f, 0xab, 0xb2, 0x1c, 0xe2, 0x63, 0xe7, 0xea, 0xdc, 0x7f, 0x50, 0xcd, 0xea,
|
||||
0x00, 0xfa, 0xd0, 0x7d, 0x60, 0x06, 0x7b, 0x28, 0xf8, 0x5e, 0x0a, 0xbe, 0x7a, 0x74, 0xe1, 0x47,
|
||||
0x4f, 0xaf, 0xc1, 0x77, 0x15, 0xaa, 0x79, 0xc0, 0x8b, 0xb0, 0xaf, 0xe3, 0xc6, 0xff, 0xcc, 0xf4,
|
||||
0x92, 0xaa, 0x1c, 0xe9, 0x56, 0x51, 0xbf, 0x3b, 0x64, 0xe2, 0xb6, 0x94, 0x18, 0xba, 0xf5, 0xd0,
|
||||
0x7b, 0xa6, 0xde, 0xfe, 0xdc, 0xcf, 0x3e, 0x42, 0x72, 0x72, 0x2f, 0x9d, 0x7c, 0xa6, 0x38, 0xd3,
|
||||
0xf3, 0xc3, 0x09, 0xf7, 0x2a, 0xe7, 0xf5, 0x89, 0x4f, 0xe8, 0x97, 0x1b, 0x38, 0x7a, 0xa6, 0xde,
|
||||
0xc9, 0x33, 0xf5, 0x86, 0x9d, 0xe6, 0x3a, 0xd9, 0xdb, 0x93, 0x64, 0xef, 0x01, 0xaa, 0x23, 0xb7,
|
||||
0x5c, 0x1b, 0xf2, 0xfa, 0xd4, 0xaf, 0xde, 0xc1, 0xaf, 0x6a, 0xe8, 0x6f, 0x00, 0x00, 0x00, 0xff,
|
||||
0xff, 0xe3, 0x58, 0x08, 0xa6, 0x2c, 0x04, 0x00, 0x00,
|
||||
proto.RegisterType((*VarRef)(nil), "influxql.VarRef")
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ message Aux {
|
|||
message IteratorOptions {
|
||||
optional string Expr = 1;
|
||||
repeated string Aux = 2;
|
||||
repeated VarRef Fields = 17;
|
||||
repeated Measurement Sources = 3;
|
||||
optional Interval Interval = 4;
|
||||
repeated string Dimensions = 5;
|
||||
|
@ -74,3 +75,8 @@ message Series {
|
|||
message SeriesList {
|
||||
repeated Series Items = 1;
|
||||
}
|
||||
|
||||
message VarRef {
|
||||
required string Val = 1;
|
||||
optional int32 Type = 2;
|
||||
}
|
||||
|
|
|
@ -764,11 +764,11 @@ type floatAuxIterator struct {
|
|||
background bool
|
||||
}
|
||||
|
||||
func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator {
|
||||
func newFloatAuxIterator(input FloatIterator, opt IteratorOptions) *floatAuxIterator {
|
||||
return &floatAuxIterator{
|
||||
input: newBufFloatIterator(input),
|
||||
output: make(chan auxFloatPoint, 1),
|
||||
fields: newAuxIteratorFields(seriesKeys, opt),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -785,7 +785,9 @@ func (itr *floatAuxIterator) Next() (*FloatPoint, error) {
|
|||
p := <-itr.output
|
||||
return p.point, p.err
|
||||
}
|
||||
func (itr *floatAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
|
||||
func (itr *floatAuxIterator) Iterator(name string, typ DataType) Iterator {
|
||||
return itr.fields.iterator(name, typ)
|
||||
}
|
||||
|
||||
func (itr *floatAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
|
||||
expr := opt.Expr
|
||||
|
@ -795,20 +797,16 @@ func (itr *floatAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, erro
|
|||
|
||||
switch expr := expr.(type) {
|
||||
case *VarRef:
|
||||
return itr.Iterator(expr.Val), nil
|
||||
return itr.Iterator(expr.Val, expr.Type), nil
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *floatAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (itr *floatAuxIterator) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *floatAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *floatAuxIterator) ExpandSources(sources Sources) (Sources, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
@ -2772,11 +2770,11 @@ type integerAuxIterator struct {
|
|||
background bool
|
||||
}
|
||||
|
||||
func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator {
|
||||
func newIntegerAuxIterator(input IntegerIterator, opt IteratorOptions) *integerAuxIterator {
|
||||
return &integerAuxIterator{
|
||||
input: newBufIntegerIterator(input),
|
||||
output: make(chan auxIntegerPoint, 1),
|
||||
fields: newAuxIteratorFields(seriesKeys, opt),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2793,7 +2791,9 @@ func (itr *integerAuxIterator) Next() (*IntegerPoint, error) {
|
|||
p := <-itr.output
|
||||
return p.point, p.err
|
||||
}
|
||||
func (itr *integerAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
|
||||
func (itr *integerAuxIterator) Iterator(name string, typ DataType) Iterator {
|
||||
return itr.fields.iterator(name, typ)
|
||||
}
|
||||
|
||||
func (itr *integerAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
|
||||
expr := opt.Expr
|
||||
|
@ -2803,20 +2803,16 @@ func (itr *integerAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, er
|
|||
|
||||
switch expr := expr.(type) {
|
||||
case *VarRef:
|
||||
return itr.Iterator(expr.Val), nil
|
||||
return itr.Iterator(expr.Val, expr.Type), nil
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *integerAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (itr *integerAuxIterator) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *integerAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *integerAuxIterator) ExpandSources(sources Sources) (Sources, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
@ -4777,11 +4773,11 @@ type stringAuxIterator struct {
|
|||
background bool
|
||||
}
|
||||
|
||||
func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator {
|
||||
func newStringAuxIterator(input StringIterator, opt IteratorOptions) *stringAuxIterator {
|
||||
return &stringAuxIterator{
|
||||
input: newBufStringIterator(input),
|
||||
output: make(chan auxStringPoint, 1),
|
||||
fields: newAuxIteratorFields(seriesKeys, opt),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4798,7 +4794,9 @@ func (itr *stringAuxIterator) Next() (*StringPoint, error) {
|
|||
p := <-itr.output
|
||||
return p.point, p.err
|
||||
}
|
||||
func (itr *stringAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
|
||||
func (itr *stringAuxIterator) Iterator(name string, typ DataType) Iterator {
|
||||
return itr.fields.iterator(name, typ)
|
||||
}
|
||||
|
||||
func (itr *stringAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
|
||||
expr := opt.Expr
|
||||
|
@ -4808,20 +4806,16 @@ func (itr *stringAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, err
|
|||
|
||||
switch expr := expr.(type) {
|
||||
case *VarRef:
|
||||
return itr.Iterator(expr.Val), nil
|
||||
return itr.Iterator(expr.Val, expr.Type), nil
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *stringAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (itr *stringAuxIterator) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *stringAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *stringAuxIterator) ExpandSources(sources Sources) (Sources, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
@ -6782,11 +6776,11 @@ type booleanAuxIterator struct {
|
|||
background bool
|
||||
}
|
||||
|
||||
func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator {
|
||||
func newBooleanAuxIterator(input BooleanIterator, opt IteratorOptions) *booleanAuxIterator {
|
||||
return &booleanAuxIterator{
|
||||
input: newBufBooleanIterator(input),
|
||||
output: make(chan auxBooleanPoint, 1),
|
||||
fields: newAuxIteratorFields(seriesKeys, opt),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6803,7 +6797,9 @@ func (itr *booleanAuxIterator) Next() (*BooleanPoint, error) {
|
|||
p := <-itr.output
|
||||
return p.point, p.err
|
||||
}
|
||||
func (itr *booleanAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
|
||||
func (itr *booleanAuxIterator) Iterator(name string, typ DataType) Iterator {
|
||||
return itr.fields.iterator(name, typ)
|
||||
}
|
||||
|
||||
func (itr *booleanAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
|
||||
expr := opt.Expr
|
||||
|
@ -6813,20 +6809,16 @@ func (itr *booleanAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, er
|
|||
|
||||
switch expr := expr.(type) {
|
||||
case *VarRef:
|
||||
return itr.Iterator(expr.Val), nil
|
||||
return itr.Iterator(expr.Val, expr.Type), nil
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *booleanAuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (itr *booleanAuxIterator) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *booleanAuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *booleanAuxIterator) ExpandSources(sources Sources) (Sources, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
|
|
@ -762,11 +762,11 @@ type {{$k.name}}AuxIterator struct {
|
|||
background bool
|
||||
}
|
||||
|
||||
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
|
||||
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}AuxIterator {
|
||||
return &{{$k.name}}AuxIterator{
|
||||
input: newBuf{{$k.Name}}Iterator(input),
|
||||
output: make(chan aux{{$k.Name}}Point, 1),
|
||||
fields: newAuxIteratorFields(seriesKeys, opt),
|
||||
fields: newAuxIteratorFields(opt),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -783,7 +783,7 @@ func (itr *{{$k.name}}AuxIterator) Next() (*{{$k.Name}}Point, error) {
|
|||
p := <-itr.output
|
||||
return p.point, p.err
|
||||
}
|
||||
func (itr *{{$k.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
|
||||
func (itr *{{$k.name}}AuxIterator) Iterator(name string, typ DataType) Iterator { return itr.fields.iterator(name, typ) }
|
||||
|
||||
func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator, error) {
|
||||
expr := opt.Expr
|
||||
|
@ -793,20 +793,16 @@ func (itr *{{$k.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator
|
|||
|
||||
switch expr := expr.(type) {
|
||||
case *VarRef:
|
||||
return itr.Iterator(expr.Val), nil
|
||||
return itr.Iterator(expr.Val, expr.Type), nil
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
|
||||
}
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}AuxIterator) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (itr *{{$k.name}}AuxIterator) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (itr *{{$k.name}}AuxIterator) ExpandSources(sources Sources) (Sources, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
|
|
@ -312,7 +312,7 @@ type AuxIterator interface {
|
|||
IteratorCreator
|
||||
|
||||
// Auxilary iterator
|
||||
Iterator(name string) Iterator
|
||||
Iterator(name string, typ DataType) Iterator
|
||||
|
||||
// Start starts writing to the created iterators.
|
||||
Start()
|
||||
|
@ -323,16 +323,16 @@ type AuxIterator interface {
|
|||
}
|
||||
|
||||
// NewAuxIterator returns a new instance of AuxIterator.
|
||||
func NewAuxIterator(input Iterator, seriesKeys SeriesList, opt IteratorOptions) AuxIterator {
|
||||
func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator {
|
||||
switch input := input.(type) {
|
||||
case FloatIterator:
|
||||
return newFloatAuxIterator(input, seriesKeys, opt)
|
||||
return newFloatAuxIterator(input, opt)
|
||||
case IntegerIterator:
|
||||
return newIntegerAuxIterator(input, seriesKeys, opt)
|
||||
return newIntegerAuxIterator(input, opt)
|
||||
case StringIterator:
|
||||
return newStringAuxIterator(input, seriesKeys, opt)
|
||||
return newStringAuxIterator(input, opt)
|
||||
case BooleanIterator:
|
||||
return newBooleanAuxIterator(input, seriesKeys, opt)
|
||||
return newBooleanAuxIterator(input, opt)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported aux iterator type: %T", input))
|
||||
}
|
||||
|
@ -364,20 +364,10 @@ func (f *auxIteratorField) close() {
|
|||
type auxIteratorFields []*auxIteratorField
|
||||
|
||||
// newAuxIteratorFields returns a new instance of auxIteratorFields from a list of field names.
|
||||
func newAuxIteratorFields(seriesKeys SeriesList, opt IteratorOptions) auxIteratorFields {
|
||||
func newAuxIteratorFields(opt IteratorOptions) auxIteratorFields {
|
||||
fields := make(auxIteratorFields, len(opt.Aux))
|
||||
for i, name := range opt.Aux {
|
||||
fields[i] = &auxIteratorField{name: name, opt: opt}
|
||||
for _, s := range seriesKeys {
|
||||
aux := s.Aux[i]
|
||||
if aux == Unknown {
|
||||
continue
|
||||
}
|
||||
|
||||
if fields[i].typ == Unknown || aux < fields[i].typ {
|
||||
fields[i].typ = aux
|
||||
}
|
||||
}
|
||||
for i, ref := range opt.Aux {
|
||||
fields[i] = &auxIteratorField{name: ref.Val, typ: ref.Type, opt: opt}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
@ -389,11 +379,11 @@ func (a auxIteratorFields) close() {
|
|||
}
|
||||
|
||||
// iterator creates a new iterator for a named auxilary field.
|
||||
func (a auxIteratorFields) iterator(name string) Iterator {
|
||||
func (a auxIteratorFields) iterator(name string, typ DataType) Iterator {
|
||||
for _, f := range a {
|
||||
// Skip field if it's name doesn't match.
|
||||
// Exit if no points were received by the iterator.
|
||||
if f.name != name {
|
||||
if f.name != name || (typ != Unknown && f.typ != typ) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -407,7 +397,7 @@ func (a auxIteratorFields) iterator(name string) Iterator {
|
|||
itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})}
|
||||
f.append(itr)
|
||||
return itr
|
||||
case String:
|
||||
case String, Tag:
|
||||
itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})}
|
||||
f.append(itr)
|
||||
return itr
|
||||
|
@ -548,10 +538,7 @@ type IteratorCreator interface {
|
|||
CreateIterator(opt IteratorOptions) (Iterator, error)
|
||||
|
||||
// Returns the unique fields and dimensions across a list of sources.
|
||||
FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error)
|
||||
|
||||
// Returns the series keys that will be returned by this iterator.
|
||||
SeriesKeys(opt IteratorOptions) (SeriesList, error)
|
||||
FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error)
|
||||
|
||||
// Expands regex sources to all matching sources.
|
||||
ExpandSources(sources Sources) (Sources, error)
|
||||
|
@ -618,8 +605,8 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
|
|||
}
|
||||
|
||||
// FieldDimensions returns unique fields and dimensions from multiple iterator creators.
|
||||
func (a IteratorCreators) FieldDimensions(sources Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
fields = make(map[string]struct{})
|
||||
func (a IteratorCreators) FieldDimensions(sources Sources) (fields map[string]DataType, dimensions map[string]struct{}, err error) {
|
||||
fields = make(map[string]DataType)
|
||||
dimensions = make(map[string]struct{})
|
||||
|
||||
for _, ic := range a {
|
||||
|
@ -627,8 +614,10 @@ func (a IteratorCreators) FieldDimensions(sources Sources) (fields, dimensions m
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
for k := range f {
|
||||
fields[k] = struct{}{}
|
||||
for k, typ := range f {
|
||||
if _, ok := fields[k]; typ != Unknown && (!ok || typ < fields[k]) {
|
||||
fields[k] = typ
|
||||
}
|
||||
}
|
||||
for k := range d {
|
||||
dimensions[k] = struct{}{}
|
||||
|
@ -637,35 +626,6 @@ func (a IteratorCreators) FieldDimensions(sources Sources) (fields, dimensions m
|
|||
return
|
||||
}
|
||||
|
||||
// SeriesKeys returns a list of series in all iterator creators in a.
|
||||
// If a series exists in multiple creators in a, all instances will be combined
|
||||
// into a single Series by calling Combine on it.
|
||||
func (a IteratorCreators) SeriesKeys(opt IteratorOptions) (SeriesList, error) {
|
||||
seriesMap := make(map[string]Series)
|
||||
for _, ic := range a {
|
||||
series, err := ic.SeriesKeys(opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, s := range series {
|
||||
cur, ok := seriesMap[s.ID()]
|
||||
if ok {
|
||||
cur.Combine(&s)
|
||||
} else {
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
seriesList := make([]Series, 0, len(seriesMap))
|
||||
for _, s := range seriesMap {
|
||||
seriesList = append(seriesList, s)
|
||||
}
|
||||
sort.Sort(SeriesList(seriesList))
|
||||
return SeriesList(seriesList), nil
|
||||
}
|
||||
|
||||
// ExpandSources expands sources across all iterator creators and returns a unique result.
|
||||
func (a IteratorCreators) ExpandSources(sources Sources) (Sources, error) {
|
||||
m := make(map[string]Source)
|
||||
|
@ -709,7 +669,7 @@ type IteratorOptions struct {
|
|||
Expr Expr
|
||||
|
||||
// Auxilary tags or values to also retrieve for the point.
|
||||
Aux []string
|
||||
Aux []VarRef
|
||||
|
||||
// Data sources from which to retrieve data.
|
||||
Sources []Source
|
||||
|
@ -902,7 +862,6 @@ func (opt *IteratorOptions) UnmarshalBinary(buf []byte) error {
|
|||
|
||||
func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions {
|
||||
pb := &internal.IteratorOptions{
|
||||
Aux: opt.Aux,
|
||||
Interval: encodeInterval(opt.Interval),
|
||||
Dimensions: opt.Dimensions,
|
||||
Fill: proto.Int32(int32(opt.Fill)),
|
||||
|
@ -921,6 +880,14 @@ func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions {
|
|||
pb.Expr = proto.String(opt.Expr.String())
|
||||
}
|
||||
|
||||
// Convert and encode aux fields as variable references.
|
||||
pb.Fields = make([]*internal.VarRef, len(opt.Aux))
|
||||
pb.Aux = make([]string, len(opt.Aux))
|
||||
for i, ref := range opt.Aux {
|
||||
pb.Fields[i] = encodeVarRef(ref)
|
||||
pb.Aux[i] = ref.Val
|
||||
}
|
||||
|
||||
// Convert and encode sources to measurements.
|
||||
sources := make([]*internal.Measurement, len(opt.Sources))
|
||||
for i, source := range opt.Sources {
|
||||
|
@ -944,7 +911,6 @@ func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions {
|
|||
|
||||
func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, error) {
|
||||
opt := &IteratorOptions{
|
||||
Aux: pb.GetAux(),
|
||||
Interval: decodeInterval(pb.GetInterval()),
|
||||
Dimensions: pb.GetDimensions(),
|
||||
Fill: FillOption(pb.GetFill()),
|
||||
|
@ -968,7 +934,20 @@ func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, erro
|
|||
opt.Expr = expr
|
||||
}
|
||||
|
||||
// Convert and encode sources to measurements.
|
||||
// Convert and decode variable references.
|
||||
if fields := pb.GetFields(); fields != nil {
|
||||
opt.Aux = make([]VarRef, len(fields))
|
||||
for i, ref := range fields {
|
||||
opt.Aux[i] = decodeVarRef(ref)
|
||||
}
|
||||
} else {
|
||||
opt.Aux = make([]VarRef, len(pb.GetAux()))
|
||||
for i, name := range pb.GetAux() {
|
||||
opt.Aux[i] = VarRef{Val: name}
|
||||
}
|
||||
}
|
||||
|
||||
// Convert and dencode sources to measurements.
|
||||
sources := make([]Source, len(pb.GetSources()))
|
||||
for i, source := range pb.GetSources() {
|
||||
mm, err := decodeMeasurement(source)
|
||||
|
@ -1146,6 +1125,20 @@ func decodeInterval(pb *internal.Interval) Interval {
|
|||
}
|
||||
}
|
||||
|
||||
func encodeVarRef(ref VarRef) *internal.VarRef {
|
||||
return &internal.VarRef{
|
||||
Val: proto.String(ref.Val),
|
||||
Type: proto.Int32(int32(ref.Type)),
|
||||
}
|
||||
}
|
||||
|
||||
func decodeVarRef(pb *internal.VarRef) VarRef {
|
||||
return VarRef{
|
||||
Val: pb.GetVal(),
|
||||
Type: DataType(pb.GetType()),
|
||||
}
|
||||
}
|
||||
|
||||
type nilFloatIterator struct{}
|
||||
|
||||
func (*nilFloatIterator) Stats() IteratorStats { return IteratorStats{} }
|
||||
|
|
|
@ -639,17 +639,14 @@ func TestFloatAuxIterator(t *testing.T) {
|
|||
{Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}},
|
||||
{Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}},
|
||||
}},
|
||||
[]influxql.Series{
|
||||
{Aux: []influxql.DataType{influxql.Float, influxql.Float}},
|
||||
},
|
||||
influxql.IteratorOptions{Aux: []string{"f0", "f1"}},
|
||||
influxql.IteratorOptions{Aux: []influxql.VarRef{{Val: "f0", Type: influxql.Float}, {Val: "f1", Type: influxql.Float}}},
|
||||
)
|
||||
|
||||
itrs := []influxql.Iterator{
|
||||
itr,
|
||||
itr.Iterator("f0"),
|
||||
itr.Iterator("f1"),
|
||||
itr.Iterator("f0"),
|
||||
itr.Iterator("f0", influxql.Unknown),
|
||||
itr.Iterator("f1", influxql.Unknown),
|
||||
itr.Iterator("f0", influxql.Unknown),
|
||||
}
|
||||
itr.Start()
|
||||
|
||||
|
@ -947,7 +944,7 @@ func TestIteratorOptions_ElapsedInterval_Call(t *testing.T) {
|
|||
func TestIteratorOptions_MarshalBinary(t *testing.T) {
|
||||
opt := &influxql.IteratorOptions{
|
||||
Expr: MustParseExpr("count(value)"),
|
||||
Aux: []string{"a", "b", "c"},
|
||||
Aux: []influxql.VarRef{{Val: "a"}, {Val: "b"}, {Val: "c"}},
|
||||
Sources: []influxql.Source{
|
||||
&influxql.Measurement{Database: "db0", RetentionPolicy: "rp0", Name: "mm0"},
|
||||
},
|
||||
|
@ -1087,8 +1084,7 @@ func TestIterator_EncodeDecode(t *testing.T) {
|
|||
// IteratorCreator is a mockable implementation of SelectStatementExecutor.IteratorCreator.
|
||||
type IteratorCreator struct {
|
||||
CreateIteratorFn func(opt influxql.IteratorOptions) (influxql.Iterator, error)
|
||||
FieldDimensionsFn func(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
|
||||
SeriesKeysFn func(opt influxql.IteratorOptions) (influxql.SeriesList, error)
|
||||
FieldDimensionsFn func(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
|
||||
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
|
||||
}
|
||||
|
||||
|
@ -1096,75 +1092,10 @@ func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxq
|
|||
return ic.CreateIteratorFn(opt)
|
||||
}
|
||||
|
||||
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
return ic.FieldDimensionsFn(sources)
|
||||
}
|
||||
|
||||
func (ic *IteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
if ic.SeriesKeysFn != nil {
|
||||
return ic.SeriesKeysFn(opt)
|
||||
}
|
||||
|
||||
itr, err := ic.CreateIterator(opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
seriesMap := make(map[string]influxql.Series)
|
||||
switch itr := itr.(type) {
|
||||
case influxql.FloatIterator:
|
||||
for p, err := itr.Next(); p != nil; p, err = itr.Next() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.IntegerIterator:
|
||||
for p, err := itr.Next(); p != nil; p, err = itr.Next() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.StringIterator:
|
||||
for p, err := itr.Next(); p != nil; p, err = itr.Next() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
case influxql.BooleanIterator:
|
||||
for p, err := itr.Next(); p != nil; p, err = itr.Next() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := influxql.Series{Name: p.Name, Tags: p.Tags, Aux: influxql.InspectDataTypes(p.Aux)}
|
||||
if series, ok := seriesMap[s.ID()]; ok {
|
||||
s.Combine(&series)
|
||||
}
|
||||
seriesMap[s.ID()] = s
|
||||
}
|
||||
}
|
||||
|
||||
seriesList := make([]influxql.Series, 0, len(seriesMap))
|
||||
for _, s := range seriesMap {
|
||||
seriesList = append(seriesList, s)
|
||||
}
|
||||
return influxql.SeriesList(seriesList), nil
|
||||
}
|
||||
|
||||
func (ic *IteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
return ic.ExpandSourcesFn(sources)
|
||||
}
|
||||
|
|
|
@ -2234,7 +2234,35 @@ func (p *Parser) parseVarRef() (*VarRef, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
vr := &VarRef{Val: strings.Join(segments, ".")}
|
||||
var dtype DataType
|
||||
if tok, _, _ := p.scan(); tok == DOUBLECOLON {
|
||||
tok, pos, lit := p.scan()
|
||||
switch tok {
|
||||
case IDENT:
|
||||
switch strings.ToLower(lit) {
|
||||
case "float":
|
||||
dtype = Float
|
||||
case "integer":
|
||||
dtype = Integer
|
||||
case "string":
|
||||
dtype = String
|
||||
case "boolean":
|
||||
dtype = Boolean
|
||||
default:
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"float", "integer", "string", "boolean", "field", "tag"}, pos)
|
||||
}
|
||||
case FIELD:
|
||||
dtype = AnyField
|
||||
case TAG:
|
||||
dtype = Tag
|
||||
default:
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"float", "integer", "string", "boolean", "field", "tag"}, pos)
|
||||
}
|
||||
} else {
|
||||
p.unscan()
|
||||
}
|
||||
|
||||
vr := &VarRef{Val: strings.Join(segments, "."), Type: dtype}
|
||||
|
||||
return vr, nil
|
||||
}
|
||||
|
@ -2383,7 +2411,19 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
|
|||
v, _ := ParseDuration(lit)
|
||||
return &DurationLiteral{Val: v}, nil
|
||||
case MUL:
|
||||
return &Wildcard{}, nil
|
||||
wc := &Wildcard{}
|
||||
if tok, _, _ := p.scan(); tok == DOUBLECOLON {
|
||||
tok, pos, lit := p.scan()
|
||||
switch tok {
|
||||
case FIELD, TAG:
|
||||
wc.Type = tok
|
||||
default:
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"field", "tag"}, pos)
|
||||
}
|
||||
} else {
|
||||
p.unscan()
|
||||
}
|
||||
return wc, nil
|
||||
case REGEX:
|
||||
re, err := regexp.Compile(lit)
|
||||
if err != nil {
|
||||
|
|
|
@ -773,6 +773,53 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
},
|
||||
},
|
||||
|
||||
// SELECT casts
|
||||
{
|
||||
s: `SELECT field1::float, field2::integer, field3::string, field4::boolean, field5::field, tag1::tag FROM cpu`,
|
||||
stmt: &influxql.SelectStatement{
|
||||
IsRawQuery: true,
|
||||
Fields: []*influxql.Field{
|
||||
{
|
||||
Expr: &influxql.VarRef{
|
||||
Val: "field1",
|
||||
Type: influxql.Float,
|
||||
},
|
||||
},
|
||||
{
|
||||
Expr: &influxql.VarRef{
|
||||
Val: "field2",
|
||||
Type: influxql.Integer,
|
||||
},
|
||||
},
|
||||
{
|
||||
Expr: &influxql.VarRef{
|
||||
Val: "field3",
|
||||
Type: influxql.String,
|
||||
},
|
||||
},
|
||||
{
|
||||
Expr: &influxql.VarRef{
|
||||
Val: "field4",
|
||||
Type: influxql.Boolean,
|
||||
},
|
||||
},
|
||||
{
|
||||
Expr: &influxql.VarRef{
|
||||
Val: "field5",
|
||||
Type: influxql.AnyField,
|
||||
},
|
||||
},
|
||||
{
|
||||
Expr: &influxql.VarRef{
|
||||
Val: "tag1",
|
||||
Type: influxql.Tag,
|
||||
},
|
||||
},
|
||||
},
|
||||
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
|
||||
},
|
||||
},
|
||||
|
||||
// See issues https://github.com/influxdata/influxdb/issues/1647
|
||||
// and https://github.com/influxdata/influxdb/issues/4404
|
||||
// DELETE statement
|
||||
|
|
|
@ -95,6 +95,10 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
|
|||
case ';':
|
||||
return SEMICOLON, pos, ""
|
||||
case ':':
|
||||
if ch1, _ := s.r.read(); ch1 == ':' {
|
||||
return DOUBLECOLON, pos, ""
|
||||
}
|
||||
s.r.unread()
|
||||
return COLON, pos, ""
|
||||
}
|
||||
|
||||
|
|
|
@ -57,6 +57,8 @@ func TestScanner_Scan(t *testing.T) {
|
|||
{s: `.`, tok: influxql.DOT},
|
||||
{s: `=~`, tok: influxql.EQREGEX},
|
||||
{s: `!~`, tok: influxql.NEQREGEX},
|
||||
{s: `:`, tok: influxql.COLON},
|
||||
{s: `::`, tok: influxql.DOUBLECOLON},
|
||||
|
||||
// Identifiers
|
||||
{s: `foo`, tok: influxql.IDENT, lit: `foo`},
|
||||
|
|
|
@ -38,11 +38,11 @@ func Select(stmt *SelectStatement, ic IteratorCreator, sopt *SelectOptions) ([]I
|
|||
}
|
||||
|
||||
// Determine auxiliary fields to be selected.
|
||||
opt.Aux = make([]string, 0, len(info.refs))
|
||||
opt.Aux = make([]VarRef, 0, len(info.refs))
|
||||
for ref := range info.refs {
|
||||
opt.Aux = append(opt.Aux, ref.Val)
|
||||
opt.Aux = append(opt.Aux, *ref)
|
||||
}
|
||||
sort.Strings(opt.Aux)
|
||||
sort.Sort(VarRefs(opt.Aux))
|
||||
|
||||
// If there are multiple auxilary fields and no calls then construct an aux iterator.
|
||||
if len(info.calls) == 0 && len(info.refs) > 0 {
|
||||
|
@ -55,7 +55,7 @@ func Select(stmt *SelectStatement, ic IteratorCreator, sopt *SelectOptions) ([]I
|
|||
if call.Name == "top" || call.Name == "bottom" {
|
||||
for i := 1; i < len(call.Args)-1; i++ {
|
||||
ref := call.Args[i].(*VarRef)
|
||||
opt.Aux = append(opt.Aux, ref.Val)
|
||||
opt.Aux = append(opt.Aux, *ref)
|
||||
extraFields++
|
||||
}
|
||||
}
|
||||
|
@ -117,14 +117,8 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
|
|||
input = NewLimitIterator(input, opt)
|
||||
}
|
||||
|
||||
seriesKeys, err := ic.SeriesKeys(opt)
|
||||
if err != nil {
|
||||
input.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap in an auxilary iterator to separate the fields.
|
||||
aitr := NewAuxIterator(input, seriesKeys, opt)
|
||||
aitr := NewAuxIterator(input, opt)
|
||||
|
||||
// Generate iterators for each field.
|
||||
itrs := make([]Iterator, len(fields))
|
||||
|
@ -133,7 +127,7 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
|
|||
expr := Reduce(f.Expr, nil)
|
||||
switch expr := expr.(type) {
|
||||
case *VarRef:
|
||||
itrs[i] = aitr.Iterator(expr.Val)
|
||||
itrs[i] = aitr.Iterator(expr.Val, expr.Type)
|
||||
case *BinaryExpr:
|
||||
itr, err := buildExprIterator(expr, aitr, opt, false)
|
||||
if err != nil {
|
||||
|
@ -188,14 +182,9 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, opt IteratorOptions,
|
|||
return nil
|
||||
}
|
||||
|
||||
seriesKeys, err := ic.SeriesKeys(opt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build the aux iterators. Previous validation should ensure that only one
|
||||
// call was present so we build an AuxIterator from that input.
|
||||
aitr := NewAuxIterator(input, seriesKeys, opt)
|
||||
aitr := NewAuxIterator(input, opt)
|
||||
for i, f := range fields {
|
||||
if itrs[i] != nil {
|
||||
itrs[i] = aitr
|
||||
|
@ -348,8 +337,8 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec
|
|||
// This section is O(n^2), but for what should be a low value.
|
||||
for i := 1; i < len(expr.Args)-1; i++ {
|
||||
ref := expr.Args[i].(*VarRef)
|
||||
for index, name := range opt.Aux {
|
||||
if name == ref.Val {
|
||||
for index, aux := range opt.Aux {
|
||||
if aux.Val == ref.Val {
|
||||
tags = append(tags, index)
|
||||
break
|
||||
}
|
||||
|
@ -372,8 +361,8 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec
|
|||
// This section is O(n^2), but for what should be a low value.
|
||||
for i := 1; i < len(expr.Args)-1; i++ {
|
||||
ref := expr.Args[i].(*VarRef)
|
||||
for index, name := range opt.Aux {
|
||||
if name == ref.Val {
|
||||
for index, aux := range opt.Aux {
|
||||
if aux.Val == ref.Val {
|
||||
tags = append(tags, index)
|
||||
break
|
||||
}
|
||||
|
|
|
@ -489,7 +489,7 @@ func TestSelect_Top_Tags_Float(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value, host, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value::float, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -537,7 +537,7 @@ func TestSelect_Top_Tags_Integer(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value, host, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value::integer, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -585,7 +585,7 @@ func TestSelect_Top_GroupByTags_Float(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value, host, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value::float, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -629,7 +629,7 @@ func TestSelect_Top_GroupByTags_Integer(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value, host, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT top(value::integer, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -673,7 +673,7 @@ func TestSelect_Bottom_NoTags_Float(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value::float, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -711,7 +711,7 @@ func TestSelect_Bottom_NoTags_Integer(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value::integer, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s), host fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -749,7 +749,7 @@ func TestSelect_Bottom_Tags_Float(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value, host, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value::float, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -797,7 +797,7 @@ func TestSelect_Bottom_Tags_Integer(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value, host, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value::integer, host::tag, 2) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -845,7 +845,7 @@ func TestSelect_Bottom_GroupByTags_Float(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value, host, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value::float, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -889,7 +889,7 @@ func TestSelect_Bottom_GroupByTags_Integer(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value, host, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT bottom(value::float, host::tag, 1) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY region, time(30s) fill(none)`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -1267,7 +1267,7 @@ func TestSelect_Raw(t *testing.T) {
|
|||
// Mock two iterators -- one for each value in the query.
|
||||
var ic IteratorCreator
|
||||
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
if !reflect.DeepEqual(opt.Aux, []string{"v1", "v2"}) {
|
||||
if !reflect.DeepEqual(opt.Aux, []influxql.VarRef{{Val: "v1", Type: influxql.Float}, {Val: "v2", Type: influxql.Float}}) {
|
||||
t.Fatalf("unexpected options: %s", spew.Sdump(opt.Expr))
|
||||
|
||||
}
|
||||
|
@ -1279,7 +1279,7 @@ func TestSelect_Raw(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute selection.
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT v1, v2 FROM cpu`), &ic, nil)
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT v1::float, v2::float FROM cpu`), &ic, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -1319,6 +1319,9 @@ func TestSelect_BinaryExpr_Float(t *testing.T) {
|
|||
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: makeAuxFields(19)},
|
||||
}}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (map[string]influxql.DataType, map[string]struct{}, error) {
|
||||
return map[string]influxql.DataType{"value": influxql.Float}, nil, nil
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
Name string
|
||||
|
@ -1506,7 +1509,12 @@ func TestSelect_BinaryExpr_Float(t *testing.T) {
|
|||
},
|
||||
},
|
||||
} {
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(test.Statement), &ic, nil)
|
||||
stmt, err := MustParseSelectStatement(test.Statement).RewriteFields(&ic)
|
||||
if err != nil {
|
||||
t.Errorf("%s: rewrite error: %s", test.Name, err)
|
||||
}
|
||||
|
||||
itrs, err := influxql.Select(stmt, &ic, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -1534,6 +1542,9 @@ func TestSelect_BinaryExpr_Integer(t *testing.T) {
|
|||
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: makeAuxFields(19)},
|
||||
}}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (map[string]influxql.DataType, map[string]struct{}, error) {
|
||||
return map[string]influxql.DataType{"value": influxql.Integer}, nil, nil
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
Name string
|
||||
|
@ -1721,7 +1732,12 @@ func TestSelect_BinaryExpr_Integer(t *testing.T) {
|
|||
},
|
||||
},
|
||||
} {
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(test.Statement), &ic, nil)
|
||||
stmt, err := MustParseSelectStatement(test.Statement).RewriteFields(&ic)
|
||||
if err != nil {
|
||||
t.Errorf("%s: rewrite error: %s", test.Name, err)
|
||||
}
|
||||
|
||||
itrs, err := influxql.Select(stmt, &ic, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -1742,6 +1758,12 @@ func TestSelect_BinaryExpr_Mixed(t *testing.T) {
|
|||
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{float64(19), int64(5)}},
|
||||
}}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (map[string]influxql.DataType, map[string]struct{}, error) {
|
||||
return map[string]influxql.DataType{
|
||||
"total": influxql.Float,
|
||||
"value": influxql.Integer,
|
||||
}, nil, nil
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
Name string
|
||||
|
@ -1785,7 +1807,12 @@ func TestSelect_BinaryExpr_Mixed(t *testing.T) {
|
|||
},
|
||||
},
|
||||
} {
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(test.Statement), &ic, nil)
|
||||
stmt, err := MustParseSelectStatement(test.Statement).RewriteFields(&ic)
|
||||
if err != nil {
|
||||
t.Errorf("%s: rewrite error: %s", test.Name, err)
|
||||
}
|
||||
|
||||
itrs, err := influxql.Select(stmt, &ic, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -1805,9 +1832,15 @@ func TestSelect_BinaryExpr_NilValues(t *testing.T) {
|
|||
return &FloatIterator{Points: []influxql.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 20, Aux: []interface{}{float64(20), nil}},
|
||||
{Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{float64(10), float64(15)}},
|
||||
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{nil, int64(5)}},
|
||||
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{nil, float64(5)}},
|
||||
}}, nil
|
||||
}
|
||||
ic.FieldDimensionsFn = func(sources influxql.Sources) (map[string]influxql.DataType, map[string]struct{}, error) {
|
||||
return map[string]influxql.DataType{
|
||||
"total": influxql.Float,
|
||||
"value": influxql.Float,
|
||||
}, nil, nil
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
Name string
|
||||
|
@ -1851,7 +1884,12 @@ func TestSelect_BinaryExpr_NilValues(t *testing.T) {
|
|||
},
|
||||
},
|
||||
} {
|
||||
itrs, err := influxql.Select(MustParseSelectStatement(test.Statement), &ic, nil)
|
||||
stmt, err := MustParseSelectStatement(test.Statement).RewriteFields(&ic)
|
||||
if err != nil {
|
||||
t.Errorf("%s: rewrite error: %s", test.Name, err)
|
||||
}
|
||||
|
||||
itrs, err := influxql.Select(stmt, &ic, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
|
||||
|
@ -2313,7 +2351,7 @@ func NewRawBenchmarkIteratorCreator(pointN int) *IteratorCreator {
|
|||
}
|
||||
|
||||
for i := range opt.Aux {
|
||||
switch opt.Aux[i] {
|
||||
switch opt.Aux[i].Val {
|
||||
case "fval":
|
||||
p.Aux[i] = float64(100)
|
||||
default:
|
||||
|
|
|
@ -49,12 +49,13 @@ const (
|
|||
GTE // >=
|
||||
operatorEnd
|
||||
|
||||
LPAREN // (
|
||||
RPAREN // )
|
||||
COMMA // ,
|
||||
COLON // :
|
||||
SEMICOLON // ;
|
||||
DOT // .
|
||||
LPAREN // (
|
||||
RPAREN // )
|
||||
COMMA // ,
|
||||
COLON // :
|
||||
DOUBLECOLON // ::
|
||||
SEMICOLON // ;
|
||||
DOT // .
|
||||
|
||||
keywordBeg
|
||||
// ALL and the following are InfluxQL Keywords
|
||||
|
@ -169,12 +170,13 @@ var tokens = [...]string{
|
|||
GT: ">",
|
||||
GTE: ">=",
|
||||
|
||||
LPAREN: "(",
|
||||
RPAREN: ")",
|
||||
COMMA: ",",
|
||||
COLON: ":",
|
||||
SEMICOLON: ";",
|
||||
DOT: ".",
|
||||
LPAREN: "(",
|
||||
RPAREN: ")",
|
||||
COMMA: ",",
|
||||
COLON: ":",
|
||||
DOUBLECOLON: "::",
|
||||
SEMICOLON: ";",
|
||||
DOT: ".",
|
||||
|
||||
ALL: "ALL",
|
||||
ALTER: "ALTER",
|
||||
|
|
|
@ -276,10 +276,10 @@ func (itr *FloatCursorIterator) Next() *influxql.FloatPoint {
|
|||
}
|
||||
|
||||
// Read all auxilary fields.
|
||||
for i, name := range itr.opt.Aux {
|
||||
if v, ok := m[name]; ok {
|
||||
for i, ref := range itr.opt.Aux {
|
||||
if v, ok := m[ref.Val]; ok {
|
||||
itr.point.Aux[i] = v
|
||||
} else if s, ok := tags[name]; ok {
|
||||
} else if s, ok := tags[ref.Val]; ok {
|
||||
itr.point.Aux[i] = s
|
||||
} else {
|
||||
itr.point.Aux[i] = nil
|
||||
|
@ -306,8 +306,8 @@ func (itr *FloatCursorIterator) Next() *influxql.FloatPoint {
|
|||
}
|
||||
|
||||
// Read all auxilary fields.
|
||||
for i, name := range itr.opt.Aux {
|
||||
if tagValue, ok := tags[name]; ok {
|
||||
for i, ref := range itr.opt.Aux {
|
||||
if tagValue, ok := tags[ref.Val]; ok {
|
||||
itr.point.Aux[i] = tagValue
|
||||
} else {
|
||||
itr.point.Aux[i] = value
|
||||
|
|
|
@ -254,7 +254,7 @@ func TestFloatCursorIterator_MultipleValues(t *testing.T) {
|
|||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Expr: &influxql.VarRef{Val: "val1"}, Aux: []string{"val1", "val2"},
|
||||
Expr: &influxql.VarRef{Val: "val1"}, Aux: []influxql.VarRef{{Val: "val1"}, {Val: "val2"}},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
|
@ -293,7 +293,7 @@ func TestFloatCursorIterator_Aux_SingleValue(t *testing.T) {
|
|||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Aux: []string{"val1"},
|
||||
Aux: []influxql.VarRef{{Val: "val1"}},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
|
@ -332,7 +332,7 @@ func TestFloatCursorIterator_Aux_MultipleValues(t *testing.T) {
|
|||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Aux: []string{"val1", "val2"},
|
||||
Aux: []influxql.VarRef{{Val: "val1"}, {Val: "val2"}},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
|
|
|
@ -35,7 +35,6 @@ type Engine interface {
|
|||
Restore(r io.Reader, basePath string) error
|
||||
|
||||
CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
|
||||
SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
|
||||
WritePoints(points []models.Point) error
|
||||
ContainsSeries(keys []string) (map[string]bool, error)
|
||||
DeleteSeries(keys []string) error
|
||||
|
|
|
@ -879,62 +879,6 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator
|
|||
return itr, nil
|
||||
}
|
||||
|
||||
func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
seriesList := influxql.SeriesList{}
|
||||
mms := tsdb.Measurements(e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names()))
|
||||
for _, mm := range mms {
|
||||
// Determine tagsets for this measurement based on dimensions and filters.
|
||||
tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Calculate tag sets and apply SLIMIT/SOFFSET.
|
||||
tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
|
||||
for _, t := range tagSets {
|
||||
series := influxql.Series{
|
||||
Name: mm.Name,
|
||||
Tags: influxql.NewTags(t.Tags),
|
||||
Aux: make([]influxql.DataType, len(opt.Aux)),
|
||||
}
|
||||
|
||||
// Determine the aux field types.
|
||||
for _, seriesKey := range t.SeriesKeys {
|
||||
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey))
|
||||
for i, field := range opt.Aux {
|
||||
typ := func() influxql.DataType {
|
||||
mf := e.measurementFields[mm.Name]
|
||||
if mf == nil {
|
||||
return influxql.Unknown
|
||||
}
|
||||
|
||||
f := mf.Field(field)
|
||||
if f == nil {
|
||||
return influxql.Unknown
|
||||
}
|
||||
return f.Type
|
||||
}()
|
||||
|
||||
if typ == influxql.Unknown {
|
||||
if v := tags.Value(field); v != "" {
|
||||
// All tags are strings.
|
||||
typ = influxql.String
|
||||
}
|
||||
}
|
||||
|
||||
if typ != influxql.Unknown {
|
||||
if series.Aux[i] == influxql.Unknown || typ < series.Aux[i] {
|
||||
series.Aux[i] = typ
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
seriesList = append(seriesList, series)
|
||||
}
|
||||
}
|
||||
return seriesList, nil
|
||||
}
|
||||
|
||||
// createVarRefIterator creates an iterator for a variable reference.
|
||||
func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
|
||||
ref, _ := opt.Expr.(*influxql.VarRef)
|
||||
|
@ -1051,7 +995,7 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, mm *tsdb.Measuremen
|
|||
|
||||
// createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series.
|
||||
func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKeys []string, t *influxql.TagSet, filters []influxql.Expr, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
|
||||
conditionFields := make([]string, len(influxql.ExprNames(opt.Condition)))
|
||||
conditionFields := make([]influxql.VarRef, len(influxql.ExprNames(opt.Condition)))
|
||||
|
||||
itrs := make([]influxql.Iterator, 0, len(seriesKeys))
|
||||
for i, seriesKey := range seriesKeys {
|
||||
|
@ -1076,7 +1020,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
}
|
||||
|
||||
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
|
||||
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []string, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey))
|
||||
|
||||
// Create options specific for this series.
|
||||
|
@ -1088,17 +1032,35 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
var aux []cursorAt
|
||||
if len(opt.Aux) > 0 {
|
||||
aux = make([]cursorAt, len(opt.Aux))
|
||||
for i := range aux {
|
||||
// Create cursor from field.
|
||||
cur := e.buildCursor(mm.Name, seriesKey, opt.Aux[i], opt)
|
||||
if cur != nil {
|
||||
aux[i] = newBufCursor(cur, opt.Ascending)
|
||||
continue
|
||||
for i, ref := range opt.Aux {
|
||||
// Create cursor from field if a tag wasn't requested.
|
||||
if ref.Type != influxql.Tag {
|
||||
cur := e.buildCursor(mm.Name, seriesKey, &ref, opt)
|
||||
if cur != nil {
|
||||
aux[i] = newBufCursor(cur, opt.Ascending)
|
||||
continue
|
||||
}
|
||||
|
||||
// If a field was requested, use a nil cursor of the requested type.
|
||||
switch ref.Type {
|
||||
case influxql.Float, influxql.AnyField:
|
||||
aux[i] = &floatNilLiteralCursor{}
|
||||
continue
|
||||
case influxql.Integer:
|
||||
aux[i] = &integerNilLiteralCursor{}
|
||||
continue
|
||||
case influxql.String:
|
||||
aux[i] = &stringNilLiteralCursor{}
|
||||
continue
|
||||
case influxql.Boolean:
|
||||
aux[i] = &booleanNilLiteralCursor{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If field doesn't exist, use the tag value.
|
||||
// However, if the tag value is blank then return a null.
|
||||
if v := tags.Value(opt.Aux[i]); v == "" {
|
||||
if v := tags.Value(ref.Val); v == "" {
|
||||
// However, if the tag value is blank then return a null.
|
||||
aux[i] = &stringNilLiteralCursor{}
|
||||
} else {
|
||||
aux[i] = &stringLiteralCursor{value: v}
|
||||
|
@ -1111,33 +1073,53 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
var conds []cursorAt
|
||||
if len(conditionFields) > 0 {
|
||||
conds = make([]cursorAt, len(conditionFields))
|
||||
for i := range conds {
|
||||
cur := e.buildCursor(mm.Name, seriesKey, conditionFields[i], opt)
|
||||
if cur != nil {
|
||||
conds[i] = newBufCursor(cur, opt.Ascending)
|
||||
continue
|
||||
for i, ref := range conditionFields {
|
||||
// Create cursor from field if a tag wasn't requested.
|
||||
if ref.Type != influxql.Tag {
|
||||
cur := e.buildCursor(mm.Name, seriesKey, &ref, opt)
|
||||
if cur != nil {
|
||||
conds[i] = newBufCursor(cur, opt.Ascending)
|
||||
continue
|
||||
}
|
||||
|
||||
// If a field was requested, use a nil cursor of the requested type.
|
||||
switch ref.Type {
|
||||
case influxql.Float, influxql.AnyField:
|
||||
aux[i] = &floatNilLiteralCursor{}
|
||||
continue
|
||||
case influxql.Integer:
|
||||
aux[i] = &integerNilLiteralCursor{}
|
||||
continue
|
||||
case influxql.String:
|
||||
aux[i] = &stringNilLiteralCursor{}
|
||||
continue
|
||||
case influxql.Boolean:
|
||||
aux[i] = &booleanNilLiteralCursor{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If field doesn't exist, use the tag value.
|
||||
// However, if the tag value is blank then return a null.
|
||||
if v := tags.Value(conditionFields[i]); v == "" {
|
||||
if v := tags.Value(ref.Val); v == "" {
|
||||
// However, if the tag value is blank then return a null.
|
||||
conds[i] = &stringNilLiteralCursor{}
|
||||
} else {
|
||||
conds[i] = &stringLiteralCursor{value: v}
|
||||
}
|
||||
}
|
||||
}
|
||||
condNames := influxql.VarRefs(conditionFields).Strings()
|
||||
|
||||
// Limit tags to only the dimensions selected.
|
||||
tags = tags.Subset(opt.Dimensions)
|
||||
|
||||
// If it's only auxiliary fields then it doesn't matter what type of iterator we use.
|
||||
if ref == nil {
|
||||
return newFloatIterator(mm.Name, tags, itrOpt, nil, aux, conds, conditionFields), nil
|
||||
return newFloatIterator(mm.Name, tags, itrOpt, nil, aux, conds, condNames), nil
|
||||
}
|
||||
|
||||
// Build main cursor.
|
||||
cur := e.buildCursor(mm.Name, seriesKey, ref.Val, opt)
|
||||
cur := e.buildCursor(mm.Name, seriesKey, ref, opt)
|
||||
|
||||
// If the field doesn't exist then don't build an iterator.
|
||||
if cur == nil {
|
||||
|
@ -1146,20 +1128,20 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
|
||||
switch cur := cur.(type) {
|
||||
case floatCursor:
|
||||
return newFloatIterator(mm.Name, tags, itrOpt, cur, aux, conds, conditionFields), nil
|
||||
return newFloatIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
case integerCursor:
|
||||
return newIntegerIterator(mm.Name, tags, itrOpt, cur, aux, conds, conditionFields), nil
|
||||
return newIntegerIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
case stringCursor:
|
||||
return newStringIterator(mm.Name, tags, itrOpt, cur, aux, conds, conditionFields), nil
|
||||
return newStringIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
case booleanCursor:
|
||||
return newBooleanIterator(mm.Name, tags, itrOpt, cur, aux, conds, conditionFields), nil
|
||||
return newBooleanIterator(mm.Name, tags, itrOpt, cur, aux, conds, condNames), nil
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
// buildCursor creates an untyped cursor for a field.
|
||||
func (e *Engine) buildCursor(measurement, seriesKey, field string, opt influxql.IteratorOptions) cursor {
|
||||
func (e *Engine) buildCursor(measurement, seriesKey string, ref *influxql.VarRef, opt influxql.IteratorOptions) cursor {
|
||||
// Look up fields for measurement.
|
||||
mf := e.measurementFields[measurement]
|
||||
if mf == nil {
|
||||
|
@ -1167,21 +1149,41 @@ func (e *Engine) buildCursor(measurement, seriesKey, field string, opt influxql.
|
|||
}
|
||||
|
||||
// Find individual field.
|
||||
f := mf.Field(field)
|
||||
f := mf.Field(ref.Val)
|
||||
if f == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if we need to perform a cast. Performing a cast in the
|
||||
// engine (if it is possible) is much more efficient than an automatic cast.
|
||||
if ref.Type != influxql.Unknown && ref.Type != influxql.AnyField && ref.Type != f.Type {
|
||||
switch ref.Type {
|
||||
case influxql.Float:
|
||||
switch f.Type {
|
||||
case influxql.Integer:
|
||||
cur := e.buildIntegerCursor(measurement, seriesKey, ref.Val, opt)
|
||||
return &floatCastIntegerCursor{cursor: cur}
|
||||
}
|
||||
case influxql.Integer:
|
||||
switch f.Type {
|
||||
case influxql.Float:
|
||||
cur := e.buildFloatCursor(measurement, seriesKey, ref.Val, opt)
|
||||
return &integerCastFloatCursor{cursor: cur}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return appropriate cursor based on type.
|
||||
switch f.Type {
|
||||
case influxql.Float:
|
||||
return e.buildFloatCursor(measurement, seriesKey, field, opt)
|
||||
return e.buildFloatCursor(measurement, seriesKey, ref.Val, opt)
|
||||
case influxql.Integer:
|
||||
return e.buildIntegerCursor(measurement, seriesKey, field, opt)
|
||||
return e.buildIntegerCursor(measurement, seriesKey, ref.Val, opt)
|
||||
case influxql.String:
|
||||
return e.buildStringCursor(measurement, seriesKey, field, opt)
|
||||
return e.buildStringCursor(measurement, seriesKey, ref.Val, opt)
|
||||
case influxql.Boolean:
|
||||
return e.buildBooleanCursor(measurement, seriesKey, field, opt)
|
||||
return e.buildBooleanCursor(measurement, seriesKey, ref.Val, opt)
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
|
|
|
@ -450,7 +450,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
|
|||
|
||||
itr, err := e.CreateIterator(influxql.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Aux: []string{"F"},
|
||||
Aux: []influxql.VarRef{{Val: "F"}},
|
||||
Dimensions: []string{"host"},
|
||||
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
|
||||
StartTime: influxql.MinTime,
|
||||
|
|
|
@ -20,3 +20,25 @@ func newLimitIterator(input influxql.Iterator, opt influxql.IteratorOptions) inf
|
|||
panic(fmt.Sprintf("unsupported limit iterator type: %T", input))
|
||||
}
|
||||
}
|
||||
|
||||
type floatCastIntegerCursor struct {
|
||||
cursor integerCursor
|
||||
}
|
||||
|
||||
func (c *floatCastIntegerCursor) next() (t int64, v interface{}) { return c.nextFloat() }
|
||||
|
||||
func (c *floatCastIntegerCursor) nextFloat() (int64, float64) {
|
||||
t, v := c.cursor.nextInteger()
|
||||
return t, float64(v)
|
||||
}
|
||||
|
||||
type integerCastFloatCursor struct {
|
||||
cursor floatCursor
|
||||
}
|
||||
|
||||
func (c *integerCastFloatCursor) next() (t int64, v interface{}) { return c.nextInteger() }
|
||||
|
||||
func (c *integerCastFloatCursor) nextInteger() (int64, int64) {
|
||||
t, v := c.cursor.nextFloat()
|
||||
return t, int64(v)
|
||||
}
|
||||
|
|
|
@ -844,11 +844,11 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
|||
|
||||
// For fields, return all series IDs from this measurement and return
|
||||
// the expression passed in, as the filter.
|
||||
if name.Val != "_name" && m.hasField(name.Val) {
|
||||
if name.Val != "_name" && ((name.Type == influxql.Unknown && m.hasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) {
|
||||
return m.seriesIDs, n, nil
|
||||
} else if value, ok := value.(*influxql.VarRef); ok {
|
||||
// Check if the RHS is a variable and if it is a field.
|
||||
if value.Val != "_name" && m.hasField(value.Val) {
|
||||
if value.Val != "_name" && ((value.Type == influxql.Unknown && m.hasField(value.Val)) || name.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
|
||||
return m.seriesIDs, n, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -442,8 +442,38 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
|
|||
}
|
||||
|
||||
// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
|
||||
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
fields = make(map[string]struct{})
|
||||
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
if influxql.Sources(sources).HasSystemSource() {
|
||||
// Only support a single system source.
|
||||
if len(sources) > 1 {
|
||||
return nil, nil, errors.New("cannot select from multiple system sources")
|
||||
}
|
||||
|
||||
switch m := sources[0].(type) {
|
||||
case *influxql.Measurement:
|
||||
switch m.Name {
|
||||
case "_fieldKeys":
|
||||
return map[string]influxql.DataType{
|
||||
"fieldKey": influxql.String,
|
||||
"fieldType": influxql.String,
|
||||
}, nil, nil
|
||||
case "_measurements":
|
||||
return map[string]influxql.DataType{"_name": influxql.String}, nil, nil
|
||||
case "_series":
|
||||
return map[string]influxql.DataType{"key": influxql.String}, nil, nil
|
||||
case "_tagKeys":
|
||||
return map[string]influxql.DataType{"tagKey": influxql.String}, nil, nil
|
||||
case "_tags":
|
||||
return map[string]influxql.DataType{
|
||||
"_tagKey": influxql.String,
|
||||
"value": influxql.String,
|
||||
}, nil, nil
|
||||
}
|
||||
}
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
fields = make(map[string]influxql.DataType)
|
||||
dimensions = make(map[string]struct{})
|
||||
|
||||
for _, src := range sources {
|
||||
|
@ -456,8 +486,14 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions ma
|
|||
}
|
||||
|
||||
// Append fields and dimensions.
|
||||
for _, name := range mm.FieldNames() {
|
||||
fields[name] = struct{}{}
|
||||
fieldNames := mm.FieldNames()
|
||||
if len(fieldNames) > 0 {
|
||||
mf := s.engine.MeasurementFields(m.Name)
|
||||
for _, name := range fieldNames {
|
||||
if f := mf.Field(name); f != nil {
|
||||
fields[name] = f.Type
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, key := range mm.TagKeys() {
|
||||
dimensions[key] = struct{}{}
|
||||
|
@ -468,30 +504,6 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions ma
|
|||
return
|
||||
}
|
||||
|
||||
// SeriesKeys returns a list of series in the shard.
|
||||
func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
if s.closed() {
|
||||
return nil, ErrEngineClosed
|
||||
}
|
||||
|
||||
if influxql.Sources(opt.Sources).HasSystemSource() {
|
||||
// Only support a single system source.
|
||||
if len(opt.Sources) > 1 {
|
||||
return nil, errors.New("cannot select from multiple system sources")
|
||||
}
|
||||
|
||||
// Meta queries don't need to know the series name and
|
||||
// always have a single series of strings.
|
||||
auxFields := make([]influxql.DataType, len(opt.Aux))
|
||||
for i := range auxFields {
|
||||
auxFields[i] = influxql.String
|
||||
}
|
||||
return []influxql.Series{{Aux: auxFields}}, nil
|
||||
}
|
||||
|
||||
return s.engine.SeriesKeys(opt)
|
||||
}
|
||||
|
||||
// ExpandSources expands regex sources and removes duplicates.
|
||||
// NOTE: sources must be normalized (db and rp set) before calling this function.
|
||||
func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
|
@ -661,6 +673,17 @@ func (m *MeasurementFields) Field(name string) *Field {
|
|||
return f
|
||||
}
|
||||
|
||||
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
fields := make(map[string]influxql.DataType)
|
||||
for name, f := range m.fields {
|
||||
fields[name] = f.Type
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// Field represents a series field.
|
||||
type Field struct {
|
||||
ID uint8 `json:"id,omitempty"`
|
||||
|
@ -941,12 +964,9 @@ func (ic *shardIteratorCreator) Close() error { return nil }
|
|||
func (ic *shardIteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
return ic.sh.CreateIterator(opt)
|
||||
}
|
||||
func (ic *shardIteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
|
||||
func (ic *shardIteratorCreator) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
|
||||
return ic.sh.FieldDimensions(sources)
|
||||
}
|
||||
func (ic *shardIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
|
||||
return ic.sh.SeriesKeys(opt)
|
||||
}
|
||||
func (ic *shardIteratorCreator) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
return ic.sh.ExpandSources(sources)
|
||||
}
|
||||
|
@ -1144,7 +1164,7 @@ func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) {
|
|||
|
||||
// Write auxiliary fields.
|
||||
for i, f := range itr.opt.Aux {
|
||||
switch f {
|
||||
switch f.Val {
|
||||
case "key":
|
||||
itr.point.Aux[i] = key
|
||||
}
|
||||
|
@ -1273,7 +1293,7 @@ func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Ite
|
|||
return &tagValuesIterator{
|
||||
series: series,
|
||||
keys: keys.list(),
|
||||
fields: opt.Aux,
|
||||
fields: influxql.VarRefs(opt.Aux).Strings(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -219,7 +219,7 @@ cpu,host=serverB,region=uswest value=25 0
|
|||
// Create iterator.
|
||||
itr, err := sh.CreateIterator(influxql.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Aux: []string{"val2"},
|
||||
Aux: []influxql.VarRef{{Val: "val2"}},
|
||||
Dimensions: []string{"host"},
|
||||
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
|
||||
Ascending: true,
|
||||
|
@ -295,7 +295,7 @@ cpu,host=serverB,region=uswest value=25 0
|
|||
// Create iterator.
|
||||
itr, err := sh.CreateIterator(influxql.IteratorOptions{
|
||||
Expr: influxql.MustParseExpr(`value`),
|
||||
Aux: []string{"val2"},
|
||||
Aux: []influxql.VarRef{{Val: "val2"}},
|
||||
Dimensions: []string{"host"},
|
||||
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
|
||||
Ascending: false,
|
||||
|
|
Loading…
Reference in New Issue