fix(tasks): allow switching between cron and every options
parent
6e1ee40f45
commit
df75f9b9a0
95
task.go
95
task.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
|
@ -114,7 +115,7 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
|
||||||
jo := struct {
|
jo := struct {
|
||||||
Flux *string `json:"flux,omitempty"`
|
Flux *string `json:"flux,omitempty"`
|
||||||
Status *string `json:"status,omitempty"`
|
Status *string `json:"status,omitempty"`
|
||||||
Name string `json:"options,omitempty"`
|
Name string `json:"name,omitempty"`
|
||||||
|
|
||||||
// Cron is a cron style time schedule that can be used in place of Every.
|
// Cron is a cron style time schedule that can be used in place of Every.
|
||||||
Cron string `json:"cron,omitempty"`
|
Cron string `json:"cron,omitempty"`
|
||||||
|
@ -154,7 +155,7 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
|
||||||
jo := struct {
|
jo := struct {
|
||||||
Flux *string `json:"flux,omitempty"`
|
Flux *string `json:"flux,omitempty"`
|
||||||
Status *string `json:"status,omitempty"`
|
Status *string `json:"status,omitempty"`
|
||||||
Name string `json:"options,omitempty"`
|
Name string `json:"name,omitempty"`
|
||||||
|
|
||||||
// Cron is a cron style time schedule that can be used in place of Every.
|
// Cron is a cron style time schedule that can be used in place of Every.
|
||||||
Cron string `json:"cron,omitempty"`
|
Cron string `json:"cron,omitempty"`
|
||||||
|
@ -203,35 +204,89 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
|
||||||
if ast.Check(parsedPKG) > 0 {
|
if ast.Check(parsedPKG) > 0 {
|
||||||
return ast.GetError(parsedPKG)
|
return ast.GetError(parsedPKG)
|
||||||
}
|
}
|
||||||
parsed := parsedPKG.Files[0] //TODO: remove this line when flux 0.14 is upgraded into platform
|
parsed := parsedPKG.Files[0]
|
||||||
if t.Options.Every != 0 && t.Options.Cron != "" {
|
if t.Options.Every != 0 && t.Options.Cron != "" {
|
||||||
return errors.New("cannot specify both every and cron")
|
return errors.New("cannot specify both every and cron")
|
||||||
}
|
}
|
||||||
// so we don't allocate if we are just changing the status
|
op := make(map[string]ast.Expression, 4)
|
||||||
if t.Options.Name != "" || t.Options.Every != 0 || t.Options.Cron != "" || t.Options.Offset != 0 {
|
|
||||||
op := make(map[string]ast.Expression, 4)
|
|
||||||
|
|
||||||
if t.Options.Name != "" {
|
if t.Options.Name != "" {
|
||||||
op["name"] = &ast.StringLiteral{Value: t.Options.Name}
|
op["name"] = &ast.StringLiteral{Value: t.Options.Name}
|
||||||
|
}
|
||||||
|
if t.Options.Every != 0 {
|
||||||
|
d := ast.Duration{Magnitude: int64(t.Options.Every), Unit: "ns"}
|
||||||
|
op["every"] = &ast.DurationLiteral{Values: []ast.Duration{d}}
|
||||||
|
}
|
||||||
|
if t.Options.Cron != "" {
|
||||||
|
op["cron"] = &ast.StringLiteral{Value: t.Options.Cron}
|
||||||
|
}
|
||||||
|
if t.Options.Offset != 0 {
|
||||||
|
d := ast.Duration{Magnitude: int64(t.Options.Offset), Unit: "ns"}
|
||||||
|
op["offset"] = &ast.DurationLiteral{Values: []ast.Duration{d}}
|
||||||
|
}
|
||||||
|
if len(op) > 0 {
|
||||||
|
editFunc := func(opt *ast.OptionStatement) (ast.Expression, error) {
|
||||||
|
a, ok := opt.Assignment.(*ast.VariableAssignment)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("option assignment must be variable assignment")
|
||||||
|
}
|
||||||
|
obj, ok := a.Init.(*ast.ObjectExpression)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("value is is %s, not an object expression", a.Init.Type())
|
||||||
|
}
|
||||||
|
// modify in the keys and values that already are in the ast
|
||||||
|
for _, p := range obj.Properties {
|
||||||
|
k := p.Key.Key()
|
||||||
|
switch k {
|
||||||
|
case "name":
|
||||||
|
if name, ok := op["name"]; ok && t.Options.Name != "" {
|
||||||
|
delete(op, "name")
|
||||||
|
p.Value = name
|
||||||
|
}
|
||||||
|
case "offset":
|
||||||
|
if offset, ok := op["offset"]; ok && t.Options.Offset != 0 {
|
||||||
|
delete(op, "offset")
|
||||||
|
p.Value = offset
|
||||||
|
}
|
||||||
|
case "every":
|
||||||
|
if every, ok := op["every"]; ok && t.Options.Every != 0 {
|
||||||
|
delete(op, "every")
|
||||||
|
p.Value = every
|
||||||
|
} else if cron, ok := op["cron"]; ok && t.Options.Cron != "" {
|
||||||
|
delete(op, "cron")
|
||||||
|
p.Value = cron
|
||||||
|
p.Key = &ast.Identifier{Name: "cron"}
|
||||||
|
}
|
||||||
|
case "cron":
|
||||||
|
if cron, ok := op["cron"]; ok && t.Options.Cron != "" {
|
||||||
|
delete(op, "cron")
|
||||||
|
p.Value = cron
|
||||||
|
} else if every, ok := op["every"]; ok && t.Options.Every != 0 {
|
||||||
|
delete(op, "every")
|
||||||
|
p.Key = &ast.Identifier{Name: "every"}
|
||||||
|
p.Value = every
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// add in new keys and values to the ast
|
||||||
|
for k := range op {
|
||||||
|
obj.Properties = append(obj.Properties, &ast.Property{
|
||||||
|
Key: &ast.Identifier{Name: k},
|
||||||
|
Value: op[k],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
if t.Options.Every != 0 {
|
|
||||||
d := ast.Duration{Magnitude: int64(t.Options.Every), Unit: "ns"}
|
ok, err := edit.Option(parsed, "task", editFunc)
|
||||||
op["every"] = &ast.DurationLiteral{Values: []ast.Duration{d}}
|
|
||||||
}
|
|
||||||
if t.Options.Cron != "" {
|
|
||||||
op["cron"] = &ast.StringLiteral{Value: t.Options.Cron}
|
|
||||||
}
|
|
||||||
if t.Options.Offset != 0 {
|
|
||||||
d := ast.Duration{Magnitude: int64(t.Options.Offset), Unit: "ns"}
|
|
||||||
op["offset"] = &ast.DurationLiteral{Values: []ast.Duration{d}}
|
|
||||||
}
|
|
||||||
ok, err := edit.Option(parsed, "task", edit.OptionObjectFn(op))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("unable to edit option")
|
return errors.New("unable to edit option")
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Options.Clear()
|
t.Options.Clear()
|
||||||
s := ast.Format(parsed)
|
s := ast.Format(parsed)
|
||||||
t.Flux = &s
|
t.Flux = &s
|
||||||
|
|
|
@ -31,7 +31,7 @@ const maxRetry = 10
|
||||||
// Options are the task-related options that can be specified in a Flux script.
|
// Options are the task-related options that can be specified in a Flux script.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Name is a non optional name designator for each task.
|
// Name is a non optional name designator for each task.
|
||||||
Name string `json:"options,omitempty"`
|
Name string `json:"name,omitempty"`
|
||||||
|
|
||||||
// Cron is a cron style time schedule that can be used in place of Every.
|
// Cron is a cron style time schedule that can be used in place of Every.
|
||||||
Cron string `json:"cron,omitempty"`
|
Cron string `json:"cron,omitempty"`
|
||||||
|
@ -60,7 +60,12 @@ func (o *Options) Clear() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *Options) IsZero() bool {
|
func (o *Options) IsZero() bool {
|
||||||
return o.Name == "" && o.Cron == "" && o.Every == 0 && o.Offset == 0 && o.Concurrency == 0 && o.Retry == 0
|
return o.Name == "" &&
|
||||||
|
o.Cron == "" &&
|
||||||
|
o.Every == 0 &&
|
||||||
|
o.Offset == 0 &&
|
||||||
|
o.Concurrency == 0 &&
|
||||||
|
o.Retry == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromScript extracts Options from a Flux script.
|
// FromScript extracts Options from a Flux script.
|
||||||
|
|
44
task_test.go
44
task_test.go
|
@ -40,12 +40,12 @@ func TestOptionsEdit(t *testing.T) {
|
||||||
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
t.Run("test zeroing", func(t *testing.T) {
|
t.Run("zeroing", func(t *testing.T) {
|
||||||
if tu.Options.Every != 0 {
|
if tu.Options.Every != 0 {
|
||||||
t.Errorf("expected Every to be zeroed but it wasn't")
|
t.Errorf("expected Every to be zeroed but it wasn't")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("test fmt string", func(t *testing.T) {
|
t.Run("fmt string", func(t *testing.T) {
|
||||||
t.Skip("This won't work until the flux formatter formats durations in a nicer way")
|
t.Skip("This won't work until the flux formatter formats durations in a nicer way")
|
||||||
expected := `option task = {every: 10s, name: "foo"}
|
expected := `option task = {every: 10s, name: "foo"}
|
||||||
from(bucket:"x")
|
from(bucket:"x")
|
||||||
|
@ -54,7 +54,7 @@ from(bucket:"x")
|
||||||
t.Errorf("got the wrong task back, expected %s,\n got %s\n", expected, *tu.Flux)
|
t.Errorf("got the wrong task back, expected %s,\n got %s\n", expected, *tu.Flux)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("test replacement", func(t *testing.T) {
|
t.Run("replacement", func(t *testing.T) {
|
||||||
op, err := options.FromScript(*tu.Flux)
|
op, err := options.FromScript(*tu.Flux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -64,13 +64,12 @@ from(bucket:"x")
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("test add new option", func(t *testing.T) {
|
t.Run("add new option", func(t *testing.T) {
|
||||||
tu := &platform.TaskUpdate{}
|
tu := &platform.TaskUpdate{}
|
||||||
tu.Options.Offset = 30 * time.Second
|
tu.Options.Offset = 30 * time.Second
|
||||||
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
op, err := options.FromScript(*tu.Flux)
|
op, err := options.FromScript(*tu.Flux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -79,5 +78,38 @@ from(bucket:"x")
|
||||||
t.Fatalf("expected every to be 30s but was %s", op.Every)
|
t.Fatalf("expected every to be 30s but was %s", op.Every)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
t.Run("switching from every to cron", func(t *testing.T) {
|
||||||
|
tu := &platform.TaskUpdate{}
|
||||||
|
tu.Options.Cron = "* * * * *"
|
||||||
|
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
op, err := options.FromScript(*tu.Flux)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if op.Every != 0 {
|
||||||
|
t.Fatalf("expected every to be 0 but was %s", op.Every)
|
||||||
|
}
|
||||||
|
if op.Cron != "* * * * *" {
|
||||||
|
t.Fatalf("expected Cron to be \"* * * * *\" but was %s", op.Cron)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("switching from cron to every", func(t *testing.T) {
|
||||||
|
tu := &platform.TaskUpdate{}
|
||||||
|
tu.Options.Every = 10 * time.Second
|
||||||
|
if err := tu.UpdateFlux(`option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
op, err := options.FromScript(*tu.Flux)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if op.Every != 10*time.Second {
|
||||||
|
t.Fatalf("expected every to be 10s but was %s", op.Every)
|
||||||
|
}
|
||||||
|
if op.Cron != "" {
|
||||||
|
t.Fatalf("expected Cron to be \"\" but was %s", op.Cron)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue