refactor(tasks): separate coordinator and middleware (#14779)

pull/14789/head
George 2019-08-23 19:05:13 +00:00 committed by GitHub
parent 6ffbab89d4
commit 0cc9caa1d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 941 additions and 510 deletions

View File

@ -18,6 +18,7 @@
1. [14497](https://github.com/influxdata/influxdb/pull/14497): Update the documentation link for Telegraf.
1. [14492](https://github.com/influxdata/influxdb/pull/14492): Fix to surface errors properly as task notifications on create.
1. [14569](https://github.com/influxdata/influxdb/pull/14569): Fix limiting of get runs for task.
1. [14779](https://github.com/influxdata/influxdb/pull/14779): Refactor tasks coordinator.
## v2.0.0-alpha.16 [2019-07-25]

View File

@ -40,6 +40,7 @@ import (
taskbackend "github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/coordinator"
taskexecutor "github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/task/backend/middleware"
"github.com/influxdata/influxdb/telemetry"
_ "github.com/influxdata/influxdb/tsdb/tsi1" // needed for tsi1
_ "github.com/influxdata/influxdb/tsdb/tsm1" // needed for tsm1
@ -551,7 +552,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)
taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, combinedTaskService)
logger := m.logger.With(zap.String("service", "task-coordinator"))
coordinator := coordinator.New(logger, m.scheduler)
// resume existing task claims from task service
if err := taskbackend.NotifyCoordinatorOfExisting(ctx, combinedTaskService, coordinator, logger); err != nil {
logger.Error("failed to resume existing tasks", zap.Error(err))
}
taskSvc = middleware.New(combinedTaskService, coordinator)
taskSvc = authorizer.NewTaskService(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc)
m.taskControlService = combinedTaskService
}

View File

@ -0,0 +1,60 @@
package backend
import (
"context"
"time"
"github.com/influxdata/influxdb"
"go.uber.org/zap"
)
var now = func() time.Time {
return time.Now().UTC()
}
// TaskService is a type on which tasks can be listed
type TaskService interface {
FindTasks(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error)
UpdateTask(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error)
}
// Coordinator is a type with a single method which
// is called when a task has been created
type Coordinator interface {
TaskCreated(context.Context, *influxdb.Task) error
}
// NotifyCoordinatorOfExisting lists all tasks by the provided task service and for
// each task it calls the provided coordinators task created method
func NotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, coord Coordinator, logger *zap.Logger) error {
// If we missed a Create Action
wildcard := influxdb.TaskTypeWildcard
tasks, _, err := ts.FindTasks(ctx, influxdb.TaskFilter{Type: &wildcard})
if err != nil {
return err
}
latestCompleted := now().Format(time.RFC3339)
for len(tasks) > 0 {
for _, task := range tasks {
task, err := ts.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{
LatestCompleted: &latestCompleted,
})
if err != nil {
logger.Error("failed to set latestCompleted", zap.Error(err))
}
coord.TaskCreated(ctx, task)
}
tasks, _, err = ts.FindTasks(ctx, influxdb.TaskFilter{
Type: &wildcard,
After: &tasks[len(tasks)-1].ID,
})
if err != nil {
return err
}
}
return nil
}

View File

@ -2,22 +2,20 @@ package coordinator
import (
"context"
"fmt"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/middleware"
"go.uber.org/zap"
)
type Coordinator struct {
platform.TaskService
var _ middleware.Coordinator = (*Coordinator)(nil)
type Coordinator struct {
logger *zap.Logger
sch backend.Scheduler
limit int
claimExisting bool
}
type Option func(*Coordinator)
@ -28,168 +26,62 @@ func WithLimit(i int) Option {
}
}
// WithoutExistingTasks allows us to skip claiming tasks already in the system.
func WithoutExistingTasks() Option {
return func(c *Coordinator) {
c.claimExisting = false
}
}
func New(logger *zap.Logger, scheduler backend.Scheduler, ts platform.TaskService, opts ...Option) *Coordinator {
func New(logger *zap.Logger, scheduler backend.Scheduler, opts ...Option) *Coordinator {
c := &Coordinator{
logger: logger,
sch: scheduler,
TaskService: ts,
limit: 1000,
claimExisting: true,
}
for _, opt := range opts {
opt(c)
}
if c.claimExisting {
go c.claimExistingTasks()
}
return c
}
// claimExistingTasks is called on startup to claim all tasks in the store.
func (c *Coordinator) claimExistingTasks() {
tasks, _, err := c.TaskService.FindTasks(context.Background(), platform.TaskFilter{})
if err != nil {
return
}
newLatestCompleted := time.Now().UTC().Format(time.RFC3339)
for len(tasks) > 0 {
for _, task := range tasks {
task, err := c.TaskService.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{LatestCompleted: &newLatestCompleted})
if err != nil {
c.logger.Error("failed to set latestCompleted", zap.Error(err))
func (c *Coordinator) TaskCreated(ctx context.Context, task *influxdb.Task) error {
return c.sch.ClaimTask(ctx, task)
}
if task.Status != string(backend.TaskActive) {
// Don't claim inactive tasks at startup.
continue
}
// I may need a context with an auth here
if err := c.sch.ClaimTask(context.Background(), task); err != nil {
c.logger.Error("failed claim task", zap.Error(err))
continue
}
}
tasks, _, err = c.TaskService.FindTasks(context.Background(), platform.TaskFilter{
After: &tasks[len(tasks)-1].ID,
})
if err != nil {
c.logger.Error("failed list additional tasks", zap.Error(err))
return
}
func (c *Coordinator) TaskUpdated(ctx context.Context, from, to *influxdb.Task) error {
// if disabling the task release it before schedule update
if to.Status != from.Status && to.Status == string(backend.TaskInactive) {
if err := c.sch.ReleaseTask(to.ID); err != nil && err != influxdb.ErrTaskNotClaimed {
return err
}
}
func (c *Coordinator) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
task, err := c.TaskService.CreateTask(ctx, t)
if err != nil {
return task, err
}
if err := c.sch.ClaimTask(ctx, task); err != nil {
delErr := c.TaskService.DeleteTask(ctx, task.ID)
if delErr != nil {
return task, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
}
return task, err
}
return task, nil
}
func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
oldTask, err := c.TaskService.FindTaskByID(ctx, id)
if err != nil {
return nil, err
}
task, err := c.TaskService.UpdateTask(ctx, id, upd)
if err != nil {
return task, err
}
// If disabling the task, do so before modifying the script.
if task.Status != oldTask.Status && task.Status == string(backend.TaskInactive) {
if err := c.sch.ReleaseTask(id); err != nil && err != platform.ErrTaskNotClaimed {
return task, err
}
}
if err := c.sch.UpdateTask(ctx, task); err != nil && err != platform.ErrTaskNotClaimed {
return task, err
}
// If enabling the task, claim it after modifying the script.
if task.Status != oldTask.Status && task.Status == string(backend.TaskActive) {
// don't catch up on all the missed task runs while disabled
newLatestCompleted := c.sch.Now().UTC().Format(time.RFC3339)
task, err := c.TaskService.UpdateTask(ctx, task.ID, platform.TaskUpdate{LatestCompleted: &newLatestCompleted})
if err != nil {
return task, err
}
if err := c.sch.ClaimTask(ctx, task); err != nil && err != platform.ErrTaskAlreadyClaimed {
return task, err
}
}
return task, nil
}
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) error {
if err := c.sch.ReleaseTask(id); err != nil && err != platform.ErrTaskNotClaimed {
if err := c.sch.UpdateTask(ctx, to); err != nil && err != influxdb.ErrTaskNotClaimed {
return err
}
return c.TaskService.DeleteTask(ctx, id)
// if enabling the task then claim it
if to.Status != from.Status && to.Status == string(backend.TaskActive) {
if err := c.sch.ClaimTask(ctx, to); err != nil && err != influxdb.ErrTaskAlreadyClaimed {
return err
}
}
func (c *Coordinator) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
err := c.sch.CancelRun(ctx, taskID, runID)
if err != nil {
return nil
}
func (c *Coordinator) TaskDeleted(ctx context.Context, id influxdb.ID) error {
if err := c.sch.ReleaseTask(id); err != nil && err != influxdb.ErrTaskNotClaimed {
return err
}
// TODO(lh): Im not sure if we need to call the task service here directly or if the scheduler does that
// for now we will do it and then if it causes errors we can opt to do it in the scheduler only
return c.TaskService.CancelRun(ctx, taskID, runID)
return nil
}
func (c *Coordinator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
task, err := c.TaskService.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
func (c *Coordinator) RunCancelled(ctx context.Context, taskID, runID influxdb.ID) error {
return c.sch.CancelRun(ctx, taskID, runID)
}
r, err := c.TaskService.RetryRun(ctx, taskID, runID)
if err != nil {
return r, err
func (c *Coordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return c.sch.UpdateTask(ctx, task)
}
return r, c.sch.UpdateTask(ctx, task)
}
func (c *Coordinator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
task, err := c.TaskService.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
r, err := c.TaskService.ForceRun(ctx, taskID, scheduledFor)
if err != nil {
return r, err
}
return r, c.sch.UpdateTask(ctx, task)
func (c *Coordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return c.sch.UpdateTask(ctx, task)
}

View File

@ -1,387 +1,199 @@
package coordinator_test
package coordinator
import (
"context"
"errors"
"sync"
"testing"
"time"
platform "github.com/influxdata/influxdb"
pmock "github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/coordinator"
"github.com/influxdata/influxdb/task/mock"
"go.uber.org/zap/zaptest"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"go.uber.org/zap"
)
func timeoutSelector(ch <-chan *platform.Task) (*platform.Task, error) {
select {
case task := <-ch:
return task, nil
case <-time.After(10 * time.Second):
return nil, errors.New("timeout on select")
}
var (
one = influxdb.ID(1)
two = influxdb.ID(2)
three = influxdb.ID(3)
taskOne = &influxdb.Task{ID: one}
taskTwo = &influxdb.Task{ID: two, Status: "active"}
taskThree = &influxdb.Task{ID: three, Status: "inactive"}
activeThree = &influxdb.Task{
ID: three,
Status: "active",
}
const script = `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)`
// TODO(lh): Once we have a kv.TaskService this entire part can be replaced with kv.TaskService using a inmem kv.Store
func inmemTaskService() platform.TaskService {
gen := snowflake.NewDefaultIDGenerator()
tasks := map[platform.ID]*platform.Task{}
mu := sync.Mutex{}
ts := &pmock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
mu.Lock()
defer mu.Unlock()
id := gen.ID()
task := &platform.Task{ID: id, Flux: tc.Flux, Status: tc.Status, OrganizationID: tc.OrganizationID, Organization: tc.Organization}
if task.Status == "" {
task.Status = string(backend.TaskActive)
runOne = &influxdb.Run{
ID: one,
TaskID: one,
}
tasks[id] = task
return tasks[id], nil
allowUnexported = cmp.AllowUnexported(scheduler{})
)
func Test_Coordinator(t *testing.T) {
for _, test := range []struct {
name string
claimErr error
updateErr error
releaseErr error
call func(*testing.T, *Coordinator)
scheduler *scheduler
}{
{
name: "TaskCreated",
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskCreated(context.Background(), taskOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
DeleteTaskFn: func(ctx context.Context, id platform.ID) error {
mu.Lock()
defer mu.Unlock()
delete(tasks, id)
return nil
scheduler: &scheduler{
calls: []interface{}{
claimCall{taskOne},
},
UpdateTaskFn: func(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
mu.Lock()
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, platform.ErrTaskNotFound
}
if upd.Flux != nil {
t.Flux = *upd.Flux
}
if upd.Status != nil {
t.Status = *upd.Status
}
if upd.LatestCompleted != nil {
t.LatestCompleted = *upd.LatestCompleted
}
return t, nil
},
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
mu.Lock()
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, platform.ErrTaskNotFound
}
newt := *t
return &newt, nil
},
FindTasksFn: func(ctx context.Context, tf platform.TaskFilter) ([]*platform.Task, int, error) {
mu.Lock()
defer mu.Unlock()
if tf.After != nil {
return []*platform.Task{}, 0, nil
{
name: "TaskUpdated from inactive to active",
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskUpdated(context.Background(), taskThree, activeThree); err != nil {
t.Errorf("expected nil error found %q", err)
}
rtn := []*platform.Task{}
for _, task := range tasks {
rtn = append(rtn, task)
}
return rtn, len(rtn), nil
},
ForceRunFn: func(ctx context.Context, id platform.ID, scheduledFor int64) (*platform.Run, error) {
mu.Lock()
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, platform.ErrTaskNotFound
}
return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil
scheduler: &scheduler{
calls: []interface{}{
updateCall{activeThree},
claimCall{activeThree},
},
},
},
{
name: "TaskUpdated from active to inactive",
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskUpdated(context.Background(), activeThree, taskThree); err != nil {
t.Errorf("expected nil error found %q", err)
}
return ts
},
scheduler: &scheduler{
calls: []interface{}{
releaseCall{three},
updateCall{taskThree},
},
},
},
{
name: "TaskUpdated from active to inactive task not claimed error on release",
releaseErr: influxdb.ErrTaskNotClaimed,
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskUpdated(context.Background(), activeThree, taskThree); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
releaseCall{three},
updateCall{taskThree},
},
},
},
{
name: "TaskUpdated from active to inactive task not claimed error on update",
updateErr: influxdb.ErrTaskNotClaimed,
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskUpdated(context.Background(), activeThree, taskThree); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
releaseCall{three},
updateCall{taskThree},
},
},
},
{
name: "TaskUpdated with no status change",
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskUpdated(context.Background(), taskTwo, taskTwo); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
updateCall{taskTwo},
},
},
},
{
name: "TaskDeleted releases the task ID",
call: func(t *testing.T, c *Coordinator) {
if err := c.TaskDeleted(context.Background(), two); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
releaseCall{two},
},
},
},
{
name: "RunCancelled delegates to the scheduler",
call: func(t *testing.T, c *Coordinator) {
if err := c.RunCancelled(context.Background(), one, one); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
cancelCall{one, one},
},
},
},
{
name: "RunRetried delegates to Update",
call: func(t *testing.T, c *Coordinator) {
if err := c.RunRetried(context.Background(), taskOne, runOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
updateCall{taskOne},
},
},
},
{
name: "RunForced delegates to Update",
call: func(t *testing.T, c *Coordinator) {
if err := c.RunForced(context.Background(), taskOne, runOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
calls: []interface{}{
updateCall{taskOne},
},
},
},
} {
t.Run(test.name, func(t *testing.T) {
var (
scheduler = &scheduler{
claimErr: test.claimErr,
updateErr: test.updateErr,
releaseErr: test.releaseErr,
}
coord = New(zap.NewNop(), scheduler)
)
}
test.call(t, coord)
func TestCoordinator(t *testing.T) {
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
createChan := sched.TaskCreateChan()
releaseChan := sched.TaskReleaseChan()
updateChan := sched.TaskUpdateChan()
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
if diff := cmp.Diff(
test.scheduler.calls,
scheduler.calls,
allowUnexported); diff != "" {
t.Errorf("unexpected scheduler contents %s", diff)
}
createdTask, err := timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
if task.ID != createdTask.ID {
t.Fatal("task given to scheduler not the same as task created")
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
if err := coord.DeleteTask(context.Background(), task.ID); err != nil {
t.Fatal(err)
}
task, err = timeoutSelector(releaseChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesn't match task created")
}
task, err = coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
_, err = timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
inactive := string(backend.TaskInactive)
res, err := coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &inactive})
if err != nil {
t.Fatal(err)
}
// Only validating res on the first update.
if res.ID != task.ID {
t.Fatalf("unexpected ID on update result: got %s, want %s", res.ID.String(), task.ID.String())
}
if res.Flux != task.Flux {
t.Fatalf("unexpected script on update result: got %q, want %q", res.Flux, task.Flux)
}
if res.Status != inactive {
t.Fatalf("unexpected meta status on update result: got %q, want %q", res.Status, inactive)
}
task, err = timeoutSelector(releaseChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
active := string(backend.TaskActive)
if _, err := coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &active}); err != nil {
t.Fatal(err)
}
task, err = timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
newScript := `option task = {name: "a task",cron: "1 * * * *"} from(bucket:"test") |> range(start:-2h)`
if _, err := coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Flux: &newScript}); err != nil {
t.Fatal(err)
}
task, err = timeoutSelector(updateChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != newScript {
t.Fatal("task sent to scheduler doesnt match task created")
}
}
func TestCoordinator_ClaimTaskUpdatesLatestCompleted(t *testing.T) {
t.Parallel()
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
rchan := sched.TaskReleaseChan()
activeStr := string(backend.TaskActive)
inactiveStr := string(backend.TaskInactive)
task, err = coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &inactiveStr})
if err != nil {
t.Fatal(err)
}
select {
case <-rchan:
case <-time.After(time.Second):
t.Fatal("failed to release claimed task")
}
newNow := time.Now().Add(time.Second)
sched.Tick(newNow.Unix())
cchan := sched.TaskCreateChan()
_, err = coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &activeStr})
if err != nil {
t.Fatal(err)
}
select {
case claimedTask := <-cchan:
if claimedTask.LatestCompleted != newNow.UTC().Format(time.RFC3339) {
t.Fatal("failed up update latest completed in claimed task")
}
case <-time.After(time.Second):
t.Fatal("failed to release claimed task")
}
}
func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
// Create an isolated task directly through the store so the coordinator doesn't know about it.
task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
// Deleting the task through the coordinator should succeed.
if err := coord.DeleteTask(context.Background(), task.ID); err != nil {
t.Fatal(err)
}
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != platform.ErrTaskNotFound {
t.Fatalf("expected deleted task not to be found; got %v", err)
}
}
func TestCoordinator_ClaimExistingTasks(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}
ts := inmemTaskService()
sched := mock.NewScheduler()
createChan := sched.TaskCreateChan()
const numTasks = 110 // One page of listed tasks should be 100, so pick more than that.
const inactiveTaskIndex = 13 // One arbitrary task is set to inactive.
createdIDs := make([]platform.ID, numTasks)
for i := 0; i < numTasks; i++ {
ctr := platform.TaskCreate{OrganizationID: 1, Flux: script}
if i == inactiveTaskIndex {
ctr.Status = string(backend.TaskInactive)
}
task, err := ts.CreateTask(context.Background(), ctr)
if err != nil {
t.Fatal(err)
}
createdIDs[i] = task.ID
}
origActive, err := ts.FindTaskByID(context.Background(), createdIDs[0])
if err != nil {
t.Fatal(err)
}
origInactive, err := ts.FindTaskByID(context.Background(), createdIDs[inactiveTaskIndex])
if err != nil {
t.Fatal(err)
}
coordinator.New(zaptest.NewLogger(t), sched, ts)
const expectedCreatedTasks = numTasks - 1 // -1 to skip the single inactive task.
for i := 0; i < expectedCreatedTasks; i++ {
_, err := timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
}
for i, id := range createdIDs {
task := sched.TaskFor(id)
if i == inactiveTaskIndex {
if task != nil {
t.Fatalf("inactive task with id %s claimed by coordinator at startup", id)
}
} else if task == nil {
t.Fatalf("did not find created task with ID %s", id)
}
}
active, err := ts.FindTaskByID(context.Background(), createdIDs[0])
if err != nil {
t.Fatal(err)
}
inactive, err := ts.FindTaskByID(context.Background(), createdIDs[inactiveTaskIndex])
if err != nil {
t.Fatal(err)
}
if origActive.LatestCompleted == active.LatestCompleted {
t.Fatalf("active tasks not update with latest completed time")
}
if origInactive.LatestCompleted == inactive.LatestCompleted {
t.Fatalf("inactive tasks not update with latest completed time")
}
}
func TestCoordinator_ForceRun(t *testing.T) {
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
// Create an isolated task directly through the store so the coordinator doesn't know about it.
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
task, err = coord.FindTaskByID(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
ch := sched.TaskUpdateChan()
manualRunTime := time.Now().Unix()
if _, err := coord.ForceRun(context.Background(), task.ID, manualRunTime); err != nil {
t.Fatal(err)
}
select {
case <-ch:
// great!
case <-time.After(time.Second):
t.Fatal("didn't receive task update in time")
})
}
}

View File

@ -0,0 +1,60 @@
package coordinator
import (
"context"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
)
type (
scheduler struct {
backend.Scheduler
claimErr,
updateErr,
releaseErr error
calls []interface{}
}
claimCall struct {
Task *influxdb.Task
}
updateCall struct {
Task *influxdb.Task
}
releaseCall struct {
ID influxdb.ID
}
cancelCall struct {
TaskID, RunID influxdb.ID
}
)
func (s *scheduler) ClaimTask(_ context.Context, task *influxdb.Task) error {
s.calls = append(s.calls, claimCall{task})
return s.claimErr
}
func (s *scheduler) UpdateTask(_ context.Context, task *influxdb.Task) error {
s.calls = append(s.calls, updateCall{task})
return s.updateErr
}
func (s *scheduler) ReleaseTask(taskID influxdb.ID) error {
s.calls = append(s.calls, releaseCall{taskID})
return s.releaseErr
}
func (s *scheduler) CancelRun(_ context.Context, taskID influxdb.ID, runID influxdb.ID) error {
s.calls = append(s.calls, cancelCall{taskID, runID})
return nil
}

View File

@ -0,0 +1,123 @@
package backend
import (
"context"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"go.uber.org/zap"
)
var (
one = influxdb.ID(1)
two = influxdb.ID(2)
three = influxdb.ID(3)
four = influxdb.ID(4)
aTime = time.Now()
aTimeStamp = aTime.Format(time.RFC3339)
taskOne = &influxdb.Task{ID: one}
taskTwo = &influxdb.Task{ID: two, Status: "active"}
taskThree = &influxdb.Task{ID: three, Status: "inactive"}
taskFour = &influxdb.Task{ID: four}
allTasks = map[influxdb.ID]*influxdb.Task{
one: taskOne,
two: taskTwo,
three: taskThree,
four: taskFour,
}
)
func Test_NotifyCoordinatorOfCreated(t *testing.T) {
var (
coordinator = &coordinator{}
tasks = &taskService{
// paginated reponses
pageOne: []*influxdb.Task{taskOne},
otherPages: map[influxdb.ID][]*influxdb.Task{
one: []*influxdb.Task{taskTwo, taskThree},
three: []*influxdb.Task{taskFour},
},
}
)
defer func(old func() time.Time) {
now = old
}(now)
now = func() time.Time { return aTime }
if err := NotifyCoordinatorOfExisting(context.Background(), tasks, coordinator, zap.NewNop()); err != nil {
t.Errorf("expected nil, found %q", err)
}
if *tasks.filter.Type != influxdb.TaskTypeWildcard {
t.Error("expected wildcard type filter")
}
if diff := cmp.Diff([]update{
{one, influxdb.TaskUpdate{LatestCompleted: &aTimeStamp}},
{two, influxdb.TaskUpdate{LatestCompleted: &aTimeStamp}},
{three, influxdb.TaskUpdate{LatestCompleted: &aTimeStamp}},
{four, influxdb.TaskUpdate{LatestCompleted: &aTimeStamp}},
}, tasks.updates); diff != "" {
t.Errorf("unexpected updates to task service %v", diff)
}
if diff := cmp.Diff([]*influxdb.Task{
taskOne,
taskTwo,
taskThree,
taskFour,
}, coordinator.tasks); diff != "" {
t.Errorf("unexpected tasks sent to coordinator %v", diff)
}
}
type coordinator struct {
tasks []*influxdb.Task
}
func (c *coordinator) TaskCreated(_ context.Context, task *influxdb.Task) error {
c.tasks = append(c.tasks, task)
return nil
}
// TasksService mocking
type taskService struct {
// paginated tasks
pageOne []*influxdb.Task
otherPages map[influxdb.ID][]*influxdb.Task
// find tasks call
filter influxdb.TaskFilter
// update call
updates []update
}
type update struct {
ID influxdb.ID
Update influxdb.TaskUpdate
}
func (t *taskService) UpdateTask(_ context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
t.updates = append(t.updates, update{id, upd})
return allTasks[id], nil
}
func (t *taskService) FindTasks(_ context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
t.filter = filter
if filter.After == nil {
return t.pageOne, len(t.pageOne), nil
}
tasks := t.otherPages[*filter.After]
return tasks, len(tasks), nil
}

View File

@ -0,0 +1,138 @@
package middleware
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
)
// Coordinator is a type which is used to react to
// task related actions
type Coordinator interface {
TaskCreated(context.Context, *influxdb.Task) error
TaskUpdated(ctx context.Context, from, to *influxdb.Task) error
TaskDeleted(context.Context, influxdb.ID) error
RunCancelled(ctx context.Context, taskID, runID influxdb.ID) error
RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error
RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error
}
// CoordinatingTaskService acts as a TaskService decorator that handles coordinating the api request
// with the required task control actions asynchronously via a message dispatcher
type CoordinatingTaskService struct {
influxdb.TaskService
coordinator Coordinator
now func() time.Time
}
// New constructs a new coordinating task service
func New(service influxdb.TaskService, coordinator Coordinator, opts ...Option) *CoordinatingTaskService {
c := &CoordinatingTaskService{
TaskService: service,
coordinator: coordinator,
now: func() time.Time {
return time.Now().UTC()
},
}
for _, opt := range opts {
opt(c)
}
return c
}
// CreateTask Creates a task in the existing task service and Publishes the change so any TaskD service can lease it.
func (s *CoordinatingTaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
t, err := s.TaskService.CreateTask(ctx, tc)
if err != nil {
return t, err
}
if err := s.coordinator.TaskCreated(ctx, t); err != nil {
if derr := s.TaskService.DeleteTask(ctx, t.ID); derr != nil {
return t, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr)
}
return t, err
}
return t, nil
}
// UpdateTask Updates a task and publishes the change so the task owner can act on the update
func (s *CoordinatingTaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
from, err := s.TaskService.FindTaskByID(ctx, id)
if err != nil {
return nil, err
}
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
// this allows us to see not run the task for inactive time
if upd.Status != nil && *upd.Status == string(backend.TaskActive) {
// confirm that it was inactive and this is an attempt to activate
if from.Status == string(backend.TaskInactive) {
lc := s.now().Format(time.RFC3339)
upd.LatestCompleted = &lc
}
}
to, err := s.TaskService.UpdateTask(ctx, id, upd)
if err != nil {
return to, err
}
return to, s.coordinator.TaskUpdated(ctx, from, to)
}
// DeleteTask delete the task and publishes the change, to allow the task owner to find out about this change faster.
func (s *CoordinatingTaskService) DeleteTask(ctx context.Context, id influxdb.ID) error {
if err := s.coordinator.TaskDeleted(ctx, id); err != nil {
return err
}
return s.TaskService.DeleteTask(ctx, id)
}
// CancelRun Cancel the run and publish the cancelation.
func (s *CoordinatingTaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error {
if err := s.TaskService.CancelRun(ctx, taskID, runID); err != nil {
return err
}
return s.coordinator.RunCancelled(ctx, taskID, runID)
}
// RetryRun calls retry on the task service and publishes the retry.
func (s *CoordinatingTaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
t, err := s.TaskService.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
r, err := s.TaskService.RetryRun(ctx, taskID, runID)
if err != nil {
return r, err
}
return r, s.coordinator.RunRetried(ctx, t, r)
}
// ForceRun create the forced run in the task system and publish to the pubSub.
func (s *CoordinatingTaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) {
t, err := s.TaskService.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
r, err := s.TaskService.ForceRun(ctx, taskID, scheduledFor)
if err != nil {
return r, err
}
return r, s.coordinator.RunForced(ctx, t, r)
}

View File

@ -0,0 +1,323 @@
package middleware_test
import (
"context"
"errors"
"sync"
"testing"
"time"
platform "github.com/influxdata/influxdb"
pmock "github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/coordinator"
"github.com/influxdata/influxdb/task/backend/middleware"
"github.com/influxdata/influxdb/task/mock"
"go.uber.org/zap/zaptest"
)
func timeoutSelector(ch <-chan *platform.Task) (*platform.Task, error) {
select {
case task := <-ch:
return task, nil
case <-time.After(10 * time.Second):
return nil, errors.New("timeout on select")
}
}
const script = `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)`
// TODO(lh): Once we have a kv.TaskService this entire part can be replaced with kv.TaskService using a inmem kv.Store
func inmemTaskService() platform.TaskService {
gen := snowflake.NewDefaultIDGenerator()
tasks := map[platform.ID]*platform.Task{}
mu := sync.Mutex{}
ts := &pmock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
mu.Lock()
defer mu.Unlock()
id := gen.ID()
task := &platform.Task{ID: id, Flux: tc.Flux, Status: tc.Status, OrganizationID: tc.OrganizationID, Organization: tc.Organization}
if task.Status == "" {
task.Status = string(backend.TaskActive)
}
tasks[id] = task
return tasks[id], nil
},
DeleteTaskFn: func(ctx context.Context, id platform.ID) error {
mu.Lock()
defer mu.Unlock()
delete(tasks, id)
return nil
},
UpdateTaskFn: func(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
mu.Lock()
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, platform.ErrTaskNotFound
}
if upd.Flux != nil {
t.Flux = *upd.Flux
}
if upd.Status != nil {
t.Status = *upd.Status
}
if upd.LatestCompleted != nil {
t.LatestCompleted = *upd.LatestCompleted
}
return t, nil
},
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
mu.Lock()
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, platform.ErrTaskNotFound
}
newt := *t
return &newt, nil
},
FindTasksFn: func(ctx context.Context, tf platform.TaskFilter) ([]*platform.Task, int, error) {
mu.Lock()
defer mu.Unlock()
if tf.After != nil {
return []*platform.Task{}, 0, nil
}
rtn := []*platform.Task{}
for _, task := range tasks {
rtn = append(rtn, task)
}
return rtn, len(rtn), nil
},
ForceRunFn: func(ctx context.Context, id platform.ID, scheduledFor int64) (*platform.Run, error) {
mu.Lock()
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, platform.ErrTaskNotFound
}
return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil
},
}
return ts
}
func TestCoordinatingTaskService(t *testing.T) {
var (
ts = inmemTaskService()
sched = mock.NewScheduler()
coord = coordinator.New(zaptest.NewLogger(t), sched)
middleware = middleware.New(ts, coord)
createChan = sched.TaskCreateChan()
releaseChan = sched.TaskReleaseChan()
updateChan = sched.TaskUpdateChan()
)
task, err := middleware.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
createdTask, err := timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
if task.ID != createdTask.ID {
t.Fatal("task given to scheduler not the same as task created")
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
if err := middleware.DeleteTask(context.Background(), task.ID); err != nil {
t.Fatal(err)
}
task, err = timeoutSelector(releaseChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesn't match task created")
}
task, err = middleware.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
_, err = timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
inactive := string(backend.TaskInactive)
res, err := middleware.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &inactive})
if err != nil {
t.Fatal(err)
}
// Only validating res on the first update.
if res.ID != task.ID {
t.Fatalf("unexpected ID on update result: got %s, want %s", res.ID.String(), task.ID.String())
}
if res.Flux != task.Flux {
t.Fatalf("unexpected script on update result: got %q, want %q", res.Flux, task.Flux)
}
if res.Status != inactive {
t.Fatalf("unexpected meta status on update result: got %q, want %q", res.Status, inactive)
}
task, err = timeoutSelector(releaseChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
active := string(backend.TaskActive)
if _, err := middleware.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &active}); err != nil {
t.Fatal(err)
}
task, err = timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
newScript := `option task = {name: "a task",cron: "1 * * * *"} from(bucket:"test") |> range(start:-2h)`
if _, err := middleware.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Flux: &newScript}); err != nil {
t.Fatal(err)
}
task, err = timeoutSelector(updateChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != newScript {
t.Fatal("task sent to scheduler doesnt match task created")
}
}
func TestCoordinatingTaskService_ClaimTaskUpdatesLatestCompleted(t *testing.T) {
t.Parallel()
var (
ts = inmemTaskService()
sched = mock.NewScheduler()
coord = coordinator.New(zaptest.NewLogger(t), sched)
latest = time.Now().Add(time.Second)
middleware = middleware.New(ts, coord, middleware.WithNowFunc(func() time.Time {
return latest
}))
)
task, err := middleware.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
rchan := sched.TaskReleaseChan()
activeStr := string(backend.TaskActive)
inactiveStr := string(backend.TaskInactive)
task, err = middleware.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &inactiveStr})
if err != nil {
t.Fatal(err)
}
select {
case <-rchan:
case <-time.After(time.Second):
t.Fatal("failed to release claimed task")
}
cchan := sched.TaskCreateChan()
_, err = middleware.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &activeStr})
if err != nil {
t.Fatal(err)
}
select {
case claimedTask := <-cchan:
if claimedTask.LatestCompleted != latest.UTC().Format(time.RFC3339) {
t.Fatal("failed up update latest completed in claimed task")
}
case <-time.After(time.Second):
t.Fatal("failed to release claimed task")
}
}
func TestCoordinatingTaskService_DeleteUnclaimedTask(t *testing.T) {
var (
ts = inmemTaskService()
sched = mock.NewScheduler()
coord = coordinator.New(zaptest.NewLogger(t), sched)
middleware = middleware.New(ts, coord)
)
// Create an isolated task directly through the store so the coordinator doesn't know about it.
task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
// Deleting the task through the coordinator should succeed.
if err := middleware.DeleteTask(context.Background(), task.ID); err != nil {
t.Fatal(err)
}
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != platform.ErrTaskNotFound {
t.Fatalf("expected deleted task not to be found; got %v", err)
}
}
func TestCoordinatingTaskService_ForceRun(t *testing.T) {
var (
ts = inmemTaskService()
sched = mock.NewScheduler()
coord = coordinator.New(zaptest.NewLogger(t), sched)
middleware = middleware.New(ts, coord)
)
// Create an isolated task directly through the store so the coordinator doesn't know about it.
task, err := middleware.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
if err != nil {
t.Fatal(err)
}
task, err = middleware.FindTaskByID(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
ch := sched.TaskUpdateChan()
manualRunTime := time.Now().Unix()
if _, err := middleware.ForceRun(context.Background(), task.ID, manualRunTime); err != nil {
t.Fatal(err)
}
select {
case <-ch:
// great!
case <-time.After(time.Second):
t.Fatal("didn't receive task update in time")
}
}

View File

@ -0,0 +1,13 @@
package middleware
import "time"
// Option is a functional option for the coordinating task service
type Option func(*CoordinatingTaskService)
// WithNowFunc sets the now func used to derive time
func WithNowFunc(fn func() time.Time) Option {
return func(c *CoordinatingTaskService) {
c.now = fn
}
}