feat: add indexes to urm for user lookups (#16789)

pull/16821/head
Lyon Hill 2020-02-11 09:17:37 -07:00 committed by GitHub
parent b6d1cc6dd0
commit 9561d0a4f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 248 additions and 51 deletions

View File

@ -13,71 +13,84 @@ type kvIndexer struct {
ctx context.Context
cancel context.CancelFunc
indexChan chan indexBatch
finished chan struct{}
oncer sync.Once
wg sync.WaitGroup
working chan struct{}
}
type indexBatch struct {
bucketName []byte
keys [][]byte
idxs map[string][]byte
}
func NewIndexer(log *zap.Logger, kv Store) *kvIndexer {
ctx, cancel := context.WithCancel(context.Background())
i := &kvIndexer{
return &kvIndexer{
log: log,
kv: kv,
ctx: ctx,
cancel: cancel,
indexChan: make(chan indexBatch, 10),
finished: make(chan struct{}),
working: make(chan struct{}, 1),
}
go i.workIndexes()
return i
}
func (i *kvIndexer) AddToIndex(bucketName []byte, keys [][]byte) {
func (i *kvIndexer) AddToIndex(bucketName []byte, idxs map[string][]byte) {
// check for close
select {
case <-i.ctx.Done():
return
case i.indexChan <- indexBatch{bucketName, keys}:
case i.indexChan <- indexBatch{bucketName, idxs}:
}
// add to the waitgroup and start the work process
select {
case i.working <- struct{}{}:
// i was able to insert i should start a worker
i.wg.Add(1)
go i.workIndexes()
default:
// we have reached our worker limit and we cannot start any more.
return
}
}
func (i *kvIndexer) workIndexes() {
defer close(i.finished)
for batch := range i.indexChan {
// open update tx
err := i.kv.Update(i.ctx, func(tx Tx) error {
// create a bucket for this batch
bucket, err := tx.Bucket(batch.bucketName)
if err != nil {
return err
}
// insert all the keys
for _, key := range batch.keys {
err := bucket.Put(key, nil)
// let the system know we have finished
defer i.wg.Done()
// releasee the worker hold so the system can start more later
defer func() { <-i.working }()
for {
select {
case batch := <-i.indexChan:
// open update tx
err := i.kv.Update(i.ctx, func(tx Tx) error {
// create a bucket for this batch
bucket, err := tx.Bucket(batch.bucketName)
if err != nil {
return err
}
}
return nil
})
// insert all the keys
for k, v := range batch.idxs {
err := bucket.Put([]byte(k), v)
if err != nil {
return err
}
}
return nil
})
if err != nil {
//only option is to log
i.log.Error("failed to update index bucket", zap.Error(err))
if err != nil {
//only option is to log
i.log.Error("failed to update index bucket", zap.Error(err))
}
default:
// we have finished working
return
}
}
}
func (i *kvIndexer) Stop() {
i.cancel()
i.oncer.Do(func() {
close(i.indexChan)
})
<-i.finished
func (i *kvIndexer) Wait() {
i.wg.Wait()
}

View File

@ -13,14 +13,14 @@ func TestIndexer(t *testing.T) {
store := inmem.NewKVStore()
indexer := kv.NewIndexer(zaptest.NewLogger(t), store)
indexes := [][]byte{
[]byte("1"),
[]byte("2"),
[]byte("3"),
[]byte("4"),
indexes := map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"4": []byte("4"),
}
indexer.AddToIndex([]byte("bucket"), indexes)
indexer.Stop()
indexer.Wait()
count := 0
err := store.View(context.Background(), func(tx kv.Tx) error {
@ -33,8 +33,8 @@ func TestIndexer(t *testing.T) {
t.Fatal(err)
}
for k, _ := cur.Next(); k != nil; k, _ = cur.Next() {
if string(k) != string(indexes[count]) {
t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[count])
if string(k) != string(indexes[string(k)]) {
t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[string(k)])
}
count++
}

View File

@ -19,8 +19,7 @@ var (
)
type indexer interface {
AddToIndex([]byte, [][]byte)
Stop()
AddToIndex([]byte, map[string][]byte)
}
// OpPrefix is the prefix for kv errors.
@ -183,10 +182,6 @@ func (s *Service) Initialize(ctx context.Context) error {
}
func (s *Service) Stop() {
s.indexer.Stop()
}
// WithResourceLogger sets the resource audit logger for the service.
func (s *Service) WithResourceLogger(audit resource.Logger) {
s.audit = audit

148
kv/urm.go
View File

@ -12,7 +12,8 @@ import (
)
var (
urmBucket = []byte("userresourcemappingsv1")
urmBucket = []byte("userresourcemappingsv1")
urmIndexBucket = []byte("userresourcemappingsindexv1")
// ErrInvalidURMID is used when the service was provided
// an invalid ID format.
@ -69,6 +70,9 @@ func (s *Service) initializeURMs(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(urmBucket); err != nil {
return UnavailableURMServiceError(err)
}
if _, err := tx.Bucket(urmIndexBucket); err != nil {
return UnavailableURMServiceError(err)
}
return nil
}
@ -126,6 +130,20 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in
ms := []*influxdb.UserResourceMapping{}
pred := userResourceMappingPredicate(filter)
filterFn := filterMappingsFn(filter)
// if we are given a user id we should try finding by index
if filter.UserID.Valid() {
var err error
ms, err = s.findUserResourceMappingsByIndex(ctx, tx, filter)
if err != nil {
return nil, err
}
// if we found nothing we need to fall back on the old method because the index may not have been created
if len(ms) > 0 {
return ms, nil
}
}
err := s.forEachUserResourceMapping(ctx, tx, pred, func(m *influxdb.UserResourceMapping) bool {
if filterFn(m) {
ms = append(ms, m)
@ -133,9 +151,76 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in
return true
})
// if we got to this point we failed to find the user by the index so we need to populate the index
if filter.UserID.Valid() && len(ms) > 0 {
indexes := map[string][]byte{}
for _, m := range ms {
key, _ := userResourceKey(m)
ikey, _ := userResourceIndexKey(m)
indexes[string(ikey)] = key
}
s.indexer.AddToIndex(urmIndexBucket, indexes)
}
return ms, err
}
func (s *Service) findUserResourceMappingsByIndex(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) {
ms := []*influxdb.UserResourceMapping{}
filterFn := filterMappingsFn(filter)
bkt, err := tx.Bucket(urmBucket)
if err != nil {
return nil, err
}
idx, err := tx.Bucket(urmIndexBucket)
if err != nil {
return nil, err
}
prefix := urmIndexPrefix(filter.UserID)
wrapInternal := func(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}
// index scan
cursor, err := idx.ForwardCursor(prefix, WithCursorPrefix(prefix))
if err != nil {
return nil, wrapInternal(err)
}
for k, v := cursor.Next(); k != nil && v != nil; k, v = cursor.Next() {
v, err := bkt.Get(v)
if err != nil {
return nil, err
}
m := &influxdb.UserResourceMapping{}
if err := json.Unmarshal(v, m); err != nil {
return nil, CorruptURMError(err)
}
if filterFn(m) {
ms = append(ms, m)
}
}
if err := cursor.Err(); err != nil {
return nil, wrapInternal(err)
}
if err := cursor.Close(); err != nil {
return nil, wrapInternal(err)
}
return ms, nil
}
func (s *Service) findUserResourceMapping(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) (*influxdb.UserResourceMapping, error) {
ms, err := s.findUserResourceMappings(ctx, tx, filter)
if err != nil {
@ -184,6 +269,20 @@ func (s *Service) createUserResourceMapping(ctx context.Context, tx Tx, m *influ
return UnavailableURMServiceError(err)
}
ikey, err := userResourceIndexKey(m)
if err != nil {
return err
}
ib, err := tx.Bucket(urmIndexBucket)
if err != nil {
return UnavailableURMServiceError(err)
}
if err := ib.Put(ikey, key); err != nil {
return UnavailableURMServiceError(err)
}
if m.ResourceType == influxdb.OrgsResourceType {
return s.createOrgDependentMappings(ctx, tx, m)
}
@ -235,6 +334,26 @@ func userResourceKey(m *influxdb.UserResourceMapping) ([]byte, error) {
return key, nil
}
func userResourceIndexKey(m *influxdb.UserResourceMapping) ([]byte, error) {
encodedResourceID, err := m.ResourceID.Encode()
if err != nil {
return nil, ErrInvalidURMID
}
encodedUserID, err := m.UserID.Encode()
if err != nil {
return nil, ErrInvalidURMID
}
key := append(encodedUserID, '/')
return append(key, encodedResourceID...), nil
}
func urmIndexPrefix(userID influxdb.ID) []byte {
id, _ := userID.Encode()
return append(id, '/')
}
func (s *Service) forEachUserResourceMapping(ctx context.Context, tx Tx, pred CursorPredicateFunc, fn func(*influxdb.UserResourceMapping) bool) error {
b, err := tx.Bucket(urmBucket)
if err != nil {
@ -327,11 +446,20 @@ func (s *Service) deleteUserResourceMapping(ctx context.Context, tx Tx, filter i
return err
}
ikey, err := userResourceIndexKey(ms[0])
if err != nil {
return err
}
b, err := tx.Bucket(urmBucket)
if err != nil {
return UnavailableURMServiceError(err)
}
ib, err := tx.Bucket(urmIndexBucket)
if err != nil {
return UnavailableURMServiceError(err)
}
_, err = b.Get(key)
if IsNotFound(err) {
return ErrURMNotFound
@ -343,6 +471,11 @@ func (s *Service) deleteUserResourceMapping(ctx context.Context, tx Tx, filter i
if err := b.Delete(key); err != nil {
return UnavailableURMServiceError(err)
}
if err := ib.Delete(ikey); err != nil {
return UnavailableURMServiceError(err)
}
return nil
}
@ -357,6 +490,11 @@ func (s *Service) deleteUserResourceMappings(ctx context.Context, tx Tx, filter
return err
}
ikey, err := userResourceIndexKey(m)
if err != nil {
return err
}
b, err := tx.Bucket(urmBucket)
if err != nil {
return UnavailableURMServiceError(err)
@ -370,9 +508,17 @@ func (s *Service) deleteUserResourceMappings(ctx context.Context, tx Tx, filter
return UnavailableURMServiceError(err)
}
ib, err := tx.Bucket(urmIndexBucket)
if err != nil {
return UnavailableURMServiceError(err)
}
if err := b.Delete(key); err != nil {
return UnavailableURMServiceError(err)
}
if err := ib.Delete(ikey); err != nil {
return UnavailableURMServiceError(err)
}
}
return nil
}

View File

@ -5,11 +5,25 @@ import (
"testing"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/snowflake"
influxdbtesting "github.com/influxdata/influxdb/testing"
"go.uber.org/zap/zaptest"
)
type testable interface {
Logf(string, ...interface{})
Error(args ...interface{})
Errorf(string, ...interface{})
Fail()
Failed() bool
Name() string
FailNow()
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
}
func TestBoltUserResourceMappingService(t *testing.T) {
influxdbtesting.UserResourceMappingService(initBoltUserResourceMappingService, t)
}
@ -27,7 +41,7 @@ func initBoltUserResourceMappingService(f influxdbtesting.UserResourceFields, t
}
}
func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) {
func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFields, t testable) (influxdb.UserResourceMappingService, func()) {
svc := kv.NewService(zaptest.NewLogger(t), s)
ctx := context.Background()
@ -49,3 +63,32 @@ func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFi
}
}
}
func BenchmarkReadURMs(b *testing.B) {
urms := influxdbtesting.UserResourceFields{
UserResourceMappings: make([]*influxdb.UserResourceMapping, 10000),
}
idgen := snowflake.NewDefaultIDGenerator()
users := make([]influxdb.ID, 10)
for i := 0; i < 10; i++ {
users[i] = idgen.ID()
}
for i := 0; i < 10000; i++ {
urms.UserResourceMappings[i] = &influxdb.UserResourceMapping{
ResourceID: idgen.ID(),
UserID: users[i%len(users)],
UserType: influxdb.Member,
ResourceType: influxdb.BucketsResourceType,
}
}
st := inmem.NewKVStore()
initUserResourceMappingService(st, urms, b)
svc := kv.NewService(zaptest.NewLogger(b), st)
b.ResetTimer()
for i := 0; i < b.N; i++ {
svc.FindUserResourceMappings(context.Background(), influxdb.UserResourceMappingFilter{
UserID: users[0],
})
}
}