From f7d07910aa340b9f5ef085e3fc752e5c2f2b4ae7 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 17 Jul 2017 13:34:21 -0500 Subject: [PATCH] Fix the cq start and end times to use unix timestamps The Go timestamp leads Truncate to start a week on Monday, but the query engine uses unix timestamps which has the week start on a Thursday. Updating the service so it uses a custom truncate method that uses the unix timestamp instead of `time.Time`. Fixes #8569. --- CHANGELOG.md | 1 + services/continuous_querier/service.go | 27 +++-- services/continuous_querier/service_test.go | 111 ++++++++++++++++++++ 3 files changed, 133 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db2b44805d..f13ac3ae13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - [#8466](https://github.com/influxdata/influxdb/issues/8466): illumos build broken on syscall.Mmap - [#8124](https://github.com/influxdata/influxdb/issues/8124): Prevent privileges on non-existent databases from being set. - [#8558](https://github.com/influxdata/influxdb/issues/8558): Dropping measurement used several GB disk space +- [#8569](https://github.com/influxdata/influxdb/issues/8569): Fix the cq start and end times to use unix timestamps. ## v1.3.1 [unreleased] diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 4ef6054c4f..b9e1c743fb 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -327,7 +327,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti // We're about to run the query so store the current time closest to the nearest interval. // If all is going well, this time should be the same as nextRun. - cq.LastRun = now.Add(-offset).Truncate(resampleEvery).Add(offset) + cq.LastRun = truncate(now.Add(-offset), resampleEvery).Add(offset) s.lastRuns[id] = cq.LastRun // Retrieve the oldest interval we should calculate based on the next time @@ -348,8 +348,8 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } // Calculate and set the time range for the query. - startTime := nextRun.Add(interval - resampleFor - offset - 1).Truncate(interval).Add(offset) - endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset) + startTime := truncate(nextRun.Add(interval-resampleFor-offset-1), interval).Add(offset) + endTime := truncate(now.Add(interval-resampleEvery-offset), interval).Add(offset) if !endTime.After(startTime) { // Exit early since there is no time interval. return false, nil @@ -482,18 +482,19 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin // lastRunTime of the CQ and the rules for when to run set through the query to determine // if this CQ should be run. func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time, interval time.Duration) (bool, time.Time, error) { - // if it's not aggregated we don't run it + // If it's not aggregated, do not run the query. if cq.q.IsRawQuery { return false, cq.LastRun, errors.New("continuous queries must be aggregate queries") } - // allow the interval to be overwritten by the query's resample options + // Override the query's default run interval with the resample options. resampleEvery := interval if cq.Resample.Every != 0 { resampleEvery = cq.Resample.Every } - // if we've passed the amount of time since the last run, or there was no last run, do it up + // Determine if we should run the continuous query based on the last time it ran. + // If the query never ran, execute it using the current time. if cq.HasRun { nextRun := cq.LastRun.Add(resampleEvery) if nextRun.UnixNano() <= now.UnixNano() { @@ -512,3 +513,17 @@ func assert(condition bool, msg string, v ...interface{}) { panic(fmt.Sprintf("assert failed: "+msg, v...)) } } + +// truncate truncates the time based on the unix timestamp instead of the +// Go time library. The Go time library has the start of the week on Monday +// while the start of the week for the unix timestamp is a Thursday. +func truncate(ts time.Time, d time.Duration) time.Time { + t := ts.UnixNano() + dt := t % int64(d) + if dt < 0 { + // Negative modulo rounds up instead of down, so offset + // with the duration. + dt += int64(d) + } + return time.Unix(0, t-dt).UTC() +} diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index b5452e90e4..b0b532f3b3 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -358,6 +358,109 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { } } +// Test the time range for different CQ durations. +func TestExecuteContinuousQuery_TimeRange(t *testing.T) { + // Choose a start date that is not on an interval border for anyone. + now := mustParseTime(t, "2000-01-01T00:00:00Z") + for _, tt := range []struct { + d string + start, end time.Time + }{ + { + d: "10s", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T00:00:10Z"), + }, + { + d: "1m", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T00:01:00Z"), + }, + { + d: "10m", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T00:10:00Z"), + }, + { + d: "30m", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T00:30:00Z"), + }, + { + d: "1h", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T01:00:00Z"), + }, + { + d: "2h", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T02:00:00Z"), + }, + { + d: "12h", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-01T12:00:00Z"), + }, + { + d: "1d", + start: mustParseTime(t, "2000-01-01T00:00:00Z"), + end: mustParseTime(t, "2000-01-02T00:00:00Z"), + }, + { + d: "1w", + start: mustParseTime(t, "1999-12-30T00:00:00Z"), + end: mustParseTime(t, "2000-01-06T00:00:00Z"), + }, + } { + t.Run(tt.d, func(t *testing.T) { + d, err := influxql.ParseDuration(tt.d) + if err != nil { + t.Fatalf("unable to parse duration: %s", err) + } + + s := NewTestService(t) + mc := NewMetaClient(t) + mc.CreateDatabase("db", "") + mc.CreateContinuousQuery("db", "cq", + fmt.Sprintf(`CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(%s) END`, tt.d)) + s.MetaClient = mc + + // Set RunInterval high so we can trigger using Run method. + s.RunInterval = 10 * time.Minute + done := make(chan struct{}) + + // Set a callback for ExecuteStatement. + s.QueryExecutor.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + s := stmt.(*influxql.SelectStatement) + min, max, err := influxql.TimeRange(s.Condition) + max = max.Add(time.Nanosecond) + if err != nil { + t.Errorf("unexpected error parsing time range: %s", err) + } else if !tt.start.Equal(min) || !tt.end.Equal(max) { + t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", min, max, tt.start, tt.end) + } + done <- struct{}{} + ctx.Results <- &influxql.Result{} + return nil + }, + } + + s.Open() + defer s.Close() + + // Send an initial run request one nanosecond after the start to + // prime the last CQ map. + s.RunCh <- &RunRequest{Now: now.Add(time.Nanosecond)} + // Execute the real request after the time interval. + s.RunCh <- &RunRequest{Now: now.Add(d)} + if err := wait(done, 100*time.Millisecond); err != nil { + t.Fatal(err) + } + }) + } +} + // Test ExecuteContinuousQuery when QueryExecutor returns an error. func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { s := NewTestService(t) @@ -617,3 +720,11 @@ type monitor struct { func (m *monitor) Enabled() bool { return m.EnabledFn() } func (m *monitor) WritePoints(p models.Points) error { return m.WritePointsFn(p) } + +func mustParseTime(t *testing.T, value string) time.Time { + ts, err := time.Parse(time.RFC3339, value) + if err != nil { + t.Fatalf("unable to parse time: %s", err) + } + return ts +}