Do not increment the continuous query statistic if no query is run
Instead of incrementing the `queryOk` statistic with or without the continuous query running, it will only increment when the query is actually executed.pull/8102/head
parent
ce8d8e46cc
commit
9cdfdd04e9
|
@ -12,6 +12,10 @@
|
|||
- [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS
|
||||
- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run.
|
||||
|
||||
## v1.2.2 [2017-03-14]
|
||||
|
||||
### Release Notes
|
||||
|
|
|
@ -247,25 +247,25 @@ func (s *Service) runContinuousQueries(req *RunRequest) {
|
|||
if !req.matches(&cq) {
|
||||
continue
|
||||
}
|
||||
if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
|
||||
if ok, err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
|
||||
s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err))
|
||||
atomic.AddInt64(&s.stats.QueryFail, 1)
|
||||
} else {
|
||||
} else if ok {
|
||||
atomic.AddInt64(&s.stats.QueryOK, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteContinuousQuery executes a single CQ.
|
||||
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error {
|
||||
// ExecuteContinuousQuery may execute a single CQ. This will return false if there were no errors and the CQ was not run.
|
||||
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) (bool, error) {
|
||||
// TODO: re-enable stats
|
||||
//s.stats.Inc("continuousQueryExecuted")
|
||||
|
||||
// Local wrapper / helper.
|
||||
cq, err := NewContinuousQuery(dbi.Name, cqi)
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Get the last time this CQ was run from the service's cache.
|
||||
|
@ -279,26 +279,26 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
|
|||
cq.setIntoRP(dbi.DefaultRetentionPolicy)
|
||||
}
|
||||
|
||||
// See if this query needs to be run.
|
||||
run, nextRun, err := cq.shouldRunContinuousQuery(now)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if !run {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the group by interval.
|
||||
interval, err := cq.q.GroupByInterval()
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
} else if interval == 0 {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Get the group by offset.
|
||||
offset, err := cq.q.GroupByOffset()
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
// See if this query needs to be run.
|
||||
run, nextRun, err := cq.shouldRunContinuousQuery(now, interval)
|
||||
if err != nil {
|
||||
return false, err
|
||||
} else if !run {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
resampleEvery := interval
|
||||
|
@ -333,12 +333,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
|
|||
endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset)
|
||||
if !endTime.After(startTime) {
|
||||
// Exit early since there is no time interval.
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err := cq.q.SetTimeRange(startTime, endTime); err != nil {
|
||||
s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err))
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
var start time.Time
|
||||
|
@ -350,13 +350,13 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
|
|||
// Do the actual processing of the query & writing of results.
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", err, cq.q.String()))
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
if s.loggingEnabled {
|
||||
s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Since(start)))
|
||||
}
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
||||
|
@ -441,18 +441,12 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin
|
|||
// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
|
||||
// lastRunTime of the CQ and the rules for when to run set through the query to determine
|
||||
// if this CQ should be run.
|
||||
func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time) (bool, time.Time, error) {
|
||||
func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time, interval time.Duration) (bool, time.Time, error) {
|
||||
// if it's not aggregated we don't run it
|
||||
if cq.q.IsRawQuery {
|
||||
return false, cq.LastRun, errors.New("continuous queries must be aggregate queries")
|
||||
}
|
||||
|
||||
// since it's aggregated we need to figure how often it should be run
|
||||
interval, err := cq.q.GroupByInterval()
|
||||
if err != nil {
|
||||
return false, cq.LastRun, err
|
||||
}
|
||||
|
||||
// allow the interval to be overwritten by the query's resample options
|
||||
resampleEvery := interval
|
||||
if cq.Resample.Every != 0 {
|
||||
|
|
|
@ -340,22 +340,19 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
|
|||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
cqi.Query = `this is not a query`
|
||||
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
|
||||
if err == nil {
|
||||
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
|
||||
t.Error("expected error but got nil")
|
||||
}
|
||||
|
||||
// Valid query but invalid continuous query.
|
||||
cqi.Query = `SELECT * FROM cpu`
|
||||
err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
|
||||
if err == nil {
|
||||
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
|
||||
t.Error("expected error but got nil")
|
||||
}
|
||||
|
||||
// Group by requires aggregate.
|
||||
cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
|
||||
err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
|
||||
if err == nil {
|
||||
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()); err == nil {
|
||||
t.Error("expected error but got nil")
|
||||
}
|
||||
}
|
||||
|
@ -374,8 +371,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
|
|||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
now := time.Now().Truncate(10 * time.Minute)
|
||||
err := s.ExecuteContinuousQuery(&dbi, &cqi, now)
|
||||
if err != errExpected {
|
||||
if _, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); err != errExpected {
|
||||
t.Errorf("exp = %s, got = %v", errExpected, err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue