Merge pull request #6539 from influxdata/js-6495-fix-aggregates-with-empty-shards

Fix aggregate returns when data is missing from some shards
pull/6544/head
Jonathan A. Sternberg 2016-05-03 10:56:21 -04:00
commit a2a5c32770
5 changed files with 54 additions and 31 deletions

View File

@ -55,6 +55,7 @@
- [#6505](https://github.com/influxdata/influxdb/issues/6505): Add regex literal to InfluxQL spec for FROM clause.
- [#5890](https://github.com/influxdata/influxdb/issues/5890): Return the time with a selector when there is no group by interval.
- [#6496](https://github.com/influxdata/influxdb/issues/6496): Fix parsing escaped series key when loading database index
- [#6495](https://github.com/influxdata/influxdb/issues/6495): Fix aggregate returns when data is missing from some shards.
## v0.12.2 [2016-04-20]

View File

@ -118,7 +118,7 @@ func (a Iterators) cast() interface{} {
func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
inputs = Iterators(inputs).filterNonNil()
if len(inputs) == 0 {
return &nilFloatIterator{}
return nil
}
// Aggregate functions can use a more relaxed sorting so that points
@ -145,7 +145,7 @@ func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
func NewSortedMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
inputs = Iterators(inputs).filterNonNil()
if len(inputs) == 0 {
return &nilFloatIterator{}
return nil
}
switch inputs := Iterators(inputs).cast().(type) {
@ -535,26 +535,29 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
// Merge into a single iterator.
if opt.MergeSorted() {
itr := NewSortedMergeIterator(itrs, opt)
if opt.InterruptCh != nil {
if itr != nil && opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}
itr := NewMergeIterator(itrs, opt)
if opt.Expr != nil {
if expr, ok := opt.Expr.(*Call); ok && expr.Name == "count" {
opt.Expr = &Call{
Name: "sum",
Args: expr.Args,
if itr != nil {
if opt.Expr != nil {
if expr, ok := opt.Expr.(*Call); ok && expr.Name == "count" {
opt.Expr = &Call{
Name: "sum",
Args: expr.Args,
}
}
}
}
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return NewCallIterator(itr, opt)
}
return NewCallIterator(itr, opt)
return nil, nil
}
// FieldDimensions returns unique fields and dimensions from multiple iterator creators.

View File

@ -209,13 +209,10 @@ func TestMergeIterator_Boolean(t *testing.T) {
}
func TestMergeIterator_Nil(t *testing.T) {
itr := influxql.NewMergeIterator([]influxql.Iterator{nil}, influxql.IteratorOptions{}).(influxql.FloatIterator)
if p, err := itr.Next(); p != nil {
t.Fatalf("unexpected point: %#v", p)
} else if err != nil {
t.Fatalf("unexpected error: %#v", err)
itr := influxql.NewMergeIterator([]influxql.Iterator{nil}, influxql.IteratorOptions{})
if itr != nil {
t.Fatalf("unexpected iterator: %#v", itr)
}
itr.Close()
}
func TestMergeIterator_Cast_Float(t *testing.T) {
@ -460,13 +457,10 @@ func TestSortedMergeIterator_Boolean(t *testing.T) {
}
func TestSortedMergeIterator_Nil(t *testing.T) {
itr := influxql.NewSortedMergeIterator([]influxql.Iterator{nil}, influxql.IteratorOptions{}).(influxql.FloatIterator)
if p, err := itr.Next(); p != nil {
t.Fatalf("unexpected point: %#v", p)
} else if err != nil {
t.Fatalf("unexpected error: %#v", err)
itr := influxql.NewSortedMergeIterator([]influxql.Iterator{nil}, influxql.IteratorOptions{})
if itr != nil {
t.Fatalf("unexpected iterator: %#v", itr)
}
itr.Close()
}
func TestSortedMergeIterator_Cast_Float(t *testing.T) {

View File

@ -98,6 +98,8 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
input, err := ic.CreateIterator(opt)
if err != nil {
return nil, err
} else if input == nil {
input = &nilFloatIterator{}
}
// Filter out duplicate rows, if required.
@ -226,7 +228,13 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec
switch expr := expr.(type) {
case *VarRef:
return ic.CreateIterator(opt)
itr, err := ic.CreateIterator(opt)
if err != nil {
return nil, err
} else if itr == nil {
itr = &nilFloatIterator{}
}
return itr, nil
case *Call:
// FIXME(benbjohnson): Validate that only calls with 1 arg are passed to IC.
@ -291,9 +299,22 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec
return newCountIterator(input, opt)
}
}
return ic.CreateIterator(opt)
itr, err := ic.CreateIterator(opt)
if err != nil {
return nil, err
} else if itr == nil {
itr = &nilFloatIterator{}
}
return itr, nil
case "min", "max", "sum", "first", "last", "mean":
return ic.CreateIterator(opt)
itr, err := ic.CreateIterator(opt)
if err != nil {
return nil, err
} else if itr == nil {
itr = &nilFloatIterator{}
}
return itr, nil
case "median":
input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt, false)
if err != nil {

View File

@ -749,18 +749,22 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator
}
input := influxql.NewMergeIterator(inputs, opt)
if opt.InterruptCh != nil {
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
if input != nil {
if opt.InterruptCh != nil {
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
}
return influxql.NewCallIterator(input, opt)
}
return influxql.NewCallIterator(input, opt)
return nil, nil
}
itrs, err := e.createVarRefIterator(opt)
if err != nil {
return nil, err
}
itr := influxql.NewSortedMergeIterator(itrs, opt)
if opt.InterruptCh != nil {
if itr != nil && opt.InterruptCh != nil {
itr = influxql.NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil