Refactor CQ code to avoid race conditions.
Fixes race detected by CircleCI: https://circle-artifacts.com/gh/influxdb/influxdb/2332/artifacts/1/tmp/circle-artifacts.8UTeJCy/test_logs_race.txt
parent 8cced04917
commit beb58eeb18
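For context on the race itself: the old Server.shouldRunContinuousQuery read cq.lastRun without holding cq.mu, while runContinuousQuery wrote cq.lastRun under the lock — a check-then-act race. The fix moves the check onto ContinuousQuery and calls it from runContinuousQuery after taking cq.mu. A minimal standalone sketch of that pattern (hypothetical names like cq.run, not the actual influxdb types):

package main

import (
	"fmt"
	"sync"
	"time"
)

// cq stands in for ContinuousQuery: one mutex guards both the
// eligibility check and the lastRun update.
type cq struct {
	mu      sync.Mutex
	lastRun time.Time
}

// run re-checks eligibility under the lock before writing lastRun,
// so a concurrent caller can never observe a half-finished update.
func (c *cq) run(every time.Duration) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lastRun.Add(every).After(time.Now()) {
		return false // ran too recently
	}
	c.lastRun = time.Now()
	return true
}

func main() {
	c := &cq{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(c.run(time.Minute))
		}()
	}
	wg.Wait() // exactly one goroutine prints true; `go run -race` stays quiet
}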
CHANGELOG.md

@@ -24,6 +24,7 @@ This release has a breaking API change for writes -- the field previously called
 - [#2548](https://github.com/influxdb/influxdb/issues/2548): Return an error when numeric aggregate applied to non-numeric data.
 - [#2487](https://github.com/influxdb/influxdb/issues/2487): Aggregate query with exact timestamp causes panic. Thanks @neonstalwart!
 - [#2552](https://github.com/influxdb/influxdb/issues/2552): Run CQ that is actually passed into go-routine.
+- [#2553](https://github.com/influxdb/influxdb/issues/2553): Fix race condition during CQ execution.
 
 ## v0.9.0-rc29 [2015-05-05]
server.go
@@ -3817,6 +3817,37 @@ func NewContinuousQuery(q string) (*ContinuousQuery, error) {
 	return cquery, nil
 }
 
+// shouldRunContinuousQuery returns true if the CQ should be scheduled to run. It will use the
+// lastRun time of the CQ and the rules for when to run set through the config to determine
+// if this CQ should be run
+func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) bool {
+	// if it's not aggregated we don't run it
+	if cq.cq.Source.IsRawQuery {
+		return false
+	}
+
+	// since it's aggregated we need to figure out how often it should be run
+	interval, err := cq.cq.Source.GroupByInterval()
+	if err != nil {
+		return false
+	}
+
+	// determine how often we should run this continuous query.
+	// group by time / the number of times to compute
+	computeEvery := time.Duration(interval.Nanoseconds()/int64(runsPerInterval)) * time.Nanosecond
+	// make sure we're running no more frequently than the setting in the config
+	if computeEvery < noMoreThan {
+		computeEvery = noMoreThan
+	}
+
+	// if we've passed the amount of time since the last run, do it up
+	if cq.lastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
+		return true
+	}
+
+	return false
+}
+
 // applyCreateContinuousQueryCommand adds the continuous query to the database object and saves it to the metastore
 func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
 	var c createContinuousQueryCommand
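To make the scheduling arithmetic above concrete, here is a standalone sketch with made-up numbers (a 10m GROUP BY interval, 10 runs per interval, and a 2m floor; none of these are the shipped config defaults):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 10 * time.Minute  // the CQ's GROUP BY time() interval
	runsPerInterval := 10         // stands in for s.ComputeRunsPerInterval
	noMoreThan := 2 * time.Minute // stands in for s.ComputeNoMoreThan

	// group by time / the number of times to compute => 1m here
	computeEvery := time.Duration(interval.Nanoseconds()/int64(runsPerInterval)) * time.Nanosecond
	// never run more frequently than the configured floor => clamped to 2m
	if computeEvery < noMoreThan {
		computeEvery = noMoreThan
	}
	fmt.Println(computeEvery) // prints 2m0s
}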
@@ -3902,52 +3933,19 @@ func (s *Server) RunContinuousQueries() error {
 
 	for _, d := range s.databases {
 		for _, c := range d.continuousQueries {
-			if s.shouldRunContinuousQuery(c) {
-				// set the into retention policy based on what is now the default
-				if c.intoRP() == "" {
-					c.setIntoRP(d.defaultRetentionPolicy)
-				}
-				go func(cq *ContinuousQuery) {
-					s.runContinuousQuery(cq)
-				}(c)
+			// set the into retention policy based on what is now the default
+			if c.intoRP() == "" {
+				c.setIntoRP(d.defaultRetentionPolicy)
 			}
+			go func(cq *ContinuousQuery) {
+				s.runContinuousQuery(cq)
+			}(c)
 		}
 	}
 
 	return nil
 }
 
-// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
-// lastRunTime of the CQ and the rules for when to run set through the config to determine
-// if this CQ should be run
-func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool {
-	// if it's not aggregated we don't run it
-	if cq.cq.Source.IsRawQuery {
-		return false
-	}
-
-	// since it's aggregated we need to figure how often it should be run
-	interval, err := cq.cq.Source.GroupByInterval()
-	if err != nil {
-		return false
-	}
-
-	// determine how often we should run this continuous query.
-	// group by time / the number of times to compute
-	computeEvery := time.Duration(interval.Nanoseconds()/int64(s.ComputeRunsPerInterval)) * time.Nanosecond
-	// make sure we're running no more frequently than the setting in the config
-	if computeEvery < s.ComputeNoMoreThan {
-		computeEvery = s.ComputeNoMoreThan
-	}
-
-	// if we've passed the amount of time since the last run, do it up
-	if cq.lastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
-		return true
-	}
-
-	return false
-}
-
 // runContinuousQuery will execute a continuous query
 // TODO: make this fan out to the cluster instead of running all the queries on this single data node
 func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
@@ -3955,6 +3953,10 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
 	cq.mu.Lock()
 	defer cq.mu.Unlock()
 
+	if !cq.shouldRunContinuousQuery(s.ComputeRunsPerInterval, s.ComputeNoMoreThan) {
+		return
+	}
+
 	now := time.Now()
 	cq.lastRun = now
 
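For anyone chasing a similar report: the race above was caught by Go's race detector on CI, and the same check can be run locally with

	go test -race ./...

which is expected to stay quiet over this code path after the change.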