feat(query/influxql): support for group by
The transpiler now supports grouping by tags.pull/10616/head
parent
2307b7d91c
commit
98597551d3
|
@ -1,6 +1,8 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
|
@ -192,10 +194,54 @@ type groupCursor struct {
|
|||
}
|
||||
|
||||
func (gr *groupInfo) group(t *transpilerState, in cursor) (cursor, error) {
|
||||
// TODO(jsternberg): Process group by clause correctly and windowing.
|
||||
// TODO(jsternberg): Process windowing.
|
||||
tags := []string{"_measurement"}
|
||||
if len(t.stmt.Dimensions) > 0 {
|
||||
// Maintain a set of the dimensions we have encountered.
|
||||
// This is so we don't duplicate groupings, but we still maintain the
|
||||
// listing of tags in the tags slice so it is deterministic.
|
||||
m := make(map[string]struct{})
|
||||
for _, d := range t.stmt.Dimensions {
|
||||
// Reduce the expression before attempting anything. Do not evaluate the call.
|
||||
expr := influxql.Reduce(d.Expr, nil)
|
||||
|
||||
switch expr := expr.(type) {
|
||||
case *influxql.VarRef:
|
||||
if strings.ToLower(expr.Val) == "time" {
|
||||
return nil, errors.New("time() is a function and expects at least one argument")
|
||||
} else if _, ok := m[expr.Val]; ok {
|
||||
continue
|
||||
}
|
||||
tags = append(tags, expr.Val)
|
||||
m[expr.Val] = struct{}{}
|
||||
case *influxql.Call:
|
||||
// Ensure the call is time() and it has one or two duration arguments.
|
||||
if expr.Name != "time" {
|
||||
return nil, errors.New("only time() calls allowed in dimensions")
|
||||
} else if got := len(expr.Args); got < 1 || got > 2 {
|
||||
return nil, errors.New("time dimension expected 1 or 2 arguments")
|
||||
} else if _, ok := expr.Args[0].(*influxql.DurationLiteral); !ok {
|
||||
return nil, errors.New("time dimension must have duration argument")
|
||||
} else {
|
||||
return nil, errors.New("unimplemented: windowing support")
|
||||
}
|
||||
case *influxql.Wildcard:
|
||||
return nil, errors.New("unimplemented: dimension wildcards")
|
||||
case *influxql.RegexLiteral:
|
||||
return nil, errors.New("unimplemented: dimension regex wildcards")
|
||||
default:
|
||||
return nil, errors.New("only time and tag dimensions allowed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Perform the grouping by the tags we found. There is always a group by because
|
||||
// there is always something to group in influxql.
|
||||
// TODO(jsternberg): A wildcard will skip this step.
|
||||
id := t.op("group", &functions.GroupOpSpec{
|
||||
By: []string{"_measurement"},
|
||||
By: tags,
|
||||
}, in.ID())
|
||||
|
||||
return &groupCursor{id: id, cursor: in}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ func TestMultiResultEncoder_Encode(t *testing.T) {
|
|||
{Label: "value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)},
|
||||
{ts("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)},
|
||||
},
|
||||
}},
|
||||
}},
|
||||
|
@ -81,10 +81,15 @@ func (ri *resultErrorIterator) Err() error {
|
|||
return errors.New(ri.Error)
|
||||
}
|
||||
|
||||
func mustParseTime(s string) execute.Time {
|
||||
func mustParseTime(s string) time.Time {
|
||||
t, err := time.Parse(time.RFC3339, s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return execute.Time(t.UnixNano())
|
||||
return t
|
||||
}
|
||||
|
||||
// ts takes an RFC3339 time string and returns an execute.Time from it using the unix timestamp.
|
||||
func ts(s string) execute.Time {
|
||||
return execute.Time(mustParseTime(s).UnixNano())
|
||||
}
|
||||
|
|
|
@ -1177,6 +1177,126 @@ func TestTranspiler(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
s: `SELECT mean(value) FROM db0..cpu GROUP BY host`,
|
||||
spec: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
Bucket: "db0/autogen",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "range0",
|
||||
Spec: &functions.RangeOpSpec{
|
||||
Start: query.Time{Absolute: time.Unix(0, influxqllib.MinTime)},
|
||||
Stop: query.Time{Absolute: time.Unix(0, influxqllib.MaxTime)},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "filter0",
|
||||
Spec: &functions.FilterOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{
|
||||
{Key: &semantic.Identifier{Name: "r"}},
|
||||
},
|
||||
Body: &semantic.LogicalExpression{
|
||||
Operator: ast.AndOperator,
|
||||
Left: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_measurement",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "cpu",
|
||||
},
|
||||
},
|
||||
Right: &semantic.BinaryExpression{
|
||||
Operator: ast.EqualOperator,
|
||||
Left: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_field",
|
||||
},
|
||||
Right: &semantic.StringLiteral{
|
||||
Value: "value",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "group0",
|
||||
Spec: &functions.GroupOpSpec{
|
||||
By: []string{"_measurement", "host"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "mean0",
|
||||
Spec: &functions.MeanOpSpec{
|
||||
AggregateConfig: execute.AggregateConfig{
|
||||
TimeSrc: execute.DefaultStartColLabel,
|
||||
TimeDst: execute.DefaultTimeColLabel,
|
||||
Columns: []string{execute.DefaultValueColLabel},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "map0",
|
||||
Spec: &functions.MapOpSpec{
|
||||
Fn: &semantic.FunctionExpression{
|
||||
Params: []*semantic.FunctionParam{{
|
||||
Key: &semantic.Identifier{Name: "r"},
|
||||
}},
|
||||
Body: &semantic.ObjectExpression{
|
||||
Properties: []*semantic.Property{
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "_time"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_time",
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: &semantic.Identifier{Name: "mean"},
|
||||
Value: &semantic.MemberExpression{
|
||||
Object: &semantic.IdentifierExpression{
|
||||
Name: "r",
|
||||
},
|
||||
Property: "_value",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
MergeKey: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "yield0",
|
||||
Spec: &functions.YieldOpSpec{
|
||||
Name: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "range0"},
|
||||
{Parent: "range0", Child: "filter0"},
|
||||
{Parent: "filter0", Child: "group0"},
|
||||
{Parent: "group0", Child: "mean0"},
|
||||
{Parent: "mean0", Child: "map0"},
|
||||
{Parent: "map0", Child: "yield0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tt.s, func(t *testing.T) {
|
||||
if err := tt.spec.Validate(); err != nil {
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
var skipTests = map[string]string{
|
||||
"derivative": "derivative not supported by influxql (https://github.com/influxdata/platform/issues/93)",
|
||||
"filter_by_tags": "arbitrary filtering not supported by influxql (https://github.com/influxdata/platform/issues/94)",
|
||||
"group_ungroup": "influxql/flux disagreement on keycols (https://github.com/influxdata/platform/issues/95)",
|
||||
"window": "ordering of results differs between queries (https://github.com/influxdata/platform/issues/96)",
|
||||
"window_group_mean_ungroup": "error in influxql: failed to run query: timeValue column \"_start\" does not exist (https://github.com/influxdata/platform/issues/97)",
|
||||
}
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
from(db:"testdb")
|
||||
|> range(start: 2018-05-23T13:09:22.885021542Z)
|
||||
|> group(by: ["name"])
|
||||
|> filter(fn: (r) => r._measurement == "diskio" and r._field == "io_time")
|
||||
|> group(by: ["_measurement", "name"])
|
||||
|> max()
|
||||
|> map(fn: (r) => {_time: r._time, max: r._value})
|
||||
|> yield(name: "0")
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
SELECT max(io_time) FROM testdb..diskio GROUP BY "name"
|
|
@ -1,17 +1,7 @@
|
|||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
|
||||
#partition,false,false,false,false,false,false,false,false,false,true
|
||||
#default,_result,,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,host,name
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,15204688,io_time,diskio,host.local,disk0
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,15204894,io_time,diskio,host.local,disk0
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,15205102,io_time,diskio,host.local,disk0
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,15205226,io_time,diskio,host.local,disk0
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,15205499,io_time,diskio,host.local,disk0
|
||||
,,0,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,15205755,io_time,diskio,host.local,disk0
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,648,io_time,diskio,host.local,disk2
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,648,io_time,diskio,host.local,disk2
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,648,io_time,diskio,host.local,disk2
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,648,io_time,diskio,host.local,disk2
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,648,io_time,diskio,host.local,disk2
|
||||
,,1,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,648,io_time,diskio,host.local,disk2
|
||||
#datatype,string,long,string,string,dateTime:RFC3339,long
|
||||
#partition,false,false,true,true,false,false
|
||||
#default,0,,,,,
|
||||
,result,table,_measurement,name,_time,max
|
||||
,,0,diskio,disk0,2018-05-22T19:54:16Z,15205755
|
||||
,,1,diskio,disk2,2018-05-22T19:53:26Z,648
|
||||
|
||||
|
|
|
|
@ -1 +0,0 @@
|
|||
select io_time from testdb..diskio group by "name"
|
Loading…
Reference in New Issue