fix #298. fix limit when querying multiple shards

pull/298/head
John Shahid 2014-03-05 12:50:46 -05:00
parent 91dfff2ef7
commit f3f36e0782
6 changed files with 192 additions and 86 deletions

View File

@ -268,3 +268,5 @@
## v0.5.0-rc.4 [unreleased]
### Bugfixes
- [Issue #298](https://github.com/influxdb/influxdb/issues/298). Fix limit when querying multiple shards

View File

@ -213,43 +213,60 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser
return self.runQuerySpec(querySpec, seriesWriter)
}
func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards := self.clusterConfiguration.GetShards(querySpec)
shouldAggregateLocally := true
var processor cluster.QueryProcessor
var responseChan chan *protocol.Response
var seriesClosed chan bool
func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool {
for _, s := range shards {
// If the aggregation is done at the shard level, we don't need to
// do it here at the coordinator level.
if !s.ShouldAggregateLocally(querySpec) {
seriesClosed = make(chan bool)
shouldAggregateLocally = false
responseChan = make(chan *protocol.Response)
if querySpec.SelectQuery() != nil {
processor = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan)
} else {
bufferSize := 100
processor = engine.NewPassthroughEngine(responseChan, bufferSize)
}
go func() {
for {
res := <-responseChan
if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse {
seriesWriter.Close()
seriesClosed <- true
return
}
if res.Series != nil && len(res.Series.Points) > 0 {
seriesWriter.Write(res.Series)
}
}
}()
break
return false
}
}
return true
}
// getShardsAndProcessor looks up the shards that the query touches and, when
// needed, builds the processor through which the shard responses are
// funnelled before they reach writer.
//
// The processor is chosen as follows:
//   - a select query whose aggregation is not done at the shard level gets a
//     query engine, so the aggregation happens here in the coordinator;
//   - any other select query with a positive limit gets a limit-aware
//     passthrough engine;
//   - any other query that isn't aggregated locally gets a plain passthrough
//     engine;
//   - otherwise no processor is created and (shards, nil, nil) is returned.
//
// When a processor is created, a goroutine is started that forwards every
// non-empty series coming out of the processor to writer. On an end-of-stream
// or access-denied response it closes writer, sends true on the returned
// channel and exits.
func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool) {
	shards := self.clusterConfiguration.GetShards(querySpec)
	aggregateLocally := self.shouldAggregateLocally(shards, querySpec)

	responses := make(chan *protocol.Response)
	done := make(chan bool)
	selectQuery := querySpec.SelectQuery()

	var processor cluster.QueryProcessor
	switch {
	case selectQuery != nil && !aggregateLocally:
		// aggregation isn't happening at the shard level, so do it in the
		// coordinator
		processor = engine.NewQueryEngine(querySpec.SelectQuery(), responses)
	case selectQuery != nil && selectQuery.Limit > 0:
		// the shards aggregate on their own, but the limit still has to be
		// enforced across all of them
		processor = engine.NewPassthroughEngineWithLimit(responses, 100, selectQuery.Limit)
	case !aggregateLocally:
		processor = engine.NewPassthroughEngine(responses, 100)
	default:
		return shards, nil, nil
	}

	go func() {
		for {
			res := <-responses
			switch *res.Type {
			case endStreamResponse, accessDeniedResponse:
				writer.Close()
				done <- true
				return
			}
			if series := res.Series; series != nil && len(series.Points) > 0 {
				writer.Write(series)
			}
		}
	}()

	return shards, processor, done
}
func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards, processor, seriesClosed := self.getShardsAndProcessor(querySpec, seriesWriter)
responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {
@ -266,7 +283,9 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
break
}
if shouldAggregateLocally {
// if we don't have a processor, yield the point to the writer
if processor == nil {
log.Debug("WRITING: ", len(response.Series.Points))
seriesWriter.Write(response.Series)
log.Debug("WRITING (done)")
@ -284,7 +303,8 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
}
log.Debug("DONE: shard: ", shards[i].String())
}
if !shouldAggregateLocally {
if processor != nil {
processor.Close()
<-seriesClosed
return nil

View File

@ -16,9 +16,7 @@ type QueryEngine struct {
query *parser.SelectQuery
where *parser.WhereCondition
responseChan chan *protocol.Response
shouldLimit bool
limit int
limits map[string]int
limiter *Limiter
seriesToPoints map[string]*protocol.Series
yield func(*protocol.Series) error
@ -60,17 +58,16 @@ func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*
func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Response) *QueryEngine {
limit := query.Limit
shouldLimit := true
if limit == 0 {
shouldLimit = false
// disable the limit if the query has aggregates; let the coordinator
// deal with the limit instead
if query.HasAggregates() {
limit = 0
}
queryEngine := &QueryEngine{
query: query,
where: query.GetWhereCondition(),
limit: limit,
limits: make(map[string]int),
shouldLimit: shouldLimit,
limiter: NewLimiter(limit),
responseChan: responseChan,
seriesToPoints: make(map[string]*protocol.Series),
}
@ -119,14 +116,14 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
}
for _, series := range serieses {
if len(series.Points) > 0 {
self.calculateLimitAndSlicePoints(series)
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) > 0 {
err = self.yield(series)
}
}
}
} else {
self.calculateLimitAndSlicePoints(series)
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) > 0 {
err = self.yield(series)
@ -136,42 +133,7 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
log.Error(err)
return false
}
return !self.hitLimit(*series.Name)
}
// TODO: make limits work for aggregate queries and for queries that pull from multiple series.
func (self *QueryEngine) calculateLimitAndSlicePoints(series *protocol.Series) {
if !self.isAggregateQuery && self.shouldLimit {
// if the limit is 0, stop returning any points
limit := self.limitForSeries(*series.Name)
defer func() { self.limits[*series.Name] = limit }()
if limit == 0 {
series.Points = nil
return
}
limit -= len(series.Points)
if limit <= 0 {
sliceTo := len(series.Points) + limit
series.Points = series.Points[0:sliceTo]
limit = 0
}
}
}
func (self *QueryEngine) hitLimit(seriesName string) bool {
if self.isAggregateQuery || !self.shouldLimit {
return false
}
return self.limitForSeries(seriesName) <= 0
}
func (self *QueryEngine) limitForSeries(name string) int {
currentLimit, ok := self.limits[name]
if !ok {
currentLimit = self.limit
self.limits[name] = currentLimit
}
return currentLimit
return !self.limiter.hitLimit(*series.Name)
}
func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, error) {

53
src/engine/limiter.go Normal file
View File

@ -0,0 +1,53 @@
package engine
import (
"protocol"
)
// Limiter keeps track of how many more points may be returned for each
// series name.
type Limiter struct {
	// shouldLimit is false when limiting is disabled; the methods then
	// leave series untouched and never report a hit limit.
	shouldLimit bool
	// limit is the initial number of points allowed for every series.
	limit int
	// limits maps a series name to the number of points that series may
	// still return.
	limits map[string]int
}
// NewLimiter returns a Limiter that allows at most limit points per series.
// Limiting is only enabled when limit is greater than zero.
func NewLimiter(limit int) *Limiter {
	limiter := new(Limiter)
	limiter.limit = limit
	limiter.limits = make(map[string]int)
	limiter.shouldLimit = limit > 0
	return limiter
}
// calculateLimitAndSlicePoints trims series.Points in place so that the
// series never yields more points than it is still allowed to, and records
// how many points remain for that series name. It does nothing when limiting
// is disabled.
func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) {
	if !self.shouldLimit {
		return
	}

	name := *series.Name
	remaining := self.limitForSeries(name)
	count := len(series.Points)

	switch {
	case remaining == 0:
		// the limit was already reached, stop returning any points
		series.Points = nil
	case count >= remaining:
		// this batch uses up the rest of the allowance; keep only the
		// points that still fit
		series.Points = series.Points[:remaining]
		remaining = 0
	default:
		remaining -= count
	}

	self.limits[name] = remaining
}
// hitLimit reports whether the series with the given name has no points
// left in its allowance. It always returns false when limiting is disabled.
func (self *Limiter) hitLimit(seriesName string) bool {
	return self.shouldLimit && self.limitForSeries(seriesName) <= 0
}
// limitForSeries returns the number of points the named series may still
// return. A series seen for the first time is registered with the initial
// limit.
func (self *Limiter) limitForSeries(name string) int {
	if remaining, seen := self.limits[name]; seen {
		return remaining
	}
	self.limits[name] = self.limit
	return self.limit
}

View File

@ -10,37 +10,49 @@ type PassthroughEngine struct {
responseChan chan *protocol.Response
response *protocol.Response
maxPointsInResponse int
limiter *Limiter
}
// NewPassthroughEngine creates a PassthroughEngine without a point limit. It
// is shorthand for NewPassthroughEngineWithLimit with a limit of 0.
func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine {
	const noLimit = 0
	return NewPassthroughEngineWithLimit(responseChan, maxPointsInResponse, noLimit)
}
// NewPassthroughEngineWithLimit creates a PassthroughEngine that sends its
// responses on responseChan and uses a Limiter built from limit.
// maxPointsInResponse is stored on the engine as its response size setting.
func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine {
	passthrough := new(PassthroughEngine)
	passthrough.responseChan = responseChan
	passthrough.maxPointsInResponse = maxPointsInResponse
	passthrough.limiter = NewLimiter(limit)
	return passthrough
}
func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) == 0 {
return false
}
if self.response == nil {
self.response = &protocol.Response{
Type: &queryResponse,
Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames},
Series: series,
}
} else if self.response.Series.Name != seriesName {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames},
Series: series,
}
} else if len(self.response.Series.Points) > self.maxPointsInResponse {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames},
Series: series,
}
} else {
self.response.Series.Points = append(self.response.Series.Points, point)
}
return true
return !self.limiter.hitLimit(*seriesName)
}
func (self *PassthroughEngine) Close() {

View File

@ -601,6 +601,34 @@ func (self *IntegrationSuite) TestIssue89(c *C) {
c.Assert(sums, DeepEquals, map[string]float64{"y": 30.0, "z": 40.0})
}
// Make sure that aggregations which happen locally at the shard level are
// not repeated at the coordinator level when the query also has a limit;
// otherwise unexpected behavior will happen.
func (self *IntegrationSuite) TestCountWithGroupByTimeAndLimit(c *C) {
	err := self.server.WriteData(`
[
  {
    "name": "test_count_with_groupby_and_limit",
    "columns": ["cpu", "host"],
    "points": [[60, "hosta"], [70, "hostb"]]
  }
]
`)
	c.Assert(err, IsNil)

	bs, err := self.server.RunQuery("select count(cpu) from test_count_with_groupby_and_limit group by time(5m) limit 10", "m")
	c.Assert(err, IsNil)

	data := []*SerializedSeries{}
	err = json.Unmarshal(bs, &data)
	// fail with the decode error instead of a confusing length mismatch
	c.Assert(err, IsNil)
	c.Assert(data, HasLen, 1)
	c.Assert(data[0].Name, Equals, "test_count_with_groupby_and_limit")
	c.Assert(data[0].Columns, HasLen, 2)
	c.Assert(data[0].Points, HasLen, 1)
	// two points were written, so the count should be 2
	c.Assert(data[0].Points[0][1], Equals, 2.0)
}
func (self *IntegrationSuite) TestCountWithGroupBy(c *C) {
for i := 0; i < 20; i++ {
err := self.server.WriteData(fmt.Sprintf(`
@ -689,6 +717,35 @@ func (self *IntegrationSuite) TestHttpPostWithTime(c *C) {
c.Assert(values["val2"], Equals, 2.0)
}
// Test that the limit is honored when the matching points come from more
// than one shard. The written points are spread over a wide time range and
// the query asks for a single point.
func (self *IntegrationSuite) TestLimitMultipleShards(c *C) {
	err := self.server.WriteData(`
[
  {
    "name": "test_limit_with_multiple_shards",
    "columns": ["time", "a"],
    "points":[
      [1393577978000, 1],
      [1383577978000, 2],
      [1373577978000, 2],
      [1363577978000, 2],
      [1353577978000, 2],
      [1343577978000, 2],
      [1333577978000, 2],
      [1323577978000, 2],
      [1313577978000, 2]
    ]
  }
]`, "time_precision=m")
	c.Assert(err, IsNil)

	bs, err := self.server.RunQuery("select * from test_limit_with_multiple_shards limit 1", "m")
	c.Assert(err, IsNil)

	data := []*SerializedSeries{}
	err = json.Unmarshal(bs, &data)
	// fail with the decode error instead of a confusing length mismatch
	c.Assert(err, IsNil)
	c.Assert(data, HasLen, 1)
	c.Assert(data[0].Points, HasLen, 1)
}
// test for issue #106
func (self *IntegrationSuite) TestIssue106(c *C) {
err := self.server.WriteData(`