Fix the cq start and end times to use unix timestamps

The Go time library leads Truncate to start a week on Monday, but the query
engine uses unix timestamps, which start the week on a Thursday.

Update the service to use a custom truncate method based on the unix
timestamp rather than `time.Time`'s Truncate.

Fixes #8569.
pull/8600/head
Jonathan A. Sternberg 2017-07-17 13:34:21 -05:00
parent 4244d0e053
commit f7d07910aa
3 changed files with 133 additions and 6 deletions

View File

@ -18,6 +18,7 @@
- [#8466](https://github.com/influxdata/influxdb/issues/8466): illumos build broken on syscall.Mmap
- [#8124](https://github.com/influxdata/influxdb/issues/8124): Prevent privileges on non-existent databases from being set.
- [#8558](https://github.com/influxdata/influxdb/issues/8558): Dropping measurement used several GB disk space
- [#8569](https://github.com/influxdata/influxdb/issues/8569): Fix the cq start and end times to use unix timestamps.
## v1.3.1 [unreleased]

View File

@ -327,7 +327,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
// We're about to run the query so store the current time closest to the nearest interval.
// If all is going well, this time should be the same as nextRun.
cq.LastRun = now.Add(-offset).Truncate(resampleEvery).Add(offset)
cq.LastRun = truncate(now.Add(-offset), resampleEvery).Add(offset)
s.lastRuns[id] = cq.LastRun
// Retrieve the oldest interval we should calculate based on the next time
@ -348,8 +348,8 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
// Calculate and set the time range for the query.
startTime := nextRun.Add(interval - resampleFor - offset - 1).Truncate(interval).Add(offset)
endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset)
startTime := truncate(nextRun.Add(interval-resampleFor-offset-1), interval).Add(offset)
endTime := truncate(now.Add(interval-resampleEvery-offset), interval).Add(offset)
if !endTime.After(startTime) {
// Exit early since there is no time interval.
return false, nil
@ -482,18 +482,19 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin
// lastRunTime of the CQ and the rules for when to run set through the query to determine
// if this CQ should be run.
func (cq *ContinuousQuery) shouldRunContinuousQuery(now time.Time, interval time.Duration) (bool, time.Time, error) {
// if it's not aggregated we don't run it
// If it's not aggregated, do not run the query.
if cq.q.IsRawQuery {
return false, cq.LastRun, errors.New("continuous queries must be aggregate queries")
}
// allow the interval to be overwritten by the query's resample options
// Override the query's default run interval with the resample options.
resampleEvery := interval
if cq.Resample.Every != 0 {
resampleEvery = cq.Resample.Every
}
// if we've passed the amount of time since the last run, or there was no last run, do it up
// Determine if we should run the continuous query based on the last time it ran.
// If the query never ran, execute it using the current time.
if cq.HasRun {
nextRun := cq.LastRun.Add(resampleEvery)
if nextRun.UnixNano() <= now.UnixNano() {
@ -512,3 +513,17 @@ func assert(condition bool, msg string, v ...interface{}) {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}
// truncate truncates the time based on the unix timestamp instead of the
// Go time library. The Go time library has the start of the week on Monday
// while the start of the week for the unix timestamp is a Thursday.
func truncate(ts time.Time, d time.Duration) time.Time {
t := ts.UnixNano()
dt := t % int64(d)
if dt < 0 {
// Negative modulo rounds up instead of down, so offset
// with the duration.
dt += int64(d)
}
return time.Unix(0, t-dt).UTC()
}

View File

@ -358,6 +358,109 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
}
}
// TestExecuteContinuousQuery_TimeRange verifies that the service computes the
// correct query time range for a variety of GROUP BY time() intervals,
// including week-sized intervals that must align to the unix epoch (a
// Thursday) rather than Go's Monday-based week.
func TestExecuteContinuousQuery_TimeRange(t *testing.T) {
	// Choose a start date that is not on an interval border for anyone.
	now := mustParseTime(t, "2000-01-01T00:00:00Z")
	for _, tt := range []struct {
		d          string
		start, end time.Time
	}{
		{
			d:     "10s",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T00:00:10Z"),
		},
		{
			d:     "1m",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T00:01:00Z"),
		},
		{
			d:     "10m",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T00:10:00Z"),
		},
		{
			d:     "30m",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T00:30:00Z"),
		},
		{
			d:     "1h",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T01:00:00Z"),
		},
		{
			d:     "2h",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T02:00:00Z"),
		},
		{
			d:     "12h",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-01T12:00:00Z"),
		},
		{
			d:     "1d",
			start: mustParseTime(t, "2000-01-01T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-02T00:00:00Z"),
		},
		{
			// Weeks align to the unix epoch (a Thursday), so the interval
			// containing 2000-01-01 (a Saturday) starts on the prior Thursday.
			d:     "1w",
			start: mustParseTime(t, "1999-12-30T00:00:00Z"),
			end:   mustParseTime(t, "2000-01-06T00:00:00Z"),
		},
	} {
		t.Run(tt.d, func(t *testing.T) {
			d, err := influxql.ParseDuration(tt.d)
			if err != nil {
				t.Fatalf("unable to parse duration: %s", err)
			}

			s := NewTestService(t)
			mc := NewMetaClient(t)
			mc.CreateDatabase("db", "")
			mc.CreateContinuousQuery("db", "cq",
				fmt.Sprintf(`CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(%s) END`, tt.d))
			s.MetaClient = mc

			// Set RunInterval high so we can trigger using Run method.
			s.RunInterval = 10 * time.Minute

			done := make(chan struct{})

			// Set a callback for ExecuteStatement.
			s.QueryExecutor.StatementExecutor = &StatementExecutor{
				ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
					// Use a distinct name for the statement so it does not
					// shadow the service variable s above.
					sel := stmt.(*influxql.SelectStatement)
					min, max, err := influxql.TimeRange(sel.Condition)
					if err != nil {
						t.Errorf("unexpected error parsing time range: %s", err)
					} else {
						// Only adjust max after confirming TimeRange
						// succeeded; on error the returned times are zero
						// values. TimeRange's max is inclusive, so bump it
						// by a nanosecond to compare against the exclusive
						// end of the expected interval.
						max = max.Add(time.Nanosecond)
						if !tt.start.Equal(min) || !tt.end.Equal(max) {
							t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", min, max, tt.start, tt.end)
						}
					}
					done <- struct{}{}
					ctx.Results <- &influxql.Result{}
					return nil
				},
			}

			s.Open()
			defer s.Close()

			// Send an initial run request one nanosecond after the start to
			// prime the last CQ map.
			s.RunCh <- &RunRequest{Now: now.Add(time.Nanosecond)}
			// Execute the real request after the time interval.
			s.RunCh <- &RunRequest{Now: now.Add(d)}
			if err := wait(done, 100*time.Millisecond); err != nil {
				t.Fatal(err)
			}
		})
	}
}
// Test ExecuteContinuousQuery when QueryExecutor returns an error.
func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
s := NewTestService(t)
@ -617,3 +720,11 @@ type monitor struct {
func (m *monitor) Enabled() bool { return m.EnabledFn() }
func (m *monitor) WritePoints(p models.Points) error { return m.WritePointsFn(p) }
func mustParseTime(t *testing.T, value string) time.Time {
ts, err := time.Parse(time.RFC3339, value)
if err != nil {
t.Fatalf("unable to parse time: %s", err)
}
return ts
}