Add some unit tests to subqueries
This is not complete, but it is a starting point for more thorough tests of subqueries. This also reorders the use of `cmp.Diff` so the `want` is first and `got` is second. This way, the `want` shows up as a minus sign in the diff rather than, confusingly, as a plus sign.pull/9641/head
parent
5d9f6519ad
commit
d4db76508f
|
@ -181,8 +181,9 @@ func (b *exprIteratorBuilder) buildVarRefIterator(ctx context.Context, expr *inf
|
|||
input, err := subquery.buildVarRefIterator(ctx, expr, b.opt)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if input != nil {
|
||||
inputs = append(inputs, input)
|
||||
}
|
||||
inputs = append(inputs, input)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -667,8 +668,9 @@ func buildAuxIterator(ctx context.Context, ic IteratorCreator, sources influxql.
|
|||
input, err := b.buildAuxIterator(ctx, opt)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if input != nil {
|
||||
inputs = append(inputs, input)
|
||||
}
|
||||
inputs = append(inputs, input)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -2738,7 +2738,7 @@ func TestSelect(t *testing.T) {
|
|||
t.Fatal("expected error")
|
||||
} else if a, err := ReadCursor(cur); err != nil {
|
||||
t.Fatalf("unexpected point: %s", err)
|
||||
} else if diff := cmp.Diff(a, tt.rows); diff != "" {
|
||||
} else if diff := cmp.Diff(tt.rows, a); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
})
|
||||
|
@ -2790,7 +2790,7 @@ func TestSelect_Raw(t *testing.T) {
|
|||
t.Errorf("parse error: %s", err)
|
||||
} else if a, err := ReadCursor(cur); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(a, []query.Row{
|
||||
} else if diff := cmp.Diff([]query.Row{
|
||||
{
|
||||
Time: 0 * Second,
|
||||
Series: query.Series{
|
||||
|
@ -2812,7 +2812,7 @@ func TestSelect_Raw(t *testing.T) {
|
|||
},
|
||||
Values: []interface{}{float64(19), int64(19), uint64(19), "c", true},
|
||||
},
|
||||
}); diff != "" {
|
||||
}, a); diff != "" {
|
||||
t.Errorf("unexpected points:\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
@ -3701,7 +3701,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
|
|||
t.Fatalf("%s: expected error", test.Name)
|
||||
} else if a, err := ReadCursor(cur); err != nil {
|
||||
t.Fatalf("%s: unexpected error: %s", test.Name, err)
|
||||
} else if diff := cmp.Diff(a, test.Rows); diff != "" {
|
||||
} else if diff := cmp.Diff(test.Rows, a); diff != "" {
|
||||
t.Errorf("%s: unexpected points:\n%s", test.Name, diff)
|
||||
}
|
||||
})
|
||||
|
@ -3779,7 +3779,7 @@ func TestSelect_BinaryExpr_Boolean(t *testing.T) {
|
|||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a, err := ReadCursor(cur); err != nil {
|
||||
t.Fatalf("%s: unexpected error: %s", test.Name, err)
|
||||
} else if diff := cmp.Diff(a, test.Rows); diff != "" {
|
||||
} else if diff := cmp.Diff(test.Rows, a); diff != "" {
|
||||
t.Errorf("%s: unexpected points:\n%s", test.Name, diff)
|
||||
}
|
||||
})
|
||||
|
@ -3861,7 +3861,7 @@ func TestSelect_BinaryExpr_NilValues(t *testing.T) {
|
|||
t.Errorf("%s: parse error: %s", test.Name, err)
|
||||
} else if a, err := ReadCursor(cur); err != nil {
|
||||
t.Fatalf("%s: unexpected error: %s", test.Name, err)
|
||||
} else if diff := cmp.Diff(a, test.Rows); diff != "" {
|
||||
} else if diff := cmp.Diff(test.Rows, a); diff != "" {
|
||||
t.Errorf("%s: unexpected points:\n%s", test.Name, diff)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -23,7 +23,12 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti
|
|||
}
|
||||
|
||||
// Map the desired auxiliary fields from the substatement.
|
||||
indexes := b.mapAuxFields(auxFields)
|
||||
indexes, ok := b.mapAuxFields(auxFields)
|
||||
if !ok {
|
||||
// No actual fields were selected so do not waste time running the query.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
subOpt, err := newIteratorOptionsSubstatement(ctx, b.stmt, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -44,18 +49,19 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti
|
|||
return input, nil
|
||||
}
|
||||
|
||||
func (b *subqueryBuilder) mapAuxFields(auxFields []influxql.VarRef) []IteratorMap {
|
||||
indexes := make([]IteratorMap, len(auxFields))
|
||||
func (b *subqueryBuilder) mapAuxFields(auxFields []influxql.VarRef) (indexes []IteratorMap, ok bool) {
|
||||
indexes = make([]IteratorMap, len(auxFields))
|
||||
for i, name := range auxFields {
|
||||
m := b.mapAuxField(&name)
|
||||
if m == nil {
|
||||
// If this field doesn't map to anything, use the NullMap so it
|
||||
// shows up as null.
|
||||
m = NullMap{}
|
||||
indexes[i] = NullMap{}
|
||||
continue
|
||||
}
|
||||
indexes[i] = m
|
||||
indexes[i], ok = m, true
|
||||
}
|
||||
return indexes
|
||||
return indexes, ok
|
||||
}
|
||||
|
||||
func (b *subqueryBuilder) mapAuxField(name *influxql.VarRef) IteratorMap {
|
||||
|
@ -113,7 +119,7 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq
|
|||
}
|
||||
|
||||
// Map the auxiliary fields to their index in the subquery.
|
||||
indexes := b.mapAuxFields(auxFields)
|
||||
indexes, _ := b.mapAuxFields(auxFields)
|
||||
subOpt, err := newIteratorOptionsSubstatement(ctx, b.stmt, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
package query_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
type CreateIteratorFn func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator
|
||||
|
||||
func TestSubquery(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
Name string
|
||||
Statement string
|
||||
Fields map[string]influxql.DataType
|
||||
MapShardsFn func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn
|
||||
Rows []query.Row
|
||||
Skip string
|
||||
}{
|
||||
{
|
||||
Name: "AuxiliaryFields",
|
||||
Statement: `SELECT max / 2.0 FROM (SELECT max(value) FROM cpu GROUP BY time(5s)) WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z'`,
|
||||
Fields: map[string]influxql.DataType{"value": influxql.Float},
|
||||
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
|
||||
if got, want := tr.MinTimeNano(), 0*Second; got != want {
|
||||
t.Errorf("unexpected min time: got=%d want=%d", got, want)
|
||||
}
|
||||
if got, want := tr.MaxTimeNano(), 15*Second-1; got != want {
|
||||
t.Errorf("unexpected max time: got=%d want=%d", got, want)
|
||||
}
|
||||
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
|
||||
if got, want := m.Name, "cpu"; got != want {
|
||||
t.Errorf("unexpected source: got=%s want=%s", got, want)
|
||||
}
|
||||
if got, want := opt.Expr.String(), "max(value::float)"; got != want {
|
||||
t.Errorf("unexpected expression: got=%s want=%s", got, want)
|
||||
}
|
||||
return &FloatIterator{Points: []query.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 5},
|
||||
{Name: "cpu", Time: 5 * Second, Value: 3},
|
||||
{Name: "cpu", Time: 10 * Second, Value: 8},
|
||||
}}
|
||||
}
|
||||
},
|
||||
Rows: []query.Row{
|
||||
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{2.5}},
|
||||
{Time: 5 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{1.5}},
|
||||
{Time: 10 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(4)}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "AuxiliaryFields_WithWhereClause",
|
||||
Statement: `SELECT host FROM (SELECT max(value), host FROM cpu GROUP BY time(5s)) WHERE max > 4 AND time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z'`,
|
||||
Fields: map[string]influxql.DataType{
|
||||
"value": influxql.Float,
|
||||
"host": influxql.Tag,
|
||||
},
|
||||
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
|
||||
if got, want := tr.MinTimeNano(), 0*Second; got != want {
|
||||
t.Errorf("unexpected min time: got=%d want=%d", got, want)
|
||||
}
|
||||
if got, want := tr.MaxTimeNano(), 15*Second-1; got != want {
|
||||
t.Errorf("unexpected max time: got=%d want=%d", got, want)
|
||||
}
|
||||
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
|
||||
if got, want := m.Name, "cpu"; got != want {
|
||||
t.Errorf("unexpected source: got=%s want=%s", got, want)
|
||||
}
|
||||
if got, want := opt.Expr.String(), "max(value::float)"; got != want {
|
||||
t.Errorf("unexpected expression: got=%s want=%s", got, want)
|
||||
}
|
||||
if got, want := opt.Aux, []influxql.VarRef{{Val: "host", Type: influxql.Tag}}; !cmp.Equal(got, want) {
|
||||
t.Errorf("unexpected auxiliary fields:\n%s", cmp.Diff(want, got))
|
||||
}
|
||||
return &FloatIterator{Points: []query.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 5, Aux: []interface{}{"server02"}},
|
||||
{Name: "cpu", Time: 5 * Second, Value: 3, Aux: []interface{}{"server01"}},
|
||||
{Name: "cpu", Time: 10 * Second, Value: 8, Aux: []interface{}{"server03"}},
|
||||
}}
|
||||
}
|
||||
},
|
||||
Rows: []query.Row{
|
||||
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{"server02"}},
|
||||
{Time: 10 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{"server03"}},
|
||||
},
|
||||
Skip: `causes a panic`,
|
||||
},
|
||||
{
|
||||
Name: "AuxiliaryFields_NonExistentField",
|
||||
Statement: `SELECT host FROM (SELECT max(value) FROM cpu GROUP BY time(5s)) WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z'`,
|
||||
Fields: map[string]influxql.DataType{"value": influxql.Float},
|
||||
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
|
||||
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
|
||||
return &FloatIterator{Points: []query.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Value: 5},
|
||||
{Name: "cpu", Time: 5 * Second, Value: 3},
|
||||
{Name: "cpu", Time: 10 * Second, Value: 8},
|
||||
}}
|
||||
}
|
||||
},
|
||||
Rows: []query.Row(nil),
|
||||
},
|
||||
{
|
||||
Name: "AggregateOfMath",
|
||||
Statement: `SELECT mean(percentage) FROM (SELECT value * 100.0 AS percentage FROM cpu) WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:15Z' GROUP BY time(5s)`,
|
||||
Fields: map[string]influxql.DataType{"value": influxql.Float},
|
||||
MapShardsFn: func(t *testing.T, tr influxql.TimeRange) CreateIteratorFn {
|
||||
if got, want := tr.MinTimeNano(), 0*Second; got != want {
|
||||
t.Errorf("unexpected min time: got=%d want=%d", got, want)
|
||||
}
|
||||
if got, want := tr.MaxTimeNano(), 15*Second-1; got != want {
|
||||
t.Errorf("unexpected max time: got=%d want=%d", got, want)
|
||||
}
|
||||
return func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) query.Iterator {
|
||||
if got, want := m.Name, "cpu"; got != want {
|
||||
t.Errorf("unexpected source: got=%s want=%s", got, want)
|
||||
}
|
||||
if got, want := opt.Expr, influxql.Expr(nil); got != want {
|
||||
t.Errorf("unexpected expression: got=%s want=%s", got, want)
|
||||
}
|
||||
if got, want := opt.Aux, []influxql.VarRef{{Val: "value", Type: influxql.Float}}; !cmp.Equal(got, want) {
|
||||
t.Errorf("unexpected auxiliary fields:\n%s", cmp.Diff(want, got))
|
||||
}
|
||||
return &FloatIterator{Points: []query.FloatPoint{
|
||||
{Name: "cpu", Time: 0 * Second, Aux: []interface{}{0.5}},
|
||||
{Name: "cpu", Time: 2 * Second, Aux: []interface{}{1.0}},
|
||||
{Name: "cpu", Time: 5 * Second, Aux: []interface{}{0.05}},
|
||||
{Name: "cpu", Time: 8 * Second, Aux: []interface{}{0.45}},
|
||||
{Name: "cpu", Time: 12 * Second, Aux: []interface{}{0.34}},
|
||||
}}
|
||||
}
|
||||
},
|
||||
Rows: []query.Row{
|
||||
{Time: 0 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(75)}},
|
||||
{Time: 5 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(25)}},
|
||||
{Time: 10 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{float64(34)}},
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
if test.Skip != "" {
|
||||
t.Skip(test.Skip)
|
||||
}
|
||||
|
||||
shardMapper := ShardMapper{
|
||||
MapShardsFn: func(sources influxql.Sources, tr influxql.TimeRange) query.ShardGroup {
|
||||
fn := test.MapShardsFn(t, tr)
|
||||
return &ShardGroup{
|
||||
Fields: test.Fields,
|
||||
CreateIteratorFn: func(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
|
||||
return fn(ctx, m, opt), nil
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
stmt := MustParseSelectStatement(test.Statement)
|
||||
stmt.OmitTime = true
|
||||
cur, err := query.Select(context.Background(), stmt, &shardMapper, query.SelectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected parse error: %s", err)
|
||||
} else if a, err := ReadCursor(cur); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
} else if diff := cmp.Diff(test.Rows, a); diff != "" {
|
||||
t.Fatalf("unexpected points:\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue