From 1ae2541bf34ab8a4a9f0fcfeeaa2d93dad6646e2 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Tue, 25 Aug 2020 13:06:26 -0400 Subject: [PATCH] feat(task): Inject Task's LatestSuccess Timestamp In Flux Extern (#19402) * feat(task): Inject latest success/failure into extern. * chore(task/backend): Don't specify an extern if there are no statements. * chore(task/executor): Don't apply the latest failure for now. * chore(changelog): Add 19402 to changelog. * chore(kit): Introduce feature flag for time injection. * chore(task/executor): Guard injection into extern by feature flag. * chore(task/executor): No need for this subtest pattern. * chore(task/executor): Add tests for extern injection. --- CHANGELOG.md | 1 + flags.yml | 6 + kit/feature/list.go | 16 +++ task/backend/executor/executor.go | 80 ++++++++++-- task/backend/executor/executor_test.go | 163 +++++++++++++++++++------ task/backend/executor/support_test.go | 31 +++-- 6 files changed, 241 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b87d432c03..8432013353 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ 1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command 1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset. 1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task +1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern ### Bug Fixes diff --git a/flags.yml b/flags.yml index 859b6991f7..5c32ba0873 100644 --- a/flags.yml +++ b/flags.yml @@ -138,3 +138,9 @@ key: enforceOrgDashboardLimits default: false contact: Compute Team + +- name: Inject Latest Success Time + description: Inject the latest successful task run timestamp into a Task query extern when executing. 
+ key: injectLatestSuccessTime + default: false + contact: Compute Team diff --git a/kit/feature/list.go b/kit/feature/list.go index 5ed755c25a..c85cf7d7c8 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -254,6 +254,20 @@ func EnforceOrganizationDashboardLimits() BoolFlag { return enforceOrgDashboardLimits } +var injectLatestSuccessTime = MakeBoolFlag( + "Inject Latest Success Time", + "injectLatestSuccessTime", + "Compute Team", + false, + Temporary, + false, +) + +// InjectLatestSuccessTime - Inject the latest successful task run timestamp into a Task query extern when executing. +func InjectLatestSuccessTime() BoolFlag { + return injectLatestSuccessTime +} + var all = []Flag{ appMetrics, backendExample, @@ -273,6 +287,7 @@ var all = []Flag{ pushDownGroupAggregateMinMax, orgOnlyMemberList, enforceOrgDashboardLimits, + injectLatestSuccessTime, } var byKey = map[string]Flag{ @@ -294,4 +309,5 @@ var byKey = map[string]Flag{ "pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax, "orgOnlyMemberList": orgOnlyMemberList, "enforceOrgDashboardLimits": enforceOrgDashboardLimits, + "injectLatestSuccessTime": injectLatestSuccessTime, } diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 952d5f3ced..941392773c 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -2,11 +2,13 @@ package executor import ( "context" + "encoding/json" "fmt" "sync" "time" "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/runtime" "github.com/influxdata/influxdb/v2" @@ -22,6 +24,9 @@ import ( const ( maxPromises = 1000 defaultMaxWorkers = 100 + + latestSuccessOption = "tasks.latestSuccessTime" + latestFailureOption = "tasks.latestFailureTime" ) var _ scheduler.Executor = (*Executor)(nil) @@ -70,7 +75,31 @@ func WithMaxWorkers(n int) executorOption { // CompilerBuilderFunc is a function that yields a new flux.Compiler. 
The // context.Context provided can be assumed to be an authorized context. -type CompilerBuilderFunc func(ctx context.Context, query string, now time.Time) (flux.Compiler, error) +type CompilerBuilderFunc func(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) + +// CompilerBuilderTimestamps contains timestamps which should be provided along +// with a Task query. +type CompilerBuilderTimestamps struct { + Now time.Time + LatestSuccess time.Time +} + +func (ts CompilerBuilderTimestamps) Extern() *ast.File { + var body []ast.Statement + + if !ts.LatestSuccess.IsZero() { + body = append(body, &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: latestSuccessOption}, + Init: &ast.DateTimeLiteral{ + Value: ts.LatestSuccess, + }, + }, + }) + } + + return &ast.File{Body: body} +} // WithSystemCompilerBuilder is an Executor option that configures a // CompilerBuilderFunc to be used when compiling queries for System Tasks. @@ -416,8 +445,6 @@ func (w *worker) start(p *promise) { } func (w *worker) finish(p *promise, rs influxdb.RunStatus, err error) { - - // trace span, ctx := tracing.StartSpanFromContext(p.ctx) defer span.Finish() @@ -471,7 +498,10 @@ func (w *worker) executeQuery(p *promise) { if p.task.Type != influxdb.TaskSystemType { buildCompiler = w.nonSystemBuildCompiler } - compiler, err := buildCompiler(ctx, p.task.Flux, p.run.ScheduledFor) + compiler, err := buildCompiler(ctx, p.task.Flux, CompilerBuilderTimestamps{ + Now: p.run.ScheduledFor, + LatestSuccess: p.task.LatestSuccess, + }) if err != nil { w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err)) return @@ -592,21 +622,45 @@ func exhaustResultIterators(res flux.Result) error { } // NewASTCompiler parses a Flux query string into an AST representatation. 
-func NewASTCompiler(_ context.Context, query string, now time.Time) (flux.Compiler, error) { +func NewASTCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) { pkg, err := runtime.ParseToJSON(query) if err != nil { return nil, err } + var externBytes []byte + if feature.InjectLatestSuccessTime().Enabled(ctx) { + extern := ts.Extern() + if len(extern.Body) > 0 { + var err error + externBytes, err = json.Marshal(extern) + if err != nil { + return nil, err + } + } + } return lang.ASTCompiler{ - AST: pkg, - Now: now, + AST: pkg, + Now: ts.Now, + Extern: externBytes, }, nil } // NewFluxCompiler wraps a Flux query string in a raw-query representation. -func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compiler, error) { +func NewFluxCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) { + var externBytes []byte + if feature.InjectLatestSuccessTime().Enabled(ctx) { + extern := ts.Extern() + if len(extern.Body) > 0 { + var err error + externBytes, err = json.Marshal(extern) + if err != nil { + return nil, err + } + } + } return lang.FluxCompiler{ - Query: query, + Query: query, + Extern: externBytes, // TODO(brett): This mitigates an immediate problem where // Checks/Notifications breaks when sending Now, and system Tasks do not // break when sending Now. We are currently sending C+N through using @@ -617,7 +671,13 @@ func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compile // we are able to locate the root cause and use Flux Compiler for all // Task types. // - // This should be removed once we diagnose the problem. + // It turns out this is due to the exclusive nature of the stop time in + // Flux "from" and that we weren't including the left-hand boundary of + // the range check for notifications. 
We're shipping a fix soon in + // + // https://github.com/influxdata/influxdb/pull/19392 + // + // Once this has merged, we can send Now again. // // Now: now, }, nil diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 4a1b6b97fc..4b74d84187 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -12,14 +12,17 @@ import ( "github.com/golang/mock/gomock" "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/prom" "github.com/influxdata/influxdb/v2/kit/prom/promtest" tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/kv/migration/all" + influxdbmock "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/task/backend" @@ -85,19 +88,7 @@ func taskExecutorSystem(t *testing.T) tes { } } -func TestTaskExecutor(t *testing.T) { - t.Run("QuerySuccess", testQuerySuccess) - t.Run("QueryFailure", testQueryFailure) - t.Run("ManualRun", testManualRun) - t.Run("ResumeRun", testResumingRun) - t.Run("WorkerLimit", testWorkerLimit) - t.Run("LimitFunc", testLimitFunc) - t.Run("Metrics", testMetrics) - t.Run("IteratorFailure", testIteratorFailure) - t.Run("ErrorHandling", testErrorHandling) -} - -func testQuerySuccess(t *testing.T) { +func TestTaskExecutor_QuerySuccess(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -133,8 +124,8 @@ func testQuerySuccess(t *testing.T) { t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt) } - tes.svc.WaitForQueryLive(t, script) - tes.svc.SucceedQuery(script) + tes.svc.WaitForQueryLive(t, script, nil) + 
tes.svc.SucceedQuery(script, nil) <-promise.Done() @@ -165,7 +156,107 @@ func testQuerySuccess(t *testing.T) { } } -func testQueryFailure(t *testing.T) { +func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) { + t.Parallel() + + tes := taskExecutorSystem(t) + + var ( + script = fmt.Sprintf(fmtTestScript, t.Name()) + ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth) + span = opentracing.GlobalTracer().StartSpan("test-span") + ) + ctx = opentracing.ContextWithSpan(ctx, span) + + task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{ + OrganizationID: tes.tc.OrgID, + OwnerID: tes.tc.Auth.GetUserID(), + Flux: script, + }) + if err != nil { + t.Fatal(err) + } + + // Simulate previous run to establish a timestamp + latestSuccess := time.Now().UTC() + task, err = tes.i.UpdateTask(ctx, task.ID, influxdb.TaskUpdate{ + LatestSuccess: &latestSuccess, + }) + if err != nil { + t.Fatal(err) + } + + extern := &ast.File{ + Body: []ast.Statement{&ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: latestSuccessOption}, + Init: &ast.DateTimeLiteral{ + Value: latestSuccess, + }, + }, + }, + }, + } + + ctx, err = feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{ + feature.InjectLatestSuccessTime(): true, + })) + if err != nil { + t.Fatal(err) + } + + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) + if err != nil { + t.Fatal(err) + } + promiseID := influxdb.ID(promise.ID()) + + run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID) + if err != nil { + t.Fatal(err) + } + + if run.ID != promiseID { + t.Fatal("promise and run dont match") + } + + if run.RunAt != time.Unix(126, 0).UTC() { + t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt) + } + + tes.svc.WaitForQueryLive(t, script, extern) + tes.svc.SucceedQuery(script, extern) + + <-promise.Done() + + if got := promise.Error(); got != nil { + t.Fatal(got) + 
} + + // confirm run is removed from in-mem store + run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID) + if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") { + t.Fatal("run was returned when it should have been removed from kv") + } + + // ensure the run returned by TaskControlService.FinishRun(...) + // has run logs formatted as expected + if run = tes.tcs.run; run == nil { + t.Fatal("expected run returned by FinishRun to not be nil") + } + + if len(run.Log) < 3 { + t.Fatalf("expected 3 run logs, found %d", len(run.Log)) + } + + sctx := span.Context().(jaeger.SpanContext) + expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID()) + if expectedMessage != run.Log[1].Message { + t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message) + } +} + +func TestTaskExecutor_QueryFailure(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -191,8 +282,8 @@ func testQueryFailure(t *testing.T) { t.Fatal("promise and run dont match") } - tes.svc.WaitForQueryLive(t, script) - tes.svc.FailQuery(script, errors.New("blargyblargblarg")) + tes.svc.WaitForQueryLive(t, script, nil) + tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg")) <-promise.Done() @@ -201,7 +292,7 @@ func testQueryFailure(t *testing.T) { } } -func testManualRun(t *testing.T) { +func TestManualRun(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -240,15 +331,15 @@ func testManualRun(t *testing.T) { t.Fatal("promise and run and manual run dont match") } - tes.svc.WaitForQueryLive(t, script) - tes.svc.SucceedQuery(script) + tes.svc.WaitForQueryLive(t, script, nil) + tes.svc.SucceedQuery(script, nil) if got := promise.Error(); got != nil { t.Fatal(got) } } -func testResumingRun(t *testing.T) { +func TestTaskExecutor_ResumingRun(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -283,15 +374,15 @@ func testResumingRun(t *testing.T) { t.Fatal("promise and run and manual run dont match") } - 
tes.svc.WaitForQueryLive(t, script) - tes.svc.SucceedQuery(script) + tes.svc.WaitForQueryLive(t, script, nil) + tes.svc.SucceedQuery(script, nil) if got := promise.Error(); got != nil { t.Fatal(got) } } -func testWorkerLimit(t *testing.T) { +func TestTaskExecutor_WorkerLimit(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -311,8 +402,8 @@ func testWorkerLimit(t *testing.T) { t.Fatal("expected a worker to be started") } - tes.svc.WaitForQueryLive(t, script) - tes.svc.FailQuery(script, errors.New("blargyblargblarg")) + tes.svc.WaitForQueryLive(t, script, nil) + tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg")) <-promise.Done() @@ -321,7 +412,7 @@ func testWorkerLimit(t *testing.T) { } } -func testLimitFunc(t *testing.T) { +func TestTaskExecutor_LimitFunc(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -360,7 +451,7 @@ func testLimitFunc(t *testing.T) { } } -func testMetrics(t *testing.T) { +func TestTaskExecutor_Metrics(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) metrics := tes.metrics @@ -395,7 +486,7 @@ func testMetrics(t *testing.T) { t.Fatal("promise and run dont match") } - tes.svc.WaitForQueryLive(t, script) + tes.svc.WaitForQueryLive(t, script, nil) mg = promtest.MustGather(t, reg) m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil) @@ -403,7 +494,7 @@ func testMetrics(t *testing.T) { t.Fatalf("expected 1 total runs active, got %v", got) } - tes.svc.SucceedQuery(script) + tes.svc.SucceedQuery(script, nil) <-promise.Done() mg = promtest.MustGather(t, reg) @@ -457,7 +548,7 @@ func testMetrics(t *testing.T) { } -func testIteratorFailure(t *testing.T) { +func TestTaskExecutor_IteratorFailure(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -495,8 +586,8 @@ func testIteratorFailure(t *testing.T) { t.Fatal("promise and run dont match") } - tes.svc.WaitForQueryLive(t, script) - tes.svc.SucceedQuery(script) + tes.svc.WaitForQueryLive(t, script, nil) + tes.svc.SucceedQuery(script, 
nil) <-promise.Done() @@ -505,7 +596,7 @@ func testIteratorFailure(t *testing.T) { } } -func testErrorHandling(t *testing.T) { +func TestTaskExecutor_ErrorHandling(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -551,7 +642,7 @@ func testErrorHandling(t *testing.T) { */ } -func TestPromiseFailure(t *testing.T) { +func TestTaskExecutor_PromiseFailure(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) diff --git a/task/backend/executor/support_test.go b/task/backend/executor/support_test.go index 8a552958d0..bb565d6833 100644 --- a/task/backend/executor/support_test.go +++ b/task/backend/executor/support_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" @@ -31,14 +32,24 @@ type fakeQueryService struct { var _ query.AsyncQueryService = (*fakeQueryService)(nil) -func makeAST(q string) lang.ASTCompiler { +func makeAST(q string, extern *ast.File) lang.ASTCompiler { pkg, err := runtime.ParseToJSON(q) if err != nil { panic(err) } + + var externBytes []byte + if extern != nil && len(extern.Body) > 0 { + var err error + externBytes, err = json.Marshal(extern) + if err != nil { + panic(err) + } + } return lang.ASTCompiler{ - AST: pkg, - Now: time.Unix(123, 0), + AST: pkg, + Now: time.Unix(123, 0), + Extern: externBytes, } } @@ -85,12 +96,12 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux. } // SucceedQuery allows the running query matching the given script to return on its Ready channel. -func (s *fakeQueryService) SucceedQuery(script string) { +func (s *fakeQueryService) SucceedQuery(script string, extern *ast.File) { s.mu.Lock() defer s.mu.Unlock() // Unblock the flux. 
- ast := makeAST(script) + ast := makeAST(script, extern) spec := makeASTString(ast) fq, ok := s.queries[spec] if !ok { @@ -103,12 +114,12 @@ func (s *fakeQueryService) SucceedQuery(script string) { } // FailQuery closes the running query's Ready channel and sets its error to the given value. -func (s *fakeQueryService) FailQuery(script string, forced error) { +func (s *fakeQueryService) FailQuery(script string, extern *ast.File, forced error) { s.mu.Lock() defer s.mu.Unlock() // Unblock the flux. - ast := makeAST(script) + ast := makeAST(script, extern) spec := makeASTString(ast) fq, ok := s.queries[spec] if !ok { @@ -129,12 +140,12 @@ func (s *fakeQueryService) FailNextQuery(forced error) { // WaitForQueryLive ensures that the query has made it into the service. // This is particularly useful for the synchronous executor, // because the execution starts on a separate goroutine. -func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) { +func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string, extern *ast.File) { t.Helper() const attempts = 10 - ast := makeAST(script) - astUTC := makeAST(script) + ast := makeAST(script, extern) + astUTC := makeAST(script, extern) astUTC.Now = ast.Now.UTC() spec := makeASTString(ast) specUTC := makeASTString(astUTC)