From ebb597d16cd83d50e04ca9e29b6fbdee816db73c Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Thu, 27 Jun 2024 16:14:45 -0700 Subject: [PATCH] fix: preserve time zone information in Task Scheduler (#25112) Avoid converting times to int64 in the Task Scheduler to preserve time zone information. This corrects a failure after fall back time changes which halts every-type tasks closes https://github.com/influxdata/influxdb/issues/25110 --- task/backend/scheduler/scheduler_test.go | 98 ++++++++++++++++++++++++ task/backend/scheduler/treescheduler.go | 36 ++++----- 2 files changed, 116 insertions(+), 18 deletions(-) diff --git a/task/backend/scheduler/scheduler_test.go b/task/backend/scheduler/scheduler_test.go index 88161c53ad..07bcf96c75 100644 --- a/task/backend/scheduler/scheduler_test.go +++ b/task/backend/scheduler/scheduler_test.go @@ -388,6 +388,96 @@ func TestSchedule_Next(t *testing.T) { }) } +func TestTreeScheduler_TimeChange(t *testing.T) { + loc := mustParseLocation("America/Los_Angeles") + tests := []struct { + name string // also used as the cron time string + start time.Time + timeElapsed time.Duration + }{ + { + // Daylight Savings time boundary autumn (fall back) + start: time.Date(2023, 11, 05, 01, 58, 00, 0, loc), + name: "@every 1m", + timeElapsed: time.Minute, + }, + { + // Daylight Savings time boundary autumn (fall back) + start: time.Date(2023, 11, 05, 00, 00, 00, 0, loc), + name: "@every 1h", + timeElapsed: time.Hour, + }, + { + // Daylight Savings time boundary (spring forward) + start: time.Date(2023, 3, 12, 00, 00, 00, 0, loc), + name: "@every 1h", + timeElapsed: time.Hour, + }, + { + // Daylight Savings time boundary (spring forward) + start: time.Date(2023, 3, 12, 01, 58, 00, 0, loc), + name: "@every 1m", + timeElapsed: time.Minute, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: + } + }} + mockTime := clock.NewMock() + mockTime.Set(tt.start) + sch, _, err := NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { + return nil + }}, + WithTime(mockTime), + WithMaxConcurrentWorkers(20)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, ts, err := NewSchedule(tt.name, mockTime.Now().UTC()) + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts}) + if err != nil { + t.Fatal(err) + } + go func() { + sch.mu.Lock() + mockTime.Set(mockTime.Now().UTC().Add(17 * tt.timeElapsed)) + sch.mu.Unlock() + }() + + after := time.After(6 * time.Second) + oldCheckC := ts + for i := 0; i < 16; i++ { + select { + case checkC := <-c: + if checkC.Sub(oldCheckC) != tt.timeElapsed { + t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC)) + } + if !checkC.Truncate(tt.timeElapsed).Equal(checkC) { + t.Fatalf("task didn't fire at the correct time boundary") + } + oldCheckC = checkC + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) + } + } + }) + } +} + func TestTreeScheduler_Stop(t *testing.T) { now := time.Now().Add(-20 * time.Second) mockTime := clock.NewMock() @@ -641,3 +731,11 @@ func TestNewSchedule(t *testing.T) { }) } } + +func mustParseLocation(tzname string) *time.Location { + loc, err := time.LoadLocation(tzname) + if err != nil { + panic(err) + } + return loc +} diff --git a/task/backend/scheduler/treescheduler.go b/task/backend/scheduler/treescheduler.go index 31f02fde05..69e98551f9 100644 --- a/task/backend/scheduler/treescheduler.go +++ b/task/backend/scheduler/treescheduler.go @@ -63,7 +63,7 @@ const ( type TreeScheduler struct { mu sync.RWMutex priorityQueue *btree.BTree - nextTime map[ID]int64 // we need this index so we can delete items from the scheduled + nextTime map[ID]time.Time // we need this index so we can delete items from the scheduled when time.Time executor Executor onErr ErrorFunc @@ -114,7 +114,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr s := &TreeScheduler{ executor: executor, priorityQueue: btree.New(degreeBtreeScheduled), - nextTime: map[ID]int64{}, + nextTime: map[ID]time.Time{}, onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {}, time: clock.New(), done: make(chan struct{}, 1), @@ -173,7 +173,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr continue schedulerLoop } it := min.(Item) - if ts := s.time.Now().UTC(); it.When().After(ts) { + if ts := s.time.Now(); it.When().After(ts) { s.timer.Reset(ts.Sub(it.When())) s.mu.Unlock() continue schedulerLoop @@ -245,7 +245,7 @@ func (s *TreeScheduler) iterator(ts time.Time) btree.ItemIterator { return false } it := i.(Item) // we want it to panic if things other than Items are populating the scheduler, as it is something we can't recover from. - if time.Unix(it.next+it.Offset, 0).After(ts) { + if it.next.Add(it.Offset).After(ts) { return false } // distribute to the right worker. @@ -310,7 +310,7 @@ func (s *TreeScheduler) work(ctx context.Context, ch chan Item) { s.wg.Done() }() for it = range ch { - t := time.Unix(it.next, 0) + t := it.next err := func() (err error) { defer func() { if r := recover(); r != nil { @@ -342,7 +342,7 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error { it := Item{ cron: sch.Schedule(), id: sch.ID(), - Offset: int64(sch.Offset().Seconds()), + Offset: sch.Offset(), //last: sch.LastScheduled().Unix(), } nt, err := it.cron.Next(sch.LastScheduled()) @@ -351,8 +351,8 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error { s.onErr(context.Background(), it.id, time.Time{}, err) return err } - it.next = nt.UTC().Unix() - it.when = it.next + it.Offset + it.next = nt + it.when = it.next.Add(it.Offset) s.mu.Lock() defer s.mu.Unlock() @@ -377,7 +377,7 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error { id: it.id, }) } - s.nextTime[it.id] = it.next + it.Offset + s.nextTime[it.id] = it.next.Add(it.Offset) // insert the new task run time s.priorityQueue.ReplaceOrInsert(it) @@ -386,33 +386,33 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error { // Item is a task in the scheduler. type Item struct { - when int64 + when time.Time id ID cron Schedule - next int64 - Offset int64 + next time.Time + Offset time.Duration } func (it Item) Next() time.Time { - return time.Unix(it.next, 0) + return it.next } func (it Item) When() time.Time { - return time.Unix(it.when, 0) + return it.when } // Less tells us if one Item is less than another func (it Item) Less(bItem btree.Item) bool { it2 := bItem.(Item) - return it.when < it2.when || ((it.when == it2.when) && it.id < it2.id) + return it2.when.After(it.when) || (it.when.Equal(it2.when) && it.id < it2.id) } func (it *Item) updateNext() error { - newNext, err := it.cron.Next(time.Unix(it.next, 0)) + newNext, err := it.cron.Next(it.next) if err != nil { return err } - it.next = newNext.UTC().Unix() - it.when = it.next + it.Offset + it.next = newNext + it.when = it.next.Add(it.Offset) return nil }