diff --git a/CHANGELOG.md b/CHANGELOG.md index 28ec59f9c6..4bb8a091af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#2057](https://github.com/influxdb/influxdb/pull/2057): Move racy "in order" test to integration test suite. - [#2060](https://github.com/influxdb/influxdb/pull/2060): Reload server shard map on restart. - [#2068](https://github.com/influxdb/influxdb/pull/2068): Fix misspelled JSON field. +- [#2067](https://github.com/influxdb/influxdb/pull/2067): Fixing intervals for GROUP BY. ## v0.9.0-rc15 [2015-03-19] diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 7f8b10452a..fbc4d173fe 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -341,6 +341,9 @@ func runTests_Errors(t *testing.T, nodes Cluster) { func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) { t.Logf("Running tests against %d-node cluster", len(nodes)) + yesterday := time.Now().Add(-1 * time.Hour * 24).UTC() + now := time.Now().UTC() + // Start by ensuring database and retention policy exist. createDatabase(t, testName, nodes, database) createRetentionPolicy(t, testName, nodes, database, retention) @@ -405,6 +408,22 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{}]}`, }, + // Data read and write tests using relative time + { + reset: true, + name: "single point with timestamp pre-calculated for past time queries yesterday", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + yesterday.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= '` + yesterday.Add(-1*time.Minute).Format(time.RFC3339Nano) + `'`, + expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, yesterday.Format(time.RFC3339Nano)), + }, + { + reset: true, + name: "single point with timestamp pre-calculated for relative time queries now", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + now.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= now() - 1m`, + expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, now.Format(time.RFC3339Nano)), + }, + // Merge tests. { reset: true, diff --git a/influxql/engine.go b/influxql/engine.go index 548da5fe04..25caa168d8 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -496,11 +496,19 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa } } + firstInterval := m.interval + if !m.stmt.IsRawQuery { + firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin + } // populate the result values for each interval of time for i, _ := range resultValues { // collect the results from each mapper for j, mm := range m.Mappers { - res, err := mm.NextInterval(m.interval) + interval := m.interval + if i == 0 { + interval = firstInterval + } + res, err := mm.NextInterval(interval) if err != nil { return err } @@ -569,7 +577,7 @@ func NewPlanner(db DB) *Planner { // Plan creates an execution plan for the given SelectStatement and returns an Executor. func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) { - now := p.Now() + now := p.Now().UTC() // Replace instances of "now()" with the current time. stmt.Condition = Reduce(stmt.Condition, &nowValuer{Now: now}) diff --git a/tx.go b/tx.go index baca0dfe0c..356188a699 100644 --- a/tx.go +++ b/tx.go @@ -321,13 +321,10 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { return nil, nil } - intervalBottom := l.tmin - // Set the upper bound of the interval. if interval > 0 { // Make sure the bottom of the interval lands on a natural boundary. - intervalBottom = intervalBottom / interval * interval - l.tmax = intervalBottom + interval - 1 + l.tmax = l.tmin + interval - 1 } // Execute the map function. This local mapper acts as the iterator @@ -343,7 +340,7 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { } // Move the interval forward. - l.tmin = intervalBottom + interval + l.tmin += interval return val, nil }