From bcbb9df72e23a6a4ada8655706c677478118740f Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 5 Mar 2020 14:36:58 -0600 Subject: [PATCH] refactor(task): tasks will now use the flux language service (#17104) The tasks subsystem will now use the flux language service to parse and evaluate flux instead of directly interacting with the parser or runtime. This helps break the dependency on the libflux parser for the base influxdb package. This includes the task notification packages which were changed at the same time. --- check.go | 4 +- cmd/influxd/launcher/launcher.go | 3 +- http/check_service.go | 12 ++- http/check_test.go | 2 + http/query.go | 2 +- http/query_handler.go | 2 +- kv/check.go | 12 +-- kv/check_test.go | 5 +- kv/notification_rule_test.go | 5 +- kv/service.go | 11 ++- kv/task.go | 6 +- kv/task_test.go | 14 ++- notification/check/check.go | 2 +- notification/check/check_test.go | 3 +- notification/check/custom.go | 35 +++++--- notification/check/custom_test.go | 3 +- notification/check/deadman.go | 13 +-- notification/check/deadman_test.go | 3 +- notification/check/threshold.go | 17 ++-- notification/check/threshold_test.go | 3 +- notification/duration.go | 86 +++++++++++++++++-- notification/rule/slack_test.go | 3 +- query.go | 11 ++- query/fluxlang/service.go | 9 ++ query/service.go | 33 +++++++ task.go | 19 +++-- task/backend/analytical_storage_test.go | 5 +- task/backend/executor/executor_test.go | 5 +- task/backend/executor/limits.go | 4 +- task/backend/executor/limits_test.go | 3 +- task/options/options.go | 56 ++++++++---- task/options/options_test.go | 9 +- task/options/strconv.go | 109 ++++++++++++++++++++++++ task_test.go | 21 ++--- testing/checks.go | 2 + 35 files changed, 425 insertions(+), 107 deletions(-) create mode 100644 task/options/strconv.go diff --git a/check.go b/check.go index 7a4c6df1ff..c64a113604 100644 --- a/check.go +++ b/check.go @@ -13,14 +13,14 @@ const ( // Check represents the information required to generate a periodic check task. type Check interface { - Valid() error + Valid(lang FluxLanguageService) error Type() string ClearPrivateData() SetTaskID(ID) GetTaskID() ID GetOwnerID() ID SetOwnerID(ID) - GenerateFlux() (string, error) + GenerateFlux(lang FluxLanguageService) (string, error) json.Marshaler CRUDLogSetter diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 9d00cb8761..f9efd4e95a 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -480,7 +480,8 @@ func (m *Launcher) run(ctx context.Context) (err error) { } serviceConfig := kv.ServiceConfig{ - SessionLength: time.Duration(m.sessionLength) * time.Minute, + SessionLength: time.Duration(m.sessionLength) * time.Minute, + FluxLanguageService: fluxlang.DefaultService, } flushers := flushers{} diff --git a/http/check_service.go b/http/check_service.go index e950ff17a7..297e6aaf43 100644 --- a/http/check_service.go +++ b/http/check_service.go @@ -27,6 +27,7 @@ type CheckBackend struct { LabelService influxdb.LabelService UserService influxdb.UserService OrganizationService influxdb.OrganizationService + FluxLanguageService influxdb.FluxLanguageService } // NewCheckBackend returns a new instance of CheckBackend. @@ -41,6 +42,7 @@ func NewCheckBackend(log *zap.Logger, b *APIBackend) *CheckBackend { LabelService: b.LabelService, UserService: b.UserService, OrganizationService: b.OrganizationService, + FluxLanguageService: b.FluxLanguageService, } } @@ -56,6 +58,7 @@ type CheckHandler struct { LabelService influxdb.LabelService UserService influxdb.UserService OrganizationService influxdb.OrganizationService + FluxLanguageService influxdb.FluxLanguageService } const ( @@ -83,6 +86,7 @@ func NewCheckHandler(log *zap.Logger, b *CheckBackend) *CheckHandler { UserService: b.UserService, TaskService: b.TaskService, OrganizationService: b.OrganizationService, + FluxLanguageService: b.FluxLanguageService, } h.HandlerFunc("POST", prefixChecks, h.handlePostCheck) h.HandlerFunc("GET", prefixChecks, h.handleGetChecks) @@ -294,7 +298,7 @@ func (h *CheckHandler) handleGetCheckQuery(w http.ResponseWriter, r *http.Reques h.HandleHTTPError(ctx, err, w) return } - flux, err := chk.GenerateFlux() + flux, err := chk.GenerateFlux(h.FluxLanguageService) if err != nil { h.HandleHTTPError(ctx, err, w) return @@ -429,7 +433,7 @@ func decodePostCheckRequest(r *http.Request) (postCheckRequest, error) { }, nil } -func decodePutCheckRequest(ctx context.Context, r *http.Request) (influxdb.CheckCreate, error) { +func decodePutCheckRequest(ctx context.Context, lang influxdb.FluxLanguageService, r *http.Request) (influxdb.CheckCreate, error) { params := httprouter.ParamsFromContext(ctx) id := params.ByName("id") if id == "" { @@ -467,7 +471,7 @@ func decodePutCheckRequest(ctx context.Context, r *http.Request) (influxdb.Check } chk.SetID(*i) - if err := chk.Valid(); err != nil { + if err := chk.Valid(lang); err != nil { return influxdb.CheckCreate{}, err } @@ -596,7 +600,7 @@ func (h *CheckHandler) mapNewCheckLabels(ctx context.Context, chk influxdb.Check // handlePutCheck is the HTTP handler for the PUT /api/v2/checks route. func (h *CheckHandler) handlePutCheck(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - chk, err := decodePutCheckRequest(ctx, r) + chk, err := decodePutCheckRequest(ctx, h.FluxLanguageService, r) if err != nil { h.log.Debug("Failed to decode request", zap.Error(err)) h.HandleHTTPError(ctx, err, w) diff --git a/http/check_test.go b/http/check_test.go index 64d23d6de2..4e5ca82fec 100644 --- a/http/check_test.go +++ b/http/check_test.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/influxdb/notification" "github.com/influxdata/influxdb/notification/check" "github.com/influxdata/influxdb/pkg/testttp" + "github.com/influxdata/influxdb/query/fluxlang" influxTesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap/zaptest" ) @@ -34,6 +35,7 @@ func NewMockCheckBackend(t *testing.T) *CheckBackend { LabelService: mock.NewLabelService(), UserService: mock.NewUserService(), OrganizationService: mock.NewOrganizationService(), + FluxLanguageService: fluxlang.DefaultService, } } diff --git a/http/query.go b/http/query.go index de77e424b0..b99f8f94a1 100644 --- a/http/query.go +++ b/http/query.go @@ -157,7 +157,7 @@ func (r QueryRequest) Analyze(l influxdb.FluxLanguageService) (*QueryAnalysis, e func (r QueryRequest) analyzeFluxQuery(l influxdb.FluxLanguageService) (*QueryAnalysis, error) { a := &QueryAnalysis{} - pkg, err := l.Parse(r.Query) + pkg, err := query.Parse(l, r.Query) if pkg == nil { return nil, err } diff --git a/http/query_handler.go b/http/query_handler.go index 2586786fb5..50602a455e 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -219,7 +219,7 @@ func (h *FluxHandler) postFluxAST(w http.ResponseWriter, r *http.Request) { return } - pkg, err := h.LanguageService.Parse(request.Query) + pkg, err := query.Parse(h.LanguageService, request.Query) if err != nil { h.HandleHTTPError(ctx, &influxdb.Error{ Code: influxdb.EInvalid, diff --git a/kv/check.go b/kv/check.go index 3a7ecddbd4..3497f23949 100644 --- a/kv/check.go +++ b/kv/check.go @@ -269,7 +269,7 @@ func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.CheckCreate c.SetCreatedAt(now) c.SetUpdatedAt(now) - if err := c.Valid(); err != nil { + if err := c.Valid(s.FluxLanguageService); err != nil { return err } @@ -291,7 +291,7 @@ func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.CheckCreate } func (s *Service) createCheckTask(ctx context.Context, tx Tx, c influxdb.CheckCreate) (*influxdb.Task, error) { - script, err := c.GenerateFlux() + script, err := c.GenerateFlux(s.FluxLanguageService) if err != nil { return nil, err } @@ -314,7 +314,7 @@ func (s *Service) createCheckTask(ctx context.Context, tx Tx, c influxdb.CheckCr // PutCheck will put a check without setting an ID. func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error { - if err := c.Valid(); err != nil { + if err := c.Valid(s.FluxLanguageService); err != nil { return err } return s.kv.Update(ctx, func(tx Tx) error { @@ -393,7 +393,7 @@ func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk in } chk.SetTaskID(current.GetTaskID()) - flux, err := chk.GenerateFlux() + flux, err := chk.GenerateFlux(s.FluxLanguageService) if err != nil { return nil, err } @@ -418,7 +418,7 @@ func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk in chk.SetCreatedAt(current.GetCRUDLog().CreatedAt) chk.SetUpdatedAt(s.Now()) - if err := chk.Valid(); err != nil { + if err := chk.Valid(s.FluxLanguageService); err != nil { return nil, err } @@ -459,7 +459,7 @@ func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd inf tu.Status = strPtr(string(*upd.Status)) } - if err := c.Valid(); err != nil { + if err := c.Valid(s.FluxLanguageService); err != nil { return nil, err } diff --git a/kv/check_test.go b/kv/check_test.go index d547f4f5e3..2b8b0b65fc 100644 --- a/kv/check_test.go +++ b/kv/check_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kv" + "github.com/influxdata/influxdb/query/fluxlang" influxdbtesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap/zaptest" ) @@ -28,7 +29,9 @@ func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb } func initCheckService(s kv.Store, f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, string, func()) { - svc := kv.NewService(zaptest.NewLogger(t), s) + svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) svc.IDGenerator = f.IDGenerator svc.TimeGenerator = f.TimeGenerator if f.TimeGenerator == nil { diff --git a/kv/notification_rule_test.go b/kv/notification_rule_test.go index 667e83cc87..4526eb142d 100644 --- a/kv/notification_rule_test.go +++ b/kv/notification_rule_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kv" + "github.com/influxdata/influxdb/query/fluxlang" influxdbtesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap/zaptest" ) @@ -28,7 +29,9 @@ func initBoltNotificationRuleStore(f influxdbtesting.NotificationRuleFields, t * } func initNotificationRuleStore(s kv.Store, f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) { - svc := kv.NewService(zaptest.NewLogger(t), s) + svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) svc.IDGenerator = f.IDGenerator svc.TimeGenerator = f.TimeGenerator if f.TimeGenerator == nil { diff --git a/kv/service.go b/kv/service.go index 252ca373d8..233d9a660c 100644 --- a/kv/service.go +++ b/kv/service.go @@ -31,6 +31,11 @@ type Service struct { IDGenerator influxdb.IDGenerator + // FluxLanguageService is used for parsing flux. + // If this is unset, operations that require parsing flux + // will fail. + FluxLanguageService influxdb.FluxLanguageService + // special ID generator that never returns bytes with backslash, // comma, or space. Used to support very specific encoding of org & // bucket into the old measurement in storage. @@ -73,14 +78,16 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service { if s.clock == nil { s.clock = clock.New() } + s.FluxLanguageService = s.Config.FluxLanguageService return s } // ServiceConfig allows us to configure Services type ServiceConfig struct { - SessionLength time.Duration - Clock clock.Clock + SessionLength time.Duration + Clock clock.Clock + FluxLanguageService influxdb.FluxLanguageService } // Initialize creates Buckets needed. diff --git a/kv/task.go b/kv/task.go index cc117c7ba1..72d1291f93 100644 --- a/kv/task.go +++ b/kv/task.go @@ -565,7 +565,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) // return nil, influxdb.ErrInvalidOwnerID // } - opt, err := options.FromScript(tc.Flux) + opt, err := options.FromScript(s.FluxLanguageService, tc.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } @@ -714,12 +714,12 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf // update the flux script if !upd.Options.IsZero() || upd.Flux != nil { - if err = upd.UpdateFlux(task.Flux); err != nil { + if err = upd.UpdateFlux(s.FluxLanguageService, task.Flux); err != nil { return nil, err } task.Flux = *upd.Flux - options, err := options.FromScript(*upd.Flux) + options, err := options.FromScript(s.FluxLanguageService, *upd.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } diff --git a/kv/task_test.go b/kv/task_test.go index dbde31f131..0862d2d032 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -13,6 +13,7 @@ import ( icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/kv" _ "github.com/influxdata/influxdb/query/builtin" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/servicetest" "go.uber.org/zap/zaptest" @@ -27,7 +28,9 @@ func TestBoltTaskService(t *testing.T) { t.Fatal(err) } - service := kv.NewService(zaptest.NewLogger(t), store) + service := kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) ctx, cancelFunc := context.WithCancel(context.Background()) if err := service.Initialize(ctx); err != nil { t.Fatalf("error initializing urm service: %v", err) @@ -78,7 +81,10 @@ func newService(t *testing.T, ctx context.Context, c clock.Clock) *testService { t.Fatal("failed to create InmemStore", err) } - ts.Service = kv.NewService(zaptest.NewLogger(t), ts.Store, kv.ServiceConfig{Clock: c}) + ts.Service = kv.NewService(zaptest.NewLogger(t), ts.Store, kv.ServiceConfig{ + Clock: c, + FluxLanguageService: fluxlang.DefaultService, + }) err = ts.Service.Initialize(ctx) if err != nil { t.Fatal("Service.Initialize", err) @@ -246,7 +252,9 @@ func TestTaskRunCancellation(t *testing.T) { } defer close() - service := kv.NewService(zaptest.NewLogger(t), store) + service := kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) ctx, cancelFunc := context.WithCancel(context.Background()) if err := service.Initialize(ctx); err != nil { t.Fatalf("error initializing urm service: %v", err) diff --git a/notification/check/check.go b/notification/check/check.go index c1d5cf9f47..b4601185a3 100644 --- a/notification/check/check.go +++ b/notification/check/check.go @@ -36,7 +36,7 @@ type Base struct { } // Valid returns err if the check is invalid. -func (b Base) Valid() error { +func (b Base) Valid(lang influxdb.FluxLanguageService) error { if !b.ID.Valid() { return &influxdb.Error{ Code: influxdb.EInvalid, diff --git a/notification/check/check_test.go b/notification/check/check_test.go index 37322551a1..a197c1d4be 100644 --- a/notification/check/check_test.go +++ b/notification/check/check_test.go @@ -9,6 +9,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb/notification" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/mock" @@ -156,7 +157,7 @@ func TestValidCheck(t *testing.T) { }, } for _, c := range cases { - got := c.src.Valid() + got := c.src.Valid(fluxlang.DefaultService) influxTesting.ErrorsEqual(t, got, c.err) } } diff --git a/notification/check/custom.go b/notification/check/custom.go index d731979ad6..d43e89f7bc 100644 --- a/notification/check/custom.go +++ b/notification/check/custom.go @@ -5,9 +5,9 @@ import ( "time" "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification/flux" + "github.com/influxdata/influxdb/query" ) var _ influxdb.Check = &Custom{} @@ -61,15 +61,16 @@ type Custom struct { // |> monitor.check(data: check, messageFn:messageFn, warn:warn, crit:crit, info:info) // GenerateFlux returns the check query text directly -func (c Custom) GenerateFlux() (string, error) { +func (c Custom) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) { return c.Query.Text, nil } // sanitizeFlux modifies the check query text to include correct _check_id param in check object -func (c Custom) sanitizeFlux() (string, error) { - p := parser.ParseSource(c.Query.Text) - - if errs := ast.GetErrors(p); len(errs) != 0 { +func (c Custom) sanitizeFlux(lang influxdb.FluxLanguageService) (string, error) { + p, err := query.Parse(lang, c.Query.Text) + if p == nil { + return "", err + } else if errs := ast.GetErrors(p); len(errs) != 0 { return "", multiError(errs) } @@ -102,9 +103,12 @@ func propertyHasValue(prop *ast.Property, key string, value string) bool { return ok && prop.Key.Key() == key && stringLit.Value == value } -func (c *Custom) hasRequiredTaskOptions() (err error) { +func (c *Custom) hasRequiredTaskOptions(lang influxdb.FluxLanguageService) (err error) { - p := parser.ParseSource(c.Query.Text) + p, err := query.Parse(lang, c.Query.Text) + if p == nil { + return err + } hasOptionTask := false hasName := false @@ -168,8 +172,11 @@ func (c *Custom) hasRequiredTaskOptions() (err error) { return nil } -func (c *Custom) hasRequiredCheckParameters() (err error) { - p := parser.ParseSource(c.Query.Text) +func (c *Custom) hasRequiredCheckParameters(lang influxdb.FluxLanguageService) (err error) { + p, err := query.Parse(lang, c.Query.Text) + if p == nil { + return err + } hasCheckObject := false checkNameMatches := false @@ -213,18 +220,18 @@ func (c *Custom) hasRequiredCheckParameters() (err error) { } // Valid checks whether check flux is valid, returns error if invalid -func (c *Custom) Valid() error { +func (c *Custom) Valid(lang influxdb.FluxLanguageService) error { - if err := c.hasRequiredCheckParameters(); err != nil { + if err := c.hasRequiredCheckParameters(lang); err != nil { return err } - if err := c.hasRequiredTaskOptions(); err != nil { + if err := c.hasRequiredTaskOptions(lang); err != nil { return err } // add or replace _check_id parameter on the check object - script, err := c.sanitizeFlux() + script, err := c.sanitizeFlux(lang) if err != nil { return err } diff --git a/notification/check/custom_test.go b/notification/check/custom_test.go index d4926e455a..e0ea887747 100644 --- a/notification/check/custom_test.go +++ b/notification/check/custom_test.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification/check" + "github.com/influxdata/influxdb/query/fluxlang" ) func TestCheck_Valid(t *testing.T) { @@ -169,7 +170,7 @@ data for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := tt.args.custom.Valid() + err := tt.args.custom.Valid(fluxlang.DefaultService) if exp, got := tt.wants.err, err; exp != nil && got != nil { // expected error, got error check that they match diff --git a/notification/check/deadman.go b/notification/check/deadman.go index 94088b876e..55ea8be26f 100644 --- a/notification/check/deadman.go +++ b/notification/check/deadman.go @@ -6,10 +6,10 @@ import ( "strings" "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification" "github.com/influxdata/influxdb/notification/flux" + "github.com/influxdata/influxdb/query" ) var _ influxdb.Check = (*Deadman)(nil) @@ -31,8 +31,8 @@ func (c Deadman) Type() string { } // GenerateFlux returns a flux script for the Deadman provided. -func (c Deadman) GenerateFlux() (string, error) { - p, err := c.GenerateFluxAST() +func (c Deadman) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) { + p, err := c.GenerateFluxAST(lang) if err != nil { return "", err } @@ -43,8 +43,11 @@ func (c Deadman) GenerateFlux() (string, error) { // GenerateFluxAST returns a flux AST for the deadman provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (c Deadman) GenerateFluxAST() (*ast.Package, error) { - p := parser.ParseSource(c.Query.Text) +func (c Deadman) GenerateFluxAST(lang influxdb.FluxLanguageService) (*ast.Package, error) { + p, err := query.Parse(lang, c.Query.Text) + if p == nil { + return nil, err + } removeAggregateWindow(p) replaceDurationsWithEvery(p, c.StaleTime) removeStopFromRange(p) diff --git a/notification/check/deadman_test.go b/notification/check/deadman_test.go index b8a8769c38..b631ab91ed 100644 --- a/notification/check/deadman_test.go +++ b/notification/check/deadman_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification" "github.com/influxdata/influxdb/notification/check" + "github.com/influxdata/influxdb/query/fluxlang" ) func TestDeadman_GenerateFlux(t *testing.T) { @@ -147,7 +148,7 @@ data for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s, err := tt.args.deadman.GenerateFlux() + s, err := tt.args.deadman.GenerateFlux(fluxlang.DefaultService) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/notification/check/threshold.go b/notification/check/threshold.go index c770787de2..8ed4672148 100644 --- a/notification/check/threshold.go +++ b/notification/check/threshold.go @@ -6,10 +6,10 @@ import ( "strings" "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification" "github.com/influxdata/influxdb/notification/flux" + "github.com/influxdata/influxdb/query" ) var _ influxdb.Check = (*Threshold)(nil) @@ -26,8 +26,8 @@ func (t Threshold) Type() string { } // Valid returns error if something is invalid. -func (t Threshold) Valid() error { - if err := t.Base.Valid(); err != nil { +func (t Threshold) Valid(lang influxdb.FluxLanguageService) error { + if err := t.Base.Valid(lang); err != nil { return err } for _, cc := range t.Thresholds { @@ -104,8 +104,8 @@ func multiError(errs []error) error { // GenerateFlux returns a flux script for the threshold provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (t Threshold) GenerateFlux() (string, error) { - p, err := t.GenerateFluxAST() +func (t Threshold) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) { + p, err := t.GenerateFluxAST(lang) if err != nil { return "", err } @@ -116,8 +116,11 @@ func (t Threshold) GenerateFlux() (string, error) { // GenerateFluxAST returns a flux AST for the threshold provided. If there // are any errors in the flux that the user provided the function will return // an error for each error found when the script is parsed. -func (t Threshold) GenerateFluxAST() (*ast.Package, error) { - p := parser.ParseSource(t.Query.Text) +func (t Threshold) GenerateFluxAST(lang influxdb.FluxLanguageService) (*ast.Package, error) { + p, err := query.Parse(lang, t.Query.Text) + if p == nil { + return nil, err + } replaceDurationsWithEvery(p, t.Every) removeStopFromRange(p) addCreateEmptyFalseToAggregateWindow(p) diff --git a/notification/check/threshold_test.go b/notification/check/threshold_test.go index 7955cfac60..d1339e6565 100644 --- a/notification/check/threshold_test.go +++ b/notification/check/threshold_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification" "github.com/influxdata/influxdb/notification/check" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/stretchr/testify/assert" ) @@ -305,7 +306,7 @@ data t.Run(tt.name, func(t *testing.T) { // TODO(desa): change this to GenerateFlux() when we don't need to code // around the monitor package not being available. - p, err := tt.args.threshold.GenerateFluxAST() + p, err := tt.args.threshold.GenerateFluxAST(fluxlang.DefaultService) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/notification/duration.go b/notification/duration.go index 6e766fb654..9474f28100 100644 --- a/notification/duration.go +++ b/notification/duration.go @@ -2,11 +2,15 @@ package notification import ( "bytes" + "fmt" "strconv" "time" + "unicode" + "unicode/utf8" + "github.com/influxdata/flux" "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/parser" + "github.com/influxdata/flux/codes" ) // Duration is a custom type used for generating flux compatible durations. @@ -34,21 +38,93 @@ func (d Duration) MarshalJSON() ([]byte, error) { // UnmarshalJSON turns a flux duration literal into a Duration. func (d *Duration) UnmarshalJSON(b []byte) error { - dur, err := parser.ParseDuration(string(b[1 : len(b)-1])) + dur, err := parseDuration(string(b[1 : len(b)-1])) if err != nil { return err } - *d = *(*Duration)(dur) + *d = Duration{Values: dur} return nil } // FromTimeDuration converts a time.Duration to a notification.Duration type. func FromTimeDuration(d time.Duration) (Duration, error) { - dur, err := parser.ParseDuration(d.String()) + dur, err := parseDuration(d.String()) if err != nil { return Duration{}, err } - return Duration(*dur), nil + return Duration{Values: dur}, nil +} + +// TODO(jsternberg): This file copies over code from an internal package +// because we need them from an internal package and the only way they +// are exposed is through a package that depends on the core flux parser. +// We want to avoid a dependency on the core parser so we copy these +// implementations. +// +// In the future, we should consider exposing these functions from flux +// in a non-internal package outside of the parser package. + +// parseDuration will convert a string into components of the duration. +func parseDuration(lit string) ([]ast.Duration, error) { + var values []ast.Duration + for len(lit) > 0 { + n := 0 + for n < len(lit) { + ch, size := utf8.DecodeRuneInString(lit[n:]) + if size == 0 { + panic("invalid rune in duration") + } + + if !unicode.IsDigit(ch) { + break + } + n += size + } + + if n == 0 { + return nil, &flux.Error{ + Code: codes.Invalid, + Msg: fmt.Sprintf("invalid duration %s", lit), + } + } + + magnitude, err := strconv.ParseInt(lit[:n], 10, 64) + if err != nil { + return nil, err + } + lit = lit[n:] + + n = 0 + for n < len(lit) { + ch, size := utf8.DecodeRuneInString(lit[n:]) + if size == 0 { + panic("invalid rune in duration") + } + + if !unicode.IsLetter(ch) { + break + } + n += size + } + + if n == 0 { + return nil, &flux.Error{ + Code: codes.Invalid, + Msg: fmt.Sprintf("duration is missing a unit: %s", lit), + } + } + + unit := lit[:n] + if unit == "µs" { + unit = "us" + } + values = append(values, ast.Duration{ + Magnitude: magnitude, + Unit: unit, + }) + lit = lit[n:] + } + return values, nil } diff --git a/notification/rule/slack_test.go b/notification/rule/slack_test.go index 1c77481f9a..5f0d17d856 100644 --- a/notification/rule/slack_test.go +++ b/notification/rule/slack_test.go @@ -3,6 +3,7 @@ package rule_test import ( "testing" + "github.com/influxdata/flux/ast" "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification" @@ -15,7 +16,7 @@ func mustDuration(d string) *notification.Duration { if err != nil { panic(err) } - + dur.BaseNode = ast.BaseNode{} return (*notification.Duration)(dur) } diff --git a/query.go b/query.go index 9fc94dfac8..a90f19a4e2 100644 --- a/query.go +++ b/query.go @@ -1,6 +1,12 @@ package influxdb -import "github.com/influxdata/flux/ast" +import ( + "context" + + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/interpreter" + "github.com/influxdata/flux/values" +) // TODO(desa): These files are possibly a temporary. This is needed // as a part of the source work that is being done. @@ -19,4 +25,7 @@ type FluxLanguageService interface { // An ast.Package may be returned when a parsing error occurs, // but it may be null if parsing didn't even occur. Parse(source string) (*ast.Package, error) + + // EvalAST will evaluate and run an AST. + EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) } diff --git a/query/fluxlang/service.go b/query/fluxlang/service.go index 2e6747d4e8..956df1abfc 100644 --- a/query/fluxlang/service.go +++ b/query/fluxlang/service.go @@ -2,8 +2,13 @@ package fluxlang import ( + "context" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/parser" + "github.com/influxdata/flux/runtime" + "github.com/influxdata/flux/values" "github.com/influxdata/influxdb" ) @@ -19,3 +24,7 @@ func (d defaultService) Parse(source string) (pkg *ast.Package, err error) { } return pkg, err } + +func (d defaultService) EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) { + return runtime.EvalAST(ctx, astPkg) +} diff --git a/query/service.go b/query/service.go index 79e41268b9..ea240c212c 100644 --- a/query/service.go +++ b/query/service.go @@ -5,6 +5,10 @@ import ( "io" "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/interpreter" + "github.com/influxdata/flux/values" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/check" ) @@ -33,3 +37,32 @@ type ProxyQueryService interface { // The number of bytes written to w is returned __independent__ of any error. Query(ctx context.Context, w io.Writer, req *ProxyRequest) (flux.Statistics, error) } + +// Parse will take flux source code and produce a package. +// If there are errors when parsing, the first error is returned. +// An ast.Package may be returned when a parsing error occurs, +// but it may be null if parsing didn't even occur. +// +// This will return an error if the FluxLanguageService is nil. +func Parse(lang influxdb.FluxLanguageService, source string) (*ast.Package, error) { + if lang == nil { + return nil, &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "flux is not configured; cannot parse", + } + } + return lang.Parse(source) +} + +// EvalAST will evaluate and run an AST. +// +// This will return an error if the FluxLanguageService is nil. +func EvalAST(ctx context.Context, lang influxdb.FluxLanguageService, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) { + if lang == nil { + return nil, nil, &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "flux is not configured; cannot evaluate", + } + } + return lang.EvalAST(ctx, astPkg) +} diff --git a/task.go b/task.go index 8eac75ee85..a335cdb21a 100644 --- a/task.go +++ b/task.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/flux/ast" "github.com/influxdata/flux/ast/edit" - "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb/task/options" ) @@ -284,7 +283,13 @@ func (t TaskUpdate) Validate() error { // safeParseSource calls the Flux parser.ParseSource function // and is guaranteed not to panic. -func safeParseSource(f string) (pkg *ast.Package, err error) { +func safeParseSource(parser FluxLanguageService, f string) (pkg *ast.Package, err error) { + if parser == nil { + return nil, &Error{ + Code: EInternal, + Msg: "flux parser is not configured; updating a task requires the flux parser to be set", + } + } defer func() { if r := recover(); r != nil { err = &Error{ @@ -294,25 +299,21 @@ func safeParseSource(f string) (pkg *ast.Package, err error) { } }() - pkg = parser.ParseSource(f) - return pkg, err + return parser.Parse(f) } // UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it // It zeros the options in the TaskUpdate. -func (t *TaskUpdate) UpdateFlux(oldFlux string) (err error) { +func (t *TaskUpdate) UpdateFlux(parser FluxLanguageService, oldFlux string) (err error) { if t.Flux != nil && *t.Flux != "" { oldFlux = *t.Flux } toDelete := map[string]struct{}{} - parsedPKG, err := safeParseSource(oldFlux) + parsedPKG, err := safeParseSource(parser, oldFlux) if err != nil { return err } - if ast.Check(parsedPKG) > 0 { - return ast.GetError(parsedPKG) - } parsed := parsedPKG.Files[0] if !t.Options.Every.IsZero() && t.Options.Cron != "" { return errors.New("cannot specify both cron and every") diff --git a/task/backend/analytical_storage_test.go b/task/backend/analytical_storage_test.go index c6ac1f902c..46b099e42c 100644 --- a/task/backend/analytical_storage_test.go +++ b/task/backend/analytical_storage_test.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/influxdb/query" _ "github.com/influxdata/influxdb/query/builtin" "github.com/influxdata/influxdb/query/control" + "github.com/influxdata/influxdb/query/fluxlang" stdlib "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb" "github.com/influxdata/influxdb/storage" "github.com/influxdata/influxdb/storage/reads" @@ -31,7 +32,9 @@ func TestAnalyticalStore(t *testing.T) { t, func(t *testing.T) (*servicetest.System, context.CancelFunc) { ctx, cancelFunc := context.WithCancel(context.Background()) - svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore()) + svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore(), kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) if err := svc.Initialize(ctx); err != nil { t.Fatalf("error initializing urm service: %v", err) } diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 44213b7a9d..8884fbacd9 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -19,6 +19,7 @@ import ( tracetest "github.com/influxdata/influxdb/kit/tracing/testing" "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/backend/scheduler" "github.com/opentracing/opentracing-go" @@ -52,7 +53,9 @@ func taskExecutorSystem(t *testing.T) tes { qs = query.QueryServiceBridge{ AsyncQueryService: aqs, } - i = kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore()) + i = kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore(), kv.ServiceConfig{ + FluxLanguageService: fluxlang.DefaultService, + }) tcs = &taskControlService{TaskControlService: i} ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, i, i, tcs) ) diff --git a/task/backend/executor/limits.go b/task/backend/executor/limits.go index de20fea561..ffc3edde2c 100644 --- a/task/backend/executor/limits.go +++ b/task/backend/executor/limits.go @@ -10,9 +10,9 @@ import ( // ConcurrencyLimit creates a concurrency limit func that uses the executor to determine // if the task has exceeded the concurrency limit. -func ConcurrencyLimit(exec *Executor) LimitFunc { +func ConcurrencyLimit(exec *Executor, lang influxdb.FluxLanguageService) LimitFunc { return func(t *influxdb.Task, r *influxdb.Run) error { - o, err := options.FromScript(t.Flux) + o, err := options.FromScript(lang, t.Flux) if err != nil { return err } diff --git a/task/backend/executor/limits_test.go b/task/backend/executor/limits_test.go index dd8dd1228f..81b588b2a0 100644 --- a/task/backend/executor/limits_test.go +++ b/task/backend/executor/limits_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/query/fluxlang" ) var ( @@ -34,7 +35,7 @@ func TestTaskConcurrency(t *testing.T) { ScheduledFor: time.Now(), } - clFunc := ConcurrencyLimit(te) + clFunc := ConcurrencyLimit(te, fluxlang.DefaultService) if err := clFunc(taskWith1Concurrency, r1); err != nil { t.Fatal(err) } diff --git a/task/options/options.go b/task/options/options.go index 22463d5acf..edbd21c547 100644 --- a/task/options/options.go +++ b/task/options/options.go @@ -3,6 +3,7 @@ package options import ( "context" + "errors" "fmt" "strings" "time" @@ -10,8 +11,7 @@ import ( "github.com/influxdata/cron" "github.com/influxdata/flux" "github.com/influxdata/flux/ast" - "github.com/influxdata/flux/parser" - "github.com/influxdata/flux/runtime" + "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/semantic" "github.com/influxdata/flux/values" "github.com/influxdata/influxdb/pkg/pointer" @@ -69,17 +69,6 @@ func MustParseDuration(s string) (dur *Duration) { return dur } -// parseSignedDuration is a helper wrapper around parser.ParseSignedDuration. -// We use it because we need to clear the basenode, but flux does not. -func parseSignedDuration(text string) (*ast.DurationLiteral, error) { - q, err := parser.ParseSignedDuration(text) - if err != nil { - return nil, err - } - q.BaseNode = ast.BaseNode{} - return q, err -} - // UnmarshalText unmarshals text into a Duration. func (a *Duration) UnmarshalText(text []byte) error { q, err := parseSignedDuration(string(text)) @@ -203,18 +192,30 @@ func newDeps() flux.Dependencies { return deps } +// FluxLanguageService is a service for interacting with flux code. +type FluxLanguageService interface { + // Parse will take flux source code and produce a package. + // If there are errors when parsing, the first error is returned. + // An ast.Package may be returned when a parsing error occurs, + // but it may be null if parsing didn't even occur. + Parse(source string) (*ast.Package, error) + + // EvalAST will evaluate and run an AST. + EvalAST(ctx context.Context, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) +} + // FromScript extracts Options from a Flux script. -func FromScript(script string) (Options, error) { +func FromScript(lang FluxLanguageService, script string) (Options, error) { opt := Options{Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)} - fluxAST, err := runtime.Parse(script) + fluxAST, err := parse(lang, script) if err != nil { return opt, err } durTypes := grabTaskOptionAST(fluxAST, optEvery, optOffset) // TODO(desa): should be dependencies.NewEmpty(), but for now we'll hack things together ctx := newDeps().Inject(context.Background()) - _, scope, err := runtime.EvalAST(ctx, fluxAST) + _, scope, err := evalAST(ctx, lang, fluxAST) if err != nil { return opt, err } @@ -430,3 +431,26 @@ func validateOptionNames(o values.Object) error { return nil } + +// parse will take flux source code and produce a package. +// If there are errors when parsing, the first error is returned. +// An ast.Package may be returned when a parsing error occurs, +// but it may be null if parsing didn't even occur. +// +// This will return an error if the FluxLanguageService is nil. +func parse(lang FluxLanguageService, source string) (*ast.Package, error) { + if lang == nil { + return nil, errors.New("flux is not configured; cannot parse") + } + return lang.Parse(source) +} + +// EvalAST will evaluate and run an AST. +// +// This will return an error if the FluxLanguageService is nil. +func evalAST(ctx context.Context, lang FluxLanguageService, astPkg *ast.Package) ([]interpreter.SideEffect, values.Scope, error) { + if lang == nil { + return nil, nil, errors.New("flux is not configured; cannot evaluate") + } + return lang.EvalAST(ctx, astPkg) +} diff --git a/task/options/options_test.go b/task/options/options_test.go index a791451104..5e775f75a3 100644 --- a/task/options/options_test.go +++ b/task/options/options_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/pkg/pointer" _ "github.com/influxdata/influxdb/query/builtin" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/task/options" ) @@ -103,7 +104,7 @@ func TestFromScript(t *testing.T) { exp: options.Options{Name: "test_task_smoke_name", Every: *(options.MustParseDuration("30s")), Retry: pointer.Int64(1), Concurrency: pointer.Int64(1)}, shouldErr: false}, // TODO(docmerlin): remove this once tasks fully supports all flux duration units. } { - o, err := options.FromScript(c.script) + o, err := options.FromScript(fluxlang.DefaultService, c.script) if c.shouldErr && err == nil { t.Fatalf("script %q should have errored but didn't", c.script) } else if !c.shouldErr && err != nil { @@ -121,7 +122,7 @@ func TestFromScript(t *testing.T) { func BenchmarkFromScriptFunc(b *testing.B) { for n := 0; n < b.N; n++ { - _, err := options.FromScript(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`) + _, err := options.FromScript(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`) if err != nil { fmt.Printf("error: %v", err) } @@ -133,11 +134,11 @@ func TestFromScriptWithUnknownOptions(t *testing.T) { const bodySuffix = `} from(bucket:"b") |> range(start:-1m)` // Script without unknown option should be good. - if _, err := options.FromScript(optPrefix + bodySuffix); err != nil { + if _, err := options.FromScript(fluxlang.DefaultService, optPrefix+bodySuffix); err != nil { t.Fatal(err) } - _, err := options.FromScript(optPrefix + `, Offset: 2s, foo: "bar"` + bodySuffix) + _, err := options.FromScript(fluxlang.DefaultService, optPrefix+`, Offset: 2s, foo: "bar"`+bodySuffix) if err == nil { t.Fatal("expected error from unknown option but got nil") } diff --git a/task/options/strconv.go b/task/options/strconv.go new file mode 100644 index 0000000000..c76f975373 --- /dev/null +++ b/task/options/strconv.go @@ -0,0 +1,109 @@ +package options + +import ( + "fmt" + "strconv" + "unicode" + "unicode/utf8" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/codes" +) + +// TODO(jsternberg): This file copies over code from an internal package +// because we need them from an internal package and the only way they +// are exposed is through a package that depends on the core flux parser. +// We want to avoid a dependency on the core parser so we copy these +// implementations. +// +// In the future, we should consider exposing these functions from flux +// in a non-internal package outside of the parser package. + +// parseSignedDuration is a helper wrapper around parser.ParseSignedDuration. +// We use it because we need to clear the basenode, but flux does not. +func parseSignedDuration(text string) (*ast.DurationLiteral, error) { + // TODO(jsternberg): This is copied from an internal package in flux to break a dependency + // on the parser package where this method is exposed. + // Consider exposing this properly in flux. + r, s := utf8.DecodeRuneInString(text) + if r == '-' { + d, err := parseDuration(text[s:]) + if err != nil { + return nil, err + } + for i := range d { + d[i].Magnitude = -d[i].Magnitude + } + return &ast.DurationLiteral{Values: d}, nil + } + + d, err := parseDuration(text) + if err != nil { + return nil, err + } + return &ast.DurationLiteral{Values: d}, nil +} + +// parseDuration will convert a string into components of the duration. +func parseDuration(lit string) ([]ast.Duration, error) { + var values []ast.Duration + for len(lit) > 0 { + n := 0 + for n < len(lit) { + ch, size := utf8.DecodeRuneInString(lit[n:]) + if size == 0 { + panic("invalid rune in duration") + } + + if !unicode.IsDigit(ch) { + break + } + n += size + } + + if n == 0 { + return nil, &flux.Error{ + Code: codes.Invalid, + Msg: fmt.Sprintf("invalid duration %s", lit), + } + } + + magnitude, err := strconv.ParseInt(lit[:n], 10, 64) + if err != nil { + return nil, err + } + lit = lit[n:] + + n = 0 + for n < len(lit) { + ch, size := utf8.DecodeRuneInString(lit[n:]) + if size == 0 { + panic("invalid rune in duration") + } + + if !unicode.IsLetter(ch) { + break + } + n += size + } + + if n == 0 { + return nil, &flux.Error{ + Code: codes.Invalid, + Msg: fmt.Sprintf("duration is missing a unit: %s", lit), + } + } + + unit := lit[:n] + if unit == "µs" { + unit = "us" + } + values = append(values, ast.Duration{ + Magnitude: magnitude, + Unit: unit, + }) + lit = lit[n:] + } + return values, nil +} diff --git a/task_test.go b/task_test.go index 1723e9f7dc..be369fcc18 100644 --- a/task_test.go +++ b/task_test.go @@ -7,6 +7,7 @@ import ( "github.com/google/go-cmp/cmp" platform "github.com/influxdata/influxdb" _ "github.com/influxdata/influxdb/query/builtin" + "github.com/influxdata/influxdb/query/fluxlang" "github.com/influxdata/influxdb/task/options" ) @@ -37,7 +38,7 @@ func TestOptionsMarshal(t *testing.T) { func TestOptionsEdit(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Every = *(options.MustParseDuration("10s")) - if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } t.Run("zeroing", func(t *testing.T) { @@ -55,7 +56,7 @@ from(bucket: "x") } }) t.Run("replacement", func(t *testing.T) { - op, err := options.FromScript(*tu.Flux) + op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) if err != nil { t.Error(err) } @@ -67,10 +68,10 @@ from(bucket: "x") t.Run("add new option", func(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Offset = options.MustParseDuration("30s") - if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } - op, err := options.FromScript(*tu.Flux) + op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) if err != nil { t.Error(err) } @@ -81,10 +82,10 @@ from(bucket: "x") t.Run("switching from every to cron", func(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Cron = "* * * * *" - if err := tu.UpdateFlux(`option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {every: 20s, name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } - op, err := options.FromScript(*tu.Flux) + op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) if err != nil { t.Error(err) } @@ -98,10 +99,10 @@ from(bucket: "x") t.Run("switching from cron to every", func(t *testing.T) { tu := &platform.TaskUpdate{} tu.Options.Every = *(options.MustParseDuration("10s")) - if err := tu.UpdateFlux(`option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo"} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } - op, err := options.FromScript(*tu.Flux) + op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) if err != nil { t.Error(err) } @@ -119,10 +120,10 @@ from(bucket: "x") from(bucket: "x") |> range(start: -1h)` - if err := tu.UpdateFlux(`option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil { + if err := tu.UpdateFlux(fluxlang.DefaultService, `option task = {cron: "* * * * *", name: "foo", offset: 10s} from(bucket:"x") |> range(start:-1h)`); err != nil { t.Fatal(err) } - op, err := options.FromScript(*tu.Flux) + op, err := options.FromScript(fluxlang.DefaultService, *tu.Flux) if err != nil { t.Error(err) } diff --git a/testing/checks.go b/testing/checks.go index 8ce0fc4306..ad4e59d39e 100644 --- a/testing/checks.go +++ b/testing/checks.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/flux/ast" "github.com/influxdata/flux/parser" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/mock" @@ -22,6 +23,7 @@ func mustDuration(d string) *notification.Duration { if err != nil { panic(err) } + dur.BaseNode = ast.BaseNode{} return (*notification.Duration)(dur) }