Merge branch 'plumb-nqe-fail' of github.com:zorkian/influxdb

Conflicts:
	src/cluster/shard.go
pull/324/head
John Shahid 2014-03-10 12:19:11 -04:00
commit be7137a246
3 changed files with 48 additions and 26 deletions

View File

@ -192,6 +192,8 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
if self.IsLocal {
var processor QueryProcessor
var err error
if querySpec.IsListSeriesQuery() {
processor = engine.NewListSeriesEngine(response)
} else if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() || querySpec.IsSinglePointQuery() {
@ -199,7 +201,10 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
} else {
if self.ShouldAggregateLocally(querySpec) {
processor = engine.NewQueryEngine(querySpec.SelectQuery(), response)
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
if err != nil {
return err
}
} else {
maxPointsToBufferBeforeSending := 1000
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
@ -210,7 +215,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
return err
}
defer self.store.ReturnShard(self.id)
shard.Query(querySpec, processor)
err = shard.Query(querySpec, processor)
processor.Close()
return err
}

View File

@ -222,10 +222,11 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData,
return true
}
func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool) {
func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool, error) {
shards := self.clusterConfiguration.GetShards(querySpec)
shouldAggregateLocally := self.shouldAggregateLocally(shards, querySpec)
var err error
var processor cluster.QueryProcessor
responseChan := make(chan *protocol.Response)
@ -235,7 +236,7 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
if selectQuery != nil && !shouldAggregateLocally {
// if we should aggregate in the coordinator (i.e. aggregation
// isn't happening locally at the shard level), create an engine
processor = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan)
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), responseChan)
} else if selectQuery != nil && selectQuery.Limit > 0 {
// if we have a query with limit, then create an engine, or we can
// make the passthrough limit aware
@ -244,8 +245,12 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
processor = engine.NewPassthroughEngine(responseChan, 100)
}
if err != nil {
return nil, nil, nil, err
}
if processor == nil {
return shards, nil, nil
return shards, nil, nil, nil
}
go func() {
@ -262,11 +267,14 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
}
}()
return shards, processor, seriesClosed
return shards, processor, seriesClosed, nil
}
func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards, processor, seriesClosed := self.getShardsAndProcessor(querySpec, seriesWriter)
shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
if err != nil {
return err
}
responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {

View File

@ -41,7 +41,7 @@ var (
)
// distribute query and possibly do the merge/join before yielding the points
func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) (err error) {
func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*protocol.Series) error) error {
// see if this is a merge query
fromClause := query.GetFromClause()
if fromClause.Type == parser.FromClauseMerge {
@ -56,7 +56,7 @@ func (self *QueryEngine) distributeQuery(query *parser.SelectQuery, yield func(*
return nil
}
func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Response) *QueryEngine {
func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Response) (*QueryEngine, error) {
limit := query.Limit
// disable limit if the query has aggregates let the coordinator
// deal with the limit
@ -78,15 +78,19 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo
return nil
}
var err error
if query.HasAggregates() {
queryEngine.executeCountQueryWithGroupBy(query, yield)
err = queryEngine.executeCountQueryWithGroupBy(query, yield)
} else if containsArithmeticOperators(query) {
queryEngine.executeArithmeticQuery(query, yield)
err = queryEngine.executeArithmeticQuery(query, yield)
} else {
queryEngine.distributeQuery(query, yield)
err = queryEngine.distributeQuery(query, yield)
}
return queryEngine
if err != nil {
return nil, err
}
return queryEngine, nil
}
// Returns false if the query should be stopped (either because of limit or error)
@ -118,7 +122,9 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
if len(series.Points) > 0 {
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) > 0 {
err = self.yield(series)
if err = self.yield(series); err != nil {
return false
}
}
}
}
@ -126,7 +132,9 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) > 0 {
err = self.yield(series)
if err = self.yield(series); err != nil {
return false
}
}
}
if err != nil {
@ -340,18 +348,19 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.SelectQuery,
self.aggregators = []Aggregator{}
for _, value := range query.GetColumnNames() {
if value.IsFunctionCall() {
lowerCaseName := strings.ToLower(value.Name)
initializer := registeredAggregators[lowerCaseName]
if initializer == nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown function %s", value.Name))
}
aggregator, err := initializer(query, value, query.GetGroupByClause().FillValue)
if err != nil {
return err
}
self.aggregators = append(self.aggregators, aggregator)
if !value.IsFunctionCall() {
continue
}
lowerCaseName := strings.ToLower(value.Name)
initializer := registeredAggregators[lowerCaseName]
if initializer == nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown function %s", value.Name))
}
aggregator, err := initializer(query, value, query.GetGroupByClause().FillValue)
if err != nil {
return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("%s", err))
}
self.aggregators = append(self.aggregators, aggregator)
}
timestampAggregator, err := NewTimestampAggregator(query, nil)
if err != nil {