From 986a1acdbea6e266c8710936a1be80799e730409 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Tue, 2 Apr 2019 15:55:31 -0600 Subject: [PATCH] Update task executor to use a TaskService (#13099) --- cmd/influxd/launcher/launcher.go | 3 +- task/backend/executor/executor.go | 46 ++++++++++------ task/backend/executor/executor_test.go | 73 +++++++++++++++----------- 3 files changed, 74 insertions(+), 48 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 279cb5ed9f..2c0239b15c 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -544,7 +544,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { store = taskbackend.NewInMemStore() } - executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, store) + executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, nil) lw := taskbackend.NewPointLogWriter(pointsWriter) queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController} @@ -555,6 +555,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { m.reg.MustRegister(m.scheduler.PrometheusCollectors()...) taskSvc = task.PlatformAdapter(store, lr, m.scheduler, authSvc, userResourceSvc, orgSvc) + taskexecutor.AddTaskService(executor, taskSvc) taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, taskSvc) taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc) m.taskStore = store diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index b158f9b836..94f0a28de5 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -21,7 +21,7 @@ import ( type queryServiceExecutor struct { qs query.QueryService as influxdb.AuthorizationService - st backend.Store + ts influxdb.TaskService logger *zap.Logger wg sync.WaitGroup } @@ -31,17 +31,31 @@ var _ backend.Executor = (*queryServiceExecutor)(nil) // NewQueryServiceExecutor returns a new executor based on the given QueryService. // In general, you should prefer NewAsyncQueryServiceExecutor, as that code is smaller and simpler, // because asynchronous queries are more in line with the Executor interface. -func NewQueryServiceExecutor(logger *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, st backend.Store) backend.Executor { - return &queryServiceExecutor{logger: logger, qs: qs, as: as, st: st} +func NewQueryServiceExecutor(logger *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService) *queryServiceExecutor { + return &queryServiceExecutor{logger: logger, qs: qs, as: as, ts: ts} +} + +// AddTaskService is a temporary solution to a chicken and egg problem. It takes a executor and sets the task service. +// This is required because the platform adaptor requires a executor but the executor requires a task service. +// TODO(lh): Remove this function once we are no longer using the PlatformAdaptor +func AddTaskService(e backend.Executor, ts influxdb.TaskService) { + qe, ok := e.(*queryServiceExecutor) + if ok { + qe.ts = ts + } + ae, ok := e.(*asyncQueryServiceExecutor) + if ok { + ae.ts = ts + } } func (e *queryServiceExecutor) Execute(ctx context.Context, run backend.QueuedRun) (backend.RunPromise, error) { - t, m, err := e.st.FindTaskByIDWithMeta(ctx, run.TaskID) + t, err := e.ts.FindTaskByID(ctx, run.TaskID) if err != nil { return nil, err } - auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(m.AuthorizationID)) + auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(t.AuthorizationID)) if err != nil { return nil, err } @@ -59,7 +73,7 @@ type syncRunPromise struct { qr backend.QueuedRun auth *influxdb.Authorization qs query.QueryService - t *backend.StoreTask + t *influxdb.Task ctx context.Context cancel context.CancelFunc logger *zap.Logger @@ -73,7 +87,7 @@ type syncRunPromise struct { var _ backend.RunPromise = (*syncRunPromise)(nil) -func newSyncRunPromise(ctx context.Context, auth *influxdb.Authorization, qr backend.QueuedRun, e *queryServiceExecutor, t *backend.StoreTask) *syncRunPromise { +func newSyncRunPromise(ctx context.Context, auth *influxdb.Authorization, qr backend.QueuedRun, e *queryServiceExecutor, t *influxdb.Task) *syncRunPromise { ctx, cancel := context.WithCancel(ctx) opLogger := e.logger.With(zap.Stringer("task_id", qr.TaskID), zap.Stringer("run_id", qr.RunID)) log, logEnd := logger.NewOperation(opLogger, "Executing task", "execute") @@ -139,7 +153,7 @@ func (p *syncRunPromise) finish(res *runResult, err error) { func (p *syncRunPromise) doQuery(wg *sync.WaitGroup) { defer wg.Done() - spec, err := flux.Compile(p.ctx, p.t.Script, time.Unix(p.qr.Now, 0)) + spec, err := flux.Compile(p.ctx, p.t.Flux, time.Unix(p.qr.Now, 0)) if err != nil { p.finish(nil, err) return @@ -147,7 +161,7 @@ func (p *syncRunPromise) doQuery(wg *sync.WaitGroup) { req := &query.Request{ Authorization: p.auth, - OrganizationID: p.t.Org, + OrganizationID: p.t.OrganizationID, Compiler: lang.SpecCompiler{ Spec: spec, }, @@ -195,7 +209,7 @@ func (p *syncRunPromise) cancelOnContextDone(wg *sync.WaitGroup) { type asyncQueryServiceExecutor struct { qs query.AsyncQueryService as influxdb.AuthorizationService - st backend.Store + ts influxdb.TaskService logger *zap.Logger wg sync.WaitGroup } @@ -203,29 +217,29 @@ type asyncQueryServiceExecutor struct { var _ backend.Executor = (*asyncQueryServiceExecutor)(nil) // NewAsyncQueryServiceExecutor returns a new executor based on the given AsyncQueryService. -func NewAsyncQueryServiceExecutor(logger *zap.Logger, qs query.AsyncQueryService, as influxdb.AuthorizationService, st backend.Store) backend.Executor { - return &asyncQueryServiceExecutor{logger: logger, qs: qs, as: as, st: st} +func NewAsyncQueryServiceExecutor(logger *zap.Logger, qs query.AsyncQueryService, as influxdb.AuthorizationService, ts influxdb.TaskService) backend.Executor { + return &asyncQueryServiceExecutor{logger: logger, qs: qs, as: as, ts: ts} } func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.QueuedRun) (backend.RunPromise, error) { - t, m, err := e.st.FindTaskByIDWithMeta(ctx, run.TaskID) + t, err := e.ts.FindTaskByID(ctx, run.TaskID) if err != nil { return nil, err } - auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(m.AuthorizationID)) + auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(t.AuthorizationID)) if err != nil { return nil, err } - spec, err := flux.Compile(ctx, t.Script, time.Unix(run.Now, 0)) + spec, err := flux.Compile(ctx, t.Flux, time.Unix(run.Now, 0)) if err != nil { return nil, err } req := &query.Request{ Authorization: auth, - OrganizationID: t.Org, + OrganizationID: t.OrganizationID, Compiler: lang.SpecCompiler{ Spec: spec, }, diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index d95ac94d15..a0c67c687d 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/query" _ "github.com/influxdata/influxdb/query/builtin" + "github.com/influxdata/influxdb/task" "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/backend/executor" "go.uber.org/zap" @@ -230,10 +231,14 @@ func (ts tables) Do(f func(flux.Table) error) error { func (ts tables) Statistics() flux.Statistics { return flux.Statistics{} } +type noopRunCanceler struct{} + +func (noopRunCanceler) CancelRun(ctx context.Context, taskID, runID platform.ID) error { return nil } + type system struct { name string svc *fakeQueryService - st backend.Store + ts platform.TaskService ex backend.Executor // We really just want an authorization service here, but we take a whole inmem service // to ensure that the authorization service validates org and user existence properly. @@ -244,32 +249,32 @@ type createSysFn func() *system func createAsyncSystem() *system { svc := newFakeQueryService() - st := backend.NewInMemStore() i := inmem.NewService() + ts := task.PlatformAdapter(backend.NewInMemStore(), backend.NopLogReader{}, noopRunCanceler{}, i, i, i) return &system{ name: "AsyncExecutor", svc: svc, - st: st, - ex: executor.NewAsyncQueryServiceExecutor(zap.NewNop(), svc, i, st), + ts: ts, + ex: executor.NewAsyncQueryServiceExecutor(zap.NewNop(), svc, i, ts), i: i, } } func createSyncSystem() *system { svc := newFakeQueryService() - st := backend.NewInMemStore() i := inmem.NewService() + ts := task.PlatformAdapter(backend.NewInMemStore(), backend.NopLogReader{}, noopRunCanceler{}, i, i, i) return &system{ name: "SynchronousExecutor", svc: svc, - st: st, + ts: ts, ex: executor.NewQueryServiceExecutor( zap.NewNop(), query.QueryServiceBridge{ AsyncQueryService: svc, }, i, - st, + ts, ), i: i, } @@ -304,11 +309,12 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} rp, err := sys.ex.Execute(context.Background(), qr) if err != nil { t.Fatal(err) @@ -354,8 +360,8 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) { if err != nil { t.Fatal(err) } - if qa.Identifier() != tc.AuthzID { - t.Fatalf("expected query authorizer to have ID %v, got %v", tc.AuthzID, qa.Identifier()) + if qa.Identifier() != tc.Auth.ID { + t.Fatalf("expected query authorizer to have ID %v, got %v", tc.Auth.ID, qa.Identifier()) } }) } @@ -366,11 +372,12 @@ func testExecutorQueryFailure(t *testing.T, fn createSysFn) { t.Run(sys.name+"/QueryFail", func(t *testing.T) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} rp, err := sys.ex.Execute(context.Background(), qr) if err != nil { t.Fatal(err) @@ -395,11 +402,12 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) { t.Run(sys.name+"/PromiseCancel", func(t *testing.T) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} rp, err := sys.ex.Execute(context.Background(), qr) if err != nil { t.Fatal(err) @@ -423,11 +431,12 @@ func testExecutorServiceError(t *testing.T, fn createSysFn) { t.Run(sys.name+"/ServiceError", func(t *testing.T) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} var forced = errors.New("forced") sys.svc.FailNextQuery(forced) @@ -490,11 +499,12 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { defer ctxCancel() script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + ctx = icontext.SetAuthorizer(ctx, tc.Auth) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} if _, err := sys.ex.Execute(ctx, qr); err != nil { t.Fatal(err) } @@ -527,14 +537,14 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { sys := createSys() tc := createCreds(t, sys.i) - ctx := context.Background() + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} rp, err := sys.ex.Execute(ctx, qr) if err != nil { t.Fatal(err) @@ -568,14 +578,14 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { sys := createSys() tc := createCreds(t, sys.i) - ctx := context.Background() + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} if _, err := sys.ex.Execute(ctx, qr); err != nil { t.Fatal(err) } @@ -609,14 +619,13 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { sys := createSys() tc := createCreds(t, sys.i) - ctx := context.Background() - script := fmt.Sprintf(fmtTestScript, t.Name()) - tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: tc.OrgID, AuthorizationID: tc.AuthzID, Script: script}) + ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) if err != nil { t.Fatal(err) } - qr := backend.QueuedRun{TaskID: tid, RunID: platform.ID(1), Now: 123} + qr := backend.QueuedRun{TaskID: task.ID, RunID: platform.ID(1), Now: 123} if _, err := sys.ex.Execute(ctx, qr); err != nil { t.Fatal(err) } @@ -648,7 +657,8 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { } type testCreds struct { - OrgID, UserID, AuthzID platform.ID + OrgID, UserID platform.ID + Auth *platform.Authorization } func createCreds(t *testing.T, i *inmem.Service) testCreds { @@ -675,11 +685,12 @@ func createCreds(t *testing.T, i *inmem.Service) testCreds { auth := &platform.Authorization{ OrgID: org.ID, UserID: user.ID, + Token: "hifriend!", Permissions: []platform.Permission{*readPerm, *writePerm}, } if err := i.CreateAuthorization(context.Background(), auth); err != nil { t.Fatal(err) } - return testCreds{OrgID: org.ID, AuthzID: auth.ID} + return testCreds{OrgID: org.ID, Auth: auth} }