feat(kv): add support for WithCursorLimit to ForwardCursor (#17524)
parent
6f805cbc2b
commit
e2e954b47a
22
bolt/kv.go
22
bolt/kv.go
|
@ -304,6 +304,7 @@ type Cursor struct {
|
||||||
|
|
||||||
config kv.CursorConfig
|
config kv.CursorConfig
|
||||||
closed bool
|
closed bool
|
||||||
|
seen int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close sets the closed to closed
|
// Close sets the closed to closed
|
||||||
|
@ -351,12 +352,16 @@ func (c *Cursor) Last() ([]byte, []byte) {
|
||||||
|
|
||||||
// Next retrieves the next key in the bucket.
|
// Next retrieves the next key in the bucket.
|
||||||
func (c *Cursor) Next() (k []byte, v []byte) {
|
func (c *Cursor) Next() (k []byte, v []byte) {
|
||||||
if c.closed || (c.key != nil && c.missingPrefix(c.key)) {
|
if c.closed ||
|
||||||
|
c.atLimit() ||
|
||||||
|
(c.key != nil && c.missingPrefix(c.key)) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// get and unset previously seeked values if they exist
|
// get and unset previously seeked values if they exist
|
||||||
k, v, c.key, c.value = c.key, c.value, nil, nil
|
k, v, c.key, c.value = c.key, c.value, nil, nil
|
||||||
if len(k) > 0 || len(v) > 0 {
|
if len(k) > 0 || len(v) > 0 {
|
||||||
|
c.seen++
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,18 +374,24 @@ func (c *Cursor) Next() (k []byte, v []byte) {
|
||||||
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
|
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.seen++
|
||||||
|
|
||||||
return k, v
|
return k, v
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prev retrieves the previous key in the bucket.
|
// Prev retrieves the previous key in the bucket.
|
||||||
func (c *Cursor) Prev() (k []byte, v []byte) {
|
func (c *Cursor) Prev() (k []byte, v []byte) {
|
||||||
if c.closed || (c.key != nil && c.missingPrefix(c.key)) {
|
if c.closed ||
|
||||||
|
c.atLimit() ||
|
||||||
|
(c.key != nil && c.missingPrefix(c.key)) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// get and unset previously seeked values if they exist
|
// get and unset previously seeked values if they exist
|
||||||
k, v, c.key, c.value = c.key, c.value, nil, nil
|
k, v, c.key, c.value = c.key, c.value, nil, nil
|
||||||
if len(k) > 0 && len(v) > 0 {
|
if len(k) > 0 && len(v) > 0 {
|
||||||
|
c.seen++
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,6 +404,9 @@ func (c *Cursor) Prev() (k []byte, v []byte) {
|
||||||
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
|
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.seen++
|
||||||
|
|
||||||
return k, v
|
return k, v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -400,6 +414,10 @@ func (c *Cursor) missingPrefix(key []byte) bool {
|
||||||
return c.config.Prefix != nil && !bytes.HasPrefix(key, c.config.Prefix)
|
return c.config.Prefix != nil && !bytes.HasPrefix(key, c.config.Prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) atLimit() bool {
|
||||||
|
return c.config.Limit != nil && c.seen >= *c.config.Limit
|
||||||
|
}
|
||||||
|
|
||||||
// Err always returns nil as nothing can go wrong™ during iteration
|
// Err always returns nil as nothing can go wrong™ during iteration
|
||||||
func (c *Cursor) Err() error {
|
func (c *Cursor) Err() error {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -324,6 +324,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
||||||
fn = config.Hints.PredicateFn
|
fn = config.Hints.PredicateFn
|
||||||
iterate = b.ascend
|
iterate = b.ascend
|
||||||
skipFirst = config.SkipFirst
|
skipFirst = config.SkipFirst
|
||||||
|
seen int
|
||||||
)
|
)
|
||||||
|
|
||||||
if config.Direction == kv.CursorDescending {
|
if config.Direction == kv.CursorDescending {
|
||||||
|
@ -350,6 +351,11 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// enforce limit
|
||||||
|
if config.Limit != nil && seen >= *config.Limit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
j, ok := i.(*item)
|
j, ok := i.(*item)
|
||||||
if !ok {
|
if !ok {
|
||||||
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
|
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
|
||||||
|
@ -363,6 +369,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
||||||
|
|
||||||
if fn == nil || fn(j.key, j.value) {
|
if fn == nil || fn(j.key, j.value) {
|
||||||
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
|
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
|
||||||
|
seen++
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(batch) < cursorBatchSize {
|
if len(batch) < cursorBatchSize {
|
||||||
|
|
|
@ -172,6 +172,7 @@ type CursorConfig struct {
|
||||||
Hints CursorHints
|
Hints CursorHints
|
||||||
Prefix []byte
|
Prefix []byte
|
||||||
SkipFirst bool
|
SkipFirst bool
|
||||||
|
Limit *int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCursorConfig constructs and configures a CursorConfig used to configure
|
// NewCursorConfig constructs and configures a CursorConfig used to configure
|
||||||
|
@ -223,6 +224,14 @@ func WithCursorSkipFirstItem() CursorOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithCursorLimit restricts the number of key values return by the cursor
|
||||||
|
// to the provided limit count.
|
||||||
|
func WithCursorLimit(limit int) CursorOption {
|
||||||
|
return func(c *CursorConfig) {
|
||||||
|
c.Limit = &limit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// VisitFunc is called for each k, v byte slice pair from the underlying source bucket
|
// VisitFunc is called for each k, v byte slice pair from the underlying source bucket
|
||||||
// which are found in the index bucket for a provided foreign key.
|
// which are found in the index bucket for a provided foreign key.
|
||||||
type VisitFunc func(k, v []byte) error
|
type VisitFunc func(k, v []byte) error
|
||||||
|
|
|
@ -838,6 +838,23 @@ func KVForwardCursor(
|
||||||
},
|
},
|
||||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
|
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "limit",
|
||||||
|
fields: KVStoreFields{
|
||||||
|
Bucket: []byte("bucket"),
|
||||||
|
Pairs: pairs(
|
||||||
|
"aa/00", "aa/01",
|
||||||
|
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||||
|
"bbb/00", "bbb/01", "bbb/02"),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
seek: "aaa",
|
||||||
|
opts: []kv.CursorOption{
|
||||||
|
kv.WithCursorLimit(4),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "prefix - no hints",
|
name: "prefix - no hints",
|
||||||
fields: KVStoreFields{
|
fields: KVStoreFields{
|
||||||
|
@ -856,6 +873,26 @@ func KVForwardCursor(
|
||||||
},
|
},
|
||||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
name: "prefix with limit",
|
||||||
|
fields: KVStoreFields{
|
||||||
|
Bucket: []byte("bucket"),
|
||||||
|
Pairs: pairs(
|
||||||
|
"aa/00", "aa/01",
|
||||||
|
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||||
|
"bbb/00", "bbb/01", "bbb/02"),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
seek: "aaa/00",
|
||||||
|
until: "bbb/02",
|
||||||
|
opts: []kv.CursorOption{
|
||||||
|
kv.WithCursorPrefix([]byte("aaa")),
|
||||||
|
kv.WithCursorLimit(2),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []string{"aaa/00", "aaa/01"},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "prefix - skip first",
|
name: "prefix - skip first",
|
||||||
fields: KVStoreFields{
|
fields: KVStoreFields{
|
||||||
|
@ -875,6 +912,26 @@ func KVForwardCursor(
|
||||||
},
|
},
|
||||||
exp: []string{"aaa/01", "aaa/02", "aaa/03"},
|
exp: []string{"aaa/01", "aaa/02", "aaa/03"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "prefix - skip first with limit",
|
||||||
|
fields: KVStoreFields{
|
||||||
|
Bucket: []byte("bucket"),
|
||||||
|
Pairs: pairs(
|
||||||
|
"aa/00", "aa/01",
|
||||||
|
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||||
|
"bbb/00", "bbb/01", "bbb/02"),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
seek: "aaa/00",
|
||||||
|
until: "bbb/02",
|
||||||
|
opts: []kv.CursorOption{
|
||||||
|
kv.WithCursorPrefix([]byte("aaa")),
|
||||||
|
kv.WithCursorSkipFirstItem(),
|
||||||
|
kv.WithCursorLimit(2),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []string{"aaa/01", "aaa/02"},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "prefix - skip first (one item)",
|
name: "prefix - skip first (one item)",
|
||||||
fields: KVStoreFields{
|
fields: KVStoreFields{
|
||||||
|
@ -1002,6 +1059,25 @@ func KVForwardCursor(
|
||||||
},
|
},
|
||||||
exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"},
|
exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "no hints - descending - with limit",
|
||||||
|
fields: KVStoreFields{
|
||||||
|
Bucket: []byte("bucket"),
|
||||||
|
Pairs: pairs(
|
||||||
|
"aa/00", "aa/01",
|
||||||
|
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||||
|
"bbb/00", "bbb/01", "bbb/02"),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
seek: "bbb/00",
|
||||||
|
until: "aaa/00",
|
||||||
|
opts: []kv.CursorOption{
|
||||||
|
kv.WithCursorDirection(kv.CursorDescending),
|
||||||
|
kv.WithCursorLimit(3),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []string{"bbb/00", "aaa/03", "aaa/02"},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "prefixed - no hints - descending",
|
name: "prefixed - no hints - descending",
|
||||||
fields: KVStoreFields{
|
fields: KVStoreFields{
|
||||||
|
@ -1021,6 +1097,26 @@ func KVForwardCursor(
|
||||||
},
|
},
|
||||||
exp: []string{"aaa/02", "aaa/01", "aaa/00"},
|
exp: []string{"aaa/02", "aaa/01", "aaa/00"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "prefixed - no hints - descending - with limit",
|
||||||
|
fields: KVStoreFields{
|
||||||
|
Bucket: []byte("bucket"),
|
||||||
|
Pairs: pairs(
|
||||||
|
"aa/00", "aa/01",
|
||||||
|
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||||
|
"bbb/00", "bbb/01", "bbb/02"),
|
||||||
|
},
|
||||||
|
args: args{
|
||||||
|
seek: "aaa/02",
|
||||||
|
until: "aa/",
|
||||||
|
opts: []kv.CursorOption{
|
||||||
|
kv.WithCursorPrefix([]byte("aaa/")),
|
||||||
|
kv.WithCursorDirection(kv.CursorDescending),
|
||||||
|
kv.WithCursorLimit(2),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exp: []string{"aaa/02", "aaa/01"},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "start hint - descending",
|
name: "start hint - descending",
|
||||||
fields: KVStoreFields{
|
fields: KVStoreFields{
|
||||||
|
|
Loading…
Reference in New Issue