fix(kv): Install feature-flag for switching between normal and simplified options parsing. (#18662)
parent
a96f21f27c
commit
81e4b02b42
|
@ -153,3 +153,10 @@
|
||||||
default: false
|
default: false
|
||||||
contact: Lyon Hill
|
contact: Lyon Hill
|
||||||
lifetime: temporary
|
lifetime: temporary
|
||||||
|
|
||||||
|
- name: Simple Task Options Extraction
|
||||||
|
description: Simplified task options extraction to avoid undefined functions when saving tasks
|
||||||
|
key: simpleTaskOptionsExtraction
|
||||||
|
default: false
|
||||||
|
contact: Brett Buddin
|
||||||
|
lifetime: temporary
|
||||||
|
|
|
@ -296,6 +296,20 @@ func UrmFreeTasks() BoolFlag {
|
||||||
return urmFreeTasks
|
return urmFreeTasks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var simpleTaskOptionsExtraction = MakeBoolFlag(
|
||||||
|
"Simple Task Options Extraction",
|
||||||
|
"simpleTaskOptionsExtraction",
|
||||||
|
"Brett Buddin",
|
||||||
|
false,
|
||||||
|
Temporary,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
|
||||||
|
// SimpleTaskOptionsExtraction - Simplified task options extraction to avoid undefined functions when saving tasks
|
||||||
|
func SimpleTaskOptionsExtraction() BoolFlag {
|
||||||
|
return simpleTaskOptionsExtraction
|
||||||
|
}
|
||||||
|
|
||||||
var all = []Flag{
|
var all = []Flag{
|
||||||
appMetrics,
|
appMetrics,
|
||||||
backendExample,
|
backendExample,
|
||||||
|
@ -318,6 +332,7 @@ var all = []Flag{
|
||||||
hydratevars,
|
hydratevars,
|
||||||
memoryOptimizedFill,
|
memoryOptimizedFill,
|
||||||
urmFreeTasks,
|
urmFreeTasks,
|
||||||
|
simpleTaskOptionsExtraction,
|
||||||
}
|
}
|
||||||
|
|
||||||
var byKey = map[string]Flag{
|
var byKey = map[string]Flag{
|
||||||
|
@ -342,4 +357,5 @@ var byKey = map[string]Flag{
|
||||||
"hydratevars": hydratevars,
|
"hydratevars": hydratevars,
|
||||||
"memoryOptimizedFill": memoryOptimizedFill,
|
"memoryOptimizedFill": memoryOptimizedFill,
|
||||||
"urmFreeTasks": urmFreeTasks,
|
"urmFreeTasks": urmFreeTasks,
|
||||||
|
"simpleTaskOptionsExtraction": simpleTaskOptionsExtraction,
|
||||||
}
|
}
|
||||||
|
|
58
kv/task.go
58
kv/task.go
|
@ -3,6 +3,7 @@ package kv
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -674,7 +675,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
||||||
// return nil, influxdb.ErrInvalidOwnerID
|
// return nil, influxdb.ErrInvalidOwnerID
|
||||||
// }
|
// }
|
||||||
|
|
||||||
opt, err := options.FromScript(s.FluxLanguageService, tc.Flux)
|
opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, tc.Flux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, influxdb.ErrTaskOptionParse(err)
|
return nil, influxdb.ErrTaskOptionParse(err)
|
||||||
}
|
}
|
||||||
|
@ -691,19 +692,19 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
||||||
Organization: org.Name,
|
Organization: org.Name,
|
||||||
OwnerID: tc.OwnerID,
|
OwnerID: tc.OwnerID,
|
||||||
Metadata: tc.Metadata,
|
Metadata: tc.Metadata,
|
||||||
Name: opt.Name,
|
Name: opts.Name,
|
||||||
Description: tc.Description,
|
Description: tc.Description,
|
||||||
Status: tc.Status,
|
Status: tc.Status,
|
||||||
Flux: tc.Flux,
|
Flux: tc.Flux,
|
||||||
Every: opt.Every.String(),
|
Every: opts.Every.String(),
|
||||||
Cron: opt.Cron,
|
Cron: opts.Cron,
|
||||||
CreatedAt: createdAt,
|
CreatedAt: createdAt,
|
||||||
LatestCompleted: createdAt,
|
LatestCompleted: createdAt,
|
||||||
LatestScheduled: createdAt,
|
LatestScheduled: createdAt,
|
||||||
}
|
}
|
||||||
|
|
||||||
if opt.Offset != nil {
|
if opts.Offset != nil {
|
||||||
off, err := time.ParseDuration(opt.Offset.String())
|
off, err := time.ParseDuration(opts.Offset.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, influxdb.ErrTaskTimeParse(err)
|
return nil, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
|
@ -830,17 +831,17 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
||||||
}
|
}
|
||||||
task.Flux = *upd.Flux
|
task.Flux = *upd.Flux
|
||||||
|
|
||||||
options, err := options.FromScript(s.FluxLanguageService, *upd.Flux)
|
opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, *upd.Flux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, influxdb.ErrTaskOptionParse(err)
|
return nil, influxdb.ErrTaskOptionParse(err)
|
||||||
}
|
}
|
||||||
task.Name = options.Name
|
task.Name = opts.Name
|
||||||
task.Every = options.Every.String()
|
task.Every = opts.Every.String()
|
||||||
task.Cron = options.Cron
|
task.Cron = opts.Cron
|
||||||
|
|
||||||
var off time.Duration
|
var off time.Duration
|
||||||
if options.Offset != nil {
|
if opts.Offset != nil {
|
||||||
off, err = time.ParseDuration(options.Offset.String())
|
off, err = time.ParseDuration(opts.Offset.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, influxdb.ErrTaskTimeParse(err)
|
return nil, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
|
@ -1919,3 +1920,36 @@ func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`)
|
||||||
|
|
||||||
|
// ExtractTaskOptions is a feature-flag driven switch between normal options
|
||||||
|
// parsing and a more simplified variant.
|
||||||
|
//
|
||||||
|
// The simplified variant extracts the options assignment and passes only that
|
||||||
|
// content through the parser. This allows us to allow scenarios like [1] to
|
||||||
|
// pass through options validation. One clear drawback of this is that it
|
||||||
|
// requires constant values for the parameter assignments. However, most people
|
||||||
|
// are doing that anyway.
|
||||||
|
//
|
||||||
|
// [1]: https://github.com/influxdata/influxdb/issues/17666
|
||||||
|
func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) {
|
||||||
|
if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) {
|
||||||
|
return options.FromScript(lang, flux)
|
||||||
|
}
|
||||||
|
|
||||||
|
matches := taskOptionsPattern.FindAllString(flux, -1)
|
||||||
|
if len(matches) == 0 {
|
||||||
|
return options.Options{}, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "no task options defined",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(matches) > 1 {
|
||||||
|
return options.Options{}, &influxdb.Error{
|
||||||
|
Code: influxdb.EInvalid,
|
||||||
|
Msg: "multiple task options defined",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return options.FromScript(lang, matches[0])
|
||||||
|
}
|
||||||
|
|
105
kv/task_test.go
105
kv/task_test.go
|
@ -12,10 +12,15 @@ import (
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/influxdata/influxdb/v2"
|
"github.com/influxdata/influxdb/v2"
|
||||||
icontext "github.com/influxdata/influxdb/v2/context"
|
icontext "github.com/influxdata/influxdb/v2/context"
|
||||||
|
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||||
"github.com/influxdata/influxdb/v2/kv"
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
|
"github.com/influxdata/influxdb/v2/mock"
|
||||||
_ "github.com/influxdata/influxdb/v2/query/builtin"
|
_ "github.com/influxdata/influxdb/v2/query/builtin"
|
||||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||||
|
"github.com/influxdata/influxdb/v2/task/options"
|
||||||
"github.com/influxdata/influxdb/v2/task/servicetest"
|
"github.com/influxdata/influxdb/v2/task/servicetest"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -387,3 +392,103 @@ func TestTaskMigrate(t *testing.T) {
|
||||||
t.Fatal("failed to fill in ownerID")
|
t.Fatal("failed to fill in ownerID")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type taskOptions struct {
|
||||||
|
name string
|
||||||
|
every string
|
||||||
|
cron string
|
||||||
|
offset string
|
||||||
|
concurrency int64
|
||||||
|
retry int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractTaskOptions(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
flux string
|
||||||
|
expected taskOptions
|
||||||
|
errMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "all parameters",
|
||||||
|
flux: `option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}`,
|
||||||
|
expected: taskOptions{
|
||||||
|
name: "whatever",
|
||||||
|
every: "1s",
|
||||||
|
offset: "0s",
|
||||||
|
concurrency: 2,
|
||||||
|
retry: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some extra whitespace and bad content around it",
|
||||||
|
flux: `howdy()
|
||||||
|
option task = { name:"whatever", cron: "* * * * *" }
|
||||||
|
hello()
|
||||||
|
`,
|
||||||
|
expected: taskOptions{
|
||||||
|
name: "whatever",
|
||||||
|
cron: "* * * * *",
|
||||||
|
concurrency: 1,
|
||||||
|
retry: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bad options",
|
||||||
|
flux: `option task = {name: "whatever", every: 1s, cron: "* * * * *"}`,
|
||||||
|
errMsg: "cannot use both cron and every in task options",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no options",
|
||||||
|
flux: `doesntexist()`,
|
||||||
|
errMsg: "no task options defined",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple assignments",
|
||||||
|
flux: `
|
||||||
|
option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}
|
||||||
|
option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}
|
||||||
|
`,
|
||||||
|
errMsg: "multiple task options defined",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||||
|
feature.SimpleTaskOptionsExtraction(): true,
|
||||||
|
})
|
||||||
|
ctx, _ := feature.Annotate(context.Background(), flagger)
|
||||||
|
opts, err := kv.ExtractTaskOptions(ctx, fluxlang.DefaultService, tc.flux)
|
||||||
|
if tc.errMsg != "" {
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Equal(t, tc.errMsg, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var offset options.Duration
|
||||||
|
if opts.Offset != nil {
|
||||||
|
offset = *opts.Offset
|
||||||
|
}
|
||||||
|
|
||||||
|
var concur int64
|
||||||
|
if opts.Concurrency != nil {
|
||||||
|
concur = *opts.Concurrency
|
||||||
|
}
|
||||||
|
|
||||||
|
var retry int64
|
||||||
|
if opts.Retry != nil {
|
||||||
|
retry = *opts.Retry
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, tc.expected.name, opts.Name)
|
||||||
|
assert.Equal(t, tc.expected.cron, opts.Cron)
|
||||||
|
assert.Equal(t, tc.expected.every, opts.Every.String())
|
||||||
|
assert.Equal(t, tc.expected.offset, offset.String())
|
||||||
|
assert.Equal(t, tc.expected.concurrency, concur)
|
||||||
|
assert.Equal(t, tc.expected.retry, retry)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue