diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index f6ba941464..18c55fb7c4 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -15,7 +15,8 @@ type Coordinator struct { logger *zap.Logger sch backend.Scheduler - limit int + limit int + claimExisting bool } type Option func(*Coordinator) @@ -26,19 +27,29 @@ func WithLimit(i int) Option { } } +// WithoutExistingTasks allows us to skip claiming tasks already in the system. +func WithoutExistingTasks() Option { + return func(c *Coordinator) { + c.claimExisting = false + } +} + func New(logger *zap.Logger, scheduler backend.Scheduler, ts platform.TaskService, opts ...Option) *Coordinator { c := &Coordinator{ - logger: logger, - sch: scheduler, - TaskService: ts, - limit: 1000, + logger: logger, + sch: scheduler, + TaskService: ts, + limit: 1000, + claimExisting: true, } for _, opt := range opts { opt(c) } - go c.claimExistingTasks() + if c.claimExisting { + go c.claimExistingTasks() + } return c } diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index 3ee0c5fd2b..26de5e0e01 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -109,7 +109,7 @@ func TestCoordinator(t *testing.T) { ts := inmemTaskService() sched := mock.NewScheduler() - coord := coordinator.New(zaptest.NewLogger(t), sched, ts) + coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks()) createChan := sched.TaskCreateChan() releaseChan := sched.TaskReleaseChan() updateChan := sched.TaskUpdateChan() @@ -211,7 +211,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) { ts := inmemTaskService() sched := mock.NewScheduler() - coord := coordinator.New(zaptest.NewLogger(t), sched, ts) + coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks()) // Create an isolated task directly through the store so the coordinator doesn't know about it. task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script}) @@ -281,7 +281,7 @@ func TestCoordinator_ForceRun(t *testing.T) { ts := inmemTaskService() sched := mock.NewScheduler() - coord := coordinator.New(zaptest.NewLogger(t), sched, ts) + coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks()) // Create an isolated task directly through the store so the coordinator doesn't know about it. task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})