feat(kv): add support for prefixed cursor search (#16545)
* feat(kv): add support for prefixed cursor search * chore(kv): ensure kv store implementation return seek missing prefix error in tests * chore(kv): update changelogpull/16605/head
parent
aa8b12f2aa
commit
4f14ceabab
|
@ -4,6 +4,7 @@
|
|||
|
||||
1. [16523](https://github.com/influxdata/influxdb/pull/16523): Change influx packages to be CRD compliant
|
||||
1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration
|
||||
1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types
|
||||
|
||||
### UI Improvements
|
||||
|
||||
|
|
20
bolt/kv.go
20
bolt/kv.go
|
@ -1,6 +1,7 @@
|
|||
package bolt
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
@ -200,13 +201,18 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
var (
|
||||
cursor = b.bucket.Cursor()
|
||||
key, value = cursor.Seek(seek)
|
||||
config = kv.NewCursorConfig(opts...)
|
||||
)
|
||||
|
||||
if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) {
|
||||
return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix)
|
||||
}
|
||||
|
||||
return &Cursor{
|
||||
cursor: cursor,
|
||||
key: key,
|
||||
value: value,
|
||||
config: kv.NewCursorConfig(opts...),
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -275,7 +281,7 @@ func (c *Cursor) Last() ([]byte, []byte) {
|
|||
|
||||
// Next retrieves the next key in the bucket.
|
||||
func (c *Cursor) Next() (k []byte, v []byte) {
|
||||
if c.closed {
|
||||
if c.closed || (c.key != nil && c.missingPrefix(c.key)) {
|
||||
return nil, nil
|
||||
}
|
||||
// get and unset previously seeked values if they exist
|
||||
|
@ -290,7 +296,7 @@ func (c *Cursor) Next() (k []byte, v []byte) {
|
|||
}
|
||||
|
||||
k, v = next()
|
||||
if len(k) == 0 && len(v) == 0 {
|
||||
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
|
||||
return nil, nil
|
||||
}
|
||||
return k, v
|
||||
|
@ -298,7 +304,7 @@ func (c *Cursor) Next() (k []byte, v []byte) {
|
|||
|
||||
// Prev retrieves the previous key in the bucket.
|
||||
func (c *Cursor) Prev() (k []byte, v []byte) {
|
||||
if c.closed {
|
||||
if c.closed || (c.key != nil && c.missingPrefix(c.key)) {
|
||||
return nil, nil
|
||||
}
|
||||
// get and unset previously seeked values if they exist
|
||||
|
@ -313,12 +319,16 @@ func (c *Cursor) Prev() (k []byte, v []byte) {
|
|||
}
|
||||
|
||||
k, v = prev()
|
||||
if len(k) == 0 && len(v) == 0 {
|
||||
if (len(k) == 0 && len(v) == 0) || c.missingPrefix(k) {
|
||||
return nil, nil
|
||||
}
|
||||
return k, v
|
||||
}
|
||||
|
||||
func (c *Cursor) missingPrefix(key []byte) bool {
|
||||
return c.config.Prefix != nil && !bytes.HasPrefix(key, c.config.Prefix)
|
||||
}
|
||||
|
||||
// Err always returns nil as nothing can go wrong™ during iteration
|
||||
func (c *Cursor) Err() error {
|
||||
return nil
|
||||
|
|
28
inmem/kv.go
28
inmem/kv.go
|
@ -240,6 +240,11 @@ type pair struct {
|
|||
|
||||
// ForwardCursor returns a directional cursor which starts at the provided seeked key
|
||||
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
|
||||
config := kv.NewCursorConfig(opts...)
|
||||
if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) {
|
||||
return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix)
|
||||
}
|
||||
|
||||
var (
|
||||
pairs = make(chan []pair)
|
||||
stop = make(chan struct{})
|
||||
|
@ -262,20 +267,15 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
|
||||
var (
|
||||
batch []pair
|
||||
config = kv.NewCursorConfig(opts...)
|
||||
fn = config.Hints.PredicateFn
|
||||
iterate = func(it btree.ItemIterator) {
|
||||
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
|
||||
}
|
||||
iterate = b.ascend
|
||||
)
|
||||
|
||||
if config.Direction == kv.CursorDescending {
|
||||
iterate = func(it btree.ItemIterator) {
|
||||
b.btree.DescendLessOrEqual(&item{key: seek}, it)
|
||||
}
|
||||
iterate = b.descend
|
||||
}
|
||||
|
||||
iterate(func(i btree.Item) bool {
|
||||
iterate(seek, config, func(i btree.Item) bool {
|
||||
select {
|
||||
case <-stop:
|
||||
// if signalled to stop then exit iteration
|
||||
|
@ -290,6 +290,10 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
return false
|
||||
}
|
||||
|
||||
if config.Prefix != nil && !bytes.HasPrefix(j.key, config.Prefix) {
|
||||
return false
|
||||
}
|
||||
|
||||
if fn == nil || fn(j.key, j.value) {
|
||||
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
|
||||
}
|
||||
|
@ -317,6 +321,14 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
return &ForwardCursor{pairs: pairs, stop: stop}, nil
|
||||
}
|
||||
|
||||
func (b *Bucket) ascend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) {
|
||||
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
|
||||
}
|
||||
|
||||
func (b *Bucket) descend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) {
|
||||
b.btree.DescendLessOrEqual(&item{key: seek}, it)
|
||||
}
|
||||
|
||||
// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
|
||||
type ForwardCursor struct {
|
||||
pairs <-chan []pair
|
||||
|
|
16
kv/store.go
16
kv/store.go
|
@ -11,6 +11,9 @@ var (
|
|||
// ErrTxNotWritable is the error returned when an mutable operation is called during
|
||||
// a non-writable transaction.
|
||||
ErrTxNotWritable = errors.New("transaction is not writable")
|
||||
// ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via
|
||||
// WithCursorPrefix
|
||||
ErrSeekMissingPrefix = errors.New("seek missing prefix bytes")
|
||||
)
|
||||
|
||||
// IsNotFound returns a boolean indicating whether the error is known to report that a key or was not found.
|
||||
|
@ -141,6 +144,7 @@ const (
|
|||
type CursorConfig struct {
|
||||
Direction CursorDirection
|
||||
Hints CursorHints
|
||||
Prefix []byte
|
||||
}
|
||||
|
||||
// NewCursorConfig constructs and configures a CursorConfig used to configure
|
||||
|
@ -171,3 +175,15 @@ func WithCursorHints(hints ...CursorHint) CursorOption {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithCursorPrefix configures the forward cursor to retrieve keys
|
||||
// with a particular prefix. This implies the cursor will start and end
|
||||
// at a specific location based on the prefix [prefix, prefix + 1).
|
||||
//
|
||||
// The value of the seek bytes must be prefixed with the provided
|
||||
// prefix, otherwise an error will be returned.
|
||||
func WithCursorPrefix(prefix []byte) CursorOption {
|
||||
return func(c *CursorConfig) {
|
||||
c.Prefix = prefix
|
||||
}
|
||||
}
|
||||
|
|
136
testing/kv.go
136
testing/kv.go
|
@ -3,6 +3,7 @@ package testing
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -682,10 +683,9 @@ func KVForwardCursor(
|
|||
t *testing.T,
|
||||
) {
|
||||
type args struct {
|
||||
seek string
|
||||
direction kv.CursorDirection
|
||||
until string
|
||||
hints []kv.CursorHint
|
||||
seek string
|
||||
until string
|
||||
opts []kv.CursorOption
|
||||
}
|
||||
|
||||
pairs := func(keys ...string) []kv.Pair {
|
||||
|
@ -719,6 +719,42 @@ func KVForwardCursor(
|
|||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
|
||||
},
|
||||
{
|
||||
name: "prefix - no hints",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: pairs(
|
||||
"aa/00", "aa/01",
|
||||
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aaa/00",
|
||||
until: "bbb/02",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorPrefix([]byte("aaa")),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||
},
|
||||
{
|
||||
name: "prefix - does not prefix seek",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: pairs(
|
||||
"aa/00", "aa/01",
|
||||
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aaa/00",
|
||||
until: "bbb/02",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorPrefix([]byte("aab")),
|
||||
},
|
||||
},
|
||||
expErr: kv.ErrSeekMissingPrefix,
|
||||
},
|
||||
{
|
||||
name: "prefix hint",
|
||||
fields: KVStoreFields{
|
||||
|
@ -731,7 +767,9 @@ func KVForwardCursor(
|
|||
args: args{
|
||||
seek: "aaa",
|
||||
until: "aaa/03",
|
||||
hints: []kv.CursorHint{kv.WithCursorHintPrefix("aaa/")},
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorHints(kv.WithCursorHintPrefix("aaa/")),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||
},
|
||||
|
@ -747,7 +785,9 @@ func KVForwardCursor(
|
|||
args: args{
|
||||
seek: "aaa",
|
||||
until: "bbb/00",
|
||||
hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")},
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorHints(kv.WithCursorHintKeyStart("aaa/")),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
|
||||
},
|
||||
|
@ -763,10 +803,11 @@ func KVForwardCursor(
|
|||
args: args{
|
||||
seek: "aaa",
|
||||
until: "aaa/03",
|
||||
hints: []kv.CursorHint{
|
||||
kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorHints(kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
||||
return len(key) < 3 || string(key[:3]) == "aaa"
|
||||
})},
|
||||
})),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||
},
|
||||
|
@ -782,10 +823,11 @@ func KVForwardCursor(
|
|||
args: args{
|
||||
seek: "",
|
||||
until: "aa/01",
|
||||
hints: []kv.CursorHint{
|
||||
kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorHints(kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
||||
return len(val) < 7 || string(val[:7]) == "val:aa/"
|
||||
})},
|
||||
})),
|
||||
},
|
||||
},
|
||||
exp: []string{"aa/00", "aa/01"},
|
||||
},
|
||||
|
@ -799,12 +841,31 @@ func KVForwardCursor(
|
|||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "bbb/00",
|
||||
until: "aaa/00",
|
||||
direction: kv.CursorDescending,
|
||||
seek: "bbb/00",
|
||||
until: "aaa/00",
|
||||
opts: []kv.CursorOption{kv.WithCursorDirection(kv.CursorDescending)},
|
||||
},
|
||||
exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"},
|
||||
},
|
||||
{
|
||||
name: "prefixed - no hints - descending",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: pairs(
|
||||
"aa/00", "aa/01",
|
||||
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aaa/02",
|
||||
until: "aa/",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorPrefix([]byte("aaa/")),
|
||||
kv.WithCursorDirection(kv.CursorDescending),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/02", "aaa/01", "aaa/00"},
|
||||
},
|
||||
{
|
||||
name: "start hint - descending",
|
||||
fields: KVStoreFields{
|
||||
|
@ -815,10 +876,12 @@ func KVForwardCursor(
|
|||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "bbb/00",
|
||||
until: "aaa/00",
|
||||
direction: kv.CursorDescending,
|
||||
hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")},
|
||||
seek: "bbb/00",
|
||||
until: "aaa/00",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorDirection(kv.CursorDescending),
|
||||
kv.WithCursorHints(kv.WithCursorHintKeyStart("aaa/")),
|
||||
},
|
||||
},
|
||||
exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"},
|
||||
},
|
||||
|
@ -832,13 +895,14 @@ func KVForwardCursor(
|
|||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aaa/03",
|
||||
until: "aaa/00",
|
||||
direction: kv.CursorDescending,
|
||||
hints: []kv.CursorHint{
|
||||
kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
||||
seek: "aaa/03",
|
||||
until: "aaa/00",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorDirection(kv.CursorDescending),
|
||||
kv.WithCursorHints(kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
||||
return len(key) < 3 || string(key[:3]) == "aaa"
|
||||
})},
|
||||
})),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/03", "aaa/02", "aaa/01", "aaa/00"},
|
||||
},
|
||||
|
@ -852,13 +916,14 @@ func KVForwardCursor(
|
|||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aa/01",
|
||||
until: "aa/00",
|
||||
direction: kv.CursorDescending,
|
||||
hints: []kv.CursorHint{
|
||||
kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
||||
seek: "aa/01",
|
||||
until: "aa/00",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorDirection(kv.CursorDescending),
|
||||
kv.WithCursorHints(kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
||||
return len(val) >= 7 && string(val[:7]) == "val:aa/"
|
||||
})},
|
||||
})),
|
||||
},
|
||||
},
|
||||
exp: []string{"aa/01", "aa/00"},
|
||||
},
|
||||
|
@ -876,10 +941,13 @@ func KVForwardCursor(
|
|||
return err
|
||||
}
|
||||
|
||||
cur, err := b.ForwardCursor([]byte(tt.args.seek),
|
||||
kv.WithCursorDirection(tt.args.direction),
|
||||
kv.WithCursorHints(tt.args.hints...))
|
||||
cur, err := b.ForwardCursor([]byte(tt.args.seek), tt.args.opts...)
|
||||
if err != nil {
|
||||
if tt.expErr != nil && errors.Is(err, tt.expErr) {
|
||||
// successfully returned expected error
|
||||
return nil
|
||||
}
|
||||
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue