refactor(task): enable high-frequency WithTicker option

Previously, the WithTicker option called TickScheduler.Tick every time
the underlying time.Ticker sent a time on its channel. That forced us to
use a 1s period, so in the worst case we would not see a tick until
about 999ms after the second rollover.

This change increases the underlying time.Ticker frequency but only
calls TickScheduler.Tick after a second rolls over. Since we now use a
100ms tick period, during normal operation TickScheduler.Tick will be
called within 0.1s after the second rolls over.
pull/10616/head
Mark Rushakoff 2018-12-06 21:31:50 -08:00 committed by Mark Rushakoff
parent 8d546ff689
commit 2e81dd31b4
3 changed files with 50 additions and 3 deletions
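
Below is a minimal, self-contained sketch of the rollover-gating idea described above, separate from the scheduler code in this diff. The helper name tickOnSecondRollover and the onSecond callback are illustrative stand-ins (they do not exist in this repo); the callback plays the role of TickScheduler.Tick.

package main

import (
	"context"
	"fmt"
	"time"
)

// tickOnSecondRollover is a hypothetical helper (not part of the influxdb code):
// it polls a fast ticker but only reports when the Unix second actually changes.
func tickOnSecondRollover(ctx context.Context, d time.Duration, onSecond func(unix int64)) {
	ticker := time.NewTicker(d)
	go func() {
		defer ticker.Stop()
		prev := time.Now().Unix() - 1 // so the first rollover we observe is reported
		for {
			select {
			case t := <-ticker.C:
				if u := t.Unix(); u > prev {
					prev = u
					onSecond(u) // called at most ~d after the second rolls over
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	tickOnSecondRollover(ctx, 100*time.Millisecond, func(u int64) {
		fmt.Println("second rolled over:", u)
	})
	<-ctx.Done()
}

With d = 100*time.Millisecond, the callback should fire within roughly 100ms of each second boundary during normal operation, which is the 0.1s bound claimed in the commit message.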

@@ -309,7 +309,7 @@ func (m *Main) run(ctx context.Context) (err error) {
 	executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, boltStore)
 	lw := taskbackend.NewPointLogWriter(pointsWriter)
-	m.scheduler = taskbackend.NewScheduler(boltStore, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, time.Second), taskbackend.WithLogger(m.logger))
+	m.scheduler = taskbackend.NewScheduler(boltStore, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
 	m.scheduler.Start(ctx)
 	reg.MustRegister(m.scheduler.PrometheusCollectors()...)

@@ -125,15 +125,24 @@ type Scheduler interface {
 // TickSchedulerOption is a option you can use to modify the schedulers behavior.
 type TickSchedulerOption func(*TickScheduler)
 
+// WithTicker sets a time.Ticker with period d,
+// and calls TickScheduler.Tick when the ticker rolls over to a new second.
+// With a sub-second d, TickScheduler.Tick should be called roughly no later than d after a second:
+// this can help ensure tasks happen early within a second window.
 func WithTicker(ctx context.Context, d time.Duration) TickSchedulerOption {
 	return func(s *TickScheduler) {
 		ticker := time.NewTicker(d)
 		go func() {
+			prev := time.Now().Unix() - 1
 			for {
 				select {
-				case time := <-ticker.C:
-					go s.Tick(time.Unix())
+				case t := <-ticker.C:
+					u := t.Unix()
+					if u > prev {
+						prev = u
+						go s.Tick(u)
+					}
 				case <-ctx.Done():
 					ticker.Stop()
 					return

@@ -677,3 +677,41 @@ func TestScheduler_Stop(t *testing.T) {
 		t.Fatalf("scheduler did not stop after executor Wait returned")
 	}
 }
+
+func TestScheduler_WithTicker(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	tickFreq := 100 * time.Millisecond
+	d := mock.NewDesiredState()
+	e := mock.NewExecutor()
+	o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t)), backend.WithTicker(ctx, tickFreq))
+	o.Start(ctx)
+	defer o.Stop()
+
+	task := &backend.StoreTask{
+		ID: platform.ID(1),
+	}
+	createdAt := time.Now().Unix()
+	meta := &backend.StoreTaskMeta{
+		MaxConcurrency:  5,
+		EffectiveCron:   "@every 1s",
+		LatestCompleted: createdAt,
+	}
+
+	d.SetTaskMeta(task.ID, *meta)
+	if err := o.ClaimTask(task, meta); err != nil {
+		t.Fatal(err)
+	}
+
+	for time.Now().Unix() == createdAt {
+		time.Sleep(tickFreq + 10*time.Millisecond)
+	}
+
+	if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
+		t.Fatalf("expected 1 run queued, but got %d", len(x))
+	}
+}