Merge pull request #7854 from influxdata/js-7846-subquery-tag-propagation

Update subqueries so groupings are propagated to inner queries
pull/7861/head
Jonathan A. Sternberg 2017-01-23 14:47:18 -06:00 committed by GitHub
commit f199c50d25
7 changed files with 144 additions and 27 deletions

View File

@ -4705,6 +4705,11 @@ func TestServer_Query_Subqueries(t *testing.T) {
command: `SELECT min(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(value) FROM (SELECT top(usage_user, 2), usage_user - usage_system AS value FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' GROUP BY host`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-10]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`,
},
}...)
for i, query := range test.queries {

View File

@ -1922,7 +1922,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return fmt.Errorf("invalid group interval: %v", err)
}
if c, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 {
if c, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 && tr != targetSubquery {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name)
} else if !ok && groupByInterval > 0 {
return fmt.Errorf("aggregate function required inside the call to %s", expr.Name)
@ -1983,7 +1983,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return fmt.Errorf("invalid group interval: %v", err)
}
if _, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 {
if _, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 && tr != targetSubquery {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name)
} else if !ok {
return fmt.Errorf("must use aggregate function with %s", expr.Name)
@ -2043,6 +2043,11 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
// If we have an aggregate function with a group by time without a where clause, it's an invalid statement
if tr == targetNotRequired { // ignore create continuous query statements
if err := s.validateTimeExpression(); err != nil {
return err
}
}
if tr != targetSubquery {
if err := s.validateGroupByInterval(); err != nil {
return err
}
@ -2050,10 +2055,10 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return nil
}
// validateGroupByInterval ensures that any select statements that have a group
// validateTimeExpression ensures that any select statements that have a group
// by interval either have a time expression limiting the time range or have a
// parent query that does that.
func (s *SelectStatement) validateGroupByInterval() error {
func (s *SelectStatement) validateTimeExpression() error {
// If we have a time expression, we and all subqueries are fine.
if HasTimeExpr(s.Condition) {
return nil
@ -2072,6 +2077,44 @@ func (s *SelectStatement) validateGroupByInterval() error {
// statement, we don't need to do this because parent time ranges propagate
// to children. So we only execute this when there is no time condition in
// the parent.
for _, source := range s.Sources {
switch source := source.(type) {
case *SubQuery:
if err := source.Statement.validateTimeExpression(); err != nil {
return err
}
}
}
return nil
}
// validateGroupByInterval ensures that a select statement is grouped by an
// interval if it contains certain functions.
func (s *SelectStatement) validateGroupByInterval() error {
interval, err := s.GroupByInterval()
if err != nil {
return err
} else if interval > 0 {
// If we have an interval here, that means the interval will propagate
// into any subqueries and we can just stop looking.
return nil
}
// Check inside of the fields for any of the specific functions that need a GROUP BY interval.
for _, f := range s.Fields {
switch expr := f.Expr.(type) {
case *Call:
switch expr.Name {
case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed", "holt_winters", "holt_winters_with_fit":
// If the first argument is a call, we needed a group by interval and we don't have one.
if _, ok := expr.Args[0].(*Call); ok {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name)
}
}
}
}
// Validate the subqueries.
for _, source := range s.Sources {
switch source := source.(type) {
case *SubQuery:
@ -4503,11 +4546,14 @@ func FieldDimensions(sources Sources, m FieldMapper) (fields map[string]DataType
fields[k] = typ
}
}
for _, d := range src.Statement.Dimensions {
switch d := d.Expr.(type) {
case *VarRef:
dimensions[d.Val] = struct{}{}
}
_, d, err := FieldDimensions(src.Statement.Sources, m)
if err != nil {
return nil, nil, err
}
for k := range d {
dimensions[k] = struct{}{}
}
}
}

View File

@ -411,7 +411,7 @@ func TestSelectStatement_RewriteFields(t *testing.T) {
// Rewrite subquery
{
stmt: `SELECT * FROM (SELECT mean(value1) FROM cpu GROUP BY host) GROUP BY *`,
rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host`,
rewrite: `SELECT mean::float FROM (SELECT mean(value1::float) FROM cpu GROUP BY host) GROUP BY host, region`,
},
}

View File

@ -96,7 +96,7 @@ func NewFloatSliceFuncReducer(fn FloatReduceSliceFunc) *FloatSliceFuncReducer {
// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
@ -166,7 +166,7 @@ func NewFloatSliceFuncIntegerReducer(fn FloatReduceIntegerSliceFunc) *FloatSlice
// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncIntegerReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
@ -236,7 +236,7 @@ func NewFloatSliceFuncStringReducer(fn FloatReduceStringSliceFunc) *FloatSliceFu
// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncStringReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
@ -306,7 +306,7 @@ func NewFloatSliceFuncBooleanReducer(fn FloatReduceBooleanSliceFunc) *FloatSlice
// AggregateFloat copies the FloatPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *FloatSliceFuncBooleanReducer) AggregateFloat(p *FloatPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateFloatBulk performs a bulk copy of FloatPoints into the internal slice.
@ -510,7 +510,7 @@ func NewIntegerSliceFuncFloatReducer(fn IntegerReduceFloatSliceFunc) *IntegerSli
// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncFloatReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
@ -580,7 +580,7 @@ func NewIntegerSliceFuncReducer(fn IntegerReduceSliceFunc) *IntegerSliceFuncRedu
// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
@ -650,7 +650,7 @@ func NewIntegerSliceFuncStringReducer(fn IntegerReduceStringSliceFunc) *IntegerS
// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncStringReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
@ -720,7 +720,7 @@ func NewIntegerSliceFuncBooleanReducer(fn IntegerReduceBooleanSliceFunc) *Intege
// AggregateInteger copies the IntegerPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *IntegerSliceFuncBooleanReducer) AggregateInteger(p *IntegerPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateIntegerBulk performs a bulk copy of IntegerPoints into the internal slice.
@ -924,7 +924,7 @@ func NewStringSliceFuncFloatReducer(fn StringReduceFloatSliceFunc) *StringSliceF
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncFloatReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
@ -994,7 +994,7 @@ func NewStringSliceFuncIntegerReducer(fn StringReduceIntegerSliceFunc) *StringSl
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncIntegerReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
@ -1064,7 +1064,7 @@ func NewStringSliceFuncReducer(fn StringReduceSliceFunc) *StringSliceFuncReducer
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
@ -1134,7 +1134,7 @@ func NewStringSliceFuncBooleanReducer(fn StringReduceBooleanSliceFunc) *StringSl
// AggregateString copies the StringPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *StringSliceFuncBooleanReducer) AggregateString(p *StringPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateStringBulk performs a bulk copy of StringPoints into the internal slice.
@ -1338,7 +1338,7 @@ func NewBooleanSliceFuncFloatReducer(fn BooleanReduceFloatSliceFunc) *BooleanSli
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncFloatReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
@ -1408,7 +1408,7 @@ func NewBooleanSliceFuncIntegerReducer(fn BooleanReduceIntegerSliceFunc) *Boolea
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncIntegerReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
@ -1478,7 +1478,7 @@ func NewBooleanSliceFuncStringReducer(fn BooleanReduceStringSliceFunc) *BooleanS
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncStringReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.
@ -1548,7 +1548,7 @@ func NewBooleanSliceFuncReducer(fn BooleanReduceSliceFunc) *BooleanSliceFuncRedu
// AggregateBoolean copies the BooleanPoint into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *BooleanSliceFuncReducer) AggregateBoolean(p *BooleanPoint) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// AggregateBooleanBulk performs a bulk copy of BooleanPoints into the internal slice.

View File

@ -94,7 +94,7 @@ func New{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(f
// Aggregate{{$k.Name}} copies the {{$k.Name}}Point into the internal slice to be passed
// to the reduce function when Emit is called.
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
r.points = append(r.points, *p)
r.points = append(r.points, *p.Clone())
}
// Aggregate{{$k.Name}}Bulk performs a bulk copy of {{$k.Name}}Points into the internal slice.

View File

@ -755,7 +755,11 @@ func newIteratorOptionsSubstatement(stmt *SelectStatement, opt IteratorOptions)
if subOpt.EndTime > opt.EndTime {
subOpt.EndTime = opt.EndTime
}
// Propagate the dimensions to the inner subquery.
subOpt.Dimensions = opt.Dimensions
for d := range opt.GroupBy {
subOpt.GroupBy[d] = struct{}{}
}
subOpt.InterruptCh = opt.InterruptCh
// Propagate the SLIMIT and SOFFSET from the outer query.
@ -778,6 +782,12 @@ func newIteratorOptionsSubstatement(stmt *SelectStatement, opt IteratorOptions)
return IteratorOptions{}, err
}
subOpt.Ordered = opt.Ordered && (interval == 0 && stmt.HasSelector())
// If there is no interval for this subquery, but the outer query has an
// interval, inherit the parent interval.
if interval == 0 {
subOpt.Interval = opt.Interval
}
return subOpt, nil
}

View File

@ -1248,6 +1248,61 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
{
s: `SELECT sum(derivative) FROM (SELECT derivative(mean(value)) FROM cpu GROUP BY host) WHERE time >= now() - 1d GROUP BY time(1h)`,
stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{
Expr: &influxql.Call{
Name: "sum",
Args: []influxql.Expr{
&influxql.VarRef{Val: "derivative"},
}},
}},
Dimensions: []*influxql.Dimension{{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: time.Hour},
},
},
}},
Sources: []influxql.Source{
&influxql.SubQuery{
Statement: &influxql.SelectStatement{
Fields: []*influxql.Field{{
Expr: &influxql.Call{
Name: "derivative",
Args: []influxql.Expr{
&influxql.Call{
Name: "mean",
Args: []influxql.Expr{
&influxql.VarRef{Val: "value"},
},
},
},
},
}},
Dimensions: []*influxql.Dimension{{
Expr: &influxql.VarRef{Val: "host"},
}},
Sources: []influxql.Source{
&influxql.Measurement{Name: "cpu"},
},
},
},
},
Condition: &influxql.BinaryExpr{
Op: influxql.GTE,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.BinaryExpr{
Op: influxql.SUB,
LHS: &influxql.Call{Name: "now"},
RHS: &influxql.DurationLiteral{Val: 24 * time.Hour},
},
},
},
},
// See issues https://github.com/influxdata/influxdb/issues/1647
// and https://github.com/influxdata/influxdb/issues/4404
// DELETE statement
@ -2434,6 +2489,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT derivative(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
{s: `SELECT derivative(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT derivative(mean(value), 1h) FROM myseries where time < now() and time > now() - 1d`, err: `derivative aggregate requires a GROUP BY interval`},
{s: `SELECT min(derivative) FROM (SELECT derivative(mean(value), 1h) FROM myseries) where time < now() and time > now() - 1d`, err: `derivative aggregate requires a GROUP BY interval`},
{s: `SELECT non_negative_derivative(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `select non_negative_derivative() from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 0`},
{s: `select non_negative_derivative(mean(value), 1h, 3) from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 3`},