fix(kv): push down org ID to skip in delete URM (#16841)

* fix(kv): push down org ID to skip in delete URM

* fix(kv): use database key rather than resource id

We are trying to skip deletes that would remove keys
that have already been deleted.  This is a rather
extreme approach and I believe we should think about how
to fix user-resource-mappings.

Co-authored-by:  Lyon Hill <lyondhill@gmail.com>
Signed-off-by: Chris Goller <goller@gmail.com>

Co-authored-by: George <me@georgemac.com>
Co-authored-by: Lyon Hill <lyondhill@gmail.com>
pull/16843/head
Chris Goller 2020-02-12 14:33:57 -06:00 committed by GitHub
parent 7349216e94
commit a5f508de77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 13 deletions

View File

@ -126,14 +126,39 @@ func userResourceMappingPredicate(filter influxdb.UserResourceMappingFilter) Cur
}
}
func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) {
type urmFindOptions struct {
skipKeys map[string]struct{}
}
type urmFindOption func(*urmFindOptions)
func (f *urmFindOptions) skip(key string) (skip bool) {
if f.skipKeys == nil {
return false
}
_, skip = f.skipKeys[key]
return
}
func withSkipKey(key string) urmFindOption {
return func(o *urmFindOptions) {
if o.skipKeys == nil {
o.skipKeys = map[string]struct{}{}
}
o.skipKeys[key] = struct{}{}
}
}
func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter, opts ...urmFindOption) ([]*influxdb.UserResourceMapping, error) {
ms := []*influxdb.UserResourceMapping{}
pred := userResourceMappingPredicate(filter)
filterFn := filterMappingsFn(filter)
// if we are given a user id we should try finding by index
if filter.UserID.Valid() {
var err error
ms, err = s.findUserResourceMappingsByIndex(ctx, tx, filter)
ms, err = s.findUserResourceMappingsByIndex(ctx, tx, filter, opts...)
if err != nil {
return nil, err
}
@ -166,9 +191,16 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in
return ms, err
}
func (s *Service) findUserResourceMappingsByIndex(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, error) {
ms := []*influxdb.UserResourceMapping{}
filterFn := filterMappingsFn(filter)
func (s *Service) findUserResourceMappingsByIndex(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter, opts ...urmFindOption) ([]*influxdb.UserResourceMapping, error) {
var (
ms = []*influxdb.UserResourceMapping{}
filterFn = filterMappingsFn(filter)
options = urmFindOptions{}
)
for _, opt := range opts {
opt(&options)
}
bkt, err := tx.Bucket(urmBucket)
if err != nil {
@ -195,6 +227,11 @@ func (s *Service) findUserResourceMappingsByIndex(ctx context.Context, tx Tx, fi
}
for k, v := cursor.Next(); k != nil && v != nil; k, v = cursor.Next() {
// step over skip keys
if options.skip(string(v)) {
continue
}
nv, err := bkt.Get(v)
if err != nil {
return nil, &influxdb.Error{
@ -426,17 +463,24 @@ func (s *Service) DeleteUserResourceMapping(ctx context.Context, resourceID infl
}
if m.ResourceType == influxdb.OrgsResourceType {
return s.deleteOrgDependentMappings(ctx, tx, m)
key, err := userResourceKey(m)
if err != nil {
// I'm not super concerned that we will get here. We know this is a valid resource
// because we've just found it above. Me of the future... if this was a problem,
// sorry.
return err
}
return s.deleteOrgDependentMappings(ctx, tx, m, withSkipKey(string(key)))
}
return nil
})
}
func (s *Service) deleteUserResourceMapping(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter) error {
func (s *Service) deleteUserResourceMapping(ctx context.Context, tx Tx, filter influxdb.UserResourceMappingFilter, opts ...urmFindOption) error {
// TODO(goller): do we really need to find here? Seems like a Get is
// good enough.
ms, err := s.findUserResourceMappings(ctx, tx, filter)
ms, err := s.findUserResourceMappings(ctx, tx, filter, opts...)
if err != nil {
return err
}
@ -527,7 +571,7 @@ func (s *Service) deleteUserResourceMappings(ctx context.Context, tx Tx, filter
}
// This method deletes the user/resource mappings for resources that belong to an organization.
func (s *Service) deleteOrgDependentMappings(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping) error {
func (s *Service) deleteOrgDependentMappings(ctx context.Context, tx Tx, m *influxdb.UserResourceMapping, opts ...urmFindOption) error {
bf := influxdb.BucketFilter{OrganizationID: &m.ResourceID}
bs, err := s.findBuckets(ctx, tx, bf)
if err != nil {
@ -540,7 +584,7 @@ func (s *Service) deleteOrgDependentMappings(ctx context.Context, tx Tx, m *infl
UserID: m.UserID,
}
if err := s.deleteUserResourceMapping(ctx, tx, filter); err != nil {
if err := s.deleteUserResourceMapping(ctx, tx, filter, opts...); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
s.log.Info("URM bucket is missing", zap.Stringer("orgID", m.ResourceID))
continue

View File

@ -12,9 +12,9 @@ import (
)
const (
userOneID = "020f755c3c082000"
userTwoID = "020f755c3c082001"
userThreeID = "020f755c3c082002"
userOneID = "020f755c3c084000"
userTwoID = "020f755c3c084001"
userThreeID = "020f755c3c084002"
)
var userCmpOptions = cmp.Options{