Fix for subqueries to use the parallel iterator correctly

Also, fix the `Iterators.Merge(IteratorOptions)` function so it consults
the `Ordered` attribute to determine which iterator it should use to
merge the input iterators.
pull/7819/head
Jonathan A. Sternberg 2017-01-10 16:44:35 -06:00
parent e7b7984c27
commit 3ba950b029
2 changed files with 72 additions and 45 deletions

View File

@ -117,8 +117,11 @@ func (a Iterators) cast() interface{} {
// Merge combines all iterators into a single iterator.
// A sorted merge iterator or a merge iterator can be used based on opt.
func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
// Check if this is a call expression.
call, ok := opt.Expr.(*Call)
// Merge into a single iterator.
if opt.MergeSorted() {
if !ok && opt.MergeSorted() {
itr := NewSortedMergeIterator(a, opt)
if itr != nil && opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
@ -126,6 +129,7 @@ func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
return itr, nil
}
// We do not need an ordered output so use a merge iterator.
itr := NewMergeIterator(a, opt)
if itr == nil {
return nil, nil
@ -135,7 +139,6 @@ func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
call, ok := opt.Expr.(*Call)
if !ok {
// This is not a call expression so do not use a call iterator.
return itr, nil
@ -780,11 +783,7 @@ func newIteratorOptionsSubstatement(stmt *SelectStatement, opt IteratorOptions)
// MergeSorted returns true if the options require a sorted merge.
func (opt IteratorOptions) MergeSorted() bool {
if opt.Expr == nil {
return true
}
_, ok := opt.Expr.(*VarRef)
return ok
return opt.Ordered
}
// SeekTime returns the time the iterator should start from.

View File

@ -1246,64 +1246,92 @@ func (e *Engine) KeyCursor(key string, t int64, ascending bool) *KeyCursor {
// CreateIterator returns an iterator for the measurement based on opt.
func (e *Engine) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if call, ok := opt.Expr.(*influxql.Call); ok {
refOpt := opt
refOpt.Expr = call.Args[0].(*influxql.VarRef)
aggregate := true
if opt.Interval.IsZero() {
switch call.Name {
case "first":
aggregate = false
if call.Name == "first" || call.Name == "last" {
refOpt := opt
refOpt.Limit = 1
refOpt.Ascending = true
refOpt.Ordered = true
case "last":
aggregate = false
refOpt.Limit = 1
refOpt.Ascending = false
refOpt.Ascending = call.Name == "first"
refOpt.Ordered = true
refOpt.Expr = call.Args[0]
itrs, err := e.createVarRefIterator(measurement, refOpt)
if err != nil {
return nil, err
}
return influxql.Iterators(itrs).Merge(opt)
}
}
inputs, err := e.createVarRefIterator(measurement, refOpt, aggregate)
inputs, err := e.createCallIterator(measurement, call, opt)
if err != nil {
return nil, err
} else if len(inputs) == 0 {
return nil, nil
}
// Wrap each series in a call iterator.
for i, input := range inputs {
if opt.InterruptCh != nil {
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
}
itr, err := influxql.NewCallIterator(input, opt)
if err != nil {
return nil, err
}
inputs[i] = itr
}
return influxql.NewParallelMergeIterator(inputs, opt, runtime.GOMAXPROCS(0)), nil
return influxql.Iterators(inputs).Merge(opt)
}
itrs, err := e.createVarRefIterator(measurement, opt, false)
itrs, err := e.createVarRefIterator(measurement, opt)
if err != nil {
return nil, err
}
return influxql.Iterators(itrs).Merge(opt)
}
func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
ref, _ := call.Args[0].(*influxql.VarRef)
mm := e.index.Measurement(measurement)
if mm == nil {
return nil, nil
}
// Determine tagsets for this measurement based on dimensions and filters.
tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition)
if err != nil {
return nil, err
}
itr := influxql.NewSortedMergeIterator(itrs, opt)
if itr != nil && opt.InterruptCh != nil {
itr = influxql.NewInterruptIterator(itr, opt.InterruptCh)
// Calculate tag sets and apply SLIMIT/SOFFSET.
tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)
itrs := make([]influxql.Iterator, 0, len(tagSets))
if err := func() error {
for _, t := range tagSets {
inputs, err := e.createTagSetIterators(ref, mm, t, opt)
if err != nil {
return err
} else if len(inputs) == 0 {
continue
}
// Wrap each series in a call iterator.
for i, input := range inputs {
if opt.InterruptCh != nil {
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
}
itr, err := influxql.NewCallIterator(input, opt)
if err != nil {
return err
}
inputs[i] = itr
}
itr := influxql.NewParallelMergeIterator(inputs, opt, runtime.GOMAXPROCS(0))
itrs = append(itrs, itr)
}
return nil
}(); err != nil {
influxql.Iterators(itrs).Close()
return nil, err
}
return itr, nil
return itrs, nil
}
// createVarRefIterator creates an iterator for a variable reference.
// The aggregate argument determines this is being created for an aggregate.
// If this is an aggregate, the limit optimization is disabled temporarily. See #6661.
func (e *Engine) createVarRefIterator(measurement string, opt influxql.IteratorOptions, aggregate bool) ([]influxql.Iterator, error) {
func (e *Engine) createVarRefIterator(measurement string, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
ref, _ := opt.Expr.(*influxql.VarRef)
mm := e.index.Measurement(measurement)
@ -1336,7 +1364,7 @@ func (e *Engine) createVarRefIterator(measurement string, opt influxql.IteratorO
return err
}
if !aggregate && len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) {
if opt.Limit > 0 || opt.Offset > 0 {
itr = newLimitIterator(itr, opt)
}
itrs = append(itrs, itr)