feat(influxdb): add authorized user resource mappings
feat(bolt): add function to find a resources organization id rename platform to influxdb Co-authored-by: Leonardo Di Donato <leodidonato@gmail.com> Co-authored-by: Michael Desa <mjdesa@gmail.com> fix(bolt): rename FindResoureOrganization to FindResoureOrganizationID feat(authorizer): add authorized user resource mapping service Co-authored-by: Leonardo Di Donato <leodidonato@gmail.com> Co-authored-by: Michael Desa <mjdesa@gmail.com> feat(influxdb): wire up authorized user resource mapping Co-authored-by: Leonardo Di Donato <leodidonato@gmail.com> Co-authored-by: Michael Desa <mjdesa@gmail.com> fix(authorizer): remove unused field from tests Co-authored-by: Leonardo Di Donato <leodidonato@gmail.com> Co-authored-by: Michael Desa <mjdesa@gmail.com>pull/11301/head
parent
a5fe808680
commit
cdc9146b78
|
@ -0,0 +1,114 @@
|
|||
package authorizer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
type OrganizationService interface {
|
||||
FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error)
|
||||
}
|
||||
|
||||
type URMService struct {
|
||||
s influxdb.UserResourceMappingService
|
||||
orgService OrganizationService
|
||||
}
|
||||
|
||||
func NewURMService(orgSvc OrganizationService, s influxdb.UserResourceMappingService) *URMService {
|
||||
return &URMService{
|
||||
s: s,
|
||||
orgService: orgSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func newURMPermission(a influxdb.Action, rt influxdb.ResourceType, orgID, id influxdb.ID) (*influxdb.Permission, error) {
|
||||
return influxdb.NewPermissionAtID(id, a, rt, orgID)
|
||||
}
|
||||
|
||||
func authorizeReadURM(ctx context.Context, rt influxdb.ResourceType, orgID, id influxdb.ID) error {
|
||||
p, err := newURMPermission(influxdb.ReadAction, rt, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := IsAllowed(ctx, *p); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func authorizeWriteURM(ctx context.Context, rt influxdb.ResourceType, orgID, id influxdb.ID) error {
|
||||
p, err := newURMPermission(influxdb.WriteAction, rt, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := IsAllowed(ctx, *p); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *URMService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
urms, _, err := s.s.FindUserResourceMappings(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
mappings := urms[:0]
|
||||
for _, urm := range urms {
|
||||
orgID, err := s.orgService.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if err := authorizeReadURM(ctx, urm.ResourceType, orgID, urm.ResourceID); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
mappings = append(mappings, urm)
|
||||
}
|
||||
|
||||
return mappings, len(mappings), nil
|
||||
}
|
||||
|
||||
func (s *URMService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
|
||||
orgID, err := s.orgService.FindResourceOrganizationID(ctx, m.ResourceType, m.ResourceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := authorizeWriteURM(ctx, m.ResourceType, orgID, m.ResourceID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.s.CreateUserResourceMapping(ctx, m)
|
||||
}
|
||||
|
||||
func (s *URMService) DeleteUserResourceMapping(ctx context.Context, resourceID influxdb.ID, userID influxdb.ID) error {
|
||||
f := influxdb.UserResourceMappingFilter{ResourceID: resourceID, UserID: userID}
|
||||
urms, _, err := s.s.FindUserResourceMappings(ctx, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, urm := range urms {
|
||||
orgID, err := s.orgService.FindResourceOrganizationID(ctx, urm.ResourceType, urm.ResourceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := authorizeWriteURM(ctx, urm.ResourceType, orgID, urm.ResourceID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.s.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,258 @@
|
|||
package authorizer_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/authorizer"
|
||||
influxdbcontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/mock"
|
||||
influxdbtesting "github.com/influxdata/influxdb/testing"
|
||||
)
|
||||
|
||||
type OrgService struct {
|
||||
OrgID influxdb.ID
|
||||
}
|
||||
|
||||
func (s *OrgService) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) {
|
||||
return s.OrgID, nil
|
||||
}
|
||||
|
||||
func TestURMService_FindUserResourceMappings(t *testing.T) {
|
||||
type fields struct {
|
||||
UserResourceMappingService influxdb.UserResourceMappingService
|
||||
OrgService authorizer.OrganizationService
|
||||
}
|
||||
type args struct {
|
||||
permission influxdb.Permission
|
||||
}
|
||||
type wants struct {
|
||||
err error
|
||||
urms []*influxdb.UserResourceMapping
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wants wants
|
||||
}{
|
||||
{
|
||||
name: "authorized to see all users",
|
||||
fields: fields{
|
||||
OrgService: &OrgService{OrgID: 10},
|
||||
UserResourceMappingService: &mock.UserResourceMappingService{
|
||||
FindMappingsFn: func(ctx context.Context, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
return []*influxdb.UserResourceMapping{
|
||||
{
|
||||
ResourceID: 1,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
{
|
||||
ResourceID: 2,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
{
|
||||
ResourceID: 3,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
}, 3, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
permission: influxdb.Permission{
|
||||
Action: "read",
|
||||
Resource: influxdb.Resource{
|
||||
Type: influxdb.BucketsResourceType,
|
||||
OrgID: influxdbtesting.IDPtr(10),
|
||||
},
|
||||
},
|
||||
},
|
||||
wants: wants{
|
||||
urms: []*influxdb.UserResourceMapping{
|
||||
{
|
||||
ResourceID: 1,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
{
|
||||
ResourceID: 2,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
{
|
||||
ResourceID: 3,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "authorized to see all users",
|
||||
fields: fields{
|
||||
OrgService: &OrgService{OrgID: 10},
|
||||
UserResourceMappingService: &mock.UserResourceMappingService{
|
||||
FindMappingsFn: func(ctx context.Context, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
return []*influxdb.UserResourceMapping{
|
||||
{
|
||||
ResourceID: 1,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
{
|
||||
ResourceID: 2,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
{
|
||||
ResourceID: 3,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
},
|
||||
}, 3, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
permission: influxdb.Permission{
|
||||
Action: "read",
|
||||
Resource: influxdb.Resource{
|
||||
Type: influxdb.BucketsResourceType,
|
||||
OrgID: influxdbtesting.IDPtr(11),
|
||||
},
|
||||
},
|
||||
},
|
||||
wants: wants{
|
||||
urms: []*influxdb.UserResourceMapping{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := authorizer.NewURMService(tt.fields.OrgService, tt.fields.UserResourceMappingService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{[]influxdb.Permission{tt.args.permission}})
|
||||
|
||||
urms, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{})
|
||||
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
|
||||
|
||||
if diff := cmp.Diff(urms, tt.wants.urms); diff != "" {
|
||||
t.Errorf("urms are different -got/+want\ndiff %s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestURMService_WriteUserResourceMapping(t *testing.T) {
|
||||
type fields struct {
|
||||
UserResourceMappingService influxdb.UserResourceMappingService
|
||||
OrgService authorizer.OrganizationService
|
||||
}
|
||||
type args struct {
|
||||
permission influxdb.Permission
|
||||
}
|
||||
type wants struct {
|
||||
err error
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wants wants
|
||||
}{
|
||||
{
|
||||
name: "authorized to write urm",
|
||||
fields: fields{
|
||||
OrgService: &OrgService{OrgID: 10},
|
||||
UserResourceMappingService: &mock.UserResourceMappingService{
|
||||
CreateMappingFn: func(ctx context.Context, m *influxdb.UserResourceMapping) error {
|
||||
return nil
|
||||
},
|
||||
DeleteMappingFn: func(ctx context.Context, rid, uid influxdb.ID) error {
|
||||
return nil
|
||||
},
|
||||
FindMappingsFn: func(ctx context.Context, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
return []*influxdb.UserResourceMapping{
|
||||
{
|
||||
ResourceID: 1,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
UserID: 100,
|
||||
},
|
||||
}, 3, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
permission: influxdb.Permission{
|
||||
Action: "write",
|
||||
Resource: influxdb.Resource{
|
||||
Type: influxdb.BucketsResourceType,
|
||||
OrgID: influxdbtesting.IDPtr(10),
|
||||
},
|
||||
},
|
||||
},
|
||||
wants: wants{
|
||||
err: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unauthorized to write urm",
|
||||
fields: fields{
|
||||
OrgService: &OrgService{OrgID: 10},
|
||||
UserResourceMappingService: &mock.UserResourceMappingService{
|
||||
CreateMappingFn: func(ctx context.Context, m *influxdb.UserResourceMapping) error {
|
||||
return nil
|
||||
},
|
||||
DeleteMappingFn: func(ctx context.Context, rid, uid influxdb.ID) error {
|
||||
return nil
|
||||
},
|
||||
FindMappingsFn: func(ctx context.Context, filter influxdb.UserResourceMappingFilter) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
return []*influxdb.UserResourceMapping{
|
||||
{
|
||||
ResourceID: 1,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
UserID: 100,
|
||||
},
|
||||
}, 3, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
args: args{
|
||||
permission: influxdb.Permission{
|
||||
Action: "write",
|
||||
Resource: influxdb.Resource{
|
||||
Type: influxdb.BucketsResourceType,
|
||||
OrgID: influxdbtesting.IDPtr(11),
|
||||
},
|
||||
},
|
||||
},
|
||||
wants: wants{
|
||||
err: &influxdb.Error{
|
||||
Msg: "write:orgs/000000000000000a/buckets/0000000000000001 is unauthorized",
|
||||
Code: influxdb.EUnauthorized,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := authorizer.NewURMService(tt.fields.OrgService, tt.fields.UserResourceMappingService)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{[]influxdb.Permission{tt.args.permission}})
|
||||
|
||||
t.Run("create urm", func(t *testing.T) {
|
||||
err := s.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{ResourceType: influxdb.BucketsResourceType, ResourceID: 1})
|
||||
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
|
||||
})
|
||||
|
||||
t.Run("delete urm", func(t *testing.T) {
|
||||
err := s.DeleteUserResourceMapping(ctx, 1, 100)
|
||||
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
}
|
|
@ -7,8 +7,8 @@ import (
|
|||
"time"
|
||||
|
||||
bolt "github.com/coreos/bbolt"
|
||||
platform "github.com/influxdata/influxdb"
|
||||
platformcontext "github.com/influxdata/influxdb/context"
|
||||
influxdb "github.com/influxdata/influxdb"
|
||||
influxdbcontext "github.com/influxdata/influxdb/context"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -16,8 +16,8 @@ var (
|
|||
organizationIndex = []byte("organizationindexv1")
|
||||
)
|
||||
|
||||
var _ platform.OrganizationService = (*Client)(nil)
|
||||
var _ platform.OrganizationOperationLogService = (*Client)(nil)
|
||||
var _ influxdb.OrganizationService = (*Client)(nil)
|
||||
var _ influxdb.OrganizationOperationLogService = (*Client)(nil)
|
||||
|
||||
func (c *Client) initializeOrganizations(ctx context.Context, tx *bolt.Tx) error {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(organizationBucket)); err != nil {
|
||||
|
@ -30,13 +30,13 @@ func (c *Client) initializeOrganizations(ctx context.Context, tx *bolt.Tx) error
|
|||
}
|
||||
|
||||
// FindOrganizationByID retrieves a organization by id.
|
||||
func (c *Client) FindOrganizationByID(ctx context.Context, id platform.ID) (*platform.Organization, error) {
|
||||
var o *platform.Organization
|
||||
func (c *Client) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
|
||||
var o *influxdb.Organization
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
org, pe := c.findOrganizationByID(ctx, tx, id)
|
||||
if pe != nil {
|
||||
return &platform.Error{
|
||||
Op: getOp(platform.OpFindOrganizationByID),
|
||||
return &influxdb.Error{
|
||||
Op: getOp(influxdb.OpFindOrganizationByID),
|
||||
Err: pe,
|
||||
}
|
||||
}
|
||||
|
@ -51,26 +51,26 @@ func (c *Client) FindOrganizationByID(ctx context.Context, id platform.ID) (*pla
|
|||
return o, nil
|
||||
}
|
||||
|
||||
func (c *Client) findOrganizationByID(ctx context.Context, tx *bolt.Tx, id platform.ID) (*platform.Organization, *platform.Error) {
|
||||
func (c *Client) findOrganizationByID(ctx context.Context, tx *bolt.Tx, id influxdb.ID) (*influxdb.Organization, *influxdb.Error) {
|
||||
encodedID, err := id.Encode()
|
||||
if err != nil {
|
||||
return nil, &platform.Error{
|
||||
Code: platform.EInvalid,
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
v := tx.Bucket(organizationBucket).Get(encodedID)
|
||||
if len(v) == 0 {
|
||||
return nil, &platform.Error{
|
||||
Code: platform.ENotFound,
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Msg: "organization not found",
|
||||
}
|
||||
}
|
||||
|
||||
var o platform.Organization
|
||||
var o influxdb.Organization
|
||||
if err := json.Unmarshal(v, &o); err != nil {
|
||||
return nil, &platform.Error{
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
@ -79,8 +79,8 @@ func (c *Client) findOrganizationByID(ctx context.Context, tx *bolt.Tx, id platf
|
|||
}
|
||||
|
||||
// FindOrganizationByName returns a organization by name for a particular organization.
|
||||
func (c *Client) FindOrganizationByName(ctx context.Context, n string) (*platform.Organization, error) {
|
||||
var o *platform.Organization
|
||||
func (c *Client) FindOrganizationByName(ctx context.Context, n string) (*influxdb.Organization, error) {
|
||||
var o *influxdb.Organization
|
||||
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
org, pe := c.findOrganizationByName(ctx, tx, n)
|
||||
|
@ -94,19 +94,19 @@ func (c *Client) FindOrganizationByName(ctx context.Context, n string) (*platfor
|
|||
return o, err
|
||||
}
|
||||
|
||||
func (c *Client) findOrganizationByName(ctx context.Context, tx *bolt.Tx, n string) (*platform.Organization, *platform.Error) {
|
||||
func (c *Client) findOrganizationByName(ctx context.Context, tx *bolt.Tx, n string) (*influxdb.Organization, *influxdb.Error) {
|
||||
o := tx.Bucket(organizationIndex).Get(organizationIndexKey(n))
|
||||
if o == nil {
|
||||
return nil, &platform.Error{
|
||||
Code: platform.ENotFound,
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Msg: "organization not found",
|
||||
}
|
||||
}
|
||||
|
||||
var id platform.ID
|
||||
var id influxdb.ID
|
||||
if err := id.Decode(o); err != nil {
|
||||
return nil, &platform.Error{
|
||||
Code: platform.EInvalid,
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
@ -116,12 +116,12 @@ func (c *Client) findOrganizationByName(ctx context.Context, tx *bolt.Tx, n stri
|
|||
// FindOrganization retrives a organization using an arbitrary organization filter.
|
||||
// Filters using ID, or Name should be efficient.
|
||||
// Other filters will do a linear scan across organizations until it finds a match.
|
||||
func (c *Client) FindOrganization(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
|
||||
op := getOp(platform.OpFindOrganization)
|
||||
func (c *Client) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
|
||||
op := getOp(influxdb.OpFindOrganization)
|
||||
if filter.ID != nil {
|
||||
o, err := c.FindOrganizationByID(ctx, *filter.ID)
|
||||
if err != nil {
|
||||
return nil, &platform.Error{
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ func (c *Client) FindOrganization(ctx context.Context, filter platform.Organizat
|
|||
if filter.Name != nil {
|
||||
o, err := c.FindOrganizationByName(ctx, *filter.Name)
|
||||
if err != nil {
|
||||
return nil, &platform.Error{
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
|
@ -142,9 +142,9 @@ func (c *Client) FindOrganization(ctx context.Context, filter platform.Organizat
|
|||
|
||||
filterFn := filterOrganizationsFn(filter)
|
||||
|
||||
var o *platform.Organization
|
||||
var o *influxdb.Organization
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
return forEachOrganization(ctx, tx, func(org *platform.Organization) bool {
|
||||
return forEachOrganization(ctx, tx, func(org *influxdb.Organization) bool {
|
||||
if filterFn(org) {
|
||||
o = org
|
||||
return false
|
||||
|
@ -154,15 +154,15 @@ func (c *Client) FindOrganization(ctx context.Context, filter platform.Organizat
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, &platform.Error{
|
||||
return nil, &influxdb.Error{
|
||||
Op: op,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
if o == nil {
|
||||
return nil, &platform.Error{
|
||||
Code: platform.ENotFound,
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Op: op,
|
||||
Msg: "organization not found",
|
||||
}
|
||||
|
@ -171,55 +171,55 @@ func (c *Client) FindOrganization(ctx context.Context, filter platform.Organizat
|
|||
return o, nil
|
||||
}
|
||||
|
||||
func filterOrganizationsFn(filter platform.OrganizationFilter) func(o *platform.Organization) bool {
|
||||
func filterOrganizationsFn(filter influxdb.OrganizationFilter) func(o *influxdb.Organization) bool {
|
||||
if filter.ID != nil {
|
||||
return func(o *platform.Organization) bool {
|
||||
return func(o *influxdb.Organization) bool {
|
||||
return o.ID == *filter.ID
|
||||
}
|
||||
}
|
||||
|
||||
if filter.Name != nil {
|
||||
return func(o *platform.Organization) bool {
|
||||
return func(o *influxdb.Organization) bool {
|
||||
return o.Name == *filter.Name
|
||||
}
|
||||
}
|
||||
|
||||
return func(o *platform.Organization) bool { return true }
|
||||
return func(o *influxdb.Organization) bool { return true }
|
||||
}
|
||||
|
||||
// FindOrganizations retrives all organizations that match an arbitrary organization filter.
|
||||
// Filters using ID, or Name should be efficient.
|
||||
// Other filters will do a linear scan across all organizations searching for a match.
|
||||
func (c *Client) FindOrganizations(ctx context.Context, filter platform.OrganizationFilter, opt ...platform.FindOptions) ([]*platform.Organization, int, error) {
|
||||
op := getOp(platform.OpFindOrganizations)
|
||||
func (c *Client) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
|
||||
op := getOp(influxdb.OpFindOrganizations)
|
||||
if filter.ID != nil {
|
||||
o, err := c.FindOrganizationByID(ctx, *filter.ID)
|
||||
if err != nil {
|
||||
return nil, 0, &platform.Error{
|
||||
return nil, 0, &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
}
|
||||
|
||||
return []*platform.Organization{o}, 1, nil
|
||||
return []*influxdb.Organization{o}, 1, nil
|
||||
}
|
||||
|
||||
if filter.Name != nil {
|
||||
o, err := c.FindOrganizationByName(ctx, *filter.Name)
|
||||
if err != nil {
|
||||
return nil, 0, &platform.Error{
|
||||
return nil, 0, &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
}
|
||||
|
||||
return []*platform.Organization{o}, 1, nil
|
||||
return []*influxdb.Organization{o}, 1, nil
|
||||
}
|
||||
|
||||
os := []*platform.Organization{}
|
||||
os := []*influxdb.Organization{}
|
||||
filterFn := filterOrganizationsFn(filter)
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
return forEachOrganization(ctx, tx, func(o *platform.Organization) bool {
|
||||
return forEachOrganization(ctx, tx, func(o *influxdb.Organization) bool {
|
||||
if filterFn(o) {
|
||||
os = append(os, o)
|
||||
}
|
||||
|
@ -228,7 +228,7 @@ func (c *Client) FindOrganizations(ctx context.Context, filter platform.Organiza
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, &platform.Error{
|
||||
return nil, 0, &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
|
@ -237,14 +237,14 @@ func (c *Client) FindOrganizations(ctx context.Context, filter platform.Organiza
|
|||
return os, len(os), nil
|
||||
}
|
||||
|
||||
// CreateOrganization creates a platform organization and sets b.ID.
|
||||
func (c *Client) CreateOrganization(ctx context.Context, o *platform.Organization) error {
|
||||
op := getOp(platform.OpCreateOrganization)
|
||||
// CreateOrganization creates a influxdb organization and sets b.ID.
|
||||
func (c *Client) CreateOrganization(ctx context.Context, o *influxdb.Organization) error {
|
||||
op := getOp(influxdb.OpCreateOrganization)
|
||||
return c.db.Update(func(tx *bolt.Tx) error {
|
||||
unique := c.uniqueOrganizationName(ctx, tx, o)
|
||||
if !unique {
|
||||
return &platform.Error{
|
||||
Code: platform.EConflict,
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EConflict,
|
||||
Op: op,
|
||||
Msg: fmt.Sprintf("organization with name %s already exists", o.Name),
|
||||
}
|
||||
|
@ -252,14 +252,14 @@ func (c *Client) CreateOrganization(ctx context.Context, o *platform.Organizatio
|
|||
|
||||
o.ID = c.IDGenerator.ID()
|
||||
if err := c.appendOrganizationEventToLog(ctx, tx, o.ID, organizationCreatedEvent); err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.putOrganization(ctx, tx, o); err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
Op: op,
|
||||
}
|
||||
|
@ -269,7 +269,7 @@ func (c *Client) CreateOrganization(ctx context.Context, o *platform.Organizatio
|
|||
}
|
||||
|
||||
// PutOrganization will put a organization without setting an ID.
|
||||
func (c *Client) PutOrganization(ctx context.Context, o *platform.Organization) error {
|
||||
func (c *Client) PutOrganization(ctx context.Context, o *influxdb.Organization) error {
|
||||
var err error
|
||||
return c.db.Update(func(tx *bolt.Tx) error {
|
||||
if pe := c.putOrganization(ctx, tx, o); pe != nil {
|
||||
|
@ -279,27 +279,27 @@ func (c *Client) PutOrganization(ctx context.Context, o *platform.Organization)
|
|||
})
|
||||
}
|
||||
|
||||
func (c *Client) putOrganization(ctx context.Context, tx *bolt.Tx, o *platform.Organization) *platform.Error {
|
||||
func (c *Client) putOrganization(ctx context.Context, tx *bolt.Tx, o *influxdb.Organization) *influxdb.Error {
|
||||
v, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
encodedID, err := o.ID.Encode()
|
||||
if err != nil {
|
||||
return &platform.Error{
|
||||
Code: platform.EInvalid,
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
if err := tx.Bucket(organizationIndex).Put(organizationIndexKey(o.Name), encodedID); err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
if err = tx.Bucket(organizationBucket).Put(encodedID, v); err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
@ -311,10 +311,10 @@ func organizationIndexKey(n string) []byte {
|
|||
}
|
||||
|
||||
// forEachOrganization will iterate through all organizations while fn returns true.
|
||||
func forEachOrganization(ctx context.Context, tx *bolt.Tx, fn func(*platform.Organization) bool) error {
|
||||
func forEachOrganization(ctx context.Context, tx *bolt.Tx, fn func(*influxdb.Organization) bool) error {
|
||||
cur := tx.Bucket(organizationBucket).Cursor()
|
||||
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
||||
o := &platform.Organization{}
|
||||
o := &influxdb.Organization{}
|
||||
if err := json.Unmarshal(v, o); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -326,20 +326,20 @@ func forEachOrganization(ctx context.Context, tx *bolt.Tx, fn func(*platform.Org
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) uniqueOrganizationName(ctx context.Context, tx *bolt.Tx, o *platform.Organization) bool {
|
||||
func (c *Client) uniqueOrganizationName(ctx context.Context, tx *bolt.Tx, o *influxdb.Organization) bool {
|
||||
v := tx.Bucket(organizationIndex).Get(organizationIndexKey(o.Name))
|
||||
return len(v) == 0
|
||||
}
|
||||
|
||||
// UpdateOrganization updates a organization according the parameters set on upd.
|
||||
func (c *Client) UpdateOrganization(ctx context.Context, id platform.ID, upd platform.OrganizationUpdate) (*platform.Organization, error) {
|
||||
var o *platform.Organization
|
||||
func (c *Client) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
|
||||
var o *influxdb.Organization
|
||||
err := c.db.Update(func(tx *bolt.Tx) error {
|
||||
org, pe := c.updateOrganization(ctx, tx, id, upd)
|
||||
if pe != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: pe,
|
||||
Op: getOp(platform.OpUpdateOrganization),
|
||||
Op: getOp(influxdb.OpUpdateOrganization),
|
||||
}
|
||||
}
|
||||
o = org
|
||||
|
@ -349,7 +349,7 @@ func (c *Client) UpdateOrganization(ctx context.Context, id platform.ID, upd pla
|
|||
return o, err
|
||||
}
|
||||
|
||||
func (c *Client) updateOrganization(ctx context.Context, tx *bolt.Tx, id platform.ID, upd platform.OrganizationUpdate) (*platform.Organization, *platform.Error) {
|
||||
func (c *Client) updateOrganization(ctx context.Context, tx *bolt.Tx, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, *influxdb.Error) {
|
||||
o, pe := c.findOrganizationByID(ctx, tx, id)
|
||||
if pe != nil {
|
||||
return nil, pe
|
||||
|
@ -359,7 +359,7 @@ func (c *Client) updateOrganization(ctx context.Context, tx *bolt.Tx, id platfor
|
|||
// Organizations are indexed by name and so the organization index must be pruned
|
||||
// when name is modified.
|
||||
if err := tx.Bucket(organizationIndex).Delete(organizationIndexKey(o.Name)); err != nil {
|
||||
return nil, &platform.Error{
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
@ -367,7 +367,7 @@ func (c *Client) updateOrganization(ctx context.Context, tx *bolt.Tx, id platfor
|
|||
}
|
||||
|
||||
if err := c.appendOrganizationEventToLog(ctx, tx, o.ID, organizationUpdatedEvent); err != nil {
|
||||
return nil, &platform.Error{
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
@ -380,7 +380,7 @@ func (c *Client) updateOrganization(ctx context.Context, tx *bolt.Tx, id platfor
|
|||
}
|
||||
|
||||
// DeleteOrganization deletes a organization and prunes it from the index.
|
||||
func (c *Client) DeleteOrganization(ctx context.Context, id platform.ID) error {
|
||||
func (c *Client) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
|
||||
err := c.db.Update(func(tx *bolt.Tx) error {
|
||||
if pe := c.deleteOrganizationsBuckets(ctx, tx, id); pe != nil {
|
||||
return pe
|
||||
|
@ -391,41 +391,41 @@ func (c *Client) DeleteOrganization(ctx context.Context, id platform.ID) error {
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return &platform.Error{
|
||||
Op: getOp(platform.OpDeleteOrganization),
|
||||
return &influxdb.Error{
|
||||
Op: getOp(influxdb.OpDeleteOrganization),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) deleteOrganization(ctx context.Context, tx *bolt.Tx, id platform.ID) *platform.Error {
|
||||
func (c *Client) deleteOrganization(ctx context.Context, tx *bolt.Tx, id influxdb.ID) *influxdb.Error {
|
||||
o, pe := c.findOrganizationByID(ctx, tx, id)
|
||||
if pe != nil {
|
||||
return pe
|
||||
}
|
||||
if err := tx.Bucket(organizationIndex).Delete(organizationIndexKey(o.Name)); err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
encodedID, err := id.Encode()
|
||||
if err != nil {
|
||||
return &platform.Error{
|
||||
Code: platform.EInvalid,
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
if err = tx.Bucket(organizationBucket).Delete(encodedID); err != nil {
|
||||
return &platform.Error{
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) deleteOrganizationsBuckets(ctx context.Context, tx *bolt.Tx, id platform.ID) *platform.Error {
|
||||
filter := platform.BucketFilter{
|
||||
func (c *Client) deleteOrganizationsBuckets(ctx context.Context, tx *bolt.Tx, id influxdb.ID) *influxdb.Error {
|
||||
filter := influxdb.BucketFilter{
|
||||
OrganizationID: &id,
|
||||
}
|
||||
bs, pe := c.findBuckets(ctx, tx, filter)
|
||||
|
@ -441,9 +441,9 @@ func (c *Client) deleteOrganizationsBuckets(ctx context.Context, tx *bolt.Tx, id
|
|||
}
|
||||
|
||||
// GeOrganizationOperationLog retrieves a organization operation log.
|
||||
func (c *Client) GetOrganizationOperationLog(ctx context.Context, id platform.ID, opts platform.FindOptions) ([]*platform.OperationLogEntry, int, error) {
|
||||
func (c *Client) GetOrganizationOperationLog(ctx context.Context, id influxdb.ID, opts influxdb.FindOptions) ([]*influxdb.OperationLogEntry, int, error) {
|
||||
// TODO(desa): might be worthwhile to allocate a slice of size opts.Limit
|
||||
log := []*platform.OperationLogEntry{}
|
||||
log := []*influxdb.OperationLogEntry{}
|
||||
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
key, err := encodeBucketOperationLogKey(id)
|
||||
|
@ -452,7 +452,7 @@ func (c *Client) GetOrganizationOperationLog(ctx context.Context, id platform.ID
|
|||
}
|
||||
|
||||
return c.forEachLogEntry(ctx, tx, key, opts, func(v []byte, t time.Time) error {
|
||||
e := &platform.OperationLogEntry{}
|
||||
e := &influxdb.OperationLogEntry{}
|
||||
if err := json.Unmarshal(v, e); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -477,7 +477,7 @@ const (
|
|||
organizationUpdatedEvent = "Organization Updated"
|
||||
)
|
||||
|
||||
func encodeOrganizationOperationLogKey(id platform.ID) ([]byte, error) {
|
||||
func encodeOrganizationOperationLogKey(id influxdb.ID) ([]byte, error) {
|
||||
buf, err := id.Encode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -485,14 +485,14 @@ func encodeOrganizationOperationLogKey(id platform.ID) ([]byte, error) {
|
|||
return append([]byte(bucketOperationLogKeyPrefix), buf...), nil
|
||||
}
|
||||
|
||||
func (c *Client) appendOrganizationEventToLog(ctx context.Context, tx *bolt.Tx, id platform.ID, s string) error {
|
||||
e := &platform.OperationLogEntry{
|
||||
func (c *Client) appendOrganizationEventToLog(ctx context.Context, tx *bolt.Tx, id influxdb.ID, s string) error {
|
||||
e := &influxdb.OperationLogEntry{
|
||||
Description: s,
|
||||
}
|
||||
// TODO(desa): this is fragile and non explicit since it requires an authorizer to be on context. It should be
|
||||
// replaced with a higher level transaction so that adding to the log can take place in the http handler
|
||||
// where the organizationID will exist explicitly.
|
||||
a, err := platformcontext.GetAuthorizer(ctx)
|
||||
a, err := influxdbcontext.GetAuthorizer(ctx)
|
||||
if err == nil {
|
||||
// Add the organization to the log if you can, but don't error if its not there.
|
||||
e.UserID = a.GetUserID()
|
||||
|
@ -510,3 +510,60 @@ func (c *Client) appendOrganizationEventToLog(ctx context.Context, tx *bolt.Tx,
|
|||
|
||||
return c.addLogEntry(ctx, tx, k, v, c.time())
|
||||
}
|
||||
|
||||
func (c *Client) FindResourceOrganizationID(ctx context.Context, rt influxdb.ResourceType, id influxdb.ID) (influxdb.ID, error) {
|
||||
switch rt {
|
||||
case influxdb.AuthorizationsResourceType:
|
||||
r, err := c.FindAuthorizationByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrgID, nil
|
||||
case influxdb.BucketsResourceType:
|
||||
r, err := c.FindBucketByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrganizationID, nil
|
||||
case influxdb.DashboardsResourceType:
|
||||
r, err := c.FindDashboardByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrganizationID, nil
|
||||
case influxdb.OrgsResourceType:
|
||||
r, err := c.FindOrganizationByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.ID, nil
|
||||
case influxdb.SourcesResourceType:
|
||||
r, err := c.FindSourceByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrganizationID, nil
|
||||
case influxdb.TelegrafsResourceType:
|
||||
r, err := c.FindTelegrafConfigByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrganizationID, nil
|
||||
case influxdb.MacrosResourceType:
|
||||
r, err := c.FindMacroByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrganizationID, nil
|
||||
case influxdb.ScraperResourceType:
|
||||
r, err := c.GetTargetByID(ctx, id)
|
||||
if err != nil {
|
||||
return influxdb.InvalidID(), err
|
||||
}
|
||||
return r.OrgID, nil
|
||||
}
|
||||
|
||||
return influxdb.InvalidID(), &influxdb.Error{
|
||||
Msg: fmt.Sprintf("unsupported resource type %s", rt),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -432,6 +432,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
SecretService: secretSvc,
|
||||
LookupService: lookupSvc,
|
||||
ProtoService: protoSvc,
|
||||
OrgLookupService: m.boltClient,
|
||||
}
|
||||
|
||||
// HTTP server
|
||||
|
|
|
@ -68,6 +68,7 @@ type APIBackend struct {
|
|||
LookupService influxdb.LookupService
|
||||
ChronografService *server.Service
|
||||
ProtoService influxdb.ProtoService
|
||||
OrgLookupService authorizer.OrganizationService
|
||||
}
|
||||
|
||||
// NewAPIHandler constructs all api handlers beneath it and returns an APIHandler
|
||||
|
@ -76,6 +77,7 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
|
|||
|
||||
sessionBackend := NewSessionBackend(b)
|
||||
h.SessionHandler = NewSessionHandler(sessionBackend)
|
||||
b.UserResourceMappingService = authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
|
||||
|
||||
h.BucketHandler = NewBucketHandler(b.UserResourceMappingService, b.LabelService, b.UserService)
|
||||
h.BucketHandler.BucketService = authorizer.NewBucketService(b.BucketService)
|
||||
|
|
Loading…
Reference in New Issue