feat(storage/reads): replace panic on unsupported type with an error (#19353)

This replaces the panic that happens when attempting to create a cursor
for an unsupported aggregate type for the mean aggregate. It instead
refactors the result set so the cursor is constructed as part of the
`Next()` call and false is returned when there is an error creating the
cursor. The cursor is then accessed with `Cursor()`. When `Next()`
returns false, an error can be accessed by using `Err()`.
pull/19401/head
Jonathan A. Sternberg 2020-08-20 10:13:49 -05:00 committed by GitHub
parent 550966dbe2
commit 519f60f7d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 281 additions and 101 deletions

View File

@ -14,9 +14,11 @@ import (
type windowAggregateResultSet struct {
ctx context.Context
req *datatypes.ReadWindowAggregateRequest
cursor SeriesCursor
seriesRow *SeriesRow
seriesCursor SeriesCursor
seriesRow SeriesRow
arrayCursors *arrayCursors
cursor cursors.Cursor
err error
}
func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, cursor SeriesCursor) (ResultSet, error) {
@ -46,25 +48,31 @@ func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowA
results := &windowAggregateResultSet{
ctx: ctx,
req: req,
cursor: cursor,
seriesCursor: cursor,
arrayCursors: newArrayCursors(ctx, req.Range.Start, req.Range.End, ascending),
}
return results, nil
}
func (r *windowAggregateResultSet) Next() bool {
if r == nil {
if r == nil || r.err != nil {
return false
}
r.seriesRow = r.cursor.Next()
return r.seriesRow != nil
seriesRow := r.seriesCursor.Next()
if seriesRow == nil {
return false
}
r.seriesRow = *seriesRow
r.cursor, r.err = r.createCursor(r.seriesRow)
return r.err == nil
}
func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cursor, error) {
agg := r.req.Aggregate[0]
every := r.req.WindowEvery
offset := r.req.Offset
cursor := r.arrayCursors.createCursor(*r.seriesRow)
cursor := r.arrayCursors.createCursor(seriesRow)
if every == math.MaxInt64 {
// This means to aggregate over whole series for the query's time range
@ -74,20 +82,28 @@ func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
}
}
func (r *windowAggregateResultSet) Close() {}
func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
return r.cursor
}
func (r *windowAggregateResultSet) Err() error { return nil }
func (r *windowAggregateResultSet) Close() {
if r == nil {
return
}
r.seriesRow.Query = nil
r.seriesCursor.Close()
}
func (r *windowAggregateResultSet) Err() error { return r.err }
func (r *windowAggregateResultSet) Stats() cursors.CursorStats {
if r.seriesRow == nil || r.seriesRow.Query == nil {
if r.seriesRow.Query == nil {
return cursors.CursorStats{}
}
// See the equivalent method in *resultSet.Stats.
return r.seriesRow.Query.Stats()
}
func (r *windowAggregateResultSet) Tags() models.Tags {
if r.seriesRow == nil {
return models.Tags{}
}
return r.seriesRow.Tags
}

View File

@ -71,13 +71,31 @@ func (i *mockIntegerArrayCursor) Next() *cursors.IntegerArray {
}
}
type mockCursorIterator struct{}
type mockStringArrayCursor struct{}
func (i *mockStringArrayCursor) Close() {}
func (i *mockStringArrayCursor) Err() error { return nil }
func (i *mockStringArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} }
func (i *mockStringArrayCursor) Next() *cursors.StringArray {
return &cursors.StringArray{
Timestamps: []int64{1000000000},
Values: []string{"a"},
}
}
type mockCursorIterator struct {
newCursorFn func() cursors.Cursor
statsFn func() cursors.CursorStats
}
func (i *mockCursorIterator) Next(ctx context.Context, req *cursors.CursorRequest) (cursors.Cursor, error) {
return &mockIntegerArrayCursor{}, nil
return i.newCursorFn(), nil
}
func (i *mockCursorIterator) Stats() cursors.CursorStats {
return cursors.CursorStats{ScannedBytes: 500, ScannedValues: 10}
if i.statsFn == nil {
return cursors.CursorStats{}
}
return i.statsFn()
}
type mockReadCursor struct {
@ -90,7 +108,14 @@ func newMockReadCursor(keys ...string) mockReadCursor {
for i := range keys {
rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Tags = rows[i].SeriesTags.Clone()
rows[i].Query = &mockCursorIterator{}
rows[i].Query = &mockCursorIterator{
newCursorFn: func() cursors.Cursor {
return &mockIntegerArrayCursor{}
},
statsFn: func() cursors.CursorStats {
return cursors.CursorStats{ScannedBytes: 500, ScannedValues: 10}
},
}
}
return mockReadCursor{rows: rows}
@ -179,3 +204,37 @@ func TestNewWindowAggregateResultSet_Count(t *testing.T) {
t.Errorf("unexpected count values: %v", integerArray.Values)
}
}
func TestNewWindowAggregateResultSet_UnsupportedTyped(t *testing.T) {
newCursor := newMockReadCursor(
"clicks click=1 1",
)
newCursor.rows[0].Query = &mockCursorIterator{
newCursorFn: func() cursors.Cursor {
return &mockStringArrayCursor{}
},
}
request := datatypes.ReadWindowAggregateRequest{
Aggregate: []*datatypes.Aggregate{
{Type: datatypes.AggregateTypeMean},
},
WindowEvery: 10,
}
resultSet, err := reads.NewWindowAggregateResultSet(context.Background(), &request, &newCursor)
if err != nil {
t.Fatalf("error creating WindowAggregateResultSet: %s", err)
}
if resultSet.Next() {
t.Fatal("unexpected: resultSet should not have advanced")
}
err = resultSet.Err()
if err == nil {
t.Fatal("expected error")
}
if want, got := "unsupported input type for mean aggregate: string", err.Error(); want != got {
t.Fatalf("unexpected error:\n\t- %q\n\t+ %q", want, got)
}
}

View File

@ -11,6 +11,7 @@ import (
"fmt"
"math"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -119,20 +120,23 @@ func newWindowCountArrayCursor(cur cursors.Cursor, every, offset int64) cursors.
}
}
func newWindowSumArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor {
func newWindowSumArrayCursor(cur cursors.Cursor, every, offset int64) (cursors.Cursor, error) {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return newFloatWindowSumArrayCursor(cur, every, offset)
return newFloatWindowSumArrayCursor(cur, every, offset), nil
case cursors.IntegerArrayCursor:
return newIntegerWindowSumArrayCursor(cur, every, offset)
return newIntegerWindowSumArrayCursor(cur, every, offset), nil
case cursors.UnsignedArrayCursor:
return newUnsignedWindowSumArrayCursor(cur, every, offset)
return newUnsignedWindowSumArrayCursor(cur, every, offset), nil
default:
panic(fmt.Sprintf("unsupported for aggregate sum: %T", cur))
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("unsupported input type for sum aggregate: %s", arrayCursorType(cur)),
}
}
}
@ -170,20 +174,23 @@ func newWindowMaxArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cu
}
}
func newWindowMeanArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor {
func newWindowMeanArrayCursor(cur cursors.Cursor, every, offset int64) (cursors.Cursor, error) {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return newFloatWindowMeanArrayCursor(cur, every, offset)
return newFloatWindowMeanArrayCursor(cur, every, offset), nil
case cursors.IntegerArrayCursor:
return newIntegerWindowMeanArrayCursor(cur, every, offset)
return newIntegerWindowMeanArrayCursor(cur, every, offset), nil
case cursors.UnsignedArrayCursor:
return newUnsignedWindowMeanArrayCursor(cur, every, offset)
return newUnsignedWindowMeanArrayCursor(cur, every, offset), nil
default:
panic(fmt.Sprintf("unsupported for aggregate mean: %T", cur))
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("unsupported input type for mean aggregate: %s", arrayCursorType(cur)),
}
}
}
@ -3690,3 +3697,26 @@ func (c *booleanEmptyArrayCursor) Err() error { return nil }
func (c *booleanEmptyArrayCursor) Close() {}
func (c *booleanEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} }
func (c *booleanEmptyArrayCursor) Next() *cursors.BooleanArray { return &c.res }
func arrayCursorType(cur cursors.Cursor) string {
switch cur.(type) {
case cursors.FloatArrayCursor:
return "float"
case cursors.IntegerArrayCursor:
return "integer"
case cursors.UnsignedArrayCursor:
return "unsigned"
case cursors.StringArrayCursor:
return "string"
case cursors.BooleanArrayCursor:
return "boolean"
default:
return "unknown"
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -65,19 +66,22 @@ func newWindowCountArrayCursor(cur cursors.Cursor, every, offset int64) cursors.
}
}
func newWindowSumArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor {
func newWindowSumArrayCursor(cur cursors.Cursor, every, offset int64) (cursors.Cursor, error) {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
{{range .Aggs}}
{{if eq .Name "Sum"}}
case cursors.{{$Type}}ArrayCursor:
return new{{$Type}}WindowSumArrayCursor(cur, every, offset)
return new{{$Type}}WindowSumArrayCursor(cur, every, offset), nil
{{end}}
{{end}}{{/* for each supported agg fn */}}
{{end}}{{/* for each field type */}}
default:
panic(fmt.Sprintf("unsupported for aggregate sum: %T", cur))
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("unsupported input type for sum aggregate: %s", arrayCursorType(cur)),
}
}
}
@ -113,19 +117,22 @@ func newWindowMaxArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cu
}
}
func newWindowMeanArrayCursor(cur cursors.Cursor, every, offset int64) cursors.Cursor {
func newWindowMeanArrayCursor(cur cursors.Cursor, every, offset int64) (cursors.Cursor, error) {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
{{range .Aggs}}
{{if eq .Name "Mean"}}
case cursors.{{$Type}}ArrayCursor:
return new{{$Type}}WindowMeanArrayCursor(cur, every, offset)
return new{{$Type}}WindowMeanArrayCursor(cur, every, offset), nil
{{end}}
{{end}}{{/* for each supported agg fn */}}
{{end}}{{/* for each field type */}}
default:
panic(fmt.Sprintf("unsupported for aggregate mean: %T", cur))
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("unsupported input type for mean aggregate: %s", arrayCursorType(cur)),
}
}
}
{{range .}}
@ -564,3 +571,14 @@ func (c *{{.name}}EmptyArrayCursor) Stats() cursors.CursorStats { return cursors
func (c *{{.name}}EmptyArrayCursor) Next() {{$arrayType}} { return &c.res }
{{end}}{{/* range . */}}
func arrayCursorType(cur cursors.Cursor) string {
switch cur.(type) {
{{range .}}
case cursors.{{.Name}}ArrayCursor:
return "{{.name}}"
{{end}}{{/* range . */}}
default:
return "unknown"
}
}

View File

@ -16,32 +16,32 @@ func (v *singleValue) Value(key string) (interface{}, bool) {
return v.v, true
}
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) {
switch agg.Type {
case datatypes.AggregateTypeFirst, datatypes.AggregateTypeLast:
return newLimitArrayCursor(cursor)
return newLimitArrayCursor(cursor), nil
}
return newWindowAggregateArrayCursor(ctx, agg, 0, 0, cursor)
}
func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, every, offset int64, cursor cursors.Cursor) cursors.Cursor {
func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, every, offset int64, cursor cursors.Cursor) (cursors.Cursor, error) {
if cursor == nil {
return nil
return nil, nil
}
switch agg.Type {
case datatypes.AggregateTypeCount:
return newWindowCountArrayCursor(cursor, every, offset)
return newWindowCountArrayCursor(cursor, every, offset), nil
case datatypes.AggregateTypeSum:
return newWindowSumArrayCursor(cursor, every, offset)
case datatypes.AggregateTypeFirst:
return newWindowFirstArrayCursor(cursor, every, offset)
return newWindowFirstArrayCursor(cursor, every, offset), nil
case datatypes.AggregateTypeLast:
return newWindowLastArrayCursor(cursor, every, offset)
return newWindowLastArrayCursor(cursor, every, offset), nil
case datatypes.AggregateTypeMin:
return newWindowMinArrayCursor(cursor, every, offset)
return newWindowMinArrayCursor(cursor, every, offset), nil
case datatypes.AggregateTypeMax:
return newWindowMaxArrayCursor(cursor, every, offset)
return newWindowMaxArrayCursor(cursor, every, offset), nil
case datatypes.AggregateTypeMean:
return newWindowMeanArrayCursor(cursor, every, offset)
default:

View File

@ -41,7 +41,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -59,7 +59,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeSum,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -77,7 +77,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeMin,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -95,7 +95,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeMax,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -113,7 +113,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeMean,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -136,7 +136,7 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -155,7 +155,7 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeSum,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -174,7 +174,7 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeMin,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -193,7 +193,7 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeMax,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -212,7 +212,7 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
Type: datatypes.AggregateTypeMean,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -246,7 +246,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -264,7 +264,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeSum,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -282,7 +282,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeMin,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -300,7 +300,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeMax,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -318,7 +318,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeMean,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -341,7 +341,7 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -360,7 +360,7 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeSum,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -379,7 +379,7 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeMin,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -398,7 +398,7 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeMax,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -417,7 +417,7 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
Type: datatypes.AggregateTypeMean,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -451,7 +451,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -469,7 +469,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeSum,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -487,7 +487,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeMin,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -505,7 +505,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeMax,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -523,7 +523,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeMean,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -546,7 +546,7 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -565,7 +565,7 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeSum,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -584,7 +584,7 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeMin,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -603,7 +603,7 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeMax,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -622,7 +622,7 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
Type: datatypes.AggregateTypeMean,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -656,7 +656,7 @@ func TestNewAggregateArrayCursor_String(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockStringArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockStringArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -679,7 +679,7 @@ func TestNewWindowAggregateArrayCursor_String(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockStringArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockStringArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -713,7 +713,7 @@ func TestNewAggregateArrayCursor_Boolean(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newAggregateArrayCursor(context.Background(), agg, &MockBooleanArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockBooleanArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -736,7 +736,7 @@ func TestNewWindowAggregateArrayCursor_Boolean(t *testing.T) {
Type: datatypes.AggregateTypeCount,
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockBooleanArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &MockBooleanArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)

View File

@ -40,7 +40,7 @@ func TestNewAggregateArrayCursor_{{$ColType}}(t *testing.T) {
Type: datatypes.AggregateType{{$Agg}},
}
got := newAggregateArrayCursor(context.Background(), agg, &Mock{{$ColType}}ArrayCursor{})
got, _ := newAggregateArrayCursor(context.Background(), agg, &Mock{{$ColType}}ArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
@ -64,7 +64,7 @@ func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
Type: datatypes.AggregateType{{$Agg}},
}
got := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &Mock{{$ColType}}ArrayCursor{})
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, int64(time.Hour), 0, &Mock{{$ColType}}ArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)

View File

@ -283,9 +283,11 @@ type groupNoneCursor struct {
cur SeriesCursor
row SeriesRow
keys [][]byte
cursor cursors.Cursor
err error
}
func (c *groupNoneCursor) Err() error { return nil }
func (c *groupNoneCursor) Err() error { return c.err }
func (c *groupNoneCursor) Tags() models.Tags { return c.row.Tags }
func (c *groupNoneCursor) Keys() [][]byte { return c.keys }
func (c *groupNoneCursor) PartitionKeyVals() [][]byte { return nil }
@ -298,21 +300,26 @@ func (c *groupNoneCursor) Aggregate() *datatypes.Aggregate {
func (c *groupNoneCursor) Next() bool {
row := c.cur.Next()
if row == nil {
if row == nil || c.err != nil {
return false
}
c.row = *row
return true
c.cursor, c.err = c.createCursor(c.row)
return c.err == nil
}
func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
cur = c.arrayCursors.createCursor(seriesRow)
if c.agg != nil {
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
}
return cur, err
}
func (c *groupNoneCursor) Cursor() cursors.Cursor {
cur := c.arrayCursors.createCursor(c.row)
if c.agg != nil {
cur = newAggregateArrayCursor(c.ctx, c.agg, cur)
}
return cur
return c.cursor
}
type groupByCursor struct {
@ -321,8 +328,10 @@ type groupByCursor struct {
agg *datatypes.Aggregate
i int
seriesRows []*SeriesRow
cursor cursors.Cursor
keys [][]byte
vals [][]byte
err error
}
func (c *groupByCursor) reset(seriesRows []*SeriesRow) {
@ -330,7 +339,7 @@ func (c *groupByCursor) reset(seriesRows []*SeriesRow) {
c.seriesRows = seriesRows
}
func (c *groupByCursor) Err() error { return nil }
func (c *groupByCursor) Err() error { return c.err }
func (c *groupByCursor) Keys() [][]byte { return c.keys }
func (c *groupByCursor) PartitionKeyVals() [][]byte { return c.vals }
func (c *groupByCursor) Tags() models.Tags { return c.seriesRows[c.i-1].Tags }
@ -343,17 +352,22 @@ func (c *groupByCursor) Aggregate() *datatypes.Aggregate {
func (c *groupByCursor) Next() bool {
if c.i < len(c.seriesRows) {
c.i++
return true
c.cursor, c.err = c.createCursor(*c.seriesRows[c.i-1])
return c.err == nil
}
return false
}
func (c *groupByCursor) Cursor() cursors.Cursor {
cur := c.arrayCursors.createCursor(*c.seriesRows[c.i-1])
func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
cur = c.arrayCursors.createCursor(seriesRow)
if c.agg != nil {
cur = newAggregateArrayCursor(c.ctx, c.agg, cur)
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
}
return cur
return cur, err
}
func (c *groupByCursor) Cursor() cursors.Cursor {
return c.cursor
}
func (c *groupByCursor) Stats() cursors.CursorStats {

View File

@ -704,3 +704,47 @@ func TestNewGroupResultSet_GroupBy_Last(t *testing.T) {
t.Errorf("unexpected last values: %v", integerArray.Values)
}
}
func TestNewGroupResultSet_GroupBy_UnsupportedType(t *testing.T) {
request := datatypes.ReadGroupRequest{
Group: datatypes.GroupBy,
GroupKeys: []string{"host", "location"},
Aggregate: &datatypes.Aggregate{
Type: datatypes.AggregateTypeSum,
},
Range: datatypes.TimestampRange{
Start: 0,
End: 15,
},
}
resultSet := reads.NewGroupResultSet(context.Background(), &request, func() (reads.SeriesCursor, error) {
seriesCursor := newMockReadGroupCursor(
"clicks,host=foo,location=dallas click=3 5",
)
seriesCursor.rows[0].Query = &mockCursorIterator{
newCursorFn: func() cursors.Cursor {
return &mockStringArrayCursor{}
},
}
return seriesCursor, nil
})
if resultSet == nil {
t.Fatalf("resultSet was nil")
}
groupByCursor := resultSet.Next()
if groupByCursor == nil {
t.Fatal("unexpected: groupByCursor was nil")
}
if groupByCursor.Next() {
t.Fatal("unexpected: groupByCursor.Next should not have advanced")
}
err := groupByCursor.Err()
if err == nil {
t.Fatal("expected error")
}
if want, got := "unsupported input type for sum aggregate: string", err.Error(); want != got {
t.Fatalf("unexpected error:\n\t- %q\n\t+ %q", want, got)
}
}

View File

@ -10,10 +10,10 @@ import (
type resultSet struct {
ctx context.Context
agg *datatypes.Aggregate
seriesCursor SeriesCursor
seriesRow SeriesRow
arrayCursors *arrayCursors
cursor cursors.Cursor
}
func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, seriesCursor SeriesCursor) ResultSet {
@ -45,18 +45,13 @@ func (r *resultSet) Next() bool {
if seriesRow == nil {
return false
}
r.seriesRow = *seriesRow
r.cursor = r.arrayCursors.createCursor(r.seriesRow)
return true
}
func (r *resultSet) Cursor() cursors.Cursor {
cur := r.arrayCursors.createCursor(r.seriesRow)
if r.agg != nil {
cur = newAggregateArrayCursor(r.ctx, r.agg, cur)
}
return cur
return r.cursor
}
func (r *resultSet) Tags() models.Tags {
@ -69,5 +64,9 @@ func (r *resultSet) Stats() cursors.CursorStats {
if r.seriesRow.Query == nil {
return cursors.CursorStats{}
}
// All seriesRows share the same underlying cursor iterator
// which contains the aggregated stats of the query.
// So this seems like it is returning the stats only from the
// last series, but this returns the stats from all series.
return r.seriesRow.Query.Stats()
}