Merge pull request #15991 from influxdata/sgc/issue/5335
fix(kv): Add push down predicate hint to filter by keyspull/15994/head
commit
88e019b6fb
18
inmem/kv.go
18
inmem/kv.go
|
@ -200,9 +200,14 @@ func (b *Bucket) Delete(key []byte) error {
|
|||
|
||||
// Cursor creates a static cursor from all entries in the database.
|
||||
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
|
||||
var o kv.CursorHints
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
|
||||
// TODO we should do this by using the Ascend/Descend methods that
|
||||
// the btree provides.
|
||||
pairs, err := b.getAll()
|
||||
pairs, err := b.getAll(&o)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -210,8 +215,10 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
|
|||
return kv.NewStaticCursor(pairs), nil
|
||||
}
|
||||
|
||||
func (b *Bucket) getAll() ([]kv.Pair, error) {
|
||||
pairs := []kv.Pair{}
|
||||
func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
|
||||
fn := o.KeyPredicateFn
|
||||
|
||||
var pairs []kv.Pair
|
||||
var err error
|
||||
b.btree.Ascend(func(i btree.Item) bool {
|
||||
j, ok := i.(*item)
|
||||
|
@ -220,7 +227,10 @@ func (b *Bucket) getAll() ([]kv.Pair, error) {
|
|||
return false
|
||||
}
|
||||
|
||||
pairs = append(pairs, kv.Pair{Key: j.key, Value: j.value})
|
||||
if fn == nil || fn(j.key) {
|
||||
pairs = append(pairs, kv.Pair{Key: j.key, Value: j.value})
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
|
|
131
inmem/kv_test.go
131
inmem/kv_test.go
|
@ -1,11 +1,15 @@
|
|||
package inmem_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"math"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/inmem"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
platformtesting "github.com/influxdata/influxdb/testing"
|
||||
|
@ -79,3 +83,130 @@ func TestKVStore_Buckets(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVStore_Bucket_CursorHintKeyPredicate(t *testing.T) {
|
||||
s := inmem.NewKVStore()
|
||||
|
||||
bucket := "urm"
|
||||
fillBucket(t, s, bucket, 10)
|
||||
|
||||
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket([]byte(bucket))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cur, _ := b.Cursor(kv.WithCursorHintKeyPredicate(func(key []byte) bool {
|
||||
return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3"
|
||||
}))
|
||||
|
||||
count := 0
|
||||
for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
|
||||
count++
|
||||
}
|
||||
|
||||
if exp, got := 1, count; got != exp {
|
||||
t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func openCursor(t testing.TB, s *inmem.KVStore, bucket string, fn func(cur kv.Cursor), hints ...kv.CursorHint) {
|
||||
t.Helper()
|
||||
|
||||
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket([]byte(bucket))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
cur, err := b.Cursor(hints...)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if fn != nil {
|
||||
fn(cur)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkKVStore_Bucket_Cursor(b *testing.B) {
|
||||
scanAll := func(cur kv.Cursor) {
|
||||
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
||||
_, _ = k, v
|
||||
}
|
||||
}
|
||||
|
||||
searchKey := "629ffa00003dd2ce"
|
||||
predicate := kv.KeyPredicateFunc(func(key []byte) bool {
|
||||
return len(key) < 32 || string(key[16:]) == searchKey
|
||||
})
|
||||
|
||||
b.Run("16000 keys", func(b *testing.B) {
|
||||
s := inmem.NewKVStore()
|
||||
bucket := "urm"
|
||||
fillBucket(b, s, bucket, 0)
|
||||
|
||||
b.Run("without hint", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
openCursor(b, s, bucket, scanAll)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("with hint", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
openCursor(b, s, bucket, scanAll, kv.WithCursorHintKeyPredicate(predicate))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const sourceFile = "kvdata/keys.txt"
|
||||
|
||||
func fillBucket(t testing.TB, s *inmem.KVStore, bucket string, lines int) {
|
||||
t.Helper()
|
||||
err := s.Update(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket([]byte(bucket))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.Open(sourceFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if lines == 0 {
|
||||
lines = math.MaxInt64
|
||||
}
|
||||
|
||||
scan := bufio.NewScanner(bufio.NewReader(f))
|
||||
for scan.Scan() {
|
||||
var key []byte
|
||||
key = append(key, scan.Bytes()...)
|
||||
_ = b.Put(key, nil)
|
||||
lines--
|
||||
if lines <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
16
kv/store.go
16
kv/store.go
|
@ -38,9 +38,12 @@ type Tx interface {
|
|||
WithContext(ctx context.Context)
|
||||
}
|
||||
|
||||
type KeyPredicateFunc func(key []byte) bool
|
||||
|
||||
type CursorHints struct {
|
||||
KeyPrefix *string
|
||||
KeyStart *string
|
||||
KeyPrefix *string
|
||||
KeyStart *string
|
||||
KeyPredicateFn KeyPredicateFunc
|
||||
}
|
||||
|
||||
// CursorHint configures CursorHints
|
||||
|
@ -64,6 +67,15 @@ func WithCursorHintKeyStart(start string) CursorHint {
|
|||
}
|
||||
}
|
||||
|
||||
// WithCursorHintKeyPredicate is a hint to the store
|
||||
// to return only key / values which return true for the
|
||||
// f.
|
||||
func WithCursorHintKeyPredicate(f KeyPredicateFunc) CursorHint {
|
||||
return func(o *CursorHints) {
|
||||
o.KeyPredicateFn = f
|
||||
}
|
||||
}
|
||||
|
||||
// Bucket is the abstraction used to perform get/put/delete/get-many operations
|
||||
// in a key value store.
|
||||
type Bucket interface {
|
||||
|
|
38
kv/urm.go
38
kv/urm.go
|
@ -97,10 +97,36 @@ func (s *Service) FindUserResourceMappings(ctx context.Context, filter influxdb.
|
|||
return ms, len(ms), nil
|
||||
}
|
||||
|
||||
func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) KeyPredicateFunc {
|
||||
switch {
|
||||
case filter.ResourceID.Valid() && filter.UserID.Valid():
|
||||
keyPredicate := filter.ResourceID.String() + filter.UserID.String()
|
||||
return func(key []byte) bool {
|
||||
return len(key) >= 32 && string(key[:32]) == keyPredicate
|
||||
}
|
||||
|
||||
case !filter.ResourceID.Valid() && filter.UserID.Valid():
|
||||
keyPredicate := filter.UserID.String()
|
||||
return func(key []byte) bool {
|
||||
return len(key) >= 32 && string(key[16:32]) == keyPredicate
|
||||
}
|
||||
|
||||
case filter.ResourceID.Valid() && !filter.UserID.Valid():
|
||||
keyPredicate := filter.ResourceID.String()
|
||||
return func(key []byte) bool {
|
||||
return len(key) >= 16 && string(key[:16]) == keyPredicate
|
||||
}
|
||||
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) {
|
||||
ms := []*influxdb.UserResourceMapping{}
|
||||
keyPred := userResourceMappingPredicate(filter)
|
||||
filterFn := filterMappingsFn(filter)
|
||||
err := s.forEachUserResourceMapping(ctx, tx, func(m *influxdb.UserResourceMapping) bool {
|
||||
err := s.forEachUserResourceMapping(ctx, tx, keyPred, func(m *influxdb.UserResourceMapping) bool {
|
||||
if filterFn(m) {
|
||||
ms = append(ms, m)
|
||||
}
|
||||
|
@ -209,13 +235,17 @@ func userResourceKey(m *influxdb.UserResourceMapping) ([]byte, error) {
|
|||
return key, nil
|
||||
}
|
||||
|
||||
func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, fn func(*influxdb.UserResourceMapping) bool) error {
|
||||
func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred KeyPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error {
|
||||
b, err := tx.Bucket(urmBucket)
|
||||
if err != nil {
|
||||
return UnavailableURMServiceError(err)
|
||||
}
|
||||
|
||||
cur, err := b.Cursor()
|
||||
var cur Cursor
|
||||
if pred != nil {
|
||||
cur, err = b.Cursor(WithCursorHintKeyPredicate(pred))
|
||||
} else {
|
||||
cur, err = b.Cursor()
|
||||
}
|
||||
if err != nil {
|
||||
return UnavailableURMServiceError(err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
func Test_userResourceMappingPredicate(t *testing.T) {
|
||||
mk := func(rid, uid influxdb.ID) (urm *influxdb.UserResourceMapping, key []byte) {
|
||||
t.Helper()
|
||||
urm = &influxdb.UserResourceMapping{UserID: rid, ResourceID: uid}
|
||||
key, err := userResourceKey(urm)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return urm, key
|
||||
}
|
||||
|
||||
t.Run("match only ResourceID", func(t *testing.T) {
|
||||
u, k := mk(10, 20)
|
||||
f := influxdb.UserResourceMappingFilter{ResourceID: u.ResourceID}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if got, exp := fn(k), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
_, k = mk(10, 21)
|
||||
if got, exp := fn(k), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("match only UserID", func(t *testing.T) {
|
||||
u, k := mk(10, 20)
|
||||
f := influxdb.UserResourceMappingFilter{UserID: u.UserID}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if got, exp := fn(k), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
_, k = mk(11, 20)
|
||||
if got, exp := fn(k), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("match ResourceID and UserID", func(t *testing.T) {
|
||||
u, k := mk(10, 20)
|
||||
f := influxdb.UserResourceMappingFilter{ResourceID: u.ResourceID, UserID: u.UserID}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if got, exp := fn(k), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
_, k = mk(11, 20)
|
||||
if got, exp := fn(k), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("no match function", func(t *testing.T) {
|
||||
f := influxdb.UserResourceMappingFilter{}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if fn != nil {
|
||||
t.Errorf("expected nil")
|
||||
}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue