fix(tenant): add support for offset to list urms (#18944)

* fix(tenant): add support for offset to list urms

* fix(urm): treat urm limit 0 and unlimited
pull/18935/head
George 2020-07-14 19:40:04 +01:00 committed by GitHub
parent 0265acf744
commit c6967ee7b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 7 deletions

View File

@ -147,7 +147,6 @@ func (s *Service) RenewSession(ctx context.Context, session *influxdb.Session, n
}
func (s *Service) getPermissionSet(ctx context.Context, uid influxdb.ID) ([]influxdb.Permission, error) {
mappings, _, err := s.urmService.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Limit: 100})
if err != nil {
return nil, err

View File

@ -71,25 +71,32 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso
}
if filter.UserID.Valid() {
errPageLimit := errors.New("page limit reached")
var (
errPageLimit = errors.New("page limit reached")
// urm by user index lookup
userID, _ = filter.UserID.Encode()
seen int
)
// urm by user index lookup
userID, _ := filter.UserID.Encode()
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error {
m := &influxdb.UserResourceMapping{}
if err := json.Unmarshal(v, m); err != nil {
return CorruptURMError(err)
}
if filterFn(m) {
// respect offset parameter
reachedOffset := (len(opt) == 0 || seen >= opt[0].Offset)
if filterFn(m) && reachedOffset {
ms = append(ms, m)
}
// respect pagination in URMs
if len(opt) > 0 && len(ms) >= opt[0].Limit {
if len(opt) > 0 && opt[0].Limit > 0 && len(ms) >= opt[0].Limit {
return errPageLimit
}
seen++
return nil
}); err != nil && err != errPageLimit {
return nil, err
@ -127,7 +134,7 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso
ms = append(ms, m)
}
if len(opt) > 0 && len(ms) >= opt[0].Limit {
if len(opt) > 0 && opt[0].Limit > 0 && len(ms) >= opt[0].Limit {
break
}
}

View File

@ -241,6 +241,72 @@ func TestURM(t *testing.T) {
}
},
},
{
name: "list by user with limit and offset",
setup: func(t *testing.T, store *tenant.Store, tx kv.Tx) {
uid := influxdb.ID(1)
err := store.CreateUser(context.Background(), tx, &influxdb.User{
ID: uid,
Name: "user",
})
if err != nil {
t.Fatal(err)
}
for i := 1; i <= 25; i++ {
// User must exist to create urm.
err = store.CreateURM(context.Background(), tx, &influxdb.UserResourceMapping{
UserID: uid,
UserType: influxdb.Owner,
MappingType: influxdb.UserMappingType,
ResourceType: influxdb.OrgsResourceType,
ResourceID: influxdb.ID(i + 1),
})
if err != nil {
t.Fatal(err)
}
}
},
results: func(t *testing.T, store *tenant.Store, tx kv.Tx) {
urms, err := store.ListURMs(
context.Background(),
tx,
influxdb.UserResourceMappingFilter{
UserID: influxdb.ID(1)},
influxdb.FindOptions{
Offset: 10,
Limit: 10,
},
)
if err != nil {
t.Fatal(err)
}
if len(urms) != 10 {
t.Fatalf("when setting the limit to 10 we got: %d", len(urms))
}
var expected []*influxdb.UserResourceMapping
for i := 11; i <= 20; i++ {
expected = append(expected, &influxdb.UserResourceMapping{
UserID: influxdb.ID(1),
UserType: influxdb.Owner,
MappingType: influxdb.UserMappingType,
ResourceType: influxdb.OrgsResourceType,
ResourceID: influxdb.ID(i + 1),
})
}
sort.Slice(expected, func(i, j int) bool {
irid, _ := expected[i].ResourceID.Encode()
iuid, _ := expected[i].UserID.Encode()
jrid, _ := expected[j].ResourceID.Encode()
juid, _ := expected[j].UserID.Encode()
return string(irid)+string(iuid) < string(jrid)+string(juid)
})
if !reflect.DeepEqual(urms, expected) {
t.Fatalf("expected identical urms: \n%s", cmp.Diff(urms, expected))
}
},
},
{
name: "delete",
setup: simpleSetup,