parent
3633a974b6
commit
8104677639
|
@ -6,7 +6,6 @@
|
||||||
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
|
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
|
||||||
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
|
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
|
||||||
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task
|
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task
|
||||||
1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern
|
|
||||||
|
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
|
||||||
|
|
|
@ -138,9 +138,3 @@
|
||||||
key: enforceOrgDashboardLimits
|
key: enforceOrgDashboardLimits
|
||||||
default: false
|
default: false
|
||||||
contact: Compute Team
|
contact: Compute Team
|
||||||
|
|
||||||
- name: Inject Latest Success Time
|
|
||||||
description: Inject the latest successful task run timestamp into a Task query extern when executing.
|
|
||||||
key: injectLatestSuccessTime
|
|
||||||
default: false
|
|
||||||
contact: Compute Team
|
|
||||||
|
|
|
@ -254,20 +254,6 @@ func EnforceOrganizationDashboardLimits() BoolFlag {
|
||||||
return enforceOrgDashboardLimits
|
return enforceOrgDashboardLimits
|
||||||
}
|
}
|
||||||
|
|
||||||
var injectLatestSuccessTime = MakeBoolFlag(
|
|
||||||
"Inject Latest Success Time",
|
|
||||||
"injectLatestSuccessTime",
|
|
||||||
"Compute Team",
|
|
||||||
false,
|
|
||||||
Temporary,
|
|
||||||
false,
|
|
||||||
)
|
|
||||||
|
|
||||||
// InjectLatestSuccessTime - Inject the latest successful task run timestamp into a Task query extern when executing.
|
|
||||||
func InjectLatestSuccessTime() BoolFlag {
|
|
||||||
return injectLatestSuccessTime
|
|
||||||
}
|
|
||||||
|
|
||||||
var all = []Flag{
|
var all = []Flag{
|
||||||
appMetrics,
|
appMetrics,
|
||||||
backendExample,
|
backendExample,
|
||||||
|
@ -287,7 +273,6 @@ var all = []Flag{
|
||||||
pushDownGroupAggregateMinMax,
|
pushDownGroupAggregateMinMax,
|
||||||
orgOnlyMemberList,
|
orgOnlyMemberList,
|
||||||
enforceOrgDashboardLimits,
|
enforceOrgDashboardLimits,
|
||||||
injectLatestSuccessTime,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var byKey = map[string]Flag{
|
var byKey = map[string]Flag{
|
||||||
|
@ -309,5 +294,4 @@ var byKey = map[string]Flag{
|
||||||
"pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax,
|
"pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax,
|
||||||
"orgOnlyMemberList": orgOnlyMemberList,
|
"orgOnlyMemberList": orgOnlyMemberList,
|
||||||
"enforceOrgDashboardLimits": enforceOrgDashboardLimits,
|
"enforceOrgDashboardLimits": enforceOrgDashboardLimits,
|
||||||
"injectLatestSuccessTime": injectLatestSuccessTime,
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,17 +12,14 @@ import (
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/influxdata/flux/ast"
|
|
||||||
"github.com/influxdata/influxdb/v2"
|
"github.com/influxdata/influxdb/v2"
|
||||||
icontext "github.com/influxdata/influxdb/v2/context"
|
icontext "github.com/influxdata/influxdb/v2/context"
|
||||||
"github.com/influxdata/influxdb/v2/inmem"
|
"github.com/influxdata/influxdb/v2/inmem"
|
||||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
|
||||||
"github.com/influxdata/influxdb/v2/kit/prom"
|
"github.com/influxdata/influxdb/v2/kit/prom"
|
||||||
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
|
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
|
||||||
tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing"
|
tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing"
|
||||||
"github.com/influxdata/influxdb/v2/kv"
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||||
influxdbmock "github.com/influxdata/influxdb/v2/mock"
|
|
||||||
"github.com/influxdata/influxdb/v2/query"
|
"github.com/influxdata/influxdb/v2/query"
|
||||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||||
"github.com/influxdata/influxdb/v2/task/backend"
|
"github.com/influxdata/influxdb/v2/task/backend"
|
||||||
|
@ -88,7 +85,19 @@ func taskExecutorSystem(t *testing.T) tes {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_QuerySuccess(t *testing.T) {
|
func TestTaskExecutor(t *testing.T) {
|
||||||
|
t.Run("QuerySuccess", testQuerySuccess)
|
||||||
|
t.Run("QueryFailure", testQueryFailure)
|
||||||
|
t.Run("ManualRun", testManualRun)
|
||||||
|
t.Run("ResumeRun", testResumingRun)
|
||||||
|
t.Run("WorkerLimit", testWorkerLimit)
|
||||||
|
t.Run("LimitFunc", testLimitFunc)
|
||||||
|
t.Run("Metrics", testMetrics)
|
||||||
|
t.Run("IteratorFailure", testIteratorFailure)
|
||||||
|
t.Run("ErrorHandling", testErrorHandling)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testQuerySuccess(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
@ -124,8 +133,8 @@ func TestTaskExecutor_QuerySuccess(t *testing.T) {
|
||||||
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
|
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
tes.svc.SucceedQuery(script, nil)
|
tes.svc.SucceedQuery(script)
|
||||||
|
|
||||||
<-promise.Done()
|
<-promise.Done()
|
||||||
|
|
||||||
|
@ -156,107 +165,7 @@ func TestTaskExecutor_QuerySuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) {
|
func testQueryFailure(t *testing.T) {
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
tes := taskExecutorSystem(t)
|
|
||||||
|
|
||||||
var (
|
|
||||||
script = fmt.Sprintf(fmtTestScript, t.Name())
|
|
||||||
ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
|
|
||||||
span = opentracing.GlobalTracer().StartSpan("test-span")
|
|
||||||
)
|
|
||||||
ctx = opentracing.ContextWithSpan(ctx, span)
|
|
||||||
|
|
||||||
task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{
|
|
||||||
OrganizationID: tes.tc.OrgID,
|
|
||||||
OwnerID: tes.tc.Auth.GetUserID(),
|
|
||||||
Flux: script,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate previous run to establish a timestamp
|
|
||||||
latestSuccess := time.Now().UTC()
|
|
||||||
task, err = tes.i.UpdateTask(ctx, task.ID, influxdb.TaskUpdate{
|
|
||||||
LatestSuccess: &latestSuccess,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
extern := &ast.File{
|
|
||||||
Body: []ast.Statement{&ast.OptionStatement{
|
|
||||||
Assignment: &ast.VariableAssignment{
|
|
||||||
ID: &ast.Identifier{Name: "tasks.lastSuccessTime"},
|
|
||||||
Init: &ast.DateTimeLiteral{
|
|
||||||
Value: latestSuccess,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, err = feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{
|
|
||||||
feature.InjectLatestSuccessTime(): true,
|
|
||||||
}))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
promiseID := influxdb.ID(promise.ID())
|
|
||||||
|
|
||||||
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if run.ID != promiseID {
|
|
||||||
t.Fatal("promise and run dont match")
|
|
||||||
}
|
|
||||||
|
|
||||||
if run.RunAt != time.Unix(126, 0).UTC() {
|
|
||||||
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
|
|
||||||
}
|
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, extern)
|
|
||||||
tes.svc.SucceedQuery(script, extern)
|
|
||||||
|
|
||||||
<-promise.Done()
|
|
||||||
|
|
||||||
if got := promise.Error(); got != nil {
|
|
||||||
t.Fatal(got)
|
|
||||||
}
|
|
||||||
|
|
||||||
// confirm run is removed from in-mem store
|
|
||||||
run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID)
|
|
||||||
if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") {
|
|
||||||
t.Fatal("run was returned when it should have been removed from kv")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensure the run returned by TaskControlService.FinishRun(...)
|
|
||||||
// has run logs formatted as expected
|
|
||||||
if run = tes.tcs.run; run == nil {
|
|
||||||
t.Fatal("expected run returned by FinishRun to not be nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(run.Log) < 3 {
|
|
||||||
t.Fatalf("expected 3 run logs, found %d", len(run.Log))
|
|
||||||
}
|
|
||||||
|
|
||||||
sctx := span.Context().(jaeger.SpanContext)
|
|
||||||
expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID())
|
|
||||||
if expectedMessage != run.Log[1].Message {
|
|
||||||
t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskExecutor_QueryFailure(t *testing.T) {
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -282,8 +191,8 @@ func TestTaskExecutor_QueryFailure(t *testing.T) {
|
||||||
t.Fatal("promise and run dont match")
|
t.Fatal("promise and run dont match")
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))
|
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
|
||||||
|
|
||||||
<-promise.Done()
|
<-promise.Done()
|
||||||
|
|
||||||
|
@ -292,7 +201,7 @@ func TestTaskExecutor_QueryFailure(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestManualRun(t *testing.T) {
|
func testManualRun(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -331,15 +240,15 @@ func TestManualRun(t *testing.T) {
|
||||||
t.Fatal("promise and run and manual run dont match")
|
t.Fatal("promise and run and manual run dont match")
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
tes.svc.SucceedQuery(script, nil)
|
tes.svc.SucceedQuery(script)
|
||||||
|
|
||||||
if got := promise.Error(); got != nil {
|
if got := promise.Error(); got != nil {
|
||||||
t.Fatal(got)
|
t.Fatal(got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_ResumingRun(t *testing.T) {
|
func testResumingRun(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -374,15 +283,15 @@ func TestTaskExecutor_ResumingRun(t *testing.T) {
|
||||||
t.Fatal("promise and run and manual run dont match")
|
t.Fatal("promise and run and manual run dont match")
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
tes.svc.SucceedQuery(script, nil)
|
tes.svc.SucceedQuery(script)
|
||||||
|
|
||||||
if got := promise.Error(); got != nil {
|
if got := promise.Error(); got != nil {
|
||||||
t.Fatal(got)
|
t.Fatal(got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_WorkerLimit(t *testing.T) {
|
func testWorkerLimit(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -402,8 +311,8 @@ func TestTaskExecutor_WorkerLimit(t *testing.T) {
|
||||||
t.Fatal("expected a worker to be started")
|
t.Fatal("expected a worker to be started")
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))
|
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
|
||||||
|
|
||||||
<-promise.Done()
|
<-promise.Done()
|
||||||
|
|
||||||
|
@ -412,7 +321,7 @@ func TestTaskExecutor_WorkerLimit(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_LimitFunc(t *testing.T) {
|
func testLimitFunc(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -451,7 +360,7 @@ func TestTaskExecutor_LimitFunc(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_Metrics(t *testing.T) {
|
func testMetrics(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
metrics := tes.metrics
|
metrics := tes.metrics
|
||||||
|
@ -486,7 +395,7 @@ func TestTaskExecutor_Metrics(t *testing.T) {
|
||||||
t.Fatal("promise and run dont match")
|
t.Fatal("promise and run dont match")
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
|
|
||||||
mg = promtest.MustGather(t, reg)
|
mg = promtest.MustGather(t, reg)
|
||||||
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
|
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
|
||||||
|
@ -494,7 +403,7 @@ func TestTaskExecutor_Metrics(t *testing.T) {
|
||||||
t.Fatalf("expected 1 total runs active, got %v", got)
|
t.Fatalf("expected 1 total runs active, got %v", got)
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.SucceedQuery(script, nil)
|
tes.svc.SucceedQuery(script)
|
||||||
<-promise.Done()
|
<-promise.Done()
|
||||||
|
|
||||||
mg = promtest.MustGather(t, reg)
|
mg = promtest.MustGather(t, reg)
|
||||||
|
@ -548,7 +457,7 @@ func TestTaskExecutor_Metrics(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_IteratorFailure(t *testing.T) {
|
func testIteratorFailure(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -586,8 +495,8 @@ func TestTaskExecutor_IteratorFailure(t *testing.T) {
|
||||||
t.Fatal("promise and run dont match")
|
t.Fatal("promise and run dont match")
|
||||||
}
|
}
|
||||||
|
|
||||||
tes.svc.WaitForQueryLive(t, script, nil)
|
tes.svc.WaitForQueryLive(t, script)
|
||||||
tes.svc.SucceedQuery(script, nil)
|
tes.svc.SucceedQuery(script)
|
||||||
|
|
||||||
<-promise.Done()
|
<-promise.Done()
|
||||||
|
|
||||||
|
@ -596,7 +505,7 @@ func TestTaskExecutor_IteratorFailure(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_ErrorHandling(t *testing.T) {
|
func testErrorHandling(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
||||||
|
@ -642,7 +551,7 @@ func TestTaskExecutor_ErrorHandling(t *testing.T) {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskExecutor_PromiseFailure(t *testing.T) {
|
func TestPromiseFailure(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
tes := taskExecutorSystem(t)
|
tes := taskExecutorSystem(t)
|
||||||
|
|
Loading…
Reference in New Issue