Merge pull request #2067 from influxdb/relative-times-off-2045

Fixing intervals for group by.
pull/2072/head
Paul Dix 2015-03-24 22:17:12 -04:00
commit 7076b9ea9a
4 changed files with 32 additions and 7 deletions

@@ -14,6 +14,7 @@
- [#2057](https://github.com/influxdb/influxdb/pull/2057): Move racy "in order" test to integration test suite.
- [#2060](https://github.com/influxdb/influxdb/pull/2060): Reload server shard map on restart.
- [#2068](https://github.com/influxdb/influxdb/pull/2068): Fix misspelled JSON field.
+ - [#2067](https://github.com/influxdb/influxdb/pull/2067): Fixing intervals for GROUP BY.
## v0.9.0-rc15 [2015-03-19]

@@ -341,6 +341,9 @@ func runTests_Errors(t *testing.T, nodes Cluster) {
func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) {
t.Logf("Running tests against %d-node cluster", len(nodes))
+ yesterday := time.Now().Add(-1 * time.Hour * 24).UTC()
+ now := time.Now().UTC()
// Start by ensuring database and retention policy exist.
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
@@ -405,6 +408,22 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{}]}`,
},
+ // Data read and write tests using relative time
+ {
+ reset: true,
+ name: "single point with timestamp pre-calculated for past time queries yesterday",
+ write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + yesterday.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`,
+ query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= '` + yesterday.Add(-1*time.Minute).Format(time.RFC3339Nano) + `'`,
+ expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, yesterday.Format(time.RFC3339Nano)),
+ },
+ {
+ reset: true,
+ name: "single point with timestamp pre-calculated for relative time queries now",
+ write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + now.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`,
+ query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= now() - 1m`,
+ expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, now.Format(time.RFC3339Nano)),
+ },
// Merge tests.
{
reset: true,
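
The two new cases exercise relative-time reads end to end: the first writes a point stamped 24 hours in the past and queries it with an absolute RFC3339 lower bound, the second writes a point stamped at the current time and queries it with time >= now() - 1m. A minimal, self-contained sketch of the time arithmetic the second case depends on (plain Go, outside the test harness; values are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	// now() is evaluated once per statement in UTC, mirroring the planner's p.Now().UTC().
	now := time.Now().UTC()

	written := now                      // timestamp stored for the test point
	cutoff := now.Add(-1 * time.Minute) // lower bound implied by "time >= now() - 1m"

	// The written point must fall inside the queried window.
	fmt.Println(written.Format(time.RFC3339Nano), !written.Before(cutoff)) // prints the timestamp and "true"
}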

@@ -496,11 +496,19 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa
}
}
+ firstInterval := m.interval
+ if !m.stmt.IsRawQuery {
+ firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin
+ }
// populate the result values for each interval of time
for i, _ := range resultValues {
// collect the results from each mapper
for j, mm := range m.Mappers {
- res, err := mm.NextInterval(m.interval)
+ interval := m.interval
+ if i == 0 {
+ interval = firstInterval
+ }
+ res, err := mm.NextInterval(interval)
if err != nil {
return err
}
@@ -569,7 +577,7 @@ func NewPlanner(db DB) *Planner {
// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
- now := p.Now()
+ now := p.Now().UTC()
// Replace instances of "now()" with the current time.
stmt.Condition = Reduce(stmt.Condition, &nowValuer{Now: now})
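
The firstInterval computation is what re-aligns GROUP BY buckets: integer division snaps TMin down to the previous natural interval boundary, and the distance from TMin up to the next boundary becomes a shortened first interval, so every later bucket starts exactly on a boundary. A worked sketch of the arithmetic with made-up values (minutes instead of the nanosecond timestamps the real code uses):

package main

import "fmt"

func main() {
	// Assumed example: a 10-minute GROUP BY interval and a query that
	// starts 3 minutes past the 120-minute boundary.
	interval := int64(10)
	tmin := int64(123)

	// Same expression as processAggregate's firstInterval.
	firstInterval := (tmin/interval*interval + interval) - tmin

	fmt.Println(firstInterval)                      // 7: the first bucket is shortened
	fmt.Println((tmin+firstInterval)%interval == 0) // true: the next bucket starts on a boundary
}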

tx.go

@@ -321,13 +321,10 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) {
return nil, nil
}
- intervalBottom := l.tmin
// Set the upper bound of the interval.
if interval > 0 {
- // Make sure the bottom of the interval lands on a natural boundary.
- intervalBottom = intervalBottom / interval * interval
- l.tmax = intervalBottom + interval - 1
+ l.tmax = l.tmin + interval - 1
}
// Execute the map function. This local mapper acts as the iterator
@@ -343,7 +340,7 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) {
}
// Move the interval forward.
- l.tmin = intervalBottom + interval
+ l.tmin += interval
return val, nil
}
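
With the boundary alignment now handled up front by the planner, the mapper no longer recomputes a natural bottom for each interval: it treats whatever interval it is handed as the window width (l.tmax = l.tmin + interval - 1) and then advances l.tmin by that same amount. A small sketch of the windows this produces when the first interval is the shortened one from the planner (assumed values, outside the real LocalMapper):

package main

import "fmt"

func main() {
	// Assumed values, in minutes for readability: a 7-minute first interval
	// (to reach the next boundary) followed by full 10-minute intervals.
	tmin := int64(123)
	intervals := []int64{7, 10, 10}

	for _, interval := range intervals {
		tmax := tmin + interval - 1 // mirrors l.tmax = l.tmin + interval - 1
		fmt.Printf("[%d, %d]\n", tmin, tmax)
		tmin += interval // mirrors l.tmin += interval
	}
	// Output:
	// [123, 129]
	// [130, 139]
	// [140, 149]
}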