Added limits for non-aggregate queries.

pull/249/head
Paul Dix 2014-02-10 16:10:39 -05:00
parent d59238c0fd
commit c9217769fc
2 changed files with 33 additions and 4 deletions

View File

@ -111,19 +111,35 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
serieses := self.filter(series)
for _, series := range serieses {
if len(series.Points) > 0 {
self.calculateLimitAndSlicePoints(series)
err = self.yield(series)
}
}
} else {
self.calculateLimitAndSlicePoints(series)
err = self.yield(series)
}
if err != nil {
return false
}
if self.isAggregateQuery {
self.runAggregates()
return !self.hitLimit()
}
// TODO: make limits work for aggregate queries and for queries that pull from multiple series.
func (self *QueryEngine) calculateLimitAndSlicePoints(series *protocol.Series) {
if !self.isAggregateQuery && self.shouldLimit {
self.limit -= len(series.Points)
if self.limit < 0 {
series.Points = series.Points[0 : len(series.Points)+self.limit]
}
}
return true
}
func (self *QueryEngine) hitLimit() bool {
if self.isAggregateQuery || !self.shouldLimit {
return false
}
return self.limit < 1
}
func (self *QueryEngine) filter(series *protocol.Series) []*protocol.Series {
@ -150,6 +166,9 @@ func (self *QueryEngine) Close() {
for _, series := range self.seriesToPoints {
self.yieldSeriesData(series)
}
if self.isAggregateQuery {
self.runAggregates()
}
response := &protocol.Response{Type: &responseEndStream}
self.responseChan <- response
}

View File

@ -234,7 +234,6 @@ func (self *ServerSuite) TestCountQueryOnSingleShard(c *C) {
t := (time.Now().Unix() - 60) * 1000
data = fmt.Sprintf(`[{"points": [[2, %d]], "name": "test_count_query_single_shard", "columns": ["value", "time"]}]`, t)
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
self.serverProcesses[0].Query("test_rep", "select * from test_count_query_single_shard", false, c)
collection := self.serverProcesses[0].Query("test_rep", "select count(value) from test_count_query_single_shard group by time(1m)", false, c)
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("test_count_query_single_shard", c)
@ -243,6 +242,17 @@ func (self *ServerSuite) TestCountQueryOnSingleShard(c *C) {
c.Assert(series.GetValueForPointAndColumn(1, "count", c).(float64), Equals, float64(1))
}
func (self *ServerSuite) TestLimitQueryOnSingleShard(c *C) {
data := `[{"points": [[4], [10], [5]], "name": "test_limit_query_single_shard", "columns": ["value"]}]`
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
collection := self.serverProcesses[0].Query("test_rep", "select * from test_limit_query_single_shard limit 2", false, c)
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("test_limit_query_single_shard", c)
c.Assert(series.Points, HasLen, 2)
c.Assert(series.GetValueForPointAndColumn(0, "value", c).(float64), Equals, float64(5))
c.Assert(series.GetValueForPointAndColumn(1, "value", c).(float64), Equals, float64(10))
}
func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
data := `
[{