feat(tenant): add urm indexing into the tenant service (#17467)
parent
2200143a22
commit
4e38c4e636
|
|
@ -2,6 +2,7 @@ package tenant
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
|
|
@ -13,14 +14,28 @@ const MaxIDGenerationN = 100
|
|||
const ReservedIDs = 1000
|
||||
|
||||
type Store struct {
|
||||
kvStore kv.Store
|
||||
IDGen influxdb.IDGenerator
|
||||
kvStore kv.Store
|
||||
IDGen influxdb.IDGenerator
|
||||
urmByUserIndex *kv.Index
|
||||
}
|
||||
|
||||
func NewStore(kvStore kv.Store) (*Store, error) {
|
||||
st := &Store{
|
||||
kvStore: kvStore,
|
||||
IDGen: snowflake.NewDefaultIDGenerator(),
|
||||
urmByUserIndex: kv.NewIndex(kv.NewIndexMapping(
|
||||
urmBucket,
|
||||
urmByUserIndexBucket,
|
||||
func(v []byte) ([]byte, error) {
|
||||
var urm influxdb.UserResourceMapping
|
||||
if err := json.Unmarshal(v, &urm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id, _ := urm.UserID.Encode()
|
||||
return id, nil
|
||||
},
|
||||
), kv.WithIndexReadPathEnabled),
|
||||
}
|
||||
return st, st.setup()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,10 @@ import (
|
|||
"github.com/influxdata/influxdb/kv"
|
||||
)
|
||||
|
||||
var urmBucket = []byte("userresourcemappingsv1")
|
||||
var (
|
||||
urmBucket = []byte("userresourcemappingsv1")
|
||||
urmByUserIndexBucket = []byte("userresourcemappingsbyuserindexv1")
|
||||
)
|
||||
|
||||
func (s *Store) CreateURM(ctx context.Context, tx kv.Tx, urm *influxdb.UserResourceMapping) error {
|
||||
if err := s.uniqueUserResourceMapping(ctx, tx, urm); err != nil {
|
||||
|
|
@ -34,6 +37,15 @@ func (s *Store) CreateURM(ctx context.Context, tx kv.Tx, urm *influxdb.UserResou
|
|||
return UnavailableURMServiceError(err)
|
||||
}
|
||||
|
||||
// insert urm into by user index
|
||||
userID, err := urm.UserID.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.urmByUserIndex.Insert(tx, userID, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -45,7 +57,34 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso
|
|||
return nil, UnavailableURMServiceError(err)
|
||||
}
|
||||
|
||||
// TODO(compute): Once we have an index we should be able to use it somewhere in here
|
||||
filterFn := func(m *influxdb.UserResourceMapping) bool {
|
||||
return (!filter.UserID.Valid() || (filter.UserID == m.UserID)) &&
|
||||
(!filter.ResourceID.Valid() || (filter.ResourceID == m.ResourceID)) &&
|
||||
(filter.UserType == "" || (filter.UserType == m.UserType)) &&
|
||||
(filter.ResourceType == "" || (filter.ResourceType == m.ResourceType))
|
||||
}
|
||||
|
||||
if filter.UserID.Valid() {
|
||||
// urm by user index lookup
|
||||
userID, _ := filter.UserID.Encode()
|
||||
if err := s.urmByUserIndex.Walk(tx, userID, func(k, v []byte) error {
|
||||
m := &influxdb.UserResourceMapping{}
|
||||
if err := json.Unmarshal(v, m); err != nil {
|
||||
return CorruptURMError(err)
|
||||
}
|
||||
|
||||
if filterFn(m) {
|
||||
ms = append(ms, m)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
// for now the best we can do is use the resourceID if we have that as a forward cursor option
|
||||
var prefix []byte
|
||||
var cursorOptions []kv.CursorOption
|
||||
|
|
@ -71,10 +110,7 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso
|
|||
}
|
||||
|
||||
// check to see if it matches the filter
|
||||
if (!filter.UserID.Valid() || (filter.UserID == m.UserID)) &&
|
||||
(!filter.ResourceID.Valid() || (filter.ResourceID == m.ResourceID)) &&
|
||||
(filter.UserType == "" || (filter.UserType == m.UserType)) &&
|
||||
(filter.ResourceType == "" || (filter.ResourceType == m.ResourceType)) {
|
||||
if filterFn(m) {
|
||||
ms = append(ms, m)
|
||||
}
|
||||
|
||||
|
|
@ -120,6 +156,16 @@ func (s *Store) DeleteURM(ctx context.Context, tx kv.Tx, resourceID, userID infl
|
|||
return err
|
||||
}
|
||||
|
||||
// remove user resource mapping from by user index
|
||||
uid, err := userID.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.urmByUserIndex.Delete(tx, uid, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Delete(key)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue