diff --git a/kv/index.go b/kv/index.go index 340de07cca..9b9fe5cc76 100644 --- a/kv/index.go +++ b/kv/index.go @@ -251,7 +251,7 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error { // Walk walks the source bucket using keys found in the index using the provided foreign key // given the index has been fully populated. -func (i *Index) Walk(tx Tx, foreignKey []byte, visitFn VisitFunc) error { +func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error { // skip walking if configured to do so as the index // is currently being used purely to write the index if !i.canRead { @@ -274,7 +274,7 @@ func (i *Index) Walk(tx Tx, foreignKey []byte, visitFn VisitFunc) error { return err } - return indexWalk(cursor, sourceBucket, visitFn) + return indexWalk(ctx, cursor, sourceBucket, visitFn) } // PopulateConfig configures a call to Populate @@ -549,7 +549,7 @@ func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bo // indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their // associated primaryKey's value in the provided source bucket. // When an item is located in the source, the provided visit function is called with primary key and associated value. -func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { +func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { defer func() { if cerr := indexCursor.Close(); cerr != nil && err == nil { err = cerr @@ -557,7 +557,13 @@ func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) }() for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() { - v, err := sourceBucket.Get(pk) + // TODO(george): this is a work-around as lots of calls to Get() + // on a transaction causes issues with a particular implementation + // of kv.Store. + // The use of a cursor on this store bypasses the transaction + // and gives us the access pattern we desire. + // Please do not change back to a bucket.Get(). + v, err := getKeyUsingRange(ctx, sourceBucket, pk) if err != nil { return err } @@ -570,6 +576,27 @@ func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) return indexCursor.Err() } +// getKeyUsingRange is a work around to for a particular implementation of kv.Store +// which needs to lookup using cursors instead of individual get operations. +func getKeyUsingRange(ctx context.Context, bucket Bucket, key []byte) ([]byte, error) { + cursor, err := bucket.ForwardCursor(key, + WithCursorPrefix(key)) + if err != nil { + return nil, err + } + + _, value := cursor.Next() + if value == nil { + return nil, ErrKeyNotFound + } + + if err := cursor.Err(); err != nil { + return nil, err + } + + return value, cursor.Close() +} + // readEntireIndex returns the entire current state of the index func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) { kvs, err := consumeBucket(ctx, store, i.indexBucket) diff --git a/kv/urm.go b/kv/urm.go index a3d353e966..247dd3e480 100644 --- a/kv/urm.go +++ b/kv/urm.go @@ -128,7 +128,7 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in if filter.UserID.Valid() { // urm by user index lookup userID, _ := filter.UserID.Encode() - if err := s.urmByUserIndex.Walk(tx, userID, func(k, v []byte) error { + if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error { m := &influxdb.UserResourceMapping{} if err := json.Unmarshal(v, m); err != nil { return CorruptURMError(err) diff --git a/tenant/storage_urm.go b/tenant/storage_urm.go index 8cf2d7d2a4..6c7e1fc798 100644 --- a/tenant/storage_urm.go +++ b/tenant/storage_urm.go @@ -67,7 +67,7 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso if filter.UserID.Valid() { // urm by user index lookup userID, _ := filter.UserID.Encode() - if err := s.urmByUserIndex.Walk(tx, userID, func(k, v []byte) error { + if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error { m := &influxdb.UserResourceMapping{} if err := json.Unmarshal(v, m); err != nil { return CorruptURMError(err) diff --git a/testing/index.go b/testing/index.go index 71f31da292..7e530dd7e2 100644 --- a/testing/index.go +++ b/testing/index.go @@ -45,7 +45,7 @@ func newSomeResourceStore(ctx context.Context, store kv.Store) *someResourceStor func (s *someResourceStore) FindByOwner(ctx context.Context, ownerID string) (resources []someResource, err error) { err = s.store.View(ctx, func(tx kv.Tx) error { - return s.ownerIDIndex.Walk(tx, []byte(ownerID), func(k, v []byte) error { + return s.ownerIDIndex.Walk(ctx, tx, []byte(ownerID), func(k, v []byte) error { var resource someResource if err := json.Unmarshal(v, &resource); err != nil { return err