Modify the AuxIterator to include a Start method

The AuxIterator streams points to the underlying iterators. When it
started automatically, race conditions occurred between the stream
closing the iterators and creating iterators from the AuxIterator.
pull/5196/head
Jonathan A. Sternberg 2016-02-09 18:03:44 -05:00 committed by Ben Johnson
parent b3d5aa82b7
commit ed151598ff
5 changed files with 27 additions and 56 deletions

View File

@ -558,18 +558,12 @@ func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt Iterato
itr := &floatAuxIterator{
input: newBufFloatIterator(input),
output: make(chan *FloatPoint, 1),
fields: newAuxIteratorFields(opt),
fields: newAuxIteratorFields(seriesKeys, opt),
}
// Initialize auxiliary fields.
if len(opt.Aux) > 0 {
itr.fields.init(seriesKeys)
}
go itr.stream()
return itr
}
func (itr *floatAuxIterator) Start() { go itr.stream() }
func (itr *floatAuxIterator) Close() error { return itr.input.Close() }
func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output }
func (itr *floatAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
@ -1453,18 +1447,12 @@ func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt Ite
itr := &integerAuxIterator{
input: newBufIntegerIterator(input),
output: make(chan *IntegerPoint, 1),
fields: newAuxIteratorFields(opt),
fields: newAuxIteratorFields(seriesKeys, opt),
}
// Initialize auxiliary fields.
if len(opt.Aux) > 0 {
itr.fields.init(seriesKeys)
}
go itr.stream()
return itr
}
func (itr *integerAuxIterator) Start() { go itr.stream() }
func (itr *integerAuxIterator) Close() error { return itr.input.Close() }
func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output }
func (itr *integerAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
@ -2348,18 +2336,12 @@ func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt Itera
itr := &stringAuxIterator{
input: newBufStringIterator(input),
output: make(chan *StringPoint, 1),
fields: newAuxIteratorFields(opt),
fields: newAuxIteratorFields(seriesKeys, opt),
}
// Initialize auxiliary fields.
if len(opt.Aux) > 0 {
itr.fields.init(seriesKeys)
}
go itr.stream()
return itr
}
func (itr *stringAuxIterator) Start() { go itr.stream() }
func (itr *stringAuxIterator) Close() error { return itr.input.Close() }
func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output }
func (itr *stringAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }
@ -3243,18 +3225,12 @@ func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt Ite
itr := &booleanAuxIterator{
input: newBufBooleanIterator(input),
output: make(chan *BooleanPoint, 1),
fields: newAuxIteratorFields(opt),
fields: newAuxIteratorFields(seriesKeys, opt),
}
// Initialize auxiliary fields.
if len(opt.Aux) > 0 {
itr.fields.init(seriesKeys)
}
go itr.stream()
return itr
}
func (itr *booleanAuxIterator) Start() { go itr.stream() }
func (itr *booleanAuxIterator) Close() error { return itr.input.Close() }
func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output }
func (itr *booleanAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }

View File

@ -560,18 +560,12 @@ func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt
itr := &{{.name}}AuxIterator{
input: newBuf{{.Name}}Iterator(input),
output: make(chan *{{.Name}}Point, 1),
fields: newAuxIteratorFields(opt),
fields: newAuxIteratorFields(seriesKeys, opt),
}
// Initialize auxiliary fields.
if len(opt.Aux) > 0 {
itr.fields.init(seriesKeys)
}
go itr.stream()
return itr
}
func (itr *{{.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output }
func (itr *{{.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) }

View File

@ -207,6 +207,9 @@ type AuxIterator interface {
// Auxilary iterator
Iterator(name string) Iterator
// Start starts writing to the created iterators.
Start()
}
// NewAuxIterator returns a new instance of AuxIterator.
@ -236,10 +239,20 @@ type auxIteratorField struct {
type auxIteratorFields []*auxIteratorField
// newAuxIteratorFields returns a new instance of auxIteratorFields from a list of field names.
func newAuxIteratorFields(opt IteratorOptions) auxIteratorFields {
func newAuxIteratorFields(seriesKeys SeriesList, opt IteratorOptions) auxIteratorFields {
fields := make(auxIteratorFields, len(opt.Aux))
for i, name := range opt.Aux {
fields[i] = &auxIteratorField{name: name, opt: opt}
for _, s := range seriesKeys {
aux := s.Aux[i]
if aux == Unknown {
continue
}
if fields[i].typ == Unknown || aux < fields[i].typ {
fields[i].typ = aux
}
}
}
return fields
}
@ -252,21 +265,6 @@ func (a auxIteratorFields) close() {
}
}
// init initializes all auxilary fields with initial points.
func (a auxIteratorFields) init(seriesKeys SeriesList) {
for _, s := range seriesKeys {
for i, aux := range s.Aux {
if aux == Unknown {
continue
}
if a[i].typ == Unknown || aux < a[i].typ {
a[i].typ = aux
}
}
}
}
// iterator creates a new iterator for a named auxilary field.
func (a auxIteratorFields) iterator(name string) Iterator {
for _, f := range a {

View File

@ -611,6 +611,7 @@ func TestFloatAuxIterator(t *testing.T) {
},
influxql.IteratorOptions{Aux: []string{"f0", "f1"}},
)
itr.Start()
itrs := []influxql.Iterator{
itr,

View File

@ -111,6 +111,7 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
panic("unreachable")
}
}
aitr.Start()
// Drain primary aux iterator since there is no reader for it.
go drainIterator(aitr)
@ -168,6 +169,7 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, opt IteratorOptions)
}
itrs[i] = itr
}
aitr.Start()
return nil
}(); err != nil {