feat(task): Parse Task Options using Flux AST Helpers (#19326)
* feat(task): Extract options using AST-based method. * feat(task): Use AST-based option APIs for updating task option. * chore(task): Use the old way of parsing durations. * fix(task): Ordering changed on us. Fixing tests to reflect the new order. * fix(task): There's no way for us to know if there are multiples with the current APIs. * chore(task): Guard against duplicate options. Minor cleanup. * fix(kit/feature): Break cyclical dependency between influxdb and pkgs that use feature. * chore(task): Feature flag updating Flux options. * chore(task): Ensure we are testing both paths of feature flag. * chore: Remove dead code. * chore(task/options): Remove unnecessary conditional. * chore(task/options): Unexport some error helpers.pull/19401/head
parent
519f60f7d5
commit
0780232b83
|
@ -1,10 +1,10 @@
|
|||
package feature
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -44,8 +44,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
// HTTPErrorHandler is an influxdb.HTTPErrorHandler. It's defined here instead
|
||||
// of referencing the other interface type, because we want to try our best to
|
||||
// avoid cyclical dependencies when feature package is used throughout the
|
||||
// codebase.
|
||||
type HTTPErrorHandler interface {
|
||||
HandleHTTPError(ctx context.Context, err error, w http.ResponseWriter)
|
||||
}
|
||||
|
||||
// NewFlagsHandler returns a handler that returns the map of computed feature flags on the request context.
|
||||
func NewFlagsHandler(errorHandler influxdb.HTTPErrorHandler, byKey ByKeyFn) http.Handler {
|
||||
func NewFlagsHandler(errorHandler HTTPErrorHandler, byKey ByKeyFn) http.Handler {
|
||||
fn := func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
|
25
kv/task.go
25
kv/task.go
|
@ -3,7 +3,6 @@ package kv
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -708,7 +707,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
|||
|
||||
// update the flux script
|
||||
if !upd.Options.IsZero() || upd.Flux != nil {
|
||||
if err = upd.UpdateFlux(s.FluxLanguageService, task.Flux); err != nil {
|
||||
if err = upd.UpdateFlux(ctx, s.FluxLanguageService, task.Flux); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task.Flux = *upd.Flux
|
||||
|
@ -1661,8 +1660,6 @@ func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
|
|||
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil
|
||||
}
|
||||
|
||||
var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`)
|
||||
|
||||
// ExtractTaskOptions is a feature-flag driven switch between normal options
|
||||
// parsing and a more simplified variant.
|
||||
//
|
||||
|
@ -1674,24 +1671,10 @@ var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`)
|
|||
//
|
||||
// [1]: https://github.com/influxdata/influxdb/issues/17666
|
||||
func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) {
|
||||
if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) {
|
||||
return options.FromScript(lang, flux)
|
||||
if feature.SimpleTaskOptionsExtraction().Enabled(ctx) {
|
||||
return options.FromScriptAST(lang, flux)
|
||||
}
|
||||
|
||||
matches := taskOptionsPattern.FindAllString(flux, -1)
|
||||
if len(matches) == 0 {
|
||||
return options.Options{}, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "no task options defined",
|
||||
}
|
||||
}
|
||||
if len(matches) > 1 {
|
||||
return options.Options{}, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "multiple task options defined",
|
||||
}
|
||||
}
|
||||
return options.FromScript(lang, matches[0])
|
||||
return options.FromScript(lang, flux)
|
||||
}
|
||||
|
||||
func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) {
|
||||
|
|
66
task.go
66
task.go
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/ast/edit"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"github.com/influxdata/influxdb/v2/task/options"
|
||||
)
|
||||
|
||||
|
@ -308,9 +309,17 @@ func safeParseSource(parser FluxLanguageService, f string) (pkg *ast.Package, er
|
|||
return parser.Parse(f)
|
||||
}
|
||||
|
||||
// UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it
|
||||
// It zeros the options in the TaskUpdate.
|
||||
func (t *TaskUpdate) UpdateFlux(parser FluxLanguageService, oldFlux string) (err error) {
|
||||
// UpdateFlux updates the TaskUpdate to go from updating options to updating a
|
||||
// flux string, that now has those updated options in it. It zeros the options
|
||||
// in the TaskUpdate.
|
||||
func (t *TaskUpdate) UpdateFlux(ctx context.Context, parser FluxLanguageService, oldFlux string) error {
|
||||
if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) {
|
||||
return t.updateFluxAST(parser, oldFlux)
|
||||
}
|
||||
return t.updateFlux(parser, oldFlux)
|
||||
}
|
||||
|
||||
func (t *TaskUpdate) updateFlux(parser FluxLanguageService, oldFlux string) error {
|
||||
if t.Flux != nil && *t.Flux != "" {
|
||||
oldFlux = *t.Flux
|
||||
}
|
||||
|
@ -415,6 +424,57 @@ func (t *TaskUpdate) UpdateFlux(parser FluxLanguageService, oldFlux string) (err
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *TaskUpdate) updateFluxAST(parser FluxLanguageService, oldFlux string) error {
|
||||
if t.Flux != nil && *t.Flux != "" {
|
||||
oldFlux = *t.Flux
|
||||
}
|
||||
parsedPKG, err := safeParseSource(parser, oldFlux)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
parsed := parsedPKG.Files[0]
|
||||
if !t.Options.Every.IsZero() && t.Options.Cron != "" {
|
||||
return errors.New("cannot specify both cron and every")
|
||||
}
|
||||
|
||||
taskOptions, err := edit.GetOption(parsed, "task")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
optsExpr := taskOptions.(*ast.ObjectExpression)
|
||||
|
||||
if t.Options.Name != "" {
|
||||
edit.SetProperty(optsExpr, "name", &ast.StringLiteral{
|
||||
Value: t.Options.Name,
|
||||
})
|
||||
}
|
||||
if !t.Options.Every.IsZero() {
|
||||
edit.SetProperty(optsExpr, "every", t.Options.Every.Node.Copy().(*ast.DurationLiteral))
|
||||
edit.DeleteProperty(optsExpr, "cron")
|
||||
}
|
||||
if t.Options.Cron != "" {
|
||||
edit.SetProperty(optsExpr, "cron", &ast.StringLiteral{
|
||||
Value: t.Options.Cron,
|
||||
})
|
||||
edit.DeleteProperty(optsExpr, "every")
|
||||
}
|
||||
if t.Options.Offset != nil {
|
||||
if !t.Options.Offset.IsZero() {
|
||||
edit.SetProperty(optsExpr, "offset", t.Options.Offset.Node.Copy().(*ast.DurationLiteral))
|
||||
} else {
|
||||
edit.DeleteProperty(optsExpr, "offset")
|
||||
}
|
||||
}
|
||||
|
||||
t.Options.Clear()
|
||||
s := ast.Format(parsed)
|
||||
t.Flux = &s
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TaskFilter represents a set of filters that restrict the returned results
|
||||
type TaskFilter struct {
|
||||
Type *string
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/influxdata/cron"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/ast/edit"
|
||||
"github.com/influxdata/flux/interpreter"
|
||||
"github.com/influxdata/flux/semantic"
|
||||
"github.com/influxdata/flux/values"
|
||||
|
@ -54,7 +55,7 @@ func (a Duration) String() string {
|
|||
func (a *Duration) Parse(s string) error {
|
||||
q, err := ParseSignedDuration(s)
|
||||
if err != nil {
|
||||
return ErrTaskInvalidDuration(err)
|
||||
return errTaskInvalidDuration(err)
|
||||
}
|
||||
a.Node = *q
|
||||
return nil
|
||||
|
@ -206,6 +207,179 @@ type FluxLanguageService interface {
|
|||
EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error)
|
||||
}
|
||||
|
||||
// FromScriptAST extracts Task options from a Flux script using only the AST (no
|
||||
// evaluation of the script). Using AST here allows us to avoid having to
|
||||
// contend with functions that aren't available in some parsing contexts (within
|
||||
// Gateway for example).
|
||||
func FromScriptAST(lang FluxLanguageService, script string) (Options, error) {
|
||||
opts := Options{
|
||||
Retry: pointer.Int64(1),
|
||||
Concurrency: pointer.Int64(1),
|
||||
}
|
||||
|
||||
fluxAST, err := parse(lang, script)
|
||||
if err != nil {
|
||||
return opts, err
|
||||
}
|
||||
|
||||
if len(fluxAST.Files) == 0 {
|
||||
return opts, ErrNoASTFile
|
||||
}
|
||||
|
||||
file := fluxAST.Files[0]
|
||||
if hasDuplicateOptions(file, "task") {
|
||||
return opts, ErrMultipleTaskOptionsDefined
|
||||
}
|
||||
|
||||
obj, err := edit.GetOption(file, "task")
|
||||
if err != nil {
|
||||
return opts, ErrNoTaskOptionsDefined
|
||||
}
|
||||
|
||||
objExpr, ok := obj.(*ast.ObjectExpression)
|
||||
if !ok {
|
||||
return opts, errTaskOptionNotObjectExpression(objExpr.Type())
|
||||
}
|
||||
|
||||
for _, fn := range taskOptionExtractors {
|
||||
if err := fn(&opts, objExpr); err != nil {
|
||||
return opts, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := opts.Validate(); err != nil {
|
||||
return opts, err
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
// hasDuplicateOptions determines whether or not there are multiple assignments
|
||||
// to the same option variable.
|
||||
//
|
||||
// TODO(brett): This will be superceded by edit.HasDuplicateOptions once its available.
|
||||
func hasDuplicateOptions(file *ast.File, name string) bool {
|
||||
var n int
|
||||
for _, st := range file.Body {
|
||||
if val, ok := st.(*ast.OptionStatement); ok {
|
||||
assign := val.Assignment
|
||||
if va, ok := assign.(*ast.VariableAssignment); ok {
|
||||
if va.ID.Name == name {
|
||||
n++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return n > 1
|
||||
}
|
||||
|
||||
type extractFn func(*Options, *ast.ObjectExpression) error
|
||||
|
||||
var taskOptionExtractors = []extractFn{
|
||||
extractNameOption,
|
||||
extractScheduleOptions,
|
||||
extractOffsetOption,
|
||||
extractConcurrencyOption,
|
||||
extractRetryOption,
|
||||
}
|
||||
|
||||
func extractNameOption(opts *Options, objExpr *ast.ObjectExpression) error {
|
||||
nameExpr, err := edit.GetProperty(objExpr, optName)
|
||||
if err != nil {
|
||||
return errMissingRequiredTaskOption(optName)
|
||||
}
|
||||
nameStr, ok := nameExpr.(*ast.StringLiteral)
|
||||
if !ok {
|
||||
return errParseTaskOptionField(optName)
|
||||
}
|
||||
opts.Name = ast.StringFromLiteral(nameStr)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractScheduleOptions(opts *Options, objExpr *ast.ObjectExpression) error {
|
||||
cronExpr, cronErr := edit.GetProperty(objExpr, optCron)
|
||||
everyExpr, everyErr := edit.GetProperty(objExpr, optEvery)
|
||||
if cronErr == nil && everyErr == nil {
|
||||
return ErrDuplicateIntervalField
|
||||
}
|
||||
if cronErr != nil && everyErr != nil {
|
||||
return errMissingRequiredTaskOption("cron or every")
|
||||
}
|
||||
|
||||
if cronErr == nil {
|
||||
cronExprStr, ok := cronExpr.(*ast.StringLiteral)
|
||||
if !ok {
|
||||
return errParseTaskOptionField(optCron)
|
||||
}
|
||||
opts.Cron = ast.StringFromLiteral(cronExprStr)
|
||||
}
|
||||
|
||||
if everyErr == nil {
|
||||
everyDur, ok := everyExpr.(*ast.DurationLiteral)
|
||||
if !ok {
|
||||
return errParseTaskOptionField(optEvery)
|
||||
}
|
||||
opts.Every = Duration{Node: *everyDur}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractOffsetOption(opts *Options, objExpr *ast.ObjectExpression) error {
|
||||
offsetExpr, offsetErr := edit.GetProperty(objExpr, optOffset)
|
||||
if offsetErr != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch offsetExprV := offsetExpr.(type) {
|
||||
case *ast.UnaryExpression:
|
||||
offsetDur, err := ParseSignedDuration(offsetExprV.Loc.Source)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
opts.Offset = &Duration{Node: *offsetDur}
|
||||
case *ast.DurationLiteral:
|
||||
opts.Offset = &Duration{Node: *offsetExprV}
|
||||
default:
|
||||
return errParseTaskOptionField(optOffset)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractConcurrencyOption(opts *Options, objExpr *ast.ObjectExpression) error {
|
||||
concurExpr, err := edit.GetProperty(objExpr, optConcurrency)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
concurInt, ok := concurExpr.(*ast.IntegerLiteral)
|
||||
if !ok {
|
||||
return errParseTaskOptionField(optConcurrency)
|
||||
}
|
||||
val := ast.IntegerFromLiteral(concurInt)
|
||||
opts.Concurrency = &val
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractRetryOption(opts *Options, objExpr *ast.ObjectExpression) error {
|
||||
retryExpr, err := edit.GetProperty(objExpr, optRetry)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
retryInt, ok := retryExpr.(*ast.IntegerLiteral)
|
||||
if !ok {
|
||||
return errParseTaskOptionField(optRetry)
|
||||
}
|
||||
val := ast.IntegerFromLiteral(retryInt)
|
||||
opts.Retry = &val
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FromScript extracts Options from a Flux script.
|
||||
func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
||||
opt := Options{Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)}
|
||||
|
@ -225,7 +399,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
|||
// pull options from the program scope
|
||||
task, ok := scope.Lookup("task")
|
||||
if !ok {
|
||||
return opt, ErrMissingRequiredTaskOption("task")
|
||||
return opt, errMissingRequiredTaskOption("task")
|
||||
}
|
||||
// check to make sure task is an object
|
||||
if err := checkNature(task.Type().Nature(), semantic.Object); err != nil {
|
||||
|
@ -238,7 +412,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
|||
|
||||
nameVal, ok := optObject.Get(optName)
|
||||
if !ok {
|
||||
return opt, ErrMissingRequiredTaskOption("name")
|
||||
return opt, errMissingRequiredTaskOption("name")
|
||||
}
|
||||
|
||||
if err := checkNature(nameVal.Type().Nature(), semantic.String); err != nil {
|
||||
|
@ -252,7 +426,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
|||
}
|
||||
|
||||
if !cronOK && !everyOK {
|
||||
return opt, ErrMissingRequiredTaskOption("cron or every is required")
|
||||
return opt, errMissingRequiredTaskOption("cron or every is required")
|
||||
}
|
||||
|
||||
if cronOK {
|
||||
|
@ -268,7 +442,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
|||
}
|
||||
dur, ok := durTypes["every"]
|
||||
if !ok || dur == nil {
|
||||
return opt, ErrParseTaskOptionField("every")
|
||||
return opt, errParseTaskOptionField("every")
|
||||
}
|
||||
durNode, err := ParseSignedDuration(dur.Location().Source)
|
||||
if err != nil {
|
||||
|
@ -276,7 +450,7 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
|||
}
|
||||
|
||||
if !ok || durNode == nil {
|
||||
return opt, ErrParseTaskOptionField("every")
|
||||
return opt, errParseTaskOptionField("every")
|
||||
}
|
||||
|
||||
durNode.BaseNode = ast.BaseNode{}
|
||||
|
@ -289,14 +463,14 @@ func FromScript(lang FluxLanguageService, script string) (Options, error) {
|
|||
}
|
||||
dur, ok := durTypes["offset"]
|
||||
if !ok || dur == nil {
|
||||
return opt, ErrParseTaskOptionField("offset")
|
||||
return opt, errParseTaskOptionField("offset")
|
||||
}
|
||||
durNode, err := ParseSignedDuration(dur.Location().Source)
|
||||
if err != nil {
|
||||
return opt, err
|
||||
}
|
||||
if !ok || durNode == nil {
|
||||
return opt, ErrParseTaskOptionField("offset")
|
||||
return opt, errParseTaskOptionField("offset")
|
||||
}
|
||||
durNode.BaseNode = ast.BaseNode{}
|
||||
opt.Offset = &Duration{}
|
||||
|
|
|
@ -1,22 +1,36 @@
|
|||
package options
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func ErrParseTaskOptionField(opt string) error {
|
||||
// errParseTaskOptionField is returned when we fail to parse a single field in
|
||||
// task options.
|
||||
func errParseTaskOptionField(opt string) error {
|
||||
return fmt.Errorf("failed to parse field '%s' in task options", opt)
|
||||
}
|
||||
|
||||
func ErrMissingRequiredTaskOption(opt string) error {
|
||||
// errMissingRequiredTaskOption is returned when we a required option is
|
||||
// missing.
|
||||
func errMissingRequiredTaskOption(opt string) error {
|
||||
return fmt.Errorf("missing required option: %s", opt)
|
||||
}
|
||||
|
||||
// ErrTaskInvalidDuration is returned when an "every" or "offset" option is invalid in a task.
|
||||
func ErrTaskInvalidDuration(err error) error {
|
||||
// errTaskInvalidDuration is returned when an "every" or "offset" option is invalid in a task.
|
||||
func errTaskInvalidDuration(err error) error {
|
||||
return fmt.Errorf("invalid duration in task %s", err)
|
||||
}
|
||||
|
||||
// errTaskOptionNotObjectExpression is returned when the type of an task option
|
||||
// value is not an object literal expression.
|
||||
func errTaskOptionNotObjectExpression(actualType string) error {
|
||||
return fmt.Errorf("task option expected to be object literal, but found %q", actualType)
|
||||
}
|
||||
|
||||
var (
|
||||
ErrDuplicateIntervalField = fmt.Errorf("cannot use both cron and every in task options")
|
||||
ErrDuplicateIntervalField = errors.New("cannot use both cron and every in task options")
|
||||
ErrNoTaskOptionsDefined = errors.New("no task options defined")
|
||||
ErrMultipleTaskOptionsDefined = errors.New("multiple task options defined")
|
||||
ErrNoASTFile = errors.New("expected parsed file, but found none")
|
||||
)
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/influxdb/v2/pkg/pointer"
|
||||
_ "github.com/influxdata/influxdb/v2/query/builtin"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
|
@ -57,6 +59,72 @@ func TestNegDurations(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFromScriptAST(t *testing.T) {
|
||||
for _, c := range []struct {
|
||||
script string
|
||||
exp options.Options
|
||||
shouldErr bool
|
||||
}{
|
||||
{script: scriptGenerator(options.Options{Name: "name0", Cron: "* * * * *", Concurrency: pointer.Int64(2), Retry: pointer.Int64(3), Offset: options.MustParseDuration("-1m")}, ""),
|
||||
exp: options.Options{Name: "name0",
|
||||
Cron: "* * * * *",
|
||||
Concurrency: pointer.Int64(2),
|
||||
Retry: pointer.Int64(3),
|
||||
Offset: options.MustParseDuration("-1m")}},
|
||||
{script: scriptGenerator(options.Options{Name: "name1", Every: *(options.MustParseDuration("5s"))}, ""), exp: options.Options{Name: "name1", Every: *(options.MustParseDuration("5s")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}},
|
||||
{script: scriptGenerator(options.Options{Name: "name2", Cron: "* * * * *"}, ""), exp: options.Options{Name: "name2", Cron: "* * * * *", Concurrency: pointer.Int64(1), Retry: pointer.Int64(1)}},
|
||||
{script: scriptGenerator(options.Options{Name: "name3", Every: *(options.MustParseDuration("1h")), Cron: "* * * * *"}, ""), shouldErr: true},
|
||||
{script: scriptGenerator(options.Options{Name: "name4", Concurrency: pointer.Int64(1000), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true},
|
||||
{script: "option task = {\n name: \"name5\",\n concurrency: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true},
|
||||
{script: "option task = {\n name: \"name6\",\n concurrency: 1,\n every: 1,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true},
|
||||
{script: scriptGenerator(options.Options{Name: "name7", Retry: pointer.Int64(20), Every: *(options.MustParseDuration("1h"))}, ""), shouldErr: true},
|
||||
{script: "option task = {\n name: \"name8\",\n retry: 0,\n every: 1m0s,\n\n}\n\nfrom(bucket: \"test\")\n |> range(start:-1h)", shouldErr: true},
|
||||
{script: scriptGenerator(options.Options{Name: "name9"}, ""), shouldErr: true},
|
||||
{script: scriptGenerator(options.Options{}, ""), shouldErr: true},
|
||||
{script: `option task = {
|
||||
name: "name10",
|
||||
every: 1d,
|
||||
offset: 1m,
|
||||
}
|
||||
from(bucket: "metrics")
|
||||
|> range(start: now(), stop: 8w)
|
||||
`,
|
||||
exp: options.Options{Name: "name10", Every: *(options.MustParseDuration("1d")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1), Offset: options.MustParseDuration("1m")},
|
||||
},
|
||||
{script: `option task = {
|
||||
name: "name11",
|
||||
every: 1m,
|
||||
offset: 1d,
|
||||
}
|
||||
from(bucket: "metrics")
|
||||
|> range(start: now(), stop: 8w)
|
||||
|
||||
`,
|
||||
exp: options.Options{Name: "name11", Every: *(options.MustParseDuration("1m")), Concurrency: pointer.Int64(1), Retry: pointer.Int64(1), Offset: options.MustParseDuration("1d")},
|
||||
},
|
||||
{script: "option task = {name:\"test_task_smoke_name\", every:30s} from(bucket:\"test_tasks_smoke_bucket_source\") |> range(start: -1h) |> map(fn: (r) => ({r with _time: r._time, _value:r._value, t : \"quality_rocks\"}))|> to(bucket:\"test_tasks_smoke_bucket_dest\", orgID:\"3e73e749495d37d5\")",
|
||||
exp: options.Options{Name: "test_task_smoke_name", Every: *(options.MustParseDuration("30s")), Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)}, shouldErr: false}, // TODO(docmerlin): remove this once tasks fully supports all flux duration units.
|
||||
|
||||
} {
|
||||
o, err := options.FromScriptAST(fluxlang.DefaultService, c.script)
|
||||
if c.shouldErr && err == nil {
|
||||
t.Fatalf("script %q should have errored but didn't", c.script)
|
||||
} else if !c.shouldErr && err != nil {
|
||||
t.Fatalf("script %q should not have errored, but got %v", c.script, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
ignoreLocation := cmpopts.IgnoreFields(ast.BaseNode{}, "Loc")
|
||||
|
||||
if !cmp.Equal(o, c.exp, ignoreLocation) {
|
||||
t.Fatalf("script %q got unexpected result -got/+exp\n%s", c.script, cmp.Diff(o, c.exp))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFromScript(t *testing.T) {
|
||||
for _, c := range []struct {
|
||||
script string
|
||||
|
|
|
@ -375,6 +375,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
|
|||
if origID != f.ID {
|
||||
t.Fatalf("task ID unexpectedly changed during update, from %s to %s", origID.String(), f.ID.String())
|
||||
}
|
||||
|
||||
if f.Flux != newFlux {
|
||||
t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux)
|
||||
}
|
||||
|
@ -426,7 +427,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
|
|||
|
||||
// Update task: switch to every.
|
||||
newStatus = string(influxdb.TaskActive)
|
||||
newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")"
|
||||
newFlux = "option task = {\n\tname: \"task-changed #98\",\n\toffset: 5s,\n\tconcurrency: 100,\n\tevery: 30s,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")"
|
||||
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -655,7 +656,7 @@ from(bucket: "b")
|
|||
t.Fatal(err)
|
||||
}
|
||||
t.Run("update task and delete offset", func(t *testing.T) {
|
||||
expectedFlux := `option task = {name: "task-Options-Update", every: 10s, concurrency: 100}
|
||||
expectedFlux := `option task = {name: "task-Options-Update", concurrency: 100, every: 10s}
|
||||
|
||||
from(bucket: "b")
|
||||
|> to(bucket: "two", orgID: "000000000000000")`
|
||||
|
@ -675,8 +676,8 @@ from(bucket: "b")
|
|||
t.Run("update task with different offset option", func(t *testing.T) {
|
||||
expectedFlux := `option task = {
|
||||
name: "task-Options-Update",
|
||||
every: 10s,
|
||||
concurrency: 100,
|
||||
every: 10s,
|
||||
offset: 10s,
|
||||
}
|
||||
|
||||
|
@ -1738,14 +1739,14 @@ const (
|
|||
concurrency: 100,
|
||||
}
|
||||
|
||||
from(bucket:"b")
|
||||
from(bucket: "b")
|
||||
|> to(bucket: "two", orgID: "000000000000000")`
|
||||
|
||||
scriptDifferentName = `option task = {
|
||||
name: "task-changed #%d",
|
||||
cron: "* * * * *",
|
||||
offset: 5s,
|
||||
concurrency: 100,
|
||||
cron: "* * * * *",
|
||||
}
|
||||
|
||||
from(bucket: "b")
|
||||
|
|
31
task_test.go
31
task_test.go
|
@ -1,11 +1,14 @@
|
|||
package influxdb_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
platform "github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"github.com/influxdata/influxdb/v2/mock"
|
||||
_ "github.com/influxdata/influxdb/v2/query/builtin"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/v2/task/options"
|
||||
|
@ -53,10 +56,26 @@ func TestOptionsMarshal(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestOptionsEditWithAST(t *testing.T) {
|
||||
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.SimpleTaskOptionsExtraction(): true,
|
||||
})
|
||||
testOptionsEdit(t, flagger)
|
||||
}
|
||||
|
||||
func TestOptionsEdit(t *testing.T) {
|
||||
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.SimpleTaskOptionsExtraction(): false,
|
||||
})
|
||||
testOptionsEdit(t, flagger)
|
||||
}
|
||||
|
||||
func testOptionsEdit(t *testing.T, flagger feature.Flagger) {
|
||||
ctx, _ := feature.Annotate(context.Background(), flagger)
|
||||
|
||||
tu := &platform.TaskUpdate{}
|
||||
tu.Options.Every = *(options.MustParseDuration("10s"))
|
||||
if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Run("zeroing", func(t *testing.T) {
|
||||
|
@ -86,7 +105,7 @@ from(bucket: "x")
|
|||
t.Run("add new option", func(t *testing.T) {
|
||||
tu := &platform.TaskUpdate{}
|
||||
tu.Options.Offset = options.MustParseDuration("30s")
|
||||
if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux)
|
||||
|
@ -94,13 +113,13 @@ from(bucket: "x")
|
|||
t.Error(err)
|
||||
}
|
||||
if op.Offset == nil || op.Offset.String() != "30s" {
|
||||
t.Fatalf("expected every to be 30s but was %s", op.Every)
|
||||
t.Fatalf("expected offset to be 30s but was %s", op.Offset)
|
||||
}
|
||||
})
|
||||
t.Run("switching from every to cron", func(t *testing.T) {
|
||||
tu := &platform.TaskUpdate{}
|
||||
tu.Options.Cron = "* * * * *"
|
||||
if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux)
|
||||
|
@ -117,7 +136,7 @@ from(bucket: "x")
|
|||
t.Run("switching from cron to every", func(t *testing.T) {
|
||||
tu := &platform.TaskUpdate{}
|
||||
tu.Options.Every = *(options.MustParseDuration("10s"))
|
||||
if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux)
|
||||
|
@ -138,7 +157,7 @@ from(bucket: "x")
|
|||
|
||||
from(bucket: "x")
|
||||
|> range(start: -1h)`
|
||||
if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
if err := tu.UpdateFlux(ctx, fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux)
|
||||
|
|
Loading…
Reference in New Issue