diff --git a/src/engine/engine.go b/src/engine/engine.go index 087eddba99..6c7ef175bb 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -111,19 +111,35 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool { serieses := self.filter(series) for _, series := range serieses { if len(series.Points) > 0 { + self.calculateLimitAndSlicePoints(series) err = self.yield(series) } } } else { + self.calculateLimitAndSlicePoints(series) err = self.yield(series) } if err != nil { return false } - if self.isAggregateQuery { - self.runAggregates() + return !self.hitLimit() +} + +// TODO: make limits work for aggregate queries and for queries that pull from multiple series. +func (self *QueryEngine) calculateLimitAndSlicePoints(series *protocol.Series) { + if !self.isAggregateQuery && self.shouldLimit { + self.limit -= len(series.Points) + if self.limit < 0 { + series.Points = series.Points[0 : len(series.Points)+self.limit] + } } - return true +} + +func (self *QueryEngine) hitLimit() bool { + if self.isAggregateQuery || !self.shouldLimit { + return false + } + return self.limit < 1 } func (self *QueryEngine) filter(series *protocol.Series) []*protocol.Series { @@ -150,6 +166,9 @@ func (self *QueryEngine) Close() { for _, series := range self.seriesToPoints { self.yieldSeriesData(series) } + if self.isAggregateQuery { + self.runAggregates() + } response := &protocol.Response{Type: &responseEndStream} self.responseChan <- response } diff --git a/src/integration/server_test.go b/src/integration/server_test.go index 306d5e4473..43560a9b5c 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -234,7 +234,6 @@ func (self *ServerSuite) TestCountQueryOnSingleShard(c *C) { t := (time.Now().Unix() - 60) * 1000 data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_count_query_single_shard", "columns": ["value", "time"]}]`, t) self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) - self.serverProcesses[0].Query("test_rep", "select * from test_count_query_single_shard", false, c) collection := self.serverProcesses[0].Query("test_rep", "select count(value) from test_count_query_single_shard group by time(1m)", false, c) c.Assert(collection.Members, HasLen, 1) series := collection.GetSeries("test_count_query_single_shard", c) @@ -243,6 +242,17 @@ func (self *ServerSuite) TestCountQueryOnSingleShard(c *C) { c.Assert(series.GetValueForPointAndColumn(1, "count", c).(float64), Equals, float64(1)) } +func (self *ServerSuite) TestLimitQueryOnSingleShard(c *C) { + data := `[{"points": [[4], [10], [5]], "name": "test_limit_query_single_shard", "columns": ["value"]}]` + self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) + collection := self.serverProcesses[0].Query("test_rep", "select * from test_limit_query_single_shard limit 2", false, c) + c.Assert(collection.Members, HasLen, 1) + series := collection.GetSeries("test_limit_query_single_shard", c) + c.Assert(series.Points, HasLen, 2) + c.Assert(series.GetValueForPointAndColumn(0, "value", c).(float64), Equals, float64(5)) + c.Assert(series.GetValueForPointAndColumn(1, "value", c).(float64), Equals, float64(10)) +} + func (self *ServerSuite) TestRestartAfterCompaction(c *C) { data := ` [{