Teach the AuxIterator how to background

Now the AuxIterator will know when it is backgrounded so that it can
stop reading from the primary iterator when all of the child iterators
have been closed.
pull/5979/head
Jonathan A. Sternberg 2016-03-13 12:05:47 -04:00
parent 09a9b3c53e
commit 0042866002
4 changed files with 77 additions and 28 deletions

View File

@ -572,9 +572,10 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
output chan *FloatPoint
fields auxIteratorFields
input *bufFloatIterator
output chan *FloatPoint
fields auxIteratorFields
background bool
}
func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator {
@ -585,6 +586,12 @@ func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt Iterato
}
}
func (itr *floatAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *floatAuxIterator) Start() { go itr.stream() }
func (itr *floatAuxIterator) Close() error { return itr.input.Close() }
func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output }
@ -622,7 +629,9 @@ func (itr *floatAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -1767,9 +1776,10 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
output chan *IntegerPoint
fields auxIteratorFields
input *bufIntegerIterator
output chan *IntegerPoint
fields auxIteratorFields
background bool
}
func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator {
@ -1780,6 +1790,12 @@ func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt Ite
}
}
func (itr *integerAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *integerAuxIterator) Start() { go itr.stream() }
func (itr *integerAuxIterator) Close() error { return itr.input.Close() }
func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output }
@ -1817,7 +1833,9 @@ func (itr *integerAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -2959,9 +2977,10 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
output chan *StringPoint
fields auxIteratorFields
input *bufStringIterator
output chan *StringPoint
fields auxIteratorFields
background bool
}
func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator {
@ -2972,6 +2991,12 @@ func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt Itera
}
}
func (itr *stringAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *stringAuxIterator) Start() { go itr.stream() }
func (itr *stringAuxIterator) Close() error { return itr.input.Close() }
func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output }
@ -3009,7 +3034,9 @@ func (itr *stringAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -4151,9 +4178,10 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
output chan *BooleanPoint
fields auxIteratorFields
input *bufBooleanIterator
output chan *BooleanPoint
fields auxIteratorFields
background bool
}
func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator {
@ -4164,6 +4192,12 @@ func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt Ite
}
}
func (itr *booleanAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *booleanAuxIterator) Start() { go itr.stream() }
func (itr *booleanAuxIterator) Close() error { return itr.input.Close() }
func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output }
@ -4201,7 +4235,9 @@ func (itr *booleanAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)

View File

@ -571,9 +571,10 @@ func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator
output chan *{{$k.Name}}Point
fields auxIteratorFields
input *buf{{$k.Name}}Iterator
output chan *{{$k.Name}}Point
fields auxIteratorFields
background bool
}
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
@ -584,6 +585,12 @@ func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList,
}
}
func (itr *{{$k.name}}AuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }
@ -621,7 +628,9 @@ func (itr *{{$k.name}}AuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)

View File

@ -232,6 +232,10 @@ type AuxIterator interface {
// Start starts writing to the created iterators.
Start()
// Backgrounds the iterator so that, when start is called, it will
// continuously read from the iterator.
Background()
}
// NewAuxIterator returns a new instance of AuxIterator.
@ -336,7 +340,7 @@ func (a auxIteratorFields) iterator(name string) Iterator {
}
// send sends a point to all field iterators.
func (a auxIteratorFields) send(p Point) {
func (a auxIteratorFields) send(p Point) (ok bool) {
values := p.aux()
for i, f := range a {
v := values[i]
@ -349,18 +353,19 @@ func (a auxIteratorFields) send(p Point) {
for _, itr := range f.itrs {
switch itr := itr.(type) {
case *floatChanIterator:
itr.setBuf(p.name(), tags, p.time(), v)
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *integerChanIterator:
itr.setBuf(p.name(), tags, p.time(), v)
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *stringChanIterator:
itr.setBuf(p.name(), tags, p.time(), v)
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *booleanChanIterator:
itr.setBuf(p.name(), tags, p.time(), v)
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
default:
panic(fmt.Sprintf("invalid aux itr type: %T", itr))
}
}
}
return ok
}
// drainIterator reads all points from an iterator.

View File

@ -121,10 +121,9 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
panic("unreachable")
}
}
aitr.Start()
// Drain primary aux iterator since there is no reader for it.
go drainIterator(aitr)
// Background the primary iterator since there is no reader for it.
aitr.Background()
return itrs, nil
}