From ed151598ffbdcfa5c55798d05d81f100af5303ee Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 9 Feb 2016 18:03:44 -0500 Subject: [PATCH] Modify the AuxIterator to include a Start method The AuxIterator streams points to the underlying iterators. When it started automatically, race conditions occurred between the stream closing the iterators and creating iterators from the AuxIterator. --- influxql/iterator.gen.go | 40 +++++++---------------------------- influxql/iterator.gen.go.tmpl | 10 ++------- influxql/iterator.go | 30 ++++++++++++-------------- influxql/iterator_test.go | 1 + influxql/select.go | 2 ++ 5 files changed, 27 insertions(+), 56 deletions(-) diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 2d865f6619..c9b76d67d1 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -558,18 +558,12 @@ func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt Iterato itr := &floatAuxIterator{ input: newBufFloatIterator(input), output: make(chan *FloatPoint, 1), - fields: newAuxIteratorFields(opt), + fields: newAuxIteratorFields(seriesKeys, opt), } - - // Initialize auxiliary fields. - if len(opt.Aux) > 0 { - itr.fields.init(seriesKeys) - } - - go itr.stream() return itr } +func (itr *floatAuxIterator) Start() { go itr.stream() } func (itr *floatAuxIterator) Close() error { return itr.input.Close() } func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output } func (itr *floatAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) } @@ -1453,18 +1447,12 @@ func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt Ite itr := &integerAuxIterator{ input: newBufIntegerIterator(input), output: make(chan *IntegerPoint, 1), - fields: newAuxIteratorFields(opt), + fields: newAuxIteratorFields(seriesKeys, opt), } - - // Initialize auxiliary fields. - if len(opt.Aux) > 0 { - itr.fields.init(seriesKeys) - } - - go itr.stream() return itr } +func (itr *integerAuxIterator) Start() { go itr.stream() } func (itr *integerAuxIterator) Close() error { return itr.input.Close() } func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output } func (itr *integerAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) } @@ -2348,18 +2336,12 @@ func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt Itera itr := &stringAuxIterator{ input: newBufStringIterator(input), output: make(chan *StringPoint, 1), - fields: newAuxIteratorFields(opt), + fields: newAuxIteratorFields(seriesKeys, opt), } - - // Initialize auxiliary fields. - if len(opt.Aux) > 0 { - itr.fields.init(seriesKeys) - } - - go itr.stream() return itr } +func (itr *stringAuxIterator) Start() { go itr.stream() } func (itr *stringAuxIterator) Close() error { return itr.input.Close() } func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output } func (itr *stringAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) } @@ -3243,18 +3225,12 @@ func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt Ite itr := &booleanAuxIterator{ input: newBufBooleanIterator(input), output: make(chan *BooleanPoint, 1), - fields: newAuxIteratorFields(opt), + fields: newAuxIteratorFields(seriesKeys, opt), } - - // Initialize auxiliary fields. - if len(opt.Aux) > 0 { - itr.fields.init(seriesKeys) - } - - go itr.stream() return itr } +func (itr *booleanAuxIterator) Start() { go itr.stream() } func (itr *booleanAuxIterator) Close() error { return itr.input.Close() } func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output } func (itr *booleanAuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) } diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 6fc3efe0c5..d877388466 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -560,18 +560,12 @@ func new{{.Name}}AuxIterator(input {{.Name}}Iterator, seriesKeys SeriesList, opt itr := &{{.name}}AuxIterator{ input: newBuf{{.Name}}Iterator(input), output: make(chan *{{.Name}}Point, 1), - fields: newAuxIteratorFields(opt), + fields: newAuxIteratorFields(seriesKeys, opt), } - - // Initialize auxiliary fields. - if len(opt.Aux) > 0 { - itr.fields.init(seriesKeys) - } - - go itr.stream() return itr } +func (itr *{{.name}}AuxIterator) Start() { go itr.stream() } func (itr *{{.name}}AuxIterator) Close() error { return itr.input.Close() } func (itr *{{.name}}AuxIterator) Next() *{{.Name}}Point { return <-itr.output } func (itr *{{.name}}AuxIterator) Iterator(name string) Iterator { return itr.fields.iterator(name) } diff --git a/influxql/iterator.go b/influxql/iterator.go index 78c177dd6d..b4ea7afa49 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -207,6 +207,9 @@ type AuxIterator interface { // Auxilary iterator Iterator(name string) Iterator + + // Start starts writing to the created iterators. + Start() } // NewAuxIterator returns a new instance of AuxIterator. @@ -236,10 +239,20 @@ type auxIteratorField struct { type auxIteratorFields []*auxIteratorField // newAuxIteratorFields returns a new instance of auxIteratorFields from a list of field names. -func newAuxIteratorFields(opt IteratorOptions) auxIteratorFields { +func newAuxIteratorFields(seriesKeys SeriesList, opt IteratorOptions) auxIteratorFields { fields := make(auxIteratorFields, len(opt.Aux)) for i, name := range opt.Aux { fields[i] = &auxIteratorField{name: name, opt: opt} + for _, s := range seriesKeys { + aux := s.Aux[i] + if aux == Unknown { + continue + } + + if fields[i].typ == Unknown || aux < fields[i].typ { + fields[i].typ = aux + } + } } return fields } @@ -252,21 +265,6 @@ func (a auxIteratorFields) close() { } } -// init initializes all auxilary fields with initial points. -func (a auxIteratorFields) init(seriesKeys SeriesList) { - for _, s := range seriesKeys { - for i, aux := range s.Aux { - if aux == Unknown { - continue - } - - if a[i].typ == Unknown || aux < a[i].typ { - a[i].typ = aux - } - } - } -} - // iterator creates a new iterator for a named auxilary field. func (a auxIteratorFields) iterator(name string) Iterator { for _, f := range a { diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index ef9a1949f7..8dcda80f9f 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -611,6 +611,7 @@ func TestFloatAuxIterator(t *testing.T) { }, influxql.IteratorOptions{Aux: []string{"f0", "f1"}}, ) + itr.Start() itrs := []influxql.Iterator{ itr, diff --git a/influxql/select.go b/influxql/select.go index 48863ec696..938cf0150a 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -111,6 +111,7 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) ( panic("unreachable") } } + aitr.Start() // Drain primary aux iterator since there is no reader for it. go drainIterator(aitr) @@ -168,6 +169,7 @@ func buildFieldIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) } itrs[i] = itr } + aitr.Start() return nil }(); err != nil {