influxdb/timer.go

171 lines
3.5 KiB
Go
Raw Normal View History

2013-04-28 21:23:21 +00:00
package raft
import (
"math/rand"
"sync"
"time"
)
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
2013-07-06 04:49:47 +00:00
// timer is a restartable timeout used by this package. A cycle begun by
// start() waits for a duration chosen between minDuration and maxDuration and
// can be ended early through fireChan or stopChan.
type timer struct {
	// Bounds of the wait and the random source used to pick a value within
	// them when they differ.
	minDuration time.Duration
	maxDuration time.Duration
	rand        *rand.Rand

	// Signalling channels; ready() replaces both with fresh ones.
	fireChan chan time.Time
	stopChan chan bool

	// state holds one of STOPPED, READY or RUNNING. internalTimer is the
	// countdown of the cycle in progress (nil when none). mutex is taken by
	// stop(), ready() and start() around their updates to these fields.
	state         int
	internalTimer *time.Timer
	mutex         sync.Mutex
}
// Lifecycle states stored in timer.state.
const (
	// STOPPED is set by stop(); start() returns false immediately in this state.
	STOPPED = 0
	// READY is the initial state and the state after ready(); start() may run.
	READY = 1
	// RUNNING is set by start() while it is waiting.
	RUNNING = 2
)
2013-04-28 21:23:21 +00:00
//------------------------------------------------------------------------------
//
// Constructors
//
//------------------------------------------------------------------------------
// Creates a new timer. Panics if a non-positive duration is used.
2013-07-06 04:49:47 +00:00
func newTimer(minDuration time.Duration, maxDuration time.Duration) *timer {
2013-05-05 20:01:06 +00:00
if minDuration <= 0 {
2013-06-08 02:41:36 +00:00
panic("raft: Non-positive minimum duration not allowed")
2013-06-27 02:35:32 +00:00
} else if maxDuration <= 0 {
2013-06-08 02:41:36 +00:00
panic("raft: Non-positive maximum duration not allowed")
2013-06-27 02:35:32 +00:00
} else if minDuration > maxDuration {
2013-06-08 02:41:36 +00:00
panic("raft: Minimum duration cannot be greater than maximum duration")
2013-05-05 20:01:06 +00:00
}
2013-07-06 04:49:47 +00:00
return &timer{
2013-05-27 02:06:08 +00:00
minDuration: minDuration,
maxDuration: maxDuration,
state: READY,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
2013-07-06 04:49:47 +00:00
stopChan: make(chan bool, 1),
fireChan: make(chan time.Time),
2013-04-28 21:23:21 +00:00
}
}
2013-05-01 05:21:56 +00:00
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
2013-05-05 20:26:04 +00:00
// Sets the minimum and maximum duration of the timer.
2013-07-06 04:49:47 +00:00
func (t *timer) setDuration(duration time.Duration) {
2013-05-05 20:26:04 +00:00
t.minDuration = duration
t.maxDuration = duration
}
2013-04-28 21:23:21 +00:00
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
// Checks if the timer is currently running.
2013-07-06 04:49:47 +00:00
func (t *timer) running() bool {
return t.state == RUNNING
2013-04-28 21:23:21 +00:00
}
// Stops the timer and closes the channel.
2013-07-06 04:49:47 +00:00
func (t *timer) stop() {
2013-04-28 21:23:21 +00:00
t.mutex.Lock()
2013-06-27 02:35:32 +00:00
defer t.mutex.Unlock()
if t.internalTimer != nil {
t.internalTimer.Stop()
}
2013-06-27 02:35:32 +00:00
if t.state != STOPPED {
t.state = STOPPED
// non-blocking buffer
2013-07-06 04:49:47 +00:00
t.stopChan <- true
2013-04-28 21:23:21 +00:00
}
2013-05-27 00:02:31 +00:00
}
// Change the state of timer to ready
2013-07-06 04:49:47 +00:00
func (t *timer) ready() {
t.mutex.Lock()
2013-06-27 02:35:32 +00:00
defer t.mutex.Unlock()
if t.state == RUNNING {
2013-06-27 02:35:32 +00:00
panic("Timer is already running")
2013-04-28 21:23:21 +00:00
}
t.state = READY
2013-07-06 04:49:47 +00:00
t.stopChan = make(chan bool, 1)
t.fireChan = make(chan time.Time)
2013-04-28 21:23:21 +00:00
}
// Fire at the timer
2013-07-06 04:49:47 +00:00
func (t *timer) fire() {
select {
2013-07-06 04:49:47 +00:00
case t.fireChan <- time.Now():
return
2013-06-24 16:52:51 +00:00
default:
return
}
}
2013-06-27 02:35:32 +00:00
// Start the timer, this func will be blocked until the timer:
// (1) times out
// (2) stopped
// (3) fired
// Return false if stopped.
// Make sure the start func will not restart the stopped timer.
2013-07-06 04:49:47 +00:00
func (t *timer) start() bool {
2013-04-28 21:23:21 +00:00
t.mutex.Lock()
if t.state != READY {
t.mutex.Unlock()
return false
2013-04-28 21:23:21 +00:00
}
t.state = RUNNING
2013-04-28 21:23:21 +00:00
d := t.minDuration
if t.maxDuration > t.minDuration {
2013-05-10 14:47:24 +00:00
d += time.Duration(t.rand.Int63n(int64(t.maxDuration - t.minDuration)))
}
2013-04-28 21:23:21 +00:00
t.internalTimer = time.NewTimer(d)
2013-06-27 02:35:32 +00:00
internalTimer := t.internalTimer
t.mutex.Unlock()
2013-06-27 02:35:32 +00:00
// Wait for the timer channel, stop channel or fire channel.
stopped := false
select {
case <-internalTimer.C:
2013-07-06 04:49:47 +00:00
case <-t.fireChan:
case <-t.stopChan:
2013-06-27 02:35:32 +00:00
stopped = true
}
2013-06-27 02:35:32 +00:00
// Clean up timer and state.
t.mutex.Lock()
t.internalTimer.Stop()
t.internalTimer = nil
if stopped {
t.state = STOPPED
} else if t.state == RUNNING {
t.state = READY
}
t.mutex.Unlock()
return !stopped
2013-04-28 21:23:21 +00:00
}