diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index e1e2ad78b3..1aab053138 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -4705,6 +4705,11 @@ func TestServer_Query_Subqueries(t *testing.T) { command: `SELECT min(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`, exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`, }, + &Query{ + params: url.Values{"db": []string{"db0"}}, + command: `SELECT min(value) FROM (SELECT top(usage_user, 2), usage_user - usage_system AS value FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' GROUP BY host`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-10]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`, + }, }...) for i, query := range test.queries { diff --git a/influxql/ast.go b/influxql/ast.go index cf75bd6630..a01f56388d 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1922,7 +1922,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { return fmt.Errorf("invalid group interval: %v", err) } - if c, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 { + if c, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 && tr != targetSubquery { return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name) } else if !ok && groupByInterval > 0 { return fmt.Errorf("aggregate function required inside the call to %s", expr.Name) @@ -1983,7 +1983,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { return fmt.Errorf("invalid group interval: %v", err) } - if _, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 { + if _, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 && tr != targetSubquery { return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name) } else if !ok { return fmt.Errorf("must use aggregate function with %s", expr.Name) @@ -2043,6 +2043,11 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { // If we have an aggregate function with a group by time without a where clause, it's an invalid statement if tr == targetNotRequired { // ignore create continuous query statements + if err := s.validateTimeExpression(); err != nil { + return err + } + } + if tr != targetSubquery { if err := s.validateGroupByInterval(); err != nil { return err } @@ -2050,10 +2055,10 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { return nil } -// validateGroupByInterval ensures that any select statements that have a group +// validateTimeExpression ensures that any select statements that have a group // by interval either have a time expression limiting the time range or have a // parent query that does that. -func (s *SelectStatement) validateGroupByInterval() error { +func (s *SelectStatement) validateTimeExpression() error { // If we have a time expression, we and all subqueries are fine. if HasTimeExpr(s.Condition) { return nil @@ -2072,6 +2077,44 @@ func (s *SelectStatement) validateGroupByInterval() error { // statement, we don't need to do this because parent time ranges propagate // to children. So we only execute this when there is no time condition in // the parent. + for _, source := range s.Sources { + switch source := source.(type) { + case *SubQuery: + if err := source.Statement.validateTimeExpression(); err != nil { + return err + } + } + } + return nil +} + +// validateGroupByInterval ensures that a select statement is grouped by an +// interval if it contains certain functions. +func (s *SelectStatement) validateGroupByInterval() error { + interval, err := s.GroupByInterval() + if err != nil { + return err + } else if interval > 0 { + // If we have an interval here, that means the interval will propagate + // into any subqueries and we can just stop looking. + return nil + } + + // Check inside of the fields for any of the specific functions that ned a group by interval. + for _, f := range s.Fields { + switch expr := f.Expr.(type) { + case *Call: + switch expr.Name { + case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed", "holt_winters", "holt_winters_with_fit": + // If the first argument is a call, we needed a group by interval and we don't have one. + if _, ok := expr.Args[0].(*Call); ok { + return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name) + } + } + } + } + + // Validate the subqueries. for _, source := range s.Sources { switch source := source.(type) { case *SubQuery: @@ -4503,11 +4546,14 @@ func FieldDimensions(sources Sources, m FieldMapper) (fields map[string]DataType fields[k] = typ } } - for _, d := range src.Statement.Dimensions { - switch d := d.Expr.(type) { - case *VarRef: - dimensions[d.Val] = struct{}{} - } + + _, d, err := FieldDimensions(src.Statement.Sources, m) + if err != nil { + return nil, nil, err + } + + for k := range d { + dimensions[k] = struct{}{} } } } diff --git a/influxql/ast_test.go b/influxql/ast_test.go index 65f7126542..f7c78c84d5 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -411,7 +411,7 @@ func TestSelectStatement_RewriteFields(t *testing.T) { // Rewrite subquery { stmt: `SELECT * FROM (SELECT mean(value1) FROM cpu GROUP BY host) GROUP BY *`, - rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host`, + rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host, region`, }, } diff --git a/influxql/functions.gen.go b/influxql/functions.gen.go index 1bc391db45..5723d56cf6 100644 --- a/influxql/functions.gen.go +++ b/influxql/functions.gen.go @@ -96,7 +96,7 @@ func NewFloatSliceFuncReducer(fn FloatReduceSliceFunc) *FloatSliceFuncReducer { // AggregateFloat copies the FloatPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *FloatSliceFuncReducer) AggregateFloat(p *FloatPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. @@ -166,7 +166,7 @@ func NewFloatSliceFuncIntegerReducer(fn FloatReduceIntegerSliceFunc) *FloatSlice // AggregateFloat copies the FloatPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *FloatSliceFuncIntegerReducer) AggregateFloat(p *FloatPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. @@ -236,7 +236,7 @@ func NewFloatSliceFuncStringReducer(fn FloatReduceStringSliceFunc) *FloatSliceFu // AggregateFloat copies the FloatPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *FloatSliceFuncStringReducer) AggregateFloat(p *FloatPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. @@ -306,7 +306,7 @@ func NewFloatSliceFuncBooleanReducer(fn FloatReduceBooleanSliceFunc) *FloatSlice // AggregateFloat copies the FloatPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *FloatSliceFuncBooleanReducer) AggregateFloat(p *FloatPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice. @@ -510,7 +510,7 @@ func NewIntegerSliceFuncFloatReducer(fn IntegerReduceFloatSliceFunc) *IntegerSli // AggregateInteger copies the IntegerPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *IntegerSliceFuncFloatReducer) AggregateInteger(p *IntegerPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. @@ -580,7 +580,7 @@ func NewIntegerSliceFuncReducer(fn IntegerReduceSliceFunc) *IntegerSliceFuncRedu // AggregateInteger copies the IntegerPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *IntegerSliceFuncReducer) AggregateInteger(p *IntegerPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. @@ -650,7 +650,7 @@ func NewIntegerSliceFuncStringReducer(fn IntegerReduceStringSliceFunc) *IntegerS // AggregateInteger copies the IntegerPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *IntegerSliceFuncStringReducer) AggregateInteger(p *IntegerPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. @@ -720,7 +720,7 @@ func NewIntegerSliceFuncBooleanReducer(fn IntegerReduceBooleanSliceFunc) *Intege // AggregateInteger copies the IntegerPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *IntegerSliceFuncBooleanReducer) AggregateInteger(p *IntegerPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice. @@ -924,7 +924,7 @@ func NewStringSliceFuncFloatReducer(fn StringReduceFloatSliceFunc) *StringSliceF // AggregateString copies the StringPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *StringSliceFuncFloatReducer) AggregateString(p *StringPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. @@ -994,7 +994,7 @@ func NewStringSliceFuncIntegerReducer(fn StringReduceIntegerSliceFunc) *StringSl // AggregateString copies the StringPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *StringSliceFuncIntegerReducer) AggregateString(p *StringPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. @@ -1064,7 +1064,7 @@ func NewStringSliceFuncReducer(fn StringReduceSliceFunc) *StringSliceFuncReducer // AggregateString copies the StringPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *StringSliceFuncReducer) AggregateString(p *StringPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. @@ -1134,7 +1134,7 @@ func NewStringSliceFuncBooleanReducer(fn StringReduceBooleanSliceFunc) *StringSl // AggregateString copies the StringPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *StringSliceFuncBooleanReducer) AggregateString(p *StringPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateStringBulk performs a bulk copy of StringPoints into the internal slice. @@ -1338,7 +1338,7 @@ func NewBooleanSliceFuncFloatReducer(fn BooleanReduceFloatSliceFunc) *BooleanSli // AggregateBoolean copies the BooleanPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *BooleanSliceFuncFloatReducer) AggregateBoolean(p *BooleanPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. @@ -1408,7 +1408,7 @@ func NewBooleanSliceFuncIntegerReducer(fn BooleanReduceIntegerSliceFunc) *Boolea // AggregateBoolean copies the BooleanPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *BooleanSliceFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. @@ -1478,7 +1478,7 @@ func NewBooleanSliceFuncStringReducer(fn BooleanReduceStringSliceFunc) *BooleanS // AggregateBoolean copies the BooleanPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *BooleanSliceFuncStringReducer) AggregateBoolean(p *BooleanPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. @@ -1548,7 +1548,7 @@ func NewBooleanSliceFuncReducer(fn BooleanReduceSliceFunc) *BooleanSliceFuncRedu // AggregateBoolean copies the BooleanPoint into the internal slice to be passed // to the reduce function when Emit is called. func (r *BooleanSliceFuncReducer) AggregateBoolean(p *BooleanPoint) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice. diff --git a/influxql/functions.gen.go.tmpl b/influxql/functions.gen.go.tmpl index 86a50862a6..b23820e396 100644 --- a/influxql/functions.gen.go.tmpl +++ b/influxql/functions.gen.go.tmpl @@ -94,7 +94,7 @@ func New{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(f // Aggregate{{$k.Name}} copies the {{$k.Name}}Point into the internal slice to be passed // to the reduce function when Emit is called. func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) { - r.points = append(r.points, *p) + r.points = append(r.points, *p.Clone()) } // Aggregate{{$k.Name}}Bulk performs a bulk copy of {{$k.Name}}Points into the internal slice. diff --git a/influxql/iterator.go b/influxql/iterator.go index 48a820be16..acb45fac36 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -755,7 +755,11 @@ func newIteratorOptionsSubstatement(stmt *SelectStatement, opt IteratorOptions) if subOpt.EndTime > opt.EndTime { subOpt.EndTime = opt.EndTime } + // Propagate the dimensions to the inner subquery. subOpt.Dimensions = opt.Dimensions + for d := range opt.GroupBy { + subOpt.GroupBy[d] = struct{}{} + } subOpt.InterruptCh = opt.InterruptCh // Propagate the SLIMIT and SOFFSET from the outer query. @@ -778,6 +782,12 @@ func newIteratorOptionsSubstatement(stmt *SelectStatement, opt IteratorOptions) return IteratorOptions{}, err } subOpt.Ordered = opt.Ordered && (interval == 0 && stmt.HasSelector()) + + // If there is no interval for this subquery, but the outer query has an + // interval, inherit the parent interval. + if interval == 0 { + subOpt.Interval = opt.Interval + } return subOpt, nil } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index d6f82d4141..194dbba2d4 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1248,6 +1248,61 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + { + s: `SELECT sum(derivative) FROM (SELECT derivative(mean(value)) FROM cpu GROUP BY host) WHERE time >= now() - 1d GROUP BY time(1h)`, + stmt: &influxql.SelectStatement{ + Fields: []*influxql.Field{{ + Expr: &influxql.Call{ + Name: "sum", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "derivative"}, + }}, + }}, + Dimensions: []*influxql.Dimension{{ + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{Val: time.Hour}, + }, + }, + }}, + Sources: []influxql.Source{ + &influxql.SubQuery{ + Statement: &influxql.SelectStatement{ + Fields: []*influxql.Field{{ + Expr: &influxql.Call{ + Name: "derivative", + Args: []influxql.Expr{ + &influxql.Call{ + Name: "mean", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "value"}, + }, + }, + }, + }, + }}, + Dimensions: []*influxql.Dimension{{ + Expr: &influxql.VarRef{Val: "host"}, + }}, + Sources: []influxql.Source{ + &influxql.Measurement{Name: "cpu"}, + }, + }, + }, + }, + Condition: &influxql.BinaryExpr{ + Op: influxql.GTE, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.BinaryExpr{ + Op: influxql.SUB, + LHS: &influxql.Call{Name: "now"}, + RHS: &influxql.DurationLiteral{Val: 24 * time.Hour}, + }, + }, + }, + }, + // See issues https://github.com/influxdata/influxdb/issues/1647 // and https://github.com/influxdata/influxdb/issues/4404 // DELETE statement @@ -2434,6 +2489,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT derivative(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`}, {s: `SELECT derivative(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`}, {s: `SELECT derivative(mean(value), 1h) FROM myseries where time < now() and time > now() - 1d`, err: `derivative aggregate requires a GROUP BY interval`}, + {s: `SELECT min(derivative) FROM (SELECT derivative(mean(value), 1h) FROM myseries) where time < now() and time > now() - 1d`, err: `derivative aggregate requires a GROUP BY interval`}, {s: `SELECT non_negative_derivative(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `select non_negative_derivative() from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 0`}, {s: `select non_negative_derivative(mean(value), 1h, 3) from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 3`},