fix few bugs introduced in dbe76df

The changes in that commit broke Join and Merge, because they were
bypassing the QueryEngine which kept track of the series that should be
closed at the end of the query. Aggregators require an empty time series
to be sent after the query ends to flush their internal state.

The other bug was in deletes. Fixing the first bug broke the new
behavior of deletes which is return nothing if the time series is
empty. This is also fixed.
pull/344/head
John Shahid 2014-03-12 19:37:16 -04:00
parent f5706829e2
commit df84b7fe79
7 changed files with 44 additions and 24 deletions

View File

@ -31,7 +31,7 @@ type QueryProcessor interface {
// This method returns true if the query should continue. If the query should be stopped,
// like maybe the limit was hit, it should return false
YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool
YieldSeries(seriesIncoming *p.Series) bool
Close()
// Set by the shard, so EXPLAIN query can know query against which shard is being measured
@ -217,6 +217,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
} else {
if self.ShouldAggregateLocally(querySpec) {
fmt.Printf("creating a query engine\n")
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
@ -226,6 +227,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
processor.SetShardInfo(int(self.Id()), self.IsLocal)
} else {
maxPointsToBufferBeforeSending := 1000
fmt.Printf("creating a passthrough engine\n")
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
}
}

View File

@ -301,22 +301,26 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
break
}
// if we don't have a processor, yield the point to the writer
// this happens if shard took care of the query
// otherwise client will get points from passthrough engine
if processor == nil {
// If we have EXPLAIN query, we don't write actual points (of response.Type Query) to the client
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
seriesWriter.Write(response.Series)
}
if response.Series == nil || len(response.Series.Points) == 0 {
log.Debug("Series has no points, continue")
continue
}
// if the data wasn't aggregated at the shard level, aggregate
// the data here
if response.Series != nil {
// if we don't have a processor, yield the point to the writer
// this happens if shard took care of the query
// otherwise client will get points from passthrough engine
if processor != nil {
// if the data wasn't aggregated at the shard level, aggregate
// the data here
log.Debug("YIELDING: %d points", len(response.Series.Points))
processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series)
processor.YieldSeries(response.Series)
continue
}
// If we have EXPLAIN query, we don't write actual points (of
// response.Type Query) to the client
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
seriesWriter.Write(response.Series)
}
}
log.Debug("DONE: shard: ", shards[i].String())

View File

@ -281,8 +281,12 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
if len(seriesOutgoing.Points) >= self.pointBatchSize {
for _, alias := range aliases {
_alias := alias
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
series := &protocol.Series{
Name: proto.String(alias),
Fields: fieldNames,
Points: seriesOutgoing.Points,
}
if !processor.YieldSeries(series) {
shouldContinue = false
}
}
@ -296,9 +300,9 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
//Yield remaining data
for _, alias := range aliases {
_alias := alias
log.Debug("Final Flush %s", _alias)
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
log.Debug("Final Flush %s", alias)
series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points}
if !processor.YieldSeries(series) {
log.Debug("Cancelled...")
}
}

View File

@ -145,13 +145,16 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi
self.pointsRead++
}
fmt.Printf("self.seriesToPoints: %#v\n", self.seriesToPoints)
return shouldContinue
}
func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) {
func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldContinue bool) {
if self.explain {
self.pointsRead += int64(len(seriesIncoming.Points))
}
seriesName := seriesIncoming.GetName()
self.seriesToPoints[seriesName] = &protocol.Series{Name: &seriesName, Fields: seriesIncoming.Fields}
return self.yieldSeriesData(seriesIncoming)
}
@ -210,6 +213,8 @@ func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, er
}
func (self *QueryEngine) Close() {
fmt.Printf("Closing: %#v\n", self.seriesToPoints)
for _, series := range self.seriesToPoints {
if len(series.Points) == 0 {
continue
@ -223,6 +228,7 @@ func (self *QueryEngine) Close() {
Name: series.Name,
Fields: series.Fields,
}
fmt.Printf("yielding empty series for %s\n", series.GetName())
err = self.yield(s)
if err != nil {
break

View File

@ -41,7 +41,7 @@ func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []strin
return true
}
func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []string, seriesIncoming *protocol.Series) bool {
func (self *ListSeriesEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE {
self.responseChan <- self.response
self.response = &protocol.Response{
@ -49,7 +49,7 @@ func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []stri
MultiSeries: make([]*protocol.Series, 0),
}
}
self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName})
self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesIncoming.Name})
return true
}

View File

@ -1,6 +1,7 @@
package engine
import (
"fmt"
"parser"
"protocol"
)
@ -16,6 +17,7 @@ func getJoinYield(query *parser.SelectQuery, yield func(*protocol.Series) error)
name := table1 + "_join_" + table2
return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error {
fmt.Printf("join series: %d\n", len(s.Points))
if *s.Name == table1 {
lastPoint1 = s.Points[len(s.Points)-1]
if lastFields1 == nil {
@ -269,6 +271,8 @@ func mergeYield(table1, table2 string, modifyValues bool, ascending bool, yield
}
return func(p *protocol.Series) error {
fmt.Printf("series: %v\n", len(p.Points))
state.updateState(p)
if err := state.flushIfNecessary(yield); err != nil {

View File

@ -75,7 +75,7 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri
return !self.limiter.hitLimit(*seriesName)
}
func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool {
func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points))
if *seriesIncoming.Name == "explain query" {
self.responseType = &explainQueryResponse
@ -95,7 +95,7 @@ func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []stri
Type: self.responseType,
Series: seriesIncoming,
}
} else if *self.response.Series.Name != *seriesName {
} else if self.response.Series.GetName() != seriesIncoming.GetName() {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: self.responseType,
@ -110,7 +110,7 @@ func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []stri
} else {
self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...)
}
return !self.limiter.hitLimit(*seriesName)
return !self.limiter.hitLimit(seriesIncoming.GetName())
//return true
}