Properly select a tag within a subquery
Previously, subqueries would only be able to select tag fields within a subquery if the subquery used a selector. But, it didn't filter out aggregates like `mean()` so it would panic instead. Now it is possible to select the tag directly instead of rewriting the query in an invalid way. Some queries in this form will still not work though. For example, the following still does not function (but also doesn't panic): SELECT count(host) FROM (SELECT mean, host FROM (SELECT mean(value) FROM cpu GROUP BY host))pull/8051/head
parent
05fcf04236
commit
72e4dd01b9
|
@ -13,6 +13,7 @@
|
|||
- [#7966](https://github.com/influxdata/influxdb/pull/7966): Prevent a panic when aggregates are used in an inner query with a raw query.
|
||||
- [#8001](https://github.com/influxdata/influxdb/issues/8001): Map types correctly when using a regex and one of the measurements is empty.
|
||||
- [#8040](https://github.com/influxdata/influxdb/issues/8040): Reduce the expression in a subquery to avoid a panic.
|
||||
- [#7968](https://github.com/influxdata/influxdb/issues/7968): Properly select a tag within a subquery.
|
||||
|
||||
## v1.2.0 [2017-01-24]
|
||||
|
||||
|
|
|
@ -4684,11 +4684,15 @@ func TestServer_Query_Subqueries(t *testing.T) {
|
|||
command: `SELECT max(min), host FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","max","host"],"values":[["2000-01-01T00:00:20Z",23,"server01"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT mean, host FROM (SELECT mean(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean","host"],"values":[["2000-01-01T00:00:00Z",46,"server01"],["2000-01-01T00:00:00Z",17,"server02"]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT host FROM (SELECT mean(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
|
||||
exp: `{"results":[{"statement_id":0}]}`,
|
||||
skip: true,
|
||||
},
|
||||
&Query{
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
|
@ -4745,14 +4749,6 @@ func TestServer_Query_Subqueries(t *testing.T) {
|
|||
command: `SELECT value FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND value > 0`,
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT mean, host FROM (SELECT mean(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
|
||||
// TODO(jsternberg): This should return the hosts for each mean()
|
||||
// value. The query engine is currently limited in a way that
|
||||
// doesn't allow that to work though so we have to do this.
|
||||
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean","host"],"values":[["2000-01-01T00:00:00Z",46,null],["2000-01-01T00:00:00Z",17,null]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
for i, query := range test.queries {
|
||||
|
|
|
@ -3,11 +3,23 @@ package influxql
|
|||
type iteratorMapper struct {
|
||||
e *Emitter
|
||||
buf []interface{}
|
||||
fields []int // which iterator to use for an aux field
|
||||
fields []IteratorMap // which iterator to use for an aux field
|
||||
auxFields []interface{}
|
||||
}
|
||||
|
||||
func NewIteratorMapper(itrs []Iterator, fields []int, opt IteratorOptions) Iterator {
|
||||
type IteratorMap interface {
|
||||
Value(tags Tags, buf []interface{}) interface{}
|
||||
}
|
||||
|
||||
type FieldMap int
|
||||
|
||||
func (i FieldMap) Value(tags Tags, buf []interface{}) interface{} { return buf[i] }
|
||||
|
||||
type TagMap string
|
||||
|
||||
func (s TagMap) Value(tags Tags, buf []interface{}) interface{} { return tags.Value(string(s)) }
|
||||
|
||||
func NewIteratorMapper(itrs []Iterator, fields []IteratorMap, opt IteratorOptions) Iterator {
|
||||
e := NewEmitter(itrs, opt.Ascending, 0)
|
||||
e.OmitTime = true
|
||||
return &iteratorMapper{
|
||||
|
@ -26,7 +38,7 @@ func (itr *iteratorMapper) Next() (*FloatPoint, error) {
|
|||
|
||||
itr.e.readInto(t, name, tags, itr.buf)
|
||||
for i, f := range itr.fields {
|
||||
itr.auxFields[i] = itr.buf[f]
|
||||
itr.auxFields[i] = f.Value(tags, itr.buf)
|
||||
}
|
||||
return &FloatPoint{
|
||||
Name: name,
|
||||
|
|
|
@ -31,14 +31,18 @@ func TestIteratorMapper(t *testing.T) {
|
|||
{Val: "val2", Type: influxql.String},
|
||||
},
|
||||
}
|
||||
itr := influxql.NewIteratorMapper(inputs, []int{0, 1}, opt)
|
||||
itr := influxql.NewIteratorMapper(inputs, []influxql.IteratorMap{
|
||||
influxql.FieldMap(0),
|
||||
influxql.FieldMap(1),
|
||||
influxql.TagMap("host"),
|
||||
}, opt)
|
||||
if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if !deep.Equal(a, [][]influxql.Point{
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Aux: []interface{}{float64(1), "a"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5, Aux: []interface{}{float64(3), "c"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 2, Aux: []interface{}{float64(2), "b"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 8, Aux: []interface{}{float64(8), "h"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Aux: []interface{}{float64(1), "a", "A"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 5, Aux: []interface{}{float64(3), "c", "A"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 2, Aux: []interface{}{float64(2), "b", "B"}}},
|
||||
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 8, Aux: []interface{}{float64(8), "h", "B"}}},
|
||||
}) {
|
||||
t.Errorf("unexpected points: %s", spew.Sdump(a))
|
||||
}
|
||||
|
|
|
@ -114,8 +114,9 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, sources Sources, opt I
|
|||
inputs = append(inputs, input)
|
||||
case *SubQuery:
|
||||
fields := make([]*Field, 0, len(opt.Aux))
|
||||
indexes := make([]int, len(opt.Aux))
|
||||
indexes := make([]IteratorMap, len(opt.Aux))
|
||||
offset := 0
|
||||
AUX:
|
||||
for i, name := range opt.Aux {
|
||||
// Search through the fields to find one that matches this auxiliary field.
|
||||
var match *Field
|
||||
|
@ -144,13 +145,14 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, sources Sources, opt I
|
|||
if match == nil {
|
||||
for _, d := range source.Statement.Dimensions {
|
||||
if d, ok := d.Expr.(*VarRef); ok && name.Val == d.Val {
|
||||
match = &Field{
|
||||
fields = append(fields, &Field{
|
||||
Expr: &VarRef{
|
||||
Val: d.Val,
|
||||
Type: Tag,
|
||||
},
|
||||
}
|
||||
break
|
||||
})
|
||||
indexes[i] = TagMap(d.Val)
|
||||
continue AUX
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -161,7 +163,7 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, sources Sources, opt I
|
|||
match = &Field{Expr: (*nilLiteral)(nil)}
|
||||
}
|
||||
fields = append(fields, match)
|
||||
indexes[i] = i + offset
|
||||
indexes[i] = FieldMap(len(fields) + offset - 1)
|
||||
}
|
||||
|
||||
// Check if we need any selectors within the selected fields.
|
||||
|
@ -192,6 +194,12 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, sources Sources, opt I
|
|||
}
|
||||
}
|
||||
|
||||
// If there are no fields, then we have nothing driving the iterator.
|
||||
// Skip this subquery since it only references tags.
|
||||
if len(fields) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Clone the statement and replace the fields with our custom ordering.
|
||||
stmt := source.Statement.Clone()
|
||||
stmt.Fields = fields
|
||||
|
|
Loading…
Reference in New Issue