Merge pull request #10954 from influxdata/feature/update_task_options

feat(tasks): update task options from api
pull/11023/head
Jorge Landivar 2019-01-11 14:58:10 -06:00 committed by GitHub
commit f8f1c1e7b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 368 additions and 42 deletions

5
go.mod
View File

@ -128,14 +128,15 @@ require (
github.com/yudai/pp v2.0.1+incompatible // indirect
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519
golang.org/x/net v0.0.0-20181106065722-10aee1819953
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2
google.golang.org/api v0.0.0-20181021000519-a2651947f503
google.golang.org/grpc v1.15.0
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 // indirect
google.golang.org/grpc v1.16.0
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/ldap.v2 v2.5.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect

10
go.sum
View File

@ -413,8 +413,8 @@ golang.org/x/exp v0.0.0-20181112044915-a3060d491354/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953 h1:LuZIitY8waaxUfNIdtajyE/YzA/zyf0YxXG27VpLrkg=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4 h1:99CA0JJbUX4ozCnLon680Jc9e0T1i8HCaLVJMwtI8Hc=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -445,9 +445,11 @@ google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRS
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.15.0 h1:Az/KuahOM4NAidTEuJCv/RonAA7rYsTPkqXVjr+8OOw=
google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d h1:TxyelI5cVkbREznMhfzycHdkp5cLA7DpE+GKjSslYhM=
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -385,7 +385,6 @@ func (h *TaskHandler) handleUpdateTask(w http.ResponseWriter, r *http.Request) {
EncodeError(ctx, err, w)
return
}
task, err := h.TaskService.UpdateTask(ctx, req.TaskID, req.Update)
if err != nil {
EncodeError(ctx, err, w)

View File

@ -0,0 +1,4 @@
// Package tests just has tests
package tests
// This file is just to keep linters and go build from complaining.

137
task.go
View File

@ -2,6 +2,16 @@ package influxdb
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/edit"
"github.com/influxdata/flux/parser"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/task/options"
)
const (
@ -76,10 +86,135 @@ type TaskService interface {
ForceRun(ctx context.Context, taskID ID, scheduledFor int64) (*Run, error)
}
// TaskUpdate represents updates to a task
// TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.
type TaskUpdate struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
}
func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
// this is a type so we can marshal string into durations nicely
jo := struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
Name string `json:"options,omitempty"`
// Cron is a cron style time schedule that can be used in place of Every.
Cron string `json:"cron,omitempty"`
// Every represents a fixed period to repeat execution.
// It gets marshalled from a string duration, i.e.: "10s" is 10 seconds
Every flux.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
// It gets marshalled from a string duration, i.e.: "10s" is 10 seconds
Offset flux.Duration `json:"offset,omitempty"`
Concurrency int64 `json:"concurrency,omitempty"`
Retry int64 `json:"retry,omitempty"`
}{}
if err := json.Unmarshal(data, &jo); err != nil {
return err
}
t.Options.Name = jo.Name
t.Options.Cron = jo.Cron
t.Options.Every = time.Duration(jo.Every)
t.Options.Offset = time.Duration(jo.Offset)
t.Options.Concurrency = jo.Concurrency
t.Options.Retry = jo.Retry
t.Flux = jo.Flux
t.Status = jo.Status
return nil
}
func (t TaskUpdate) MarshalJSON() ([]byte, error) {
jo := struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
Name string `json:"options,omitempty"`
// Cron is a cron style time schedule that can be used in place of Every.
Cron string `json:"cron,omitempty"`
// Every represents a fixed period to repeat execution.
Every flux.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
Offset flux.Duration `json:"offset,omitempty"`
Concurrency int64 `json:"concurrency,omitempty"`
Retry int64 `json:"retry,omitempty"`
}{}
jo.Name = t.Options.Name
jo.Cron = t.Options.Cron
jo.Every = flux.Duration(t.Options.Every)
jo.Offset = flux.Duration(t.Options.Offset)
jo.Concurrency = t.Options.Concurrency
jo.Retry = t.Options.Retry
jo.Flux = t.Flux
jo.Status = t.Status
return json.Marshal(jo)
}
func (t TaskUpdate) Validate() error {
switch {
case t.Options.Every != 0 && t.Options.Cron != "":
return errors.New("cannot specify both every and cron")
case t.Flux == nil && t.Status == nil && t.Options.IsZero():
return errors.New("cannot update task without content")
}
return nil
}
// UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it
// It zeros the options in the TaskUpdate.
func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
if t.Flux != nil {
return nil
}
parsedPKG := parser.ParseSource(oldFlux)
if ast.Check(parsedPKG) > 0 {
return ast.GetError(parsedPKG)
}
parsed := parsedPKG.Files[0] //TODO: remove this line when flux 0.14 is upgraded into platform
if t.Options.Every != 0 && t.Options.Cron != "" {
return errors.New("cannot specify both every and cron")
}
// so we don't allocate if we are just changing the status
if t.Options.Name != "" || t.Options.Every != 0 || t.Options.Cron != "" || t.Options.Offset != 0 {
op := make(map[string]values.Value, 4)
if t.Options.Name != "" {
op["name"] = values.NewString(t.Options.Name)
}
if t.Options.Every != 0 {
op["every"] = values.NewDuration(values.Duration(t.Options.Every))
}
if t.Options.Cron != "" {
op["cron"] = values.NewString(t.Options.Cron)
}
if t.Options.Offset != 0 {
op["offset"] = values.NewDuration(values.Duration(t.Options.Offset))
}
ok, err := edit.Option(parsed, "task", edit.OptionObjectFn(op))
if err != nil {
return err
}
if !ok {
return errors.New("unable to edit option")
}
t.Options.Clear()
s := ast.Format(parsed)
t.Flux = &s
return nil
}
return nil
}
// TaskFilter represents a set of filters that restrict the returned results

View File

@ -205,16 +205,28 @@ func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (
return backend.ErrTaskNotFound
}
res.OldScript = string(v)
newScript := req.Script
if res.OldScript == "" {
return errors.New("task script not stored properly")
}
var newScript string
if !req.Options.IsZero() || req.Script != "" {
if err = req.UpdateFlux(res.OldScript); err != nil {
return err
}
newScript = req.Script
}
if req.Script == "" {
// Need to build op from existing script.
op, err = options.FromScript(string(v))
op, err = options.FromScript(res.OldScript)
if err != nil {
return err
}
newScript = string(v)
newScript = res.OldScript
} else {
op, err = options.FromScript(req.Script)
if err != nil {
return err
}
if err := bt.Put(encodedID, []byte(req.Script)); err != nil {
return err
}

View File

@ -69,7 +69,6 @@ func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTask
if err != nil {
return res, err
}
idStr := req.ID.String()
s.mu.Lock()
@ -83,6 +82,9 @@ func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTask
found = true
res.OldScript = t.Script
if err = req.UpdateFlux(t.Script); err != nil {
return res, err
}
if req.Script == "" {
op, err = options.FromScript(t.Script)
if err != nil {
@ -113,7 +115,6 @@ func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTask
s.meta[req.ID] = stm
}
res.NewMeta = stm
return res, nil
}

View File

@ -11,6 +11,10 @@ import (
"strings"
"time"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/ast/edit"
"github.com/influxdata/flux/parser"
"github.com/influxdata/flux/values"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/options"
)
@ -172,6 +176,58 @@ type UpdateTaskRequest struct {
// The new desired task status.
// If empty, do not modify the existing status.
Status TaskStatus
// These options are for editing options via request. Zeroed options will be ignored.
options.Options
}
// UpdateFlux updates the taskupdate to go from updating options to updating a flux string, that now has those updated options in it
// It zeros the options in the TaskUpdate.
func (t *UpdateTaskRequest) UpdateFlux(oldFlux string) error {
if t.Script != "" {
oldFlux = t.Script
}
parsedPKG := parser.ParseSource(oldFlux)
if ast.Check(parsedPKG) > 0 {
return ast.GetError(parsedPKG)
}
parsed := parsedPKG.Files[0] //TODO: remove this line when flux 0.14 is upgraded into platform
// so we don't allocate if we are just changing the status
if t.Every != 0 && t.Cron != "" {
return errors.New("cannot specify both every and cron")
}
if t.Name != "" || !t.IsZero() {
op := make(map[string]values.Value, 5)
if t.Name != "" {
op["name"] = values.NewString(t.Name)
}
if t.Every != 0 {
op["every"] = values.NewDuration(values.Duration(t.Every))
}
if t.Cron != "" {
op["cron"] = values.NewString(t.Cron)
}
if t.Offset != 0 {
op["offset"] = values.NewDuration(values.Duration(t.Offset))
}
if t.Concurrency != 0 {
op["concurrency"] = values.NewInt(t.Concurrency)
}
if t.Retry != 0 {
op["retry"] = values.NewInt(t.Retry)
}
ok, err := edit.Option(parsed, "task", edit.OptionObjectFn(op))
if err != nil {
return err
}
if !ok {
return errors.New("unable to edit option")
}
t.Options.Clear()
t.Script = ast.Format(parsed)
return nil
}
return nil
}
// UpdateTaskResult describes the result of modifying a single task.
@ -386,21 +442,23 @@ func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error
// If the update contains neither a new script nor a new status, or if the script is invalid, an error is returned.
func (StoreValidation) UpdateArgs(req UpdateTaskRequest) (options.Options, error) {
var missing []string
var o options.Options
if req.Script == "" && req.Status == "" {
missing = append(missing, "script or status")
} else {
if req.Script != "" {
var err error
o, err = options.FromScript(req.Script)
if err != nil {
return o, err
}
}
if err := req.Status.validate(true); err != nil {
o := req.Options
if req.Script == "" && req.Status == "" && req.Options.IsZero() {
missing = append(missing, "script or status or options")
}
if req.Script != "" {
err := req.UpdateFlux(req.Script)
if err != nil {
return o, err
}
req.Clear()
o, err = options.FromScript(req.Script)
if err != nil {
return o, err
}
}
if err := req.Status.validate(true); err != nil {
return o, err
}
if !req.ID.Valid() {

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
@ -173,7 +174,7 @@ from(bucket:"y") |> range(start:-1h)`
task = res.NewTask
meta = res.NewMeta
if task.Script != script2 {
t.Fatalf("Task script unexpectedly updated: %s", task.Script)
t.Fatalf("Task script unexpectedly updated: newtask:\n%s\n, oldtask:\n%s, \ndiff:\n %s", task.Script, script2, cmp.Diff(task.Script, script2))
}
if task.Name != "a task2" {
t.Fatalf("Task name unexpectedly updated: %q", task.Name)

View File

@ -31,20 +31,36 @@ const maxRetry = 10
// Options are the task-related options that can be specified in a Flux script.
type Options struct {
// Name is a non optional name designator for each task.
Name string
Name string `json:"options,omitempty"`
// Cron is a cron style time schedule that can be used in place of Every.
Cron string
Cron string `json:"cron,omitempty"`
// Every represents a fixed period to repeat execution.
Every time.Duration
// this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day
Every time.Duration `json:"every,omitempty"`
// Offset represents a delay before execution.
Offset time.Duration
// this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day
Offset time.Duration `json:"offset,omitempty"`
Concurrency int64
Concurrency int64 `json:"concurrency,omitempty"`
Retry int64
Retry int64 `json:"retry,omitempty"`
}
// Clear clears out all options in the options struct, it us useful if you wish to reuse it.
func (o *Options) Clear() {
o.Name = ""
o.Cron = ""
o.Every = 0
o.Offset = 0
o.Concurrency = 0
o.Retry = 0
}
func (o *Options) IsZero() bool {
return o.Name == "" && o.Cron == "" && o.Every == 0 && o.Offset == 0 && o.Concurrency == 0 && o.Retry == 0
}
// FromScript extracts Options from a Flux script.
@ -72,7 +88,6 @@ func FromScript(script string) (Options, error) {
return opt, errors.New("task not defined")
}
optObject := task.Object()
nameVal, ok := optObject.Get("name")
if !ok {
return opt, errors.New("missing name in task options")
@ -82,12 +97,12 @@ func FromScript(script string) (Options, error) {
return opt, err
}
opt.Name = nameVal.Str()
crVal, cronOK := optObject.Get("cron")
everyVal, everyOK := optObject.Get("every")
if cronOK && everyOK {
return opt, errors.New("cannot use both cron and every in task options")
}
if !cronOK && !everyOK {
return opt, errors.New("cron or every is required")
}

View File

@ -102,10 +102,9 @@ func (p pAdapter) CreateTask(ctx context.Context, t *platform.Task) error {
}
func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
if upd.Flux == nil && upd.Status == nil {
return nil, errors.New("cannot update task without content")
if err := upd.Validate(); err != nil {
return nil, err
}
req := backend.UpdateTaskRequest{ID: id}
if upd.Flux != nil {
req.Script = *upd.Flux
@ -113,11 +112,14 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T
if upd.Status != nil {
req.Status = backend.TaskStatus(*upd.Status)
}
req.Options = upd.Options
res, err := p.s.UpdateTask(ctx, req)
if err != nil {
return nil, err
}
if res.NewTask.Script == "" {
return nil, errors.New("script not defined in the store")
}
opts, err := options.FromScript(res.NewTask.Script)
if err != nil {
return nil, err

View File

@ -219,6 +219,21 @@ func testTaskCRUD(t *testing.T, sys *System) {
t.Fatalf("expected task status to be inactive, got %q", f.Status)
}
// Update task: just update an option.
newStatus = string(backend.TaskActive)
newFlux = fmt.Sprintf(scriptDifferentName, 98)
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Options: options.Options{Name: "task-changed #98"}})
if err != nil {
t.Fatal(err)
}
if f.Flux != newFlux {
diff := cmp.Diff(f.Flux, newFlux)
t.Fatalf("flux unexpected updated: %s", diff)
}
if f.Status != newStatus {
t.Fatalf("expected task status to be active, got %q", f.Status)
}
// Delete task.
if err := sys.ts.DeleteTask(sys.Ctx, origID); err != nil {
t.Fatal(err)
@ -802,12 +817,26 @@ func creds(t *testing.T, s *System) (orgID, userID platform.ID, token string) {
return o, u, tok
}
const scriptFmt = `option task = {
const (
scriptFmt = `option task = {
name: "task #%d",
cron: "* * * * *",
offset: 5s,
concurrency: 100,
}
from(bucket:"b") |> toHTTP(url:"http://example.com")`
from(bucket: "b")
|> toHTTP(url: "http://example.com")`
scriptDifferentName = `option task = {
name: "task-changed #%d",
cron: "* * * * *",
offset: 5s,
concurrency: 100,
}
from(bucket: "b")
|> toHTTP(url: "http://example.com")`
)
var idGen = snowflake.NewIDGenerator()

67
task_test.go Normal file
View File

@ -0,0 +1,67 @@
package influxdb_test
import (
"encoding/json"
"testing"
"time"
platform "github.com/influxdata/influxdb"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/options"
)
func TestOptionsMarshal(t *testing.T) {
tu := &platform.TaskUpdate{}
// this is to make sure that string durations are properly marshaled into durations
if err := json.Unmarshal([]byte(`{"every":"10s", "offset":"1h"}`), tu); err != nil {
t.Fatal(err)
}
if tu.Options.Every != 10*time.Second {
t.Fatalf("option.every not properly unmarshaled, expected 10s got %s", tu.Options.Every)
}
if tu.Options.Offset != time.Hour {
t.Fatalf("option.every not properly unmarshaled, expected 1h got %s", tu.Options.Offset)
}
tu = &platform.TaskUpdate{}
// this is to make sure that string durations are properly marshaled into durations
if err := json.Unmarshal([]byte(`{"flux":"option task = {\n\tname: \"task #99\",\n\tcron: \"* * * * *\",\n\toffset: 5s,\n\tconcurrency: 100,\n}\nfrom(bucket:\"b\") |\u003e toHTTP(url:\"http://example.com\")"}`), tu); err != nil {
t.Fatal(err)
}
if tu.Flux == nil {
t.Fatalf("flux not properly unmarshaled, expected not nil but got nil")
}
}
func TestOptionsEdit(t *testing.T) {
tu := &platform.TaskUpdate{}
tu.Options.Every = 10 * time.Second
if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil {
t.Fatal(err)
}
t.Run("test zeroing", func(t *testing.T) {
if tu.Options.Every != 0 {
t.Errorf("expected Every to be zeroed but it wasn't")
}
})
t.Run("test fmt string", func(t *testing.T) {
t.Skip("This won't work until the flux formatter formats durations in a nicer way")
expected := `option task = {every: 10s, name: "foo"}
from(bucket:"x")
|> range(start:-1h)`
if *tu.Flux != expected {
t.Errorf("got the wrong task back, expected %s,\n got %s\n", expected, *tu.Flux)
}
})
t.Run("test replacement", func(t *testing.T) {
op, err := options.FromScript(*tu.Flux)
if err != nil {
t.Error(err)
}
if op.Every != 10*time.Second {
t.Logf("expected every to be 10s but was %s", op.Every)
t.Fail()
}
})
}