fix: return and respect cursor errors (#24791) (#24846)

ArrayCursors were ignoring errors, which led to panics when nil
cursors were operated on. This fix passes errors back up the stack
and uses them to enforce healthy cursor creation.

Closes https://github.com/influxdata/influxdb/issues/24789
---------
Co-authored-by: Stuart Carnie <stuart.carnie@gmail.com>

(cherry picked from commit fe6c64b21e)

closes https://github.com/influxdata/influxdb/issues/24836
pull/24887/head
davidby-influx 2024-03-26 14:54:32 -07:00 committed by GitHub
parent 2066c4be46
commit 49d0bef3ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 245 additions and 93 deletions

View File

@ -315,13 +315,15 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool {
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}
c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.FloatArrayCursor
next, ok = cur.(cursors.FloatArrayCursor)
if !ok {
@ -1196,13 +1198,15 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool {
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}
c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.IntegerArrayCursor
next, ok = cur.(cursors.IntegerArrayCursor)
if !ok {
@ -2077,13 +2081,15 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool {
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}
c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.UnsignedArrayCursor
next, ok = cur.(cursors.UnsignedArrayCursor)
if !ok {
@ -2958,13 +2964,15 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool {
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}
c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.StringArrayCursor
next, ok = cur.(cursors.StringArrayCursor)
if !ok {
@ -3384,13 +3392,15 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool {
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}
c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.BooleanArrayCursor
next, ok = cur.(cursors.BooleanArrayCursor)
if !ok {

View File

@ -263,13 +263,15 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool {
var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
}
cur, err = itr.Next(c.ctx, c.req)
}
c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.{{.Name}}ArrayCursor
next, ok = cur.(cursors.{{.Name}}ArrayCursor)
if !ok {

View File

@ -113,12 +113,13 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
var shard cursors.CursorIterator
var cur cursors.Cursor
var err error
for cur == nil && len(row.Query) > 0 {
shard, row.Query = row.Query[0], row.Query[1:]
cur, _ = shard.Next(m.ctx, &m.req)
cur, err = shard.Next(m.ctx, &m.req)
}
if cur == nil {
if cur == nil || err != nil {
return nil
}

View File

@ -39,7 +39,8 @@ func newFloatArrayAscendingCursor() *floatArrayAscendingCursor {
return c
}
func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
@ -47,10 +48,14 @@ func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, t
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}
func (c *floatArrayAscendingCursor) Err() error { return nil }
@ -182,7 +187,8 @@ func newFloatArrayDescendingCursor() *floatArrayDescendingCursor {
return c
}
func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
@ -194,11 +200,15 @@ func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values,
c.cache.pos--
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}
func (c *floatArrayDescendingCursor) Err() error { return nil }
@ -321,7 +331,8 @@ func newIntegerArrayAscendingCursor() *integerArrayAscendingCursor {
return c
}
func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
@ -329,10 +340,14 @@ func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values,
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}
func (c *integerArrayAscendingCursor) Err() error { return nil }
@ -464,7 +479,8 @@ func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor {
return c
}
func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
@ -476,11 +492,15 @@ func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values
c.cache.pos--
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}
func (c *integerArrayDescendingCursor) Err() error { return nil }
@ -603,7 +623,8 @@ func newUnsignedArrayAscendingCursor() *unsignedArrayAscendingCursor {
return c
}
func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
@ -611,10 +632,14 @@ func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}
func (c *unsignedArrayAscendingCursor) Err() error { return nil }
@ -746,7 +771,8 @@ func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor {
return c
}
func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
@ -758,11 +784,15 @@ func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Value
c.cache.pos--
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}
func (c *unsignedArrayDescendingCursor) Err() error { return nil }
@ -885,7 +915,8 @@ func newStringArrayAscendingCursor() *stringArrayAscendingCursor {
return c
}
func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
@ -893,10 +924,14 @@ func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values,
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}
func (c *stringArrayAscendingCursor) Err() error { return nil }
@ -1028,7 +1063,8 @@ func newStringArrayDescendingCursor() *stringArrayDescendingCursor {
return c
}
func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
@ -1040,11 +1076,15 @@ func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values,
c.cache.pos--
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}
func (c *stringArrayDescendingCursor) Err() error { return nil }
@ -1167,7 +1207,8 @@ func newBooleanArrayAscendingCursor() *booleanArrayAscendingCursor {
return c
}
func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
@ -1175,10 +1216,14 @@ func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values,
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}
func (c *booleanArrayAscendingCursor) Err() error { return nil }
@ -1310,7 +1355,8 @@ func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor {
return c
}
func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
@ -1322,11 +1368,15 @@ func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values
c.cache.pos--
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}
func (c *booleanArrayDescendingCursor) Err() error { return nil }

View File

@ -38,18 +38,23 @@ func new{{$Type}}() *{{$type}} {
return c
}
func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
c.end = end
func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}
func (c *{{$type}}) Err() error { return nil }
@ -184,7 +189,8 @@ func new{{$Type}}() *{{$type}} {
return c
}
func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
@ -196,11 +202,15 @@ func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *Key
c.cache.pos--
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}
func (c *{{$type}}) Err() error { return nil }

View File

@ -15,7 +15,8 @@ import (
)
// buildFloatArrayCursor creates an array cursor for a float field.
func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.FloatArrayCursor {
func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.FloatArrayCursor, error) {
var err error
key := q.seriesFieldKeyBytes(name, tags, field)
cacheValues := q.e.Cache.Values(key)
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
@ -23,19 +24,26 @@ func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []
if q.asc.Float == nil {
q.asc.Float = newFloatArrayAscendingCursor()
}
q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.asc.Float
err = q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.asc.Float, nil
} else {
if q.desc.Float == nil {
q.desc.Float = newFloatArrayDescendingCursor()
}
q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.desc.Float
err = q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.desc.Float, nil
}
}
// buildIntegerArrayCursor creates an array cursor for a integer field.
func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.IntegerArrayCursor {
func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.IntegerArrayCursor, error) {
var err error
key := q.seriesFieldKeyBytes(name, tags, field)
cacheValues := q.e.Cache.Values(key)
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
@ -43,19 +51,26 @@ func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name
if q.asc.Integer == nil {
q.asc.Integer = newIntegerArrayAscendingCursor()
}
q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.asc.Integer
err = q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.asc.Integer, nil
} else {
if q.desc.Integer == nil {
q.desc.Integer = newIntegerArrayDescendingCursor()
}
q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.desc.Integer
err = q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.desc.Integer, nil
}
}
// buildUnsignedArrayCursor creates an array cursor for a unsigned field.
func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.UnsignedArrayCursor {
func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.UnsignedArrayCursor, error) {
var err error
key := q.seriesFieldKeyBytes(name, tags, field)
cacheValues := q.e.Cache.Values(key)
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
@ -63,19 +78,26 @@ func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name
if q.asc.Unsigned == nil {
q.asc.Unsigned = newUnsignedArrayAscendingCursor()
}
q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.asc.Unsigned
err = q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.asc.Unsigned, nil
} else {
if q.desc.Unsigned == nil {
q.desc.Unsigned = newUnsignedArrayDescendingCursor()
}
q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.desc.Unsigned
err = q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.desc.Unsigned, nil
}
}
// buildStringArrayCursor creates an array cursor for a string field.
func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.StringArrayCursor {
func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.StringArrayCursor, error) {
var err error
key := q.seriesFieldKeyBytes(name, tags, field)
cacheValues := q.e.Cache.Values(key)
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
@ -83,19 +105,26 @@ func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name [
if q.asc.String == nil {
q.asc.String = newStringArrayAscendingCursor()
}
q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.asc.String
err = q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.asc.String, nil
} else {
if q.desc.String == nil {
q.desc.String = newStringArrayDescendingCursor()
}
q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.desc.String
err = q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.desc.String, nil
}
}
// buildBooleanArrayCursor creates an array cursor for a boolean field.
func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.BooleanArrayCursor {
func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.BooleanArrayCursor, error) {
var err error
key := q.seriesFieldKeyBytes(name, tags, field)
cacheValues := q.e.Cache.Values(key)
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
@ -103,13 +132,19 @@ func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name
if q.asc.Boolean == nil {
q.asc.Boolean = newBooleanArrayAscendingCursor()
}
q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.asc.Boolean
err = q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.asc.Boolean, nil
} else {
if q.desc.Boolean == nil {
q.desc.Boolean = newBooleanArrayDescendingCursor()
}
q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.desc.Boolean
err = q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.desc.Boolean, nil
}
}

View File

@ -11,7 +11,8 @@ import (
{{range .}}
// build{{.Name}}ArrayCursor creates an array cursor for a {{.name}} field.
func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.{{.Name}}ArrayCursor {
func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.{{.Name}}ArrayCursor, error) {
var err error
key := q.seriesFieldKeyBytes(name, tags, field)
cacheValues := q.e.Cache.Values(key)
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
@ -19,14 +20,20 @@ func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, nam
if q.asc.{{.Name}} == nil {
q.asc.{{.Name}} = new{{.Name}}ArrayAscendingCursor()
}
q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.asc.{{.Name}}
err = q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.asc.{{.Name}}, nil
} else {
if q.desc.{{.Name}} == nil {
q.desc.{{.Name}} = new{{.Name}}ArrayDescendingCursor()
}
q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
return q.desc.{{.Name}}
err = q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor)
if err != nil {
return nil, err
}
return q.desc.{{.Name}}, nil
}
}

View File

@ -62,15 +62,15 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (
// Return appropriate cursor based on type.
switch f.Type {
case influxql.Float:
return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt)
case influxql.Integer:
return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt)
case influxql.Unsigned:
return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt)
case influxql.String:
return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt)
case influxql.Boolean:
return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil
return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt)
default:
panic(fmt.Sprintf("unreachable: %T", f.Type))
}

View File

@ -28,6 +28,14 @@ func MustTempFile(dir string) *os.File {
return f
}
func MustTempDir() string {
dir, err := os.MkdirTemp("", "tsm1-test")
if err != nil {
panic(fmt.Sprintf("failed to create temp dir: %v", err))
}
return dir
}
func newFiles(dir string, values ...keyValues) ([]string, error) {
var files []string
@ -62,6 +70,35 @@ func newFiles(dir string, values ...keyValues) ([]string, error) {
return files, nil
}
func TestCursor_ResetFail(t *testing.T) {
t.Run("bad block", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, tsdb.EngineTags{})
const START, END = 10, 1
data := []keyValues{
// Write a single data point with timestamp equal to END
{"m,_field=v#!~#v", []Value{NewIntegerValue(1, 1)}},
}
files, err := newFiles(dir, data...)
if err != nil {
t.Fatalf("unexpected error creating files: %v", err)
}
_ = fs.Replace(nil, files)
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
defer kc.Close()
// Open a float cursor for an integer block
cur := newFloatArrayDescendingCursor()
err = cur.reset(START, END, nil, kc)
assert.ErrorContains(t, err, "invalid block", "expected invalid block")
})
}
func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("cache", func(t *testing.T) {
dir := t.TempDir()
@ -74,7 +111,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
cur := newIntegerArrayDescendingCursor()
t.Cleanup(cur.Close)
// Include a cached value with timestamp equal to END
cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc)
assert.NoError(t, cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc), "unexpected error resetting cursor")
var got []int64
ar := cur.Next()
@ -110,7 +147,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Cleanup(kc.Close)
cur := newIntegerArrayDescendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
var got []int64
ar := cur.Next()
@ -158,7 +195,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) {
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true)
t.Cleanup(kc.Close)
cur := newFloatArrayAscendingCursor()
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
t.Cleanup(cur.Close)
var got []int64
@ -178,7 +215,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) {
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
t.Cleanup(kc.Close)
cur := newFloatArrayDescendingCursor()
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
t.Cleanup(cur.Close)
var got []int64
@ -255,7 +292,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.
t.Cleanup(kc.Close)
cur := newFloatArrayAscendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeTs(1000, 800, 10)
exp = append(exp, makeTs(1005, 400, 10)...)
@ -279,7 +316,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.
t.Cleanup(kc.Close)
cur := newFloatArrayDescendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeTs(1000, 800, 10)
exp = append(exp, makeTs(1005, 400, 10)...)
@ -360,7 +397,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
t.Cleanup(kc.Close)
cur := newFloatArrayAscendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeArray(1000, 3500, 10, 1.01)
a2 := makeArray(4005, 3500, 5, 2.01)
@ -388,7 +425,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
t.Cleanup(kc.Close)
cur := newFloatArrayDescendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeArray(1000, 3500, 10, 1.01)
a2 := makeArray(4005, 3500, 5, 2.01)
@ -456,7 +493,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) {
t.Cleanup(kc.Close)
cur := newFloatArrayAscendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeArray(1000, 100, 1, 1.01)
@ -482,7 +519,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) {
t.Cleanup(kc.Close)
cur := newFloatArrayAscendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeArray(1050, 50, 1, 1.01)
a2 := makeArray(1100, 50, 1, 2.01)
@ -510,7 +547,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) {
t.Cleanup(kc.Close)
cur := newFloatArrayDescendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeArray(1000, 100, 1, 1.01)
sort.Sort(sort.Reverse(&FloatArray{exp}))
@ -537,7 +574,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) {
t.Cleanup(kc.Close)
cur := newFloatArrayDescendingCursor()
t.Cleanup(cur.Close)
cur.reset(START, END, nil, kc)
assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor")
exp := makeArray(1050, 50, 1, 1.01)
a2 := makeArray(1100, 50, 1, 2.01)