package raft import ( "sort" "sync" "time" ) // Clock represents an interface to the functions in the standard library time // package. Two implementations are available in the clock package. The first // is a real-time clock which simply wraps the time package's functions. The // second is a mock clock which will only make forward progress when // programmatically adjusted. type Clock interface { Add(d time.Duration) After(d time.Duration) <-chan time.Time AfterFunc(d time.Duration, f func()) *Timer Now() time.Time Sleep(d time.Duration) Tick(d time.Duration) <-chan time.Time Ticker(d time.Duration) *Ticker Timer(d time.Duration) *Timer } // clock implements a real-time clock by simply wrapping the time package functions. type clock struct{} func (c *clock) Add(d time.Duration) { panic("real-time clock cannot manually adjust") } func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } func (c *clock) AfterFunc(d time.Duration, f func()) *Timer { return &Timer{timer: time.AfterFunc(d, f)} } func (c *clock) Now() time.Time { return time.Now() } func (c *clock) Sleep(d time.Duration) { time.Sleep(d) } func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) } func (c *clock) Ticker(d time.Duration) *Ticker { t := time.NewTicker(d) return &Ticker{C: t.C, ticker: t} } func (c *clock) Timer(d time.Duration) *Timer { t := time.NewTimer(d) return &Timer{C: t.C, timer: t} } // mockClock represents a mock clock that only moves forward programmically. // It can be preferable to a real-time clock when testing time-based functionality. type mockClock struct { mu sync.Mutex now time.Time // current time timers clockTimers // tickers & timers } // NewMockClock returns an instance of a mock clock. // The current time of the mock clock on initialization is the Unix epoch. func NewMockClock() Clock { return &mockClock{now: time.Unix(0, 0)} } // Add moves the current time of the mock clock forward by the duration. 
// This should only be called from a single goroutine at a time. func (m *mockClock) Add(d time.Duration) { // Calculate the final current time. t := m.now.Add(d) // Continue to execute timers until there are no more before the new time. for { if !m.runNextTimer(t) { break } } // Ensure that we end with the new time. m.mu.Lock() m.now = t m.mu.Unlock() // Give a small buffer to make sure the other goroutines get handled. gosched() } // runNextTimer executes the next timer in chronological order and moves the // current time to the timer's next tick time. The next time is not executed if // it's next time if after the max time. Returns true if a timer is executed. func (m *mockClock) runNextTimer(max time.Time) bool { m.mu.Lock() // Sort timers by time. sort.Sort(m.timers) // If we have no more timers then exit. if len(m.timers) == 0 { m.mu.Unlock() return false } // Retrieve next timer. Exit if next tick is after new time. t := m.timers[0] if t.Next().After(max) { m.mu.Unlock() return false } // Move "now" forward and unlock clock. m.now = t.Next() m.mu.Unlock() // Execute timer. t.Tick(m.now) return true } // After waits for the duration to elapse and then sends the current time on the returned channel. func (m *mockClock) After(d time.Duration) <-chan time.Time { return m.Timer(d).C } // AfterFunc waits for the duration to elapse and then executes a function. // A Timer is returned that can be stopped. func (m *mockClock) AfterFunc(d time.Duration, f func()) *Timer { t := m.Timer(d) t.C = nil t.fn = f return t } // Now returns the current wall time on the mock clock. func (m *mockClock) Now() time.Time { m.mu.Lock() defer m.mu.Unlock() return m.now } // Sleep pauses the goroutine for the given duration on the mock clock. // The clock must be moved forward in a separate goroutine. func (m *mockClock) Sleep(d time.Duration) { <-m.After(d) } // Tick is a convenience function for Ticker(). // It will return a ticker channel that cannot be stopped. 
func (m *mockClock) Tick(d time.Duration) <-chan time.Time {
	ticker := m.Ticker(d)
	return ticker.C
}

// Ticker creates a mock Ticker with period d and registers it with the clock.
// Its first tick is due d after the clock's current time.
func (m *mockClock) Ticker(d time.Duration) *Ticker {
	m.mu.Lock()
	defer m.mu.Unlock()

	// The same channel (buffer of one) is stored twice: C is the
	// receive-only view, c is the send side.
	ticks := make(chan time.Time, 1)
	tk := &Ticker{mock: m, d: d, next: m.now.Add(d)}
	tk.c = ticks
	tk.C = ticks

	m.timers = append(m.timers, (*internalTicker)(tk))
	return tk
}

// Timer creates a mock Timer that is due d after the clock's current time and
// registers it with the clock.
func (m *mockClock) Timer(d time.Duration) *Timer {
	m.mu.Lock()
	defer m.mu.Unlock()

	// The same channel (buffer of one) is stored twice: C is the
	// receive-only view, c is the send side.
	fired := make(chan time.Time, 1)
	tm := &Timer{mock: m, next: m.now.Add(d)}
	tm.c = fired
	tm.C = fired

	m.timers = append(m.timers, (*internalTimer)(tm))
	return tm
}

// removeClockTimer drops the first entry equal to t from the clock's timer
// list, if there is one, and then re-sorts the list.
func (m *mockClock) removeClockTimer(t clockTimer) {
	m.mu.Lock()
	defer m.mu.Unlock()

	for idx := range m.timers {
		if m.timers[idx] != t {
			continue
		}
		// Shift the tail down by one and clear the vacated last slot so the
		// backing array does not keep a reference to the removed entry.
		last := len(m.timers) - 1
		copy(m.timers[idx:], m.timers[idx+1:])
		m.timers[last] = nil
		m.timers = m.timers[:last]
		break
	}

	sort.Sort(m.timers)
}

// clockTimer is anything the mock clock can schedule: it reports when it is
// next due and can be fired with a given time.
type clockTimer interface {
	Next() time.Time
	Tick(time.Time)
}

// clockTimers is a list of timers that sorts by next due time, earliest first.
type clockTimers []clockTimer

// Len returns the number of timers in the list.
func (a clockTimers) Len() int {
	return len(a)
}

// Swap exchanges the timers at positions i and j.
func (a clockTimers) Swap(i, j int) {
	ti, tj := a[i], a[j]
	a[i], a[j] = tj, ti
}

// Less reports whether the timer at i is due before the timer at j.
func (a clockTimers) Less(i, j int) bool {
	dueI, dueJ := a[i].Next(), a[j].Next()
	return dueI.Before(dueJ)
}

// Timer represents a single event.
// The current time will be sent on C, unless the timer was created by AfterFunc.
type Timer struct {
	C     <-chan time.Time
	c     chan time.Time
	timer *time.Timer // realtime impl, if set
	next  time.Time   // next tick time
	mock  *mockClock  // mock clock, if set
	fn    func()      // AfterFunc function, if set
}

// Stop turns off the timer.
func (t *Timer) Stop() { if t.timer != nil { t.timer.Stop() } else { t.mock.removeClockTimer((*internalTimer)(t)) } } type internalTimer Timer func (t *internalTimer) Next() time.Time { return t.next } func (t *internalTimer) Tick(now time.Time) { if t.fn != nil { t.fn() } else { t.c <- now } t.mock.removeClockTimer((*internalTimer)(t)) gosched() } // Ticker holds a channel that receives "ticks" at regular intervals. type Ticker struct { C <-chan time.Time c chan time.Time ticker *time.Ticker // realtime impl, if set next time.Time // next tick time mock *mockClock // mock clock, if set d time.Duration // time between ticks } // Stop turns off the ticker. func (t *Ticker) Stop() { if t.ticker != nil { t.ticker.Stop() } else { t.mock.removeClockTimer((*internalTicker)(t)) } } type internalTicker Ticker func (t *internalTicker) Next() time.Time { return t.next } func (t *internalTicker) Tick(now time.Time) { select { case t.c <- now: case <-time.After(goschedTimeout): } t.next = now.Add(t.d) gosched() } // goschedTimeout is the amount of wall time to sleep during goroutine scheduling. var goschedTimeout = 1 * time.Millisecond // Sleep momentarily so that other goroutines can process. func gosched() { time.Sleep(goschedTimeout) }