From 8104677639bcfaa809633cf4cb2e6ca09c68e2f9 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 26 Aug 2020 09:44:21 -0700 Subject: [PATCH] revert(tasks): Revert incompatible commits Revert 1ae2541 Revert fde2129 --- CHANGELOG.md | 1 - flags.yml | 6 - kit/feature/list.go | 16 --- task/backend/executor/executor_test.go | 163 ++++++------------------- 4 files changed, 36 insertions(+), 150 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8432013353..b87d432c03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,6 @@ 1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command 1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset. 1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task -1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern ### Bug Fixes diff --git a/flags.yml b/flags.yml index 5c32ba0873..859b6991f7 100644 --- a/flags.yml +++ b/flags.yml @@ -138,9 +138,3 @@ key: enforceOrgDashboardLimits default: false contact: Compute Team - -- name: Inject Latest Success Time - description: Inject the latest successful task run timestamp into a Task query extern when executing. - key: injectLatestSuccessTime - default: false - contact: Compute Team diff --git a/kit/feature/list.go b/kit/feature/list.go index c85cf7d7c8..5ed755c25a 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -254,20 +254,6 @@ func EnforceOrganizationDashboardLimits() BoolFlag { return enforceOrgDashboardLimits } -var injectLatestSuccessTime = MakeBoolFlag( - "Inject Latest Success Time", - "injectLatestSuccessTime", - "Compute Team", - false, - Temporary, - false, -) - -// InjectLatestSuccessTime - Inject the latest successful task run timestamp into a Task query extern when executing. -func InjectLatestSuccessTime() BoolFlag { - return injectLatestSuccessTime -} - var all = []Flag{ appMetrics, backendExample, @@ -287,7 +273,6 @@ var all = []Flag{ pushDownGroupAggregateMinMax, orgOnlyMemberList, enforceOrgDashboardLimits, - injectLatestSuccessTime, } var byKey = map[string]Flag{ @@ -309,5 +294,4 @@ var byKey = map[string]Flag{ "pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax, "orgOnlyMemberList": orgOnlyMemberList, "enforceOrgDashboardLimits": enforceOrgDashboardLimits, - "injectLatestSuccessTime": injectLatestSuccessTime, } diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 9e719ef411..4a1b6b97fc 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -12,17 +12,14 @@ import ( "github.com/golang/mock/gomock" "github.com/influxdata/flux" - "github.com/influxdata/flux/ast" "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/inmem" - "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/kit/prom" "github.com/influxdata/influxdb/v2/kit/prom/promtest" tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/kv/migration/all" - influxdbmock "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/task/backend" @@ -88,7 +85,19 @@ func taskExecutorSystem(t *testing.T) tes { } } -func TestTaskExecutor_QuerySuccess(t *testing.T) { +func TestTaskExecutor(t *testing.T) { + t.Run("QuerySuccess", testQuerySuccess) + t.Run("QueryFailure", testQueryFailure) + t.Run("ManualRun", testManualRun) + t.Run("ResumeRun", testResumingRun) + t.Run("WorkerLimit", testWorkerLimit) + t.Run("LimitFunc", testLimitFunc) + t.Run("Metrics", testMetrics) + t.Run("IteratorFailure", testIteratorFailure) + t.Run("ErrorHandling", testErrorHandling) +} + +func testQuerySuccess(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -124,8 +133,8 @@ func TestTaskExecutor_QuerySuccess(t *testing.T) { t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt) } - tes.svc.WaitForQueryLive(t, script, nil) - tes.svc.SucceedQuery(script, nil) + tes.svc.WaitForQueryLive(t, script) + tes.svc.SucceedQuery(script) <-promise.Done() @@ -156,107 +165,7 @@ func TestTaskExecutor_QuerySuccess(t *testing.T) { } } -func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) { - t.Parallel() - - tes := taskExecutorSystem(t) - - var ( - script = fmt.Sprintf(fmtTestScript, t.Name()) - ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - span = opentracing.GlobalTracer().StartSpan("test-span") - ) - ctx = opentracing.ContextWithSpan(ctx, span) - - task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{ - OrganizationID: tes.tc.OrgID, - OwnerID: tes.tc.Auth.GetUserID(), - Flux: script, - }) - if err != nil { - t.Fatal(err) - } - - // Simulate previous run to establish a timestamp - latestSuccess := time.Now().UTC() - task, err = tes.i.UpdateTask(ctx, task.ID, influxdb.TaskUpdate{ - LatestSuccess: &latestSuccess, - }) - if err != nil { - t.Fatal(err) - } - - extern := &ast.File{ - Body: []ast.Statement{&ast.OptionStatement{ - Assignment: &ast.VariableAssignment{ - ID: &ast.Identifier{Name: "tasks.lastSuccessTime"}, - Init: &ast.DateTimeLiteral{ - Value: latestSuccess, - }, - }, - }, - }, - } - - ctx, err = feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{ - feature.InjectLatestSuccessTime(): true, - })) - if err != nil { - t.Fatal(err) - } - - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) - if err != nil { - t.Fatal(err) - } - promiseID := influxdb.ID(promise.ID()) - - run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID) - if err != nil { - t.Fatal(err) - } - - if run.ID != promiseID { - t.Fatal("promise and run dont match") - } - - if run.RunAt != time.Unix(126, 0).UTC() { - t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt) - } - - tes.svc.WaitForQueryLive(t, script, extern) - tes.svc.SucceedQuery(script, extern) - - <-promise.Done() - - if got := promise.Error(); got != nil { - t.Fatal(got) - } - - // confirm run is removed from in-mem store - run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID) - if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") { - t.Fatal("run was returned when it should have been removed from kv") - } - - // ensure the run returned by TaskControlService.FinishRun(...) - // has run logs formatted as expected - if run = tes.tcs.run; run == nil { - t.Fatal("expected run returned by FinishRun to not be nil") - } - - if len(run.Log) < 3 { - t.Fatalf("expected 3 run logs, found %d", len(run.Log)) - } - - sctx := span.Context().(jaeger.SpanContext) - expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID()) - if expectedMessage != run.Log[1].Message { - t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message) - } -} - -func TestTaskExecutor_QueryFailure(t *testing.T) { +func testQueryFailure(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -282,8 +191,8 @@ func TestTaskExecutor_QueryFailure(t *testing.T) { t.Fatal("promise and run dont match") } - tes.svc.WaitForQueryLive(t, script, nil) - tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg")) + tes.svc.WaitForQueryLive(t, script) + tes.svc.FailQuery(script, errors.New("blargyblargblarg")) <-promise.Done() @@ -292,7 +201,7 @@ func TestTaskExecutor_QueryFailure(t *testing.T) { } } -func TestManualRun(t *testing.T) { +func testManualRun(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -331,15 +240,15 @@ func TestManualRun(t *testing.T) { t.Fatal("promise and run and manual run dont match") } - tes.svc.WaitForQueryLive(t, script, nil) - tes.svc.SucceedQuery(script, nil) + tes.svc.WaitForQueryLive(t, script) + tes.svc.SucceedQuery(script) if got := promise.Error(); got != nil { t.Fatal(got) } } -func TestTaskExecutor_ResumingRun(t *testing.T) { +func testResumingRun(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -374,15 +283,15 @@ func TestTaskExecutor_ResumingRun(t *testing.T) { t.Fatal("promise and run and manual run dont match") } - tes.svc.WaitForQueryLive(t, script, nil) - tes.svc.SucceedQuery(script, nil) + tes.svc.WaitForQueryLive(t, script) + tes.svc.SucceedQuery(script) if got := promise.Error(); got != nil { t.Fatal(got) } } -func TestTaskExecutor_WorkerLimit(t *testing.T) { +func testWorkerLimit(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -402,8 +311,8 @@ func TestTaskExecutor_WorkerLimit(t *testing.T) { t.Fatal("expected a worker to be started") } - tes.svc.WaitForQueryLive(t, script, nil) - tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg")) + tes.svc.WaitForQueryLive(t, script) + tes.svc.FailQuery(script, errors.New("blargyblargblarg")) <-promise.Done() @@ -412,7 +321,7 @@ func TestTaskExecutor_WorkerLimit(t *testing.T) { } } -func TestTaskExecutor_LimitFunc(t *testing.T) { +func testLimitFunc(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -451,7 +360,7 @@ func TestTaskExecutor_LimitFunc(t *testing.T) { } } -func TestTaskExecutor_Metrics(t *testing.T) { +func testMetrics(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) metrics := tes.metrics @@ -486,7 +395,7 @@ func TestTaskExecutor_Metrics(t *testing.T) { t.Fatal("promise and run dont match") } - tes.svc.WaitForQueryLive(t, script, nil) + tes.svc.WaitForQueryLive(t, script) mg = promtest.MustGather(t, reg) m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil) @@ -494,7 +403,7 @@ func TestTaskExecutor_Metrics(t *testing.T) { t.Fatalf("expected 1 total runs active, got %v", got) } - tes.svc.SucceedQuery(script, nil) + tes.svc.SucceedQuery(script) <-promise.Done() mg = promtest.MustGather(t, reg) @@ -548,7 +457,7 @@ func TestTaskExecutor_Metrics(t *testing.T) { } -func TestTaskExecutor_IteratorFailure(t *testing.T) { +func testIteratorFailure(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -586,8 +495,8 @@ func TestTaskExecutor_IteratorFailure(t *testing.T) { t.Fatal("promise and run dont match") } - tes.svc.WaitForQueryLive(t, script, nil) - tes.svc.SucceedQuery(script, nil) + tes.svc.WaitForQueryLive(t, script) + tes.svc.SucceedQuery(script) <-promise.Done() @@ -596,7 +505,7 @@ func TestTaskExecutor_IteratorFailure(t *testing.T) { } } -func TestTaskExecutor_ErrorHandling(t *testing.T) { +func testErrorHandling(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t) @@ -642,7 +551,7 @@ func TestTaskExecutor_ErrorHandling(t *testing.T) { */ } -func TestTaskExecutor_PromiseFailure(t *testing.T) { +func TestPromiseFailure(t *testing.T) { t.Parallel() tes := taskExecutorSystem(t)