From f7ae8c35248953b2ebe0452875eafed764d2a25a Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 12 Dec 2018 16:07:40 -0700 Subject: [PATCH] feat(api/task): Add validation for both tasks and buckets on task create (#1900) fixes #1622 --- cmd/influxd/main.go | 3 +-- query/preauthorizer.go | 6 +++--- task/validator.go | 29 ++++++++++++++++++++++++++++- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 4dbf32430b..11527486be 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -321,8 +321,7 @@ func (m *Main) run(ctx context.Context) (err error) { queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController} lr := taskbackend.NewQueryLogReader(queryService) taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler) - // TODO(lh): Add in `taskSvc = task.NewValidator(taskSvc)` once we have Authentication coming in the context. - // see issue #563 + taskSvc = task.NewValidator(taskSvc, bucketSvc) } // NATS streaming server diff --git a/query/preauthorizer.go b/query/preauthorizer.go index cd82f5cfb8..fccb8da95f 100644 --- a/query/preauthorizer.go +++ b/query/preauthorizer.go @@ -13,7 +13,7 @@ import ( // callers to fail early for operations that are not allowed. However, it's still possible // for authorization to be denied at runtime even if this check passes. type PreAuthorizer interface { - PreAuthorize(ctx context.Context, spec *flux.Spec, auth *platform.Authorization) error + PreAuthorize(ctx context.Context, spec *flux.Spec, auth platform.Authorizer) error } // NewPreAuthorizer creates a new PreAuthorizer @@ -26,8 +26,8 @@ type preAuthorizer struct { } // PreAuthorize finds all the buckets read and written by the given spec, and ensures that execution is allowed -// given the Authorization. Returns nil on success, and an error with an appropriate message otherwise. -func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth *platform.Authorization) error { +// given the Authorizer. Returns nil on success, and an error with an appropriate message otherwise. +func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth platform.Authorizer) error { readBuckets, writeBuckets, err := BucketsAccessed(spec) diff --git a/task/validator.go b/task/validator.go index e611442626..506720e159 100644 --- a/task/validator.go +++ b/task/validator.go @@ -4,9 +4,12 @@ import ( "context" "errors" "fmt" + "time" + "github.com/influxdata/flux" "github.com/influxdata/platform" platcontext "github.com/influxdata/platform/context" + "github.com/influxdata/platform/query" ) type authError struct { @@ -23,11 +26,13 @@ var ErrFailedPermission = errors.New("unauthorized") type taskServiceValidator struct { platform.TaskService + preAuth query.PreAuthorizer } -func NewValidator(ts platform.TaskService) platform.TaskService { +func NewValidator(ts platform.TaskService, bs platform.BucketService) platform.TaskService { return &taskServiceValidator{ TaskService: ts, + preAuth: query.NewPreAuthorizer(bs), } } @@ -36,6 +41,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t *platform.Task return err } + if err := validateBucket(ctx, t.Flux, ts.preAuth); err != nil { + return err + } + return ts.TaskService.CreateTask(ctx, t) } @@ -53,3 +62,21 @@ func validatePermission(ctx context.Context, perm platform.Permission) error { return nil } + +func validateBucket(ctx context.Context, script string, preAuth query.PreAuthorizer) error { + auth, err := platcontext.GetAuthorizer(ctx) + if err != nil { + return err + } + + spec, err := flux.Compile(ctx, script, time.Now()) + if err != nil { + return err + } + + if err := preAuth.PreAuthorize(ctx, spec, auth); err != nil { + return err + } + + return nil +}