273 lines
7.3 KiB
Go
273 lines
7.3 KiB
Go
// Package options provides ways to extract the task-related options from a Flux script.
|
|
package options
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/flux"
|
|
"github.com/influxdata/flux/semantic"
|
|
"github.com/influxdata/flux/values"
|
|
"github.com/influxdata/influxdb/pkg/pointer"
|
|
cron "gopkg.in/robfig/cron.v2"
|
|
)
|
|
|
|
// optionCache is enabled for tests, to work around https://github.com/influxdata/platform/issues/484.
|
|
var optionCache map[string]Options
|
|
|
|
var optionCacheMu sync.Mutex
|
|
|
|
// EnableScriptCacheForTest is used as a workaround for https://github.com/influxdata/platform/issues/484,
|
|
// and should be removed after that issue is addressed.
|
|
// Do not call this method in production code, as it will leak memory.
|
|
func EnableScriptCacheForTest() {
|
|
optionCache = make(map[string]Options)
|
|
}
|
|
|
|
const maxConcurrency = 100
|
|
const maxRetry = 10
|
|
|
|
// Options are the task-related options that can be specified in a Flux script.
|
|
type Options struct {
|
|
// Name is a non optional name designator for each task.
|
|
Name string `json:"name,omitempty"`
|
|
|
|
// Cron is a cron style time schedule that can be used in place of Every.
|
|
Cron string `json:"cron,omitempty"`
|
|
|
|
// Every represents a fixed period to repeat execution.
|
|
// this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day
|
|
Every time.Duration `json:"every,omitempty"`
|
|
|
|
// Offset represents a delay before execution.
|
|
// this can be unmarshaled from json as a string i.e.: "1d" will unmarshal as 1 day
|
|
Offset *time.Duration `json:"offset,omitempty"`
|
|
|
|
Concurrency *int64 `json:"concurrency,omitempty"`
|
|
|
|
Retry *int64 `json:"retry,omitempty"`
|
|
}
|
|
|
|
// Clear clears out all options in the options struct, it us useful if you wish to reuse it.
|
|
func (o *Options) Clear() {
|
|
o.Name = ""
|
|
o.Cron = ""
|
|
o.Every = 0
|
|
o.Offset = nil
|
|
o.Concurrency = nil
|
|
o.Retry = nil
|
|
}
|
|
|
|
func (o *Options) IsZero() bool {
|
|
return o.Name == "" &&
|
|
o.Cron == "" &&
|
|
o.Every == 0 &&
|
|
o.Offset == nil &&
|
|
o.Concurrency == nil &&
|
|
o.Retry == nil
|
|
}
|
|
|
|
// All the task option names we accept.
|
|
const (
|
|
optName = "name"
|
|
optCron = "cron"
|
|
optEvery = "every"
|
|
optOffset = "offset"
|
|
optConcurrency = "concurrency"
|
|
optRetry = "retry"
|
|
)
|
|
|
|
// FromScript extracts Options from a Flux script.
|
|
func FromScript(script string) (Options, error) {
|
|
if optionCache != nil {
|
|
optionCacheMu.Lock()
|
|
opt, ok := optionCache[script]
|
|
optionCacheMu.Unlock()
|
|
|
|
if ok {
|
|
return opt, nil
|
|
}
|
|
}
|
|
opt := Options{Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)}
|
|
|
|
_, scope, err := flux.Eval(script)
|
|
if err != nil {
|
|
return opt, err
|
|
}
|
|
|
|
// pull options from the program scope
|
|
task, ok := scope.Lookup("task")
|
|
if !ok {
|
|
return opt, errors.New("missing required option: 'task'")
|
|
}
|
|
optObject := task.Object()
|
|
if err := validateOptionNames(optObject); err != nil {
|
|
return opt, err
|
|
}
|
|
|
|
nameVal, ok := optObject.Get(optName)
|
|
if !ok {
|
|
return opt, errors.New("missing name in task options")
|
|
}
|
|
|
|
if err := checkNature(nameVal.PolyType().Nature(), semantic.String); err != nil {
|
|
return opt, err
|
|
}
|
|
opt.Name = nameVal.Str()
|
|
crVal, cronOK := optObject.Get(optCron)
|
|
everyVal, everyOK := optObject.Get(optEvery)
|
|
if cronOK && everyOK {
|
|
return opt, errors.New("cannot use both cron and every in task options")
|
|
}
|
|
|
|
if !cronOK && !everyOK {
|
|
return opt, errors.New("cron or every is required")
|
|
}
|
|
|
|
if cronOK {
|
|
if err := checkNature(crVal.PolyType().Nature(), semantic.String); err != nil {
|
|
return opt, err
|
|
}
|
|
opt.Cron = crVal.Str()
|
|
}
|
|
|
|
if everyOK {
|
|
if err := checkNature(everyVal.PolyType().Nature(), semantic.Duration); err != nil {
|
|
return opt, err
|
|
}
|
|
opt.Every = everyVal.Duration().Duration()
|
|
}
|
|
|
|
if offsetVal, ok := optObject.Get(optOffset); ok {
|
|
if err := checkNature(offsetVal.PolyType().Nature(), semantic.Duration); err != nil {
|
|
return opt, err
|
|
}
|
|
opt.Offset = pointer.Duration(offsetVal.Duration().Duration())
|
|
}
|
|
|
|
if concurrencyVal, ok := optObject.Get(optConcurrency); ok {
|
|
if err := checkNature(concurrencyVal.PolyType().Nature(), semantic.Int); err != nil {
|
|
return opt, err
|
|
}
|
|
opt.Concurrency = pointer.Int64(concurrencyVal.Int())
|
|
}
|
|
|
|
if retryVal, ok := optObject.Get(optRetry); ok {
|
|
if err := checkNature(retryVal.PolyType().Nature(), semantic.Int); err != nil {
|
|
return opt, err
|
|
}
|
|
opt.Retry = pointer.Int64(retryVal.Int())
|
|
}
|
|
|
|
if err := opt.Validate(); err != nil {
|
|
return opt, err
|
|
}
|
|
|
|
if optionCache != nil {
|
|
optionCacheMu.Lock()
|
|
optionCache[script] = opt
|
|
optionCacheMu.Unlock()
|
|
}
|
|
|
|
return opt, nil
|
|
}
|
|
|
|
// Validate returns an error if the options aren't valid.
|
|
func (o *Options) Validate() error {
|
|
var errs []string
|
|
if o.Name == "" {
|
|
errs = append(errs, "name required")
|
|
}
|
|
|
|
cronPresent := o.Cron != ""
|
|
everyPresent := o.Every != 0
|
|
if cronPresent == everyPresent {
|
|
// They're both present or both missing.
|
|
errs = append(errs, "must specify exactly one of either cron or every")
|
|
} else if cronPresent {
|
|
_, err := cron.Parse(o.Cron)
|
|
if err != nil {
|
|
errs = append(errs, "cron invalid: "+err.Error())
|
|
}
|
|
} else if everyPresent {
|
|
if o.Every < time.Second {
|
|
errs = append(errs, "every option must be at least 1 second")
|
|
} else if o.Every.Truncate(time.Second) != o.Every {
|
|
errs = append(errs, "every option must be expressible as whole seconds")
|
|
}
|
|
}
|
|
|
|
if o.Offset != nil && o.Offset.Truncate(time.Second) != *o.Offset {
|
|
// For now, allowing negative offset delays. Maybe they're useful for forecasting?
|
|
errs = append(errs, "offset option must be expressible as whole seconds")
|
|
}
|
|
if o.Concurrency != nil {
|
|
if *o.Concurrency < 1 {
|
|
errs = append(errs, "concurrency must be at least 1")
|
|
} else if *o.Concurrency > maxConcurrency {
|
|
errs = append(errs, fmt.Sprintf("concurrency exceeded max of %d", maxConcurrency))
|
|
}
|
|
}
|
|
if o.Retry != nil {
|
|
if *o.Retry < 1 {
|
|
errs = append(errs, "retry must be at least 1")
|
|
} else if *o.Retry > maxRetry {
|
|
errs = append(errs, fmt.Sprintf("retry exceeded max of %d", maxRetry))
|
|
}
|
|
}
|
|
|
|
if len(errs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("invalid options: %s", strings.Join(errs, ", "))
|
|
}
|
|
|
|
// EffectiveCronString returns the effective cron string of the options.
|
|
// If the cron option was specified, it is returned.
|
|
// If the every option was specified, it is converted into a cron string using "@every".
|
|
// Otherwise, the empty string is returned.
|
|
// The value of the offset option is not considered.
|
|
func (o *Options) EffectiveCronString() string {
|
|
if o.Cron != "" {
|
|
return o.Cron
|
|
}
|
|
if o.Every > 0 {
|
|
return "@every " + o.Every.String()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// checkNature returns a clean error of got and expected dont match.
|
|
func checkNature(got, exp semantic.Nature) error {
|
|
if got != exp {
|
|
return fmt.Errorf("unexpected kind: got %q expected %q", got, exp)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// validateOptionNames returns an error if any keys in the option object o
|
|
// do not match an expected option name.
|
|
func validateOptionNames(o values.Object) error {
|
|
var unexpected []string
|
|
o.Range(func(name string, _ values.Value) {
|
|
switch name {
|
|
case optName, optCron, optEvery, optOffset, optConcurrency, optRetry:
|
|
// Known option. Nothing to do.
|
|
default:
|
|
unexpected = append(unexpected, name)
|
|
}
|
|
})
|
|
|
|
if len(unexpected) > 0 {
|
|
u := strings.Join(unexpected, ", ")
|
|
v := strings.Join([]string{optName, optCron, optEvery, optOffset, optConcurrency, optRetry}, ", ")
|
|
return fmt.Errorf("unknown task option(s): %s. valid options are %s", u, v)
|
|
}
|
|
|
|
return nil
|
|
}
|