fix(storage): cursor requests are [start, stop] instead of [start, stop) (#21318)
The cursors were previously [start, stop) to be consistent with how flux requests data, but the underlying storage file store was [start, stop] because that's how influxql read data. This reverts back the cursor behavior so that it is now [start, stop] everywhere and the conversion from [start, stop) to [start, stop] is performed when doing the cursor request to get the next cursor. Co-authored-by: Sam Arnold <sarnold@influxdata.com>pull/21345/head^2
parent
c9a4bd436f
commit
7766672797
|
@ -88,12 +88,12 @@ func (i *mockStringArrayCursor) Next() *cursors.StringArray {
|
|||
}
|
||||
|
||||
type mockCursorIterator struct {
|
||||
newCursorFn func() cursors.Cursor
|
||||
newCursorFn func(req *cursors.CursorRequest) cursors.Cursor
|
||||
statsFn func() cursors.CursorStats
|
||||
}
|
||||
|
||||
func (i *mockCursorIterator) Next(ctx context.Context, req *cursors.CursorRequest) (cursors.Cursor, error) {
|
||||
return i.newCursorFn(), nil
|
||||
return i.newCursorFn(req), nil
|
||||
}
|
||||
func (i *mockCursorIterator) Stats() cursors.CursorStats {
|
||||
if i.statsFn == nil {
|
||||
|
@ -114,7 +114,7 @@ func newMockReadCursor(keys ...string) mockReadCursor {
|
|||
rows[i].Tags = rows[i].SeriesTags.Clone()
|
||||
var itrs cursors.CursorIterators
|
||||
cur := &mockCursorIterator{
|
||||
newCursorFn: func() cursors.Cursor {
|
||||
newCursorFn: func(req *cursors.CursorRequest) cursors.Cursor {
|
||||
return &mockIntegerArrayCursor{}
|
||||
},
|
||||
statsFn: func() cursors.CursorStats {
|
||||
|
@ -260,7 +260,7 @@ func TestNewWindowAggregateResultSet_UnsupportedTyped(t *testing.T) {
|
|||
)
|
||||
for i := range newCursor.rows[0].Query {
|
||||
newCursor.rows[0].Query[i] = &mockCursorIterator{
|
||||
newCursorFn: func() cursors.Cursor {
|
||||
newCursorFn: func(req *cursors.CursorRequest) cursors.Cursor {
|
||||
return &mockStringArrayCursor{}
|
||||
},
|
||||
}
|
||||
|
@ -289,3 +289,47 @@ func TestNewWindowAggregateResultSet_UnsupportedTyped(t *testing.T) {
|
|||
t.Fatalf("unexpected error:\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateResultSet_TimeRange(t *testing.T) {
|
||||
newCursor := newMockReadCursor(
|
||||
"clicks click=1 1",
|
||||
)
|
||||
for i := range newCursor.rows[0].Query {
|
||||
newCursor.rows[0].Query[i] = &mockCursorIterator{
|
||||
newCursorFn: func(req *cursors.CursorRequest) cursors.Cursor {
|
||||
if want, got := int64(0), req.StartTime; want != got {
|
||||
t.Errorf("unexpected start time -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
if want, got := int64(29), req.EndTime; want != got {
|
||||
t.Errorf("unexpected end time -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
return &mockIntegerArrayCursor{}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := datatypes.ReadWindowAggregateRequest{
|
||||
Range: datatypes.TimestampRange{
|
||||
Start: 0,
|
||||
End: 30,
|
||||
},
|
||||
Aggregate: []*datatypes.Aggregate{
|
||||
{
|
||||
Type: datatypes.AggregateTypeCount,
|
||||
},
|
||||
},
|
||||
Window: &datatypes.Window{
|
||||
Every: &datatypes.Duration{Nsecs: int64(time.Minute)},
|
||||
},
|
||||
}
|
||||
|
||||
resultSet, err := reads.NewWindowAggregateResultSet(ctx, &req, &newCursor)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if !resultSet.Next() {
|
||||
t.Fatal("expected result")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,13 +71,19 @@ type multiShardArrayCursors struct {
|
|||
}
|
||||
}
|
||||
|
||||
// newMultiShardArrayCursors is a factory for creating cursors for each series key.
|
||||
// The range of the cursor is [start, end). The start time is the lower absolute time
|
||||
// and the end time is the higher absolute time regardless of ascending or descending order.
|
||||
func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool) *multiShardArrayCursors {
|
||||
// When we construct the CursorRequest, we translate the time range
|
||||
// from [start, stop) to [start, stop]. The cursor readers from storage are
|
||||
// inclusive on both ends and we perform that conversion here.
|
||||
m := &multiShardArrayCursors{
|
||||
ctx: ctx,
|
||||
req: cursors.CursorRequest{
|
||||
Ascending: asc,
|
||||
StartTime: start,
|
||||
EndTime: end,
|
||||
EndTime: end - 1,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,11 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/pkg/data/gen"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
func TestNewGroupResultSet_Sorting(t *testing.T) {
|
||||
|
@ -463,3 +463,41 @@ func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) {
|
|||
rs.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewGroupResultSet_TimeRange(t *testing.T) {
|
||||
newCursor := newMockReadCursor(
|
||||
"clicks click=1 1",
|
||||
)
|
||||
for i := range newCursor.rows {
|
||||
newCursor.rows[0].Query[i] = &mockCursorIterator{
|
||||
newCursorFn: func(req *cursors.CursorRequest) cursors.Cursor {
|
||||
if want, got := int64(0), req.StartTime; want != got {
|
||||
t.Errorf("unexpected start time -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
if want, got := int64(29), req.EndTime; want != got {
|
||||
t.Errorf("unexpected end time -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
return &mockIntegerArrayCursor{}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := datatypes.ReadGroupRequest{
|
||||
Range: datatypes.TimestampRange{
|
||||
Start: 0,
|
||||
End: 30,
|
||||
},
|
||||
}
|
||||
|
||||
resultSet := reads.NewGroupResultSet(ctx, &req, func() (reads.SeriesCursor, error) {
|
||||
return &newCursor, nil
|
||||
})
|
||||
groupByCursor := resultSet.Next()
|
||||
if groupByCursor == nil {
|
||||
t.Fatal("unexpected: groupByCursor was nil")
|
||||
}
|
||||
if groupByCursor.Next() {
|
||||
t.Fatal("unexpected: groupByCursor.Next should not have advanced")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,10 @@ type resultSet struct {
|
|||
arrayCursors multiShardCursors
|
||||
}
|
||||
|
||||
// TODO(jsternberg): The range is [start, end) for this function which is consistent
|
||||
// with the documented interface for datatypes.ReadFilterRequest. This function should
|
||||
// be refactored to take in a datatypes.ReadFilterRequest similar to the other
|
||||
// ResultSet functions.
|
||||
func NewFilteredResultSet(ctx context.Context, start, end int64, seriesCursor SeriesCursor) ResultSet {
|
||||
return &resultSet{
|
||||
ctx: ctx,
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
package reads_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/storage/reads"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
func TestNewFilteredResultSet_TimeRange(t *testing.T) {
|
||||
newCursor := newMockReadCursor(
|
||||
"clicks click=1 1",
|
||||
)
|
||||
for i := range newCursor.rows {
|
||||
newCursor.rows[0].Query[i] = &mockCursorIterator{
|
||||
newCursorFn: func(req *cursors.CursorRequest) cursors.Cursor {
|
||||
if want, got := int64(0), req.StartTime; want != got {
|
||||
t.Errorf("unexpected start time -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
if want, got := int64(29), req.EndTime; want != got {
|
||||
t.Errorf("unexpected end time -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
return &mockIntegerArrayCursor{}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := datatypes.ReadFilterRequest{
|
||||
Range: datatypes.TimestampRange{
|
||||
Start: 0,
|
||||
End: 30,
|
||||
},
|
||||
}
|
||||
|
||||
resultSet := reads.NewFilteredResultSet(ctx, req.Range.Start, req.Range.End, &newCursor)
|
||||
if !resultSet.Next() {
|
||||
t.Fatal("expected result")
|
||||
}
|
||||
}
|
|
@ -39,13 +39,32 @@ type BooleanArrayCursor interface {
|
|||
Next() *BooleanArray
|
||||
}
|
||||
|
||||
// CursorRequest is a request to the storage engine for a cursor to be
|
||||
// created with the given name, tags, and field for a given direction
|
||||
// and time range.
|
||||
type CursorRequest struct {
|
||||
Name []byte
|
||||
Tags models.Tags
|
||||
Field string
|
||||
// Name is the measurement name a cursor is requested for.
|
||||
Name []byte
|
||||
|
||||
// Tags is the set of series tags a cursor is requested for.
|
||||
Tags models.Tags
|
||||
|
||||
// Field is the selected field for the cursor that is requested.
|
||||
Field string
|
||||
|
||||
// Ascending is whether the cursor should move in an ascending
|
||||
// or descending time order.
|
||||
Ascending bool
|
||||
|
||||
// StartTime is the start time of the cursor. It is the lower
|
||||
// absolute time regardless of the Ascending flag. This value
|
||||
// is an inclusive bound.
|
||||
StartTime int64
|
||||
EndTime int64
|
||||
|
||||
// EndTime is the end time of the cursor. It is the higher
|
||||
// absolute time regardless of the Ascending flag. This value
|
||||
// is an inclusive bound.
|
||||
EndTime int64
|
||||
}
|
||||
|
||||
type CursorIterator interface {
|
||||
|
|
|
@ -135,9 +135,10 @@ func (c *floatArrayAscendingCursor) Next() *tsdb.FloatArray {
|
|||
}
|
||||
}
|
||||
|
||||
if pos > 0 && c.res.Timestamps[pos-1] >= c.end {
|
||||
// Strip timestamps from after the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] > c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] >= c.end {
|
||||
for pos >= 0 && c.res.Timestamps[pos] > c.end {
|
||||
pos--
|
||||
}
|
||||
pos++
|
||||
|
@ -182,17 +183,20 @@ func newFloatArrayDescendingCursor() *floatArrayDescendingCursor {
|
|||
}
|
||||
|
||||
func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
|
||||
// Search for the time value greater than the seek time (not included)
|
||||
// and then move our position back one which will include the values in
|
||||
// our time range.
|
||||
c.end = end
|
||||
c.cache.values = cacheValues
|
||||
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
|
||||
return c.cache.values[i].UnixNano() >= seek
|
||||
return c.cache.values[i].UnixNano() > seek
|
||||
})
|
||||
c.cache.pos--
|
||||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
|
||||
return c.tsm.values.Timestamps[i] >= seek
|
||||
return c.tsm.values.Timestamps[i] > seek
|
||||
})
|
||||
c.tsm.pos--
|
||||
}
|
||||
|
@ -270,6 +274,7 @@ func (c *floatArrayDescendingCursor) Next() *tsdb.FloatArray {
|
|||
}
|
||||
}
|
||||
|
||||
// Strip timestamps from before the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] < c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] < c.end {
|
||||
|
@ -412,9 +417,10 @@ func (c *integerArrayAscendingCursor) Next() *tsdb.IntegerArray {
|
|||
}
|
||||
}
|
||||
|
||||
if pos > 0 && c.res.Timestamps[pos-1] >= c.end {
|
||||
// Strip timestamps from after the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] > c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] >= c.end {
|
||||
for pos >= 0 && c.res.Timestamps[pos] > c.end {
|
||||
pos--
|
||||
}
|
||||
pos++
|
||||
|
@ -459,17 +465,20 @@ func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor {
|
|||
}
|
||||
|
||||
func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
|
||||
// Search for the time value greater than the seek time (not included)
|
||||
// and then move our position back one which will include the values in
|
||||
// our time range.
|
||||
c.end = end
|
||||
c.cache.values = cacheValues
|
||||
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
|
||||
return c.cache.values[i].UnixNano() >= seek
|
||||
return c.cache.values[i].UnixNano() > seek
|
||||
})
|
||||
c.cache.pos--
|
||||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
|
||||
return c.tsm.values.Timestamps[i] >= seek
|
||||
return c.tsm.values.Timestamps[i] > seek
|
||||
})
|
||||
c.tsm.pos--
|
||||
}
|
||||
|
@ -547,6 +556,7 @@ func (c *integerArrayDescendingCursor) Next() *tsdb.IntegerArray {
|
|||
}
|
||||
}
|
||||
|
||||
// Strip timestamps from before the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] < c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] < c.end {
|
||||
|
@ -689,9 +699,10 @@ func (c *unsignedArrayAscendingCursor) Next() *tsdb.UnsignedArray {
|
|||
}
|
||||
}
|
||||
|
||||
if pos > 0 && c.res.Timestamps[pos-1] >= c.end {
|
||||
// Strip timestamps from after the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] > c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] >= c.end {
|
||||
for pos >= 0 && c.res.Timestamps[pos] > c.end {
|
||||
pos--
|
||||
}
|
||||
pos++
|
||||
|
@ -736,17 +747,20 @@ func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor {
|
|||
}
|
||||
|
||||
func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
|
||||
// Search for the time value greater than the seek time (not included)
|
||||
// and then move our position back one which will include the values in
|
||||
// our time range.
|
||||
c.end = end
|
||||
c.cache.values = cacheValues
|
||||
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
|
||||
return c.cache.values[i].UnixNano() >= seek
|
||||
return c.cache.values[i].UnixNano() > seek
|
||||
})
|
||||
c.cache.pos--
|
||||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
|
||||
return c.tsm.values.Timestamps[i] >= seek
|
||||
return c.tsm.values.Timestamps[i] > seek
|
||||
})
|
||||
c.tsm.pos--
|
||||
}
|
||||
|
@ -824,6 +838,7 @@ func (c *unsignedArrayDescendingCursor) Next() *tsdb.UnsignedArray {
|
|||
}
|
||||
}
|
||||
|
||||
// Strip timestamps from before the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] < c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] < c.end {
|
||||
|
@ -966,9 +981,10 @@ func (c *stringArrayAscendingCursor) Next() *tsdb.StringArray {
|
|||
}
|
||||
}
|
||||
|
||||
if pos > 0 && c.res.Timestamps[pos-1] >= c.end {
|
||||
// Strip timestamps from after the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] > c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] >= c.end {
|
||||
for pos >= 0 && c.res.Timestamps[pos] > c.end {
|
||||
pos--
|
||||
}
|
||||
pos++
|
||||
|
@ -1013,17 +1029,20 @@ func newStringArrayDescendingCursor() *stringArrayDescendingCursor {
|
|||
}
|
||||
|
||||
func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
|
||||
// Search for the time value greater than the seek time (not included)
|
||||
// and then move our position back one which will include the values in
|
||||
// our time range.
|
||||
c.end = end
|
||||
c.cache.values = cacheValues
|
||||
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
|
||||
return c.cache.values[i].UnixNano() >= seek
|
||||
return c.cache.values[i].UnixNano() > seek
|
||||
})
|
||||
c.cache.pos--
|
||||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
|
||||
return c.tsm.values.Timestamps[i] >= seek
|
||||
return c.tsm.values.Timestamps[i] > seek
|
||||
})
|
||||
c.tsm.pos--
|
||||
}
|
||||
|
@ -1101,6 +1120,7 @@ func (c *stringArrayDescendingCursor) Next() *tsdb.StringArray {
|
|||
}
|
||||
}
|
||||
|
||||
// Strip timestamps from before the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] < c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] < c.end {
|
||||
|
@ -1243,9 +1263,10 @@ func (c *booleanArrayAscendingCursor) Next() *tsdb.BooleanArray {
|
|||
}
|
||||
}
|
||||
|
||||
if pos > 0 && c.res.Timestamps[pos-1] >= c.end {
|
||||
// Strip timestamps from after the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] > c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] >= c.end {
|
||||
for pos >= 0 && c.res.Timestamps[pos] > c.end {
|
||||
pos--
|
||||
}
|
||||
pos++
|
||||
|
@ -1290,17 +1311,20 @@ func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor {
|
|||
}
|
||||
|
||||
func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
|
||||
// Search for the time value greater than the seek time (not included)
|
||||
// and then move our position back one which will include the values in
|
||||
// our time range.
|
||||
c.end = end
|
||||
c.cache.values = cacheValues
|
||||
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
|
||||
return c.cache.values[i].UnixNano() >= seek
|
||||
return c.cache.values[i].UnixNano() > seek
|
||||
})
|
||||
c.cache.pos--
|
||||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
|
||||
return c.tsm.values.Timestamps[i] >= seek
|
||||
return c.tsm.values.Timestamps[i] > seek
|
||||
})
|
||||
c.tsm.pos--
|
||||
}
|
||||
|
@ -1378,6 +1402,7 @@ func (c *booleanArrayDescendingCursor) Next() *tsdb.BooleanArray {
|
|||
}
|
||||
}
|
||||
|
||||
// Strip timestamps from before the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] < c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] < c.end {
|
||||
|
|
|
@ -134,9 +134,10 @@ func (c *{{$type}}) Next() {{$arrayType}} {
|
|||
}
|
||||
}
|
||||
|
||||
if pos > 0 && c.res.Timestamps[pos-1] >= c.end {
|
||||
// Strip timestamps from after the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] > c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] >= c.end {
|
||||
for pos >= 0 && c.res.Timestamps[pos] > c.end {
|
||||
pos--
|
||||
}
|
||||
pos++
|
||||
|
@ -184,17 +185,20 @@ func new{{$Type}}() *{{$type}} {
|
|||
}
|
||||
|
||||
func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
|
||||
// Search for the time value greater than the seek time (not included)
|
||||
// and then move our position back one which will include the values in
|
||||
// our time range.
|
||||
c.end = end
|
||||
c.cache.values = cacheValues
|
||||
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
|
||||
return c.cache.values[i].UnixNano() >= seek
|
||||
return c.cache.values[i].UnixNano() > seek
|
||||
})
|
||||
c.cache.pos--
|
||||
|
||||
c.tsm.keyCursor = tsmKeyCursor
|
||||
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf)
|
||||
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
|
||||
return c.tsm.values.Timestamps[i] >= seek
|
||||
return c.tsm.values.Timestamps[i] > seek
|
||||
})
|
||||
c.tsm.pos--
|
||||
}
|
||||
|
@ -272,6 +276,7 @@ func (c *{{$type}}) Next() {{$arrayType}} {
|
|||
}
|
||||
}
|
||||
|
||||
// Strip timestamps from before the end time.
|
||||
if pos > 0 && c.res.Timestamps[pos-1] < c.end {
|
||||
pos -= 2
|
||||
for pos >= 0 && c.res.Timestamps[pos] < c.end {
|
||||
|
|
|
@ -57,7 +57,7 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (
|
|||
var opt query.IteratorOptions
|
||||
opt.Ascending = r.Ascending
|
||||
opt.StartTime = r.StartTime
|
||||
opt.EndTime = r.EndTime
|
||||
opt.EndTime = r.EndTime // inclusive
|
||||
|
||||
// Return appropriate cursor based on type.
|
||||
switch f.Type {
|
||||
|
|
|
@ -160,7 +160,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) {
|
|||
_ = fs.Replace(nil, files)
|
||||
|
||||
t.Run("ascending", func(t *testing.T) {
|
||||
const START, END = 21, 100
|
||||
const START, END = 0, 100
|
||||
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true)
|
||||
defer kc.Close()
|
||||
cur := newFloatArrayAscendingCursor()
|
||||
|
@ -179,7 +179,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("descending", func(t *testing.T) {
|
||||
const START, END = 51, 0
|
||||
const START, END = 100, 0
|
||||
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
|
||||
defer kc.Close()
|
||||
cur := newFloatArrayDescendingCursor()
|
||||
|
@ -192,7 +192,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) {
|
|||
ar = cur.Next()
|
||||
}
|
||||
|
||||
if exp := []int64{46, 44, 40, 21}; !cmp.Equal(got, exp) {
|
||||
if exp := []int64{51, 46, 44, 40, 21}; !cmp.Equal(got, exp) {
|
||||
t.Errorf("unexpected values; -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
@ -379,8 +379,8 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
|
|||
}
|
||||
|
||||
assert.Len(t, got.Timestamps, exp.Len())
|
||||
assert.Equal(t, got.Timestamps, exp.Timestamps)
|
||||
assert.Equal(t, got.Values, exp.Values)
|
||||
assert.Equal(t, exp.Timestamps, got.Timestamps)
|
||||
assert.Equal(t, exp.Values, got.Values)
|
||||
})
|
||||
|
||||
t.Run("descending", func(t *testing.T) {
|
||||
|
@ -407,7 +407,152 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
|
|||
}
|
||||
|
||||
assert.Len(t, got.Timestamps, exp.Len())
|
||||
assert.Equal(t, got.Timestamps, exp.Timestamps)
|
||||
assert.Equal(t, got.Values, exp.Values)
|
||||
assert.Equal(t, exp.Timestamps, got.Timestamps)
|
||||
assert.Equal(t, exp.Values, got.Values)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileStore_SeekBoundaries(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
fs := NewFileStore(dir)
|
||||
|
||||
// makeVals creates count points starting at ts and incrementing by step
|
||||
makeVals := func(ts, count, step int64, v float64) []Value {
|
||||
vals := make([]Value, count)
|
||||
for i := range vals {
|
||||
vals[i] = NewFloatValue(ts, v)
|
||||
ts += step
|
||||
}
|
||||
return vals
|
||||
}
|
||||
|
||||
makeArray := func(ts, count, step int64, v float64) *cursors.FloatArray {
|
||||
ar := cursors.NewFloatArrayLen(int(count))
|
||||
for i := range ar.Timestamps {
|
||||
ar.Timestamps[i] = ts
|
||||
ar.Values[i] = v
|
||||
ts += step
|
||||
}
|
||||
return ar
|
||||
}
|
||||
|
||||
// Setup 2 files where the seek time matches the end time.
|
||||
data := []keyValues{
|
||||
{"m,_field=v#!~#v", makeVals(1000, 100, 1, 1.01)},
|
||||
{"m,_field=v#!~#v", makeVals(1100, 100, 1, 2.01)},
|
||||
}
|
||||
|
||||
files, err := newFiles(dir, data...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating files: %s", err)
|
||||
}
|
||||
|
||||
_ = fs.Replace(nil, files)
|
||||
|
||||
t.Run("ascending full", func(t *testing.T) {
|
||||
const START, END = 1000, 1099
|
||||
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true)
|
||||
defer kc.Close()
|
||||
cur := newFloatArrayAscendingCursor()
|
||||
cur.reset(START, END, nil, kc)
|
||||
|
||||
exp := makeArray(1000, 100, 1, 1.01)
|
||||
|
||||
got := cursors.NewFloatArrayLen(exp.Len())
|
||||
got.Timestamps = got.Timestamps[:0]
|
||||
got.Values = got.Values[:0]
|
||||
|
||||
ar := cur.Next()
|
||||
for ar.Len() > 0 {
|
||||
got.Timestamps = append(got.Timestamps, ar.Timestamps...)
|
||||
got.Values = append(got.Values, ar.Values...)
|
||||
ar = cur.Next()
|
||||
}
|
||||
|
||||
assert.Len(t, got.Timestamps, exp.Len())
|
||||
assert.Equal(t, exp.Timestamps, got.Timestamps)
|
||||
assert.Equal(t, exp.Values, got.Values)
|
||||
})
|
||||
|
||||
t.Run("ascending split", func(t *testing.T) {
|
||||
const START, END = 1050, 1149
|
||||
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true)
|
||||
defer kc.Close()
|
||||
cur := newFloatArrayAscendingCursor()
|
||||
cur.reset(START, END, nil, kc)
|
||||
|
||||
exp := makeArray(1050, 50, 1, 1.01)
|
||||
a2 := makeArray(1100, 50, 1, 2.01)
|
||||
exp.Merge(a2)
|
||||
|
||||
got := cursors.NewFloatArrayLen(exp.Len())
|
||||
got.Timestamps = got.Timestamps[:0]
|
||||
got.Values = got.Values[:0]
|
||||
|
||||
ar := cur.Next()
|
||||
for ar.Len() > 0 {
|
||||
got.Timestamps = append(got.Timestamps, ar.Timestamps...)
|
||||
got.Values = append(got.Values, ar.Values...)
|
||||
ar = cur.Next()
|
||||
}
|
||||
|
||||
assert.Len(t, got.Timestamps, exp.Len())
|
||||
assert.Equal(t, exp.Timestamps, got.Timestamps)
|
||||
assert.Equal(t, exp.Values, got.Values)
|
||||
})
|
||||
|
||||
t.Run("descending full", func(t *testing.T) {
|
||||
const START, END = 1099, 1000
|
||||
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
|
||||
defer kc.Close()
|
||||
cur := newFloatArrayDescendingCursor()
|
||||
cur.reset(START, END, nil, kc)
|
||||
|
||||
exp := makeArray(1000, 100, 1, 1.01)
|
||||
sort.Sort(sort.Reverse(&FloatArray{exp}))
|
||||
|
||||
got := cursors.NewFloatArrayLen(exp.Len())
|
||||
got.Timestamps = got.Timestamps[:0]
|
||||
got.Values = got.Values[:0]
|
||||
|
||||
ar := cur.Next()
|
||||
for ar.Len() > 0 {
|
||||
got.Timestamps = append(got.Timestamps, ar.Timestamps...)
|
||||
got.Values = append(got.Values, ar.Values...)
|
||||
ar = cur.Next()
|
||||
}
|
||||
|
||||
assert.Len(t, got.Timestamps, exp.Len())
|
||||
assert.Equal(t, exp.Timestamps, got.Timestamps)
|
||||
assert.Equal(t, exp.Values, got.Values)
|
||||
})
|
||||
|
||||
t.Run("descending split", func(t *testing.T) {
|
||||
const START, END = 1149, 1050
|
||||
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
|
||||
defer kc.Close()
|
||||
cur := newFloatArrayDescendingCursor()
|
||||
cur.reset(START, END, nil, kc)
|
||||
|
||||
exp := makeArray(1050, 50, 1, 1.01)
|
||||
a2 := makeArray(1100, 50, 1, 2.01)
|
||||
exp.Merge(a2)
|
||||
sort.Sort(sort.Reverse(&FloatArray{exp}))
|
||||
|
||||
got := cursors.NewFloatArrayLen(exp.Len())
|
||||
got.Timestamps = got.Timestamps[:0]
|
||||
got.Values = got.Values[:0]
|
||||
|
||||
ar := cur.Next()
|
||||
for ar.Len() > 0 {
|
||||
got.Timestamps = append(got.Timestamps, ar.Timestamps...)
|
||||
got.Values = append(got.Values, ar.Values...)
|
||||
ar = cur.Next()
|
||||
}
|
||||
|
||||
assert.Len(t, got.Timestamps, exp.Len())
|
||||
assert.Equal(t, exp.Timestamps, got.Timestamps)
|
||||
assert.Equal(t, exp.Values, got.Values)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1944,7 +1944,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) {
|
|||
Field: "value",
|
||||
Ascending: true,
|
||||
StartTime: 2,
|
||||
EndTime: 12,
|
||||
EndTime: 11,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -2004,7 +2004,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
|
|||
Field: "value",
|
||||
Ascending: false,
|
||||
StartTime: 1,
|
||||
EndTime: 11,
|
||||
EndTime: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
Loading…
Reference in New Issue