fix: manual task runs are scheduled asyncronously (#22702)

pull/22732/head
William Baker 2021-10-20 09:12:57 -06:00 committed by GitHub
parent 05e6dc65c5
commit 84776d7428
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 44 deletions

View File

@ -5,12 +5,13 @@ import (
"errors"
"time"
"go.uber.org/zap"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/task/backend/executor"
"github.com/influxdata/influxdb/v2/task/backend/middleware"
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/influxdata/influxdb/v2/task/taskmodel"
"go.uber.org/zap"
)
var _ middleware.Coordinator = (*Coordinator)(nil)
@ -166,32 +167,14 @@ func (c *Coordinator) RunCancelled(ctx context.Context, runID platform.ID) error
return err
}
// RunRetried speaks directly to the executor to re-try a task run immediately
func (c *Coordinator) RunRetried(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error {
promise, err := c.ex.ManualRun(ctx, task.ID, run.ID)
if err != nil {
return taskmodel.ErrRunExecutionError(err)
}
<-promise.Done()
if err = promise.Error(); err != nil {
return err
}
return nil
}
// RunForced speaks directly to the Executor to run a task immediately
func (c *Coordinator) RunForced(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error {
promise, err := c.ex.ManualRun(ctx, task.ID, run.ID)
// the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the
// task rather than waiting for the task to finish
_, err := c.ex.ManualRun(ctx, task.ID, run.ID)
if err != nil {
return taskmodel.ErrRunExecutionError(err)
}
<-promise.Done()
if err = promise.Error(); err != nil {
return err
}
return nil
}

View File

@ -8,10 +8,11 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap/zaptest"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/influxdata/influxdb/v2/task/taskmodel"
"go.uber.org/zap/zaptest"
)
func Test_Coordinator_Executor_Methods(t *testing.T) {
@ -49,19 +50,6 @@ func Test_Coordinator_Executor_Methods(t *testing.T) {
},
},
},
{
name: "RunRetried",
call: func(t *testing.T, c *Coordinator) {
if err := c.RunRetried(context.Background(), taskOne, runOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
executor: &executorE{
calls: []interface{}{
manualRunCall{taskOne.ID, runOne.ID},
},
},
},
{
name: "RunCancelled",
call: func(t *testing.T, c *Coordinator) {

View File

@ -11,6 +11,8 @@ import (
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/runtime"
"go.uber.org/zap"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/feature"
@ -20,7 +22,6 @@ import (
"github.com/influxdata/influxdb/v2/task/backend"
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/influxdata/influxdb/v2/task/taskmodel"
"go.uber.org/zap"
)
const (
@ -225,6 +226,15 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform
if err != nil {
return nil, err
}
auth, err := icontext.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
// create a new context for running the task in the background so that returning the HTTP response does not cancel the
// context of the task to be run
ctx = icontext.SetAuthorizer(context.Background(), auth)
p, err := e.createPromise(ctx, r)
e.startWorker()

View File

@ -12,6 +12,12 @@ import (
"github.com/golang/mock/gomock"
"github.com/influxdata/flux"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
"go.uber.org/zap/zaptest"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorization"
icontext "github.com/influxdata/influxdb/v2/context"
@ -29,11 +35,6 @@ import (
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/influxdata/influxdb/v2/task/taskmodel"
"github.com/influxdata/influxdb/v2/tenant"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
"go.uber.org/zap/zaptest"
)
func TestMain(m *testing.M) {

View File

@ -16,7 +16,6 @@ type Coordinator interface {
TaskUpdated(ctx context.Context, from, to *taskmodel.Task) error
TaskDeleted(context.Context, platform.ID) error
RunCancelled(ctx context.Context, runID platform.ID) error
RunRetried(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error
RunForced(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error
}
@ -109,7 +108,7 @@ func (s *CoordinatingTaskService) RetryRun(ctx context.Context, taskID, runID pl
return r, err
}
return r, s.coordinator.RunRetried(ctx, t, r)
return r, s.coordinator.RunForced(ctx, t, r)
}
// ForceRun create the forced run in the task system and publish to the pubSub.