From 7be574534cb97f0d30993bb40f30e65e6a0f6959 Mon Sep 17 00:00:00 2001 From: Cory Lanou Date: Tue, 24 Mar 2015 16:24:24 -0600 Subject: [PATCH 1/7] Fixing intervals for group by. Fixing bad PR --- cmd/influxd/server_integration_test.go | 19 +++++++++++++++++++ influxql/engine.go | 12 ++++++++++-- tx.go | 8 +++----- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 25eead9951..ad80747e8c 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -340,6 +340,9 @@ func runTests_Errors(t *testing.T, nodes Cluster) { func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) { t.Logf("Running tests against %d-node cluster", len(nodes)) + yesterday := time.Now().Add(-1 * time.Hour * 24).UTC() + now := time.Now().UTC() + // Start by ensuring database and retention policy exist. createDatabase(t, testName, nodes, database) createRetentionPolicy(t, testName, nodes, database, retention) @@ -404,6 +407,22 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{}]}`, }, + // Data read and write tests using relative time + { + reset: true, + name: "single point with timestamp pre-calculated for relative time queries yesterday", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + yesterday.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= '` + yesterday.Add(-1*time.Minute).Format(time.RFC3339Nano) + `'`, + expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, yesterday.Format(time.RFC3339Nano)), + }, + { + reset: true, + name: "single point with timestamp pre-calculated for relative time queries now", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + now.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= now() - 1m`, + expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, now.Format(time.RFC3339Nano)), + }, + // Merge tests. { reset: true, diff --git a/influxql/engine.go b/influxql/engine.go index 548da5fe04..25caa168d8 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -496,11 +496,19 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa } } + firstInterval := m.interval + if !m.stmt.IsRawQuery { + firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin + } // populate the result values for each interval of time for i, _ := range resultValues { // collect the results from each mapper for j, mm := range m.Mappers { - res, err := mm.NextInterval(m.interval) + interval := m.interval + if i == 0 { + interval = firstInterval + } + res, err := mm.NextInterval(interval) if err != nil { return err } @@ -569,7 +577,7 @@ func NewPlanner(db DB) *Planner { // Plan creates an execution plan for the given SelectStatement and returns an Executor. func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) { - now := p.Now() + now := p.Now().UTC() // Replace instances of "now()" with the current time. stmt.Condition = Reduce(stmt.Condition, &nowValuer{Now: now}) diff --git a/tx.go b/tx.go index baca0dfe0c..93505f9dc2 100644 --- a/tx.go +++ b/tx.go @@ -317,17 +317,15 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64) error { // NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a // forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { + warn("> ", time.Unix(0, l.tmin).UTC(), time.Unix(0, l.tmax).UTC()) if l.cursorsEmpty || l.tmin > l.job.TMax { return nil, nil } - intervalBottom := l.tmin - // Set the upper bound of the interval. if interval > 0 { // Make sure the bottom of the interval lands on a natural boundary. - intervalBottom = intervalBottom / interval * interval - l.tmax = intervalBottom + interval - 1 + l.tmax = l.tmin + interval - 1 } // Execute the map function. This local mapper acts as the iterator @@ -343,7 +341,7 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { } // Move the interval forward. - l.tmin = intervalBottom + interval + l.tmin += interval return val, nil } From f5659183a5729a0dc21b5df85978b5bc82e2792c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 24 Mar 2015 16:31:45 -0600 Subject: [PATCH 2/7] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28ec59f9c6..60d001239a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#2057](https://github.com/influxdb/influxdb/pull/2057): Move racy "in order" test to integration test suite. - [#2060](https://github.com/influxdb/influxdb/pull/2060): Reload server shard map on restart. - [#2068](https://github.com/influxdb/influxdb/pull/2068): Fix misspelled JSON field. +- [#2067](https://github.com/influxdb/influxdb/pull/2067): Fixing intervals for group by. ## v0.9.0-rc15 [2015-03-19] From d2529a24e7396f80c826d4f3f9f0ca0045174cc2 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 24 Mar 2015 17:07:37 -0600 Subject: [PATCH 3/7] make comment clearer --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60d001239a..4bb8a091af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ - [#2057](https://github.com/influxdb/influxdb/pull/2057): Move racy "in order" test to integration test suite. - [#2060](https://github.com/influxdb/influxdb/pull/2060): Reload server shard map on restart. - [#2068](https://github.com/influxdb/influxdb/pull/2068): Fix misspelled JSON field. -- [#2067](https://github.com/influxdb/influxdb/pull/2067): Fixing intervals for group by. +- [#2067](https://github.com/influxdb/influxdb/pull/2067): Fixing intervals for GROUP BY. ## v0.9.0-rc15 [2015-03-19] From 09097c7d68071c3803fd9eed9fd370109acc9a1b Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 24 Mar 2015 17:08:19 -0600 Subject: [PATCH 4/7] remove warning --- tx.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tx.go b/tx.go index 93505f9dc2..356188a699 100644 --- a/tx.go +++ b/tx.go @@ -317,7 +317,6 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64) error { // NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a // forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { - warn("> ", time.Unix(0, l.tmin).UTC(), time.Unix(0, l.tmax).UTC()) if l.cursorsEmpty || l.tmin > l.job.TMax { return nil, nil } From 805cba71a4cc566cd62afc4bbc8b709a17a21247 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 24 Mar 2015 17:08:35 -0600 Subject: [PATCH 5/7] clarify test name --- cmd/influxd/server_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index ad80747e8c..2b2a565ef4 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -410,7 +410,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent // Data read and write tests using relative time { reset: true, - name: "single point with timestamp pre-calculated for relative time queries yesterday", + name: "single point with timestamp pre-calculated for past time queries yesterday", write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "` + yesterday.Format(time.RFC3339Nano) + `", "tags": {"host": "server01"}, "fields": {"value": 100}}]}`, query: `SELECT * FROM "%DB%"."%RP%".cpu where time >= '` + yesterday.Add(-1*time.Minute).Format(time.RFC3339Nano) + `'`, expected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",100]]}]}]}`, yesterday.Format(time.RFC3339Nano)), From c21ca156e9630369181038de3b546fd2203a61e0 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 24 Mar 2015 17:09:17 -0600 Subject: [PATCH 6/7] this logic should never happen --- influxql/engine.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/influxql/engine.go b/influxql/engine.go index 25caa168d8..62f22fddb6 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -497,9 +497,7 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa } firstInterval := m.interval - if !m.stmt.IsRawQuery { - firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin - } + firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin // populate the result values for each interval of time for i, _ := range resultValues { // collect the results from each mapper From 84a9b8a3fa5e1ef238db5ec75badd6a19d8b40c1 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 24 Mar 2015 17:10:20 -0600 Subject: [PATCH 7/7] the naming of the function is wrong, need future refactor, this logic is valid --- influxql/engine.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/influxql/engine.go b/influxql/engine.go index 62f22fddb6..25caa168d8 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -497,7 +497,9 @@ func (m *MapReduceJob) processAggregate(c *Call, reduceFunc ReduceFunc, resultVa } firstInterval := m.interval - firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin + if !m.stmt.IsRawQuery { + firstInterval = (m.TMin/m.interval*m.interval + m.interval) - m.TMin + } // populate the result values for each interval of time for i, _ := range resultValues { // collect the results from each mapper