diff --git a/storage/flux/reader.go b/storage/flux/reader.go index 204b073a82..0ccfb677c6 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -17,6 +17,32 @@ import ( "github.com/influxdata/influxdb/v2/tsdb/cursors" ) +// GroupCursorError is returned when two different cursor types +// are read for the same table. +type GroupCursorError struct { + typ string + cursor cursors.Cursor +} + +func (err *GroupCursorError) Error() string { + var got string + switch err.cursor.(type) { + case cursors.FloatArrayCursor: + got = "float" + case cursors.IntegerArrayCursor: + got = "integer" + case cursors.UnsignedArrayCursor: + got = "unsigned" + case cursors.StringArrayCursor: + got = "string" + case cursors.BooleanArrayCursor: + got = "boolean" + default: + got = "invalid" + } + return fmt.Sprintf("schema collision: cannot group %s and %s types together", err.typ, got) +} + type storageTable interface { flux.Table Close() diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 6740532b8d..966c5fcd26 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -13,10 +13,10 @@ import ( "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/models" storage "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/tsdb/cursors" - "github.com/pkg/errors" ) // @@ -182,7 +182,13 @@ func (t *floatGroupTable) advanceCursor() bool { if typedCur, ok := cur.(cursors.FloatArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() - t.err = errors.Errorf("expected float cursor type, got %T", cur) + t.err = &influxdb.Error{ + Code: influxdb.EInvalid, + Err: &GroupCursorError{ + typ: "float", + cursor: cur, + }, + } return false } else { t.readTags(t.gc.Tags()) @@ -367,7 +373,13 @@ func (t *integerGroupTable) advanceCursor() bool { if typedCur, ok := cur.(cursors.IntegerArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() - t.err = errors.Errorf("expected integer cursor type, got %T", cur) + t.err = &influxdb.Error{ + Code: influxdb.EInvalid, + Err: &GroupCursorError{ + typ: "integer", + cursor: cur, + }, + } return false } else { t.readTags(t.gc.Tags()) @@ -552,7 +564,13 @@ func (t *unsignedGroupTable) advanceCursor() bool { if typedCur, ok := cur.(cursors.UnsignedArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() - t.err = errors.Errorf("expected unsigned cursor type, got %T", cur) + t.err = &influxdb.Error{ + Code: influxdb.EInvalid, + Err: &GroupCursorError{ + typ: "unsigned", + cursor: cur, + }, + } return false } else { t.readTags(t.gc.Tags()) @@ -737,7 +755,13 @@ func (t *stringGroupTable) advanceCursor() bool { if typedCur, ok := cur.(cursors.StringArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() - t.err = errors.Errorf("expected string cursor type, got %T", cur) + t.err = &influxdb.Error{ + Code: influxdb.EInvalid, + Err: &GroupCursorError{ + typ: "string", + cursor: cur, + }, + } return false } else { t.readTags(t.gc.Tags()) @@ -922,7 +946,13 @@ func (t *booleanGroupTable) advanceCursor() bool { if typedCur, ok := cur.(cursors.BooleanArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() - t.err = errors.Errorf("expected boolean cursor type, got %T", cur) + t.err = &influxdb.Error{ + Code: influxdb.EInvalid, + Err: &GroupCursorError{ + typ: "boolean", + cursor: cur, + }, + } return false } else { t.readTags(t.gc.Tags()) diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index d55592df49..44dbb1b89a 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -7,10 +7,10 @@ import ( "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/models" storage "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/tsdb/cursors" - "github.com/pkg/errors" ) {{range .}} // @@ -176,7 +176,13 @@ func (t *{{.name}}GroupTable) advanceCursor() bool { if typedCur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() - t.err = errors.Errorf("expected {{.name}} cursor type, got %T", cur) + t.err = &influxdb.Error { + Code: influxdb.EInvalid, + Err: &GroupCursorError { + typ: "{{.name}}", + cursor: cur, + }, + } return false } else { t.readTags(t.gc.Tags())