feat(kv): define forward cursor interface (#16212)

* feat(kv): define forward cursor interface

* feat(kv): implement ForwardCursor on bolt and inmem buckets

* feat(kv): update tests to capture forward cursor

* fix(kv): typo in docs

* feat(kv): add Err method to ForwardCursor interface

* feat(inmem): batch pair channel sends in forward cursor

* fix(kv): remove Err field from kv.Pair

* feat(kv): add Close to kv.ForwardCursor interface
pull/16297/head
George 2019-12-19 17:30:05 +01:00 committed by GitHub
parent cc0943ceeb
commit 48b8cb84f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 542 additions and 8 deletions

View File

@ -8,6 +8,7 @@
4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality 4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality
5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality 5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality
6. [16283](https://github.com/influxdata/influxdb/pull/16283): add support for check resource export functionality 6. [16283](https://github.com/influxdata/influxdb/pull/16283): add support for check resource export functionality
1. [16212](https://github.com/influxdata/influxdb/pull/16212): Add new kv.ForwardCursor interface
### Bug Fixes ### Bug Fixes

View File

@ -13,6 +13,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// check that *KVStore implement kv.Store interface.
var _ (kv.Store) = (*KVStore)(nil)
// KVStore is a kv.Store backed by boltdb. // KVStore is a kv.Store backed by boltdb.
type KVStore struct { type KVStore struct {
path string path string
@ -191,6 +194,22 @@ func (b *Bucket) Delete(key []byte) error {
return err return err
} }
// ForwardCursor retrieves a cursor for iterating through the entries
// in the key value store in a given direction (ascending / descending).
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
var (
cursor = b.bucket.Cursor()
key, value = cursor.Seek(seek)
)
return &Cursor{
cursor: cursor,
key: key,
value: value,
config: kv.NewCursorConfig(opts...),
}, nil
}
// Cursor retrieves a cursor for iterating through the entries // Cursor retrieves a cursor for iterating through the entries
// in the key value store. // in the key value store.
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
@ -203,10 +222,26 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
// in the key value store. // in the key value store.
type Cursor struct { type Cursor struct {
cursor *bolt.Cursor cursor *bolt.Cursor
// previously seeked key/value
key, value []byte
config kv.CursorConfig
closed bool
}
// Close sets the closed to closed
func (c *Cursor) Close() error {
c.closed = true
return nil
} }
// Seek seeks for the first key that matches the prefix provided. // Seek seeks for the first key that matches the prefix provided.
func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) { func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Seek(prefix) k, v := c.cursor.Seek(prefix)
if len(k) == 0 && len(v) == 0 { if len(k) == 0 && len(v) == 0 {
return nil, nil return nil, nil
@ -216,6 +251,9 @@ func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
// First retrieves the first key value pair in the bucket. // First retrieves the first key value pair in the bucket.
func (c *Cursor) First() ([]byte, []byte) { func (c *Cursor) First() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.First() k, v := c.cursor.First()
if len(k) == 0 && len(v) == 0 { if len(k) == 0 && len(v) == 0 {
return nil, nil return nil, nil
@ -225,6 +263,9 @@ func (c *Cursor) First() ([]byte, []byte) {
// Last retrieves the last key value pair in the bucket. // Last retrieves the last key value pair in the bucket.
func (c *Cursor) Last() ([]byte, []byte) { func (c *Cursor) Last() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Last() k, v := c.cursor.Last()
if len(k) == 0 && len(v) == 0 { if len(k) == 0 && len(v) == 0 {
return nil, nil return nil, nil
@ -233,8 +274,22 @@ func (c *Cursor) Last() ([]byte, []byte) {
} }
// Next retrieves the next key in the bucket. // Next retrieves the next key in the bucket.
func (c *Cursor) Next() ([]byte, []byte) { func (c *Cursor) Next() (k []byte, v []byte) {
k, v := c.cursor.Next() if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
return
}
next := c.cursor.Next
if c.config.Direction == kv.CursorDescending {
next = c.cursor.Prev
}
k, v = next()
if len(k) == 0 && len(v) == 0 { if len(k) == 0 && len(v) == 0 {
return nil, nil return nil, nil
} }
@ -242,10 +297,29 @@ func (c *Cursor) Next() ([]byte, []byte) {
} }
// Prev retrieves the previous key in the bucket. // Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() ([]byte, []byte) { func (c *Cursor) Prev() (k []byte, v []byte) {
k, v := c.cursor.Prev() if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
return
}
prev := c.cursor.Prev
if c.config.Direction == kv.CursorDescending {
prev = c.cursor.Next
}
k, v = prev()
if len(k) == 0 && len(v) == 0 { if len(k) == 0 && len(v) == 0 {
return nil, nil return nil, nil
} }
return k, v return k, v
} }
// Err always returns nil as nothing can go wrong™ during iteration
func (c *Cursor) Err() error {
return nil
}

View File

@ -10,6 +10,13 @@ import (
"github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/kv"
) )
// ensure *KVStore implement kv.Store interface
var _ kv.Store = (*KVStore)(nil)
// cursorBatchSize is the size of a batch sent by a forward cursors
// tree iterator
const cursorBatchSize = 1000
// KVStore is an in memory btree backed kv.Store. // KVStore is an in memory btree backed kv.Store.
type KVStore struct { type KVStore struct {
mu sync.RWMutex mu sync.RWMutex
@ -225,3 +232,142 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
return pairs, nil return pairs, nil
} }
type pair struct {
kv.Pair
err error
}
// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
var (
pairs = make(chan []pair)
stop = make(chan struct{})
send = func(batch []pair) bool {
if len(batch) == 0 {
return true
}
select {
case pairs <- batch:
return true
case <-stop:
return false
}
}
)
go func() {
defer close(pairs)
var (
batch []pair
config = kv.NewCursorConfig(opts...)
fn = config.Hints.PredicateFn
iterate = func(it btree.ItemIterator) {
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
}
)
if config.Direction == kv.CursorDescending {
iterate = func(it btree.ItemIterator) {
b.btree.DescendLessOrEqual(&item{key: seek}, it)
}
}
iterate(func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
return false
default:
}
j, ok := i.(*item)
if !ok {
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
return false
}
if fn == nil || fn(j.key, j.value) {
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
}
if len(batch) < cursorBatchSize {
return true
}
if send(batch) {
// batch flushed successfully so we can
// begin a new batch
batch = nil
return true
}
// we've been signalled to stop
return false
})
// send if any left in batch
send(batch)
}()
return &ForwardCursor{pairs: pairs, stop: stop}, nil
}
// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
type ForwardCursor struct {
pairs <-chan []pair
cur []pair
n int
stop chan struct{}
closed bool
// error found during iteration
err error
}
// Err returns a non-nil error when an error occurred during cursor iteration.
func (c *ForwardCursor) Err() error {
return c.err
}
// Close releases the producing goroutines for the forward cursor.
// It blocks until the producing goroutine exits.
func (c *ForwardCursor) Close() error {
if c.closed {
return nil
}
close(c.stop)
c.closed = true
return nil
}
// Next returns the next key/value pair in the cursor
func (c *ForwardCursor) Next() ([]byte, []byte) {
if c.err != nil || c.closed {
return nil, nil
}
if c.n >= len(c.cur) {
var ok bool
c.cur, ok = <-c.pairs
if !ok {
return nil, nil
}
c.n = 0
}
pair := c.cur[c.n]
c.err = pair.err
c.n++
return pair.Key, pair.Value
}

View File

@ -94,6 +94,9 @@ type Bucket interface {
Put(key, value []byte) error Put(key, value []byte) error
// Delete should error if the transaction it was called in is not writable. // Delete should error if the transaction it was called in is not writable.
Delete(key []byte) error Delete(key []byte) error
// ForwardCursor returns a forward cursor from the seek position provided.
// Other options can be supplied to provide direction and hints.
ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error)
} }
// Cursor is an abstraction for iterating/ranging through data. A concrete implementation // Cursor is an abstraction for iterating/ranging through data. A concrete implementation
@ -110,3 +113,61 @@ type Cursor interface {
// Prev moves the cursor to the prev key in the bucket. // Prev moves the cursor to the prev key in the bucket.
Prev() (k []byte, v []byte) Prev() (k []byte, v []byte)
} }
// ForwardCursor is an abstraction for interacting/ranging through data in one direction.
type ForwardCursor interface {
// Next moves the cursor to the next key in the bucket.
Next() (k, v []byte)
// Err returns non-nil if an error occurred during cursor iteration.
// This should always be checked after Next returns a nil key/value.
Err() error
// Close is reponsible for freeing any resources created by the cursor.
Close() error
}
// CursorDirection is an integer used to define the direction
// a request cursor operates in.
type CursorDirection int
const (
// CursorAscending directs a cursor to range in ascending order
CursorAscending CursorDirection = iota
// CursorAscending directs a cursor to range in descending order
CursorDescending
)
// CursorConfig is a type used to configure a new forward cursor.
// It includes a direction and a set of hints
type CursorConfig struct {
Direction CursorDirection
Hints CursorHints
}
// NewCursorConfig constructs and configures a CursorConfig used to configure
// a forward cursor.
func NewCursorConfig(opts ...CursorOption) CursorConfig {
conf := CursorConfig{}
for _, opt := range opts {
opt(&conf)
}
return conf
}
// CursorOption is a functional option for configuring a forward cursor
type CursorOption func(*CursorConfig)
// WithCursorDirection sets the cursor direction on a provided cursor config
func WithCursorDirection(direction CursorDirection) CursorOption {
return func(c *CursorConfig) {
c.Direction = direction
}
}
// WithCursorHints configs the provided hints on the cursor config
func WithCursorHints(hints ...CursorHint) CursorOption {
return func(c *CursorConfig) {
for _, hint := range hints {
hint(&c.Hints)
}
}
}

View File

@ -54,10 +54,11 @@ var _ (kv.Bucket) = (*Bucket)(nil)
// Bucket is the abstraction used to perform get/put/delete/get-many operations // Bucket is the abstraction used to perform get/put/delete/get-many operations
// in a key value store // in a key value store
type Bucket struct { type Bucket struct {
GetFn func(key []byte) ([]byte, error) GetFn func(key []byte) ([]byte, error)
CursorFn func() (kv.Cursor, error) CursorFn func() (kv.Cursor, error)
PutFn func(key, value []byte) error PutFn func(key, value []byte) error
DeleteFn func(key []byte) error DeleteFn func(key []byte) error
ForwardCursorFn func([]byte, ...kv.CursorOption) kv.ForwardCursor
} }
// Get returns a key within this bucket. Errors if key does not exist. // Get returns a key within this bucket. Errors if key does not exist.
@ -80,6 +81,11 @@ func (b *Bucket) Delete(key []byte) error {
return b.DeleteFn(key) return b.DeleteFn(key)
} }
// ForwardCursor returns a cursor from the seek points in the configured direction.
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
return b.ForwardCursorFn(seek, opts...), nil
}
var _ (kv.Cursor) = (*Cursor)(nil) var _ (kv.Cursor) = (*Cursor)(nil)
// Cursor is an abstraction for iterating/ranging through data. A concrete implementation // Cursor is an abstraction for iterating/ranging through data. A concrete implementation

View File

@ -50,6 +50,10 @@ func KVStore(
name: "CursorWithHints", name: "CursorWithHints",
fn: KVCursorWithHints, fn: KVCursorWithHints,
}, },
{
name: "ForwardCursor",
fn: KVForwardCursor,
},
{ {
name: "View", name: "View",
fn: KVView, fn: KVView,
@ -672,6 +676,248 @@ func KVCursorWithHints(
} }
} }
// KVForwardCursor tests the forward cursor contract for the key value store.
func KVForwardCursor(
init func(KVStoreFields, *testing.T) (kv.Store, func()),
t *testing.T,
) {
type args struct {
seek string
direction kv.CursorDirection
until string
hints []kv.CursorHint
}
pairs := func(keys ...string) []kv.Pair {
p := make([]kv.Pair, len(keys))
for i, k := range keys {
p[i].Key = []byte(k)
p[i].Value = []byte("val:" + k)
}
return p
}
tests := []struct {
name string
fields KVStoreFields
args args
exp []string
expErr error
}{
{
name: "no hints",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "aaa",
until: "bbb/00",
},
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
},
{
name: "prefix hint",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "aaa",
until: "aaa/03",
hints: []kv.CursorHint{kv.WithCursorHintPrefix("aaa/")},
},
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
},
{
name: "start hint",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "aaa",
until: "bbb/00",
hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")},
},
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
},
{
name: "predicate for key",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "aaa",
until: "aaa/03",
hints: []kv.CursorHint{
kv.WithCursorHintPredicate(func(key, _ []byte) bool {
return len(key) < 3 || string(key[:3]) == "aaa"
})},
},
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
},
{
name: "predicate for value",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "",
until: "aa/01",
hints: []kv.CursorHint{
kv.WithCursorHintPredicate(func(_, val []byte) bool {
return len(val) < 7 || string(val[:7]) == "val:aa/"
})},
},
exp: []string{"aa/00", "aa/01"},
},
{
name: "no hints - descending",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "bbb/00",
until: "aaa/00",
direction: kv.CursorDescending,
},
exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"},
},
{
name: "start hint - descending",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "bbb/00",
until: "aaa/00",
direction: kv.CursorDescending,
hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")},
},
exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"},
},
{
name: "predicate for key - descending",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "aaa/03",
until: "aaa/00",
direction: kv.CursorDescending,
hints: []kv.CursorHint{
kv.WithCursorHintPredicate(func(key, _ []byte) bool {
return len(key) < 3 || string(key[:3]) == "aaa"
})},
},
exp: []string{"aaa/03", "aaa/02", "aaa/01", "aaa/00"},
},
{
name: "predicate for value - descending",
fields: KVStoreFields{
Bucket: []byte("bucket"),
Pairs: pairs(
"aa/00", "aa/01",
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
"bbb/00", "bbb/01", "bbb/02"),
},
args: args{
seek: "aa/01",
until: "aa/00",
direction: kv.CursorDescending,
hints: []kv.CursorHint{
kv.WithCursorHintPredicate(func(_, val []byte) bool {
return len(val) >= 7 && string(val[:7]) == "val:aa/"
})},
},
exp: []string{"aa/01", "aa/00"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, fin := init(tt.fields, t)
defer fin()
err := s.View(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket([]byte("bucket"))
if err != nil {
t.Errorf("unexpected error retrieving bucket: %v", err)
return err
}
cur, err := b.ForwardCursor([]byte(tt.args.seek),
kv.WithCursorDirection(tt.args.direction),
kv.WithCursorHints(tt.args.hints...))
if err != nil {
t.Errorf("unexpected error: %v", err)
return err
}
var got []string
k, _ := cur.Next()
for len(k) > 0 {
got = append(got, string(k))
if string(k) == tt.args.until {
break
}
k, _ = cur.Next()
}
if exp := tt.exp; !cmp.Equal(got, exp) {
t.Errorf("unexpected cursor values: -got/+exp\n%v", cmp.Diff(got, exp))
}
if err := cur.Err(); !cmp.Equal(err, tt.expErr) {
t.Errorf("expected error to be %v, got %v", tt.expErr, err)
}
if err := cur.Close(); err != nil {
t.Errorf("expected cursor to close with nil error, found %v", err)
}
return nil
})
if err != nil {
t.Fatalf("error during view transaction: %v", err)
}
})
}
}
// KVView tests the view method contract for the key value store. // KVView tests the view method contract for the key value store.
func KVView( func KVView(
init func(KVStoreFields, *testing.T) (kv.Store, func()), init func(KVStoreFields, *testing.T) (kv.Store, func()),