diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e9141f6c3..641601b2c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -268,3 +268,5 @@ ## v0.5.0-rc.4 [unreleased] ### Bugfixes + +- [Issue #298](https://github.com/influxdb/influxdb/issues/298). Fix limit when querying multiple shards diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 38e90b75dc..bbab6d4574 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -213,43 +213,60 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser return self.runQuerySpec(querySpec, seriesWriter) } -func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { - shards := self.clusterConfiguration.GetShards(querySpec) - - shouldAggregateLocally := true - var processor cluster.QueryProcessor - var responseChan chan *protocol.Response - var seriesClosed chan bool +func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool { for _, s := range shards { - // If the aggregation is done at the shard level, we don't need to - // do it here at the coordinator level. if !s.ShouldAggregateLocally(querySpec) { - seriesClosed = make(chan bool) - shouldAggregateLocally = false - responseChan = make(chan *protocol.Response) - - if querySpec.SelectQuery() != nil { - processor = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan) - } else { - bufferSize := 100 - processor = engine.NewPassthroughEngine(responseChan, bufferSize) - } - go func() { - for { - res := <-responseChan - if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse { - seriesWriter.Close() - seriesClosed <- true - return - } - if res.Series != nil && len(res.Series.Points) > 0 { - seriesWriter.Write(res.Series) - } - } - }() - break + return false } } + return true +} + +func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool) { + shards := self.clusterConfiguration.GetShards(querySpec) + shouldAggregateLocally := self.shouldAggregateLocally(shards, querySpec) + + var processor cluster.QueryProcessor + + responseChan := make(chan *protocol.Response) + seriesClosed := make(chan bool) + + selectQuery := querySpec.SelectQuery() + if selectQuery != nil && !shouldAggregateLocally { + // if we should aggregate in the coordinator (i.e. aggregation + // isn't happening locally at the shard level), create an engine + processor = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan) + } else if selectQuery != nil && selectQuery.Limit > 0 { + // if we have a query with limit, then create an engine, or we can + // make the passthrough limit aware + processor = engine.NewPassthroughEngineWithLimit(responseChan, 100, selectQuery.Limit) + } else if !shouldAggregateLocally { + processor = engine.NewPassthroughEngine(responseChan, 100) + } + + if processor == nil { + return shards, nil, nil + } + + go func() { + for { + res := <-responseChan + if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse { + writer.Close() + seriesClosed <- true + return + } + if res.Series != nil && len(res.Series.Points) > 0 { + writer.Write(res.Series) + } + } + }() + + return shards, processor, seriesClosed +} + +func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { + shards, processor, seriesClosed := self.getShardsAndProcessor(querySpec, seriesWriter) responses := make([]chan *protocol.Response, 0) for _, shard := range shards { @@ -266,7 +283,9 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { break } - if shouldAggregateLocally { + + // if we don't have a processor, yield the point to the writer + if processor == nil { log.Debug("WRITING: ", len(response.Series.Points)) seriesWriter.Write(response.Series) log.Debug("WRITING (done)") @@ -284,7 +303,8 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri } log.Debug("DONE: shard: ", shards[i].String()) } - if !shouldAggregateLocally { + + if processor != nil { processor.Close() <-seriesClosed return nil diff --git a/src/engine/engine.go b/src/engine/engine.go index 02129639c5..a8213535dd 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -16,9 +16,7 @@ type QueryEngine struct { query *parser.SelectQuery where *parser.WhereCondition responseChan chan *protocol.Response - shouldLimit bool - limit int - limits map[string]int + limiter *Limiter seriesToPoints map[string]*protocol.Series yield func(*protocol.Series) error @@ -60,17 +58,16 @@ func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(* func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Response) *QueryEngine { limit := query.Limit - shouldLimit := true - if limit == 0 { - shouldLimit = false + // disable limit if the query has aggregates let the coordinator + // deal with the limit + if query.HasAggregates() { + limit = 0 } queryEngine := &QueryEngine{ query: query, where: query.GetWhereCondition(), - limit: limit, - limits: make(map[string]int), - shouldLimit: shouldLimit, + limiter: NewLimiter(limit), responseChan: responseChan, seriesToPoints: make(map[string]*protocol.Series), } @@ -119,14 +116,14 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool { } for _, series := range serieses { if len(series.Points) > 0 { - self.calculateLimitAndSlicePoints(series) + self.limiter.calculateLimitAndSlicePoints(series) if len(series.Points) > 0 { err = self.yield(series) } } } } else { - self.calculateLimitAndSlicePoints(series) + self.limiter.calculateLimitAndSlicePoints(series) if len(series.Points) > 0 { err = self.yield(series) @@ -136,42 +133,7 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool { log.Error(err) return false } - return !self.hitLimit(*series.Name) -} - -// TODO: make limits work for aggregate queries and for queries that pull from multiple series. -func (self *QueryEngine) calculateLimitAndSlicePoints(series *protocol.Series) { - if !self.isAggregateQuery && self.shouldLimit { - // if the limit is 0, stop returning any points - limit := self.limitForSeries(*series.Name) - defer func() { self.limits[*series.Name] = limit }() - if limit == 0 { - series.Points = nil - return - } - limit -= len(series.Points) - if limit <= 0 { - sliceTo := len(series.Points) + limit - series.Points = series.Points[0:sliceTo] - limit = 0 - } - } -} - -func (self *QueryEngine) hitLimit(seriesName string) bool { - if self.isAggregateQuery || !self.shouldLimit { - return false - } - return self.limitForSeries(seriesName) <= 0 -} - -func (self *QueryEngine) limitForSeries(name string) int { - currentLimit, ok := self.limits[name] - if !ok { - currentLimit = self.limit - self.limits[name] = currentLimit - } - return currentLimit + return !self.limiter.hitLimit(*series.Name) } func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, error) { diff --git a/src/engine/limiter.go b/src/engine/limiter.go new file mode 100644 index 0000000000..11f0fe83d7 --- /dev/null +++ b/src/engine/limiter.go @@ -0,0 +1,53 @@ +package engine + +import ( + "protocol" +) + +type Limiter struct { + shouldLimit bool + limit int + limits map[string]int +} + +func NewLimiter(limit int) *Limiter { + return &Limiter{ + limit: limit, + limits: map[string]int{}, + shouldLimit: limit > 0, + } +} + +func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { + if self.shouldLimit { + // if the limit is 0, stop returning any points + limit := self.limitForSeries(*series.Name) + defer func() { self.limits[*series.Name] = limit }() + if limit == 0 { + series.Points = nil + return + } + limit -= len(series.Points) + if limit <= 0 { + sliceTo := len(series.Points) + limit + series.Points = series.Points[0:sliceTo] + limit = 0 + } + } +} + +func (self *Limiter) hitLimit(seriesName string) bool { + if !self.shouldLimit { + return false + } + return self.limitForSeries(seriesName) <= 0 +} + +func (self *Limiter) limitForSeries(name string) int { + currentLimit, ok := self.limits[name] + if !ok { + currentLimit = self.limit + self.limits[name] = currentLimit + } + return currentLimit +} diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go index ad5d5e7dac..85252beaa1 100644 --- a/src/engine/passthrough_engine.go +++ b/src/engine/passthrough_engine.go @@ -10,37 +10,49 @@ type PassthroughEngine struct { responseChan chan *protocol.Response response *protocol.Response maxPointsInResponse int + limiter *Limiter } func NewPassthroughEngine(responseChan chan *protocol.Response, maxPointsInResponse int) *PassthroughEngine { + return NewPassthroughEngineWithLimit(responseChan, maxPointsInResponse, 0) +} + +func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPointsInResponse, limit int) *PassthroughEngine { return &PassthroughEngine{ responseChan: responseChan, maxPointsInResponse: maxPointsInResponse, + limiter: NewLimiter(limit), } } func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool { + series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames} + self.limiter.calculateLimitAndSlicePoints(series) + if len(series.Points) == 0 { + return false + } + if self.response == nil { self.response = &protocol.Response{ Type: &queryResponse, - Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}, + Series: series, } } else if self.response.Series.Name != seriesName { self.responseChan <- self.response self.response = &protocol.Response{ Type: &queryResponse, - Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}, + Series: series, } } else if len(self.response.Series.Points) > self.maxPointsInResponse { self.responseChan <- self.response self.response = &protocol.Response{ Type: &queryResponse, - Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}, + Series: series, } } else { self.response.Series.Points = append(self.response.Series.Points, point) } - return true + return !self.limiter.hitLimit(*seriesName) } func (self *PassthroughEngine) Close() { diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go index db0fdb4e8b..06253ff917 100644 --- a/src/integration/benchmark_test.go +++ b/src/integration/benchmark_test.go @@ -601,6 +601,34 @@ func (self *IntegrationSuite) TestIssue89(c *C) { c.Assert(sums, DeepEquals, map[string]float64{"y": 30.0, "z": 40.0}) } +// make sure aggregation when happen locally at the shard level don't +// get repeated at the coordinator level, otherwise unexpected +// behavior will happen +func (self *IntegrationSuite) TestCountWithGroupByTimeAndLimit(c *C) { + for i := 0; i < 1; i++ { + err := self.server.WriteData(fmt.Sprintf(` +[ + { + "name": "test_count_with_groupby_and_limit", + "columns": ["cpu", "host"], + "points": [[%d, "hosta"], [%d, "hostb"]] + } +] +`, 60+i*10, 70+i*10)) + c.Assert(err, IsNil) + } + bs, err := self.server.RunQuery("select count(cpu) from test_count_with_groupby_and_limit group by time(5m) limit 10", "m") + c.Assert(err, IsNil) + data := []*SerializedSeries{} + err = json.Unmarshal(bs, &data) + c.Assert(data, HasLen, 1) + c.Assert(data[0].Name, Equals, "test_count_with_groupby_and_limit") + c.Assert(data[0].Columns, HasLen, 2) + c.Assert(data[0].Points, HasLen, 1) + // count should be 3 + c.Assert(data[0].Points[0][1], Equals, 2.0) +} + func (self *IntegrationSuite) TestCountWithGroupBy(c *C) { for i := 0; i < 20; i++ { err := self.server.WriteData(fmt.Sprintf(` @@ -689,6 +717,35 @@ func (self *IntegrationSuite) TestHttpPostWithTime(c *C) { c.Assert(values["val2"], Equals, 2.0) } +// test limit when getting data from multiple shards +func (self *IntegrationSuite) TestLimitMultipleShards(c *C) { + err := self.server.WriteData(` +[ + { + "name": "test_limit_with_multiple_shards", + "columns": ["time", "a"], + "points":[ + [1393577978000, 1], + [1383577978000, 2], + [1373577978000, 2], + [1363577978000, 2], + [1353577978000, 2], + [1343577978000, 2], + [1333577978000, 2], + [1323577978000, 2], + [1313577978000, 2] + ] + } +]`, "time_precision=m") + c.Assert(err, IsNil) + bs, err := self.server.RunQuery("select * from test_limit_with_multiple_shards limit 1", "m") + c.Assert(err, IsNil) + data := []*SerializedSeries{} + err = json.Unmarshal(bs, &data) + c.Assert(data, HasLen, 1) + c.Assert(data[0].Points, HasLen, 1) +} + // test for issue #106 func (self *IntegrationSuite) TestIssue106(c *C) { err := self.server.WriteData(`