parent
bc05f69491
commit
dad731ceb2
|
@ -15,7 +15,8 @@ type Coordinator struct {
|
|||
logger *zap.Logger
|
||||
sch backend.Scheduler
|
||||
|
||||
limit int
|
||||
limit int
|
||||
claimExisting bool
|
||||
}
|
||||
|
||||
type Option func(*Coordinator)
|
||||
|
@ -26,19 +27,29 @@ func WithLimit(i int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithoutExistingTasks allows us to skip claiming tasks already in the system.
|
||||
func WithoutExistingTasks() Option {
|
||||
return func(c *Coordinator) {
|
||||
c.claimExisting = false
|
||||
}
|
||||
}
|
||||
|
||||
func New(logger *zap.Logger, scheduler backend.Scheduler, ts platform.TaskService, opts ...Option) *Coordinator {
|
||||
c := &Coordinator{
|
||||
logger: logger,
|
||||
sch: scheduler,
|
||||
TaskService: ts,
|
||||
limit: 1000,
|
||||
logger: logger,
|
||||
sch: scheduler,
|
||||
TaskService: ts,
|
||||
limit: 1000,
|
||||
claimExisting: true,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
go c.claimExistingTasks()
|
||||
if c.claimExisting {
|
||||
go c.claimExistingTasks()
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ func TestCoordinator(t *testing.T) {
|
|||
ts := inmemTaskService()
|
||||
sched := mock.NewScheduler()
|
||||
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts)
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
|
||||
createChan := sched.TaskCreateChan()
|
||||
releaseChan := sched.TaskReleaseChan()
|
||||
updateChan := sched.TaskUpdateChan()
|
||||
|
@ -211,7 +211,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
|
|||
ts := inmemTaskService()
|
||||
sched := mock.NewScheduler()
|
||||
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts)
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
|
||||
|
||||
// Create an isolated task directly through the store so the coordinator doesn't know about it.
|
||||
task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
|
@ -281,7 +281,7 @@ func TestCoordinator_ForceRun(t *testing.T) {
|
|||
ts := inmemTaskService()
|
||||
sched := mock.NewScheduler()
|
||||
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts)
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
|
||||
|
||||
// Create an isolated task directly through the store so the coordinator doesn't know about it.
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
|
|
Loading…
Reference in New Issue