Correct the AuxIterator test and adding some additional locks

The additional locks shouldn't be necessary due to how the code is used,
but should prevent any potential data races in case we accidentally do
something bad.
pull/5196/head
Jonathan A. Sternberg 2016-02-09 18:59:59 -05:00 committed by Ben Johnson
parent ed151598ff
commit 98810a363a
4 changed files with 32 additions and 23 deletions

View File

@ -555,12 +555,11 @@ type floatAuxIterator struct {
}
func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator {
itr := &floatAuxIterator{
return &floatAuxIterator{
input: newBufFloatIterator(input),
output: make(chan *FloatPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
return itr
}
func (itr *floatAuxIterator) Start() { go itr.stream() }
@ -576,7 +575,7 @@ func (itr *floatAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, erro
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
@ -1444,12 +1443,11 @@ type integerAuxIterator struct {
}
func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator {
itr := &integerAuxIterator{
return &integerAuxIterator{
input: newBufIntegerIterator(input),
output: make(chan *IntegerPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
return itr
}
func (itr *integerAuxIterator) Start() { go itr.stream() }
@ -1465,7 +1463,7 @@ func (itr *integerAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, er
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
@ -2333,12 +2331,11 @@ type stringAuxIterator struct {
}
func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator {
itr := &stringAuxIterator{
return &stringAuxIterator{
input: newBufStringIterator(input),
output: make(chan *StringPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
return itr
}
func (itr *stringAuxIterator) Start() { go itr.stream() }
@ -2354,7 +2351,7 @@ func (itr *stringAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, err
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}
@ -3222,12 +3219,11 @@ type booleanAuxIterator struct {
}
func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator {
itr := &booleanAuxIterator{
return &booleanAuxIterator{
input: newBufBooleanIterator(input),
output: make(chan *BooleanPoint, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
return itr
}
func (itr *booleanAuxIterator) Start() { go itr.stream() }
@ -3243,7 +3239,7 @@ func (itr *booleanAuxIterator) CreateIterator(opt IteratorOptions) (Iterator, er
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}

View File

@ -557,12 +557,11 @@ type {{.name}}AuxIterator struct {
}
func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{.name}}AuxIterator {
itr := &{{.name}}AuxIterator{
return &{{.name}}AuxIterator{
input: newBuf{{.Name}}Iterator(input),
output: make(chan *{{.Name}}Point, 1),
fields: newAuxIteratorFields(seriesKeys, opt),
}
return itr
}
func (itr *{{.name}}AuxIterator) Start() { go itr.stream() }
@ -578,7 +577,7 @@ func (itr *{{.name}}AuxIterator) CreateIterator(opt IteratorOptions) (Iterator,
switch expr := expr.(type) {
case *VarRef:
return itr.fields.iterator(expr.Val), nil
return itr.Iterator(expr.Val), nil
default:
panic(fmt.Sprintf("invalid expression type for an aux iterator: %T", expr))
}

View File

@ -3,6 +3,7 @@ package influxql
import (
"errors"
"fmt"
"sync"
"time"
)
@ -233,9 +234,24 @@ type auxIteratorField struct {
name string // field name
typ DataType // detected data type
itrs []Iterator // auxillary iterators
mu sync.Mutex
opt IteratorOptions
}
func (f *auxIteratorField) append(itr Iterator) {
f.mu.Lock()
defer f.mu.Unlock()
f.itrs = append(f.itrs, itr)
}
func (f *auxIteratorField) close() {
f.mu.Lock()
defer f.mu.Unlock()
for _, itr := range f.itrs {
itr.Close()
}
}
type auxIteratorFields []*auxIteratorField
// newAuxIteratorFields returns a new instance of auxIteratorFields from a list of field names.
@ -259,9 +275,7 @@ func newAuxIteratorFields(seriesKeys SeriesList, opt IteratorOptions) auxIterato
func (a auxIteratorFields) close() {
for _, f := range a {
for _, itr := range f.itrs {
itr.Close()
}
f.close()
}
}
@ -278,19 +292,19 @@ func (a auxIteratorFields) iterator(name string) Iterator {
switch f.typ {
case Float:
itr := &floatChanIterator{c: make(chan *FloatPoint, 1)}
f.itrs = append(f.itrs, itr)
f.append(itr)
return itr
case Integer:
itr := &integerChanIterator{c: make(chan *IntegerPoint, 1)}
f.itrs = append(f.itrs, itr)
f.append(itr)
return itr
case String:
itr := &stringChanIterator{c: make(chan *StringPoint, 1)}
f.itrs = append(f.itrs, itr)
f.append(itr)
return itr
case Boolean:
itr := &booleanChanIterator{c: make(chan *BooleanPoint, 1)}
f.itrs = append(f.itrs, itr)
f.append(itr)
return itr
default:
break

View File

@ -611,7 +611,6 @@ func TestFloatAuxIterator(t *testing.T) {
},
influxql.IteratorOptions{Aux: []string{"f0", "f1"}},
)
itr.Start()
itrs := []influxql.Iterator{
itr,
@ -619,6 +618,7 @@ func TestFloatAuxIterator(t *testing.T) {
itr.Iterator("f1"),
itr.Iterator("f0"),
}
itr.Start()
if a := Iterators(itrs).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{