fix: Bubble up errors for cursors
parent
6a1cc9fcf7
commit
c8ee8127af
|
@ -1,6 +1,8 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/pkg/data/gen"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads"
|
||||
|
@ -61,7 +63,7 @@ func (g *GeneratorResultSet) Next() bool {
|
|||
return g.sg.Next() && (g.max == 0 || remain > 0)
|
||||
}
|
||||
|
||||
func (g *GeneratorResultSet) Cursor() cursors.Cursor {
|
||||
func (g *GeneratorResultSet) Cursor() (cursors.Cursor, error) {
|
||||
switch g.sg.FieldType() {
|
||||
case models.Float:
|
||||
g.f.tv = g.sg.TimeValuesGenerator()
|
||||
|
@ -79,10 +81,10 @@ func (g *GeneratorResultSet) Cursor() cursors.Cursor {
|
|||
g.b.tv = g.sg.TimeValuesGenerator()
|
||||
g.cur = &g.b
|
||||
default:
|
||||
panic("unreachable")
|
||||
return nil, fmt.Errorf("unsupported field type: %v", g.sg.FieldType())
|
||||
}
|
||||
|
||||
return g.cur
|
||||
return g.cur, nil
|
||||
}
|
||||
|
||||
func copyTags(dst, src models.Tags) models.Tags {
|
||||
|
|
|
@ -194,7 +194,10 @@ func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.Result
|
|||
|
||||
READ:
|
||||
for rs.Next() {
|
||||
cur = rs.Cursor()
|
||||
cur, err := rs.Cursor()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cur == nil {
|
||||
// no data for series key + field combination
|
||||
continue
|
||||
|
@ -331,7 +334,7 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe
|
|||
READ:
|
||||
for gc != nil {
|
||||
for gc.Next() {
|
||||
cur = gc.Cursor()
|
||||
cur, _ = gc.Cursor()
|
||||
if cur != nil {
|
||||
break
|
||||
}
|
||||
|
@ -740,7 +743,10 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor
|
|||
|
||||
READ:
|
||||
for rs.Next() {
|
||||
cur = rs.Cursor()
|
||||
cur, err = rs.Cursor()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cur == nil {
|
||||
// no data for series key + field combination
|
||||
continue
|
||||
|
|
|
@ -970,7 +970,7 @@ func (t *floatGroupTable) advanceCursor() bool {
|
|||
t.cur.Close()
|
||||
t.cur = nil
|
||||
for t.gc.Next() {
|
||||
cur := t.gc.Cursor()
|
||||
cur, _ := t.gc.Cursor()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -1954,7 +1954,7 @@ func (t *integerGroupTable) advanceCursor() bool {
|
|||
t.cur.Close()
|
||||
t.cur = nil
|
||||
for t.gc.Next() {
|
||||
cur := t.gc.Cursor()
|
||||
cur, _ := t.gc.Cursor()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -2935,7 +2935,7 @@ func (t *unsignedGroupTable) advanceCursor() bool {
|
|||
t.cur.Close()
|
||||
t.cur = nil
|
||||
for t.gc.Next() {
|
||||
cur := t.gc.Cursor()
|
||||
cur, _ := t.gc.Cursor()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -3860,7 +3860,7 @@ func (t *stringGroupTable) advanceCursor() bool {
|
|||
t.cur.Close()
|
||||
t.cur = nil
|
||||
for t.gc.Next() {
|
||||
cur := t.gc.Cursor()
|
||||
cur, _ := t.gc.Cursor()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -4785,7 +4785,7 @@ func (t *booleanGroupTable) advanceCursor() bool {
|
|||
t.cur.Close()
|
||||
t.cur = nil
|
||||
for t.gc.Next() {
|
||||
cur := t.gc.Cursor()
|
||||
cur, _ := t.gc.Cursor()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -1000,7 +1000,7 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
|
|||
t.cur.Close()
|
||||
t.cur = nil
|
||||
for t.gc.Next() {
|
||||
cur := t.gc.Cursor()
|
||||
cur, _ := t.gc.Cursor()
|
||||
if cur == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -97,7 +97,10 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
|
|||
agg := r.req.Aggregate[0]
|
||||
every := r.req.WindowEvery
|
||||
offset := r.req.Offset
|
||||
cursor := r.arrayCursors.createCursor(seriesRow)
|
||||
cursor, err := r.arrayCursors.createCursor(seriesRow)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var everyDur values.Duration
|
||||
var offsetDur values.Duration
|
||||
|
@ -132,8 +135,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
|
|||
}
|
||||
}
|
||||
|
||||
func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
|
||||
return r.cursor
|
||||
func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) {
|
||||
return r.cursor, nil
|
||||
}
|
||||
|
||||
func (r *windowAggregateResultSet) Close() {
|
||||
|
|
|
@ -2,6 +2,7 @@ package reads_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/require"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -196,7 +197,8 @@ func TestNewWindowAggregateResultSet_Mean(t *testing.T) {
|
|||
if !resultSet.Next() {
|
||||
t.Fatalf("unexpected: resultSet could not advance")
|
||||
}
|
||||
cursor := resultSet.Cursor()
|
||||
cursor, err := resultSet.Cursor()
|
||||
require.NoError(t, err, "create cursor failed")
|
||||
if cursor == nil {
|
||||
t.Fatalf("unexpected: cursor was nil")
|
||||
}
|
||||
|
@ -238,7 +240,8 @@ func TestNewWindowAggregateResultSet_Months(t *testing.T) {
|
|||
if !resultSet.Next() {
|
||||
t.Fatalf("unexpected: resultSet could not advance")
|
||||
}
|
||||
cursor := resultSet.Cursor()
|
||||
cursor, err := resultSet.Cursor()
|
||||
require.NoError(t, err, "create cursor failed")
|
||||
if cursor == nil {
|
||||
t.Fatalf("unexpected: cursor was nil")
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) {
|
|||
default:
|
||||
return nil, &errors2.Error{
|
||||
Code: errors2.EInvalid,
|
||||
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
|
||||
Msg: fmt.Sprintf("unsupported limit array cursor type: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (curs
|
|||
default:
|
||||
return nil, &errors2.Error{
|
||||
Code: errors2.EInvalid,
|
||||
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
|
||||
Msg: fmt.Sprintf("unsupported window first cursor type: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -103,7 +103,7 @@ func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (curso
|
|||
default:
|
||||
return nil, &errors2.Error{
|
||||
Code: errors2.EInvalid,
|
||||
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
|
||||
Msg: fmt.Sprintf("unsupported window last cursor type: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (curs
|
|||
default:
|
||||
return nil, &errors2.Error{
|
||||
Code: errors2.EInvalid,
|
||||
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
|
||||
Msg: fmt.Sprintf("unsupported window count cursor type: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package reads
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
|
||||
"github.com/influxdata/flux/interval"
|
||||
|
@ -104,7 +105,7 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool)
|
|||
return m
|
||||
}
|
||||
|
||||
func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
|
||||
func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, error) {
|
||||
m.req.Name = row.Name
|
||||
m.req.Tags = row.SeriesTags
|
||||
m.req.Field = row.Field
|
||||
|
@ -123,26 +124,29 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
|
|||
}
|
||||
|
||||
if cur == nil || err != nil {
|
||||
return nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch c := cur.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
m.cursors.i.reset(c, row.Query, cond)
|
||||
return &m.cursors.i
|
||||
return &m.cursors.i, nil
|
||||
case cursors.FloatArrayCursor:
|
||||
m.cursors.f.reset(c, row.Query, cond)
|
||||
return &m.cursors.f
|
||||
return &m.cursors.f, nil
|
||||
case cursors.UnsignedArrayCursor:
|
||||
m.cursors.u.reset(c, row.Query, cond)
|
||||
return &m.cursors.u
|
||||
return &m.cursors.u, nil
|
||||
case cursors.StringArrayCursor:
|
||||
m.cursors.s.reset(c, row.Query, cond)
|
||||
return &m.cursors.s
|
||||
return &m.cursors.s, nil
|
||||
case cursors.BooleanArrayCursor:
|
||||
m.cursors.b.reset(c, row.Query, cond)
|
||||
return &m.cursors.b
|
||||
return &m.cursors.b, nil
|
||||
default:
|
||||
return nil
|
||||
return nil, &errors2.Error{
|
||||
Code: errors2.EInvalid,
|
||||
Msg: fmt.Sprintf("unsupported cursor type: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2227,7 +2227,9 @@ func TestMultiShardArrayCursor(t *testing.T) {
|
|||
row := SeriesRow{Query: iter}
|
||||
ctx := context.Background()
|
||||
msac := newMultiShardArrayCursors(ctx, models.MinNanoTime, models.MaxNanoTime, true)
|
||||
cur, ok := msac.createCursor(row).(cursors.IntegerArrayCursor)
|
||||
cursor, err := msac.createCursor(row)
|
||||
require.NoError(t, err, "create cursor failed")
|
||||
cur, ok := cursor.(cursors.IntegerArrayCursor)
|
||||
require.Truef(t, ok, "Expected IntegerArrayCursor")
|
||||
|
||||
ia := cur.Next()
|
||||
|
|
|
@ -121,7 +121,10 @@ func (g *groupResultSet) Next() GroupCursor {
|
|||
// the time range of the query.
|
||||
func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool {
|
||||
// TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys.
|
||||
cur := g.arrayCursors.createCursor(*row)
|
||||
cur, err := g.arrayCursors.createCursor(*row)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
var ts []int64
|
||||
switch c := cur.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
|
@ -302,15 +305,18 @@ func (c *groupNoneCursor) Next() bool {
|
|||
}
|
||||
|
||||
func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
|
||||
cur = c.arrayCursors.createCursor(c.row)
|
||||
cur, err = c.arrayCursors.createCursor(c.row)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.agg != nil {
|
||||
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur, err
|
||||
}
|
||||
|
||||
func (c *groupNoneCursor) Cursor() cursors.Cursor {
|
||||
return c.cursor
|
||||
func (c *groupNoneCursor) Cursor() (cursors.Cursor, error) {
|
||||
return c.cursor, nil
|
||||
}
|
||||
|
||||
type groupByCursor struct {
|
||||
|
@ -350,15 +356,18 @@ func (c *groupByCursor) Next() bool {
|
|||
}
|
||||
|
||||
func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
|
||||
cur = c.arrayCursors.createCursor(seriesRow)
|
||||
cur, err = c.arrayCursors.createCursor(seriesRow)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.agg != nil {
|
||||
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur, err
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Cursor() cursors.Cursor {
|
||||
return c.cursor
|
||||
func (c *groupByCursor) Cursor() (cursors.Cursor, error) {
|
||||
return c.cursor, nil
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Stats() cursors.CursorStats {
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
)
|
||||
|
||||
type multiShardCursors interface {
|
||||
createCursor(row SeriesRow) cursors.Cursor
|
||||
createCursor(row SeriesRow) (cursors.Cursor, error)
|
||||
}
|
||||
|
||||
type resultSet struct {
|
||||
|
@ -57,7 +57,7 @@ func (r *resultSet) Next() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (r *resultSet) Cursor() cursors.Cursor {
|
||||
func (r *resultSet) Cursor() (cursors.Cursor, error) {
|
||||
return r.arrayCursors.createCursor(r.seriesRow)
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,11 @@ func ResultSetToLineProtocol(wr io.Writer, rs ResultSet) (err error) {
|
|||
line = append(line, ' ')
|
||||
line = append(line, field...)
|
||||
line = append(line, '=')
|
||||
err = cursorToLineProtocol(wr, line, rs.Cursor())
|
||||
cursor, err := rs.Cursor()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = cursorToLineProtocol(wr, line, cursor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ type ResultSet interface {
|
|||
Next() bool
|
||||
|
||||
// Cursor returns the most recent cursor after a call to Next.
|
||||
Cursor() cursors.Cursor
|
||||
Cursor() (cursors.Cursor, error)
|
||||
|
||||
// Tags returns the tags for the most recent cursor after a call to Next.
|
||||
Tags() models.Tags
|
||||
|
@ -47,7 +47,7 @@ type GroupCursor interface {
|
|||
Next() bool
|
||||
|
||||
// Cursor returns the most recent cursor after a call to Next.
|
||||
Cursor() cursors.Cursor
|
||||
Cursor() (cursors.Cursor, error)
|
||||
|
||||
// Tags returns the tags for the most recent cursor after a call to Next.
|
||||
Tags() models.Tags
|
||||
|
|
|
@ -119,7 +119,7 @@ func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) {
|
|||
for rs.Next() {
|
||||
fmt.Fprint(wr, "series: ")
|
||||
tagsToString(wr, rs.Tags())
|
||||
cur := rs.Cursor()
|
||||
cur, _ := rs.Cursor()
|
||||
|
||||
if po.SkipNilCursor && cur == nil {
|
||||
continue
|
||||
|
|
|
@ -257,7 +257,11 @@ func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaquer
|
|||
rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur)
|
||||
for rs.Next() {
|
||||
func() {
|
||||
c := rs.Cursor()
|
||||
c, err := rs.Cursor()
|
||||
if err != nil {
|
||||
s.Logger.Error("failed to get next cursor during iteration", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if c == nil {
|
||||
// no data for series key + field combination
|
||||
return
|
||||
|
@ -623,7 +627,11 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes,
|
|||
rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur)
|
||||
for rs.Next() {
|
||||
func() {
|
||||
c := rs.Cursor()
|
||||
c, err := rs.Cursor()
|
||||
if err != nil {
|
||||
s.Logger.Error("failed to get next cursor during iteration", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if c == nil {
|
||||
// no data for series key + field combination?
|
||||
// It seems that even when there is no data for this series key + field
|
||||
|
@ -764,7 +772,11 @@ func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shard
|
|||
rs := reads.NewFilteredResultSet(ctx, start, end, cur)
|
||||
for rs.Next() {
|
||||
func() {
|
||||
c := rs.Cursor()
|
||||
c, err := rs.Cursor()
|
||||
if err != nil {
|
||||
s.Logger.Error("failed to get next cursor during iteration", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if c == nil {
|
||||
// no data for series key + field combination
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue