From a12f028de42497487a1215b8dbe66549c4f35b37 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 9 Mar 2015 19:17:03 -0700 Subject: [PATCH] Wire up limit and offset per series. Fixes #1709 and fixes #1103 --- cmd/influxd/server_integration_test.go | 59 ++++++++++++++++++++++++++ influxql/engine.go | 36 ++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 5e7f49e385..df7c2e683f 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -372,6 +372,65 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{"series":[{"name":"where_events","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`, }, + // LIMIT and OFFSET tests + + { + name: "limit on points", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [ + {"name": "limit", "timestamp": "2009-11-10T23:00:02Z","fields": {"foo": "bar"}, "tags": {"tennant": "paul"}}, + {"name": "limit", "timestamp": "2009-11-10T23:00:03Z","fields": {"foo": "baz"}, "tags": {"tennant": "paul"}}, + {"name": "limit", "timestamp": "2009-11-10T23:00:04Z","fields": {"foo": "bat"}, "tags": {"tennant": "paul"}}, + {"name": "limit", "timestamp": "2009-11-10T23:00:05Z","fields": {"foo": "bar"}, "tags": {"tennant": "todd"}} + ]}`, + query: `select foo from "%DB%"."%RP%".limit LIMIT 2`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`, + }, + { + name: "limit higher than the number of data points", + query: `select foo from "%DB%"."%RP%".limit LIMIT 20`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"],["2009-11-10T23:00:05Z","bar"]]}]}]}`, + }, + { + name: "limit and offset", + query: `select foo from "%DB%"."%RP%".limit LIMIT 2 OFFSET 1`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"]]}]}]}`, + }, + { + name: "limit + offset higher than number of points", + query: `select foo from "%DB%"."%RP%".limit LIMIT 3 OFFSET 3`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:05Z","bar"]]}]}]}`, + }, + { + name: "offset higher than number of points", + query: `select foo from "%DB%"."%RP%".limit LIMIT 2 OFFSET 20`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`, + }, + { + name: "limit on points with group by time", + query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`, + }, + { + name: "limit higher than the number of data points with group by time", + query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 20`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"],["2009-11-10T23:00:05Z","bar"]]}]}]}`, + }, + { + name: "limit and offset with group by time", + query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2 OFFSET 1`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"]]}]}]}`, + }, + { + name: "limit + offset higher than number of points with group by time", + query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 3 OFFSET 3`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:05Z","bar"]]}]}]}`, + }, + { + name: "offset higher than number of points with group by time", + query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2 OFFSET 20`, + expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`, + }, + // Metadata display tests { diff --git a/influxql/engine.go b/influxql/engine.go index cb5ad22e3f..66319bbcbe 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -121,6 +121,27 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { resultValues[i] = append(vals, time.Unix(0, t).UTC()) } + // now limit the number of data points returned by the limit and offset + if pointCountInResult > 1 && (m.stmt.Limit > 0 || m.stmt.Offset > 0) { + if m.stmt.Offset > len(resultValues) { + out <- &Row{ + Name: m.MeasurementName, + Tags: m.TagSet.Tags, + } + + return + } else { + limit := m.stmt.Limit + if m.stmt.Offset+m.stmt.Limit > len(resultValues) { + limit = len(resultValues) - m.stmt.Offset + } + + resultTimes = resultTimes[m.stmt.Offset : m.stmt.Offset+limit] + resultValues = resultValues[m.stmt.Offset : m.stmt.Offset+limit] + } + m.TMin = resultTimes[0] + } + // now loop through the aggregate functions and populate everything for i, c := range aggregates { if err := m.processAggregate(c, reduceFuncs[i], resultValues); err != nil { @@ -384,6 +405,21 @@ func (m *MapReduceJob) processRawResults(resultValues [][]interface{}) *Row { row.Values = append(row.Values, vals) } + // apply limit and offset, if applicable + // TODO: make this so it doesn't read the whole result set into memory + if m.stmt.Limit > 0 || m.stmt.Offset > 0 { + if m.stmt.Offset > len(row.Values) { + row.Values = nil + } else { + limit := m.stmt.Limit + if m.stmt.Offset+m.stmt.Limit > len(row.Values) { + limit = len(row.Values) - m.stmt.Offset + } + + row.Values = row.Values[m.stmt.Offset : m.stmt.Offset+limit] + } + } + return row }