feat(query/functions/limit): Add offset to limit functions

pull/10616/head
Nathaniel Cook 2018-05-29 17:09:47 -06:00
parent a073ff0538
commit cbd219941a
2 changed files with 106 additions and 33 deletions

View File

@ -16,7 +16,7 @@ const LimitKind = "limit"
// Currently offset is not supported.
type LimitOpSpec struct {
N int64 `json:"n"`
//Offset int64 `json:"offset"`
Offset int64 `json:"offset"`
}
var limitSignature = query.DefaultFunctionSignature()
@ -44,6 +44,12 @@ func createLimitOpSpec(args query.Arguments, a *query.Administration) (query.Ope
}
spec.N = n
if offset, ok, err := args.GetInt("offset"); err != nil {
return nil, err
} else if ok {
spec.Offset = offset
}
return spec, nil
}
@ -57,7 +63,7 @@ func (s *LimitOpSpec) Kind() query.OperationKind {
type LimitProcedureSpec struct {
N int64 `json:"n"`
//Offset int64 `json:"offset"`
Offset int64 `json:"offset"`
}
func newLimitProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
@ -67,7 +73,7 @@ func newLimitProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Pro
}
return &LimitProcedureSpec{
N: spec.N,
//Offset: spec.Offset,
Offset: spec.Offset,
}, nil
}
@ -76,8 +82,7 @@ func (s *LimitProcedureSpec) Kind() plan.ProcedureKind {
}
func (s *LimitProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(LimitProcedureSpec)
ns.N = s.N
//ns.Offset = s.Offset
*ns = *s
return ns
}
@ -94,12 +99,14 @@ func (s *LimitProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.LimitSet = false
selectSpec.PointsLimit = 0
//selectSpec.PointsOffset = 0
selectSpec.SeriesLimit = 0
selectSpec.SeriesOffset = 0
return
}
selectSpec.LimitSet = true
selectSpec.PointsLimit = s.N
//selectSpec.PointsOffset = s.Offset
selectSpec.SeriesLimit = 0
selectSpec.SeriesOffset = 0
}
@ -119,7 +126,7 @@ type limitTransformation struct {
d execute.Dataset
cache execute.BlockBuilderCache
n int
n, offset int
colMap []int
}
@ -129,6 +136,7 @@ func NewLimitTransformation(d execute.Dataset, cache execute.BlockBuilderCache,
d: d,
cache: cache,
n: int(spec.N),
offset: int(spec.Offset),
}
}
@ -155,19 +163,30 @@ func (t *limitTransformation) Process(id execute.DatasetID, b query.Block) error
// AppendBlock with limit
n := t.n
offset := t.offset
b.Do(func(cr query.ColReader) error {
if n <= 0 {
// Returning an error terminates iteration
return errors.New("finished")
}
l := cr.Len()
if l > n {
l = n
if l <= offset {
offset -= l
// Skip entire batch
return nil
}
n -= l
lcr := limitColReader{
start := offset
stop := l
count := stop - start
if count > n {
count = n
stop = start + count
}
n -= count
lcr := sliceColReader{
ColReader: cr,
n: l,
start: start,
stop: stop,
}
execute.AppendCols(lcr, builder, t.colMap)
return nil
@ -175,37 +194,37 @@ func (t *limitTransformation) Process(id execute.DatasetID, b query.Block) error
return nil
}
type limitColReader struct {
type sliceColReader struct {
query.ColReader
n int
start, stop int
}
func (cr limitColReader) Len() int {
return cr.n
func (cr sliceColReader) Len() int {
return cr.stop
}
func (cr limitColReader) Bools(j int) []bool {
return cr.ColReader.Bools(j)[:cr.n]
func (cr sliceColReader) Bools(j int) []bool {
return cr.ColReader.Bools(j)[cr.start:cr.stop]
}
func (cr limitColReader) Ints(j int) []int64 {
return cr.ColReader.Ints(j)[:cr.n]
func (cr sliceColReader) Ints(j int) []int64 {
return cr.ColReader.Ints(j)[cr.start:cr.stop]
}
func (cr limitColReader) UInts(j int) []uint64 {
return cr.ColReader.UInts(j)[:cr.n]
func (cr sliceColReader) UInts(j int) []uint64 {
return cr.ColReader.UInts(j)[cr.start:cr.stop]
}
func (cr limitColReader) Floats(j int) []float64 {
return cr.ColReader.Floats(j)[:cr.n]
func (cr sliceColReader) Floats(j int) []float64 {
return cr.ColReader.Floats(j)[cr.start:cr.stop]
}
func (cr limitColReader) Strings(j int) []string {
return cr.ColReader.Strings(j)[:cr.n]
func (cr sliceColReader) Strings(j int) []string {
return cr.ColReader.Strings(j)[cr.start:cr.stop]
}
func (cr limitColReader) Times(j int) []execute.Time {
return cr.ColReader.Times(j)[:cr.n]
func (cr sliceColReader) Times(j int) []execute.Time {
return cr.ColReader.Times(j)[cr.start:cr.stop]
}
func (t *limitTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {

View File

@ -56,6 +56,60 @@ func TestLimit_Process(t *testing.T) {
},
}},
},
{
name: "one block with offset single batch",
spec: &functions.LimitProcedureSpec{
N: 1,
Offset: 1,
},
data: []query.Block{execute.CopyBlock(&executetest.Block{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(1), 2.0},
{execute.Time(2), 1.0},
{execute.Time(3), 0.0},
},
}, executetest.UnlimitedAllocator)},
want: []*executetest.Block{{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(2), 1.0},
},
}},
},
{
name: "one block with offset multiple batches",
spec: &functions.LimitProcedureSpec{
N: 1,
Offset: 1,
},
data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(1), 2.0},
{execute.Time(2), 1.0},
{execute.Time(3), 0.0},
},
}},
want: []*executetest.Block{{
ColMeta: []query.ColMeta{
{Label: "_time", Type: query.TTime},
{Label: "_value", Type: query.TFloat},
},
Data: [][]interface{}{
{execute.Time(2), 1.0},
},
}},
},
{
name: "multiple blocks",
spec: &functions.LimitProcedureSpec{