From 36b0dc7da39e031d5e53b36cf2c67d38f9ec01d7 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Fri, 22 Nov 2019 10:34:20 -0700 Subject: [PATCH 1/4] chore(vendor): Add jsonparser dependency for filter predicates * Added jsonparser helper package to decode influxdb.ID --- go.mod | 1 + go.sum | 2 ++ pkg/jsonparser/jsonparser.go | 45 +++++++++++++++++++++++++++++++ pkg/jsonparser/jsonparser_test.go | 45 +++++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+) create mode 100644 pkg/jsonparser/jsonparser.go create mode 100644 pkg/jsonparser/jsonparser_test.go diff --git a/go.mod b/go.mod index c220dcc223..4801415ace 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/benbjohnson/tmpl v1.0.0 github.com/boltdb/bolt v1.3.1 // indirect github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5 + github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e github.com/cespare/xxhash v1.1.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/coreos/bbolt v1.3.1-coreos.6 diff --git a/go.sum b/go.sum index f1460462b6..23d58ea438 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5 h1:kS0dw4K730x7cxT+bVyTyYJZHuSoH7ofSr/Ijit56Qw= github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5/go.mod h1:CDReaxg1cmLrtcasZy43l4EYPAknXLiQSrb7tLw5zXM= +github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e h1:oJCXMss/3rg5F6Poy9wG3JQusc58Mzk5B9Z6wSnssNE= +github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e/go.mod h1:errmMKH8tTB49UR2A8C8DPYkyudelsYJwJFaZHQ6ik8= github.com/c-bata/go-prompt v0.2.2 h1:uyKRz6Z6DUyj49QVijyM339UJV9yhbr70gESwbNU3e0= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw= diff --git a/pkg/jsonparser/jsonparser.go b/pkg/jsonparser/jsonparser.go new file mode 100644 index 0000000000..e07a84049c --- /dev/null +++ b/pkg/jsonparser/jsonparser.go @@ -0,0 +1,45 @@ +package jsonparser + +import ( + "github.com/buger/jsonparser" + "github.com/influxdata/influxdb" +) + +// GetID returns an influxdb.ID for the specified keys path or an error if +// the value cannot be decoded or does not exist. +func GetID(data []byte, keys ...string) (val influxdb.ID, err error) { + v, _, _, err := jsonparser.Get(data, keys...) + if err != nil { + return 0, err + } + + var id influxdb.ID + err = id.Decode(v) + if err != nil { + return 0, err + } + + return id, nil +} + +// GetOptionalID returns an influxdb.ID for the specified keys path or an error if +// the value cannot be decoded. The value of exists will be false if the keys path +// does not exist. +func GetOptionalID(data []byte, keys ...string) (val influxdb.ID, exists bool, err error) { + v, typ, _, err := jsonparser.Get(data, keys...) + if typ == jsonparser.NotExist { + return 0, false, nil + } + + if err != nil { + return 0, false, err + } + + var id influxdb.ID + err = id.Decode(v) + if err != nil { + return 0, false, err + } + + return id, true, nil +} diff --git a/pkg/jsonparser/jsonparser_test.go b/pkg/jsonparser/jsonparser_test.go new file mode 100644 index 0000000000..52dd82cb7b --- /dev/null +++ b/pkg/jsonparser/jsonparser_test.go @@ -0,0 +1,45 @@ +package jsonparser_test + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/pkg/jsonparser" +) + +func TestGetID(t *testing.T) { + t.Run("decode valid id", func(t *testing.T) { + json := `{ "id": "000000000000000a" }` + got, err := jsonparser.GetID([]byte(json), "id") + if err != nil { + t.Error("unexpected error:", err) + } + + if exp := influxdb.ID(10); got != exp { + t.Error("unexpected value: -got/+exp", cmp.Diff(got, exp)) + } + }) + + t.Run("error invalid id", func(t *testing.T) { + json := `{ "id": "00000000000a" }` + _, err := jsonparser.GetID([]byte(json), "id") + if err == nil { + t.Error("expected error") + } + }) +} + +func TestGetOptionalID(t *testing.T) { + t.Run("missing id", func(t *testing.T) { + json := `{ "name": "foo" }` + _, got, err := jsonparser.GetOptionalID([]byte(json), "id") + if err != nil { + t.Error("unexpected error:", err) + } + + if exp := false; got != exp { + t.Error("unexpected value: -got/+exp", cmp.Diff(got, exp)) + } + }) +} From fdb25560c497ce20ce52423daa5b8c70e4b2a5d2 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Fri, 22 Nov 2019 10:53:53 -0700 Subject: [PATCH 2/4] chore(inmem): Created a R/O bucket and cache object * Avoids bucket allocation calling View * Simplifies Put and Delete functions of R/O bucket --- inmem/kv.go | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/inmem/kv.go b/inmem/kv.go index 9c6548065e..ddc9e8b196 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/google/btree" - "github.com/influxdata/influxdb/kv" ) @@ -15,12 +14,14 @@ import ( type KVStore struct { mu sync.RWMutex buckets map[string]*Bucket + ro map[string]*bucket } // NewKVStore creates an instance of a KVStore. func NewKVStore() *KVStore { return &KVStore{ buckets: map[string]*Bucket{}, + ro: map[string]*bucket{}, } } @@ -30,6 +31,7 @@ func (s *KVStore) View(ctx context.Context, fn func(kv.Tx) error) error { defer s.mu.RUnlock() if s.buckets == nil { s.buckets = map[string]*Bucket{} + s.ro = map[string]*bucket{} } return fn(&Tx{ kv: s, @@ -44,6 +46,7 @@ func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error { defer s.mu.Unlock() if s.buckets == nil { s.buckets = map[string]*Bucket{} + s.ro = map[string]*bucket{} } return fn(&Tx{ @@ -99,16 +102,11 @@ func (t *Tx) createBucketIfNotExists(b []byte) (kv.Bucket, error) { if !ok { bkt = &Bucket{btree.New(2)} t.kv.buckets[string(b)] = bkt - return &bucket{ - Bucket: bkt, - writable: t.writable, - }, nil + t.kv.ro[string(b)] = &bucket{Bucket: bkt} + return bkt, nil } - return &bucket{ - Bucket: bkt, - writable: t.writable, - }, nil + return bkt, nil } return nil, kv.ErrTxNotWritable @@ -121,10 +119,11 @@ func (t *Tx) Bucket(b []byte) (kv.Bucket, error) { return t.createBucketIfNotExists(b) } - return &bucket{ - Bucket: bkt, - writable: t.writable, - }, nil + if t.writable { + return bkt, nil + } + + return t.kv.ro[string(b)], nil } // Bucket is a btree that implements kv.Bucket. @@ -134,24 +133,17 @@ type Bucket struct { type bucket struct { kv.Bucket - writable bool } // Put wraps the put method of a kv bucket and ensures that the // bucket is writable. -func (b *bucket) Put(key, value []byte) error { - if b.writable { - return b.Bucket.Put(key, value) - } +func (b *bucket) Put(_, _ []byte) error { return kv.ErrTxNotWritable } // Delete wraps the delete method of a kv bucket and ensures that the // bucket is writable. -func (b *bucket) Delete(key []byte) error { - if b.writable { - return b.Bucket.Delete(key) - } +func (b *bucket) Delete(_ []byte) error { return kv.ErrTxNotWritable } From a5b29843cf8692a761994395a4f5b93e5307d137 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Fri, 22 Nov 2019 10:55:01 -0700 Subject: [PATCH 3/4] fix(kv): Add push down predicate hint to filter by keys and values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A KV store may optionally implement a key predicate function to filter on keys or values, which may reduce memory and CPU usage. Expected FindByTaskID resource mapping improvements for a single user: Before: 206966085 ns/op   37672164 B/op   445060 allocs/op After:   1514118 ns/op      11184 B/op      131 allocs/op --- inmem/kv.go | 4 +- inmem/kv_test.go | 66 ++++++++++++------ inmem/task_test.go | 108 +++++++++++++++++++++++++++++ kv/auth.go | 74 ++++++++++++++++++-- kv/auth_private_test.go | 149 ++++++++++++++++++++++++++++++++++++++++ kv/store.go | 19 +++-- kv/urm.go | 16 ++--- kv/urm_private_test.go | 12 ++-- testing/kv.go | 38 ++++++++++ 9 files changed, 438 insertions(+), 48 deletions(-) create mode 100644 inmem/task_test.go create mode 100644 kv/auth_private_test.go diff --git a/inmem/kv.go b/inmem/kv.go index ddc9e8b196..da719fd11c 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -208,7 +208,7 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { } func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) { - fn := o.KeyPredicateFn + fn := o.PredicateFn var pairs []kv.Pair var err error @@ -219,7 +219,7 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) { return false } - if fn == nil || fn(j.key) { + if fn == nil || fn(j.key, j.value) { pairs = append(pairs, kv.Pair{Key: j.key, Value: j.value}) } diff --git a/inmem/kv_test.go b/inmem/kv_test.go index 6aa0f1ac38..1ac35a5a1f 100644 --- a/inmem/kv_test.go +++ b/inmem/kv_test.go @@ -84,32 +84,58 @@ func TestKVStore_Buckets(t *testing.T) { } } -func TestKVStore_Bucket_CursorHintKeyPredicate(t *testing.T) { +func TestKVStore_Bucket_CursorHintPredicate(t *testing.T) { s := inmem.NewKVStore() bucket := "urm" fillBucket(t, s, bucket, 10) - _ = s.View(context.Background(), func(tx kv.Tx) error { - b, err := tx.Bucket([]byte(bucket)) - if err != nil { - return err - } + t.Run("filter by key", func(t *testing.T) { + _ = s.View(context.Background(), func(tx kv.Tx) error { + b, err := tx.Bucket([]byte(bucket)) + if err != nil { + return err + } - cur, _ := b.Cursor(kv.WithCursorHintKeyPredicate(func(key []byte) bool { - return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3" - })) + cur, _ := b.Cursor(kv.WithCursorHintPredicate(func(key, _ []byte) bool { + return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3" + })) - count := 0 - for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() { - count++ - } + count := 0 + for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() { + count++ + } - if exp, got := 1, count; got != exp { - t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp)) - } + if exp, got := 1, count; got != exp { + t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp)) + } - return nil + return nil + }) + }) + + t.Run("filter by value", func(t *testing.T) { + _ = s.View(context.Background(), func(tx kv.Tx) error { + b, err := tx.Bucket([]byte(bucket)) + if err != nil { + return err + } + + cur, _ := b.Cursor(kv.WithCursorHintPredicate(func(_, val []byte) bool { + return len(val) < 32 || string(val[16:]) == "8d5dc900004589c3" + })) + + count := 0 + for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() { + count++ + } + + if exp, got := 1, count; got != exp { + t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp)) + } + + return nil + }) }) } @@ -143,7 +169,7 @@ func BenchmarkKVStore_Bucket_Cursor(b *testing.B) { } searchKey := "629ffa00003dd2ce" - predicate := kv.KeyPredicateFunc(func(key []byte) bool { + predicate := kv.CursorPredicateFunc(func(key, _ []byte) bool { return len(key) < 32 || string(key[16:]) == searchKey }) @@ -166,7 +192,7 @@ func BenchmarkKVStore_Bucket_Cursor(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - openCursor(b, s, bucket, scanAll, kv.WithCursorHintKeyPredicate(predicate)) + openCursor(b, s, bucket, scanAll, kv.WithCursorHintPredicate(predicate)) } }) }) @@ -196,7 +222,7 @@ func fillBucket(t testing.TB, s *inmem.KVStore, bucket string, lines int) { for scan.Scan() { var key []byte key = append(key, scan.Bytes()...) - _ = b.Put(key, nil) + _ = b.Put(key, key) lines-- if lines <= 0 { break diff --git a/inmem/task_test.go b/inmem/task_test.go new file mode 100644 index 0000000000..5ccf62c120 --- /dev/null +++ b/inmem/task_test.go @@ -0,0 +1,108 @@ +package inmem_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/inmem" + "github.com/influxdata/influxdb/kv" + "github.com/influxdata/influxdb/snowflake" +) + +var ( + taskBucket = []byte("tasksv1") + organizationBucket = []byte("organizationsv1") + authBucket = []byte("authorizationsv1") + urmBucket = []byte("userresourcemappingsv1") + idgen influxdb.IDGenerator = snowflake.NewIDGenerator() +) + +func BenchmarkFindTaskByID_CursorHints(b *testing.B) { + kvs := inmem.NewKVStore() + ctx := context.Background() + _ = kvs.Update(ctx, func(tx kv.Tx) error { + createData(b, tx) + createTasks(b, tx) + + return nil + }) + + s := kv.NewService(kvs) + _ = s.Initialize(ctx) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _, _ = s.FindTaskByID(ctx, 1) + } +} + +func createData(tb testing.TB, tx kv.Tx) { + tb.Helper() + + authB, err := tx.Bucket(authBucket) + if err != nil { + tb.Fatal("authBucket:", err) + } + orgB, err := tx.Bucket(organizationBucket) + if err != nil { + tb.Fatal("organizationBucket:", err) + } + + a := influxdb.Authorization{ + Permissions: influxdb.OperPermissions(), + } + o := influxdb.Organization{} + + var orgID = influxdb.ID(1e4) + var userID = influxdb.ID(1e7) + for i := 1; i <= 1000; i++ { + o.ID = orgID + val := mustMarshal(tb, &o) + key, _ := a.OrgID.Encode() + _ = orgB.Put(key, val) + + a.OrgID = o.ID + orgID++ + + for j := 1; j <= 5; j++ { + a.ID = idgen.ID() + a.UserID = userID + userID++ + + val = mustMarshal(tb, &a) + key, _ = a.ID.Encode() + _ = authB.Put(key, val) + } + } +} + +func createTasks(tb testing.TB, tx kv.Tx) { + tb.Helper() + + taskB, err := tx.Bucket(taskBucket) + if err != nil { + tb.Fatal("taskBucket:", err) + } + + t := influxdb.Task{ + ID: 1, + OrganizationID: 1e4, + OwnerID: 1e7, + } + + val := mustMarshal(tb, &t) + key, _ := t.ID.Encode() + _ = taskB.Put(key, val) +} + +func mustMarshal(t testing.TB, v interface{}) []byte { + t.Helper() + d, err := json.Marshal(v) + if err != nil { + t.Fatal(err) + } + return d +} diff --git a/kv/auth.go b/kv/auth.go index bd25149c2d..5edee12a83 100644 --- a/kv/auth.go +++ b/kv/auth.go @@ -5,7 +5,9 @@ import ( "encoding/json" "fmt" + "github.com/buger/jsonparser" influxdb "github.com/influxdata/influxdb" + jsonp "github.com/influxdata/influxdb/pkg/jsonparser" ) var ( @@ -127,6 +129,59 @@ func (s *Service) findAuthorizationByToken(ctx context.Context, tx Tx, n string) return s.findAuthorizationByID(ctx, tx, id) } +func authorizationsPredicateFn(f influxdb.AuthorizationFilter) CursorPredicateFunc { + // if any errors occur reading the JSON data, the predicate will always return true + // to ensure the value is included and handled higher up. + + if f.ID != nil { + exp := *f.ID + return func(_, value []byte) bool { + got, err := jsonp.GetID(value, "id") + if err != nil { + return true + } + return got == exp + } + } + + if f.Token != nil { + exp := *f.Token + return func(_, value []byte) bool { + // it is assumed that token never has escaped string data + got, _, _, err := jsonparser.Get(value, "token") + if err != nil { + return true + } + return string(got) == exp + } + } + + var pred CursorPredicateFunc + if f.OrgID != nil { + exp := *f.OrgID + pred = func(_, value []byte) bool { + got, err := jsonp.GetID(value, "orgID") + if err != nil { + return true + } + + return got == exp + } + } + + if f.UserID != nil { + exp := *f.UserID + prevFn := pred + pred = func(key, value []byte) bool { + prev := prevFn == nil || prevFn(key, value) + got, exists, err := jsonp.GetOptionalID(value, "userID") + return prev && ((exp == got && exists) || err != nil) + } + } + + return pred +} + func filterAuthorizationsFn(filter influxdb.AuthorizationFilter) func(a *influxdb.Authorization) bool { if filter.ID != nil { return func(a *influxdb.Authorization) bool { @@ -225,9 +280,10 @@ func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.Auth f.OrgID = &o.ID } - as := []*influxdb.Authorization{} + var as []*influxdb.Authorization + pred := authorizationsPredicateFn(f) filterFn := filterAuthorizationsFn(f) - err := s.forEachAuthorization(ctx, tx, func(a *influxdb.Authorization) bool { + err := s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool { if filterFn(a) { as = append(as, a) } @@ -369,19 +425,27 @@ func decodeAuthorization(b []byte, a *influxdb.Authorization) error { } // forEachAuthorization will iterate through all authorizations while fn returns true. -func (s *Service) forEachAuthorization(ctx context.Context, tx Tx, fn func(*influxdb.Authorization) bool) error { +func (s *Service) forEachAuthorization(ctx context.Context, tx Tx, pred CursorPredicateFunc, fn func(*influxdb.Authorization) bool) error { b, err := tx.Bucket(authBucket) if err != nil { return err } - cur, err := b.Cursor() + var cur Cursor + if pred != nil { + cur, err = b.Cursor(WithCursorHintPredicate(pred)) + } else { + cur, err = b.Cursor() + } if err != nil { return err } for k, v := cur.First(); k != nil; k, v = cur.Next() { - a := &influxdb.Authorization{} + // preallocate Permissions to reduce multiple slice re-allocations + a := &influxdb.Authorization{ + Permissions: make([]influxdb.Permission, 64), + } if err := decodeAuthorization(v, a); err != nil { return err diff --git a/kv/auth_private_test.go b/kv/auth_private_test.go new file mode 100644 index 0000000000..da8e1e4515 --- /dev/null +++ b/kv/auth_private_test.go @@ -0,0 +1,149 @@ +package kv + +import ( + "encoding/json" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb" +) + +func mustMarshal(t testing.TB, v interface{}) []byte { + t.Helper() + d, err := json.Marshal(v) + if err != nil { + t.Fatal(err) + } + return d +} + +func Test_authorizationsPredicateFn(t *testing.T) { + t.Run("ID", func(t *testing.T) { + val := influxdb.ID(1) + f := influxdb.AuthorizationFilter{ID: &val} + fn := authorizationsPredicateFn(f) + + t.Run("does match", func(t *testing.T) { + a := &influxdb.Authorization{ID: val, OrgID: 2} + if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("does not match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 2} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + }) + + t.Run("token", func(t *testing.T) { + val := "token_token" + f := influxdb.AuthorizationFilter{Token: &val} + fn := authorizationsPredicateFn(f) + + t.Run("does match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 2, Token: val} + if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("does not match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 2, Token: "no_no_no"} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + }) + + t.Run("orgID", func(t *testing.T) { + val := influxdb.ID(1) + f := influxdb.AuthorizationFilter{OrgID: &val} + fn := authorizationsPredicateFn(f) + + t.Run("does match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: val} + if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("does not match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 2} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + }) + + t.Run("userID", func(t *testing.T) { + val := influxdb.ID(1) + f := influxdb.AuthorizationFilter{UserID: &val} + fn := authorizationsPredicateFn(f) + + t.Run("does match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 5, UserID: val} + if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("does not match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 5, UserID: 2} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("missing userID", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 5} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + }) + + t.Run("orgID and userID", func(t *testing.T) { + orgID := influxdb.ID(1) + userID := influxdb.ID(10) + f := influxdb.AuthorizationFilter{OrgID: &orgID, UserID: &userID} + fn := authorizationsPredicateFn(f) + + t.Run("does match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: orgID, UserID: userID} + if got, exp := fn(nil, mustMarshal(t, a)), true; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("org match user not match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: orgID, UserID: 11} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("org not match user match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 2, UserID: userID} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("org and user not match", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: 2, UserID: 11} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("org match user missing", func(t *testing.T) { + a := &influxdb.Authorization{ID: 10, OrgID: orgID} + if got, exp := fn(nil, mustMarshal(t, a)), false; got != exp { + t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + }) +} diff --git a/kv/store.go b/kv/store.go index 1b0e656857..2b45e418af 100644 --- a/kv/store.go +++ b/kv/store.go @@ -38,12 +38,12 @@ type Tx interface { WithContext(ctx context.Context) } -type KeyPredicateFunc func(key []byte) bool +type CursorPredicateFunc func(key, value []byte) bool type CursorHints struct { - KeyPrefix *string - KeyStart *string - KeyPredicateFn KeyPredicateFunc + KeyPrefix *string + KeyStart *string + PredicateFn CursorPredicateFunc } // CursorHint configures CursorHints @@ -67,12 +67,17 @@ func WithCursorHintKeyStart(start string) CursorHint { } } -// WithCursorHintKeyPredicate is a hint to the store +// WithCursorHintPredicate is a hint to the store // to return only key / values which return true for the // f. -func WithCursorHintKeyPredicate(f KeyPredicateFunc) CursorHint { +// +// The primary concern of the predicate is to improve performance. +// Therefore, it should perform tests on the data at minimal cost. +// If the predicate has no meaningful impact on reducing memory or +// CPU usage, there is no benefit to using it. +func WithCursorHintPredicate(f CursorPredicateFunc) CursorHint { return func(o *CursorHints) { - o.KeyPredicateFn = f + o.PredicateFn = f } } diff --git a/kv/urm.go b/kv/urm.go index 1634a5e34d..972ff2a847 100644 --- a/kv/urm.go +++ b/kv/urm.go @@ -97,23 +97,23 @@ func (s *Service) FindUserResourceMappings(ctx context.Context, filter influxdb. return ms, len(ms), nil } -func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) KeyPredicateFunc { +func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) CursorPredicateFunc { switch { case filter.ResourceID.Valid() && filter.UserID.Valid(): keyPredicate := filter.ResourceID.String() + filter.UserID.String() - return func(key []byte) bool { + return func(key, _ []byte) bool { return len(key) >= 32 && string(key[:32]) == keyPredicate } case !filter.ResourceID.Valid() && filter.UserID.Valid(): keyPredicate := filter.UserID.String() - return func(key []byte) bool { + return func(key, _ []byte) bool { return len(key) >= 32 && string(key[16:32]) == keyPredicate } case filter.ResourceID.Valid() && !filter.UserID.Valid(): keyPredicate := filter.ResourceID.String() - return func(key []byte) bool { + return func(key, _ []byte) bool { return len(key) >= 16 && string(key[:16]) == keyPredicate } @@ -124,9 +124,9 @@ func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) Key func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) { ms := []*influxdb.UserResourceMapping{} - keyPred := userResourceMappingPredicate(filter) + pred := userResourceMappingPredicate(filter) filterFn := filterMappingsFn(filter) - err := s.forEachUserResourceMapping(ctx, tx, keyPred, func(m *influxdb.UserResourceMapping) bool { + err := s.forEachUserResourceMapping(ctx, tx, pred, func(m *influxdb.UserResourceMapping) bool { if filterFn(m) { ms = append(ms, m) } @@ -235,14 +235,14 @@ func userResourceKey(m *influxdb.UserResourceMapping) ([]byte, error) { return key, nil } -func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred KeyPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error { +func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred CursorPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error { b, err := tx.Bucket(urmBucket) if err != nil { return UnavailableURMServiceError(err) } var cur Cursor if pred != nil { - cur, err = b.Cursor(WithCursorHintKeyPredicate(pred)) + cur, err = b.Cursor(WithCursorHintPredicate(pred)) } else { cur, err = b.Cursor() } diff --git a/kv/urm_private_test.go b/kv/urm_private_test.go index 2f9cac460c..27e4b1f4e1 100644 --- a/kv/urm_private_test.go +++ b/kv/urm_private_test.go @@ -22,12 +22,12 @@ func Test_userResourceMappingPredicate(t *testing.T) { u, k := mk(10, 20) f := influxdb.UserResourceMappingFilter{ResourceID: u.ResourceID} fn := userResourceMappingPredicate(f) - if got, exp := fn(k), true; got != exp { + if got, exp := fn(k, nil), true; got != exp { t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) } _, k = mk(10, 21) - if got, exp := fn(k), false; got != exp { + if got, exp := fn(k, nil), false; got != exp { t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) } }) @@ -36,12 +36,12 @@ func Test_userResourceMappingPredicate(t *testing.T) { u, k := mk(10, 20) f := influxdb.UserResourceMappingFilter{UserID: u.UserID} fn := userResourceMappingPredicate(f) - if got, exp := fn(k), true; got != exp { + if got, exp := fn(k, nil), true; got != exp { t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) } _, k = mk(11, 20) - if got, exp := fn(k), false; got != exp { + if got, exp := fn(k, nil), false; got != exp { t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) } }) @@ -50,12 +50,12 @@ func Test_userResourceMappingPredicate(t *testing.T) { u, k := mk(10, 20) f := influxdb.UserResourceMappingFilter{ResourceID: u.ResourceID, UserID: u.UserID} fn := userResourceMappingPredicate(f) - if got, exp := fn(k), true; got != exp { + if got, exp := fn(k, nil), true; got != exp { t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) } _, k = mk(11, 20) - if got, exp := fn(k), false; got != exp { + if got, exp := fn(k, nil), false; got != exp { t.Errorf("unexpected result -got/+exp\n%s", cmp.Diff(got, exp)) } }) diff --git a/testing/kv.go b/testing/kv.go index 7296021a27..57b5468e68 100644 --- a/testing/kv.go +++ b/testing/kv.go @@ -590,6 +590,44 @@ func KVCursorWithHints( }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, }, + { + name: "predicate for key", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa", + until: "aaa/03", + hints: []kv.CursorHint{ + kv.WithCursorHintPredicate(func(key, _ []byte) bool { + return len(key) < 3 || string(key[:3]) == "aaa" + })}, + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, + }, + { + name: "predicate for value", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "", + until: "aa/01", + hints: []kv.CursorHint{ + kv.WithCursorHintPredicate(func(_, val []byte) bool { + return len(val) < 7 || string(val[:7]) == "val:aa/" + })}, + }, + exp: []string{"aa/00", "aa/01"}, + }, } for _, tt := range tests { From 4c0cf991173ae4d4c424a3fb88fa4b9bb9679f89 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Fri, 22 Nov 2019 11:13:03 -0700 Subject: [PATCH 4/4] chore(inmem): Resolve staticcheck issue --- inmem/task_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/inmem/task_test.go b/inmem/task_test.go index 5ccf62c120..3fe769f8f5 100644 --- a/inmem/task_test.go +++ b/inmem/task_test.go @@ -15,7 +15,6 @@ var ( taskBucket = []byte("tasksv1") organizationBucket = []byte("organizationsv1") authBucket = []byte("authorizationsv1") - urmBucket = []byte("userresourcemappingsv1") idgen influxdb.IDGenerator = snowflake.NewIDGenerator() )