fix(kv): make index walk use cursor instead of get (#17501)
parent
d7f7cf26b1
commit
173ded1a10
35
kv/index.go
35
kv/index.go
|
@ -251,7 +251,7 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
|
|||
|
||||
// Walk walks the source bucket using keys found in the index using the provided foreign key
|
||||
// given the index has been fully populated.
|
||||
func (i *Index) Walk(tx Tx, foreignKey []byte, visitFn VisitFunc) error {
|
||||
func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error {
|
||||
// skip walking if configured to do so as the index
|
||||
// is currently being used purely to write the index
|
||||
if !i.canRead {
|
||||
|
@ -274,7 +274,7 @@ func (i *Index) Walk(tx Tx, foreignKey []byte, visitFn VisitFunc) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return indexWalk(cursor, sourceBucket, visitFn)
|
||||
return indexWalk(ctx, cursor, sourceBucket, visitFn)
|
||||
}
|
||||
|
||||
// PopulateConfig configures a call to Populate
|
||||
|
@ -549,7 +549,7 @@ func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bo
|
|||
// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
|
||||
// associated primaryKey's value in the provided source bucket.
|
||||
// When an item is located in the source, the provided visit function is called with primary key and associated value.
|
||||
func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
|
||||
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
|
||||
defer func() {
|
||||
if cerr := indexCursor.Close(); cerr != nil && err == nil {
|
||||
err = cerr
|
||||
|
@ -557,7 +557,13 @@ func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc)
|
|||
}()
|
||||
|
||||
for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
|
||||
v, err := sourceBucket.Get(pk)
|
||||
// TODO(george): this is a work-around as lots of calls to Get()
|
||||
// on a transaction causes issues with a particular implementation
|
||||
// of kv.Store.
|
||||
// The use of a cursor on this store bypasses the transaction
|
||||
// and gives us the access pattern we desire.
|
||||
// Please do not change back to a bucket.Get().
|
||||
v, err := getKeyUsingRange(ctx, sourceBucket, pk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -570,6 +576,27 @@ func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc)
|
|||
return indexCursor.Err()
|
||||
}
|
||||
|
||||
// getKeyUsingRange is a work around to for a particular implementation of kv.Store
|
||||
// which needs to lookup using cursors instead of individual get operations.
|
||||
func getKeyUsingRange(ctx context.Context, bucket Bucket, key []byte) ([]byte, error) {
|
||||
cursor, err := bucket.ForwardCursor(key,
|
||||
WithCursorPrefix(key))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, value := cursor.Next()
|
||||
if value == nil {
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
|
||||
if err := cursor.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return value, cursor.Close()
|
||||
}
|
||||
|
||||
// readEntireIndex returns the entire current state of the index
|
||||
func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) {
|
||||
kvs, err := consumeBucket(ctx, store, i.indexBucket)
|
||||
|
|
|
@ -128,7 +128,7 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in
|
|||
if filter.UserID.Valid() {
|
||||
// urm by user index lookup
|
||||
userID, _ := filter.UserID.Encode()
|
||||
if err := s.urmByUserIndex.Walk(tx, userID, func(k, v []byte) error {
|
||||
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error {
|
||||
m := &influxdb.UserResourceMapping{}
|
||||
if err := json.Unmarshal(v, m); err != nil {
|
||||
return CorruptURMError(err)
|
||||
|
|
|
@ -67,7 +67,7 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso
|
|||
if filter.UserID.Valid() {
|
||||
// urm by user index lookup
|
||||
userID, _ := filter.UserID.Encode()
|
||||
if err := s.urmByUserIndex.Walk(tx, userID, func(k, v []byte) error {
|
||||
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error {
|
||||
m := &influxdb.UserResourceMapping{}
|
||||
if err := json.Unmarshal(v, m); err != nil {
|
||||
return CorruptURMError(err)
|
||||
|
|
|
@ -45,7 +45,7 @@ func newSomeResourceStore(ctx context.Context, store kv.Store) *someResourceStor
|
|||
|
||||
func (s *someResourceStore) FindByOwner(ctx context.Context, ownerID string) (resources []someResource, err error) {
|
||||
err = s.store.View(ctx, func(tx kv.Tx) error {
|
||||
return s.ownerIDIndex.Walk(tx, []byte(ownerID), func(k, v []byte) error {
|
||||
return s.ownerIDIndex.Walk(ctx, tx, []byte(ownerID), func(k, v []byte) error {
|
||||
var resource someResource
|
||||
if err := json.Unmarshal(v, &resource); err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue