From e2e954b47aa32572fc2ec8b814e6fd9c9fee3e70 Mon Sep 17 00:00:00 2001 From: George Date: Wed, 26 Aug 2020 12:10:00 +0100 Subject: [PATCH] feat(kv): add support for WithCursorLimit to ForwardCursor (#17524) --- bolt/kv.go | 22 ++++++++++-- inmem/kv.go | 7 ++++ kv/store.go | 9 +++++ testing/kv.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 2 deletions(-) diff --git a/bolt/kv.go b/bolt/kv.go index 3616a91361..97a00121e6 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -304,6 +304,7 @@ type Cursor struct { config kv.CursorConfig closed bool + seen int } // Close sets the closed to closed @@ -351,12 +352,16 @@ func (c *Cursor) Last() ([]byte, []byte) { // Next retrieves the next key in the bucket. func (c *Cursor) Next() (k []byte, v []byte) { - if c.closed || (c.key != nil && c.missingPrefix(c.key)) { + if c.closed || + c.atLimit() || + (c.key != nil && c.missingPrefix(c.key)) { return nil, nil } + // get and unset previously seeked values if they exist k, v, c.key, c.value = c.key, c.value, nil, nil if len(k) > 0 || len(v) > 0 { + c.seen++ return } @@ -369,18 +374,24 @@ func (c *Cursor) Next() (k []byte, v []byte) { if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) { return nil, nil } + + c.seen++ + return k, v } // Prev retrieves the previous key in the bucket. func (c *Cursor) Prev() (k []byte, v []byte) { - if c.closed || (c.key != nil && c.missingPrefix(c.key)) { + if c.closed || + c.atLimit() || + (c.key != nil && c.missingPrefix(c.key)) { return nil, nil } // get and unset previously seeked values if they exist k, v, c.key, c.value = c.key, c.value, nil, nil if len(k) > 0 && len(v) > 0 { + c.seen++ return } @@ -393,6 +404,9 @@ func (c *Cursor) Prev() (k []byte, v []byte) { if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) { return nil, nil } + + c.seen++ + return k, v } @@ -400,6 +414,10 @@ func (c *Cursor) missingPrefix(key []byte) bool { return c.config.Prefix != nil && !bytes.HasPrefix(key, c.config.Prefix) } +func (c *Cursor) atLimit() bool { + return c.config.Limit != nil && c.seen >= *c.config.Limit +} + // Err always returns nil as nothing can go wrong™ during iteration func (c *Cursor) Err() error { return nil diff --git a/inmem/kv.go b/inmem/kv.go index c097e7c0f8..ebf17352c0 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -324,6 +324,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward fn = config.Hints.PredicateFn iterate = b.ascend skipFirst = config.SkipFirst + seen int ) if config.Direction == kv.CursorDescending { @@ -350,6 +351,11 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward return true } + // enforce limit + if config.Limit != nil && seen >= *config.Limit { + return false + } + j, ok := i.(*item) if !ok { batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)}) @@ -363,6 +369,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward if fn == nil || fn(j.key, j.value) { batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}}) + seen++ } if len(batch) < cursorBatchSize { diff --git a/kv/store.go b/kv/store.go index db237f3753..32e4544292 100644 --- a/kv/store.go +++ b/kv/store.go @@ -172,6 +172,7 @@ type CursorConfig struct { Hints CursorHints Prefix []byte SkipFirst bool + Limit *int } // NewCursorConfig constructs and configures a CursorConfig used to configure @@ -223,6 +224,14 @@ func WithCursorSkipFirstItem() CursorOption { } } +// WithCursorLimit restricts the number of key values return by the cursor +// to the provided limit count. +func WithCursorLimit(limit int) CursorOption { + return func(c *CursorConfig) { + c.Limit = &limit + } +} + // VisitFunc is called for each k, v byte slice pair from the underlying source bucket // which are found in the index bucket for a provided foreign key. type VisitFunc func(k, v []byte) error diff --git a/testing/kv.go b/testing/kv.go index e15f447e70..00303e13cd 100644 --- a/testing/kv.go +++ b/testing/kv.go @@ -838,6 +838,23 @@ func KVForwardCursor( }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, }, + { + name: "limit", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa", + opts: []kv.CursorOption{ + kv.WithCursorLimit(4), + }, + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, + }, { name: "prefix - no hints", fields: KVStoreFields{ @@ -856,6 +873,26 @@ func KVForwardCursor( }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, }, + + { + name: "prefix with limit", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/00", + until: "bbb/02", + opts: []kv.CursorOption{ + kv.WithCursorPrefix([]byte("aaa")), + kv.WithCursorLimit(2), + }, + }, + exp: []string{"aaa/00", "aaa/01"}, + }, { name: "prefix - skip first", fields: KVStoreFields{ @@ -875,6 +912,26 @@ func KVForwardCursor( }, exp: []string{"aaa/01", "aaa/02", "aaa/03"}, }, + { + name: "prefix - skip first with limit", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/00", + until: "bbb/02", + opts: []kv.CursorOption{ + kv.WithCursorPrefix([]byte("aaa")), + kv.WithCursorSkipFirstItem(), + kv.WithCursorLimit(2), + }, + }, + exp: []string{"aaa/01", "aaa/02"}, + }, { name: "prefix - skip first (one item)", fields: KVStoreFields{ @@ -1002,6 +1059,25 @@ func KVForwardCursor( }, exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"}, }, + { + name: "no hints - descending - with limit", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "bbb/00", + until: "aaa/00", + opts: []kv.CursorOption{ + kv.WithCursorDirection(kv.CursorDescending), + kv.WithCursorLimit(3), + }, + }, + exp: []string{"bbb/00", "aaa/03", "aaa/02"}, + }, { name: "prefixed - no hints - descending", fields: KVStoreFields{ @@ -1021,6 +1097,26 @@ func KVForwardCursor( }, exp: []string{"aaa/02", "aaa/01", "aaa/00"}, }, + { + name: "prefixed - no hints - descending - with limit", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/02", + until: "aa/", + opts: []kv.CursorOption{ + kv.WithCursorPrefix([]byte("aaa/")), + kv.WithCursorDirection(kv.CursorDescending), + kv.WithCursorLimit(2), + }, + }, + exp: []string{"aaa/02", "aaa/01"}, + }, { name: "start hint - descending", fields: KVStoreFields{