From 0780232b83166d7d467d66b491077c72d4d7bad5 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Thu, 20 Aug 2020 11:57:23 -0400 Subject: [PATCH] feat(task): Parse Task Options using Flux AST Helpers (#19326) * feat(task): Extract options using AST-based method. * feat(task): Use AST-based option APIs for updating task option. * chore(task): Use the old way of parsing durations. * fix(task): Ordering changed on us. Fixing tests to reflect the new order. * fix(task): There's no way for us to know if there are multiples with the current APIs. * chore(task): Guard against duplicate options. Minor cleanup. * fix(kit/feature): Break cyclical dependency between influxdb and pkgs that use feature. * chore(task): Feature flag updating Flux options. * chore(task): Ensure we are testing both paths of feature flag. * chore: Remove dead code. * chore(task/options): Remove unnecessary conditional. * chore(task/options): Unexport some error helpers. --- kit/feature/middleware.go | 12 +- kv/task.go | 25 +---- task.go | 66 ++++++++++- task/options/options.go | 190 ++++++++++++++++++++++++++++++-- task/options/options_errors.go | 24 +++- task/options/options_test.go | 68 ++++++++++++ task/servicetest/servicetest.go | 11 +- task_test.go | 31 +++++- 8 files changed, 377 insertions(+), 50 deletions(-) diff --git a/kit/feature/middleware.go b/kit/feature/middleware.go index 0c60c40978..20d374c746 100644 --- a/kit/feature/middleware.go +++ b/kit/feature/middleware.go @@ -1,10 +1,10 @@ package feature import ( + "context" "encoding/json" "net/http" - "github.com/influxdata/influxdb/v2" "go.uber.org/zap" ) @@ -44,8 +44,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// HTTPErrorHandler is an influxdb.HTTPErrorHandler. It's defined here instead +// of referencing the other interface type, because we want to try our best to +// avoid cyclical dependencies when feature package is used throughout the +// codebase. +type HTTPErrorHandler interface { + HandleHTTPError(ctx context.Context, err error, w http.ResponseWriter) +} + // NewFlagsHandler returns a handler that returns the map of computed feature flags on the request context. -func NewFlagsHandler(errorHandler influxdb.HTTPErrorHandler, byKey ByKeyFn) http.Handler { +func NewFlagsHandler(errorHandler HTTPErrorHandler, byKey ByKeyFn) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(http.StatusOK) diff --git a/kv/task.go b/kv/task.go index ac86d15b9f..3c2b53002a 100644 --- a/kv/task.go +++ b/kv/task.go @@ -3,7 +3,6 @@ package kv import ( "context" "encoding/json" - "regexp" "strings" "time" @@ -708,7 +707,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf // update the flux script if !upd.Options.IsZero() || upd.Flux != nil { - if err = upd.UpdateFlux(s.FluxLanguageService, task.Flux); err != nil { + if err = upd.UpdateFlux(ctx, s.FluxLanguageService, task.Flux); err != nil { return nil, err } task.Flux = *upd.Flux @@ -1661,8 +1660,6 @@ func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) { return []byte(string(encodedID) + "/" + string(encodedRunID)), nil } -var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`) - // ExtractTaskOptions is a feature-flag driven switch between normal options // parsing and a more simplified variant. // @@ -1674,24 +1671,10 @@ var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`) // // [1]: https://github.com/influxdata/influxdb/issues/17666 func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) { - if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) { - return options.FromScript(lang, flux) + if feature.SimpleTaskOptionsExtraction().Enabled(ctx) { + return options.FromScriptAST(lang, flux) } - - matches := taskOptionsPattern.FindAllString(flux, -1) - if len(matches) == 0 { - return options.Options{}, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "no task options defined", - } - } - if len(matches) > 1 { - return options.Options{}, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "multiple task options defined", - } - } - return options.FromScript(lang, matches[0]) + return options.FromScript(lang, flux) } func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) { diff --git a/task.go b/task.go index b1804c406e..02e7b59a40 100644 --- a/task.go +++ b/task.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/flux/ast" "github.com/influxdata/flux/ast/edit" + "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/task/options" ) @@ -308,9 +309,17 @@ func safeParseSource(parser FluxLanguageService, f string) (pkg *ast.Package, er return parser.Parse(f) } -// UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it -// It zeros the options in the TaskUpdate. -func (t *TaskUpdate) UpdateFlux(parser FluxLanguageService, oldFlux string) (err error) { +// UpdateFlux updates the TaskUpdate to go from updating options to updating a +// flux string, that now has those updated options in it. It zeros the options +// in the TaskUpdate. +func (t *TaskUpdate) UpdateFlux(ctx context.Context, parser FluxLanguageService, oldFlux string) error { + if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) { + return t.updateFluxAST(parser, oldFlux) + } + return t.updateFlux(parser, oldFlux) +} + +func (t *TaskUpdate) updateFlux(parser FluxLanguageService, oldFlux string) error { if t.Flux != nil && *t.Flux != "" { oldFlux = *t.Flux } @@ -415,6 +424,57 @@ func (t *TaskUpdate) UpdateFlux(parser FluxLanguageService, oldFlux string) (err return nil } +func (t *TaskUpdate) updateFluxAST(parser FluxLanguageService, oldFlux string) error { + if t.Flux != nil && *t.Flux != "" { + oldFlux = *t.Flux + } + parsedPKG, err := safeParseSource(parser, oldFlux) + if err != nil { + return err + } + + parsed := parsedPKG.Files[0] + if !t.Options.Every.IsZero() && t.Options.Cron != "" { + return errors.New("cannot specify both cron and every") + } + + taskOptions, err := edit.GetOption(parsed, "task") + if err != nil { + return err + } + + optsExpr := taskOptions.(*ast.ObjectExpression) + + if t.Options.Name != "" { + edit.SetProperty(optsExpr, "name", &ast.StringLiteral{ + Value: t.Options.Name, + }) + } + if !t.Options.Every.IsZero() { + edit.SetProperty(optsExpr, "every", t.Options.Every.Node.Copy().(*ast.DurationLiteral)) + edit.DeleteProperty(optsExpr, "cron") + } + if t.Options.Cron != "" { + edit.SetProperty(optsExpr, "cron", &ast.StringLiteral{ + Value: t.Options.Cron, + }) + edit.DeleteProperty(optsExpr, "every") + } + if t.Options.Offset != nil { + if !t.Options.Offset.IsZero() { + edit.SetProperty(optsExpr, "offset", t.Options.Offset.Node.Copy().(*ast.DurationLiteral)) + } else { + edit.DeleteProperty(optsExpr, "offset") + } + } + + t.Options.Clear() + s := ast.Format(parsed) + t.Flux = &s + + return nil +} + // TaskFilter represents a set of filters that restrict the returned results type TaskFilter struct { Type *string diff --git a/task/options/options.go b/task/options/options.go index 630331fca9..11b9e4e897 100644 --- a/task/options/options.go +++ b/task/options/options.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/cron" "github.com/influxdata/flux" "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/ast/edit" "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/semantic" "github.com/influxdata/flux/values" @@ -54,7 +55,7 @@ func (a Duration) String() string { func (a *Duration) Parse(s string) error { q, err := ParseSignedDuration(s) if err != nil { - return ErrTaskInvalidDuration(err) + return errTaskInvalidDuration(err) } a.Node = *q return nil @@ -206,6 +207,179 @@ type FluxLanguageService interface { EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) } +// FromScriptAST extracts Task options from a Flux script using only the AST (no +// evaluation of the script). Using AST here allows us to avoid having to +// contend with functions that aren't available in some parsing contexts (within +// Gateway for example). +func FromScriptAST(lang FluxLanguageService, script string) (Options, error) { + opts := Options{ + Retry: pointer.Int64(1), + Concurrency: pointer.Int64(1), + } + + fluxAST, err := parse(lang, script) + if err != nil { + return opts, err + } + + if len(fluxAST.Files) == 0 { + return opts, ErrNoASTFile + } + + file := fluxAST.Files[0] + if hasDuplicateOptions(file, "task") { + return opts, ErrMultipleTaskOptionsDefined + } + + obj, err := edit.GetOption(file, "task") + if err != nil { + return opts, ErrNoTaskOptionsDefined + } + + objExpr, ok := obj.(*ast.ObjectExpression) + if !ok { + return opts, errTaskOptionNotObjectExpression(objExpr.Type()) + } + + for _, fn := range taskOptionExtractors { + if err := fn(&opts, objExpr); err != nil { + return opts, err + } + } + + if err := opts.Validate(); err != nil { + return opts, err + } + + return opts, nil +} + +// hasDuplicateOptions determines whether or not there are multiple assignments +// to the same option variable. +// +// TODO(brett): This will be superceded by edit.HasDuplicateOptions once its available. +func hasDuplicateOptions(file *ast.File, name string) bool { + var n int + for _, st := range file.Body { + if val, ok := st.(*ast.OptionStatement); ok { + assign := val.Assignment + if va, ok := assign.(*ast.VariableAssignment); ok { + if va.ID.Name == name { + n++ + } + } + } + } + return n > 1 +} + +type extractFn func(*Options, *ast.ObjectExpression) error + +var taskOptionExtractors = []extractFn{ + extractNameOption, + extractScheduleOptions, + extractOffsetOption, + extractConcurrencyOption, + extractRetryOption, +} + +func extractNameOption(opts *Options, objExpr *ast.ObjectExpression) error { + nameExpr, err := edit.GetProperty(objExpr, optName) + if err != nil { + return errMissingRequiredTaskOption(optName) + } + nameStr, ok := nameExpr.(*ast.StringLiteral) + if !ok { + return errParseTaskOptionField(optName) + } + opts.Name = ast.StringFromLiteral(nameStr) + + return nil +} + +func extractScheduleOptions(opts *Options, objExpr *ast.ObjectExpression) error { + cronExpr, cronErr := edit.GetProperty(objExpr, optCron) + everyExpr, everyErr := edit.GetProperty(objExpr, optEvery) + if cronErr == nil && everyErr == nil { + return ErrDuplicateIntervalField + } + if cronErr != nil && everyErr != nil { + return errMissingRequiredTaskOption("cron or every") + } + + if cronErr == nil { + cronExprStr, ok := cronExpr.(*ast.StringLiteral) + if !ok { + return errParseTaskOptionField(optCron) + } + opts.Cron = ast.StringFromLiteral(cronExprStr) + } + + if everyErr == nil { + everyDur, ok := everyExpr.(*ast.DurationLiteral) + if !ok { + return errParseTaskOptionField(optEvery) + } + opts.Every = Duration{Node: *everyDur} + } + + return nil +} + +func extractOffsetOption(opts *Options, objExpr *ast.ObjectExpression) error { + offsetExpr, offsetErr := edit.GetProperty(objExpr, optOffset) + if offsetErr != nil { + return nil + } + + switch offsetExprV := offsetExpr.(type) { + case *ast.UnaryExpression: + offsetDur, err := ParseSignedDuration(offsetExprV.Loc.Source) + if err != nil { + return err + } + opts.Offset = &Duration{Node: *offsetDur} + case *ast.DurationLiteral: + opts.Offset = &Duration{Node: *offsetExprV} + default: + return errParseTaskOptionField(optOffset) + } + + return nil +} + +func extractConcurrencyOption(opts *Options, objExpr *ast.ObjectExpression) error { + concurExpr, err := edit.GetProperty(objExpr, optConcurrency) + if err != nil { + return nil + } + + concurInt, ok := concurExpr.(*ast.IntegerLiteral) + if !ok { + return errParseTaskOptionField(optConcurrency) + } + val := ast.IntegerFromLiteral(concurInt) + opts.Concurrency = &val + + return nil +} + +func extractRetryOption(opts *Options, objExpr *ast.ObjectExpression) error { + retryExpr, err := edit.GetProperty(objExpr, optRetry) + if err != nil { + return nil + } + + retryInt, ok := retryExpr.(*ast.IntegerLiteral) + if !ok { + return errParseTaskOptionField(optRetry) + } + val := ast.IntegerFromLiteral(retryInt) + opts.Retry = &val + + return nil +} + // FromScript extracts Options from a Flux script. func FromScript(lang FluxLanguageService, script string) (Options, error) { opt := Options{Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)} @@ -225,7 +399,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) { // pull options from the program scope task, ok := scope.Lookup("task") if !ok { - return opt, ErrMissingRequiredTaskOption("task") + return opt, errMissingRequiredTaskOption("task") } // check to make sure task is an object if err := checkNature(task.Type().Nature(), semantic.Object); err != nil { @@ -238,7 +412,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) { nameVal, ok := optObject.Get(optName) if !ok { - return opt, ErrMissingRequiredTaskOption("name") + return opt, errMissingRequiredTaskOption("name") } if err := checkNature(nameVal.Type().Nature(), semantic.String); err != nil { @@ -252,7 +426,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) { } if !cronOK && !everyOK { - return opt, ErrMissingRequiredTaskOption("cron or every is required") + return opt, errMissingRequiredTaskOption("cron or every is required") } if cronOK { @@ -268,7 +442,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) { } dur, ok := durTypes["every"] if !ok || dur == nil { - return opt, ErrParseTaskOptionField("every") + return opt, errParseTaskOptionField("every") } durNode, err := ParseSignedDuration(dur.Location().Source) if err != nil { @@ -276,7 +450,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) { } if !ok || durNode == nil { - return opt, ErrParseTaskOptionField("every") + return opt, errParseTaskOptionField("every") } durNode.BaseNode = ast.BaseNode{} @@ -289,14 +463,14 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) { } dur, ok := durTypes["offset"] if !ok || dur == nil { - return opt, ErrParseTaskOptionField("offset") + return opt, errParseTaskOptionField("offset") } durNode, err := ParseSignedDuration(dur.Location().Source) if err != nil { return opt, err } if !ok || durNode == nil { - return opt, ErrParseTaskOptionField("offset") + return opt, errParseTaskOptionField("offset") } durNode.BaseNode = ast.BaseNode{} opt.Offset = &Duration{} diff --git a/task/options/options_errors.go b/task/options/options_errors.go index 40f75bb578..f630675944 100644 --- a/task/options/options_errors.go +++ b/task/options/options_errors.go @@ -1,22 +1,36 @@ package options import ( + "errors" "fmt" ) -func ErrParseTaskOptionField(opt string) error { +// errParseTaskOptionField is returned when we fail to parse a single field in +// task options. +func errParseTaskOptionField(opt string) error { return fmt.Errorf("failed to parse field '%s' in task options", opt) } -func ErrMissingRequiredTaskOption(opt string) error { +// errMissingRequiredTaskOption is returned when we a required option is +// missing. +func errMissingRequiredTaskOption(opt string) error { return fmt.Errorf("missing required option: %s", opt) } -// ErrTaskInvalidDuration is returned when an "every" or "offset" option is invalid in a task. -func ErrTaskInvalidDuration(err error) error { +// errTaskInvalidDuration is returned when an "every" or "offset" option is invalid in a task. +func errTaskInvalidDuration(err error) error { return fmt.Errorf("invalid duration in task %s", err) } +// errTaskOptionNotObjectExpression is returned when the type of an task option +// value is not an object literal expression. +func errTaskOptionNotObjectExpression(actualType string) error { + return fmt.Errorf("task option expected to be object literal, but found %q", actualType) +} + var ( - ErrDuplicateIntervalField = fmt.Errorf("cannot use both cron and every in task options") + ErrDuplicateIntervalField = errors.New("cannot use both cron and every in task options") + ErrNoTaskOptionsDefined = errors.New("no task options defined") + ErrMultipleTaskOptionsDefined = errors.New("multiple task options defined") + ErrNoASTFile = errors.New("expected parsed file, but found none") ) diff --git a/task/options/options_test.go b/task/options/options_test.go index a99fc84391..ffdd7c0247 100644 --- a/task/options/options_test.go +++ b/task/options/options_test.go @@ -8,6 +8,8 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/flux/ast" "github.com/influxdata/influxdb/v2/pkg/pointer" _ "github.com/influxdata/influxdb/v2/query/builtin" "github.com/influxdata/influxdb/v2/query/fluxlang" @@ -57,6 +59,72 @@ func TestNegDurations(t *testing.T) { } } +func TestFromScriptAST(t *testing.T) { + for _, c := range []struct { + script string + exp options.Options + shouldErr bool + }{ + {script: scriptGenerator(options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: options.MustParseDuration("-1m")}, ""), + exp: options.Options{Name: "name0", + Cron: "* * * * *", + Concurrency: pointer.Int64(2), + Retry: pointer.Int64(3), + Offset: options.MustParseDuration("-1m")}}, + {script: scriptGenerator(options.Options{Name: "name1", Every: *(options.MustParseDuration("5s"))}, ""), exp: options.Options{Name: "name1", Every: *(options.MustParseDuration("5s")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}}, + {script: scriptGenerator(options.Options{Name: "name2", Cron: "* * * * *"}, ""), exp: options.Options{Name: "name2", Cron: "* * * * *", Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}}, + {script: scriptGenerator(options.Options{Name: "name3", Every: *(options.MustParseDuration("1h")), Cron: "* * * * *"}, ""), shouldErr: true}, + {script: scriptGenerator(options.Options{Name: "name4", Concurrency: pointer.Int64(1000), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true}, + {script: "option task = {\n name: \"name5\",\n concurrency: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true}, + {script: "option task = {\n name: \"name6\",\n concurrency: 1,\n every: 1,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true}, + {script: scriptGenerator(options.Options{Name: "name7", Retry: pointer.Int64(20), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true}, + {script: "option task = {\n name: \"name8\",\n retry: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true}, + {script: scriptGenerator(options.Options{Name: "name9"}, ""), shouldErr: true}, + {script: scriptGenerator(options.Options{}, ""), shouldErr: true}, + {script: `option task = { + name: "name10", + every: 1d, + offset: 1m, + } + from(bucket: "metrics") + |> range(start: now(), stop: 8w) + `, + exp: options.Options{Name: "name10", Every: *(options.MustParseDuration("1d")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1), Offset: options.MustParseDuration("1m")}, + }, + {script: `option task = { + name: "name11", + every: 1m, + offset: 1d, + } + from(bucket: "metrics") + |> range(start: now(), stop: 8w) + + `, + exp: options.Options{Name: "name11", Every: *(options.MustParseDuration("1m")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1), Offset: options.MustParseDuration("1d")}, + }, + {script: "option task = {name:\"test_task_smoke_name\", every:30s} from(bucket:\"test_tasks_smoke_bucket_source\") |> range(start: -1h) |> map(fn: (r) => ({r with _time: r._time, _value:r._value, t : \"quality_rocks\"}))|> to(bucket:\"test_tasks_smoke_bucket_dest\", orgID:\"3e73e749495d37d5\")", + exp: options.Options{Name: "test_task_smoke_name", Every: *(options.MustParseDuration("30s")), Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)}, shouldErr: false}, // TODO(docmerlin): remove this once tasks fully supports all flux duration units. + + } { + o, err := options.FromScriptAST(fluxlang.DefaultService, c.script) + if c.shouldErr && err == nil { + t.Fatalf("script %q should have errored but didn't", c.script) + } else if !c.shouldErr && err != nil { + t.Fatalf("script %q should not have errored, but got %v", c.script, err) + } + + if err != nil { + continue + } + + ignoreLocation := cmpopts.IgnoreFields(ast.BaseNode{}, "Loc") + + if !cmp.Equal(o, c.exp, ignoreLocation) { + t.Fatalf("script %q got unexpected result -got/+exp\n%s", c.script, cmp.Diff(o, c.exp)) + } + } +} + func TestFromScript(t *testing.T) { for _, c := range []struct { script string diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index e1970c7961..007548f5d6 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -375,6 +375,7 @@ func testTaskCRUD(t *testing.T, sys *System) { if origID != f.ID { t.Fatalf("task ID unexpectedly changed during update, from %s to %s", origID.String(), f.ID.String()) } + if f.Flux != newFlux { t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux) } @@ -426,7 +427,7 @@ func testTaskCRUD(t *testing.T, sys *System) { // Update task: switch to every. newStatus = string(influxdb.TaskActive) - newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")" + newFlux = "option task = {\n\tname: \"task-changed #98\",\n\toffset: 5s,\n\tconcurrency: 100,\n\tevery: 30s,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")" f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}}) if err != nil { t.Fatal(err) @@ -655,7 +656,7 @@ from(bucket: "b") t.Fatal(err) } t.Run("update task and delete offset", func(t *testing.T) { - expectedFlux := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100} + expectedFlux := `option task = {name: "task-Options-Update", concurrency: 100, every: 10s} from(bucket: "b") |> to(bucket: "two", orgID: "000000000000000")` @@ -675,8 +676,8 @@ from(bucket: "b") t.Run("update task with different offset option", func(t *testing.T) { expectedFlux := `option task = { name: "task-Options-Update", - every: 10s, concurrency: 100, + every: 10s, offset: 10s, } @@ -1738,14 +1739,14 @@ const ( concurrency: 100, } -from(bucket:"b") +from(bucket: "b") |> to(bucket: "two", orgID: "000000000000000")` scriptDifferentName = `option task = { name: "task-changed #%d", - cron: "* * * * *", offset: 5s, concurrency: 100, + cron: "* * * * *", } from(bucket: "b") diff --git a/task_test.go b/task_test.go index ab0470a947..3f2aaecd8e 100644 --- a/task_test.go +++ b/task_test.go @@ -1,11 +1,14 @@ package influxdb_test import ( + "context" "encoding/json" "testing" "github.com/google/go-cmp/cmp" platform "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/feature" + "github.com/influxdata/influxdb/v2/mock" _ "github.com/influxdata/influxdb/v2/query/builtin" "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/task/options" @@ -53,10 +56,26 @@ func TestOptionsMarshal(t *testing.T) { } } +func TestOptionsEditWithAST(t *testing.T) { + flagger := mock.NewFlagger(map[feature.Flag]interface{}{ + feature.SimpleTaskOptionsExtraction(): true, + }) + testOptionsEdit(t, flagger) +} + func TestOptionsEdit(t *testing.T) { + flagger := mock.NewFlagger(map[feature.Flag]interface{}{ + feature.SimpleTaskOptionsExtraction(): false, + }) + testOptionsEdit(t, flagger) +} + +func testOptionsEdit(t *testing.T, flagger feature.Flagger) { + ctx, _ := feature.Annotate(context.Background(), flagger) + tu := &platform.TaskUpdate{} tu.Options.Every = *(options.MustParseDuration("10s")) - if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } t.Run("zeroing", func(t *testing.T) { @@ -86,7 +105,7 @@ from(bucket: "x") t.Run("add new option", func(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Offset = options.MustParseDuration("30s") - if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) @@ -94,13 +113,13 @@ from(bucket: "x") t.Error(err) } if op.Offset == nil || op.Offset.String() != "30s" { - t.Fatalf("expected every to be 30s but was %s", op.Every) + t.Fatalf("expected offset to be 30s but was %s", op.Offset) } }) t.Run("switching from every to cron", func(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Cron = "* * * * *" - if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) @@ -117,7 +136,7 @@ from(bucket: "x") t.Run("switching from cron to every", func(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Every = *(options.MustParseDuration("10s")) - if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) @@ -138,7 +157,7 @@ from(bucket: "x") from(bucket: "x") |> range(start: -1h)` - if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux)