feat(api/task): Add validation for both tasks and buckets on task create (#1900)
fixes #1622pull/10616/head
parent
d65a980f9b
commit
f7ae8c3524
|
|
@ -321,8 +321,7 @@ func (m *Main) run(ctx context.Context) (err error) {
|
|||
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
|
||||
lr := taskbackend.NewQueryLogReader(queryService)
|
||||
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler)
|
||||
// TODO(lh): Add in `taskSvc = task.NewValidator(taskSvc)` once we have Authentication coming in the context.
|
||||
// see issue #563
|
||||
taskSvc = task.NewValidator(taskSvc, bucketSvc)
|
||||
}
|
||||
|
||||
// NATS streaming server
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import (
|
|||
// callers to fail early for operations that are not allowed. However, it's still possible
|
||||
// for authorization to be denied at runtime even if this check passes.
|
||||
type PreAuthorizer interface {
|
||||
PreAuthorize(ctx context.Context, spec *flux.Spec, auth *platform.Authorization) error
|
||||
PreAuthorize(ctx context.Context, spec *flux.Spec, auth platform.Authorizer) error
|
||||
}
|
||||
|
||||
// NewPreAuthorizer creates a new PreAuthorizer
|
||||
|
|
@ -26,8 +26,8 @@ type preAuthorizer struct {
|
|||
}
|
||||
|
||||
// PreAuthorize finds all the buckets read and written by the given spec, and ensures that execution is allowed
|
||||
// given the Authorization. Returns nil on success, and an error with an appropriate message otherwise.
|
||||
func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth *platform.Authorization) error {
|
||||
// given the Authorizer. Returns nil on success, and an error with an appropriate message otherwise.
|
||||
func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth platform.Authorizer) error {
|
||||
|
||||
readBuckets, writeBuckets, err := BucketsAccessed(spec)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,9 +4,12 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/platform"
|
||||
platcontext "github.com/influxdata/platform/context"
|
||||
"github.com/influxdata/platform/query"
|
||||
)
|
||||
|
||||
type authError struct {
|
||||
|
|
@ -23,11 +26,13 @@ var ErrFailedPermission = errors.New("unauthorized")
|
|||
|
||||
type taskServiceValidator struct {
|
||||
platform.TaskService
|
||||
preAuth query.PreAuthorizer
|
||||
}
|
||||
|
||||
func NewValidator(ts platform.TaskService) platform.TaskService {
|
||||
func NewValidator(ts platform.TaskService, bs platform.BucketService) platform.TaskService {
|
||||
return &taskServiceValidator{
|
||||
TaskService: ts,
|
||||
preAuth: query.NewPreAuthorizer(bs),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -36,6 +41,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t *platform.Task
|
|||
return err
|
||||
}
|
||||
|
||||
if err := validateBucket(ctx, t.Flux, ts.preAuth); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ts.TaskService.CreateTask(ctx, t)
|
||||
}
|
||||
|
||||
|
|
@ -53,3 +62,21 @@ func validatePermission(ctx context.Context, perm platform.Permission) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateBucket(ctx context.Context, script string, preAuth query.PreAuthorizer) error {
|
||||
auth, err := platcontext.GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
spec, err := flux.Compile(ctx, script, time.Now())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := preAuth.PreAuthorize(ctx, spec, auth); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue