Merge pull request #16019 from influxdata/sgc/issue/5365
fix(kv): Add push down predicate hint to filter by keys and valuespull/16035/head
commit
895e7c66d0
1
go.mod
1
go.mod
|
|
@ -13,6 +13,7 @@ require (
|
|||
github.com/benbjohnson/tmpl v1.0.0
|
||||
github.com/boltdb/bolt v1.3.1 // indirect
|
||||
github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5
|
||||
github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
|
||||
github.com/coreos/bbolt v1.3.1-coreos.6
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -64,6 +64,8 @@ github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
|
|||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5 h1:kS0dw4K730x7cxT+bVyTyYJZHuSoH7ofSr/Ijit56Qw=
|
||||
github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5/go.mod h1:CDReaxg1cmLrtcasZy43l4EYPAknXLiQSrb7tLw5zXM=
|
||||
github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e h1:oJCXMss/3rg5F6Poy9wG3JQusc58Mzk5B9Z6wSnssNE=
|
||||
github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e/go.mod h1:errmMKH8tTB49UR2A8C8DPYkyudelsYJwJFaZHQ6ik8=
|
||||
github.com/c-bata/go-prompt v0.2.2 h1:uyKRz6Z6DUyj49QVijyM339UJV9yhbr70gESwbNU3e0=
|
||||
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
|
||||
github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw=
|
||||
|
|
|
|||
40
inmem/kv.go
40
inmem/kv.go
|
|
@ -7,7 +7,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/google/btree"
|
||||
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
)
|
||||
|
||||
|
|
@ -15,12 +14,14 @@ import (
|
|||
type KVStore struct {
|
||||
mu sync.RWMutex
|
||||
buckets map[string]*Bucket
|
||||
ro map[string]*bucket
|
||||
}
|
||||
|
||||
// NewKVStore creates an instance of a KVStore.
|
||||
func NewKVStore() *KVStore {
|
||||
return &KVStore{
|
||||
buckets: map[string]*Bucket{},
|
||||
ro: map[string]*bucket{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -30,6 +31,7 @@ func (s *KVStore) View(ctx context.Context, fn func(kv.Tx) error) error {
|
|||
defer s.mu.RUnlock()
|
||||
if s.buckets == nil {
|
||||
s.buckets = map[string]*Bucket{}
|
||||
s.ro = map[string]*bucket{}
|
||||
}
|
||||
return fn(&Tx{
|
||||
kv: s,
|
||||
|
|
@ -44,6 +46,7 @@ func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error {
|
|||
defer s.mu.Unlock()
|
||||
if s.buckets == nil {
|
||||
s.buckets = map[string]*Bucket{}
|
||||
s.ro = map[string]*bucket{}
|
||||
}
|
||||
|
||||
return fn(&Tx{
|
||||
|
|
@ -99,16 +102,11 @@ func (t *Tx) createBucketIfNotExists(b []byte) (kv.Bucket, error) {
|
|||
if !ok {
|
||||
bkt = &Bucket{btree.New(2)}
|
||||
t.kv.buckets[string(b)] = bkt
|
||||
return &bucket{
|
||||
Bucket: bkt,
|
||||
writable: t.writable,
|
||||
}, nil
|
||||
t.kv.ro[string(b)] = &bucket{Bucket: bkt}
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
return &bucket{
|
||||
Bucket: bkt,
|
||||
writable: t.writable,
|
||||
}, nil
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
return nil, kv.ErrTxNotWritable
|
||||
|
|
@ -121,10 +119,11 @@ func (t *Tx) Bucket(b []byte) (kv.Bucket, error) {
|
|||
return t.createBucketIfNotExists(b)
|
||||
}
|
||||
|
||||
return &bucket{
|
||||
Bucket: bkt,
|
||||
writable: t.writable,
|
||||
}, nil
|
||||
if t.writable {
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
return t.kv.ro[string(b)], nil
|
||||
}
|
||||
|
||||
// Bucket is a btree that implements kv.Bucket.
|
||||
|
|
@ -134,24 +133,17 @@ type Bucket struct {
|
|||
|
||||
type bucket struct {
|
||||
kv.Bucket
|
||||
writable bool
|
||||
}
|
||||
|
||||
// Put wraps the put method of a kv bucket and ensures that the
|
||||
// bucket is writable.
|
||||
func (b *bucket) Put(key, value []byte) error {
|
||||
if b.writable {
|
||||
return b.Bucket.Put(key, value)
|
||||
}
|
||||
func (b *bucket) Put(_, _ []byte) error {
|
||||
return kv.ErrTxNotWritable
|
||||
}
|
||||
|
||||
// Delete wraps the delete method of a kv bucket and ensures that the
|
||||
// bucket is writable.
|
||||
func (b *bucket) Delete(key []byte) error {
|
||||
if b.writable {
|
||||
return b.Bucket.Delete(key)
|
||||
}
|
||||
func (b *bucket) Delete(_ []byte) error {
|
||||
return kv.ErrTxNotWritable
|
||||
}
|
||||
|
||||
|
|
@ -216,7 +208,7 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
|
|||
}
|
||||
|
||||
func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
|
||||
fn := o.KeyPredicateFn
|
||||
fn := o.PredicateFn
|
||||
|
||||
var pairs []kv.Pair
|
||||
var err error
|
||||
|
|
@ -227,7 +219,7 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
|
|||
return false
|
||||
}
|
||||
|
||||
if fn == nil || fn(j.key) {
|
||||
if fn == nil || fn(j.key, j.value) {
|
||||
pairs = append(pairs, kv.Pair{Key: j.key, Value: j.value})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -84,32 +84,58 @@ func TestKVStore_Buckets(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKVStore_Bucket_CursorHintKeyPredicate(t *testing.T) {
|
||||
func TestKVStore_Bucket_CursorHintPredicate(t *testing.T) {
|
||||
s := inmem.NewKVStore()
|
||||
|
||||
bucket := "urm"
|
||||
fillBucket(t, s, bucket, 10)
|
||||
|
||||
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket([]byte(bucket))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.Run("filter by key", func(t *testing.T) {
|
||||
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket([]byte(bucket))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cur, _ := b.Cursor(kv.WithCursorHintKeyPredicate(func(key []byte) bool {
|
||||
return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3"
|
||||
}))
|
||||
cur, _ := b.Cursor(kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
||||
return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3"
|
||||
}))
|
||||
|
||||
count := 0
|
||||
for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
|
||||
count++
|
||||
}
|
||||
count := 0
|
||||
for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
|
||||
count++
|
||||
}
|
||||
|
||||
if exp, got := 1, count; got != exp {
|
||||
t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
if exp, got := 1, count; got != exp {
|
||||
t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("filter by value", func(t *testing.T) {
|
||||
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
||||
b, err := tx.Bucket([]byte(bucket))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cur, _ := b.Cursor(kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
||||
return len(val) < 32 || string(val[16:]) == "8d5dc900004589c3"
|
||||
}))
|
||||
|
||||
count := 0
|
||||
for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
|
||||
count++
|
||||
}
|
||||
|
||||
if exp, got := 1, count; got != exp {
|
||||
t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -143,7 +169,7 @@ func BenchmarkKVStore_Bucket_Cursor(b *testing.B) {
|
|||
}
|
||||
|
||||
searchKey := "629ffa00003dd2ce"
|
||||
predicate := kv.KeyPredicateFunc(func(key []byte) bool {
|
||||
predicate := kv.CursorPredicateFunc(func(key, _ []byte) bool {
|
||||
return len(key) < 32 || string(key[16:]) == searchKey
|
||||
})
|
||||
|
||||
|
|
@ -166,7 +192,7 @@ func BenchmarkKVStore_Bucket_Cursor(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
openCursor(b, s, bucket, scanAll, kv.WithCursorHintKeyPredicate(predicate))
|
||||
openCursor(b, s, bucket, scanAll, kv.WithCursorHintPredicate(predicate))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
|
@ -196,7 +222,7 @@ func fillBucket(t testing.TB, s *inmem.KVStore, bucket string, lines int64) {
|
|||
for scan.Scan() {
|
||||
var key []byte
|
||||
key = append(key, scan.Bytes()...)
|
||||
_ = b.Put(key, nil)
|
||||
_ = b.Put(key, key)
|
||||
lines--
|
||||
if lines <= 0 {
|
||||
break
|
||||
|
|
|
|||
|
|
@ -0,0 +1,107 @@
|
|||
package inmem_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/inmem"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
"github.com/influxdata/influxdb/snowflake"
|
||||
)
|
||||
|
||||
var (
|
||||
taskBucket = []byte("tasksv1")
|
||||
organizationBucket = []byte("organizationsv1")
|
||||
authBucket = []byte("authorizationsv1")
|
||||
idgen influxdb.IDGenerator = snowflake.NewIDGenerator()
|
||||
)
|
||||
|
||||
func BenchmarkFindTaskByID_CursorHints(b *testing.B) {
|
||||
kvs := inmem.NewKVStore()
|
||||
ctx := context.Background()
|
||||
_ = kvs.Update(ctx, func(tx kv.Tx) error {
|
||||
createData(b, tx)
|
||||
createTasks(b, tx)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
s := kv.NewService(kvs)
|
||||
_ = s.Initialize(ctx)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = s.FindTaskByID(ctx, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func createData(tb testing.TB, tx kv.Tx) {
|
||||
tb.Helper()
|
||||
|
||||
authB, err := tx.Bucket(authBucket)
|
||||
if err != nil {
|
||||
tb.Fatal("authBucket:", err)
|
||||
}
|
||||
orgB, err := tx.Bucket(organizationBucket)
|
||||
if err != nil {
|
||||
tb.Fatal("organizationBucket:", err)
|
||||
}
|
||||
|
||||
a := influxdb.Authorization{
|
||||
Permissions: influxdb.OperPermissions(),
|
||||
}
|
||||
o := influxdb.Organization{}
|
||||
|
||||
var orgID = influxdb.ID(1e4)
|
||||
var userID = influxdb.ID(1e7)
|
||||
for i := 1; i <= 1000; i++ {
|
||||
o.ID = orgID
|
||||
val := mustMarshal(tb, &o)
|
||||
key, _ := a.OrgID.Encode()
|
||||
_ = orgB.Put(key, val)
|
||||
|
||||
a.OrgID = o.ID
|
||||
orgID++
|
||||
|
||||
for j := 1; j <= 5; j++ {
|
||||
a.ID = idgen.ID()
|
||||
a.UserID = userID
|
||||
userID++
|
||||
|
||||
val = mustMarshal(tb, &a)
|
||||
key, _ = a.ID.Encode()
|
||||
_ = authB.Put(key, val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createTasks(tb testing.TB, tx kv.Tx) {
|
||||
tb.Helper()
|
||||
|
||||
taskB, err := tx.Bucket(taskBucket)
|
||||
if err != nil {
|
||||
tb.Fatal("taskBucket:", err)
|
||||
}
|
||||
|
||||
t := influxdb.Task{
|
||||
ID: 1,
|
||||
OrganizationID: 1e4,
|
||||
OwnerID: 1e7,
|
||||
}
|
||||
|
||||
val := mustMarshal(tb, &t)
|
||||
key, _ := t.ID.Encode()
|
||||
_ = taskB.Put(key, val)
|
||||
}
|
||||
|
||||
func mustMarshal(t testing.TB, v interface{}) []byte {
|
||||
t.Helper()
|
||||
d, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return d
|
||||
}
|
||||
74
kv/auth.go
74
kv/auth.go
|
|
@ -5,7 +5,9 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/buger/jsonparser"
|
||||
influxdb "github.com/influxdata/influxdb"
|
||||
jsonp "github.com/influxdata/influxdb/pkg/jsonparser"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -127,6 +129,59 @@ func (s *Service) findAuthorizationByToken(ctx context.Context, tx Tx, n string)
|
|||
return s.findAuthorizationByID(ctx, tx, id)
|
||||
}
|
||||
|
||||
func authorizationsPredicateFn(f influxdb.AuthorizationFilter) CursorPredicateFunc {
|
||||
// if any errors occur reading the JSON data, the predicate will always return true
|
||||
// to ensure the value is included and handled higher up.
|
||||
|
||||
if f.ID != nil {
|
||||
exp := *f.ID
|
||||
return func(_, value []byte) bool {
|
||||
got, err := jsonp.GetID(value, "id")
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
return got == exp
|
||||
}
|
||||
}
|
||||
|
||||
if f.Token != nil {
|
||||
exp := *f.Token
|
||||
return func(_, value []byte) bool {
|
||||
// it is assumed that token never has escaped string data
|
||||
got, _, _, err := jsonparser.Get(value, "token")
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
return string(got) == exp
|
||||
}
|
||||
}
|
||||
|
||||
var pred CursorPredicateFunc
|
||||
if f.OrgID != nil {
|
||||
exp := *f.OrgID
|
||||
pred = func(_, value []byte) bool {
|
||||
got, err := jsonp.GetID(value, "orgID")
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return got == exp
|
||||
}
|
||||
}
|
||||
|
||||
if f.UserID != nil {
|
||||
exp := *f.UserID
|
||||
prevFn := pred
|
||||
pred = func(key, value []byte) bool {
|
||||
prev := prevFn == nil || prevFn(key, value)
|
||||
got, exists, err := jsonp.GetOptionalID(value, "userID")
|
||||
return prev && ((exp == got && exists) || err != nil)
|
||||
}
|
||||
}
|
||||
|
||||
return pred
|
||||
}
|
||||
|
||||
func filterAuthorizationsFn(filter influxdb.AuthorizationFilter) func(a *influxdb.Authorization) bool {
|
||||
if filter.ID != nil {
|
||||
return func(a *influxdb.Authorization) bool {
|
||||
|
|
@ -225,9 +280,10 @@ func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.Auth
|
|||
f.OrgID = &o.ID
|
||||
}
|
||||
|
||||
as := []*influxdb.Authorization{}
|
||||
var as []*influxdb.Authorization
|
||||
pred := authorizationsPredicateFn(f)
|
||||
filterFn := filterAuthorizationsFn(f)
|
||||
err := s.forEachAuthorization(ctx, tx, func(a *influxdb.Authorization) bool {
|
||||
err := s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool {
|
||||
if filterFn(a) {
|
||||
as = append(as, a)
|
||||
}
|
||||
|
|
@ -369,19 +425,27 @@ func decodeAuthorization(b []byte, a *influxdb.Authorization) error {
|
|||
}
|
||||
|
||||
// forEachAuthorization will iterate through all authorizations while fn returns true.
|
||||
func (s *Service) forEachAuthorization(ctx context.Context, tx Tx, fn func(*influxdb.Authorization) bool) error {
|
||||
func (s *Service) forEachAuthorization(ctx context.Context, tx Tx, pred CursorPredicateFunc, fn func(*influxdb.Authorization) bool) error {
|
||||
b, err := tx.Bucket(authBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cur, err := b.Cursor()
|
||||
var cur Cursor
|
||||
if pred != nil {
|
||||
cur, err = b.Cursor(WithCursorHintPredicate(pred))
|
||||
} else {
|
||||
cur, err = b.Cursor()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
||||
a := &influxdb.Authorization{}
|
||||
// preallocate Permissions to reduce multiple slice re-allocations
|
||||
a := &influxdb.Authorization{
|
||||
Permissions: make([]influxdb.Permission, 64),
|
||||
}
|
||||
|
||||
if err := decodeAuthorization(v, a); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -0,0 +1,149 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
func mustMarshal(t testing.TB, v interface{}) []byte {
|
||||
t.Helper()
|
||||
d, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func Test_authorizationsPredicateFn(t *testing.T) {
|
||||
t.Run("ID", func(t *testing.T) {
|
||||
val := influxdb.ID(1)
|
||||
f := influxdb.AuthorizationFilter{ID: &val}
|
||||
fn := authorizationsPredicateFn(f)
|
||||
|
||||
t.Run("does match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: val, OrgID: 2}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("does not match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 2}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("token", func(t *testing.T) {
|
||||
val := "token_token"
|
||||
f := influxdb.AuthorizationFilter{Token: &val}
|
||||
fn := authorizationsPredicateFn(f)
|
||||
|
||||
t.Run("does match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 2, Token: val}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("does not match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 2, Token: "no_no_no"}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("orgID", func(t *testing.T) {
|
||||
val := influxdb.ID(1)
|
||||
f := influxdb.AuthorizationFilter{OrgID: &val}
|
||||
fn := authorizationsPredicateFn(f)
|
||||
|
||||
t.Run("does match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: val}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("does not match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 2}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("userID", func(t *testing.T) {
|
||||
val := influxdb.ID(1)
|
||||
f := influxdb.AuthorizationFilter{UserID: &val}
|
||||
fn := authorizationsPredicateFn(f)
|
||||
|
||||
t.Run("does match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 5, UserID: val}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("does not match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 5, UserID: 2}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing userID", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 5}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("orgID and userID", func(t *testing.T) {
|
||||
orgID := influxdb.ID(1)
|
||||
userID := influxdb.ID(10)
|
||||
f := influxdb.AuthorizationFilter{OrgID: &orgID, UserID: &userID}
|
||||
fn := authorizationsPredicateFn(f)
|
||||
|
||||
t.Run("does match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: orgID, UserID: userID}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("org match user not match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: orgID, UserID: 11}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("org not match user match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 2, UserID: userID}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("org and user not match", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: 2, UserID: 11}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("org match user missing", func(t *testing.T) {
|
||||
a := &influxdb.Authorization{ID: 10, OrgID: orgID}
|
||||
if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
19
kv/store.go
19
kv/store.go
|
|
@ -38,12 +38,12 @@ type Tx interface {
|
|||
WithContext(ctx context.Context)
|
||||
}
|
||||
|
||||
type KeyPredicateFunc func(key []byte) bool
|
||||
type CursorPredicateFunc func(key, value []byte) bool
|
||||
|
||||
type CursorHints struct {
|
||||
KeyPrefix *string
|
||||
KeyStart *string
|
||||
KeyPredicateFn KeyPredicateFunc
|
||||
KeyPrefix *string
|
||||
KeyStart *string
|
||||
PredicateFn CursorPredicateFunc
|
||||
}
|
||||
|
||||
// CursorHint configures CursorHints
|
||||
|
|
@ -67,12 +67,17 @@ func WithCursorHintKeyStart(start string) CursorHint {
|
|||
}
|
||||
}
|
||||
|
||||
// WithCursorHintKeyPredicate is a hint to the store
|
||||
// WithCursorHintPredicate is a hint to the store
|
||||
// to return only key / values which return true for the
|
||||
// f.
|
||||
func WithCursorHintKeyPredicate(f KeyPredicateFunc) CursorHint {
|
||||
//
|
||||
// The primary concern of the predicate is to improve performance.
|
||||
// Therefore, it should perform tests on the data at minimal cost.
|
||||
// If the predicate has no meaningful impact on reducing memory or
|
||||
// CPU usage, there is no benefit to using it.
|
||||
func WithCursorHintPredicate(f CursorPredicateFunc) CursorHint {
|
||||
return func(o *CursorHints) {
|
||||
o.KeyPredicateFn = f
|
||||
o.PredicateFn = f
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
16
kv/urm.go
16
kv/urm.go
|
|
@ -97,23 +97,23 @@ func (s *Service) FindUserResourceMappings(ctx context.Context, filter influxdb.
|
|||
return ms, len(ms), nil
|
||||
}
|
||||
|
||||
func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) KeyPredicateFunc {
|
||||
func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) CursorPredicateFunc {
|
||||
switch {
|
||||
case filter.ResourceID.Valid() && filter.UserID.Valid():
|
||||
keyPredicate := filter.ResourceID.String() + filter.UserID.String()
|
||||
return func(key []byte) bool {
|
||||
return func(key, _ []byte) bool {
|
||||
return len(key) >= 32 && string(key[:32]) == keyPredicate
|
||||
}
|
||||
|
||||
case !filter.ResourceID.Valid() && filter.UserID.Valid():
|
||||
keyPredicate := filter.UserID.String()
|
||||
return func(key []byte) bool {
|
||||
return func(key, _ []byte) bool {
|
||||
return len(key) >= 32 && string(key[16:32]) == keyPredicate
|
||||
}
|
||||
|
||||
case filter.ResourceID.Valid() && !filter.UserID.Valid():
|
||||
keyPredicate := filter.ResourceID.String()
|
||||
return func(key []byte) bool {
|
||||
return func(key, _ []byte) bool {
|
||||
return len(key) >= 16 && string(key[:16]) == keyPredicate
|
||||
}
|
||||
|
||||
|
|
@ -124,9 +124,9 @@ func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) Key
|
|||
|
||||
func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) {
|
||||
ms := []*influxdb.UserResourceMapping{}
|
||||
keyPred := userResourceMappingPredicate(filter)
|
||||
pred := userResourceMappingPredicate(filter)
|
||||
filterFn := filterMappingsFn(filter)
|
||||
err := s.forEachUserResourceMapping(ctx, tx, keyPred, func(m *influxdb.UserResourceMapping) bool {
|
||||
err := s.forEachUserResourceMapping(ctx, tx, pred, func(m *influxdb.UserResourceMapping) bool {
|
||||
if filterFn(m) {
|
||||
ms = append(ms, m)
|
||||
}
|
||||
|
|
@ -235,14 +235,14 @@ func userResourceKey(m *influxdb.UserResourceMapping) ([]byte, error) {
|
|||
return key, nil
|
||||
}
|
||||
|
||||
func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred KeyPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error {
|
||||
func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred CursorPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error {
|
||||
b, err := tx.Bucket(urmBucket)
|
||||
if err != nil {
|
||||
return UnavailableURMServiceError(err)
|
||||
}
|
||||
var cur Cursor
|
||||
if pred != nil {
|
||||
cur, err = b.Cursor(WithCursorHintKeyPredicate(pred))
|
||||
cur, err = b.Cursor(WithCursorHintPredicate(pred))
|
||||
} else {
|
||||
cur, err = b.Cursor()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,12 +22,12 @@ func Test_userResourceMappingPredicate(t *testing.T) {
|
|||
u, k := mk(10, 20)
|
||||
f := influxdb.UserResourceMappingFilter{ResourceID: u.ResourceID}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if got, exp := fn(k), true; got != exp {
|
||||
if got, exp := fn(k, nil), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
_, k = mk(10, 21)
|
||||
if got, exp := fn(k), false; got != exp {
|
||||
if got, exp := fn(k, nil), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
|
@ -36,12 +36,12 @@ func Test_userResourceMappingPredicate(t *testing.T) {
|
|||
u, k := mk(10, 20)
|
||||
f := influxdb.UserResourceMappingFilter{UserID: u.UserID}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if got, exp := fn(k), true; got != exp {
|
||||
if got, exp := fn(k, nil), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
_, k = mk(11, 20)
|
||||
if got, exp := fn(k), false; got != exp {
|
||||
if got, exp := fn(k, nil), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
|
@ -50,12 +50,12 @@ func Test_userResourceMappingPredicate(t *testing.T) {
|
|||
u, k := mk(10, 20)
|
||||
f := influxdb.UserResourceMappingFilter{ResourceID: u.ResourceID, UserID: u.UserID}
|
||||
fn := userResourceMappingPredicate(f)
|
||||
if got, exp := fn(k), true; got != exp {
|
||||
if got, exp := fn(k, nil), true; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
|
||||
_, k = mk(11, 20)
|
||||
if got, exp := fn(k), false; got != exp {
|
||||
if got, exp := fn(k, nil), false; got != exp {
|
||||
t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,45 @@
|
|||
package jsonparser
|
||||
|
||||
import (
|
||||
"github.com/buger/jsonparser"
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
// GetID returns an influxdb.ID for the specified keys path or an error if
|
||||
// the value cannot be decoded or does not exist.
|
||||
func GetID(data []byte, keys ...string) (val influxdb.ID, err error) {
|
||||
v, _, _, err := jsonparser.Get(data, keys...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var id influxdb.ID
|
||||
err = id.Decode(v)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// GetOptionalID returns an influxdb.ID for the specified keys path or an error if
|
||||
// the value cannot be decoded. The value of exists will be false if the keys path
|
||||
// does not exist.
|
||||
func GetOptionalID(data []byte, keys ...string) (val influxdb.ID, exists bool, err error) {
|
||||
v, typ, _, err := jsonparser.Get(data, keys...)
|
||||
if typ == jsonparser.NotExist {
|
||||
return 0, false, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return 0, false, err
|
||||
}
|
||||
|
||||
var id influxdb.ID
|
||||
err = id.Decode(v)
|
||||
if err != nil {
|
||||
return 0, false, err
|
||||
}
|
||||
|
||||
return id, true, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
package jsonparser_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/pkg/jsonparser"
|
||||
)
|
||||
|
||||
func TestGetID(t *testing.T) {
|
||||
t.Run("decode valid id", func(t *testing.T) {
|
||||
json := `{ "id": "000000000000000a" }`
|
||||
got, err := jsonparser.GetID([]byte(json), "id")
|
||||
if err != nil {
|
||||
t.Error("unexpected error:", err)
|
||||
}
|
||||
|
||||
if exp := influxdb.ID(10); got != exp {
|
||||
t.Error("unexpected value: -got/+exp", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("error invalid id", func(t *testing.T) {
|
||||
json := `{ "id": "00000000000a" }`
|
||||
_, err := jsonparser.GetID([]byte(json), "id")
|
||||
if err == nil {
|
||||
t.Error("expected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetOptionalID(t *testing.T) {
|
||||
t.Run("missing id", func(t *testing.T) {
|
||||
json := `{ "name": "foo" }`
|
||||
_, got, err := jsonparser.GetOptionalID([]byte(json), "id")
|
||||
if err != nil {
|
||||
t.Error("unexpected error:", err)
|
||||
}
|
||||
|
||||
if exp := false; got != exp {
|
||||
t.Error("unexpected value: -got/+exp", cmp.Diff(got, exp))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -590,6 +590,44 @@ func KVCursorWithHints(
|
|||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"},
|
||||
},
|
||||
{
|
||||
name: "predicate for key",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: pairs(
|
||||
"aa/00", "aa/01",
|
||||
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aaa",
|
||||
until: "aaa/03",
|
||||
hints: []kv.CursorHint{
|
||||
kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
||||
return len(key) < 3 || string(key[:3]) == "aaa"
|
||||
})},
|
||||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||
},
|
||||
{
|
||||
name: "predicate for value",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: pairs(
|
||||
"aa/00", "aa/01",
|
||||
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "",
|
||||
until: "aa/01",
|
||||
hints: []kv.CursorHint{
|
||||
kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
||||
return len(val) < 7 || string(val[:7]) == "val:aa/"
|
||||
})},
|
||||
},
|
||||
exp: []string{"aa/00", "aa/01"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
|
|||
Loading…
Reference in New Issue