From 9561d0a4f4efb6d1a752bd36bb1e55d8578c5547 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Tue, 11 Feb 2020 09:17:37 -0700 Subject: [PATCH] feat: add indexes to urm for user lookups (#16789) --- kv/indexer.go | 83 ++++++++++++++----------- kv/indexer_test.go | 16 ++--- kv/service.go | 7 +-- kv/urm.go | 148 ++++++++++++++++++++++++++++++++++++++++++++- kv/urm_test.go | 45 +++++++++++++- 5 files changed, 248 insertions(+), 51 deletions(-) diff --git a/kv/indexer.go b/kv/indexer.go index c986e3ab8a..c727fcc5e0 100644 --- a/kv/indexer.go +++ b/kv/indexer.go @@ -13,71 +13,84 @@ type kvIndexer struct { ctx context.Context cancel context.CancelFunc indexChan chan indexBatch - finished chan struct{} - oncer sync.Once + wg sync.WaitGroup + working chan struct{} } type indexBatch struct { bucketName []byte - keys [][]byte + idxs map[string][]byte } func NewIndexer(log *zap.Logger, kv Store) *kvIndexer { ctx, cancel := context.WithCancel(context.Background()) - i := &kvIndexer{ + return &kvIndexer{ log: log, kv: kv, ctx: ctx, cancel: cancel, indexChan: make(chan indexBatch, 10), - finished: make(chan struct{}), + working: make(chan struct{}, 1), } - - go i.workIndexes() - return i } -func (i *kvIndexer) AddToIndex(bucketName []byte, keys [][]byte) { +func (i *kvIndexer) AddToIndex(bucketName []byte, idxs map[string][]byte) { // check for close select { case <-i.ctx.Done(): return - case i.indexChan <- indexBatch{bucketName, keys}: + case i.indexChan <- indexBatch{bucketName, idxs}: + } + + // add to the waitgroup and start the work process + select { + case i.working <- struct{}{}: + // i was able to insert i should start a worker + i.wg.Add(1) + go i.workIndexes() + default: + // we have reached our worker limit and we cannot start any more. + return } } func (i *kvIndexer) workIndexes() { - defer close(i.finished) - for batch := range i.indexChan { - // open update tx - err := i.kv.Update(i.ctx, func(tx Tx) error { - // create a bucket for this batch - bucket, err := tx.Bucket(batch.bucketName) - if err != nil { - return err - } - // insert all the keys - for _, key := range batch.keys { - err := bucket.Put(key, nil) + // let the system know we have finished + defer i.wg.Done() + // releasee the worker hold so the system can start more later + defer func() { <-i.working }() + + for { + select { + case batch := <-i.indexChan: + // open update tx + err := i.kv.Update(i.ctx, func(tx Tx) error { + // create a bucket for this batch + bucket, err := tx.Bucket(batch.bucketName) if err != nil { return err } - } - return nil - }) + // insert all the keys + for k, v := range batch.idxs { + err := bucket.Put([]byte(k), v) + if err != nil { + return err + } + } + return nil + }) - if err != nil { - //only option is to log - i.log.Error("failed to update index bucket", zap.Error(err)) + if err != nil { + //only option is to log + i.log.Error("failed to update index bucket", zap.Error(err)) + } + default: + // we have finished working + return } } } -func (i *kvIndexer) Stop() { - i.cancel() - i.oncer.Do(func() { - close(i.indexChan) - }) - - <-i.finished +func (i *kvIndexer) Wait() { + i.wg.Wait() } diff --git a/kv/indexer_test.go b/kv/indexer_test.go index 2cf689dd4d..e373062ca9 100644 --- a/kv/indexer_test.go +++ b/kv/indexer_test.go @@ -13,14 +13,14 @@ func TestIndexer(t *testing.T) { store := inmem.NewKVStore() indexer := kv.NewIndexer(zaptest.NewLogger(t), store) - indexes := [][]byte{ - []byte("1"), - []byte("2"), - []byte("3"), - []byte("4"), + indexes := map[string][]byte{ + "1": []byte("1"), + "2": []byte("2"), + "3": []byte("3"), + "4": []byte("4"), } indexer.AddToIndex([]byte("bucket"), indexes) - indexer.Stop() + indexer.Wait() count := 0 err := store.View(context.Background(), func(tx kv.Tx) error { @@ -33,8 +33,8 @@ func TestIndexer(t *testing.T) { t.Fatal(err) } for k, _ := cur.Next(); k != nil; k, _ = cur.Next() { - if string(k) != string(indexes[count]) { - t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[count]) + if string(k) != string(indexes[string(k)]) { + t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[string(k)]) } count++ } diff --git a/kv/service.go b/kv/service.go index 5c21bd9b3b..9aeae9c58d 100644 --- a/kv/service.go +++ b/kv/service.go @@ -19,8 +19,7 @@ var ( ) type indexer interface { - AddToIndex([]byte, [][]byte) - Stop() + AddToIndex([]byte, map[string][]byte) } // OpPrefix is the prefix for kv errors. @@ -183,10 +182,6 @@ func (s *Service) Initialize(ctx context.Context) error { } -func (s *Service) Stop() { - s.indexer.Stop() -} - // WithResourceLogger sets the resource audit logger for the service. func (s *Service) WithResourceLogger(audit resource.Logger) { s.audit = audit diff --git a/kv/urm.go b/kv/urm.go index 8c6ff087bd..13c77d14a3 100644 --- a/kv/urm.go +++ b/kv/urm.go @@ -12,7 +12,8 @@ import ( ) var ( - urmBucket = []byte("userresourcemappingsv1") + urmBucket = []byte("userresourcemappingsv1") + urmIndexBucket = []byte("userresourcemappingsindexv1") // ErrInvalidURMID is used when the service was provided // an invalid ID format. @@ -69,6 +70,9 @@ func (s *Service) initializeURMs(ctx context.Context, tx Tx) error { if _, err := tx.Bucket(urmBucket); err != nil { return UnavailableURMServiceError(err) } + if _, err := tx.Bucket(urmIndexBucket); err != nil { + return UnavailableURMServiceError(err) + } return nil } @@ -126,6 +130,20 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in ms := []*influxdb.UserResourceMapping{} pred := userResourceMappingPredicate(filter) filterFn := filterMappingsFn(filter) + // if we are given a user id we should try finding by index + if filter.UserID.Valid() { + var err error + ms, err = s.findUserResourceMappingsByIndex(ctx, tx, filter) + if err != nil { + return nil, err + } + + // if we found nothing we need to fall back on the old method because the index may not have been created + if len(ms) > 0 { + return ms, nil + } + } + err := s.forEachUserResourceMapping(ctx, tx, pred, func(m *influxdb.UserResourceMapping) bool { if filterFn(m) { ms = append(ms, m) @@ -133,9 +151,76 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in return true }) + // if we got to this point we failed to find the user by the index so we need to populate the index + if filter.UserID.Valid() && len(ms) > 0 { + indexes := map[string][]byte{} + for _, m := range ms { + key, _ := userResourceKey(m) + ikey, _ := userResourceIndexKey(m) + indexes[string(ikey)] = key + + } + + s.indexer.AddToIndex(urmIndexBucket, indexes) + } return ms, err } +func (s *Service) findUserResourceMappingsByIndex(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) { + ms := []*influxdb.UserResourceMapping{} + filterFn := filterMappingsFn(filter) + + bkt, err := tx.Bucket(urmBucket) + if err != nil { + return nil, err + } + + idx, err := tx.Bucket(urmIndexBucket) + if err != nil { + return nil, err + } + + prefix := urmIndexPrefix(filter.UserID) + wrapInternal := func(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Err: err, + } + } + + // index scan + cursor, err := idx.ForwardCursor(prefix, WithCursorPrefix(prefix)) + if err != nil { + return nil, wrapInternal(err) + } + + for k, v := cursor.Next(); k != nil && v != nil; k, v = cursor.Next() { + v, err := bkt.Get(v) + if err != nil { + return nil, err + } + + m := &influxdb.UserResourceMapping{} + if err := json.Unmarshal(v, m); err != nil { + return nil, CorruptURMError(err) + } + + if filterFn(m) { + ms = append(ms, m) + } + } + + if err := cursor.Err(); err != nil { + return nil, wrapInternal(err) + } + + if err := cursor.Close(); err != nil { + return nil, wrapInternal(err) + } + + return ms, nil +} + func (s *Service) findUserResourceMapping(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) (*influxdb.UserResourceMapping, error) { ms, err := s.findUserResourceMappings(ctx, tx, filter) if err != nil { @@ -184,6 +269,20 @@ func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influ return UnavailableURMServiceError(err) } + ikey, err := userResourceIndexKey(m) + if err != nil { + return err + } + + ib, err := tx.Bucket(urmIndexBucket) + if err != nil { + return UnavailableURMServiceError(err) + } + + if err := ib.Put(ikey, key); err != nil { + return UnavailableURMServiceError(err) + } + if m.ResourceType == influxdb.OrgsResourceType { return s.createOrgDependentMappings(ctx, tx, m) } @@ -235,6 +334,26 @@ func userResourceKey(m *influxdb.UserResourceMapping) ([]byte, error) { return key, nil } +func userResourceIndexKey(m *influxdb.UserResourceMapping) ([]byte, error) { + encodedResourceID, err := m.ResourceID.Encode() + if err != nil { + return nil, ErrInvalidURMID + } + + encodedUserID, err := m.UserID.Encode() + if err != nil { + return nil, ErrInvalidURMID + } + + key := append(encodedUserID, '/') + return append(key, encodedResourceID...), nil +} + +func urmIndexPrefix(userID influxdb.ID) []byte { + id, _ := userID.Encode() + return append(id, '/') +} + func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred CursorPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error { b, err := tx.Bucket(urmBucket) if err != nil { @@ -327,11 +446,20 @@ func (s *Service) deleteUserResourceMapping(ctx context.Context, tx Tx, filter i return err } + ikey, err := userResourceIndexKey(ms[0]) + if err != nil { + return err + } + b, err := tx.Bucket(urmBucket) if err != nil { return UnavailableURMServiceError(err) } + ib, err := tx.Bucket(urmIndexBucket) + if err != nil { + return UnavailableURMServiceError(err) + } _, err = b.Get(key) if IsNotFound(err) { return ErrURMNotFound @@ -343,6 +471,11 @@ func (s *Service) deleteUserResourceMapping(ctx context.Context, tx Tx, filter i if err := b.Delete(key); err != nil { return UnavailableURMServiceError(err) } + + if err := ib.Delete(ikey); err != nil { + return UnavailableURMServiceError(err) + } + return nil } @@ -357,6 +490,11 @@ func (s *Service) deleteUserResourceMappings(ctx context.Context, tx Tx, filter return err } + ikey, err := userResourceIndexKey(m) + if err != nil { + return err + } + b, err := tx.Bucket(urmBucket) if err != nil { return UnavailableURMServiceError(err) @@ -370,9 +508,17 @@ func (s *Service) deleteUserResourceMappings(ctx context.Context, tx Tx, filter return UnavailableURMServiceError(err) } + ib, err := tx.Bucket(urmIndexBucket) + if err != nil { + return UnavailableURMServiceError(err) + } + if err := b.Delete(key); err != nil { return UnavailableURMServiceError(err) } + if err := ib.Delete(ikey); err != nil { + return UnavailableURMServiceError(err) + } } return nil } diff --git a/kv/urm_test.go b/kv/urm_test.go index 8af3002bf7..5a2fd018a5 100644 --- a/kv/urm_test.go +++ b/kv/urm_test.go @@ -5,11 +5,25 @@ import ( "testing" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/kv" + "github.com/influxdata/influxdb/snowflake" influxdbtesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap/zaptest" ) +type testable interface { + Logf(string, ...interface{}) + Error(args ...interface{}) + Errorf(string, ...interface{}) + Fail() + Failed() bool + Name() string + FailNow() + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) +} + func TestBoltUserResourceMappingService(t *testing.T) { influxdbtesting.UserResourceMappingService(initBoltUserResourceMappingService, t) } @@ -27,7 +41,7 @@ func initBoltUserResourceMappingService(f influxdbtesting.UserResourceFields, t } } -func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) { +func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFields, t testable) (influxdb.UserResourceMappingService, func()) { svc := kv.NewService(zaptest.NewLogger(t), s) ctx := context.Background() @@ -49,3 +63,32 @@ func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFi } } } + +func BenchmarkReadURMs(b *testing.B) { + urms := influxdbtesting.UserResourceFields{ + UserResourceMappings: make([]*influxdb.UserResourceMapping, 10000), + } + idgen := snowflake.NewDefaultIDGenerator() + users := make([]influxdb.ID, 10) + for i := 0; i < 10; i++ { + users[i] = idgen.ID() + } + + for i := 0; i < 10000; i++ { + urms.UserResourceMappings[i] = &influxdb.UserResourceMapping{ + ResourceID: idgen.ID(), + UserID: users[i%len(users)], + UserType: influxdb.Member, + ResourceType: influxdb.BucketsResourceType, + } + } + st := inmem.NewKVStore() + initUserResourceMappingService(st, urms, b) + svc := kv.NewService(zaptest.NewLogger(b), st) + b.ResetTimer() + for i := 0; i < b.N; i++ { + svc.FindUserResourceMappings(context.Background(), influxdb.UserResourceMappingFilter{ + UserID: users[0], + }) + } +}