fix(platform): use authorizer instead of authorizion for endpoints
feat(platform): add ToPermissions method to user resource mapping The ToPermissions method returns a set of permissions that is granted via a user resource mapping. feat(bolt): resolve sessions permissions on lookup feat(http): use authorizer instead of authorization service for write api feat(bolt): create user resource mappings for org users in bucket create feat(bolt): create user resource mapping for first org/user fix(platform): use authorizer for query endpoint instead of authorization test(http): use cmp instead of reflect for decode testpull/10616/head
parent
806c69dc90
commit
407c01cb17
|
@ -282,7 +282,11 @@ func (c *Client) CreateBucket(ctx context.Context, b *platform.Bucket) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return c.putBucket(ctx, tx, b)
|
||||
if err := c.putBucket(ctx, tx, b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.createBucketUserResourceMappings(ctx, tx, b)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -293,6 +297,29 @@ func (c *Client) PutBucket(ctx context.Context, b *platform.Bucket) error {
|
|||
})
|
||||
}
|
||||
|
||||
func (c *Client) createBucketUserResourceMappings(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) error {
|
||||
ms, err := c.findUserResourceMappings(ctx, tx, platform.UserResourceMappingFilter{
|
||||
ResourceType: platform.OrgResourceType,
|
||||
ResourceID: b.OrganizationID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, m := range ms {
|
||||
if err := c.createUserResourceMapping(ctx, tx, &platform.UserResourceMapping{
|
||||
ResourceType: platform.BucketResourceType,
|
||||
ResourceID: b.ID,
|
||||
UserID: m.UserID,
|
||||
UserType: m.UserType,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) putBucket(ctx context.Context, tx *bolt.Tx, b *platform.Bucket) error {
|
||||
b.Organization = ""
|
||||
v, err := json.Marshal(b)
|
||||
|
|
|
@ -105,6 +105,14 @@ func (c *Client) Generate(ctx context.Context, req *platform.OnboardingRequest)
|
|||
OrganizationID: o.ID,
|
||||
RetentionPeriod: time.Duration(req.RetentionPeriod) * time.Hour,
|
||||
}
|
||||
if err := c.CreateUserResourceMapping(ctx, &platform.UserResourceMapping{
|
||||
ResourceType: platform.OrgResourceType,
|
||||
ResourceID: o.ID,
|
||||
UserID: u.ID,
|
||||
UserType: platform.Owner,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = c.CreateBucket(ctx, bucket); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -55,6 +55,18 @@ func (c *Client) findSession(ctx context.Context, tx *bolt.Tx, key string) (*pla
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// TODO(desa): these values should be cached so it's not so expensive to lookup each time.
|
||||
f := platform.UserResourceMappingFilter{UserID: s.UserID}
|
||||
mappings, err := c.findUserResourceMappings(ctx, tx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ps := make([]platform.Permission, 0, len(mappings))
|
||||
for _, m := range mappings {
|
||||
ps = append(ps, m.ToPermissions()...)
|
||||
}
|
||||
s.Permissions = ps
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -65,9 +65,30 @@ func (c *Client) findUserResourceMappings(ctx context.Context, tx *bolt.Tx, filt
|
|||
return ms, nil
|
||||
}
|
||||
|
||||
func (c *Client) findUserResourceMapping(ctx context.Context, tx *bolt.Tx, filter platform.UserResourceMappingFilter) (*platform.UserResourceMapping, error) {
|
||||
ms, err := c.findUserResourceMappings(ctx, tx, filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(ms) == 0 {
|
||||
return nil, fmt.Errorf("userResource mapping not found")
|
||||
}
|
||||
|
||||
return ms[0], nil
|
||||
}
|
||||
|
||||
func (c *Client) CreateUserResourceMapping(ctx context.Context, m *platform.UserResourceMapping) error {
|
||||
return c.db.Update(func(tx *bolt.Tx) error {
|
||||
return c.createUserResourceMapping(ctx, tx, m)
|
||||
if err := c.createUserResourceMapping(ctx, tx, m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if m.ResourceType == platform.OrgResourceType {
|
||||
return c.createOrgDependentMappings(ctx, tx, m)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -95,6 +116,29 @@ func (c *Client) createUserResourceMapping(ctx context.Context, tx *bolt.Tx, m *
|
|||
return nil
|
||||
}
|
||||
|
||||
// This method creates the user/resource mappings for resources that belong to an organization.
|
||||
func (c *Client) createOrgDependentMappings(ctx context.Context, tx *bolt.Tx, m *platform.UserResourceMapping) error {
|
||||
bf := platform.BucketFilter{OrganizationID: &m.ResourceID}
|
||||
bs, err := c.findBuckets(ctx, tx, bf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, b := range bs {
|
||||
m := &platform.UserResourceMapping{
|
||||
ResourceType: platform.BucketResourceType,
|
||||
ResourceID: b.ID,
|
||||
UserType: m.UserType,
|
||||
UserID: m.UserID,
|
||||
}
|
||||
if err := c.createUserResourceMapping(ctx, tx, m); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO(desa): add support for all other resource types.
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func userResourceKey(m *platform.UserResourceMapping) ([]byte, error) {
|
||||
encodedResourceID, err := m.ResourceID.Encode()
|
||||
if err != nil {
|
||||
|
@ -141,10 +185,26 @@ func (c *Client) uniqueUserResourceMapping(ctx context.Context, tx *bolt.Tx, m *
|
|||
// DeleteUserResourceMapping deletes a user resource mapping.
|
||||
func (c *Client) DeleteUserResourceMapping(ctx context.Context, resourceID platform.ID, userID platform.ID) error {
|
||||
return c.db.Update(func(tx *bolt.Tx) error {
|
||||
return c.deleteUserResourceMapping(ctx, tx, platform.UserResourceMappingFilter{
|
||||
m, err := c.findUserResourceMapping(ctx, tx, platform.UserResourceMappingFilter{
|
||||
ResourceID: resourceID,
|
||||
UserID: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.deleteUserResourceMapping(ctx, tx, platform.UserResourceMappingFilter{
|
||||
ResourceID: resourceID,
|
||||
UserID: userID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if m.ResourceType == platform.OrgResourceType {
|
||||
return c.deleteOrgDependentMappings(ctx, tx, m)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -181,3 +241,24 @@ func (c *Client) deleteUserResourceMappings(ctx context.Context, tx *bolt.Tx, fi
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This method deletes the user/resource mappings for resources that belong to an organization.
|
||||
func (c *Client) deleteOrgDependentMappings(ctx context.Context, tx *bolt.Tx, m *platform.UserResourceMapping) error {
|
||||
bf := platform.BucketFilter{OrganizationID: &m.ResourceID}
|
||||
bs, err := c.findBuckets(ctx, tx, bf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, b := range bs {
|
||||
if err := c.deleteUserResourceMapping(ctx, tx, platform.UserResourceMappingFilter{
|
||||
ResourceType: platform.BucketResourceType,
|
||||
ResourceID: b.ID,
|
||||
UserID: m.UserID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO(desa): add support for all other resource types.
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -119,13 +119,11 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
|
|||
)
|
||||
|
||||
h.WriteHandler = NewWriteHandler(b.PointsWriter)
|
||||
h.WriteHandler.AuthorizationService = b.AuthorizationService
|
||||
h.WriteHandler.OrganizationService = b.OrganizationService
|
||||
h.WriteHandler.BucketService = b.BucketService
|
||||
h.WriteHandler.Logger = b.Logger.With(zap.String("handler", "write"))
|
||||
|
||||
h.QueryHandler = NewFluxHandler()
|
||||
h.QueryHandler.AuthorizationService = b.AuthorizationService
|
||||
h.QueryHandler.OrganizationService = b.OrganizationService
|
||||
h.QueryHandler.Logger = b.Logger.With(zap.String("handler", "query"))
|
||||
h.QueryHandler.ProxyQueryService = b.ProxyQueryService
|
||||
|
|
|
@ -222,7 +222,7 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ
|
|||
return &req, err
|
||||
}
|
||||
|
||||
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth *platform.Authorization, svc platform.OrganizationService) (*query.ProxyRequest, error) {
|
||||
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth platform.Authorizer, svc platform.OrganizationService) (*query.ProxyRequest, error) {
|
||||
req, err := decodeQueryRequest(ctx, r, svc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -233,6 +233,6 @@ func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth *platfor
|
|||
return nil, err
|
||||
}
|
||||
|
||||
pr.Request.Authorization = auth
|
||||
pr.Request.Authorizer = auth
|
||||
return pr, nil
|
||||
}
|
||||
|
|
|
@ -35,10 +35,9 @@ type FluxHandler struct {
|
|||
|
||||
Logger *zap.Logger
|
||||
|
||||
Now func() time.Time
|
||||
AuthorizationService platform.AuthorizationService
|
||||
OrganizationService platform.OrganizationService
|
||||
ProxyQueryService query.ProxyQueryService
|
||||
Now func() time.Time
|
||||
OrganizationService platform.OrganizationService
|
||||
ProxyQueryService query.ProxyQueryService
|
||||
}
|
||||
|
||||
// NewFluxHandler returns a new handler at /api/v2/query for flux queries.
|
||||
|
@ -60,24 +59,14 @@ func NewFluxHandler() *FluxHandler {
|
|||
|
||||
func (h *FluxHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
a, err := pcontext.GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
auth, err := h.AuthorizationService.FindAuthorizationByID(ctx, a.Identifier())
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
if !auth.IsActive() {
|
||||
EncodeError(ctx, errors.Forbiddenf("insufficient permissions for query"), w)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := decodeProxyQueryRequest(ctx, r, auth, h.OrganizationService)
|
||||
req, err := decodeProxyQueryRequest(ctx, r, a, h.OrganizationService)
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
|
|
|
@ -9,6 +9,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/csv"
|
||||
|
@ -461,6 +463,12 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
var cmpOptions = cmp.Options{
|
||||
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
|
||||
cmpopts.IgnoreUnexported(query.Request{}),
|
||||
cmpopts.IgnoreFields(query.Request{}, "Authorizer"),
|
||||
cmpopts.EquateEmpty(),
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc)
|
||||
|
@ -468,8 +476,8 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
|
|||
t.Errorf("decodeProxyQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("decodeProxyQueryRequest() = %v, want %v", got, tt.want)
|
||||
if diff := cmp.Diff(got, tt.want, cmpOptions...); diff != "" {
|
||||
t.Errorf("decodeProxyQueryRequest() = got/want %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -25,9 +25,8 @@ type WriteHandler struct {
|
|||
|
||||
Logger *zap.Logger
|
||||
|
||||
AuthorizationService platform.AuthorizationService
|
||||
BucketService platform.BucketService
|
||||
OrganizationService platform.OrganizationService
|
||||
BucketService platform.BucketService
|
||||
OrganizationService platform.OrganizationService
|
||||
|
||||
PointsWriter storage.PointsWriter
|
||||
}
|
||||
|
@ -69,12 +68,6 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
auth, err := h.AuthorizationService.FindAuthorizationByID(ctx, a.Identifier())
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := decodeWriteRequest(ctx, r)
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
|
@ -134,7 +127,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
bucket = b
|
||||
}
|
||||
|
||||
if !auth.Allowed(platform.WriteBucketPermission(bucket.ID)) {
|
||||
if !a.Allowed(platform.WriteBucketPermission(bucket.ID)) {
|
||||
EncodeError(ctx, errors.Forbiddenf("insufficient permissions for write"), w)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ type REPLQuerier struct {
|
|||
|
||||
func (q *REPLQuerier) Query(ctx context.Context, compiler flux.Compiler) (flux.ResultIterator, error) {
|
||||
req := &Request{
|
||||
Authorization: q.Authorization,
|
||||
Authorizer: q.Authorization,
|
||||
OrganizationID: q.OrganizationID,
|
||||
Compiler: compiler,
|
||||
}
|
||||
|
|
|
@ -31,19 +31,31 @@ type Log struct {
|
|||
|
||||
// Redact removes any sensitive information before logging
|
||||
func (q *Log) Redact() {
|
||||
if q.ProxyRequest != nil && q.ProxyRequest.Request.Authorization != nil {
|
||||
if q.ProxyRequest != nil && q.ProxyRequest.Request.Authorizer != nil {
|
||||
// Make shallow copy of request
|
||||
request := new(ProxyRequest)
|
||||
*request = *q.ProxyRequest
|
||||
|
||||
// Make shallow copy of authorization
|
||||
auth := new(platform.Authorization)
|
||||
*auth = *q.ProxyRequest.Request.Authorization
|
||||
// Redact authorization token
|
||||
auth.Token = ""
|
||||
var az platform.Authorizer
|
||||
switch a := q.ProxyRequest.Request.Authorizer.(type) {
|
||||
case *platform.Authorization:
|
||||
// Make shallow copy of authorization
|
||||
auth := new(platform.Authorization)
|
||||
*auth = *a
|
||||
// Redact authorization token
|
||||
auth.Token = ""
|
||||
az = auth
|
||||
case *platform.Session:
|
||||
// Make shallow copy of Session
|
||||
sess := new(platform.Session)
|
||||
*sess = *a
|
||||
// Redact session key
|
||||
sess.Key = ""
|
||||
az = sess
|
||||
}
|
||||
|
||||
// Apply redacted authorization
|
||||
request.Request.Authorization = auth
|
||||
// Apply redacted autorizer
|
||||
request.Request.Authorizer = az
|
||||
|
||||
// Apply redacted request
|
||||
q.ProxyRequest = request
|
||||
|
|
|
@ -12,8 +12,8 @@ import (
|
|||
// Request respresents the query to run.
|
||||
type Request struct {
|
||||
// Scope
|
||||
Authorization *platform.Authorization `json:"authorization,omitempty"`
|
||||
OrganizationID platform.ID `json:"organization_id"`
|
||||
Authorizer platform.Authorizer `json:"authorization,omitempty"`
|
||||
OrganizationID platform.ID `json:"organization_id"`
|
||||
|
||||
// Command
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF
|
|||
if auth.Kind() != "authorization" {
|
||||
return nil, errAuthorizerNotSupported
|
||||
}
|
||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: *logFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||
request := &query.Request{Authorizer: auth.(*platform.Authorization), OrganizationID: *logFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||
|
||||
ittr, err := qlr.queryService.Query(ctx, request)
|
||||
if err != nil {
|
||||
|
@ -136,7 +136,7 @@ join(tables: {main: main, supl: supl}, on: ["_start", "_stop", "orgID", "taskID"
|
|||
if auth.Kind() != "authorization" {
|
||||
return nil, errAuthorizerNotSupported
|
||||
}
|
||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: *runFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||
request := &query.Request{Authorizer: auth.(*platform.Authorization), OrganizationID: *runFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||
|
||||
ittr, err := qlr.queryService.Query(ctx, request)
|
||||
if err != nil {
|
||||
|
@ -184,7 +184,7 @@ logs |> yield(name: "logs")
|
|||
if auth.Kind() != "authorization" {
|
||||
return nil, errAuthorizerNotSupported
|
||||
}
|
||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: showScript}}
|
||||
request := &query.Request{Authorizer: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: showScript}}
|
||||
|
||||
ittr, err := qlr.queryService.Query(ctx, request)
|
||||
if err != nil {
|
||||
|
|
|
@ -3,6 +3,7 @@ package platform
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type UserType string
|
||||
|
@ -66,3 +67,33 @@ type UserResourceMappingFilter struct {
|
|||
UserID ID
|
||||
UserType UserType
|
||||
}
|
||||
|
||||
var ownerActions = []action{WriteAction, CreateAction, DeleteAction}
|
||||
var memberActions = []action{ReadAction}
|
||||
|
||||
// ToPermission converts a user resource mapping into a set of permissions.
|
||||
func (m *UserResourceMapping) ToPermissions() []Permission {
|
||||
// TODO(desa): we'll have to do something more fine-grained eventually
|
||||
// but this should be good enough for now.
|
||||
ps := []Permission{}
|
||||
r := resource(fmt.Sprintf("%s/%s", m.ResourceType, m.ResourceID))
|
||||
if m.UserType == Owner {
|
||||
for _, a := range ownerActions {
|
||||
p := Permission{
|
||||
Resource: r,
|
||||
Action: a,
|
||||
}
|
||||
ps = append(ps, p)
|
||||
}
|
||||
}
|
||||
|
||||
for _, a := range memberActions {
|
||||
p := Permission{
|
||||
Resource: r,
|
||||
Action: a,
|
||||
}
|
||||
ps = append(ps, p)
|
||||
}
|
||||
|
||||
return ps
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue