feat(kv): add support for batch getting keys on bucket interface (#17531)
parent
eda8d44c74
commit
9d5d63518d
15
bolt/kv.go
15
bolt/kv.go
|
@ -197,6 +197,21 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
|
|||
return val, nil
|
||||
}
|
||||
|
||||
// GetBatch retrieves the values for the provided keys.
|
||||
func (b *Bucket) GetBatch(keys ...[]byte) ([][]byte, error) {
|
||||
values := make([][]byte, len(keys))
|
||||
for idx, key := range keys {
|
||||
val := b.bucket.Get(key)
|
||||
if len(val) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
values[idx] = val
|
||||
}
|
||||
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Put sets the value at the provided key.
|
||||
func (b *Bucket) Put(key []byte, value []byte) error {
|
||||
err := b.bucket.Put(key, value)
|
||||
|
|
26
inmem/kv.go
26
inmem/kv.go
|
@ -195,6 +195,32 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
|
|||
return j.value, nil
|
||||
}
|
||||
|
||||
// Get retrieves a batch of values for the provided keys.
|
||||
func (b *Bucket) GetBatch(keys ...[]byte) ([][]byte, error) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
values := make([][]byte, len(keys))
|
||||
|
||||
for idx, key := range keys {
|
||||
i := b.btree.Get(&item{key: key})
|
||||
|
||||
if i == nil {
|
||||
// leave value as nil slice
|
||||
continue
|
||||
}
|
||||
|
||||
j, ok := i.(*item)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error item is type %T not *item", i)
|
||||
}
|
||||
|
||||
values[idx] = j.value
|
||||
}
|
||||
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Put sets the key value pair provided.
|
||||
func (b *Bucket) Put(key []byte, value []byte) error {
|
||||
b.mu.Lock()
|
||||
|
|
|
@ -93,6 +93,10 @@ type Bucket interface {
|
|||
// TODO context?
|
||||
// Get returns a key within this bucket. Errors if key does not exist.
|
||||
Get(key []byte) ([]byte, error)
|
||||
// GetBatch returns a corresponding set of values for the provided
|
||||
// set of keys. If a value cannot be found for any provided key its
|
||||
// value will be nil at the same index for the provided key.
|
||||
GetBatch(keys ...[]byte) ([][]byte, error)
|
||||
// Cursor returns a cursor at the beginning of this bucket optionally
|
||||
// using the provided hints to improve performance.
|
||||
Cursor(hints ...CursorHint) (Cursor, error)
|
||||
|
|
|
@ -61,6 +61,7 @@ var _ (kv.Bucket) = (*Bucket)(nil)
|
|||
// in a key value store
|
||||
type Bucket struct {
|
||||
GetFn func(key []byte) ([]byte, error)
|
||||
GetBatchFn func(keys ...[]byte) ([][]byte, error)
|
||||
CursorFn func() (kv.Cursor, error)
|
||||
PutFn func(key, value []byte) error
|
||||
DeleteFn func(key []byte) error
|
||||
|
@ -72,6 +73,11 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
|
|||
return b.GetFn(key)
|
||||
}
|
||||
|
||||
// GetBatch returns a set of keys values within this bucket.
|
||||
func (b *Bucket) GetBatch(keys ...[]byte) ([][]byte, error) {
|
||||
return b.GetBatchFn(keys...)
|
||||
}
|
||||
|
||||
// Cursor returns a cursor at the beginning of this bucket.
|
||||
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
|
||||
return b.CursorFn()
|
||||
|
|
117
testing/kv.go
117
testing/kv.go
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -35,6 +36,10 @@ func KVStore(
|
|||
name: "Get",
|
||||
fn: KVGet,
|
||||
},
|
||||
{
|
||||
name: "GetBatch",
|
||||
fn: KVGetBatch,
|
||||
},
|
||||
{
|
||||
name: "Put",
|
||||
fn: KVPut,
|
||||
|
@ -171,6 +176,118 @@ func KVGet(
|
|||
}
|
||||
}
|
||||
|
||||
// KVGetBatch tests the get batch method contract for the key value store.
|
||||
func KVGetBatch(
|
||||
init func(KVStoreFields, *testing.T) (kv.Store, func()),
|
||||
t *testing.T,
|
||||
) {
|
||||
type args struct {
|
||||
bucket []byte
|
||||
keys [][]byte
|
||||
}
|
||||
type wants struct {
|
||||
err error
|
||||
vals [][]byte
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fields KVStoreFields
|
||||
args args
|
||||
wants wants
|
||||
}{
|
||||
{
|
||||
name: "get keys",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: []kv.Pair{
|
||||
{
|
||||
Key: []byte("hello"),
|
||||
Value: []byte("world"),
|
||||
},
|
||||
{
|
||||
Key: []byte("color"),
|
||||
Value: []byte("orange"),
|
||||
},
|
||||
{
|
||||
Key: []byte("organization"),
|
||||
Value: []byte("influx"),
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
bucket: []byte("bucket"),
|
||||
keys: [][]byte{[]byte("hello"), []byte("organization")},
|
||||
},
|
||||
wants: wants{
|
||||
vals: [][]byte{[]byte("world"), []byte("influx")},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "get keys with missing",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: []kv.Pair{
|
||||
{
|
||||
Key: []byte("hello"),
|
||||
Value: []byte("world"),
|
||||
},
|
||||
{
|
||||
Key: []byte("organization"),
|
||||
Value: []byte("influx"),
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
bucket: []byte("bucket"),
|
||||
keys: [][]byte{[]byte("hello"), []byte("color")},
|
||||
},
|
||||
wants: wants{
|
||||
vals: [][]byte{[]byte("world"), nil},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s, close := init(tt.fields, t)
|
||||
defer close()
|
||||
|
||||
err := s.View(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket(tt.args.bucket)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error retrieving bucket: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
vals, err := b.GetBatch(tt.args.keys...)
|
||||
if (err != nil) != (tt.wants.err != nil) {
|
||||
t.Errorf("expected error '%v' got '%v'", tt.wants.err, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err != nil && tt.wants.err != nil {
|
||||
if err.Error() != tt.wants.err.Error() {
|
||||
t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if want, got := tt.wants.vals, vals; !reflect.DeepEqual(want, got) {
|
||||
t.Errorf("exptected to get value %q got %q", want, got)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("error during view transaction: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// KVPut tests the get method contract for the key value store.
|
||||
func KVPut(
|
||||
init func(KVStoreFields, *testing.T) (kv.Store, func()),
|
||||
|
|
Loading…
Reference in New Issue