diff --git a/CHANGELOG.md b/CHANGELOG.md index e6eeb6d98c..c96f94f005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This release has a breaking API change for writes -- the field previously called - [#2548](https://github.com/influxdb/influxdb/issues/2548): Return an error when numeric aggregate applied to non-numeric data. - [#2487](https://github.com/influxdb/influxdb/issues/2487): Aggregate query with exact timestamp causes panic. Thanks @neonstalwart! - [#2552](https://github.com/influxdb/influxdb/issues/2552): Run CQ that is actually passed into go-routine. +- [#2553](https://github.com/influxdb/influxdb/issues/2553): Fix race condition during CQ execution. ## v0.9.0-rc29 [2015-05-05] diff --git a/server.go b/server.go index b06002bb9d..07943dcf5c 100644 --- a/server.go +++ b/server.go @@ -3817,6 +3817,37 @@ func NewContinuousQuery(q string) (*ContinuousQuery, error) { return cquery, nil } +// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the +// lastRunTime of the CQ and the rules for when to run set through the config to determine +// if this CQ should be run +func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) bool { + // if it's not aggregated we don't run it + if cq.cq.Source.IsRawQuery { + return false + } + + // since it's aggregated we need to figure how often it should be run + interval, err := cq.cq.Source.GroupByInterval() + if err != nil { + return false + } + + // determine how often we should run this continuous query. + // group by time / the number of times to compute + computeEvery := time.Duration(interval.Nanoseconds()/int64(runsPerInterval)) * time.Nanosecond + // make sure we're running no more frequently than the setting in the config + if computeEvery < noMoreThan { + computeEvery = noMoreThan + } + + // if we've passed the amount of time since the last run, do it up + if cq.lastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() { + return true + } + + return false +} + // applyCreateContinuousQueryCommand adds the continuous query to the database object and saves it to the metastore func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error { var c createContinuousQueryCommand @@ -3902,52 +3933,19 @@ func (s *Server) RunContinuousQueries() error { for _, d := range s.databases { for _, c := range d.continuousQueries { - if s.shouldRunContinuousQuery(c) { - // set the into retention policy based on what is now the default - if c.intoRP() == "" { - c.setIntoRP(d.defaultRetentionPolicy) - } - go func(cq *ContinuousQuery) { - s.runContinuousQuery(cq) - }(c) + // set the into retention policy based on what is now the default + if c.intoRP() == "" { + c.setIntoRP(d.defaultRetentionPolicy) } + go func(cq *ContinuousQuery) { + s.runContinuousQuery(cq) + }(c) } } return nil } -// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the -// lastRunTime of the CQ and the rules for when to run set through the config to determine -// if this CQ should be run -func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool { - // if it's not aggregated we don't run it - if cq.cq.Source.IsRawQuery { - return false - } - - // since it's aggregated we need to figure how often it should be run - interval, err := cq.cq.Source.GroupByInterval() - if err != nil { - return false - } - - // determine how often we should run this continuous query. - // group by time / the number of times to compute - computeEvery := time.Duration(interval.Nanoseconds()/int64(s.ComputeRunsPerInterval)) * time.Nanosecond - // make sure we're running no more frequently than the setting in the config - if computeEvery < s.ComputeNoMoreThan { - computeEvery = s.ComputeNoMoreThan - } - - // if we've passed the amount of time since the last run, do it up - if cq.lastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() { - return true - } - - return false -} - // runContinuousQuery will execute a continuous query // TODO: make this fan out to the cluster instead of running all the queries on this single data node func (s *Server) runContinuousQuery(cq *ContinuousQuery) { @@ -3955,6 +3953,10 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) { cq.mu.Lock() defer cq.mu.Unlock() + if !cq.shouldRunContinuousQuery(s.ComputeRunsPerInterval, s.ComputeNoMoreThan) { + return + } + now := time.Now() cq.lastRun = now