diff --git a/flags.yml b/flags.yml index ddb98b1a8f..58104c4205 100644 --- a/flags.yml +++ b/flags.yml @@ -153,3 +153,10 @@ default: false contact: Lyon Hill lifetime: temporary + +- name: Simple Task Options Extraction + description: Simplified task options extraction to avoid undefined functions when saving tasks + key: simpleTaskOptionsExtraction + default: false + contact: Brett Buddin + lifetime: temporary diff --git a/kit/feature/list.go b/kit/feature/list.go index ae3cb857e9..a2c32551a8 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -296,6 +296,20 @@ func UrmFreeTasks() BoolFlag { return urmFreeTasks } +var simpleTaskOptionsExtraction = MakeBoolFlag( + "Simple Task Options Extraction", + "simpleTaskOptionsExtraction", + "Brett Buddin", + false, + Temporary, + false, +) + +// SimpleTaskOptionsExtraction - Simplified task options extraction to avoid undefined functions when saving tasks +func SimpleTaskOptionsExtraction() BoolFlag { + return simpleTaskOptionsExtraction +} + var all = []Flag{ appMetrics, backendExample, @@ -318,6 +332,7 @@ var all = []Flag{ hydratevars, memoryOptimizedFill, urmFreeTasks, + simpleTaskOptionsExtraction, } var byKey = map[string]Flag{ @@ -342,4 +357,5 @@ var byKey = map[string]Flag{ "hydratevars": hydratevars, "memoryOptimizedFill": memoryOptimizedFill, "urmFreeTasks": urmFreeTasks, + "simpleTaskOptionsExtraction": simpleTaskOptionsExtraction, } diff --git a/kv/task.go b/kv/task.go index a3b2e24f1b..c49c29c70c 100644 --- a/kv/task.go +++ b/kv/task.go @@ -3,6 +3,7 @@ package kv import ( "context" "encoding/json" + "regexp" "strings" "time" @@ -674,7 +675,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) // return nil, influxdb.ErrInvalidOwnerID // } - opt, err := options.FromScript(s.FluxLanguageService, tc.Flux) + opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, tc.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } @@ -691,19 +692,19 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) Organization: org.Name, OwnerID: tc.OwnerID, Metadata: tc.Metadata, - Name: opt.Name, + Name: opts.Name, Description: tc.Description, Status: tc.Status, Flux: tc.Flux, - Every: opt.Every.String(), - Cron: opt.Cron, + Every: opts.Every.String(), + Cron: opts.Cron, CreatedAt: createdAt, LatestCompleted: createdAt, LatestScheduled: createdAt, } - if opt.Offset != nil { - off, err := time.ParseDuration(opt.Offset.String()) + if opts.Offset != nil { + off, err := time.ParseDuration(opts.Offset.String()) if err != nil { return nil, influxdb.ErrTaskTimeParse(err) } @@ -830,17 +831,17 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf } task.Flux = *upd.Flux - options, err := options.FromScript(s.FluxLanguageService, *upd.Flux) + opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, *upd.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } - task.Name = options.Name - task.Every = options.Every.String() - task.Cron = options.Cron + task.Name = opts.Name + task.Every = opts.Every.String() + task.Cron = opts.Cron var off time.Duration - if options.Offset != nil { - off, err = time.ParseDuration(options.Offset.String()) + if opts.Offset != nil { + off, err = time.ParseDuration(opts.Offset.String()) if err != nil { return nil, influxdb.ErrTaskTimeParse(err) } @@ -1919,3 +1920,36 @@ func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error } return nil } + +var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`) + +// ExtractTaskOptions is a feature-flag driven switch between normal options +// parsing and a more simplified variant. +// +// The simplified variant extracts the options assignment and passes only that +// content through the parser. This allows us to allow scenarios like [1] to +// pass through options validation. One clear drawback of this is that it +// requires constant values for the parameter assignments. However, most people +// are doing that anyway. +// +// [1]: https://github.com/influxdata/influxdb/issues/17666 +func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) { + if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) { + return options.FromScript(lang, flux) + } + + matches := taskOptionsPattern.FindAllString(flux, -1) + if len(matches) == 0 { + return options.Options{}, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "no task options defined", + } + } + if len(matches) > 1 { + return options.Options{}, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "multiple task options defined", + } + } + return options.FromScript(lang, matches[0]) +} diff --git a/kv/task_test.go b/kv/task_test.go index d19be783d8..a66855b60b 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -12,10 +12,15 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" + "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/mock" _ "github.com/influxdata/influxdb/v2/query/builtin" "github.com/influxdata/influxdb/v2/query/fluxlang" + "github.com/influxdata/influxdb/v2/task/options" "github.com/influxdata/influxdb/v2/task/servicetest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -387,3 +392,103 @@ func TestTaskMigrate(t *testing.T) { t.Fatal("failed to fill in ownerID") } } + +type taskOptions struct { + name string + every string + cron string + offset string + concurrency int64 + retry int64 +} + +func TestExtractTaskOptions(t *testing.T) { + tcs := []struct { + name string + flux string + expected taskOptions + errMsg string + }{ + { + name: "all parameters", + flux: `option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}`, + expected: taskOptions{ + name: "whatever", + every: "1s", + offset: "0s", + concurrency: 2, + retry: 2, + }, + }, + { + name: "some extra whitespace and bad content around it", + flux: `howdy() + option task = { name:"whatever", cron: "* * * * *" } + hello() + `, + expected: taskOptions{ + name: "whatever", + cron: "* * * * *", + concurrency: 1, + retry: 1, + }, + }, + { + name: "bad options", + flux: `option task = {name: "whatever", every: 1s, cron: "* * * * *"}`, + errMsg: "cannot use both cron and every in task options", + }, + { + name: "no options", + flux: `doesntexist()`, + errMsg: "no task options defined", + }, + { + name: "multiple assignments", + flux: ` + option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2} + option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2} + `, + errMsg: "multiple task options defined", + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + flagger := mock.NewFlagger(map[feature.Flag]interface{}{ + feature.SimpleTaskOptionsExtraction(): true, + }) + ctx, _ := feature.Annotate(context.Background(), flagger) + opts, err := kv.ExtractTaskOptions(ctx, fluxlang.DefaultService, tc.flux) + if tc.errMsg != "" { + require.Error(t, err) + assert.Equal(t, tc.errMsg, err.Error()) + return + } + + require.NoError(t, err) + + var offset options.Duration + if opts.Offset != nil { + offset = *opts.Offset + } + + var concur int64 + if opts.Concurrency != nil { + concur = *opts.Concurrency + } + + var retry int64 + if opts.Retry != nil { + retry = *opts.Retry + } + + assert.Equal(t, tc.expected.name, opts.Name) + assert.Equal(t, tc.expected.cron, opts.Cron) + assert.Equal(t, tc.expected.every, opts.Every.String()) + assert.Equal(t, tc.expected.offset, offset.String()) + assert.Equal(t, tc.expected.concurrency, concur) + assert.Equal(t, tc.expected.retry, retry) + }) + } +}